[tor-commits] [stem/master] Implementation of DescriptorReader concurrency
commit 2d620558fa0885c532e8759c497881146a89ba0a
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Sat Mar 10 18:09:52 2012 -0800
Implementation of DescriptorReader concurrency
After much hair pulling I figured out a relatively simple producer/consumer model
for this class. It should be trivial to add stop() later, but making this
re-runnable would greatly complicate the class and probably isn't worth it.
This isn't yet working, but this is a decent breaking point.
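
Roughly how this is meant to be driven from the caller's side. This is just a
sketch of the intended producer/consumer split (not code from this commit), and
the paths are made up for the example...

  from stem.descriptor.reader import DescriptorReader
  from stem.descriptor.reader import load_processed_files, save_processed_files

  # hypothetical locations for our descriptor data and persisted state
  DESCRIPTOR_DIR = "/home/atagar/descriptors"
  PROCESSED_FILES_PATH = "/home/atagar/descriptor_state"

  def skip_listener(path, exception):
    # called for any file we couldn't use as descriptor data
    print "skipped %s: %s" % (path, exception)

  reader = DescriptorReader([DESCRIPTOR_DIR])
  reader.set_processed_files(load_processed_files(PROCESSED_FILES_PATH))
  reader.register_skip_listener(skip_listener)

  reader.start()  # the reading thread (producer) starts walking our targets

  # iteration is the consumer side, yielding descriptors as they're read
  # (for now these are just the raw file contents)
  for descriptor in reader:
    print descriptor[:70]

  reader.join()

  # argument order here is a guess, mirroring load_processed_files(path)
  save_processed_files(PROCESSED_FILES_PATH, reader.get_processed_files())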
---
stem/descriptor/reader.py | 114 ++++++++++++++++++++++++++++++++++++---------
1 files changed, 92 insertions(+), 22 deletions(-)
diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index b0c8f9a..8bdb5b4 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -52,6 +52,7 @@ save_processed_files - Saves a listing of processed files.
DescriptorReader - Iterator for descriptor data on the local file system.
|- get_processed_files - provides the listing of files that we've processed
|- set_processed_files - sets our tracking of the files we have processed
+ |- register_skip_listener - adds a listener that's notified of skipped files
|- start - begins reading descriptor data
|- stop - stops reading descriptor data
|- join - joins on the thread used to process descriptor data
@@ -64,6 +65,17 @@ import threading
import mimetypes
import Queue
+# TODO: Unimplemented concurrency features...
+# - stop()
+# - restarting when __iter__ is called additional times
+# - maximum read-ahead
+
+# Maximum number of descriptors that we'll read ahead before waiting for our
+# caller to fetch some of them. This is included to avoid unbounded memory
+# usage. Setting this to zero removes the limit.
+
+MAX_STORED_DESCRIPTORS = 20
+
def load_processed_files(path):
"""
Loads a dictionary of 'path => last modified timestamp' mappings, as
@@ -136,10 +148,28 @@ class DescriptorReader(threading.Thread):
text files, tarball archives (gzip or bzip2), or recurse directories.
"""
- def __init__(self, targets):
- self.targets = targets
- self.skip_listeners = []
- self.processed_files = {}
+ def __init__(self, targets, follow_links = False):
+ threading.Thread.__init__(self, name="Descriptor Reader")
+ self.setDaemon(True)
+
+    # flag to indicate if we'll follow symlinks when traversing directories
+ self._follow_links = follow_links
+
+ self._targets = targets
+ self._skip_listeners = []
+ self._processed_files = {}
+
+ self._iter_lock = threading.RLock()
+ self._iter_notice = threading.Event()
+
+ # targets that remain to be read in this iteration
+ self._remaining_files = list(self._targets)
+
+ # descriptors that we have read, but not yet provided to the caller
+ self._unreturned_descriptors = Queue.Queue()
+
+ # TODO: implement
+ # flag that's set when we're done
self._stop_event = threading.Event()
def get_processed_files(self):
@@ -156,7 +186,7 @@ class DescriptorReader(threading.Thread):
files we have processed
"""
- return dict(self.processed_files)
+ return dict(self._processed_files)
def set_processed_files(self, processed_files):
"""
@@ -169,7 +199,7 @@ class DescriptorReader(threading.Thread):
timestamps for the last modified time (int)
"""
- self.processed_files = dict(processed_files)
+ self._processed_files = dict(processed_files)
def register_skip_listener(self, listener):
"""
@@ -184,7 +214,7 @@ class DescriptorReader(threading.Thread):
valid descriptor data
"""
- self.skip_listeners.append(listener)
+ self._skip_listeners.append(listener)
def stop(self):
"""
@@ -192,23 +222,63 @@ class DescriptorReader(threading.Thread):
"""
self._stop_event.set()
+ self._iter_notice.set()
def run(self):
- # os.walk(path, followlinks = True)
- #
- # >>> mimetypes.guess_type("/home/atagar/Desktop/control-spec.txt")
- # ('text/plain', None)
- #
- # >>> mimetypes.guess_type("/home/atagar/Desktop/server-descriptors-2012-03.tar.bz2")
- # ('application/x-tar', 'bzip2')
- #
- # This only checks the file extension. To actually check the content (like
- # the 'file' command) an option would be pymagic...
- # https://github.com/cloudburst/pymagic
-
-
- while not self._stop_event.isSet():
- pass # TODO: implement
+ while self._remaining_files:
+ target = self._remaining_files.pop(0)
+ if not os.path.exists(target): continue
+
+ if os.path.isdir(target):
+ # adds all of the files that it contains
+ for root, _, files in os.walk(target, followlinks = self._follow_links):
+ for filename in files:
+ self._remaining_files.append(os.path.join(root, filename))
+ else:
+        # This is a file. Register its last modified timestamp and check if
+ # it's a file that we should skip.
+
+ last_modified = os.stat(target).st_mtime
+ last_used = self._processed_files.get(target)
+
+ if last_used and last_used >= last_modified:
+ continue
+ else:
+ self._processed_files[target] = last_modified
+
+ # The mimetypes module only checks the file extension. To actually
+ # check the content (like the 'file' command) we'd need something like
+ # pymagic (https://github.com/cloudburst/pymagic).
+
+ target_type = mimetypes.guess_type(target)
+
+ if target_type[0] in (None, 'text/plain'):
+          # if either a '.txt' or an unknown type then try to process it as a
+ # descriptor file
+
+ with open(target) as target_file:
+ # TODO: replace with actual descriptor parsing when we have it
+ self._unreturned_descriptors.put(target_file.read())
+ self._iter_notice.set()
+ elif target_type[0] == 'application/x-tar':
+ if target_type[1] == 'gzip':
+ pass # TODO: implement
+ elif target_type[1] == 'bzip2':
+ pass # TODO: implement
+
+ # TODO: bug: __iter__ should finish with the _unreturned_descriptors
+      # contents. Could be fixed by adding an 'is done reading' event.
+ self._stop_event.set()
+ self._iter_notice.set()
+
+ def __iter__(self):
+ with self._iter_lock:
+ while not self._stop_event.isSet():
+ try:
+ yield self._unreturned_descriptors.get_nowait()
+ except Queue.Empty:
+ self._iter_notice.wait()
+ self._iter_notice.clear()
def _notify_skip_listener(self, path, exception):
for listener in self.skip_listeners:
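
A few sketches for the TODOs above; none of this is part of the patch.

For the maximum read-ahead, Queue.Queue already supports a bound: constructing
it with a maxsize makes put() block once the caller falls that far behind, and
a maxsize of zero means unbounded, which matches how MAX_STORED_DESCRIPTORS is
described...

  import Queue

  MAX_STORED_DESCRIPTORS = 20

  # in __init__: a maxsize of zero gives an unbounded Queue, so the constant
  # can be passed through as-is
  unreturned_descriptors = Queue.Queue(maxsize = MAX_STORED_DESCRIPTORS)

  # in run(): put() now blocks whenever we're MAX_STORED_DESCRIPTORS
  # descriptors ahead of the caller, resuming as __iter__ drains the queue
  unreturned_descriptors.put("descriptor content")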
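
For the tarball stubs in run(), the stdlib tarfile module handles both gzip and
bzip2 compression itself, so something along these lines should do (with the
descriptor parsing still faked as a raw read, like the plaintext case)...

  import tarfile

  def read_tarball(target, unreturned_descriptors):
    # 'r:*' lets tarfile pick the right decompression (gzip or bzip2)
    tar_file = tarfile.open(target, 'r:*')

    try:
      for tar_entry in tar_file.getmembers():
        if tar_entry.isfile():
          entry = tar_file.extractfile(tar_entry)

          # TODO: replace with actual descriptor parsing when we have it
          unreturned_descriptors.put(entry.read())
          entry.close()
    finally:
      tar_file.close()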
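
And for the bug noted at the end of run(), one option is a separate 'finished
reading' event (the _finished_reading name below is made up) that run() sets
just before _iter_notice, letting __iter__ hand back whatever is still queued
before it stops...

  def __iter__(self):
    with self._iter_lock:
      while not self._finished_reading.isSet():
        try:
          yield self._unreturned_descriptors.get_nowait()
        except Queue.Empty:
          self._iter_notice.wait()
          self._iter_notice.clear()

      # the reading thread is done so nothing more will be queued, provide
      # the caller whatever it left behind
      while not self._unreturned_descriptors.empty():
        yield self._unreturned_descriptors.get_nowait()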