[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Work with aagbsn to make the TaskManager pass all tests
commit 8fc8056fa1796c3f5d096d98076aa9d0bbbe47d9
Author: Arturo Filastò <art@xxxxxxxxx>
Date: Sat Jan 12 16:56:47 2013 +0100
Work with aagbsn to make the TaskManager pass all tests
Tests will be retried until the retry limit has been reached
Test failures are properly kept track of
---
ooni/managers.py | 35 +++++-----
ooni/tasks.py | 19 ++++--
tests/test_managers.py | 174 ++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 205 insertions(+), 23 deletions(-)
diff --git a/ooni/managers.py b/ooni/managers.py
index 26ac3f9..be11473 100644
--- a/ooni/managers.py
+++ b/ooni/managers.py
@@ -18,14 +18,11 @@ class TaskManager(object):
failures = []
concurrency = 10
+ completedTasks = 0
+
_tasks = iter(())
_active_tasks = []
- def _tasksDone(self):
- self.tasksDone.callback(None)
-
- self.tasksDone = defer.Deferred()
-
def _failed(self, failure, task):
"""
The task has failed to complete, so we append it to the end of the task chain
@@ -34,14 +31,16 @@ class TaskManager(object):
self._active_tasks.remove(task)
self.failures.append((failure, task))
- if task.failures < self.retries:
+ if task.failures <= self.retries:
self._tasks = itertools.chain(self._tasks,
makeIterable(task))
-
- self._fillSlots()
+ else:
+ task.done.callback((failure, task))
self.failed(failure, task)
+ self._fillSlots()
+
def _fillSlots(self):
"""
Called on test completion and schedules measurements to be run for the
@@ -52,10 +51,9 @@ class TaskManager(object):
task = self._tasks.next()
self._run(task)
except StopIteration:
- self._tasksDone()
break
- def _suceeded(self, result, task):
+ def _succeeded(self, result, task):
"""
We have successfully completed a measurement.
"""
@@ -64,7 +62,9 @@ class TaskManager(object):
self._fillSlots()
- self.suceeded(result, task)
+ task.done.callback(result)
+
+ self.succeeded(result, task)
def _run(self, task):
"""
@@ -73,9 +73,9 @@ class TaskManager(object):
"""
self._active_tasks.append(task)
- d = task.run()
- d.addCallback(self.succeeded, task)
- d.addErrback(self.failed, task)
+ d = task.start()
+ d.addCallback(self._succeeded, task)
+ d.addErrback(self._failed, task)
@property
def failedMeasurements(self):
@@ -93,13 +93,14 @@ class TaskManager(object):
Takes as argument a single task or a task iterable and appends it to the task
generator queue.
"""
-
iterable = makeIterable(task_or_task_iterator)
self._tasks = itertools.chain(self._tasks, iterable)
self._fillSlots()
def start(self):
+ self.failures = []
+
self.tasksDone = defer.Deferred()
self._fillSlots()
@@ -135,7 +136,7 @@ class MeasurementsManager(TaskManager):
director = None
- def suceeded(self, result, measurement):
+ def succeeded(self, result, measurement):
pass
def failed(self, failure, measurement):
@@ -192,7 +193,7 @@ class ReportEntryManager(object):
def initializeTaskList(self):
pass
- def suceeded(self, result, measurement):
+ def succeeded(self, result, measurement):
pass
def failed(self, failure, measurement):
diff --git a/ooni/tasks.py b/ooni/tasks.py
index 909f3e5..55f09bd 100644
--- a/ooni/tasks.py
+++ b/ooni/tasks.py
@@ -1,24 +1,31 @@
+from twisted.internet import defer
+
class BaseTask(object):
_timer = None
def __init__(self):
self.running = False
self.failures = 0
+ # This is a deferred that fires when a test has reached its
+ # final status, meaning that all retries have been attempted or the test
+ # has successfully executed.
+ self.done = defer.Deferred()
def _failed(self, failure):
self.failures += 1
self.failed(failure)
- return
+ return failure
- def _run(self):
+ def _succeeded(self, result):
+ self.succeeded(result)
+ return result
+
+ def start(self):
d = self.run()
d.addErrback(self._failed)
d.addCallback(self._succeeded)
return d
- def _succeeded(self, result):
- self.succeeded(result)
-
def succeeded(self, result):
"""
Place here the logic to handle a successful execution of the task.
@@ -113,7 +120,7 @@ class Measurement(TaskWithTimeout):
d.addErrback(self.failure)
return d
-class ReportEntry(TimedOutTask):
+class ReportEntry(TaskWithTimeout):
def __init__(self, reporter, measurement):
self.reporter = reporter
self.measurement = measurement
diff --git a/tests/test_managers.py b/tests/test_managers.py
new file mode 100644
index 0000000..59eba5f
--- /dev/null
+++ b/tests/test_managers.py
@@ -0,0 +1,174 @@
+from twisted.trial import unittest
+from twisted.python import failure
+from twisted.internet import defer
+
+from ooni.tasks import BaseTask
+from ooni.managers import TaskManager
+
+
+mockFailure = failure.Failure(Exception('mock'))
+
+class MockSuccessTask(BaseTask):
+ def run(self):
+ return defer.succeed(42)
+
+class MockFailTask(BaseTask):
+ def run(self):
+ return defer.fail(mockFailure)
+
+class MockFailOnceTask(BaseTask):
+ def run(self):
+ if self.failures >= 1:
+ return defer.succeed(42)
+ else:
+ return defer.fail(mockFailure)
+
+class MockTaskManager(TaskManager):
+ def __init__(self):
+ self.successes = []
+
+ def failed(self, failure, task):
+ # print "TASK"
+ # print task
+ # print "FAILURES (%s)" % task.failures
+ # print failure
+ pass
+
+ def succeeded(self, result, task):
+ self.successes.append((result, task))
+
+class TestTaskManager(unittest.TestCase):
+ def setUp(self):
+ self.taskManager = MockTaskManager()
+ self.taskManager.concurrency = 10
+ self.taskManager.retries = 2
+
+ self.taskManager.start()
+
+ def tearDown(self):
+ pass
+
+ def test_schedule_successful_one_task(self):
+ mock_task = MockSuccessTask()
+ self.taskManager.schedule(mock_task)
+
+ @mock_task.done.addCallback
+ def done(res):
+ self.assertEqual(self.taskManager.successes,
+ [(42, mock_task)])
+ return mock_task.done
+
+ def test_schedule_failing_one_task(self):
+ mock_task = MockFailTask()
+ self.taskManager.schedule(mock_task)
+
+ @mock_task.done.addCallback
+ def done(failure):
+ self.assertEqual(len(self.taskManager.failures), 3)
+
+ self.assertEqual(failure, (mockFailure, mock_task))
+
+ return mock_task.done
+
+ def test_schedule_successful_ten_tasks(self):
+ all_done = []
+ for x in range(10):
+ mock_task = MockSuccessTask()
+ all_done.append(mock_task.done)
+ self.taskManager.schedule(mock_task)
+
+ d = defer.DeferredList(all_done)
+ @d.addCallback
+ def done(res):
+ for task_result, task_instance in self.taskManager.successes:
+ self.assertEqual(task_result, 42)
+ self.assertIsInstance(task_instance, MockSuccessTask)
+
+ return d
+
+ def test_schedule_failing_ten_tasks(self):
+ all_done = []
+ for x in range(10):
+ mock_task = MockFailTask()
+ all_done.append(mock_task.done)
+ self.taskManager.schedule(mock_task)
+
+ d = defer.DeferredList(all_done)
+ @d.addCallback
+ def done(res):
+ # 10*3 because each task runs once and is then retried 2 times
+ self.assertEqual(len(self.taskManager.failures), 10*3)
+ for task_result, task_instance in self.taskManager.failures:
+ self.assertEqual(task_result, mockFailure)
+ self.assertIsInstance(task_instance, MockFailTask)
+
+ return d
+
+ def test_schedule_successful_27_tasks(self):
+ all_done = []
+ for x in range(27):
+ mock_task = MockSuccessTask()
+ all_done.append(mock_task.done)
+ self.taskManager.schedule(mock_task)
+
+ d = defer.DeferredList(all_done)
+ @d.addCallback
+ def done(res):
+ for task_result, task_instance in self.taskManager.successes:
+ self.assertEqual(task_result, 42)
+ self.assertIsInstance(task_instance, MockSuccessTask)
+
+ return d
+
+ def test_schedule_failing_27_tasks(self):
+ all_done = []
+ for x in range(27):
+ mock_task = MockFailTask()
+ all_done.append(mock_task.done)
+ self.taskManager.schedule(mock_task)
+
+ d = defer.DeferredList(all_done)
+ @d.addCallback
+ def done(res):
+ # 27*3 because each task runs once and is then retried 2 times
+ self.assertEqual(len(self.taskManager.failures), 27*3)
+ for task_result, task_instance in self.taskManager.failures:
+ self.assertEqual(task_result, mockFailure)
+ self.assertIsInstance(task_instance, MockFailTask)
+
+ return d
+
+
+ def test_task_retry_and_succeed(self):
+ mock_task = MockFailOnceTask()
+ self.taskManager.schedule(mock_task)
+
+ @mock_task.done.addCallback
+ def done(res):
+ self.assertEqual(len(self.taskManager.failures), 1)
+
+ self.assertEqual(self.taskManager.failures,
+ [(mockFailure, mock_task)])
+ self.assertEqual(self.taskManager.successes,
+ [(42, mock_task)])
+
+ return mock_task.done
+
+ def test_task_retry_and_succeed_56_tasks(self):
+ all_done = []
+ for x in range(56):
+ mock_task = MockFailOnceTask()
+ all_done.append(mock_task.done)
+ self.taskManager.schedule(mock_task)
+
+ d = defer.DeferredList(all_done)
+ @d.addCallback
+ def done(res):
+ self.assertEqual(len(self.taskManager.failures), 56)
+
+ for task_result, task_instance in self.taskManager.successes:
+ self.assertEqual(task_result, 42)
+ self.assertIsInstance(task_instance, MockFailOnceTask)
+
+ return d
+
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits