[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] Start outlining the Manager classes (#7852)
commit 559c3fa2969346234c86a510ad9c78c6d0baa21b
Author: Arturo Filastò <art@xxxxxxxxx>
Date: Thu Jan 10 16:08:45 2013 +0100
Start outlining the Manager classes (#7852)
These will keep track of the running measurements and reporting tasks.
---
ooni/managers.py | 130 ++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 130 insertions(+), 0 deletions(-)
diff --git a/ooni/managers.py b/ooni/managers.py
new file mode 100644
index 0000000..2bffe0f
--- /dev/null
+++ b/ooni/managers.py
@@ -0,0 +1,130 @@
+import itertools
+
+from .ratelimiting import StaticRateLimiter
+from .measurements import Measurement, NetTest
+
+class MeasurementsManager(object):
+ """
+ This is the Measurement Tracker. In here we keep track of active measurements
+ and issue new measurements once the active ones have been completed.
+
+ MeasurementTracker does not keep track of the typology of measurements that
+ it is running. It just considers a measurement something that has an input
+ and a method to be called.
+
+ NetTest on the contrary is aware of the typology of measurements that it is
+ dispatching as they are logically grouped by test file.
+ """
+ retries = 2
+
+ failures = []
+ concurrency = 10
+
+ _measurements = iter()
+ _active_measurements = []
+
+ def __init__(self, manager, netTests=None):
+ self.netTests = netTests if netTests else []
+ self.manager = manager
+
+ @property
+ def failedMeasurements(self):
+ return len(self.failures)
+
+ def start(self):
+ """
+ Start running the measurements.
+ """
+ self.populateMeasurements()
+ self.runMoreMeasurements()
+
+ def populateMeasurements(self):
+ """
+ Take all the setup netTests and create the measurements iterator from
+ them.
+ """
+ for net_test in self.netTests:
+ self._measurements = itertools.chain(self._measurements,
+ net_test.generateMeasurements())
+
+ def availableSlots(self):
+ """
+ Returns the number of available slots for running tests.
+ """
+ return self.concurrency - len(self._active_measurements)
+
+ def schedule(self, measurement):
+ self._active_measurements.append(measurement)
+
+ d = measurement.run()
+ d.addCallback(self.done)
+ d.addCallback(self.failed)
+ return d
+
+ def fillSlots(self):
+ """
+ Called on test completion and schedules measurements to be run for the
+ available slots.
+ """
+ for _ in range(self.availableSlots()):
+ try:
+ measurement = self._measurements.next()
+ self.schedule(measurement)
+ except StopIteration:
+ break
+
+ def done(self, result, measurement):
+ """
+ We have successfully completed a measurement.
+ """
+ self._active_measurements.remove(measurement)
+ self.completedMeasurements += 1
+
+ self.fillSlots()
+
+ def failed(self, failure, measurement):
+ """
+ The measurement has failed to complete.
+ """
+ self._active_measurements.remove(measurement)
+ self.failures.append((failure, measurement))
+
+ if measurement.failures < self.retries:
+ self._measurements = itertools.chain(self._measurements,
+ iter(measurement))
+
+ self.fillSlots()
+
+class OManager(object):
+ """
+ Singleton object responsible for managing the Measurements Tracker and the
+ Reporting Tracker.
+ """
+
+ _scheduledTests = 0
+
+ def __init__(self, reporters=[]):
+ self.reporters = reporters
+
+ self.netTests = []
+
+ self.measurements_tracker = MeasurementsTracker(manager=self,
+ netTests=self.netTests)
+ self.measurements_tracker.manager = self
+
+ def writeReport(self, measurement):
+ """
+ Write to all the configured reporters.
+ """
+ for reporter in self.reporters:
+ reporter.write(measurement)
+
+ def writeFailure(self, measurement, failure):
+ pass
+
+ def addNetTest(self, net_test):
+ """
+ This is called to add a NetTest to the list of running network tests.
+ """
+ self.netTests.append(net_test)
+
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits