[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [ooni-probe/master] Implement tasks and add new APIs



commit c71375eed5db6dd1e11881c7045c2392c68aa42c
Author: Arturo Filastò <arturo@xxxxxxxxxxx>
Date:   Fri Jul 29 00:20:45 2016 +0200

    Implement tasks and add new APIs
    
    * Add new API endpoints for deleting and keeping reports
    
    * Implement tasks for uploading reports that have not yet been submitted
    
    * Implement tasks for cleaning up old reports
---
 ooni/agent/agent.py           | 11 +++++++
 ooni/agent/scheduler.py       | 69 ++++++++++++++++++++++++++++++++++++-------
 ooni/deck/deck.py             |  2 +-
 ooni/measurements.py          | 67 +++++++++++++++++++++++++++++++++++++++++
 ooni/results.py               | 40 -------------------------
 ooni/scripts/oonireport.py    |  8 +++++
 ooni/ui/web/client/index.html |  2 +-
 ooni/ui/web/server.py         | 40 +++++++++++++++++++++++--
 ooni/utils/__init__.py        |  6 ++--
 9 files changed, 188 insertions(+), 57 deletions(-)

diff --git a/ooni/agent/agent.py b/ooni/agent/agent.py
index a1394f0..f5322ed 100644
--- a/ooni/agent/agent.py
+++ b/ooni/agent/agent.py
@@ -1,6 +1,7 @@
 from twisted.application import service
 from ooni.director import Director
 from ooni.settings import config
+from ooni.utils import log
 
 from ooni.ui.web.web import WebUIService
 from ooni.agent.scheduler import SchedulerService
@@ -19,3 +20,13 @@ class AgentService(service.MultiService):
 
         self.scheduler_service = SchedulerService(director)
         self.scheduler_service.setServiceParent(self)
+
+    def startService(self):
+        service.MultiService.startService(self)
+
+        log.start()
+
+    def stopService(self):
+        service.MultiService.stopService(self)
+
+        log.stop()
diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py
index 1288d6c..167a14a 100644
--- a/ooni/agent/scheduler.py
+++ b/ooni/agent/scheduler.py
@@ -4,12 +4,17 @@ from twisted.application import service
 from twisted.internet import task, defer
 from twisted.python.filepath import FilePath
 
+from ooni.scripts import oonireport
 from ooni import resources
-from ooni.utils import log
+from ooni.utils import log, SHORT_DATE
 from ooni.deck.store import input_store
 from ooni.settings import config
 from ooni.contrib import croniter
 from ooni.geoip import probe_ip
+from ooni.measurements import list_measurements
+
+class DidNotRun(Exception):
+    pass
 
 class ScheduledTask(object):
     _time_format = "%Y-%m-%dT%H:%M:%SZ"
@@ -58,7 +63,7 @@ class ScheduledTask(object):
         yield self._last_run_lock.deferUntilLocked()
         if not self.should_run:
             self._last_run_lock.unlock()
-            defer.returnValue(None)
+            raise DidNotRun
         try:
             yield self.task()
             self._update_last_run()
@@ -68,7 +73,7 @@ class ScheduledTask(object):
             self._last_run_lock.unlock()
 
 class UpdateInputsAndResources(ScheduledTask):
-    identifier = "ooni-update-inputs"
+    identifier = "update-inputs"
     schedule = "@daily"
 
     @defer.inlineCallbacks
@@ -78,13 +83,50 @@ class UpdateInputsAndResources(ScheduledTask):
         yield resources.check_for_update(probe_ip.geodata['countrycode'])
         yield input_store.update(probe_ip.geodata['countrycode'])
 
-class CleanupInProgressReports(ScheduledTask):
-    identifier = 'ooni-cleanup-reports'
+
+class UploadReports(ScheduledTask):
+    """
+    This task is used to submit to the collector reports that have not been
+    submitted and those that have been partially uploaded.
+    """
+    identifier = 'upload-reports'
+    schedule = '@hourly'
+
+    @defer.inlineCallbacks
+    def task(self):
+        yield oonireport.upload_all(upload_incomplete=True)
+
+
+class DeleteOldReports(ScheduledTask):
+    """
+    This task is used to delete reports that are older than a week.
+    """
+    identifier = 'delete-old-reports'
     schedule = '@daily'
 
-class UploadMissingReports(ScheduledTask):
-    identifier = 'ooni-cleanup-reports'
-    schedule = '@weekly'
+    def task(self):
+        measurement_path = FilePath(config.measurements_directory)
+        for measurement in list_measurements():
+            if measurement['keep'] is True:
+                continue
+            delta = datetime.utcnow() - \
+                    datetime.strptime(measurement['test_start_time'],
+                                      SHORT_DATE)
+            if delta.days >= 7:
+                log.debug("Deleting old report {0}".format(measurement["id"]))
+                measurement_path.child(measurement['id']).remove()
+
+class SendHeartBeat(ScheduledTask):
+    """
+    This task is used to send a heartbeat that the probe is still alive and
+    well.
+    """
+    identifier = 'send-heartbeat'
+    schedule = '@hourly'
+
+    def task(self):
+        # XXX implement this
+        pass
 
 # Order mattters
 SYSTEM_TASKS = [
@@ -122,8 +164,12 @@ class SchedulerService(service.MultiService):
     def schedule(self, task):
         self._scheduled_tasks.append(task)
 
+    def _task_did_not_run(self, failure, task):
+        failure.trap(DidNotRun)
+        log.debug("Did not run {0}".format(task.identifier))
+
     def _task_failed(self, failure, task):
-        log.debug("Failed to run {0}".format(task.identifier))
+        log.err("Failed to run {0}".format(task.identifier))
         log.exception(failure)
 
     def _task_success(self, result, task):
@@ -137,13 +183,16 @@ class SchedulerService(service.MultiService):
         for task in self._scheduled_tasks:
             log.debug("Running task {0}".format(task.identifier))
             d = task.run()
-            d.addErrback(self._task_failed, task)
+            d.addErrback(self._task_did_not_run, task)
             d.addCallback(self._task_success, task)
+            d.addErrback(self._task_failed, task)
 
     def startService(self):
         service.MultiService.startService(self)
 
         self.schedule(UpdateInputsAndResources())
+        self.schedule(UploadReports())
+        self.schedule(DeleteOldReports())
 
         self._looping_call.start(self.interval)
 
diff --git a/ooni/deck/deck.py b/ooni/deck/deck.py
index c9b3fc5..868bfb8 100644
--- a/ooni/deck/deck.py
+++ b/ooni/deck/deck.py
@@ -13,7 +13,7 @@ from ooni.deck.legacy import convert_legacy_deck
 from ooni.deck.store import input_store
 from ooni.geoip import probe_ip
 from ooni.nettest import NetTestLoader, nettest_to_path
-from ooni.results import generate_summary
+from ooni.measurements import generate_summary
 from ooni.settings import config
 from ooni.utils import log, generate_filename
 
diff --git a/ooni/measurements.py b/ooni/measurements.py
new file mode 100644
index 0000000..f830a99
--- /dev/null
+++ b/ooni/measurements.py
@@ -0,0 +1,67 @@
+import json
+from twisted.python.filepath import FilePath
+from ooni.settings import config
+
+class Process():
+    supported_tests = [
+        "web_connectivity"
+    ]
+    @staticmethod
+    def web_connectivity(entry):
+        result = {}
+        result['anomaly'] = False
+        if entry['test_keys']['blocking'] is not False:
+            result['anomaly'] = True
+        result['url'] = entry['input']
+        return result
+
+def generate_summary(input_file, output_file):
+    results = {}
+    with open(input_file) as in_file:
+        for idx, line in enumerate(in_file):
+            entry = json.loads(line.strip())
+            result = {}
+            if entry['test_name'] in Process.supported_tests:
+                result = getattr(Process, entry['test_name'])(entry)
+            result['idx'] = idx
+            results['test_name'] = entry['test_name']
+            results['test_start_time'] = entry['test_start_time']
+            results['country_code'] = entry['probe_cc']
+            results['asn'] = entry['probe_asn']
+            results['results'] = results.get('results', [])
+            results['results'].append(result)
+
+    with open(output_file, "w") as fw:
+        json.dump(results, fw)
+
+
+def list_measurements():
+    measurements = []
+    measurement_path = FilePath(config.measurements_directory)
+    for measurement_id in measurement_path.listdir():
+        measurement = measurement_path.child(measurement_id)
+        completed = True
+        keep = False
+        if measurement.child("measurement.njson.progress").exists():
+            completed = False
+        if measurement.child("keep").exists():
+            keep = True
+        test_start_time, country_code, asn, test_name = \
+            measurement_id.split("-")[:4]
+        measurements.append({
+            "test_name": test_name,
+            "country_code": country_code,
+            "asn": asn,
+            "test_start_time": test_start_time,
+            "id": measurement_id,
+            "completed": completed,
+            "keep": keep
+        })
+    return measurements
+
+if __name__ == "__main__":
+    import sys
+    if len(sys.argv) != 3:
+        print("Usage: {0} [input_file] [output_file]".format(sys.argv[0]))
+        sys.exit(1)
+    generate_summary(sys.argv[1], sys.argv[2])
diff --git a/ooni/results.py b/ooni/results.py
deleted file mode 100644
index 39477a1..0000000
--- a/ooni/results.py
+++ /dev/null
@@ -1,40 +0,0 @@
-import json
-
-class Process():
-    supported_tests = [
-        "web_connectivity"
-    ]
-    @staticmethod
-    def web_connectivity(entry):
-        result = {}
-        result['anomaly'] = False
-        if entry['test_keys']['blocking'] is not False:
-            result['anomaly'] = True
-        result['url'] = entry['input']
-        return result
-
-def generate_summary(input_file, output_file):
-    results = {}
-    with open(input_file) as in_file:
-        for idx, line in enumerate(in_file):
-            entry = json.loads(line.strip())
-            result = {}
-            if entry['test_name'] in Process.supported_tests:
-                result = getattr(Process, entry['test_name'])(entry)
-            result['idx'] = idx
-            results['test_name'] = entry['test_name']
-            results['test_start_time'] = entry['test_start_time']
-            results['country_code'] = entry['probe_cc']
-            results['asn'] = entry['probe_asn']
-            results['results'] = results.get('results', [])
-            results['results'].append(result)
-
-    with open(output_file, "w") as fw:
-        json.dump(results, fw)
-
-if __name__ == "__main__":
-    import sys
-    if len(sys.argv) != 3:
-        print("Usage: {0} [input_file] [output_file]".format(sys.argv[0]))
-        sys.exit(1)
-    generate_summary(sys.argv[1], sys.argv[2])
diff --git a/ooni/scripts/oonireport.py b/ooni/scripts/oonireport.py
index f59a843..13d8473 100644
--- a/ooni/scripts/oonireport.py
+++ b/ooni/scripts/oonireport.py
@@ -122,6 +122,14 @@ def upload_all(collector=None, bouncer=None, upload_incomplete=False):
         except Exception as exc:
             log.exception(exc)
 
+    if upload_incomplete:
+        reports_to_upload = yield oonib_report_log.get_incomplete()
+        for report_file, value in reports_to_upload:
+            try:
+                yield upload(report_file, collector, bouncer,
+                             value['measurement_id'])
+            except Exception as exc:
+                log.exception(exc)
 
 def print_report(report_file, value):
     print("* %s" % report_file)
diff --git a/ooni/ui/web/client/index.html b/ooni/ui/web/client/index.html
index c6d83b7..7ba33fb 100644
--- a/ooni/ui/web/client/index.html
+++ b/ooni/ui/web/client/index.html
@@ -13,5 +13,5 @@
     <app>
       Loading...
     </app>
-  <script type="text/javascript" src="app.bundle.js?16bac0b4c21c5b120b04"></script></body>
+  <script type="text/javascript" src="app.bundle.js?afba2f26969b4c8f00ec"></script></body>
 </html>
diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py
index a6fd315..9d98e62 100644
--- a/ooni/ui/web/server.py
+++ b/ooni/ui/web/server.py
@@ -20,7 +20,7 @@ from ooni.deck import NGDeck
 from ooni.settings import config
 from ooni.utils import log
 from ooni.director import DirectorEvent
-from ooni.results import generate_summary
+from ooni.measurements import generate_summary
 from ooni.geoip import probe_ip
 
 config.advanced.debug = True
@@ -64,7 +64,8 @@ def xsrf_protect(check=True):
             if (token_cookie != instance._xsrf_token and
                     instance._enable_xsrf_protection):
                 request.addCookie(u'XSRF-TOKEN',
-                                  instance._xsrf_token)
+                                  instance._xsrf_token,
+                                  path=u'/')
             if should_check and token_cookie != token_header:
                 raise WebUIError(404, "Invalid XSRF token")
             return f(instance, request, *a, **kw)
@@ -353,6 +354,41 @@ class WebUIAPI(object):
 
         return self.render_json(r, request)
 
+    @app.route('/api/measurement/<string:measurement_id>', methods=["DELETE"])
+    @xsrf_protect(check=True)
+    def api_measurement_delete(self, request, measurement_id):
+        try:
+            measurement_dir = self.measurement_path.child(measurement_id)
+        except InsecurePath:
+            raise WebUIError(500, "invalid measurement id")
+
+        if measurement_dir.child("measurements.njson.progress").exists():
+            raise WebUIError(400, "measurement in progress")
+
+        try:
+            measurement_dir.remove()
+        except:
+            raise WebUIError(400, "Failed to delete report")
+
+        return self.render_json({"result": "ok"}, request)
+
+    @app.route('/api/measurement/<string:measurement_id>/keep', methods=["POST"])
+    @xsrf_protect(check=True)
+    def api_measurement_keep(self, request, measurement_id):
+        try:
+            measurement_dir = self.measurement_path.child(measurement_id)
+        except InsecurePath:
+            raise WebUIError(500, "invalid measurement id")
+
+        if measurement_dir.child("measurements.njson.progress").exists():
+            raise WebUIError(400, "measurement in progress")
+
+        summary = measurement_dir.child("keep")
+        with summary.open("w+") as f:
+            pass
+
+        return self.render_json({"result": "ok"}, request)
+
     @app.route('/api/measurement/<string:measurement_id>/<int:idx>',
                methods=["GET"])
     @xsrf_protect(check=False)
diff --git a/ooni/utils/__init__.py b/ooni/utils/__init__.py
index f8ee953..35d419d 100644
--- a/ooni/utils/__init__.py
+++ b/ooni/utils/__init__.py
@@ -92,6 +92,9 @@ def randomStr(length, num=True):
         chars += string.digits
     return ''.join(random.choice(chars) for x in range(length))
 
+LONG_DATE = "%Y-%m-%d %H:%M:%S"
+SHORT_DATE = "%Y%m%dT%H%M%SZ"
+
 def generate_filename(test_details, prefix=None, extension=None):
     """
     Returns a filename for every test execution.
@@ -99,9 +102,6 @@ def generate_filename(test_details, prefix=None, extension=None):
     It's used to assure that all files of a certain test have a common basename but different
     extension.
     """
-    LONG_DATE = "%Y-%m-%d %H:%M:%S"
-    SHORT_DATE = "%Y%m%dT%H%M%SZ"
-
     kwargs = {}
     filename_format = ""
     if prefix is not None:



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits