[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Work on TaskManager and write tests for it
commit f621794d11c3dda37edbccf82a56db1025e348c7
Author: Arturo Filastò <art@xxxxxxxxx>
Date: Sat Jan 12 14:58:11 2013 +0100
Work on TaskManager and write tests for it
Changes to the API of the MeasurementManager and ReportManager
---
ooni/managers.py | 47 +++++++++++++++++++++++------------------------
tests/test_manager.py | 5 +++++
2 files changed, 28 insertions(+), 24 deletions(-)
diff --git a/ooni/managers.py b/ooni/managers.py
index f6d4c74..26ac3f9 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -18,9 +18,14 @@ class TaskManager(object):
failures = []
concurrency = 10
- _tasks = iter()
+ _tasks = iter(())
_active_tasks = []
+ def _tasksDone(self):
+ self.tasksDone.callback(None)
+
+ self.tasksDone = defer.Deferred()
+
def _failed(self, failure, task):
"""
The task has failed to complete, so we append it to the end of the task chain
@@ -31,9 +36,9 @@ class TaskManager(object):
if task.failures < self.retries:
self._tasks = itertools.chain(self._tasks,
- iter(task))
+ makeIterable(task))
- self.fillSlots()
+ self._fillSlots()
self.failed(failure, task)
@@ -42,11 +47,12 @@ class TaskManager(object):
Called on test completion and schedules measurements to be run for the
available slots.
"""
- for _ in range(self.availableSlots()):
+ for _ in range(self.availableSlots):
try:
task = self._tasks.next()
self._run(task)
except StopIteration:
+ self._tasksDone()
break
def _suceeded(self, result, task):
@@ -56,7 +62,7 @@ class TaskManager(object):
self._active_tasks.remove(task)
self.completedTasks += 1
- self.fillSlots()
+ self._fillSlots()
self.suceeded(result, task)
@@ -68,13 +74,14 @@ class TaskManager(object):
self._active_tasks.append(task)
d = task.run()
- d.addCallback(self.succeeded)
- d.addCallback(self.failed)
+ d.addCallback(self.succeeded, task)
+ d.addErrback(self.failed, task)
@property
def failedMeasurements(self):
return len(self.failures)
+ @property
def availableSlots(self):
"""
Returns the number of available slots for running tests.
@@ -86,19 +93,15 @@ class TaskManager(object):
Takes as argument a single task or a task iterable and appends it to the task
generator queue.
"""
- self._tasks = itertools.chain(self._tasks, task_or_task_iterator)
- def start(self):
- self.initializeTaskList()
+ iterable = makeIterable(task_or_task_iterator)
+
+ self._tasks = itertools.chain(self._tasks, iterable)
self._fillSlots()
- def initializeTaskList(self):
- """
- This should contain all the logic that gets run at first start to
- pre-populate the list of tasks to be run and the tasks currently
- running.
- """
- raise NotImplemented
+ def start(self):
+ self.tasksDone = defer.Deferred()
+ self._fillSlots()
def failed(self, failure, task):
"""
@@ -132,13 +135,6 @@ class MeasurementsManager(TaskManager):
director = None
- def __init__(self, netTests=None):
- self.netTests = netTests if netTests else []
-
- def initializeTaskList(self):
- for net_test in self.netTests:
- self.schedule(net_test.generateMeasurements())
-
def suceeded(self, result, measurement):
pass
@@ -146,6 +142,8 @@ class MeasurementsManager(TaskManager):
pass
class Report(object):
+ reportEntryManager = None
+
def __init__(self, reporters, net_test):
"""
This will instantiate all the reporters and add them to the list of
@@ -178,6 +176,7 @@ class Report(object):
@reporter.created.addCallback
def cb(result):
report_write_task = ReportWrite(reporter, measurement)
+ self.reportEntryManager.schedule(report_write_task)
class ReportEntryManager(object):
diff --git a/tests/test_manager.py b/tests/test_manager.py
index 0ed77b0..f5ac052 100644
--- a/tests/test_manager.py
+++ b/tests/test_manager.py
@@ -65,6 +65,11 @@ class TestNetTest(unittest.TestCase):
self.assertEqual([(DummyTestCase, 'test_a'), (DummyTestCase,
'test_b')], net_test.test_cases)
+ def test_net_test_timeout(self):
+ """Instantiate a test and verify that the timeout works properly when we call it."""
+ net_test = NetTest(net_test_file, dummyInputs, dummyOptions)
+ # Where net_test_file is a test that will take longer than the timeout.
+
class TestMeasurementsTracker(unittest.TestCase):
def setUp(self):
self.mock_mt = MeasurementsTracker(DummyManager())
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits