[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [ooni-probe/master] Refactoring of the architecture of the taskManager



commit 813230cc416338740bf72acb04b97042ebd5261a
Author: Arturo Filastò <art@xxxxxxxxx>
Date:   Tue Jan 15 18:01:45 2013 +0100

    Refactoring of the architecture of the taskManager
    
    Minimize coupling, make clearer the chain of responsibilities
---
 ooni/director.py      |   23 ++++++++--
 ooni/managers.py      |   36 +++++---------
 ooni/nettest.py       |  123 +++++++++++++++++++++++++++++++++---------------
 ooni/reporter.py      |   70 +++++++++++++++++++--------
 ooni/tasks.py         |   65 ++-----------------------
 tests/test_nettest.py |    3 +
 6 files changed, 174 insertions(+), 146 deletions(-)

diff --git a/ooni/director.py b/ooni/director.py
index 582bb84..41100cf 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -120,13 +120,25 @@ class Director(object):
 
         self.successfulMeasurements += 1
 
+        return measurement.report.write(measurement)
+
     def measurementFailed(self, failure, measurement):
         self.totalMeasurementRuntime += measurement.runtime
 
         self.failedMeasurements += 1
         self.failures.append((failure, measurement))
 
-    def startTest(self, net_test_file, options):
+    def reportEntryFailed(self, failure):
+        # XXX add failure handling logic
+        return
+
+    def startMeasurements(self, measurements):
+        self.measurementManager.schedule(measurements)
+
+    def netTestDone(self, net_test):
+        self.activeNetTests.remove(net_test)
+
+    def startNetTest(self, net_test_file, options):
         """
         Create the Report for the NetTest and start the report NetTest.
 
@@ -139,12 +151,15 @@ class Director(object):
                 is a dict containing the options to be passed to the chosen net
                 test.
         """
-        report = Report(self.reporters)
-        report.reportEntryManager = self.reportEntryManager
+        report = Report(self.reporters, self.reportEntryManager)
 
         net_test = NetTest(net_test_file, options, report)
-        net_test.measurementManager = self.measurementManager
+        net_test.director = self
+
+        self.activeNetTests.append(net_test)
+        self.activeNetTests.append(net_test)
 
         d = net_test.start()
+        d.addBoth(self.netTestDone)
         return d
 
diff --git a/ooni/managers.py b/ooni/managers.py
index 3983705..fa59058 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -15,13 +15,12 @@ def makeIterable(item):
 class TaskManager(object):
     retries = 2
 
-    failures = []
     concurrency = 10
 
-    completedTasks = 0
-
-    _tasks = iter(())
-    _active_tasks = []
+    def __init__(self):
+        self._tasks = iter(())
+        self._active_tasks = []
+        self.failures = []
 
     def _failed(self, failure, task):
         """
@@ -35,7 +34,8 @@ class TaskManager(object):
             self._tasks = itertools.chain(self._tasks,
                     makeIterable(task))
         else:
-            task.done.callback((failure, task))
+            # The task is done but has failed: a Failure result triggers the errbacks.
+            task.done.callback(failure)
 
         self.failed(failure, task)
 
@@ -58,11 +58,11 @@ class TaskManager(object):
         We have successfully completed a measurement.
         """
         self._active_tasks.remove(task)
-        self.completedTasks += 1
 
         self._fillSlots()
 
-        task.done.callback(result)
+        # Fires the done deferred when the task has completed
+        task.done.callback(task)
         self.succeeded(result, task)
 
     def _run(self, task):
@@ -105,12 +105,6 @@ class TaskManager(object):
 
         self._fillSlots()
 
-    def started(self, task):
-        """
-        This hook will get called every time a task has been started.
-        """
-        pass
-
     def failed(self, failure, task):
         """
         This hoook is called every time a task has failed.
@@ -138,28 +132,24 @@ class MeasurementManager(TaskManager):
     NetTest on the contrary is aware of the typology of measurements that it is
     dispatching as they are logically grouped by test file.
     """
+    # XXX tweak these values
     retries = 2
-
-    failures = []
     concurrency = 10
 
     director = None
 
-    def started(self, measurement):
-        self.director.measurementStarted(measurement)
-
     def succeeded(self, result, measurement):
         self.director.measurementSucceeded(measurement)
 
     def failed(self, failure, measurement):
         self.director.measurementFailed(failure, measurement)
 
-
 class ReportEntryManager(TaskManager):
-    director = None
+    # XXX tweak these values
+    retries = 3
+    concurrency = 20
 
-    def started(self, task):
-        pass
+    director = None
 
     def succeeded(self, result, task):
         pass
diff --git a/ooni/nettest.py b/ooni/nettest.py
index d6cadc3..9146924 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -4,7 +4,7 @@ from twisted.internet import defer, reactor
 from twisted.trial.runner import filenameToModule
 from twisted.python import usage, reflect
 
-from ooni.tasks import Measurement, TaskMediator
+from ooni.tasks import Measurement
 from ooni.utils import log, checkForRoot, NotRootError
 
 from inspect import getmembers
@@ -13,8 +13,53 @@ from StringIO import StringIO
 class NoTestCasesFound(Exception):
     pass
 
+class NetTestState(object):
+    def __init__(self, allTasksDone):
+        """
+        This keeps track of the state of the tasks of a running NetTest.
+
+        Args:
+            allTasksDone is a deferred that will get fired once all the NetTest
+            cases have reached a final done state.
+        """
+        self.doneTasks = 0
+        self.tasks = 0
+
+        self.completedScheduling = False
+        self.allTasksDone = allTasksDone
+
+    def created(self):
+        self.tasks += 1
+
+    def checkAllTasksDone(self):
+        if self.completedScheduling and \
+                self.doneTasks == self.tasks:
+            self.allTasksDone.callback(self.doneTasks)
+
+    def taskDone(self, result):
+        """
+        This is called every time a task has finished running.
+        """
+        self.doneTasks += 1
+        self.checkAllTasksDone()
+
+    def allTasksScheduled(self):
+        """
+        This should be called once all the tasks that need to run have been
+        scheduled.
+
+        XXX this is ghetto.
+        The reason we call checkAllTasksDone from inside this method is that
+        if allTasksScheduled is called after all tasks are already done, we
+        would otherwise run into a race condition. The race is that nothing
+        would end up checking that all the tasks are complete, because no
+        task is left to finish and trigger the check.
+        """
+        self.completedScheduling = True
+        self.checkAllTasksDone()
+
 class NetTest(object):
-    measurementManager = None
+    director = None
     method_prefix = 'test'
 
     def __init__(self, net_test_file, options, report):
@@ -29,17 +74,12 @@ class NetTest(object):
         self.report = report
         self.test_cases = self.loadNetTest(net_test_file)
 
-        self.allMeasurementsDone = defer.Deferred()
-        self.allReportsDone = defer.Deferred()
-
-        # This should fire when all the measurements have been completed and
+        # This will fire when all the measurements have been completed and
         # all the reports are done. Done means that they have either completed
         # successfully or all the possible retries have been reached.
-        self.done = defer.DeferredList([self.allMeasurementsDone,
-            self.allReportsDone])
+        self.done = defer.Deferred()
 
-        # XXX Fire the done when also all the reporting tasks have been completed.
-        # self.done = self.allMeasurementsDone
+        self.state = NetTestState(self.done)
 
     def start(self):
         """
@@ -47,10 +87,42 @@ class NetTest(object):
         Start tests and generate measurements.
         """
         self.setUpNetTestCases()
-        self.measurementManager.schedule(self.generateMeasurements())
-
+        self.director.startMeasurements(self.generateMeasurements())
         return self.done
 
+    def doneReport(self, result):
+        """
+        This will get called every time a report entry is done and therefore a
+        measurement is done.
+
+        The state for the NetTest is informed of the fact that another task has
+        reached the done state.
+        """
+        self.state.taskDone()
+        return result
+
+    def generateMeasurements(self):
+        """
+        This is a generator that yields measurements and registers the
+        callbacks for when a measurement is successful or has failed.
+        """
+        for test_class, test_method in self.test_cases:
+            for test_input in test_class.inputs:
+                measurement = Measurement(test_class, test_method, test_input)
+
+                measurement.done.addCallback(self.director.measurementSucceeded)
+                measurement.done.addErrback(self.director.measurementFailed)
+
+                measurement.done.addCallback(self.report.write)
+                measurement.done.addErrback(self.director.reportEntryFailed)
+
+                measurement.done.addBoth(self.doneReport)
+
+                self.state.taskCreated()
+                yield measurement
+
+        self.state.allTasksScheduled()
+
     def loadNetTest(self, net_test_file):
         """
         Creates all the necessary test_cases (a list of tuples containing the
@@ -124,33 +196,6 @@ class NetTest(object):
             pass
         return test_cases
 
-    def succeeded(self, measurement):
-        """
-        This gets called when a measurement has succeeded.
-        """
-        self.report.write(measurement)
-
-    def generateMeasurements(self):
-        """
-        This is a generator that yields measurements and sets their timeout
-        value and their netTest attribute.
-        """
-        task_mediator = TaskMediator(self.allMeasurementsDone)
-        for test_class, test_method in self.test_cases:
-            for test_input in test_class.inputs:
-                measurement = Measurement(test_class, test_method,
-                        test_input, self, task_mediator)
-                measurement.netTest = self
-                yield measurement
-        task_mediator.allTasksScheduled()
-
-        @task_mediator.allTasksDone.addCallback
-        def done(result):
-            """
-            Once all the MeasurementsTasks have been completed all the report
-            tasks will have been scheduled.
-            """
-            self.report.report_mediator.allTasksScheduled()
 
     def setUpNetTestCases(self):
         """
diff --git a/ooni/reporter.py b/ooni/reporter.py
index d24edcc..80595f9 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -31,7 +31,7 @@ from ooni.utils.net import BodyReceiver, StringProducer, userAgents
 
 from ooni import config
 
-from ooni.tasks import ReportEntry, TaskMediator
+from ooni.tasks import ReportEntry
 
 def createPacketReport(packet_list):
     """
@@ -158,6 +158,8 @@ def getTestDetails(options):
     return test_details
 
 class OReporter(object):
+    created = defer.Deferred()
+
     def __init__(self, cmd_line_options):
         self.cmd_line_options = dict(cmd_line_options)
 
@@ -380,15 +382,19 @@ class ReportClosed(Exception):
     pass
 
 class Report(object):
-    reportEntryManager = None
-
-    def __init__(self, reporters):
+    def __init__(self, reporters, reportEntryManager):
         """
-        This will instantiate all the reporters and add them to the list of
-        available reporters.
+        This is an abstraction layer on top of all the configured reporters.
+
+        It allows lazily writing to the reporters that are to be used.
+
+        Args:
+
+            reporters:
+                a list of :class:ooni.reporter.OReporter
 
-        net_test:
-            is a reference to the net_test to which the report object belongs to.
+            reportEntryManager:
+                an instance of :class:ooni.managers.ReportEntryManager
         """
         self.reporters = []
         for r in reporters:
@@ -398,13 +404,14 @@ class Report(object):
         self.createReports()
 
         self.done = defer.Deferred()
-        self.done.addCallback(self.finish)
+        self.done.addCallback(self.close)
 
-        self.report_mediator = TaskMediator(self.done)
+        self.reportEntryManager = reportEntryManager
 
-    def createReports(self):
+    def open(self):
         """
-        This will create all the reports that need to be created.
+        This will create all the reports that need to be created and fire the
+        created callback of the reporter whose report got created.
         """
         for reporter in self.reporters:
             d = defer.maybeDeferred(reporter.createReport)
@@ -413,20 +420,41 @@ class Report(object):
     def write(self, measurement):
         """
         This is a lazy call that will write to all the reporters by waiting on
-        the created callback to fire.
+        them to be created.
 
-        The report_write_task is created before we attach the callback so that
-        the report mediator is aware of the total number of created reportEntry
-        tasks.
+        Will return a deferred that will fire once the report entry for the
+        specified measurement has been written to all the reporters.
+
+        Args:
+
+            measurement:
+                an instance of :class:ooni.tasks.Measurement
+
+        Returns:
+            a deferred list that will fire once all the report entries have
+            been written.
         """
+        dl = []
         for reporter in self.reporters:
-            report_write_task = ReportEntry(reporter, measurement,
-                    self.report_mediator)
-            @reporter.created.addCallback
-            def cb(result):
+            def writeReportEntry(result):
+                report_write_task = ReportEntry(reporter, measurement)
                 self.reportEntryManager.schedule(report_write_task)
+                return report_write_task.done
 
-    def finish(self, result):
+            d = reporter.created.addBoth(writeReportEntry)
+            dl.append(d)
+
+        return defer.DeferredList(dl)
+
+    def close(self, _):
+        """
+        Close the report by calling the finish method of each reporter.
+
+        Returns:
+            a :class:twisted.internet.defer.DeferredList that will fire when
+            all the reports have been closed.
+
+        """
         dl = []
         for reporter in self.reporters:
             d = defer.maybeDeferred(reporter.finish)
diff --git a/ooni/tasks.py b/ooni/tasks.py
index f2a9bae..28aaca4 100644
--- a/ooni/tasks.py
+++ b/ooni/tasks.py
@@ -5,7 +5,7 @@ from twisted.internet import defer, reactor
 class BaseTask(object):
     _timer = None
 
-    def __init__(self, mediator=None):
+    def __init__(self):
         """
         If you want to schedule a task multiple times, remember to create fresh
         instances of it.
@@ -19,10 +19,8 @@ class BaseTask(object):
         # This is a deferred that gets called when a test has reached it's
         # final status, this means: all retries have been attempted or the test
         # has successfully executed.
+        # This deferred is fired on completion by the TaskManager.
         self.done = defer.Deferred()
-        if mediator:
-            mediator.created()
-            self.done.addCallback(mediator.taskDone)
 
     def _failed(self, failure):
         self.failures += 1
@@ -97,8 +95,7 @@ class TaskWithTimeout(BaseTask):
         pass
 
 class Measurement(TaskWithTimeout):
-    def __init__(self, test_class, test_method, test_input, net_test,
-            mediator):
+    def __init__(self, test_class, test_method, test_input):
         """
         test_class:
             is the class, subclass of NetTestCase, of the test to be run
@@ -121,9 +118,7 @@ class Measurement(TaskWithTimeout):
         self.test_instance.setUp()
         self.test = getattr(self.test_instance, test_method)
 
-        self.netTest = net_test
-
-        TaskWithTimeout.__init__(self, mediator)
+        TaskWithTimeout.__init__(self)
 
     def succeeded(self, result):
         return self.netTest.succeeded(self)
@@ -139,60 +134,12 @@ class Measurement(TaskWithTimeout):
         return d
 
 class ReportEntry(TaskWithTimeout):
-    def __init__(self, reporter, measurement, task_mediator):
+    def __init__(self, reporter, measurement):
         self.reporter = reporter
         self.measurement = measurement
 
-        TaskWithTimeout.__init__(self, task_mediator)
+        TaskWithTimeout.__init__(self)
 
     def run(self):
         return self.reporter.writeReportEntry(self.measurement)
 
-
-class TaskMediator(object):
-    def __init__(self, allTasksDone):
-        """
-        This implements a Mediator/Observer pattern to keep track of when Tasks
-        that are logically linked together have all reached a final done stage.
-
-        Args:
-            allTasksDone is a deferred that will get fired once all the tasks
-            have been completed.
-        """
-        self.doneTasks = 0
-        self.tasks = 0
-
-        self.completedScheduling = False
-        self.allTasksDone = allTasksDone
-
-    def created(self):
-        self.tasks += 1
-
-    def checkAllTasksDone(self):
-        if self.completedScheduling and \
-                self.doneTasks == self.tasks:
-            self.allTasksDone.callback(self.doneTasks)
-
-    def taskDone(self, result):
-        """
-        This is called every time a task has finished running.
-        """
-        self.doneTasks += 1
-        self.checkAllTasksDone()
-
-    def allTasksScheduled(self):
-        """
-        This should be called once all the tasks that need to run have been
-        scheduled.
-
-        XXX this is ghetto.
-        The reason for which we are calling allTasksDone inside of the
-        allTasksScheduled method is called after all tasks are done, then we
-        will run into a race condition. The race is that we don't end up
-        checking that all the tasks are complete because no task is to be
-        scheduled.
-        """
-        self.completedScheduling = True
-        self.checkAllTasksDone()
-
-
diff --git a/tests/test_nettest.py b/tests/test_nettest.py
index 3c7bdf6..a029fae 100644
--- a/tests/test_nettest.py
+++ b/tests/test_nettest.py
@@ -10,6 +10,8 @@ from ooni.nettest import FailureToLoadNetTest
 from ooni.tasks import BaseTask
 from ooni.utils import NotRootError
 
+from ooni.director import Director
+
 from ooni.managers import TaskManager
 
 from tests.mocks import MockMeasurement, MockMeasurementFailOnce
@@ -223,6 +225,7 @@ class TestNetTest(unittest.TestCase):
         net_test = NetTest(StringIO(net_test_string_with_file),
             dummyOptionsWithFile, MockReporter())
         net_test.measurementManager = MockMeasurementManager()
+        net_test.director = Director()
 
         d = net_test.start()
         @d.addCallback



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits