[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Add minimal outline of the ooniprobe-agent and new deck format
commit 7829363f1066a469995c0410025d7e895522363f
Author: Arturo Filastò <arturo@xxxxxxxxxxx>
Date: Mon Jul 25 16:08:05 2016 +0200
Add minimal outline of the ooniprobe-agent and new deck format
* Use DuckDuckGo to perform geoip lookups instead of torproject.org
* Big refactoring of the Director
---
ooni/agent/__init__.py | 0
ooni/agent/agent.py | 21 ++
ooni/agent/scheduler.py | 27 ++
ooni/deck.py | 715 +++++++++++++++++++++++++++++------------
ooni/director.py | 231 ++++++++-----
ooni/geoip.py | 8 +-
ooni/measurements.py | 43 ---
ooni/nettest.py | 3 +-
ooni/results.py | 39 +++
ooni/tests/bases.py | 2 +-
ooni/tests/test_deck.py | 57 +++-
ooni/tests/test_director.py | 12 +-
ooni/tests/test_nettest.py | 8 +-
ooni/tests/test_oonideckgen.py | 7 +-
ooni/tests/test_oonireport.py | 18 +-
ooni/ui/cli.py | 8 +-
ooni/ui/web/client/index.html | 2 +-
ooni/ui/web/server.py | 203 ++++++------
ooni/ui/web/web.py | 56 +---
ooni/utils/onion.py | 14 -
20 files changed, 974 insertions(+), 500 deletions(-)
diff --git a/ooni/agent/__init__.py b/ooni/agent/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/ooni/agent/agent.py b/ooni/agent/agent.py
new file mode 100644
index 0000000..a1394f0
--- /dev/null
+++ b/ooni/agent/agent.py
@@ -0,0 +1,21 @@
+from twisted.application import service
+from ooni.director import Director
+from ooni.settings import config
+
+from ooni.ui.web.web import WebUIService
+from ooni.agent.scheduler import SchedulerService
+
+class AgentService(service.MultiService):
+ def __init__(self):
+ service.MultiService.__init__(self)
+
+ director = Director()
+ config.set_paths()
+ config.initialize_ooni_home()
+ config.read_config_file()
+
+ self.web_ui_service = WebUIService(director)
+ self.web_ui_service.setServiceParent(self)
+
+ self.scheduler_service = SchedulerService(director)
+ self.scheduler_service.setServiceParent(self)
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
new file mode 100644
index 0000000..1004597
--- /dev/null
+++ b/ooni/agent/scheduler.py
@@ -0,0 +1,27 @@
+from twisted.application import service
+from twisted.internet import task
+
+class SchedulerService(service.MultiService):
+ """
+ This service is responsible for running the periodic tasks.
+ """
+ def __init__(self, director, interval=30):
+ service.MultiService.__init__(self)
+ self.director = director
+ self.interval = interval
+ self._looping_call = task.LoopingCall(self._should_run)
+
+ def _should_run(self):
+ """
+ This function is called every self.interval seconds to check
+ which periodic tasks should be run.
+ """
+ pass
+
+ def startService(self):
+ service.MultiService.startService(self)
+ self._looping_call.start(self.interval)
+
+ def stopService(self):
+ service.MultiService.stopService(self)
+ self._looping_call.stop()
diff --git a/ooni/deck.py b/ooni/deck.py
index 9f17530..1746d26 100644
--- a/ooni/deck.py
+++ b/ooni/deck.py
@@ -1,27 +1,28 @@
# -*- coding: utf-8 -*-
-import csv
import os
-import yaml
+import csv
import json
+from copy import deepcopy
from hashlib import sha256
-from datetime import datetime
-from ooni.backend_client import CollectorClient, BouncerClient
-from ooni.backend_client import WebConnectivityClient, guess_backend_type
-from ooni.nettest import NetTestLoader
-from ooni.settings import config
-from ooni.otime import timestampNowISO8601UTC
+import yaml
-from ooni.resources.update import check_for_update
+from twisted.internet import defer
+from twisted.python.filepath import FilePath
-from ooni.utils import log
from ooni import constants
from ooni import errors as e
+from ooni.backend_client import CollectorClient, BouncerClient
+from ooni.backend_client import WebConnectivityClient, guess_backend_type
+from ooni.nettest import NetTestLoader
+from ooni.otime import timestampNowISO8601UTC
+from ooni.resources import check_for_update
+from ooni.settings import config
+from ooni.utils import generate_filename
+from ooni.utils import log
-from twisted.python.filepath import FilePath
-from twisted.internet import defer
-
+from ooni.results import generate_summary
class InputFile(object):
def __init__(self, input_hash, base_path=config.inputs_directory):
@@ -116,6 +117,25 @@ def nettest_to_path(path, allow_arbitrary_paths=False):
return found_path
+def get_preferred_bouncer():
+ preferred_backend = config.advanced.get(
+ "preferred_backend", "onion"
+ )
+ bouncer_address = getattr(
+ constants, "CANONICAL_BOUNCER_{0}".format(
+ preferred_backend.upper()
+ )
+ )
+ if preferred_backend == "cloudfront":
+ return BouncerClient(
+ settings={
+ 'address': bouncer_address[0],
+ 'front': bouncer_address[1],
+ 'type': 'cloudfront'
+ })
+ else:
+ return BouncerClient(bouncer_address)
+
class Deck(InputFile):
# this exists so we can mock it out in unittests
_BouncerClient = BouncerClient
@@ -227,175 +247,10 @@ class Deck(InputFile):
if self.bouncer:
log.msg("Looking up collector and test helpers with {0}".format(
self.bouncer.base_address))
- yield self.lookupCollectorAndTestHelpers()
-
-
- def sortAddressesByPriority(self, priority_address, alternate_addresses):
- prioritised_addresses = []
-
- backend_type = guess_backend_type(priority_address)
- priority_address = {
- 'address': priority_address,
- 'type': backend_type
- }
- address_priority = ['onion', 'https', 'cloudfront', 'http']
- address_priority.remove(self.preferred_backend)
- address_priority.insert(0, self.preferred_backend)
-
- def filter_by_type(collectors, collector_type):
- return filter(lambda x: x['type'] == collector_type, collectors)
-
- if (priority_address['type'] != self.preferred_backend):
- valid_alternatives = filter_by_type(alternate_addresses,
- self.preferred_backend)
- if len(valid_alternatives) > 0:
- alternate_addresses += [priority_address]
- priority_address = valid_alternatives[0]
- alternate_addresses.remove(priority_address)
-
- prioritised_addresses += [priority_address]
- for address_type in address_priority:
- prioritised_addresses += filter_by_type(alternate_addresses,
- address_type)
-
- return prioritised_addresses
-
- @defer.inlineCallbacks
- def getReachableCollector(self, collector_address, collector_alternate):
- # We prefer onion collector to https collector to cloudfront
- # collectors to plaintext collectors
- for collector_settings in self.sortAddressesByPriority(collector_address,
- collector_alternate):
- collector = self._CollectorClient(settings=collector_settings)
- if not collector.isSupported():
- log.err("Unsupported %s collector %s" % (
- collector_settings['type'],
- collector_settings['address']))
- continue
- reachable = yield collector.isReachable()
- if not reachable:
- log.err("Unreachable %s collector %s" % (
- collector_settings['type'],
- collector_settings['address']))
- continue
- defer.returnValue(collector)
-
- raise e.NoReachableCollectors
-
- @defer.inlineCallbacks
- def getReachableTestHelper(self, test_helper_name, test_helper_address,
- test_helper_alternate):
- # For the moment we look for alternate addresses only of
- # web_connectivity test helpers.
- if test_helper_name == 'web-connectivity':
- for web_connectivity_settings in self.sortAddressesByPriority(
- test_helper_address, test_helper_alternate):
- web_connectivity_test_helper = WebConnectivityClient(
- settings=web_connectivity_settings)
- if not web_connectivity_test_helper.isSupported():
- log.err("Unsupported %s web_connectivity test_helper "
- "%s" % (
- web_connectivity_settings['type'],
- web_connectivity_settings['address']
- ))
- continue
- reachable = yield web_connectivity_test_helper.isReachable()
- if not reachable:
- log.err("Unreachable %s web_connectivity test helper %s" % (
- web_connectivity_settings['type'],
- web_connectivity_settings['address']
- ))
- continue
- defer.returnValue(web_connectivity_settings)
- raise e.NoReachableTestHelpers
- else:
- defer.returnValue(test_helper_address.encode('ascii'))
-
- @defer.inlineCallbacks
- def getReachableTestHelpersAndCollectors(self, net_tests):
- for net_test in net_tests:
-
- primary_address = net_test['collector']
- alternate_addresses = net_test.get('collector-alternate', [])
- net_test['collector'] = yield self.getReachableCollector(
- primary_address,
- alternate_addresses
- )
-
- for test_helper_name, test_helper_address in net_test['test-helpers'].items():
- test_helper_alternate = \
- net_test.get('test-helpers-alternate', {}).get(test_helper_name, [])
- net_test['test-helpers'][test_helper_name] = \
- yield self.getReachableTestHelper(
- test_helper_name,
- test_helper_address,
- test_helper_alternate)
-
- defer.returnValue(net_tests)
-
- @defer.inlineCallbacks
- def lookupCollectorAndTestHelpers(self):
- required_nettests = []
-
- requires_test_helpers = False
- requires_collector = False
- for net_test_loader in self.netTestLoaders:
- nettest = {
- 'name': net_test_loader.testName,
- 'version': net_test_loader.testVersion,
- 'test-helpers': [],
- 'input-hashes': [x['hash'] for x in net_test_loader.inputFiles]
- }
- if not net_test_loader.collector and not self.no_collector:
- requires_collector = True
-
- if len(net_test_loader.missingTestHelpers) > 0:
- requires_test_helpers = True
- nettest['test-helpers'] += map(lambda x: x[1],
- net_test_loader.missingTestHelpers)
-
- required_nettests.append(nettest)
-
- if not requires_test_helpers and not requires_collector:
- defer.returnValue(None)
-
- response = yield self.bouncer.lookupTestCollector(required_nettests)
- try:
- provided_net_tests = yield self.getReachableTestHelpersAndCollectors(response['net-tests'])
- except e.NoReachableCollectors:
- log.err("Could not find any reachable collector")
- raise
- except e.NoReachableTestHelpers:
- log.err("Could not find any reachable test helpers")
- raise
-
- def find_collector_and_test_helpers(test_name, test_version, input_files):
- input_files = [u""+x['hash'] for x in input_files]
- for net_test in provided_net_tests:
- if net_test['name'] != test_name:
- continue
- if net_test['version'] != test_version:
- continue
- if set(net_test['input-hashes']) != set(input_files):
- continue
- return net_test['collector'], net_test['test-helpers']
-
- for net_test_loader in self.netTestLoaders:
- log.msg("Setting collector and test helpers for %s" %
- net_test_loader.testName)
-
- collector, test_helpers = \
- find_collector_and_test_helpers(test_name=net_test_loader.testName,
- test_version=net_test_loader.testVersion,
- input_files=net_test_loader.inputFiles)
-
- for option, name in net_test_loader.missingTestHelpers:
- test_helper_address_or_settings = test_helpers[name]
- net_test_loader.localOptions[option] = test_helper_address_or_settings
- net_test_loader.testHelpers[option] = test_helper_address_or_settings
-
- if not net_test_loader.collector:
- net_test_loader.collector = collector
+ yield lookup_collector_and_test_helpers(self.netTestLoaders,
+ self.bouncer,
+ self.preferred_backend,
+ self.no_collector)
@defer.inlineCallbacks
def fetchAndVerifyNetTestInput(self, net_test_loader):
@@ -419,17 +274,197 @@ class Deck(InputFile):
i['test_options'][i['key']] = input_file.cached_file
+def lookup_collector_and_test_helpers(net_test_loaders,
+ bouncer,
+ preferred_backend,
+ no_collector=False):
+ required_nettests = []
+
+ requires_test_helpers = False
+ requires_collector = False
+ for net_test_loader in net_test_loaders:
+ nettest = {
+ 'name': net_test_loader.testName,
+ 'version': net_test_loader.testVersion,
+ 'test-helpers': [],
+ 'input-hashes': [x['hash'] for x in net_test_loader.inputFiles]
+ }
+ if not net_test_loader.collector and not no_collector:
+ requires_collector = True
+
+ if len(net_test_loader.missingTestHelpers) > 0:
+ requires_test_helpers = True
+ nettest['test-helpers'] += map(lambda x: x[1],
+ net_test_loader.missingTestHelpers)
+
+ required_nettests.append(nettest)
+
+ if not requires_test_helpers and not requires_collector:
+ defer.returnValue(None)
+
+ response = yield bouncer.lookupTestCollector(required_nettests)
+ try:
+ provided_net_tests = yield get_reachable_test_helpers_and_collectors(
+ response['net-tests'], preferred_backend)
+ except e.NoReachableCollectors:
+ log.err("Could not find any reachable collector")
+ raise
+ except e.NoReachableTestHelpers:
+ log.err("Could not find any reachable test helpers")
+ raise
+
+ def find_collector_and_test_helpers(test_name, test_version, input_files):
+ input_files = [u""+x['hash'] for x in input_files]
+ for net_test in provided_net_tests:
+ if net_test['name'] != test_name:
+ continue
+ if net_test['version'] != test_version:
+ continue
+ if set(net_test['input-hashes']) != set(input_files):
+ continue
+ return net_test['collector'], net_test['test-helpers']
+
+ for net_test_loader in net_test_loaders:
+ log.msg("Setting collector and test helpers for %s" %
+ net_test_loader.testName)
+
+ collector, test_helpers = \
+ find_collector_and_test_helpers(test_name=net_test_loader.testName,
+ test_version=net_test_loader.testVersion,
+ input_files=net_test_loader.inputFiles)
+
+ for option, name in net_test_loader.missingTestHelpers:
+ test_helper_address_or_settings = test_helpers[name]
+ net_test_loader.localOptions[option] = test_helper_address_or_settings
+ net_test_loader.testHelpers[option] = test_helper_address_or_settings
+
+ if not net_test_loader.collector:
+ net_test_loader.collector = collector
+
+
+@defer.inlineCallbacks
+def get_reachable_test_helpers_and_collectors(net_tests, preferred_backend):
+ for net_test in net_tests:
+ primary_address = net_test['collector']
+ alternate_addresses = net_test.get('collector-alternate', [])
+ net_test['collector'] = yield get_reachable_collector(
+ primary_address, alternate_addresses, preferred_backend)
+
+ for test_helper_name, test_helper_address in net_test['test-helpers'].items():
+ test_helper_alternate = \
+ net_test.get('test-helpers-alternate', {}).get(test_helper_name, [])
+ net_test['test-helpers'][test_helper_name] = \
+ yield get_reachable_test_helper(test_helper_name,
+ test_helper_address,
+ test_helper_alternate,
+ preferred_backend)
+
+ defer.returnValue(net_tests)
+
+@defer.inlineCallbacks
+def get_reachable_collector(collector_address, collector_alternate,
+ preferred_backend):
+ # We prefer onion collector to https collector to cloudfront
+ # collectors to plaintext collectors
+ for collector_settings in sort_addresses_by_priority(
+ collector_address,
+ collector_alternate,
+ preferred_backend):
+ collector = CollectorClient(settings=collector_settings)
+ if not collector.isSupported():
+ log.err("Unsupported %s collector %s" % (
+ collector_settings['type'],
+ collector_settings['address']))
+ continue
+ reachable = yield collector.isReachable()
+ if not reachable:
+ log.err("Unreachable %s collector %s" % (
+ collector_settings['type'],
+ collector_settings['address']))
+ continue
+ defer.returnValue(collector)
+
+ raise e.NoReachableCollectors
+
+
+@defer.inlineCallbacks
+def get_reachable_test_helper(test_helper_name, test_helper_address,
+ test_helper_alternate, preferred_backend):
+ # For the moment we look for alternate addresses only of
+ # web_connectivity test helpers.
+ if test_helper_name == 'web-connectivity':
+ for web_connectivity_settings in sort_addresses_by_priority(
+ test_helper_address, test_helper_alternate,
+ preferred_backend):
+ web_connectivity_test_helper = WebConnectivityClient(
+ settings=web_connectivity_settings)
+ if not web_connectivity_test_helper.isSupported():
+ log.err("Unsupported %s web_connectivity test_helper "
+ "%s" % (
+ web_connectivity_settings['type'],
+ web_connectivity_settings['address']
+ ))
+ continue
+ reachable = yield web_connectivity_test_helper.isReachable()
+ if not reachable:
+ log.err("Unreachable %s web_connectivity test helper %s" % (
+ web_connectivity_settings['type'],
+ web_connectivity_settings['address']
+ ))
+ continue
+ defer.returnValue(web_connectivity_settings)
+ raise e.NoReachableTestHelpers
+ else:
+ defer.returnValue(test_helper_address.encode('ascii'))
+
+def sort_addresses_by_priority(priority_address,
+ alternate_addresses,
+ preferred_backend):
+ prioritised_addresses = []
+
+ backend_type = guess_backend_type(priority_address)
+ priority_address = {
+ 'address': priority_address,
+ 'type': backend_type
+ }
+ address_priority = ['onion', 'https', 'cloudfront', 'http']
+ address_priority.remove(preferred_backend)
+ address_priority.insert(0, preferred_backend)
+
+ def filter_by_type(collectors, collector_type):
+ return filter(lambda x: x['type'] == collector_type, collectors)
+
+ if (priority_address['type'] != preferred_backend):
+ valid_alternatives = filter_by_type(alternate_addresses,
+ preferred_backend)
+ if len(valid_alternatives) > 0:
+ alternate_addresses += [priority_address]
+ priority_address = valid_alternatives[0]
+ alternate_addresses.remove(priority_address)
+
+ prioritised_addresses += [priority_address]
+ for address_type in address_priority:
+ prioritised_addresses += filter_by_type(alternate_addresses,
+ address_type)
+
+ return prioritised_addresses
+
+
+class InputNotFound(Exception):
+ pass
+
+
class InputStore(object):
def __init__(self):
self.path = FilePath(config.inputs_directory)
self.resources = FilePath(config.resources_directory)
+ self._cache_stale = True
+ self._cache = {}
@defer.inlineCallbacks
def update_url_lists(self, country_code):
countries = ["global"]
- if country_code == "ZZ":
- country_code = None
- else:
+ if country_code != "ZZ":
countries.append(country_code)
for cc in countries:
@@ -466,30 +501,58 @@ class InputStore(object):
"name": name,
"filepath": out_file.path,
"last_updated": timestampNowISO8601UTC(),
- "id": "citizenlab_test_lists_{0}_txt".format(cc),
+ "id": "citizenlab_{0}_urls".format(cc),
"type": "file/url"
}, out_fh)
@defer.inlineCallbacks
def create(self, country_code=None):
+ # XXX This is a hax to avoid race conditions in testing because this
+ # object is a singleton and config can have a custom home directory
+ # passed at runtime.
+ self.path = FilePath(config.inputs_directory)
+ self.resources = FilePath(config.resources_directory)
+
self.path.child("descriptors").makedirs(ignoreExistingDirectory=True)
self.path.child("data").makedirs(ignoreExistingDirectory=True)
yield self.update_url_lists(country_code)
@defer.inlineCallbacks
def update(self, country_code=None):
- yield self.update_url_lists(country_code)
+ # XXX why do we make a difference between create and update?
+ yield self.create(country_code)
- def list(self):
- inputs = []
+ def _update_cache(self):
descs = self.path.child("descriptors")
if not descs.exists():
- return inputs
+ self._cache = {}
+ return
for fn in descs.listdir():
with descs.child(fn).open("r") as in_fh:
- inputs.append(json.load(in_fh))
- return inputs
+ input_desc = json.load(in_fh)
+ self._cache[input_desc.pop("id")] = input_desc
+ self._cache_stale = False
+ return
+
+ def list(self):
+ if self._cache_stale:
+ self._update_cache()
+ return self._cache
+
+ def get(self, input_id):
+ if self._cache_stale:
+ self._update_cache()
+ try:
+ input_desc = self._cache[input_id]
+ except KeyError:
+ raise InputNotFound(input_id)
+ return input_desc
+
+ def getContent(self, input_id):
+ input_desc = self.get(input_id)
+ with open(input_desc["filepath"]) as fh:
+ return fh.read()
class DeckStore(object):
def __init__(self):
@@ -501,10 +564,268 @@ class DeckStore(object):
def get(self):
pass
-class NGInput(object):
- def __init__(self, input_name):
- pass
+def resolve_file_path(v, prepath=None):
+ if v.startswith("$"):
+ # This raises InputNotFound and we let it carry onto the caller
+ return input_store.get(v[1:])["filepath"]
+ elif prepath is not None and (not os.path.isabs(v)):
+ return FilePath(prepath).preauthChild(v).path
+ return v
+
+def options_to_args(options, prepath=None):
+ args = []
+ for k, v in options.items():
+ if v is None:
+ continue
+ if k == "file":
+ v = resolve_file_path(v, prepath)
+ args.append('--'+k)
+ args.append(v)
+ return args
+
+class UnknownTaskKey(Exception):
+ pass
+
+class MissingTaskDataKey(Exception):
+ pass
+
+class DeckTask(object):
+ _metadata_keys = ["name"]
+ _supported_tasks = ["ooni"]
+
+ def __init__(self, data, parent_metadata={}, cwd=None):
+ self.parent_metadata = parent_metadata
+ self.cwd = cwd
+ self.data = deepcopy(data)
+
+ self.id = ""
+
+ self.type = None
+ self.metadata = {}
+ self.requires_tor = False
+ self.requires_bouncer = False
+
+ self.ooni = {
+ 'bouncer_client': None,
+ 'test_details': {}
+ }
+
+ self._load(data)
+
+ def _load_ooni(self, task_data):
+ required_keys = ["test_name"]
+ for required_key in required_keys:
+ if required_key not in task_data:
+ raise MissingTaskDataKey(required_key)
+
+ # This raises e.NetTestNotFound, we let it go onto the caller
+ nettest_path = nettest_to_path(task_data.pop("test_name"))
+
+ try:
+ annotations = task_data.pop('annotations')
+ except KeyError:
+ annotations = self.parent_metadata.get('annotations', {})
+
+ try:
+ collector_address = task_data.pop('collector')
+ except KeyError:
+ collector_address = self.parent_metadata.get('collector', None)
+
+ net_test_loader = NetTestLoader(
+ options_to_args(task_data),
+ annotations=annotations,
+ test_file=nettest_path
+ )
+
+ if isinstance(collector_address, dict):
+ net_test_loader.collector = CollectorClient(
+ settings=collector_address
+ )
+ elif collector_address is not None:
+ net_test_loader.collector = CollectorClient(
+ collector_address
+ )
+
+ if (net_test_loader.collector is not None and
+ net_test_loader.collector.backend_type == "onion"):
+ self.requires_tor = True
+
+ try:
+ net_test_loader.checkOptions()
+ if net_test_loader.requiresTor:
+ self.requires_tor = True
+ except e.MissingTestHelper:
+ self.requires_bouncer = True
+
+ self.ooni['net_test_loader'] = net_test_loader
+ # Need to ensure that this is called only once we have looked up the
+ # probe IP address and have geoip data.
+ self.ooni['test_details'] = net_test_loader.getTestDetails()
+ self.id = generate_filename(self.ooni['test_details'])
+
+ def _load(self, data):
+ for key in self._metadata_keys:
+ try:
+ self.metadata[key] = data.pop(key)
+ except KeyError:
+ continue
+
+ task_type, task_data = data.popitem()
+ if task_type not in self._supported_tasks:
+ raise UnknownTaskKey(task_type)
+ self.type = task_type
+ getattr(self, "_load_"+task_type)(task_data)
+
+ assert len(data) == 0
class NGDeck(object):
- def __init__(self, deck_path):
- pass
+ def __init__(self, deck_data=None,
+ deck_path=None, no_collector=False):
+ # Used to resolve relative paths inside of decks.
+ self.deck_directory = None
+ self.requires_tor = False
+ self.no_collector = no_collector
+ self.name = ""
+ self.description = ""
+ self.schedule = None
+
+ self.metadata = {}
+ self.bouncer = None
+
+ self._measurement_path = FilePath(config.measurements_directory)
+ self._tasks = []
+ self.task_ids = []
+
+ if deck_path is not None:
+ self.open(deck_path)
+ elif deck_data is not None:
+ self.load(deck_data)
+
+ def open(self, deck_path):
+ with open(deck_path) as fh:
+ deck_data = yaml.safe_load(fh)
+ self.load(deck_data)
+
+ def write(self, fh):
+ """
+ Writes a properly formatted deck to the supplied file handle.
+ :param fh: an open file handle
+ :return:
+ """
+ deck_data = {
+ "name": self.name,
+ "description": self.description,
+ "tasks": [task.data for task in self._tasks]
+ }
+ if self.schedule is not None:
+ deck_data["schedule"] = self.schedule
+ for key, value in self.metadata.items():
+ deck_data[key] = value
+
+ fh.write("---\n")
+ yaml.safe_dump(deck_data, fh, default_flow_style=False)
+
+ def load(self, deck_data):
+ self.name = deck_data.pop("name", "Un-named Deck")
+ self.description = deck_data.pop("description", "No description")
+
+ bouncer_address = deck_data.pop("bouncer", None)
+ if bouncer_address is None:
+ self.bouncer = get_preferred_bouncer()
+ elif isinstance(bouncer_address, dict):
+ self.bouncer = BouncerClient(settings=bouncer_address)
+ else:
+ self.bouncer = BouncerClient(bouncer_address)
+
+ self.schedule = deck_data.pop("schedule", None)
+
+ tasks_data = deck_data.pop("tasks", [])
+ for key, metadata in deck_data.items():
+ self.metadata[key] = metadata
+
+ for task_data in tasks_data:
+ deck_task = DeckTask(task_data, self.metadata, self.deck_directory)
+ if deck_task.requires_tor:
+ self.requires_tor = True
+ if (deck_task.requires_bouncer and
+ self.bouncer.backend_type == "onion"):
+ self.requires_tor = True
+ self._tasks.append(deck_task)
+ self.task_ids.append(deck_task.id)
+
+ @defer.inlineCallbacks
+ def query_bouncer(self):
+ preferred_backend = config.advanced.get(
+ "preferred_backend", "onion"
+ )
+ log.msg("Looking up collector and test helpers with {0}".format(
+ self.bouncer.base_address)
+ )
+ net_test_loaders = []
+ for task in self._tasks:
+ if task.type == "ooni":
+ net_test_loaders.append(task.ooni["net_test_loader"])
+
+ yield lookup_collector_and_test_helpers(
+ net_test_loaders,
+ self.bouncer,
+ preferred_backend,
+ self.no_collector
+ )
+
+ def _measurement_completed(self, result, measurement_id):
+ log.msg("{0}".format(result))
+ measurement_dir = self._measurement_path.child(measurement_id)
+ measurement_dir.child("measurements.njson.progress").moveTo(
+ measurement_dir.child("measurements.njson")
+ )
+ generate_summary(
+ measurement_dir.child("measurements.njson").path,
+ measurement_dir.child("summary.json").path
+ )
+ measurement_dir.child("running.pid").remove()
+
+ def _measurement_failed(self, failure, measurement_id):
+ measurement_dir = self._measurement_path.child(measurement_id)
+ measurement_dir.child("running.pid").remove()
+ # XXX do we also want to delete measurements.njson.progress?
+ return failure
+
+ def _run_ooni_task(self, task, director):
+ net_test_loader = task.ooni["net_test_loader"]
+ test_details = task.ooni["test_details"]
+ measurement_id = task.id
+
+ measurement_dir = self._measurement_path.child(measurement_id)
+ measurement_dir.createDirectory()
+
+ report_filename = measurement_dir.child("measurements.njson.progress").path
+ pid_file = measurement_dir.child("running.pid")
+
+ with pid_file.open('w') as out_file:
+ out_file.write("{0}".format(os.getpid()))
+
+ d = director.start_net_test_loader(
+ net_test_loader,
+ report_filename,
+ test_details=test_details
+ )
+ d.addCallback(self._measurement_completed, measurement_id)
+ d.addErrback(self._measurement_failed, measurement_id)
+ return d
+
+ @defer.inlineCallbacks
+ def run(self, director):
+ tasks = []
+ preferred_backend = config.advanced.get("preferred_backend", "onion")
+ yield self.query_bouncer()
+ for task in self._tasks:
+ if task.requires_tor:
+ yield director.start_tor()
+ elif task.requires_bouncer and preferred_backend == "onion":
+ yield director.start_tor()
+ if task.type == "ooni":
+ tasks.append(self._run_ooni_task(task, director))
+ defer.returnValue(tasks)
+
+input_store = InputStore()
diff --git a/ooni/director.py b/ooni/director.py
index d39a11b..793975e 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -1,19 +1,24 @@
import pwd
import os
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
from ooni.managers import ReportEntryManager, MeasurementManager
from ooni.reporter import Report
from ooni.utils import log, generate_filename
from ooni.utils.net import randomFreePort
from ooni.nettest import NetTest, getNetTestInformation
from ooni.settings import config
-from ooni import errors
from ooni.nettest import normalizeTestName
from ooni.deck import InputStore
from ooni.utils.onion import start_tor, connect_to_control_port
-from twisted.internet import defer
+class DirectorEvent(object):
+ def __init__(self, type="update", message=""):
+ self.type = type
+ self.message = message
class Director(object):
@@ -86,8 +91,6 @@ class Director(object):
self.failures = []
- self.torControlProtocol = None
-
# This deferred is fired once all the measurements and their reporting
# tasks are completed.
self.allTestsDone = defer.Deferred()
@@ -95,6 +98,58 @@ class Director(object):
self.input_store = InputStore()
+ self._reset_director_state()
+ self._reset_tor_state()
+
+ self._subscribers = []
+
+ def subscribe(self, handler):
+ self._subscribers.append(handler)
+
+ def unsubscribe(self, handler):
+ self._subscribers.remove(handler)
+
+ def notify(self, event):
+ for handler in self._subscribers:
+ handler(event)
+
+ def _reset_director_state(self):
+ self._director_state = 'not-running'
+ self._director_starting = defer.Deferred()
+ self._director_starting.addErrback(self._director_startup_failure)
+ self._director_starting.addCallback(self._director_startup_success)
+
+ def _director_startup_failure(self, failure):
+ self._reset_director_state()
+ self.notify(DirectorEvent("error",
+ "Failed to start the director"))
+ return failure
+
+ def _director_startup_success(self, result):
+ self._director_state = 'running'
+ self.notify(DirectorEvent("success",
+ "Successfully started the director"))
+ return result
+
+ def _reset_tor_state(self):
+ # This can be either 'not-running', 'starting' or 'running'
+ self._tor_state = 'not-running'
+ self._tor_starting = defer.Deferred()
+ self._tor_starting.addErrback(self._tor_startup_failure)
+ self._tor_starting.addCallback(self._tor_startup_success)
+
+ def _tor_startup_failure(self, failure):
+ self._reset_tor_state()
+ self.notify(DirectorEvent("error",
+ "Failed to start Tor"))
+ return failure
+
+ def _tor_startup_success(self, result):
+ self._tor_state = 'running'
+ self.notify(DirectorEvent("success",
+ "Successfully started Tor"))
+ return result
+
def getNetTests(self):
nettests = {}
@@ -126,16 +181,11 @@ class Director(object):
return nettests
@defer.inlineCallbacks
- def start(self, start_tor=False, check_incoherences=True):
+ def _start(self, start_tor, check_incoherences):
self.netTests = self.getNetTests()
if start_tor:
- if check_incoherences:
- yield config.check_tor()
- if config.advanced.start_tor and config.tor_state is None:
- yield self.startTor()
- elif config.tor.control_port and config.tor_state is None:
- yield connect_to_control_port()
+ yield self.start_tor(check_incoherences)
if config.global_options.get('no-geoip'):
aux = [False]
@@ -146,8 +196,21 @@ class Director(object):
log.msg("You should add annotations for the country, city and ASN")
else:
yield config.probe_ip.lookup()
+ self.notify(DirectorEvent("success",
+ "Looked up Probe IP"))
yield self.input_store.create(config.probe_ip.geodata["countrycode"])
+ self.notify(DirectorEvent("success",
+ "Created input store"))
+
+ @defer.inlineCallbacks
+ def start(self, start_tor=False, check_incoherences=True):
+ self._director_state = 'starting'
+ try:
+ yield self._start(start_tor, check_incoherences)
+ self._director_starting.callback(self._director_state)
+ except Exception as exc:
+ self._director_starting.errback(Failure(exc))
@property
def measurementSuccessRatio(self):
@@ -217,26 +280,17 @@ class Director(object):
measurement.result = failure
return measurement
- def reporterFailed(self, failure, net_test):
- """
- This gets called every time a reporter is failing and has been removed
- from the reporters of a NetTest.
- Once a report has failed to be created that net_test will never use the
- reporter again.
-
- XXX hook some logic here.
- note: failure contains an extra attribute called failure.reporter
- """
- pass
-
def netTestDone(self, net_test):
+ self.notify(DirectorEvent("success",
+ "Successfully ran net_test"))
self.activeNetTests.remove(net_test)
if len(self.activeNetTests) == 0:
self.allTestsDone.callback(None)
@defer.inlineCallbacks
- def startNetTest(self, net_test_loader, report_filename,
- collector_client=None, no_yamloo=False):
+ def start_net_test_loader(self, net_test_loader, report_filename,
+ collector_client=None, no_yamloo=False,
+ test_details=None):
"""
Create the Report for the NetTest and start the report NetTest.
@@ -244,14 +298,15 @@ class Director(object):
net_test_loader:
an instance of :class:ooni.nettest.NetTestLoader
"""
- test_details = net_test_loader.getTestDetails()
+ if test_details is None:
+ test_details = net_test_loader.getTestDetails()
test_cases = net_test_loader.getTestCases()
if self.allTestsDone.called:
self.allTestsDone = defer.Deferred()
if config.privacy.includepcap or config.global_options.get('pcapfile', None):
- self.startSniffing(test_details)
+ self.start_sniffing(test_details)
report = Report(test_details, report_filename,
self.reportEntryManager,
collector_client,
@@ -271,7 +326,7 @@ class Director(object):
finally:
self.netTestDone(net_test)
- def startSniffing(self, test_details):
+ def start_sniffing(self, test_details):
""" Start sniffing with Scapy. Exits if required privileges (root) are not
available.
"""
@@ -303,57 +358,83 @@ class Director(object):
log.msg("Starting packet capture to: %s" % filename_pcap)
- def startTor(self):
+ @defer.inlineCallbacks
+ def start_tor(self, check_incoherences=False):
""" Starts Tor
Launches a Tor with :param: socks_port :param: control_port
:param: tor_binary set in ooniprobe.conf
"""
- log.msg("Starting Tor...")
-
from txtorcon import TorConfig
+ if self._tor_state == 'running':
+ log.debug("Tor is already running")
+ defer.returnValue(self._tor_state)
+ elif self._tor_state == 'starting':
+ yield self._tor_starting
+ defer.returnValue(self._tor_state)
- tor_config = TorConfig()
- if config.tor.control_port is None:
- config.tor.control_port = int(randomFreePort())
- if config.tor.socks_port is None:
- config.tor.socks_port = int(randomFreePort())
-
- tor_config.ControlPort = config.tor.control_port
- tor_config.SocksPort = config.tor.socks_port
-
- if config.tor.data_dir:
- data_dir = os.path.expanduser(config.tor.data_dir)
-
- if not os.path.exists(data_dir):
- log.msg("%s does not exist. Creating it." % data_dir)
- os.makedirs(data_dir)
- tor_config.DataDirectory = data_dir
-
- if config.tor.bridges:
- tor_config.UseBridges = 1
- if config.advanced.obfsproxy_binary:
- tor_config.ClientTransportPlugin = (
- 'obfs2,obfs3 exec %s managed' %
- config.advanced.obfsproxy_binary
- )
- bridges = []
- with open(config.tor.bridges) as f:
- for bridge in f:
- if 'obfs' in bridge:
- if config.advanced.obfsproxy_binary:
+ log.msg("Starting Tor...")
+ self._tor_state = 'starting'
+ if check_incoherences:
+ yield config.check_tor()
+
+ if config.advanced.start_tor and config.tor_state is None:
+ tor_config = TorConfig()
+ if config.tor.control_port is None:
+ config.tor.control_port = int(randomFreePort())
+ if config.tor.socks_port is None:
+ config.tor.socks_port = int(randomFreePort())
+
+ tor_config.ControlPort = config.tor.control_port
+ tor_config.SocksPort = config.tor.socks_port
+
+ if config.tor.data_dir:
+ data_dir = os.path.expanduser(config.tor.data_dir)
+
+ if not os.path.exists(data_dir):
+ log.msg("%s does not exist. Creating it." % data_dir)
+ os.makedirs(data_dir)
+ tor_config.DataDirectory = data_dir
+
+ if config.tor.bridges:
+ tor_config.UseBridges = 1
+ if config.advanced.obfsproxy_binary:
+ tor_config.ClientTransportPlugin = (
+ 'obfs2,obfs3 exec %s managed' %
+ config.advanced.obfsproxy_binary
+ )
+ bridges = []
+ with open(config.tor.bridges) as f:
+ for bridge in f:
+ if 'obfs' in bridge:
+ if config.advanced.obfsproxy_binary:
+ bridges.append(bridge.strip())
+ else:
bridges.append(bridge.strip())
- else:
- bridges.append(bridge.strip())
- tor_config.Bridge = bridges
-
- if config.tor.torrc:
- for i in config.tor.torrc.keys():
- setattr(tor_config, i, config.tor.torrc[i])
-
- if os.geteuid() == 0:
- tor_config.User = pwd.getpwuid(os.geteuid()).pw_name
-
- tor_config.save()
- log.debug("Setting control port as %s" % tor_config.ControlPort)
- log.debug("Setting SOCKS port as %s" % tor_config.SocksPort)
- return start_tor(tor_config)
+ tor_config.Bridge = bridges
+
+ if config.tor.torrc:
+ for i in config.tor.torrc.keys():
+ setattr(tor_config, i, config.tor.torrc[i])
+
+ if os.geteuid() == 0:
+ tor_config.User = pwd.getpwuid(os.geteuid()).pw_name
+
+ tor_config.save()
+ log.debug("Setting control port as %s" % tor_config.ControlPort)
+ log.debug("Setting SOCKS port as %s" % tor_config.SocksPort)
+ try:
+ yield start_tor(tor_config)
+ log.err("Calling tor callback")
+ self._tor_starting.callback(self._tor_state)
+ log.err("called")
+ except Exception as exc:
+ log.err("Failed to start tor")
+ log.exc(exc)
+ self._tor_starting.errback(Failure(exc))
+
+ elif config.tor.control_port and config.tor_state is None:
+ try:
+ yield connect_to_control_port()
+ self._tor_starting.callback(self._tor_state)
+ except Exception as exc:
+ self._tor_starting.errback(Failure(exc))
diff --git a/ooni/geoip.py b/ooni/geoip.py
index fa6d1ae..28e0e1e 100644
--- a/ooni/geoip.py
+++ b/ooni/geoip.py
@@ -136,11 +136,11 @@ class UbuntuGeoIP(HTTPGeoIPLookupper):
probe_ip = m.group(1)
return probe_ip
-class TorProjectGeoIP(HTTPGeoIPLookupper):
- url = "https://check.torproject.org/"
+class DuckDuckGoGeoIP(HTTPGeoIPLookupper):
+ url = "https://duckduckgo.com/?q=ip&ia=answer"
def parseResponse(self, response_body):
- regexp = "Your IP address appears to be: <strong>((\d+\.)+(\d+))"
+ regexp = "Your IP address is (.*) in "
probe_ip = re.search(regexp, response_body).group(1)
return probe_ip
@@ -151,7 +151,7 @@ class ProbeIP(object):
def __init__(self):
self.geoIPServices = {
'ubuntu': UbuntuGeoIP,
- 'torproject': TorProjectGeoIP
+ 'duckduckgo': DuckDuckGoGeoIP
}
self.geodata = {
'asn': 'AS0',
diff --git a/ooni/measurements.py b/ooni/measurements.py
deleted file mode 100644
index 976b125..0000000
--- a/ooni/measurements.py
+++ /dev/null
@@ -1,43 +0,0 @@
-import json
-
-class GenerateResults(object):
- supported_tests = [
- "web_connectivity"
- ]
-
- def __init__(self, input_file):
- self.input_file = input_file
-
- def process_web_connectivity(self, entry):
- result = {}
- result['anomaly'] = False
- if entry['test_keys']['blocking'] is not False:
- result['anomaly'] = True
- result['url'] = entry['input']
- return result
-
- def output(self, output_file):
- results = {}
- with open(self.input_file) as in_file:
- for idx, line in enumerate(in_file):
- entry = json.loads(line.strip())
- if entry['test_name'] not in self.supported_tests:
- raise Exception("Unsupported test")
- result = getattr(self, 'process_'+entry['test_name'])(entry)
- result['idx'] = idx
- results['test_name'] = entry['test_name']
- results['country_code'] = entry['probe_cc']
- results['asn'] = entry['probe_asn']
- results['results'] = results.get('results', [])
- results['results'].append(result)
-
- with open(output_file, "w") as fw:
- json.dump(results, fw)
-
-if __name__ == "__main__":
- import sys
- if len(sys.argv) != 3:
- print("Usage: {0} [input_file] [output_file]".format(sys.argv[0]))
- sys.exit(1)
- gr = GenerateResults(sys.argv[1])
- gr.output(sys.argv[2])
diff --git a/ooni/nettest.py b/ooni/nettest.py
index d01cf7b..2a33a2f 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -254,7 +254,8 @@ class NetTestLoader(object):
h = sha256()
for l in f:
h.update(l)
- except:
+ except Exception as exc:
+ log.exception(exc)
raise e.InvalidInputFile(filename)
input_file['hash'] = h.hexdigest()
self.inputFiles.append(input_file)
diff --git a/ooni/results.py b/ooni/results.py
new file mode 100644
index 0000000..21fe997
--- /dev/null
+++ b/ooni/results.py
@@ -0,0 +1,39 @@
+import json
+
+class Process():
+ supported_tests = [
+ "web_connectivity"
+ ]
+ @staticmethod
+ def web_connectivity(entry):
+ result = {}
+ result['anomaly'] = False
+ if entry['test_keys']['blocking'] is not False:
+ result['anomaly'] = True
+ result['url'] = entry['input']
+ return result
+
+def generate_summary(input_file, output_file):
+ results = {}
+ with open(input_file) as in_file:
+ for idx, line in enumerate(in_file):
+ entry = json.loads(line.strip())
+ result = {}
+ if entry['test_name'] in Process.supported_tests:
+ result = getattr(Process, entry['test_name'])(entry)
+ result['idx'] = idx
+ results['test_name'] = entry['test_name']
+ results['country_code'] = entry['probe_cc']
+ results['asn'] = entry['probe_asn']
+ results['results'] = results.get('results', [])
+ results['results'].append(result)
+
+ with open(output_file, "w") as fw:
+ json.dump(results, fw)
+
+if __name__ == "__main__":
+ import sys
+ if len(sys.argv) != 3:
+ print("Usage: {0} [input_file] [output_file]".format(sys.argv[0]))
+ sys.exit(1)
+ generate_summary(sys.argv[1], sys.argv[2])
diff --git a/ooni/tests/bases.py b/ooni/tests/bases.py
index 904aaba..40e3b5e 100644
--- a/ooni/tests/bases.py
+++ b/ooni/tests/bases.py
@@ -10,7 +10,7 @@ class ConfigTestCase(unittest.TestCase):
def setUp(self):
self.ooni_home_dir = os.path.abspath("ooni_home")
self.config = config
- self.config.initialize_ooni_home("ooni_home")
+ self.config.initialize_ooni_home(self.ooni_home_dir)
super(ConfigTestCase, self).setUp()
def skipTest(self, reason):
diff --git a/ooni/tests/test_deck.py b/ooni/tests/test_deck.py
index ba87ec8..455f4e1 100644
--- a/ooni/tests/test_deck.py
+++ b/ooni/tests/test_deck.py
@@ -1,11 +1,15 @@
import os
+from copy import deepcopy
+
from twisted.internet import defer
from twisted.trial import unittest
from hashlib import sha256
from ooni import errors
-from ooni.deck import InputFile, Deck, nettest_to_path
+from ooni.deck import input_store, lookup_collector_and_test_helpers
+from ooni.nettest import NetTestLoader
+from ooni.deck import InputFile, Deck, nettest_to_path, DeckTask, NGDeck
from ooni.tests.bases import ConfigTestCase
from ooni.tests.mocks import MockBouncerClient, MockCollectorClient
@@ -182,7 +186,8 @@ class TestDeck(BaseTestCase, ConfigTestCase):
self.assertEqual(len(deck.netTestLoaders[0].missingTestHelpers), 1)
- yield deck.lookupCollectorAndTestHelpers()
+ yield lookup_collector_and_test_helpers(deck.preferred_backend,
+ deck.netTestLoaders)
self.assertEqual(deck.netTestLoaders[0].collector.settings['address'],
'httpo://thirteenchars123.onion')
@@ -229,7 +234,8 @@ class TestDeck(BaseTestCase, ConfigTestCase):
self.assertEqual(len(deck.netTestLoaders[0].missingTestHelpers), 1)
- yield deck.lookupCollectorAndTestHelpers()
+ yield lookup_collector_and_test_helpers(deck.preferred_backend,
+ deck.netTestLoaders)
self.assertEqual(
deck.netTestLoaders[0].collector.settings['address'],
@@ -258,7 +264,8 @@ class TestDeck(BaseTestCase, ConfigTestCase):
self.assertEqual(len(deck.netTestLoaders[0].missingTestHelpers), 1)
- yield deck.lookupCollectorAndTestHelpers()
+ yield lookup_collector_and_test_helpers(deck.preferred_backend,
+ deck.netTestLoaders)
self.assertEqual(
deck.netTestLoaders[0].collector.settings['address'],
@@ -269,3 +276,45 @@ class TestDeck(BaseTestCase, ConfigTestCase):
deck.netTestLoaders[0].localOptions['backend'],
'127.0.0.1'
)
+
+class TestInputStore(ConfigTestCase):
+ @defer.inlineCallbacks
+ def test_update_input_store(self):
+ self.skipTest("antani")
+ yield input_store.update("ZZ")
+ print os.listdir(os.path.join(
+ self.config.resources_directory, "citizenlab-test-lists"))
+ print os.listdir(os.path.join(self.config.inputs_directory))
+
+TASK_DATA = {
+ "name": "Some Task",
+ "ooni": {
+ "test_name": "web_connectivity",
+ "file": "$citizen_lab_global_urls"
+ }
+}
+
+DECK_DATA = {
+ "name": "My deck",
+ "description": "Something",
+ "tasks": [
+ deepcopy(TASK_DATA)
+ ]
+}
+class TestNGDeck(ConfigTestCase):
+ skip = True
+ def test_deck_task(self):
+ if self.skip:
+ self.skipTest("Skip is set to true")
+ yield input_store.update("ZZ")
+ deck_task = DeckTask(TASK_DATA)
+ self.assertIsInstance(deck_task.ooni["net_test_loader"],
+ NetTestLoader)
+
+ @defer.inlineCallbacks
+ def test_deck_load(self):
+ if self.skip:
+ self.skipTest("Skip is set to true")
+ yield input_store.update("ZZ")
+ deck = NGDeck(deck_data=DECK_DATA)
+ self.assertEqual(len(deck.tasks), 1)
diff --git a/ooni/tests/test_director.py b/ooni/tests/test_director.py
index 18377f5..6638adb 100644
--- a/ooni/tests/test_director.py
+++ b/ooni/tests/test_director.py
@@ -73,7 +73,7 @@ class TestDirector(ConfigTestCase):
@defer.inlineCallbacks
def director_start_tor():
director = Director()
- yield director.startTor()
+ yield director.start_tor()
assert config.tor.socks_port == 4242
assert config.tor.control_port == 4242
@@ -93,7 +93,7 @@ class TestDirector(ConfigTestCase):
net_test_loader.loadNetTestString(test_failing_twice)
director = Director()
director.netTestDone = net_test_done
- director.startNetTest(net_test_loader, None, no_yamloo=True)
+ director.start_net_test_loader(net_test_loader, None, no_yamloo=True)
return finished
@@ -113,7 +113,7 @@ class TestStartSniffing(unittest.TestCase):
def test_start_sniffing_once(self):
with patch('ooni.settings.config.scapyFactory') as mock_scapy_factory:
with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer:
- self.director.startSniffing(self.testDetails)
+ self.director.start_sniffing(self.testDetails)
sniffer = mock_scapy_sniffer.return_value
mock_scapy_factory.registerProtocol.assert_called_once_with(sniffer)
@@ -122,7 +122,7 @@ class TestStartSniffing(unittest.TestCase):
with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer:
sniffer = mock_scapy_sniffer.return_value
sniffer.pcapwriter.filename = 'foo1_filename'
- self.director.startSniffing(self.testDetails)
+ self.director.start_sniffing(self.testDetails)
self.assertEqual(len(self.director.sniffers), 1)
self.testDetails = {
@@ -132,13 +132,13 @@ class TestStartSniffing(unittest.TestCase):
with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer:
sniffer = mock_scapy_sniffer.return_value
sniffer.pcapwriter.filename = 'foo2_filename'
- self.director.startSniffing(self.testDetails)
+ self.director.start_sniffing(self.testDetails)
self.assertEqual(len(self.director.sniffers), 2)
def test_measurement_succeeded(self):
with patch('ooni.settings.config.scapyFactory') as mock_scapy_factory:
with patch('ooni.utils.txscapy.ScapySniffer') as mock_scapy_sniffer:
- self.director.startSniffing(self.testDetails)
+ self.director.start_sniffing(self.testDetails)
self.assertEqual(len(self.director.sniffers), 1)
measurement = MagicMock()
measurement.testInstance = self.FooTestCase()
diff --git a/ooni/tests/test_nettest.py b/ooni/tests/test_nettest.py
index 239080a..1592149 100644
--- a/ooni/tests/test_nettest.py
+++ b/ooni/tests/test_nettest.py
@@ -355,7 +355,7 @@ class TestNetTest(ConfigTestCase):
director = Director()
self.filename = 'dummy_report.yamloo'
- d = director.startNetTest(ntl, self.filename)
+ d = director.start_net_test_loader(ntl, self.filename)
@d.addCallback
def complete(result):
@@ -382,7 +382,7 @@ class TestNetTest(ConfigTestCase):
director = Director()
self.filename = 'dummy_report.yamloo'
- d = director.startNetTest(ntl, self.filename)
+ d = director.start_net_test_loader(ntl, self.filename)
@d.addCallback
def complete(result):
@@ -410,7 +410,7 @@ class TestNetTest(ConfigTestCase):
director = Director()
self.filename = 'dummy_report.yamloo'
- d = director.startNetTest(ntl, self.filename)
+ d = director.start_net_test_loader(ntl, self.filename)
@d.addCallback
def complete(result):
@@ -469,7 +469,7 @@ class TestNettestTimeout(ConfigTestCase):
director = Director()
self.filename = 'dummy_report.yamloo'
- d = director.startNetTest(ntl, self.filename)
+ d = director.start_net_test_loader(ntl, self.filename)
@d.addCallback
def complete(result):
diff --git a/ooni/tests/test_oonideckgen.py b/ooni/tests/test_oonideckgen.py
index 4a52377..11a852f 100644
--- a/ooni/tests/test_oonideckgen.py
+++ b/ooni/tests/test_oonideckgen.py
@@ -1,10 +1,11 @@
import os
-import yaml
import tempfile
+import yaml
+
+from ooni.scripts import oonideckgen
from .bases import ConfigTestCase
-from ooni.deckgen import cli
class TestOONIDeckgen(ConfigTestCase):
def setUp(self):
@@ -25,7 +26,7 @@ class TestOONIDeckgen(ConfigTestCase):
def test_generate_deck(self):
temp_dir = tempfile.mkdtemp()
- cli.generate_deck({
+ oonideckgen.generate_deck({
"country-code": "it",
"output": temp_dir,
"collector": None,
diff --git a/ooni/tests/test_oonireport.py b/ooni/tests/test_oonireport.py
index 2275672..d71a403 100644
--- a/ooni/tests/test_oonireport.py
+++ b/ooni/tests/test_oonireport.py
@@ -5,8 +5,6 @@ from mock import patch, MagicMock
from twisted.internet import defer
from ooni.tests.bases import ConfigTestCase
-from ooni.report import tool
-
mock_tor_check = MagicMock(return_value=True)
class TestOONIReport(ConfigTestCase):
@@ -51,9 +49,9 @@ class TestOONIReport(ConfigTestCase):
cli.run(["upload"])
self.assertTrue(mock_tool.upload_all.called)
- @patch('ooni.report.tool.CollectorClient')
- @patch('ooni.report.tool.OONIBReportLog')
- @patch('ooni.report.tool.OONIBReporter')
+ @patch('ooni.report.cli.CollectorClient')
+ @patch('ooni.report.cli.OONIBReportLog')
+ @patch('ooni.report.cli.OONIBReporter')
def test_tool_upload(self, mock_oonib_reporter, mock_oonib_report_log,
mock_collector_client):
@@ -70,7 +68,7 @@ class TestOONIReport(ConfigTestCase):
self._create_reporting_yaml(report_name)
self._write_dummy_report(report_name)
- d = tool.upload(report_name)
+ d = cli.upload(report_name)
@d.addCallback
def cb(result):
mock_oonib_reporter_i.writeReportEntry.assert_called_with(
@@ -78,9 +76,9 @@ class TestOONIReport(ConfigTestCase):
)
return d
- @patch('ooni.report.tool.CollectorClient')
- @patch('ooni.report.tool.OONIBReportLog')
- @patch('ooni.report.tool.OONIBReporter')
+ @patch('ooni.report.cli.CollectorClient')
+ @patch('ooni.report.cli.OONIBReportLog')
+ @patch('ooni.report.cli.OONIBReporter')
def test_tool_upload_all(self, mock_oonib_reporter, mock_oonib_report_log,
mock_collector_client):
@@ -98,7 +96,7 @@ class TestOONIReport(ConfigTestCase):
self._create_reporting_yaml(report_name)
self._write_dummy_report(report_name)
- d = tool.upload_all()
+ d = cli.upload_all()
@d.addCallback
def cb(result):
mock_oonib_reporter_i.writeReportEntry.assert_called_with(
diff --git a/ooni/ui/cli.py b/ooni/ui/cli.py
index 7a0036e..fe24bf6 100644
--- a/ooni/ui/cli.py
+++ b/ooni/ui/cli.py
@@ -334,10 +334,10 @@ def runTestWithDirector(director, global_options, url=None, start_tor=True):
collector_client = setupCollector(global_options,
net_test_loader.collector)
- yield director.startNetTest(net_test_loader,
- global_options['reportfile'],
- collector_client,
- global_options['no-yamloo'])
+ yield director.start_net_test_loader(net_test_loader,
+ global_options['reportfile'],
+ collector_client,
+ global_options['no-yamloo'])
d.addCallback(setup_nettest)
d.addCallback(post_director_start)
diff --git a/ooni/ui/web/client/index.html b/ooni/ui/web/client/index.html
index cc45067..e306ef2 100644
--- a/ooni/ui/web/client/index.html
+++ b/ooni/ui/web/client/index.html
@@ -13,5 +13,5 @@
<app>
Loading...
</app>
- <script type="text/javascript" src="app.bundle.js?27ae67e2c74ae4ae9a82"></script></body>
+ <script type="text/javascript" src="app.bundle.js?7ed7d7510803fa1a4ad8"></script></body>
</html>
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index e63c08f..a862fe7 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -13,44 +13,58 @@ from werkzeug.exceptions import NotFound
from ooni import __version__ as ooniprobe_version
from ooni import errors
-from ooni.deck import Deck
+from ooni.deck import NGDeck
from ooni.settings import config
-from ooni.nettest import NetTestLoader
-from ooni.measurements import GenerateResults
-from ooni.utils import generate_filename
+from ooni.utils import log
+from ooni.director import DirectorEvent
+
+config.advanced.debug = True
def rpath(*path):
context = os.path.abspath(os.path.dirname(__file__))
return os.path.join(context, *path)
-def getNetTestLoader(test_options, test_file):
- """
- Args:
- test_options: (dict) containing as keys the option names.
-
- test_file: (string) the path to the test_file to be run.
- Returns:
- an instance of :class:`ooni.nettest.NetTestLoader` with the specified
- test_file and the specified options.
- """
- options = []
- for k, v in test_options.items():
- if v is None:
- print("Skipping %s because none" % k)
- continue
- options.append('--'+k)
- options.append(v)
-
- net_test_loader = NetTestLoader(options,
- test_file=test_file)
- return net_test_loader
-
-
class WebUIError(Exception):
def __init__(self, code, message):
self.code = code
self.message = message
+class LongPoller(object):
+ def __init__(self, timeout, _reactor=reactor):
+ self.lock = defer.DeferredLock()
+
+ self.deferred_subscribers = []
+ self._reactor = _reactor
+ self._timeout = timeout
+
+ self.timer = task.LoopingCall(
+ self.notify,
+ DirectorEvent("null", "No updates"),
+ )
+ self.timer.clock = self._reactor
+
+ def start(self):
+ self.timer.start(self._timeout)
+
+ def stop(self):
+ self.timer.stop()
+
+ def _notify(self, lock, event):
+ for d in self.deferred_subscribers[:]:
+ assert not d.called, "Deferred is already called"
+ d.callback(event)
+ self.deferred_subscribers.remove(d)
+ self.timer.reset()
+ lock.release()
+
+ def notify(self, event=None):
+ self.lock.acquire().addCallback(self._notify, event)
+
+ def get(self):
+ d = defer.Deferred()
+ self.deferred_subscribers.append(d)
+ return d
+
class WebUIAPI(object):
app = Klein()
# Maximum number in seconds after which to return a result even if not
@@ -58,7 +72,7 @@ class WebUIAPI(object):
_long_polling_timeout = 5
_reactor = reactor
- def __init__(self, config, director):
+ def __init__(self, config, director, _reactor=reactor):
self.director = director
self.config = config
self.measurement_path = FilePath(config.measurements_directory)
@@ -74,12 +88,26 @@ class WebUIAPI(object):
"director_started": False,
"failures": []
}
- self.status_updates = []
- d = self.director.start(start_tor=True)
+
+ self.status_poller = LongPoller(
+ self._long_polling_timeout, _reactor)
+ self.director_event_poller = LongPoller(
+ self._long_polling_timeout, _reactor)
+
+ # XXX move this elsewhere
+ self.director_event_poller.start()
+ self.status_poller.start()
+
+ self.director.subscribe(self.handle_director_event)
+ d = self.director.start()
d.addCallback(self.director_started)
d.addErrback(self.director_startup_failed)
- d.addBoth(lambda _: self.broadcast_status_update())
+ d.addBoth(lambda _: self.status_poller.notify())
+
+ def handle_director_event(self, event):
+ log.msg("Handling event {0}".format(event.type))
+ self.director_event_poller.notify(event)
def add_failure(self, failure):
self.status['failures'].append(str(failure))
@@ -92,26 +120,12 @@ class WebUIAPI(object):
def director_startup_failed(self, failure):
self.add_failure(failure)
- def broadcast_status_update(self):
- for su in self.status_updates:
- if not su.called:
- su.callback(None)
-
def completed_measurement(self, measurement_id):
del self.status['active_measurements'][measurement_id]
self.status['completed_measurements'].append(measurement_id)
- measurement_dir = self.measurement_path.child(measurement_id)
-
- measurement = measurement_dir.child('measurements.njson.progress')
-
- # Generate the summary.json file
- summary = measurement_dir.child('summary.json')
- gr = GenerateResults(measurement.path)
- gr.output(summary.path)
-
- measurement.moveTo(measurement_dir.child('measurements.njson'))
def failed_measurement(self, measurement_id, failure):
+ log.exception(failure)
del self.status['active_measurements'][measurement_id]
self.add_failure(str(failure))
@@ -119,8 +133,9 @@ class WebUIAPI(object):
def not_found(self, request, _):
request.redirect('/client/')
- @app.handle_error(WebUIError)
- def web_ui_error(self, request, error):
+ @app.handle_errors(WebUIError)
+ def web_ui_error(self, request, failure):
+ error = failure.value
request.setResponseCode(error.code)
return self.render_json({
"error_code": error.code,
@@ -133,24 +148,28 @@ class WebUIAPI(object):
request.setHeader('Content-Length', len(json_string))
return json_string
+ @app.route('/api/notify', methods=["GET"])
+ def api_notify(self, request):
+ def got_director_event(event):
+ return self.render_json({
+ "type": event.type,
+ "message": event.message
+ }, request)
+ d = self.director_event_poller.get()
+ d.addCallback(got_director_event)
+ return d
+
@app.route('/api/status', methods=["GET"])
def api_status(self, request):
return self.render_json(self.status, request)
@app.route('/api/status/update', methods=["GET"])
def api_status_update(self, request):
- status_update = defer.Deferred()
- status_update.addCallback(lambda _:
- self.status_updates.remove(status_update))
- status_update.addCallback(lambda _: self.api_status(request))
-
- self.status_updates.append(status_update)
-
- # After long_polling_timeout we fire the callback
- task.deferLater(self._reactor, self._long_polling_timeout,
- status_update.callback, None)
-
- return status_update
+ def got_status_update(event):
+ return self.api_status(request)
+ d = self.status_poller.get()
+ d.addCallback(got_status_update)
+ return d
@app.route('/api/deck/generate', methods=["GET"])
def api_deck_generate(self, request):
@@ -167,37 +186,23 @@ class WebUIAPI(object):
return self.render_json({"command": "deck-list"}, request)
- @defer.inlineCallbacks
def run_deck(self, deck):
- yield deck.setup()
- measurement_ids = []
- for net_test_loader in deck.netTestLoaders:
- # XXX synchronize this with startNetTest
- test_details = net_test_loader.getTestDetails()
- measurement_id = generate_filename(test_details)
-
- measurement_dir = self.measurement_path.child(measurement_id)
- measurement_dir.createDirectory()
-
- report_filename = measurement_dir.child(
- "measurements.njson.progress").path
-
- measurement_ids.append(measurement_id)
- self.status['active_measurements'][measurement_id] = {
- 'test_name': test_details['test_name'],
- 'test_start_time': test_details['test_start_time']
+ for task_id in deck.task_ids:
+ self.status['active_measurements'][task_id] = {
+ 'test_name': 'foobar',
+ 'test_start_time': 'some start time'
}
- self.broadcast_status_update()
- d = self.director.startNetTest(net_test_loader, report_filename)
- d.addCallback(lambda _:
- self.completed_measurement(measurement_id))
- d.addErrback(lambda failure:
- self.failed_measurement(measurement_id, failure))
+ self.status_poller.notify()
+ d = deck.run(self.director)
+ d.addCallback(lambda _:
+ self.completed_measurement(task_id))
+ d.addErrback(lambda failure:
+ self.failed_measurement(task_id, failure))
@app.route('/api/nettest/<string:test_name>/start', methods=["POST"])
def api_nettest_start(self, request, test_name):
try:
- net_test = self.director.netTests[test_name]
+ _ = self.director.netTests[test_name]
except KeyError:
raise WebUIError(500, 'Could not find the specified test')
@@ -206,10 +211,15 @@ class WebUIAPI(object):
except ValueError:
raise WebUIError(500, 'Invalid JSON message recevied')
- deck = Deck(no_collector=True) # XXX remove no_collector
- net_test_loader = getNetTestLoader(test_options, net_test['path'])
+ test_options["test_name"] = test_name
+ deck_data = {
+ "tasks": [
+ {"ooni": test_options}
+ ]
+ }
try:
- deck.insert(net_test_loader)
+ deck = NGDeck(no_collector=True)
+ deck.load(deck_data)
self.run_deck(deck)
except errors.MissingRequiredOption, option_name:
@@ -237,6 +247,19 @@ class WebUIAPI(object):
def api_input_list(self, request):
return self.render_json(self.director.input_store.list(), request)
+ @app.route('/api/input/<string:input_id>/content', methods=["GET"])
+ def api_input_content(self, request, input_id):
+ content = self.director.input_store.getContent(input_id)
+ request.setHeader('Content-Type', 'text/plain')
+ request.setHeader('Content-Length', len(content))
+ return content
+
+ @app.route('/api/input/<string:input_id>', methods=["GET"])
+ def api_input_details(self, request, input_id):
+ return self.render_json(
+ self.director.input_store.get(input_id), request
+ )
+
@app.route('/api/measurement', methods=["GET"])
def api_measurement_list(self, request):
measurements = []
@@ -299,7 +322,3 @@ class WebUIAPI(object):
def static(self, request):
path = rpath("client")
return static.File(path)
-<<<<<<< acda284b56fa3a75acbe7d000fbdefb643839948
-
-=======
->>>>>>> [Web UI] Refactoring of web UI
diff --git a/ooni/ui/web/web.py b/ooni/ui/web/web.py
index 40ee3b4..eca75cb 100644
--- a/ooni/ui/web/web.py
+++ b/ooni/ui/web/web.py
@@ -1,53 +1,27 @@
-import os
-
-from twisted.scripts import twistd
-from twisted.python import usage
-from twisted.internet import reactor
from twisted.web import server
+from twisted.internet import reactor
from twisted.application import service
+from ooni.ui.web.server import WebUIAPI
from ooni.settings import config
-from ooni.director import Director
-from ooni.utils import log
-
-from .server import WebUIAPI
class WebUIService(service.MultiService):
- portNum = 8822
+ def __init__(self, director, port_number=8842):
+ service.MultiService.__init__(self)
+
+ self.director = director
+ self.port_number = port_number
+
def startService(self):
service.MultiService.startService(self)
- config.set_paths()
- config.initialize_ooni_home()
- config.read_config_file()
- director = Director()
- web_ui_api = WebUIAPI(config, director)
- root = server.Site(web_ui_api.app.resource())
- self._port = reactor.listenTCP(self.portNum, root)
- d = director.start()
+
+ web_ui_api = WebUIAPI(config, self.director)
+ self._port = reactor.listenTCP(
+ self.port_number,
+ server.Site(web_ui_api.app.resource())
+ )
def stopService(self):
+ service.MultiService.stopService(self)
if self._port:
self._port.stopListening()
-
-class StartOoniprobeWebUIPlugin:
- tapname = "ooniprobe"
- def makeService(self, so):
- return WebUIService()
-
-class OoniprobeTwistdConfig(twistd.ServerOptions):
- subCommands = [("StartOoniprobeWebUI", None, usage.Options, "ooniprobe web ui")]
-
-def start():
- twistd_args = ["--nodaemon"]
- twistd_config = OoniprobeTwistdConfig()
- twistd_args.append("StartOoniprobeWebUI")
- try:
- twistd_config.parseOptions(twistd_args)
- except usage.error, ue:
- print("ooniprobe: usage error from twistd: {}\n".format(ue))
- twistd_config.loadedPlugins = {"StartOoniprobeWebUI": StartOoniprobeWebUIPlugin()}
- twistd.runApp(twistd_config)
- return 0
-
-if __name__ == "__main__":
- start()
diff --git a/ooni/utils/onion.py b/ooni/utils/onion.py
index cc2f2ff..e18a6ee 100644
--- a/ooni/utils/onion.py
+++ b/ooni/utils/onion.py
@@ -237,20 +237,6 @@ class TorLauncherWithRetries(object):
continue
setattr(new_tor_config, key, getattr(self.tor_config, key))
self.tor_config = new_tor_config
- self.timeout = timeout
-
- def _reset_tor_config(self):
- """
- This is used to reset the Tor configuration to before launch_tor
- modified it. This is in particular used to force the regeneration of the
- DataDirectory.
- """
- new_tor_config = TorConfig()
- for key in self.tor_config:
- if config.tor.data_dir is None and key == "DataDirectory":
- continue
- setattr(new_tor_config, key, getattr(self.tor_config, key))
- self.tor_config = new_tor_config
def _progress_updates(self, prog, tag, summary):
log.msg("%d%%: %s" % (prog, summary))
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits