[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [ooni-probe/master] * Start implementing worker
commit 2e6edd21f3a7b272c2c487b8ce399fe2e0fde3a3
Author: Arturo Filastò <hellais@xxxxxxxxx>
Date: Fri Mar 16 20:16:55 2012 -0700
* Start implementing worker
* Add hackish resume support
---
plugoo/tests.py | 25 ++++++++++++++++++++-----
plugoo/workers.py | 36 ++++++++++++++++++++++++++++++++++++
2 files changed, 56 insertions(+), 5 deletions(-)
diff --git a/plugoo/tests.py b/plugoo/tests.py
index d57c0a8..3d6b36b 100644
--- a/plugoo/tests.py
+++ b/plugoo/tests.py
@@ -32,7 +32,7 @@ class Test:
"""
pass
- def load_assets(self, assets):
+ def load_assets(self, assets, index=None):
"""
Takes as input an array of Asset objects and
outputs an iterator for the loaded assets.
@@ -57,21 +57,36 @@ class Test:
smallassets = list(assets)
smallassets.pop(bigidx)
+ i = 0
for x in assets[bigidx]:
if asset_count > 1:
# XXX this will only work in python 2.6, maybe refactor?
for comb in itertools.product(*smallassets):
- yield (x,) + comb
+ if index and i < index:
+ i += 1
+ else:
+ yield (x,) + comb
else:
- yield (x)
+ if index and i < index:
+ i += 1
+ else:
+ yield (x)
- def run(self, assets=None, buffer=10, timeout=100000):
+ def run(self, assets=None, extradata=None, buffer=10, timeout=100000):
self.logger.info("Starting %s", self.name)
jobs = []
if assets:
self.logger.debug("Running through tests")
- for i, data in enumerate(self.load_assets(assets)):
+
+ if extradata['index']:
+ index = extradata['index']
+ else:
+ index = None
+
+ for i, data in enumerate(self.load_assets(assets, index)):
args = {'data': data}
+ if extradata:
+ args = dict(args.items()+extradata.items())
# Append to the job queue
jobs.append(gevent.spawn(self.experiment, **args))
# If the buffer is full run the jobs
diff --git a/plugoo/workers.py b/plugoo/workers.py
new file mode 100644
index 0000000..3c8397d
--- /dev/null
+++ b/plugoo/workers.py
@@ -0,0 +1,36 @@
+import gevent
+from gevent.pool import Pool
+
+class WorkFactory:
+ """
+ This class is responsible for producing
+ units of work.
+ """
+ def __init__(self, assets=None,
+ nodes=None, rule=None):
+ pass
+
+ def _process_rule(self):
+ pass
+
+ def get_work_unit():
+ pass
+
+class UnitOfWork:
+ def __init__(self, tests, poolsize=20,
+ unit_of_work=None):
+ pass
+
+ def _read_unit_of_work(self):
+ pass
+
+ def _build_pools(self):
+ for i, x in enumerate(self.tests):
+ if i % self.poolsize == 0:
+
+
+ def do(self):
+ with gevent.Timeout():
+ self.pool.join()
+
+
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits