[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Allowing the DescriptorReader to be run multiple times
commit ca837ddcb6f7cc6d118d4e1a774d2c4b22dbda79
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Mon Mar 12 08:52:17 2012 -0700
Allowing the DescriptorReader to be run multiple times
Improving the usability of the DescriptorReader class by making it so callers
can reuse an instance multiple times to get descriptor changes since the last run.
---
stem/descriptor/reader.py | 59 ++++++++++++++++++++++++--------------
test/integ/descriptor/reader.py | 26 ++++++++++++++++-
2 files changed, 62 insertions(+), 23 deletions(-)
diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index 9ef6781..27557da 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -33,15 +33,15 @@ again...
reader.set_processed_files(processed_files)
except: pass # could not load, mabye this is the first run
- with reader:
- start_time = time.time()
-
- while time.time() - start_time < 60:
- # prints any descriptors that have changed since last checked
+ start_time = time.time()
+
+ while time.time() - start_time < 60:
+ # prints any descriptors that have changed since last checked
+ with reader:
for descriptor in reader:
print descriptor
-
- time.sleep(1)
+
+ time.sleep(1)
save_processed_files(reader.get_processed_files(), "/tmp/used_descriptors")
@@ -55,7 +55,6 @@ DescriptorReader - Iterator for descriptor data on the local file system.
|- register_skip_listener - adds a listener that's notified of skipped files
|- start - begins reading descriptor data
|- stop - stops reading descriptor data
- |- join - joins on the thread used to process descriptor data
|- __enter__ / __exit__ - manages the descriptor reader thread in the context
+- __iter__ - iterates over descriptor data in unread files
@@ -71,14 +70,9 @@ import threading
import mimetypes
import Queue
-# TODO: Unimplemented concurrency features...
-# - restarting when __iter__ is called additional times
-# - maximum read-ahead
-
# TODO: Remianing impementation items...
-# - impelment skip listening and add a test for it
-# - remove start and join methods from header?
# - implement gzip and bz2 reading
+# - maximum read-ahead
# Maximum number of descriptors that we'll read ahead before waiting for our
# caller to fetch some of them. This is included to avoid unbounded memory
@@ -182,7 +176,7 @@ def save_processed_files(processed_files, path):
output_file.write("%s %i\n" % (path, timestamp))
-class DescriptorReader(threading.Thread):
+class DescriptorReader:
"""
Iterator for the descriptor data on the local file system. This can process
text files, tarball archives (gzip or bzip2), or recurse directories.
@@ -194,17 +188,19 @@ class DescriptorReader(threading.Thread):
"""
def __init__(self, targets, follow_links = False):
- threading.Thread.__init__(self, name="Descriptor Reader")
- self.setDaemon(True)
-
self._targets = targets
self._follow_links = follow_links
self._skip_listeners = []
self._processed_files = {}
+ self._reader_thread = None
+ self._reader_thread_lock = threading.RLock()
+
self._iter_lock = threading.RLock()
self._iter_notice = threading.Event()
+
self._is_stopped = threading.Event()
+ self._is_stopped.set()
# Descriptors that we have read but not yet provided to the caller. A
# FINISHED entry is used by the reading thread to indicate the end.
@@ -256,15 +252,35 @@ class DescriptorReader(threading.Thread):
self._skip_listeners.append(listener)
+ def start(self):
+ """
+ Starts reading our descriptor files.
+
+ Raises:
+ ValueError if we're already reading the descriptor files
+ """
+
+ with self._reader_thread_lock:
+ if self._reader_thread:
+ raise ValueError("Already running, you need to call stop() first")
+ else:
+ self._is_stopped.clear()
+ self._reader_thread = threading.Thread(target = self._read_descriptor_files, name="Descriptor Reader")
+ self._reader_thread.setDaemon(True)
+ self._reader_thread.start()
+
def stop(self):
"""
Stops further reading of descriptor files.
"""
- self._is_stopped.set()
- self._iter_notice.set()
+ with self._reader_thread_lock:
+ self._is_stopped.set()
+ self._iter_notice.set()
+ self._reader_thread.join()
+ self._reader_thread = None
- def run(self):
+ def _read_descriptor_files(self):
remaining_files = list(self._targets)
while remaining_files and not self._is_stopped.is_set():
@@ -349,5 +365,4 @@ class DescriptorReader(threading.Thread):
def __exit__(self, type, value, traceback):
self.stop()
- self.join()
diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py
index a6190ef..ded3d2d 100644
--- a/test/integ/descriptor/reader.py
+++ b/test/integ/descriptor/reader.py
@@ -148,6 +148,31 @@ class TestDescriptorReader(unittest.TestCase):
# check that we've seen all of the descriptor_entries
self.assertTrue(len(remaining_entries) == 0)
+ def test_multiple_runs(self):
+ """
+ Runs a DescriptorReader instance multiple times over the same content,
+ making sure that it can be used repeatedly.
+ """
+
+ descriptor_path = os.path.join(DESCRIPTOR_TEST_DATA, "example_descriptor")
+ reader = stem.descriptor.reader.DescriptorReader([descriptor_path])
+
+ with reader:
+ self.assertEquals(len(list(reader)), 1)
+
+ # run it a second time, this shouldn't provide any descriptors because we
+ # have already read it
+
+ with reader:
+ self.assertEquals(len(list(reader)), 0)
+
+ # clear the DescriptorReader's memory of seeing the file and run it again
+
+ reader.set_processed_files([])
+
+ with reader:
+ self.assertEquals(len(list(reader)), 1)
+
def test_stop(self):
"""
Runs a DescriptorReader over the root directory, then checks that calling
@@ -171,7 +196,6 @@ class TestDescriptorReader(unittest.TestCase):
reader.start()
time.sleep(0.1)
reader.stop()
- reader.join()
is_test_running = False
def test_get_processed_files(self):
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits