[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Write ooniprobe reports in JSON format on disk
commit 1ec88b611d77048b8281d3358b20883388bd8283
Author: Arturo Filastò <arturo@xxxxxxxxxxx>
Date: Thu Jul 14 21:30:01 2016 +0200
Write ooniprobe reports in JSON format on disk
* Implement various API endpoints
---
ooni/measurements.py | 19 +++----
ooni/reporter.py | 116 ++++++++++++++++++++++++++++------------
ooni/settings.py | 3 +-
ooni/ui/web/server.py | 142 +++++++++++++++++++++++++++----------------------
ooni/ui/web/web.py | 7 ++-
ooni/utils/__init__.py | 6 ++-
6 files changed, 179 insertions(+), 114 deletions(-)
diff --git a/ooni/measurements.py b/ooni/measurements.py
index 5244ea4..976b125 100644
--- a/ooni/measurements.py
+++ b/ooni/measurements.py
@@ -9,26 +9,27 @@ class GenerateResults(object):
self.input_file = input_file
def process_web_connectivity(self, entry):
- anomaly = {}
- anomaly['result'] = False
+ result = {}
+ result['anomaly'] = False
if entry['test_keys']['blocking'] is not False:
- anomaly['result'] = True
- anomaly['url'] = entry['input']
- return anomaly
+ result['anomaly'] = True
+ result['url'] = entry['input']
+ return result
def output(self, output_file):
results = {}
with open(self.input_file) as in_file:
- for line in in_file:
+ for idx, line in enumerate(in_file):
entry = json.loads(line.strip())
if entry['test_name'] not in self.supported_tests:
raise Exception("Unsupported test")
- anomaly = getattr(self, 'process_'+entry['test_name'])(entry)
+ result = getattr(self, 'process_'+entry['test_name'])(entry)
+ result['idx'] = idx
results['test_name'] = entry['test_name']
results['country_code'] = entry['probe_cc']
results['asn'] = entry['probe_asn']
- results['anomalies'] = results.get('anomalies', [])
- results['anomalies'].append(anomaly)
+ results['results'] = results.get('results', [])
+ results['results'].append(result)
with open(output_file, "w") as fw:
json.dump(results, fw)
diff --git a/ooni/reporter.py b/ooni/reporter.py
index f76fada..f07b3cf 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -1,5 +1,6 @@
import uuid
import yaml
+import json
import os
from copy import deepcopy
@@ -206,6 +207,56 @@ class YAMLReporter(OReporter):
def finish(self):
self._stream.close()
+class NJSONReporter(OReporter):
+
+ """
+ report_filename:
+ the path of the file the report is written to
+
+ """
+
+ def __init__(self, test_details, report_filename):
+ self.report_path = report_filename
+ OReporter.__init__(self, test_details)
+
+ def _writeln(self, line):
+ self._write("%s\n" % line)
+
+ def _write(self, data):
+ if not self._stream:
+ raise errors.ReportNotCreated
+ if self._stream.closed:
+ raise errors.ReportAlreadyClosed
+ s = str(data)
+ assert isinstance(s, type(''))
+ self._stream.write(s)
+ untilConcludes(self._stream.flush)
+
+ def writeReportEntry(self, entry):
+ if isinstance(entry, Measurement):
+ e = deepcopy(entry.testInstance.report)
+ elif isinstance(entry, dict):
+ e = deepcopy(entry)
+ else:
+ raise Exception("Failed to serialise entry")
+ report_entry = {
+ 'input': e.pop('input', None),
+ 'id': str(uuid.uuid4()),
+ 'test_start_time': e.pop('test_start_time', None),
+ 'measurement_start_time': e.pop('measurement_start_time', None),
+ 'test_runtime': e.pop('test_runtime', None),
+ 'test_keys': e
+ }
+ report_entry.update(self.testDetails)
+ self._write(json.dumps(report_entry))
+ self._write("\n")
+
+ def createReport(self):
+ self._stream = open(self.report_path, 'w+')
+
+ def finish(self):
+ self._stream.close()
+
class OONIBReporter(OReporter):
@@ -219,25 +270,20 @@ class OONIBReporter(OReporter):
def serializeEntry(self, entry, serialisation_format="yaml"):
if serialisation_format == "json":
if isinstance(entry, Measurement):
- report_entry = {
- 'input': entry.testInstance.report.pop('input', None),
- 'id': str(uuid.uuid4()),
- 'test_start_time': entry.testInstance.report.pop('test_start_time', None),
- 'measurement_start_time': entry.testInstance.report.pop('measurement_start_time', None),
- 'test_runtime': entry.testInstance.report.pop('test_runtime', None),
- 'test_keys': entry.testInstance.report
- }
+ e = deepcopy(entry.testInstance.report)
+
elif isinstance(entry, dict):
- report_entry = {
- 'input': entry.pop('input', None),
- 'id': str(uuid.uuid4()),
- 'test_start_time': entry.pop('test_start_time', None),
- 'measurement_start_time': entry.pop('measurement_start_time', None),
- 'test_runtime': entry.pop('test_runtime', None),
- 'test_keys': entry
- }
+ e = deepcopy(entry)
else:
raise Exception("Failed to serialise entry")
+ report_entry = {
+ 'input': e.pop('input', None),
+ 'id': str(uuid.uuid4()),
+ 'test_start_time': e.pop('test_start_time', None),
+ 'measurement_start_time': e.pop('measurement_start_time', None),
+ 'test_runtime': e.pop('test_runtime', None),
+ 'test_keys': e
+ }
report_entry.update(self.testDetails)
return report_entry
else:
@@ -468,7 +514,7 @@ class Report(object):
def __init__(self, test_details, report_filename,
reportEntryManager, collector_client=None,
- no_yamloo=False):
+ no_njson=False):
"""
This is an abstraction layer on top of all the configured reporters.
@@ -499,9 +545,9 @@ class Report(object):
self.report_log = OONIBReportLog()
- self.yaml_reporter = None
+ self.njson_reporter = None
self.oonib_reporter = None
- self.no_yamloo = no_yamloo
+ self.no_njson = no_njson
self.done = defer.Deferred()
self.reportEntryManager = reportEntryManager
@@ -509,7 +555,7 @@ class Report(object):
def generateReportFilename(self):
report_filename = generate_filename(self.test_details,
prefix='report',
- extension='yamloo')
+ extension='njson')
report_path = os.path.join('.', report_filename)
return os.path.abspath(report_path)
@@ -543,12 +589,12 @@ class Report(object):
self.collector_client)
self.test_details['report_id'] = yield self.open_oonib_reporter()
- if not self.no_yamloo:
- self.yaml_reporter = YAMLReporter(self.test_details,
- self.report_filename)
+ if not self.no_njson:
+ self.njson_reporter = NJSONReporter(self.test_details,
+ self.report_filename)
if not self.oonib_reporter:
yield self.report_log.not_created(self.report_filename)
- yield defer.maybeDeferred(self.yaml_reporter.createReport)
+ yield defer.maybeDeferred(self.njson_reporter.createReport)
defer.returnValue(self.reportId)
@@ -570,7 +616,7 @@ class Report(object):
d = defer.Deferred()
deferreds = []
- def yaml_report_failed(failure):
+ def njson_report_failed(failure):
d.errback(failure)
def oonib_report_failed(failure):
@@ -580,11 +626,11 @@ class Report(object):
if not d.called:
d.callback(None)
- if self.yaml_reporter:
- write_yaml_report = ReportEntry(self.yaml_reporter, measurement)
- self.reportEntryManager.schedule(write_yaml_report)
- write_yaml_report.done.addErrback(yaml_report_failed)
- deferreds.append(write_yaml_report.done)
+ if self.njson_reporter:
+ write_njson_report = ReportEntry(self.njson_reporter, measurement)
+ self.reportEntryManager.schedule(write_njson_report)
+ write_njson_report.done.addErrback(njson_report_failed)
+ deferreds.append(write_njson_report.done)
if self.oonib_reporter:
write_oonib_report = ReportEntry(self.oonib_reporter, measurement)
@@ -609,7 +655,7 @@ class Report(object):
d = defer.Deferred()
deferreds = []
- def yaml_report_failed(failure):
+ def njson_report_failed(failure):
d.errback(failure)
def oonib_report_closed(result):
@@ -623,10 +669,10 @@ class Report(object):
if not d.called:
d.callback(None)
- if self.yaml_reporter:
- close_yaml = defer.maybeDeferred(self.yaml_reporter.finish)
- close_yaml.addErrback(yaml_report_failed)
- deferreds.append(close_yaml)
+ if self.njson_reporter:
+ close_njson = defer.maybeDeferred(self.njson_reporter.finish)
+ close_njson.addErrback(njson_report_failed)
+ deferreds.append(close_njson)
if self.oonib_reporter:
close_oonib = self.oonib_reporter.finish()
diff --git a/ooni/settings.py b/ooni/settings.py
index ffbf68e..5245451 100644
--- a/ooni/settings.py
+++ b/ooni/settings.py
@@ -106,7 +106,8 @@ class OConfig(object):
else:
self.decks_directory = os.path.join(self.ooni_home, 'decks')
- self.reports_directory = os.path.join(self.ooni_home, 'reports')
+ self.measurements_directory = os.path.join(self.ooni_home,
+ 'measurements')
self.resources_directory = os.path.join(self.data_directory,
"resources")
if self.advanced.report_log_file:
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index 5be581c..ccc5d87 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -3,34 +3,20 @@ from __future__ import print_function
import os
import json
+from twisted.internet import defer
from twisted.python import usage
from twisted.python.filepath import FilePath, InsecurePath
from twisted.web import static
from klein import Klein
+from werkzeug.exceptions import NotFound
-from ooni.settings import config
from ooni import errors
+from ooni.deck import Deck
+from ooni.settings import config
from ooni.nettest import NetTestLoader
from ooni.measurements import GenerateResults
-
-class RouteNotFound(Exception):
- def __init__(self, path, method):
- self._path = path
- self._method = method
-
- def __repr__(self):
- return "<RouteNotFound {0} {1}>".format(self._path,
- self._method)
-
-def _resolvePath(request):
- path = b''
- if request.postpath:
- path = b'/'.join(request.postpath)
-
- if not path.startswith(b'/'):
- path = b'/' + path
- return path
+from ooni.utils import generate_filename
def rpath(*path):
context = os.path.abspath(os.path.dirname(__file__))
@@ -48,6 +34,9 @@ def getNetTestLoader(test_options, test_file):
"""
options = []
for k, v in test_options.items():
+ if v is None:
+ print("Skipping %s because none" % k)
+ continue
options.append('--'+k)
options.append(v)
@@ -61,6 +50,11 @@ class WebUIAPI(object):
def __init__(self, config, director):
self.director = director
self.config = config
+ self.active_measurements = {}
+
+ @app.handle_errors(NotFound)
+ def not_found(self, request, _):
+ request.redirect('/client/')
def render_json(self, obj, request):
json_string = json.dumps(obj) + "\n"
@@ -68,28 +62,43 @@ class WebUIAPI(object):
request.setHeader('Content-Length', len(json_string))
return json_string
- @app.route('/api/decks/generate', methods=["GET"])
- def generate_decks(self, request):
+ @app.route('/api/deck/generate', methods=["GET"])
+ def api_deck_generate(self, request):
return self.render_json({"generate": "deck"}, request)
- @app.route('/api/decks/<string:deck_name>/start', methods=["POST"])
- def start_deck(self, request, deck_name):
+ @app.route('/api/deck/<string:deck_name>/start', methods=["POST"])
+ def api_deck_start(self, request, deck_name):
return self.render_json({"start": deck_name}, request)
- @app.route('/api/decks/<string:deck_name>/stop', methods=["POST"])
- def stop_deck(self, request, deck_name):
- return self.render_json({"stop": deck_name}, request)
-
- @app.route('/api/decks/<string:deck_name>', methods=["GET"])
- def deck_status(self, request, deck_name):
- return self.render_json({"status": deck_name}, request)
-
- @app.route('/api/decks', methods=["GET"])
- def deck_list(self, request):
+ @app.route('/api/deck', methods=["GET"])
+ def api_deck_list(self, request):
return self.render_json({"command": "deck-list"}, request)
- @app.route('/api/net-tests/<string:test_name>/start', methods=["POST"])
- def test_start(self, request, test_name):
+ @defer.inlineCallbacks
+ def run_deck(self, deck):
+ yield deck.setup()
+ measurement_ids = []
+ for net_test_loader in deck.netTestLoaders:
+ # XXX synchronize this with startNetTest
+ test_details = net_test_loader.getTestDetails()
+ measurement_id = generate_filename(test_details)
+
+ measurement_dir = os.path.join(
+ config.measurements_directory,
+ measurement_id
+ )
+ os.mkdir(measurement_dir)
+ report_filename = os.path.join(measurement_dir,
+ "measurements.njson")
+ measurement_ids.append(measurement_id)
+ self.active_measurements[measurement_id] = {
+ 'test_name': test_details['test_name'],
+ 'test_start_time': test_details['test_start_time']
+ }
+ self.director.startNetTest(net_test_loader, report_filename)
+
+ @app.route('/api/nettest/<string:test_name>/start', methods=["POST"])
+ def api_nettest_start(self, request, test_name):
try:
net_test = self.director.netTests[test_name]
except KeyError:
@@ -99,21 +108,19 @@ class WebUIAPI(object):
'error_message': 'Could not find the specified test'
}, request)
try:
- test_options = json.load(request.content.read())
+ test_options = json.load(request.content)
except ValueError:
return self.render_json({
'error_code': 500,
'error_message': 'Invalid JSON message recevied'
}, request)
+ deck = Deck(no_collector=True) # XXX remove no_collector
net_test_loader = getNetTestLoader(test_options, net_test['path'])
try:
- net_test_loader.checkOptions()
- # XXX we actually want to generate the report_filename in a smart
- # way so that we can know where it is located and learn the results
- # of the measurement.
- report_filename = None
- self.director.startNetTest(net_test_loader, report_filename)
+ deck.insert(net_test_loader)
+ self.run_deck(deck)
+
except errors.MissingRequiredOption, option_name:
request.setResponseCode(500)
return self.render_json({
@@ -134,25 +141,18 @@ class WebUIAPI(object):
'error_message': 'Insufficient priviledges'
}, request)
- return self.render_json({"deck": "list"}, request)
-
- @app.route('/api/net-tests/<string:test_name>/start', methods=["POST"])
- def test_stop(self, request, test_name):
- return self.render_json({
- "command": "test-stop",
- "test-name": test_name
- }, request)
-
- @app.route('/api/net-tests/<string:test_name>', methods=["GET"])
- def test_status(self, request, test_name):
- return self.render_json({"command": "test-stop"}, request)
+ return self.render_json({"status": "started"}, request)
- @app.route('/api/net-tests', methods=["GET"])
- def test_list(self, request):
+ @app.route('/api/nettest', methods=["GET"])
+ def api_nettest_list(self, request):
return self.render_json(self.director.netTests, request)
+ @app.route('/api/status', methods=["GET"])
+ def api_status(self):
+ return self.render_json()
+
@app.route('/api/measurement', methods=["GET"])
- def measurement_list(self, request):
+ def api_measurement_list(self, request):
measurement_ids = os.listdir(os.path.join(config.ooni_home,
"measurements"))
measurements = []
@@ -169,8 +169,8 @@ class WebUIAPI(object):
return self.render_json({"measurements": measurements}, request)
@app.route('/api/measurement/<string:measurement_id>', methods=["GET"])
- def measurement_summary(self, request, measurement_id):
- measurement_path = FilePath(config.ooni_home).child("measurements")
+ def api_measurement_summary(self, request, measurement_id):
+ measurement_path = FilePath(config.measurements_directory)
try:
measurement_dir = measurement_path.child(measurement_id)
except InsecurePath:
@@ -189,13 +189,29 @@ class WebUIAPI(object):
@app.route('/api/measurement/<string:measurement_id>/<int:idx>',
methods=["GET"])
- def measurement_open(self, request, measurement_id, idx):
- return self.render_json({"command": "results"}, request)
+ def api_measurement_view(self, request, measurement_id, idx):
+ measurement_path = FilePath(config.measurements_directory)
+ try:
+ measurement_dir = measurement_path.child(measurement_id)
+ except InsecurePath:
+ return self.render_json({"error": "invalid measurement id"})
+ measurements = measurement_dir.child("measurements.njson")
+
+ # XXX maybe implement some caching here
+ with measurements.open("r") as f:
+ r = None
+ for f_idx, line in enumerate(f):
+ if f_idx == idx:
+ r = json.loads(line)
+ break
+ if r is None:
+ return self.render_json({"error": "Could not find measurement "
+ "with this idx"}, request)
+ return self.render_json(r, request)
@app.route('/client/', branch=True)
def static(self, request):
- path = rpath("build")
- print(path)
+ path = rpath("client")
return static.File(path)
<<<<<<< acda284b56fa3a75acbe7d000fbdefb643839948
diff --git a/ooni/ui/web/web.py b/ooni/ui/web/web.py
index f709c18..6c6971c 100644
--- a/ooni/ui/web/web.py
+++ b/ooni/ui/web/web.py
@@ -24,10 +24,9 @@ class WebUIService(service.MultiService):
root = server.Site(WebUIAPI(config, director).app.resource())
self._port = reactor.listenTCP(self.portNum, root)
director = Director()
- #d = director.start()
- #d.addCallback(_started)
- #d.addErrback(self._startupFailed)
- _started(None)
+ d = director.start()
+ d.addCallback(_started)
+ d.addErrback(self._startupFailed)
def _startupFailed(self, err):
log.err("Failed to start the director")
diff --git a/ooni/utils/__init__.py b/ooni/utils/__init__.py
index b28aadb..1aaac7b 100644
--- a/ooni/utils/__init__.py
+++ b/ooni/utils/__init__.py
@@ -97,18 +97,20 @@ def generate_filename(test_details, prefix=None, extension=None):
extension.
"""
LONG_DATE = "%Y-%m-%d %H:%M:%S"
- SHORT_DATE = "%Y-%m-%dT%H%M%SZ"
+ SHORT_DATE = "%Y%m%dT%H%M%SZ"
kwargs = {}
filename_format = ""
if prefix is not None:
kwargs["prefix"] = prefix
filename_format += "{prefix}-"
- filename_format += "{test_name}-{timestamp}"
+ filename_format += "{timestamp}-{probe_cc}-{probe_asn}-{test_name}"
if extension is not None:
kwargs["extension"] = extension
filename_format += ".{extension}"
kwargs['test_name'] = test_details['test_name']
+ kwargs['probe_cc'] = test_details['probe_cc']
+ kwargs['probe_asn'] = test_details['probe_asn']
kwargs['timestamp'] = datetime.strptime(test_details['test_start_time'],
LONG_DATE).strftime(SHORT_DATE)
return filename_format.format(**kwargs)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits