[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Create a set of scheduled tasks to be run by the agent in background
commit 546d1e3b2d9b1cbf0bebe2b032acc0d8b87e13e4
Author: Arturo Filastò <arturo@xxxxxxxxxxx>
Date: Tue Jul 26 21:18:04 2016 +0200
Create a set of scheduled tasks to be run by the agent in background
* Remove unneeded filename hash and sprinkle notes on it's future deprecation
* Fix some bugs in the resources update procedure
---
ooni/agent/scheduler.py | 144 ++++++++++++++++++++-
ooni/contrib/__init__.py | 2 +-
ooni/contrib/croniter.py | 11 ++
ooni/deck.py | 38 +++---
ooni/director.py | 35 +++--
ooni/geoip.py | 12 +-
ooni/nettest.py | 25 +---
.../manipulation/http_invalid_request_line.py | 14 +-
ooni/resources.py | 71 ++++++----
ooni/settings.py | 2 +
ooni/ui/cli.py | 3 +-
ooni/ui/web/server.py | 34 +----
12 files changed, 271 insertions(+), 120 deletions(-)
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
index 1004597..ace3bc5 100644
--- a/ooni/agent/scheduler.py
+++ b/ooni/agent/scheduler.py
@@ -1,5 +1,126 @@
+from datetime import datetime
+
from twisted.application import service
-from twisted.internet import task
+from twisted.internet import task, defer
+from twisted.python.filepath import FilePath
+
+from ooni import resources
+from ooni.utils import log
+from ooni.deck import input_store
+from ooni.settings import config
+from ooni.contrib import croniter
+
+class ScheduledTask(object):
+ _time_format = "%Y-%m-%dT%H:%M:%SZ"
+ schedule = None
+ identifier = None
+
+ def __init__(self, schedule=None):
+ if schedule is not None:
+ self.schedule = schedule
+
+ assert self.identifier is not None, "self.identifier must be set"
+ assert self.schedule is not None, "self.schedule must be set"
+ scheduler_directory = config.scheduler_directory
+
+ self._last_run = FilePath(scheduler_directory).child(self.identifier)
+ self._last_run_lock = defer.DeferredFilesystemLock(
+ FilePath(scheduler_directory).child(self.identifier + ".lock").path
+ )
+
+ @property
+ def should_run(self):
+ current_time = datetime.utcnow()
+ next_cycle = croniter(self.schedule, self.last_run).get_next(datetime)
+ if next_cycle <= current_time:
+ return True
+ return False
+
+ @property
+ def last_run(self):
+ if not self._last_run.exists():
+ return datetime.fromtimestamp(0)
+ with self._last_run.open('r') as in_file:
+ date_str = in_file.read()
+ return datetime.strptime(date_str, self._time_format)
+
+ def _update_last_run(self):
+ with self._last_run.open('w') as out_file:
+ current_time = datetime.utcnow()
+ out_file.write(current_time.strftime(self._time_format))
+
+ def task(self):
+ raise NotImplemented
+
+ @defer.inlineCallbacks
+ def run(self):
+ yield self._last_run_lock.deferUntilLocked()
+ if not self.should_run:
+ self._last_run_lock.unlock()
+ defer.returnValue(None)
+ try:
+ yield self.task()
+ self._update_last_run()
+ except:
+ raise
+ finally:
+ self._last_run_lock.unlock()
+
+class UpdateInputsAndResources(ScheduledTask):
+ identifier = "ooni-update-inputs"
+ schedule = "@daily"
+
+ @defer.inlineCallbacks
+ def task(self):
+ log.debug("Updating the inputs")
+ yield resources.check_for_update(config.probe_ip.geodata['countrycode'])
+ yield input_store.update(config.probe_ip.geodata['countrycode'])
+
+class UpdateProbeIp(ScheduledTask):
+ identifier = "ooni-update-probe-ip"
+ schedule = "@hourly"
+ # XXX we need to ensure this is always run the first time ooniprobe or
+ # ooniprobe-agent is started or implement on disk caching of the users
+ # IP address.
+
+ def task(self):
+ log.debug("Updating the probe IP")
+ return config.probe_ip.lookup()
+
+class CleanupInProgressReports(ScheduledTask):
+ identifier = 'ooni-cleanup-reports'
+ schedule = '@daily'
+
+class UploadMissingReports(ScheduledTask):
+ identifier = 'ooni-cleanup-reports'
+ schedule = '@weekly'
+
+# Order mattters
+SYSTEM_TASKS = [
+ UpdateProbeIp,
+ UpdateInputsAndResources
+]
+
+@defer.inlineCallbacks
+def run_system_tasks(no_geoip=False, no_input_store=False):
+ task_classes = SYSTEM_TASKS[:]
+
+ if no_geoip:
+ log.debug("Not updating probe IP")
+ task_classes.pop(UpdateProbeIp)
+
+ if no_input_store:
+ log.debug("Not updating the inputs")
+ task_classes.pop(UpdateInputsAndResources)
+
+ for task_class in task_classes:
+ task = task_class()
+ log.debug("Running task {0}".format(task.identifier))
+ try:
+ yield task.run()
+ except Exception as exc:
+ log.err("Failed to run task {0}".format(task.identifier))
+ log.exception(exc)
class SchedulerService(service.MultiService):
"""
@@ -10,16 +131,35 @@ class SchedulerService(service.MultiService):
self.director = director
self.interval = interval
self._looping_call = task.LoopingCall(self._should_run)
+ self._scheduled_tasks = []
+
+ def schedule(self, task):
+ self._scheduled_tasks.append(task)
+
+ def _task_failed(self, failure, task):
+ log.msg("Failed to run {0}".format(task.identifier))
+ log.exception(failure)
+
+ def _task_success(self, result, task):
+ log.msg("Ran {0}".format(task.identifier))
def _should_run(self):
"""
This function is called every self.interval seconds to check
which periodic tasks should be run.
"""
- pass
+ for task in self._scheduled_tasks:
+ log.debug("Running task {0}".format(task.identifier))
+ d = task.run()
+ d.addErrback(self._task_failed, task)
+ d.addCallback(self._task_success, task)
def startService(self):
service.MultiService.startService(self)
+
+ self.schedule(UpdateProbeIp())
+ self.schedule(UpdateInputsAndResources())
+
self._looping_call.start(self.interval)
def stopService(self):
diff --git a/ooni/contrib/__init__.py b/ooni/contrib/__init__.py
index 50b6b54..28aad30 100644
--- a/ooni/contrib/__init__.py
+++ b/ooni/contrib/__init__.py
@@ -1 +1 @@
-from ._crontab import CronTab
+from .croniter import croniter
diff --git a/ooni/contrib/croniter.py b/ooni/contrib/croniter.py
index 5864603..653dbbf 100644
--- a/ooni/contrib/croniter.py
+++ b/ooni/contrib/croniter.py
@@ -49,6 +49,15 @@ class croniter(object):
{},
)
+ ALIASES = {
+ '@yearly': '0 0 1 1 *',
+ '@annually': '0 0 1 1 *',
+ '@monthly': '0 0 1 * *',
+ '@weekly': '0 0 * * 0',
+ '@daily': '0 0 * * *',
+ '@hourly': '0 * * * *',
+ }
+
bad_length = 'Exactly 5 or 6 columns has to be specified for iterator' \
'expression.'
@@ -63,6 +72,8 @@ class croniter(object):
start_time = self._datetime_to_timestamp(start_time)
self.cur = start_time
+ if expr_format in self.ALIASES:
+ expr_format = self.ALIASES[expr_format]
self.exprs = expr_format.split()
if len(self.exprs) != 5 and len(self.exprs) != 6:
diff --git a/ooni/deck.py b/ooni/deck.py
index 4794d30..0434d81 100644
--- a/ooni/deck.py
+++ b/ooni/deck.py
@@ -4,7 +4,6 @@ import csv
import json
from copy import deepcopy
-from hashlib import sha256
import yaml
@@ -236,7 +235,8 @@ def lookup_collector_and_test_helpers(net_test_loaders,
'name': net_test_loader.testName,
'version': net_test_loader.testVersion,
'test-helpers': [],
- 'input-hashes': [x['hash'] for x in net_test_loader.inputFiles]
+ # XXX deprecate this very soon
+ 'input-hashes': []
}
if not net_test_loader.collector and not no_collector:
requires_collector = True
@@ -262,15 +262,16 @@ def lookup_collector_and_test_helpers(net_test_loaders,
log.err("Could not find any reachable test helpers")
raise
- def find_collector_and_test_helpers(test_name, test_version, input_files):
- input_files = [u""+x['hash'] for x in input_files]
+ def find_collector_and_test_helpers(test_name, test_version):
+ # input_files = [u""+x['hash'] for x in input_files]
for net_test in provided_net_tests:
if net_test['name'] != test_name:
continue
if net_test['version'] != test_version:
continue
- if set(net_test['input-hashes']) != set(input_files):
- continue
+ # XXX remove the notion of policies based on input file hashes
+ # if set(net_test['input-hashes']) != set(input_files):
+ # continue
return net_test['collector'], net_test['test-helpers']
for net_test_loader in net_test_loaders:
@@ -280,8 +281,8 @@ def lookup_collector_and_test_helpers(net_test_loaders,
collector, test_helpers = \
find_collector_and_test_helpers(
test_name=net_test_loader.testName,
- test_version=net_test_loader.testVersion,
- input_files=net_test_loader.inputFiles
+ test_version=net_test_loader.testVersion
+ # input_files=net_test_loader.inputFiles
)
for option, name in net_test_loader.missingTestHelpers:
@@ -455,6 +456,7 @@ class InputStore(object):
"id": "citizenlab_{0}_urls".format(cc),
"type": "file/url"
}, out_fh)
+ self._cache_stale = True
@defer.inlineCallbacks
def create(self, country_code=None):
@@ -523,13 +525,11 @@ def resolve_file_path(v, prepath=None):
return FilePath(prepath).preauthChild(v).path
return v
-def options_to_args(options, prepath=None):
+def options_to_args(options):
args = []
for k, v in options.items():
if v is None:
continue
- if k == "file":
- v = resolve_file_path(v, prepath)
if v == False or v == 0:
continue
if (len(k)) == 1:
@@ -625,7 +625,7 @@ class DeckTask(object):
collector_address = None
net_test_loader = NetTestLoader(
- options_to_args(task_data, self.cwd),
+ options_to_args(task_data),
annotations=annotations,
test_file=nettest_path
)
@@ -653,6 +653,9 @@ class DeckTask(object):
self.ooni['net_test_loader'] = net_test_loader
def _setup_ooni(self):
+ for input_file in self.ooni['net_test_loader'].inputFiles:
+ file_path = resolve_file_path(input_file['filename'], self.cwd)
+ input_file['test_options'][input_file['key']] = file_path
self.ooni['test_details'] = self.ooni['net_test_loader'].getTestDetails()
self.id = generate_filename(self.ooni['test_details'])
@@ -670,15 +673,8 @@ class DeckTask(object):
if task_type not in self._supported_tasks:
raise UnknownTaskKey(task_type)
self.type = task_type
- try:
- getattr(self, "_load_"+task_type)(task_data)
- except InputNotFound:
- log.debug(
- "Will skip running this test because I can't find the input"
- )
- self._skip = True
-
- assert len(data) == 0
+ getattr(self, "_load_"+task_type)(task_data)
+ assert len(data) == 0, "Got an unidentified key"
class NotAnOption(Exception):
pass
diff --git a/ooni/director.py b/ooni/director.py
index 84bc9aa..1ab076b 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -13,6 +13,7 @@ from ooni.settings import config
from ooni.nettest import normalizeTestName
from ooni.deck import InputStore
+from ooni.agent.scheduler import run_system_tasks
from ooni.utils.onion import start_tor, connect_to_control_port
class DirectorEvent(object):
@@ -139,12 +140,15 @@ class Director(object):
self._tor_starting.addCallback(self._tor_startup_success)
def _tor_startup_failure(self, failure):
+ log.msg("Failed to start tor")
+ log.exception(failure)
self._reset_tor_state()
self.notify(DirectorEvent("error",
"Failed to start Tor"))
return failure
def _tor_startup_success(self, result):
+ log.msg("Tor has started")
self._tor_state = 'running'
self.notify(DirectorEvent("success",
"Successfully started Tor"))
@@ -187,22 +191,21 @@ class Director(object):
if start_tor:
yield self.start_tor(check_incoherences)
- if config.global_options.get('no-geoip'):
+ no_geoip = config.global_options.get('no-geoip', False)
+ if no_geoip:
aux = [False]
if config.global_options.get('annotations') is not None:
annotations = [k.lower() for k in config.global_options['annotations'].keys()]
aux = map(lambda x: x in annotations, ["city", "country", "asn"])
if not all(aux):
log.msg("You should add annotations for the country, city and ASN")
- else:
- yield config.probe_ip.lookup()
- self.notify(DirectorEvent("success",
- "Looked up Probe IP"))
- if create_input_store:
- yield self.input_store.create(config.probe_ip.geodata["countrycode"])
- self.notify(DirectorEvent("success",
- "Created input store"))
+ self.notify(DirectorEvent("success",
+ "Running system tasks"))
+ yield run_system_tasks(no_geoip=no_geoip,
+ no_input_store=not create_input_store)
+ self.notify(DirectorEvent("success",
+ "Ran system tasks"))
@defer.inlineCallbacks
def start(self, start_tor=False, check_incoherences=True,
@@ -284,7 +287,8 @@ class Director(object):
def netTestDone(self, net_test):
self.notify(DirectorEvent("success",
- "Successfully ran net_test"))
+ "Successfully ran test {0}".format(
+ net_test.testDetails['test_name'])))
self.activeNetTests.remove(net_test)
if len(self.activeNetTests) == 0:
self.allTestsDone.callback(None)
@@ -371,13 +375,18 @@ class Director(object):
log.debug("Tor is already running")
defer.returnValue(self._tor_state)
elif self._tor_state == 'starting':
+ log.debug("Tor is starting")
yield self._tor_starting
defer.returnValue(self._tor_state)
log.msg("Starting Tor...")
self._tor_state = 'starting'
if check_incoherences:
- yield config.check_tor()
+ try:
+ yield config.check_tor()
+ except Exception as exc:
+ self._tor_starting.errback(Failure(exc))
+ raise exc
if config.advanced.start_tor and config.tor_state is None:
tor_config = TorConfig()
@@ -438,3 +447,7 @@ class Director(object):
self._tor_starting.callback(self._tor_state)
except Exception as exc:
self._tor_starting.errback(Failure(exc))
+ else:
+ # This happens when we require tor to not be started and the
+ # socks port is set.
+ self._tor_starting.callback(self._tor_state)
diff --git a/ooni/geoip.py b/ooni/geoip.py
index f118268..2a7ec92 100644
--- a/ooni/geoip.py
+++ b/ooni/geoip.py
@@ -31,8 +31,12 @@ class GeoIPDataFilesNotFound(Exception):
def IPToLocation(ipaddr):
from ooni.settings import config
- country_file = config.get_data_file_path('GeoIP/GeoIP.dat')
- asn_file = config.get_data_file_path('GeoIP/GeoIPASNum.dat')
+ country_file = config.get_data_file_path(
+ 'resources/maxmind-geoip/GeoIP.dat'
+ )
+ asn_file = config.get_data_file_path(
+ 'resources/maxmind-geoip/GeoIPASNum.dat'
+ )
location = {'city': None, 'countrycode': 'ZZ', 'asn': 'AS0'}
if not asn_file or not country_file:
@@ -69,7 +73,9 @@ def database_version():
}
for key in version.keys():
- geoip_file = config.get_data_file_path("GeoIP/" + key + ".dat")
+ geoip_file = config.get_data_file_path(
+ "resources/maxmind-geoip/" + key + ".dat"
+ )
if not geoip_file or not os.path.isfile(geoip_file):
continue
timestamp = os.stat(geoip_file).st_mtime
diff --git a/ooni/nettest.py b/ooni/nettest.py
index 2a33a2f..88e4953 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -2,7 +2,6 @@ import os
import re
import time
import sys
-from hashlib import sha256
from twisted.internet import defer
from twisted.trial.runner import filenameToModule
@@ -199,6 +198,7 @@ class NetTestLoader(object):
'probe_city': config.probe_ip.geodata['city'],
'software_name': 'ooniprobe',
'software_version': ooniprobe_version,
+ # XXX only sanitize the input files
'options': sanitize_options(self.options),
'annotations': self.annotations,
'data_format_version': '0.2.0',
@@ -206,8 +206,8 @@ class NetTestLoader(object):
'test_version': self.testVersion,
'test_helpers': self.testHelpers,
'test_start_time': otime.timestampNowLongUTC(),
- 'input_hashes': [input_file['hash']
- for input_file in self.inputFiles],
+ # XXX We should deprecate this key very soon
+ 'input_hashes': [],
'report_id': self.reportId
}
@@ -235,29 +235,14 @@ class NetTestLoader(object):
input_file = {
'key': key,
'test_options': self.localOptions,
- 'hash': None,
-
- 'url': None,
- 'address': None,
-
'filename': None
}
m = ONION_INPUT_REGEXP.match(filename)
if m:
- input_file['url'] = filename
- input_file['address'] = m.group(1)
- input_file['hash'] = m.group(2)
+ raise e.InvalidInputFile("Input files hosted on hidden services "
+ "are not longer supported")
else:
input_file['filename'] = filename
- try:
- with open(filename) as f:
- h = sha256()
- for l in f:
- h.update(l)
- except Exception as exc:
- log.exception(exc)
- raise e.InvalidInputFile(filename)
- input_file['hash'] = h.hexdigest()
self.inputFiles.append(input_file)
def _accumulateTestOptions(self, test_class):
diff --git a/ooni/nettests/manipulation/http_invalid_request_line.py b/ooni/nettests/manipulation/http_invalid_request_line.py
index 94b0b99..be0497c 100644
--- a/ooni/nettests/manipulation/http_invalid_request_line.py
+++ b/ooni/nettests/manipulation/http_invalid_request_line.py
@@ -42,14 +42,14 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
self.address = self.localOptions['backend']
self.report['tampering'] = None
- def check_for_manipulation(self, response, payload):
+ def check_for_manipulation(self, response, payload, manipulation_type):
log.debug("Checking if %s == %s" % (response, payload))
if response != payload:
- log.msg("Detected manipulation!")
+ log.msg("{0}: Detected manipulation!".format(manipulation_type))
log.msg(response)
self.report['tampering'] = True
else:
- log.msg("No manipulation detected.")
+ log.msg("{0}: No manipulation detected.".format(manipulation_type))
self.report['tampering'] = False
def test_random_invalid_method(self):
@@ -75,7 +75,7 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
payload = randomSTR(4) + " / HTTP/1.1\n\r"
d = self.sendPayload(payload)
- d.addCallback(self.check_for_manipulation, payload)
+ d.addCallback(self.check_for_manipulation, payload, 'random_invalid_method')
return d
def test_random_invalid_field_count(self):
@@ -91,7 +91,7 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
payload += "\n\r"
d = self.sendPayload(payload)
- d.addCallback(self.check_for_manipulation, payload)
+ d.addCallback(self.check_for_manipulation, payload, 'random_invalid_field_count')
return d
def test_random_big_request_method(self):
@@ -103,7 +103,7 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
payload = randomStr(1024) + ' / HTTP/1.1\n\r'
d = self.sendPayload(payload)
- d.addCallback(self.check_for_manipulation, payload)
+ d.addCallback(self.check_for_manipulation, payload, 'random_big_request_method')
return d
def test_random_invalid_version_number(self):
@@ -116,5 +116,5 @@ class HTTPInvalidRequestLine(tcpt.TCPTest):
payload += '\n\r'
d = self.sendPayload(payload)
- d.addCallback(self.check_for_manipulation, payload)
+ d.addCallback(self.check_for_manipulation, payload, 'random_invalid_version_number')
return d
diff --git a/ooni/resources.py b/ooni/resources.py
index d49e679..d67908c 100644
--- a/ooni/resources.py
+++ b/ooni/resources.py
@@ -1,5 +1,9 @@
+import os
+import gzip
import json
+import shutil
+from twisted.python.runtime import platform
from twisted.python.filepath import FilePath
from twisted.internet import defer
from twisted.web.client import downloadPage, getPage
@@ -66,11 +70,27 @@ def get_out_of_date_resources(current_manifest, new_manifest,
# the manifest claims we have a more up to date version.
# This happens if an update by country_code happened and a new
# country code is now required.
+ if filename.endswith(".gz"):
+ filename = filename[:-3]
if not _resources.child(pre_path).child(filename).exists():
paths_to_update.append(info)
return paths_to_update, paths_to_delete
+def gunzip(file_path):
+ tmp_location = FilePath(file_path).temporarySibling()
+ in_file = gzip.open(file_path)
+ with tmp_location.open('w') as out_file:
+ shutil.copyfileobj(in_file, out_file)
+ in_file.close()
+ rename(tmp_location.path, file_path)
+
+def rename(src, dst):
+ # Best effort atomic renaming
+ if platform.isWindows() and os.path.exists(dst):
+ os.unlink(dst)
+ os.rename(src, dst)
+
@defer.inlineCallbacks
def check_for_update(country_code=None):
"""
@@ -88,44 +108,48 @@ def check_for_update(country_code=None):
current_version = get_current_version()
latest_version = yield get_latest_version()
- # We are already at the latest version
- if current_version == latest_version:
- defer.returnValue(latest_version)
-
resources_dir = FilePath(config.resources_directory)
resources_dir.makedirs(ignoreExistingDirectory=True)
current_manifest = resources_dir.child("manifest.json")
- new_manifest = current_manifest.temporarySibling()
- new_manifest.alwaysCreate = 0
-
- temporary_files.append((current_manifest, new_manifest))
-
- try:
- yield downloadPage(
- get_download_url(latest_version, "manifest.json"),
- new_manifest.path
- )
- except:
- cleanup()
- raise UpdateFailure("Failed to download manifest")
-
- new_manifest_data = json.loads(new_manifest.getContent())
-
if current_manifest.exists():
with current_manifest.open("r") as f:
- current_manifest_data = json.loads(f)
+ current_manifest_data = json.load(f)
else:
current_manifest_data = {
"resources": []
}
+ # We should download a newer manifest
+ if current_version < latest_version:
+ new_manifest = current_manifest.temporarySibling()
+ new_manifest.alwaysCreate = 0
+
+ temporary_files.append((current_manifest, new_manifest))
+
+ try:
+ yield downloadPage(
+ get_download_url(latest_version, "manifest.json"),
+ new_manifest.path
+ )
+ except:
+ cleanup()
+ raise UpdateFailure("Failed to download manifest")
+
+ new_manifest_data = json.loads(new_manifest.getContent())
+ else:
+ new_manifest_data = current_manifest_data
+
to_update, to_delete = get_out_of_date_resources(
current_manifest_data, new_manifest_data, country_code)
try:
for resource in to_update:
+ gzipped = False
pre_path, filename = resource["path"].split("/")
+ if filename.endswith(".gz"):
+ filename = filename[:-3]
+ gzipped = True
dst_file = resources_dir.child(pre_path).child(filename)
dst_file.parent().makedirs(ignoreExistingDirectory=True)
src_file = dst_file.temporarySibling()
@@ -135,8 +159,9 @@ def check_for_update(country_code=None):
# The paths for the download require replacing "/" with "."
download_url = get_download_url(latest_version,
resource["path"].replace("/", "."))
- print("Downloading {0}".format(download_url))
yield downloadPage(download_url, src_file.path)
+ if gzipped:
+ gunzip(src_file.path)
except Exception as exc:
cleanup()
log.exception(exc)
@@ -145,7 +170,7 @@ def check_for_update(country_code=None):
for dst_file, src_file in temporary_files:
log.msg("Moving {0} to {1}".format(src_file.path,
dst_file.path))
- src_file.moveTo(dst_file)
+ rename(src_file.path, dst_file.path)
for resource in to_delete:
log.msg("Deleting old resources")
diff --git a/ooni/settings.py b/ooni/settings.py
index b73e2f2..632dbe4 100644
--- a/ooni/settings.py
+++ b/ooni/settings.py
@@ -101,6 +101,8 @@ class OConfig(object):
else:
self.inputs_directory = os.path.join(self.ooni_home, 'inputs')
+ self.scheduler_directory = os.path.join(self.ooni_home, 'scheduler')
+
if self.advanced.decks_dir:
self.decks_directory = self.advanced.decks_dir
else:
diff --git a/ooni/ui/cli.py b/ooni/ui/cli.py
index 57924ec..e8d747c 100644
--- a/ooni/ui/cli.py
+++ b/ooni/ui/cli.py
@@ -305,7 +305,8 @@ def createDeck(global_options, url=None):
return deck
-def runTestWithDirector(director, global_options, url=None, start_tor=True,
+def runTestWithDirector(director, global_options, url=None,
+ start_tor=True,
create_input_store=True):
deck = createDeck(global_options, url=url)
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index f9886ac..0a3d1ca 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -84,10 +84,7 @@ class WebUIAPI(object):
"software_name": "ooniprobe",
"asn": config.probe_ip.geodata['asn'],
"country_code": config.probe_ip.geodata['countrycode'],
- "active_measurements": {},
- "completed_measurements": [],
- "director_started": False,
- "failures": []
+ "director_started": False
}
self.status_poller = LongPoller(
@@ -103,33 +100,17 @@ class WebUIAPI(object):
d = self.director.start()
d.addCallback(self.director_started)
- d.addErrback(self.director_startup_failed)
d.addBoth(lambda _: self.status_poller.notify())
def handle_director_event(self, event):
log.msg("Handling event {0}".format(event.type))
self.director_event_poller.notify(event)
- def add_failure(self, failure):
- self.status['failures'].append(str(failure))
-
def director_started(self, _):
self.status['director_started'] = True
self.status["asn"] = config.probe_ip.geodata['asn']
self.status["country_code"] = config.probe_ip.geodata['countrycode']
- def director_startup_failed(self, failure):
- self.add_failure(failure)
-
- def completed_measurement(self, measurement_id):
- del self.status['active_measurements'][measurement_id]
- self.status['completed_measurements'].append(measurement_id)
-
- def failed_measurement(self, measurement_id, failure):
- log.exception(failure)
- del self.status['active_measurements'][measurement_id]
- self.add_failure(str(failure))
-
@app.handle_errors(NotFound)
def not_found(self, request, _):
request.redirect('/client/')
@@ -188,18 +169,9 @@ class WebUIAPI(object):
return self.render_json({"command": "deck-list"}, request)
def run_deck(self, deck):
- for task_id in deck.task_ids:
- self.status['active_measurements'][task_id] = {
- 'test_name': 'foobar',
- 'test_start_time': 'some start time'
- }
- self.status_poller.notify()
deck.setup()
- d = deck.run(self.director)
- d.addCallback(lambda _:
- self.completed_measurement(task_id))
- d.addErrback(lambda failure:
- self.failed_measurement(task_id, failure))
+ # Here there is a dangling deferred
+ deck.run(self.director)
@app.route('/api/nettest/<string:test_name>/start', methods=["POST"])
def api_nettest_start(self, request, test_name):
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits