[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Implement tasks and add new APIs
commit c71375eed5db6dd1e11881c7045c2392c68aa42c
Author: Arturo Filastò <arturo@xxxxxxxxxxx>
Date: Fri Jul 29 00:20:45 2016 +0200
Implement tasks and add new APIs
* Add new API endpoints for deleting reports and for marking them to be kept
* Implement a task for uploading reports that have not been submitted or were only partially uploaded
* Implement a task for deleting reports older than a week (unless marked to be kept)
---
ooni/agent/agent.py | 11 +++++++
ooni/agent/scheduler.py | 69 ++++++++++++++++++++++++++++++++++++-------
ooni/deck/deck.py | 2 +-
ooni/measurements.py | 67 +++++++++++++++++++++++++++++++++++++++++
ooni/results.py | 40 -------------------------
ooni/scripts/oonireport.py | 8 +++++
ooni/ui/web/client/index.html | 2 +-
ooni/ui/web/server.py | 40 +++++++++++++++++++++++--
ooni/utils/__init__.py | 6 ++--
9 files changed, 188 insertions(+), 57 deletions(-)
diff --git a/ooni/agent/agent.py b/ooni/agent/agent.py
index a1394f0..f5322ed 100644
--- a/ooni/agent/agent.py
+++ b/ooni/agent/agent.py
@@ -1,6 +1,7 @@
from twisted.application import service
from ooni.director import Director
from ooni.settings import config
+from ooni.utils import log
from ooni.ui.web.web import WebUIService
from ooni.agent.scheduler import SchedulerService
@@ -19,3 +20,13 @@ class AgentService(service.MultiService):
self.scheduler_service = SchedulerService(director)
self.scheduler_service.setServiceParent(self)
+
+ def startService(self):
+ service.MultiService.startService(self)
+
+ log.start()
+
+ def stopService(self):
+ service.MultiService.stopService(self)
+
+ log.stop()
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
index 1288d6c..167a14a 100644
--- a/ooni/agent/scheduler.py
+++ b/ooni/agent/scheduler.py
@@ -4,12 +4,17 @@ from twisted.application import service
from twisted.internet import task, defer
from twisted.python.filepath import FilePath
+from ooni.scripts import oonireport
from ooni import resources
-from ooni.utils import log
+from ooni.utils import log, SHORT_DATE
from ooni.deck.store import input_store
from ooni.settings import config
from ooni.contrib import croniter
from ooni.geoip import probe_ip
+from ooni.measurements import list_measurements
+
+class DidNotRun(Exception):
+ pass
class ScheduledTask(object):
_time_format = "%Y-%m-%dT%H:%M:%SZ"
@@ -58,7 +63,7 @@ class ScheduledTask(object):
yield self._last_run_lock.deferUntilLocked()
if not self.should_run:
self._last_run_lock.unlock()
- defer.returnValue(None)
+ raise DidNotRun
try:
yield self.task()
self._update_last_run()
@@ -68,7 +73,7 @@ class ScheduledTask(object):
self._last_run_lock.unlock()
class UpdateInputsAndResources(ScheduledTask):
- identifier = "ooni-update-inputs"
+ identifier = "update-inputs"
schedule = "@daily"
@defer.inlineCallbacks
@@ -78,13 +83,50 @@ class UpdateInputsAndResources(ScheduledTask):
yield resources.check_for_update(probe_ip.geodata['countrycode'])
yield input_store.update(probe_ip.geodata['countrycode'])
-class CleanupInProgressReports(ScheduledTask):
- identifier = 'ooni-cleanup-reports'
+
+class UploadReports(ScheduledTask):
+ """
+ This task is used to submit to the collector reports that have not been
+ submitted and those that have been partially uploaded.
+ """
+ identifier = 'upload-reports'
+ schedule = '@hourly'
+
+ @defer.inlineCallbacks
+ def task(self):
+ yield oonireport.upload_all(upload_incomplete=True)
+
+
+class DeleteOldReports(ScheduledTask):
+ """
+ This task is used to delete reports that are older than a week.
+ """
+ identifier = 'delete-old-reports'
schedule = '@daily'
-class UploadMissingReports(ScheduledTask):
- identifier = 'ooni-cleanup-reports'
- schedule = '@weekly'
+ def task(self):
+ measurement_path = FilePath(config.measurements_directory)
+ for measurement in list_measurements():
+ if measurement['keep'] is True:
+ continue
+ delta = datetime.utcnow() - \
+ datetime.strptime(measurement['test_start_time'],
+ SHORT_DATE)
+ if delta.days >= 7:
+ log.debug("Deleting old report {0}".format(measurement["id"]))
+ measurement_path.child(measurement['id']).remove()
+
+class SendHeartBeat(ScheduledTask):
+ """
+ This task is used to send a heartbeat that the probe is still alive and
+ well.
+ """
+ identifier = 'send-heartbeat'
+ schedule = '@hourly'
+
+ def task(self):
+ # XXX implement this
+ pass
# Order mattters
SYSTEM_TASKS = [
@@ -122,8 +164,12 @@ class SchedulerService(service.MultiService):
def schedule(self, task):
self._scheduled_tasks.append(task)
+ def _task_did_not_run(self, failure, task):
+ failure.trap(DidNotRun)
+ log.debug("Did not run {0}".format(task.identifier))
+
def _task_failed(self, failure, task):
- log.debug("Failed to run {0}".format(task.identifier))
+ log.err("Failed to run {0}".format(task.identifier))
log.exception(failure)
def _task_success(self, result, task):
@@ -137,13 +183,16 @@ class SchedulerService(service.MultiService):
for task in self._scheduled_tasks:
log.debug("Running task {0}".format(task.identifier))
d = task.run()
- d.addErrback(self._task_failed, task)
+ d.addErrback(self._task_did_not_run, task)
d.addCallback(self._task_success, task)
+ d.addErrback(self._task_failed, task)
def startService(self):
service.MultiService.startService(self)
self.schedule(UpdateInputsAndResources())
+ self.schedule(UploadReports())
+ self.schedule(DeleteOldReports())
self._looping_call.start(self.interval)
diff --git a/ooni/deck/deck.py b/ooni/deck/deck.py
index c9b3fc5..868bfb8 100644
--- a/ooni/deck/deck.py
+++ b/ooni/deck/deck.py
@@ -13,7 +13,7 @@ from ooni.deck.legacy import convert_legacy_deck
from ooni.deck.store import input_store
from ooni.geoip import probe_ip
from ooni.nettest import NetTestLoader, nettest_to_path
-from ooni.results import generate_summary
+from ooni.measurements import generate_summary
from ooni.settings import config
from ooni.utils import log, generate_filename
diff --git a/ooni/measurements.py b/ooni/measurements.py
new file mode 100644
index 0000000..f830a99
--- /dev/null
+++ b/ooni/measurements.py
@@ -0,0 +1,67 @@
+import json
+from twisted.python.filepath import FilePath
+from ooni.settings import config
+
+class Process():
+ supported_tests = [
+ "web_connectivity"
+ ]
+ @staticmethod
+ def web_connectivity(entry):
+ result = {}
+ result['anomaly'] = False
+ if entry['test_keys']['blocking'] is not False:
+ result['anomaly'] = True
+ result['url'] = entry['input']
+ return result
+
+def generate_summary(input_file, output_file):
+ results = {}
+ with open(input_file) as in_file:
+ for idx, line in enumerate(in_file):
+ entry = json.loads(line.strip())
+ result = {}
+ if entry['test_name'] in Process.supported_tests:
+ result = getattr(Process, entry['test_name'])(entry)
+ result['idx'] = idx
+ results['test_name'] = entry['test_name']
+ results['test_start_time'] = entry['test_start_time']
+ results['country_code'] = entry['probe_cc']
+ results['asn'] = entry['probe_asn']
+ results['results'] = results.get('results', [])
+ results['results'].append(result)
+
+ with open(output_file, "w") as fw:
+ json.dump(results, fw)
+
+
+def list_measurements():
+ measurements = []
+ measurement_path = FilePath(config.measurements_directory)
+ for measurement_id in measurement_path.listdir():
+ measurement = measurement_path.child(measurement_id)
+ completed = True
+ keep = False
+ if measurement.child("measurement.njson.progress").exists():
+ completed = False
+ if measurement.child("keep").exists():
+ keep = True
+ test_start_time, country_code, asn, test_name = \
+ measurement_id.split("-")[:4]
+ measurements.append({
+ "test_name": test_name,
+ "country_code": country_code,
+ "asn": asn,
+ "test_start_time": test_start_time,
+ "id": measurement_id,
+ "completed": completed,
+ "keep": keep
+ })
+ return measurements
+
+if __name__ == "__main__":
+ import sys
+ if len(sys.argv) != 3:
+ print("Usage: {0} [input_file] [output_file]".format(sys.argv[0]))
+ sys.exit(1)
+ generate_summary(sys.argv[1], sys.argv[2])
diff --git a/ooni/results.py b/ooni/results.py
deleted file mode 100644
index 39477a1..0000000
--- a/ooni/results.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import json
-
-class Process():
- supported_tests = [
- "web_connectivity"
- ]
- @staticmethod
- def web_connectivity(entry):
- result = {}
- result['anomaly'] = False
- if entry['test_keys']['blocking'] is not False:
- result['anomaly'] = True
- result['url'] = entry['input']
- return result
-
-def generate_summary(input_file, output_file):
- results = {}
- with open(input_file) as in_file:
- for idx, line in enumerate(in_file):
- entry = json.loads(line.strip())
- result = {}
- if entry['test_name'] in Process.supported_tests:
- result = getattr(Process, entry['test_name'])(entry)
- result['idx'] = idx
- results['test_name'] = entry['test_name']
- results['test_start_time'] = entry['test_start_time']
- results['country_code'] = entry['probe_cc']
- results['asn'] = entry['probe_asn']
- results['results'] = results.get('results', [])
- results['results'].append(result)
-
- with open(output_file, "w") as fw:
- json.dump(results, fw)
-
-if __name__ == "__main__":
- import sys
- if len(sys.argv) != 3:
- print("Usage: {0} [input_file] [output_file]".format(sys.argv[0]))
- sys.exit(1)
- generate_summary(sys.argv[1], sys.argv[2])
diff --git a/ooni/scripts/oonireport.py b/ooni/scripts/oonireport.py
index f59a843..13d8473 100644
--- a/ooni/scripts/oonireport.py
+++ b/ooni/scripts/oonireport.py
@@ -122,6 +122,14 @@ def upload_all(collector=None, bouncer=None, upload_incomplete=False):
except Exception as exc:
log.exception(exc)
+ if upload_incomplete:
+ reports_to_upload = yield oonib_report_log.get_incomplete()
+ for report_file, value in reports_to_upload:
+ try:
+ yield upload(report_file, collector, bouncer,
+ value['measurement_id'])
+ except Exception as exc:
+ log.exception(exc)
def print_report(report_file, value):
print("* %s" % report_file)
diff --git a/ooni/ui/web/client/index.html b/ooni/ui/web/client/index.html
index c6d83b7..7ba33fb 100644
--- a/ooni/ui/web/client/index.html
+++ b/ooni/ui/web/client/index.html
@@ -13,5 +13,5 @@
<app>
Loading...
</app>
- <script type="text/javascript" src="app.bundle.js?16bac0b4c21c5b120b04"></script></body>
+ <script type="text/javascript" src="app.bundle.js?afba2f26969b4c8f00ec"></script></body>
</html>
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index a6fd315..9d98e62 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -20,7 +20,7 @@ from ooni.deck import NGDeck
from ooni.settings import config
from ooni.utils import log
from ooni.director import DirectorEvent
-from ooni.results import generate_summary
+from ooni.measurements import generate_summary
from ooni.geoip import probe_ip
config.advanced.debug = True
@@ -64,7 +64,8 @@ def xsrf_protect(check=True):
if (token_cookie != instance._xsrf_token and
instance._enable_xsrf_protection):
request.addCookie(u'XSRF-TOKEN',
- instance._xsrf_token)
+ instance._xsrf_token,
+ path=u'/')
if should_check and token_cookie != token_header:
raise WebUIError(404, "Invalid XSRF token")
return f(instance, request, *a, **kw)
@@ -353,6 +354,41 @@ class WebUIAPI(object):
return self.render_json(r, request)
+ @app.route('/api/measurement/<string:measurement_id>', methods=["DELETE"])
+ @xsrf_protect(check=True)
+ def api_measurement_delete(self, request, measurement_id):
+ try:
+ measurement_dir = self.measurement_path.child(measurement_id)
+ except InsecurePath:
+ raise WebUIError(500, "invalid measurement id")
+
+ if measurement_dir.child("measurements.njson.progress").exists():
+ raise WebUIError(400, "measurement in progress")
+
+ try:
+ measurement_dir.remove()
+ except:
+ raise WebUIError(400, "Failed to delete report")
+
+ return self.render_json({"result": "ok"}, request)
+
+ @app.route('/api/measurement/<string:measurement_id>/keep', methods=["POST"])
+ @xsrf_protect(check=True)
+ def api_measurement_keep(self, request, measurement_id):
+ try:
+ measurement_dir = self.measurement_path.child(measurement_id)
+ except InsecurePath:
+ raise WebUIError(500, "invalid measurement id")
+
+ if measurement_dir.child("measurements.njson.progress").exists():
+ raise WebUIError(400, "measurement in progress")
+
+ summary = measurement_dir.child("keep")
+ with summary.open("w+") as f:
+ pass
+
+ return self.render_json({"result": "ok"}, request)
+
@app.route('/api/measurement/<string:measurement_id>/<int:idx>',
methods=["GET"])
@xsrf_protect(check=False)
diff --git a/ooni/utils/__init__.py b/ooni/utils/__init__.py
index f8ee953..35d419d 100644
--- a/ooni/utils/__init__.py
+++ b/ooni/utils/__init__.py
@@ -92,6 +92,9 @@ def randomStr(length, num=True):
chars += string.digits
return ''.join(random.choice(chars) for x in range(length))
+LONG_DATE = "%Y-%m-%d %H:%M:%S"
+SHORT_DATE = "%Y%m%dT%H%M%SZ"
+
def generate_filename(test_details, prefix=None, extension=None):
"""
Returns a filename for every test execution.
@@ -99,9 +102,6 @@ def generate_filename(test_details, prefix=None, extension=None):
It's used to assure that all files of a certain test have a common basename but different
extension.
"""
- LONG_DATE = "%Y-%m-%d %H:%M:%S"
- SHORT_DATE = "%Y%m%dT%H%M%SZ"
-
kwargs = {}
filename_format = ""
if prefix is not None:
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits