[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Refactoring of the architecture of the taskManager
commit 813230cc416338740bf72acb04b97042ebd5261a
Author: Arturo Filastò <art@xxxxxxxxx>
Date: Tue Jan 15 18:01:45 2013 +0100
Refactoring of the architecture of the taskManager
Minimize coupling and make the chain of responsibilities clearer.
---
ooni/director.py | 23 ++++++++--
ooni/managers.py | 36 +++++---------
ooni/nettest.py | 123 +++++++++++++++++++++++++++++++++---------------
ooni/reporter.py | 70 +++++++++++++++++++--------
ooni/tasks.py | 65 ++-----------------------
tests/test_nettest.py | 3 +
6 files changed, 174 insertions(+), 146 deletions(-)
diff --git a/ooni/director.py b/ooni/director.py
index 582bb84..41100cf 100644
--- a/ooni/director.py
+++ b/ooni/director.py
@@ -120,13 +120,25 @@ class Director(object):
self.successfulMeasurements += 1
+ return measurement.report.write(measurement)
+
def measurementFailed(self, failure, measurement):
self.totalMeasurementRuntime += measurement.runtime
self.failedMeasurements += 1
self.failures.append((failure, measurement))
- def startTest(self, net_test_file, options):
+ def reportEntryFailed(self, failure):
+ # XXX add failure handling logic
+ return
+
+ def startMeasurements(self, measurements):
+ self.measurementManager.schedule(measurements)
+
+ def netTestDone(self, net_test):
+ self.activeNetTests.remove(net_test)
+
+ def startNetTest(self, net_test_file, options):
"""
Create the Report for the NetTest and start the report NetTest.
@@ -139,12 +151,15 @@ class Director(object):
is a dict containing the options to be passed to the chosen net
test.
"""
- report = Report(self.reporters)
- report.reportEntryManager = self.reportEntryManager
+ report = Report(self.reporters, self.reportEntryManager)
net_test = NetTest(net_test_file, options, report)
- net_test.measurementManager = self.measurementManager
+ net_test.director = self
+
+ self.activeNetTests.append(net_test)
+ self.activeNetTests.append(net_test)
d = net_test.start()
+ d.addBoth(self.netTestDone)
return d
diff --git a/ooni/managers.py b/ooni/managers.py
index 3983705..fa59058 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -15,13 +15,12 @@ def makeIterable(item):
class TaskManager(object):
retries = 2
- failures = []
concurrency = 10
- completedTasks = 0
-
- _tasks = iter(())
- _active_tasks = []
+ def __init__(self):
+ self._tasks = iter(())
+ self._active_tasks = []
+ self.failures = []
def _failed(self, failure, task):
"""
@@ -35,7 +34,8 @@ class TaskManager(object):
self._tasks = itertools.chain(self._tasks,
makeIterable(task))
else:
- task.done.callback((failure, task))
+ # This fires the errback when the task is done but has failed.
+ task.done.callback(failure)
self.failed(failure, task)
@@ -58,11 +58,11 @@ class TaskManager(object):
We have successfully completed a measurement.
"""
self._active_tasks.remove(task)
- self.completedTasks += 1
self._fillSlots()
- task.done.callback(result)
+ # Fires the done deferred when the task has completed
+ task.done.callback(task)
self.succeeded(result, task)
def _run(self, task):
@@ -105,12 +105,6 @@ class TaskManager(object):
self._fillSlots()
- def started(self, task):
- """
- This hook will get called every time a task has been started.
- """
- pass
-
def failed(self, failure, task):
"""
This hoook is called every time a task has failed.
@@ -138,28 +132,24 @@ class MeasurementManager(TaskManager):
NetTest on the contrary is aware of the typology of measurements that it is
dispatching as they are logically grouped by test file.
"""
+ # XXX tweak these values
retries = 2
-
- failures = []
concurrency = 10
director = None
- def started(self, measurement):
- self.director.measurementStarted(measurement)
-
def succeeded(self, result, measurement):
self.director.measurementSucceeded(measurement)
def failed(self, failure, measurement):
self.director.measurementFailed(failure, measurement)
-
class ReportEntryManager(TaskManager):
- director = None
+ # XXX tweak these values
+ retries = 3
+ concurrency = 20
- def started(self, task):
- pass
+ director = None
def succeeded(self, result, task):
pass
diff --git a/ooni/nettest.py b/ooni/nettest.py
index d6cadc3..9146924 100644
--- a/ooni/nettest.py
+++ b/ooni/nettest.py
@@ -4,7 +4,7 @@ from twisted.internet import defer, reactor
from twisted.trial.runner import filenameToModule
from twisted.python import usage, reflect
-from ooni.tasks import Measurement, TaskMediator
+from ooni.tasks import Measurement
from ooni.utils import log, checkForRoot, NotRootError
from inspect import getmembers
@@ -13,8 +13,53 @@ from StringIO import StringIO
class NoTestCasesFound(Exception):
pass
+class NetTestState(object):
+ def __init__(self, allTasksDone):
+ """
+ This keeps track of the state of a running NetTests case.
+
+ Args:
+ allTasksDone is a deferred that will get fired once all the NetTest
+ cases have reached a final done state.
+ """
+ self.doneTasks = 0
+ self.tasks = 0
+
+ self.completedScheduling = False
+ self.allTasksDone = allTasksDone
+
+ def created(self):
+ self.tasks += 1
+
+ def checkAllTasksDone(self):
+ if self.completedScheduling and \
+ self.doneTasks == self.tasks:
+ self.allTasksDone.callback(self.doneTasks)
+
+ def taskDone(self, result):
+ """
+ This is called every time a task has finished running.
+ """
+ self.doneTasks += 1
+ self.checkAllTasksDone()
+
+ def allTasksScheduled(self):
+ """
+ This should be called once all the tasks that need to run have been
+ scheduled.
+
+ XXX this is ghetto.
+ The reason for which we are calling allTasksDone inside of the
+ allTasksScheduled method is called after all tasks are done, then we
+ will run into a race condition. The race is that we don't end up
+ checking that all the tasks are complete because no task is to be
+ scheduled.
+ """
+ self.completedScheduling = True
+ self.checkAllTasksDone()
+
class NetTest(object):
- measurementManager = None
+ director = None
method_prefix = 'test'
def __init__(self, net_test_file, options, report):
@@ -29,17 +74,12 @@ class NetTest(object):
self.report = report
self.test_cases = self.loadNetTest(net_test_file)
- self.allMeasurementsDone = defer.Deferred()
- self.allReportsDone = defer.Deferred()
-
- # This should fire when all the measurements have been completed and
+ # This will fire when all the measurements have been completed and
# all the reports are done. Done means that they have either completed
# successfully or all the possible retries have been reached.
- self.done = defer.DeferredList([self.allMeasurementsDone,
- self.allReportsDone])
+ self.done = defer.Deferred()
- # XXX Fire the done when also all the reporting tasks have been completed.
- # self.done = self.allMeasurementsDone
+ self.state = NetTestState(self.done)
def start(self):
"""
@@ -47,10 +87,42 @@ class NetTest(object):
Start tests and generate measurements.
"""
self.setUpNetTestCases()
- self.measurementManager.schedule(self.generateMeasurements())
-
+ self.director.startMeasurements(self.generateMeasurements())
return self.done
+ def doneReport(self, result):
+ """
+ This will get called every time a measurement is done and therefore a
+ measurement is done.
+
+ The state for the NetTest is informed of the fact that another task has
+ reached the done state.
+ """
+ self.state.taskDone()
+ return result
+
+ def generateMeasurements(self):
+ """
+ This is a generator that yields measurements and registers the
+ callbacks for when a measurement is successful or has failed.
+ """
+ for test_class, test_method in self.test_cases:
+ for test_input in test_class.inputs:
+ measurement = Measurement(test_class, test_method, test_input)
+
+ measurement.done.addCallback(self.director.measurementSucceeded)
+ measurement.done.addErrback(self.director.measurementFailed)
+
+ measurement.done.addCallback(self.report.write)
+ measurement.done.addErrback(self.director.reportEntryFailed)
+
+ measurement.done.addBoth(self.doneReport)
+
+ self.state.taskCreated()
+ yield measurement
+
+ self.state.allTasksScheduled()
+
def loadNetTest(self, net_test_file):
"""
Creates all the necessary test_cases (a list of tuples containing the
@@ -124,33 +196,6 @@ class NetTest(object):
pass
return test_cases
- def succeeded(self, measurement):
- """
- This gets called when a measurement has succeeded.
- """
- self.report.write(measurement)
-
- def generateMeasurements(self):
- """
- This is a generator that yields measurements and sets their timeout
- value and their netTest attribute.
- """
- task_mediator = TaskMediator(self.allMeasurementsDone)
- for test_class, test_method in self.test_cases:
- for test_input in test_class.inputs:
- measurement = Measurement(test_class, test_method,
- test_input, self, task_mediator)
- measurement.netTest = self
- yield measurement
- task_mediator.allTasksScheduled()
-
- @task_mediator.allTasksDone.addCallback
- def done(result):
- """
- Once all the MeasurementsTasks have been completed all the report
- tasks will have been scheduled.
- """
- self.report.report_mediator.allTasksScheduled()
def setUpNetTestCases(self):
"""
diff --git a/ooni/reporter.py b/ooni/reporter.py
index d24edcc..80595f9 100644
--- a/ooni/reporter.py
+++ b/ooni/reporter.py
@@ -31,7 +31,7 @@ from ooni.utils.net import BodyReceiver, StringProducer, userAgents
from ooni import config
-from ooni.tasks import ReportEntry, TaskMediator
+from ooni.tasks import ReportEntry
def createPacketReport(packet_list):
"""
@@ -158,6 +158,8 @@ def getTestDetails(options):
return test_details
class OReporter(object):
+ created = defer.Deferred()
+
def __init__(self, cmd_line_options):
self.cmd_line_options = dict(cmd_line_options)
@@ -380,15 +382,19 @@ class ReportClosed(Exception):
pass
class Report(object):
- reportEntryManager = None
-
- def __init__(self, reporters):
+ def __init__(self, reporters, reportEntryManager):
"""
- This will instantiate all the reporters and add them to the list of
- available reporters.
+ This is an abstraction layer on top of all the configured reporters.
+
+ It allows to lazily write to the reporters that are to be used.
+
+ Args:
+
+ reporters:
+ a list of :class:ooni.reporter.OReporter
- net_test:
- is a reference to the net_test to which the report object belongs to.
+ reportEntryManager:
+ an instance of :class:ooni.tasks.ReportEntryManager
"""
self.reporters = []
for r in reporters:
@@ -398,13 +404,14 @@ class Report(object):
self.createReports()
self.done = defer.Deferred()
- self.done.addCallback(self.finish)
+ self.done.addCallback(self.close)
- self.report_mediator = TaskMediator(self.done)
+ self.reportEntryManager = reportEntryManager
- def createReports(self):
+ def open(self):
"""
- This will create all the reports that need to be created.
+ This will create all the reports that need to be created and fires the
+ created callback of the reporter whose report got created.
"""
for reporter in self.reporters:
d = defer.maybeDeferred(reporter.createReport)
@@ -413,20 +420,41 @@ class Report(object):
def write(self, measurement):
"""
This is a lazy call that will write to all the reporters by waiting on
- the created callback to fire.
+ them to be created.
- The report_write_task is created before we attach the callback so that
- the report mediator is aware of the total number of created reportEntry
- tasks.
+ Will return a deferred that will fire once the report for the specified
+ measurement have been written to all the reporters.
+
+ Args:
+
+ measurement:
+ an instance of :class:ooni.tasks.Measurement
+
+ Returns:
+ a deferred list that will fire once all the report entries have
+ been written.
"""
+ dl = []
for reporter in self.reporters:
- report_write_task = ReportEntry(reporter, measurement,
- self.report_mediator)
- @reporter.created.addCallback
- def cb(result):
+ def writeReportEntry(result):
+ report_write_task = ReportEntry(reporter, measurement)
self.reportEntryManager.schedule(report_write_task)
+ return report_write_task.done
- def finish(self, result):
+ d = reporter.created.addBoth(writeReportEntry)
+ dl.append(d)
+
+ return defer.DeferredList(dl)
+
+ def close(self, _):
+ """
+ Close the report by calling it's finish method.
+
+ Returns:
+ a :class:twisted.internet.defer.DeferredList that will fire when
+ all the reports have been closed.
+
+ """
dl = []
for reporter in self.reporters:
d = defer.maybeDeferred(reporter.finish)
diff --git a/ooni/tasks.py b/ooni/tasks.py
index f2a9bae..28aaca4 100644
--- a/ooni/tasks.py
+++ b/ooni/tasks.py
@@ -5,7 +5,7 @@ from twisted.internet import defer, reactor
class BaseTask(object):
_timer = None
- def __init__(self, mediator=None):
+ def __init__(self):
"""
If you want to schedule a task multiple times, remember to create fresh
instances of it.
@@ -19,10 +19,8 @@ class BaseTask(object):
# This is a deferred that gets called when a test has reached it's
# final status, this means: all retries have been attempted or the test
# has successfully executed.
+ # Such deferred will be called on completion by the TaskManager.
self.done = defer.Deferred()
- if mediator:
- mediator.created()
- self.done.addCallback(mediator.taskDone)
def _failed(self, failure):
self.failures += 1
@@ -97,8 +95,7 @@ class TaskWithTimeout(BaseTask):
pass
class Measurement(TaskWithTimeout):
- def __init__(self, test_class, test_method, test_input, net_test,
- mediator):
+ def __init__(self, test_class, test_method, test_input):
"""
test_class:
is the class, subclass of NetTestCase, of the test to be run
@@ -121,9 +118,7 @@ class Measurement(TaskWithTimeout):
self.test_instance.setUp()
self.test = getattr(self.test_instance, test_method)
- self.netTest = net_test
-
- TaskWithTimeout.__init__(self, mediator)
+ TaskWithTimeout.__init__(self)
def succeeded(self, result):
return self.netTest.succeeded(self)
@@ -139,60 +134,12 @@ class Measurement(TaskWithTimeout):
return d
class ReportEntry(TaskWithTimeout):
- def __init__(self, reporter, measurement, task_mediator):
+ def __init__(self, reporter, measurement):
self.reporter = reporter
self.measurement = measurement
- TaskWithTimeout.__init__(self, task_mediator)
+ TaskWithTimeout.__init__(self)
def run(self):
return self.reporter.writeReportEntry(self.measurement)
-
-class TaskMediator(object):
- def __init__(self, allTasksDone):
- """
- This implements a Mediator/Observer pattern to keep track of when Tasks
- that are logically linked together have all reached a final done stage.
-
- Args:
- allTasksDone is a deferred that will get fired once all the tasks
- have been completed.
- """
- self.doneTasks = 0
- self.tasks = 0
-
- self.completedScheduling = False
- self.allTasksDone = allTasksDone
-
- def created(self):
- self.tasks += 1
-
- def checkAllTasksDone(self):
- if self.completedScheduling and \
- self.doneTasks == self.tasks:
- self.allTasksDone.callback(self.doneTasks)
-
- def taskDone(self, result):
- """
- This is called every time a task has finished running.
- """
- self.doneTasks += 1
- self.checkAllTasksDone()
-
- def allTasksScheduled(self):
- """
- This should be called once all the tasks that need to run have been
- scheduled.
-
- XXX this is ghetto.
- The reason for which we are calling allTasksDone inside of the
- allTasksScheduled method is called after all tasks are done, then we
- will run into a race condition. The race is that we don't end up
- checking that all the tasks are complete because no task is to be
- scheduled.
- """
- self.completedScheduling = True
- self.checkAllTasksDone()
-
-
diff --git a/tests/test_nettest.py b/tests/test_nettest.py
index 3c7bdf6..a029fae 100644
--- a/tests/test_nettest.py
+++ b/tests/test_nettest.py
@@ -10,6 +10,8 @@ from ooni.nettest import FailureToLoadNetTest
from ooni.tasks import BaseTask
from ooni.utils import NotRootError
+from ooni.director import Director
+
from ooni.managers import TaskManager
from tests.mocks import MockMeasurement, MockMeasurementFailOnce
@@ -223,6 +225,7 @@ class TestNetTest(unittest.TestCase):
net_test = NetTest(StringIO(net_test_string_with_file),
dummyOptionsWithFile, MockReporter())
net_test.measurementManager = MockMeasurementManager()
+ net_test.director = Director()
d = net_test.start()
@d.addCallback
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits