[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Removing timeout from descriptor reader enqueue calls
commit b56bb55187d6baa00510b98c6ae1c91739476acf
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Fri Apr 13 20:36:33 2012 -0700
Removing timeout from descriptor reader enqueue calls
The reader had a timeout for enqueue operations so it could periodically check
if we were stopped while waiting to insert a descriptor that we've read. As
Karsten thought we were able to do better. Dropped the timeout by clearing
our queue on shutdown (to unblock any enqueue calls in process), and skip
further enqueuing while we're shutting down.
Both iterating and stopping are under a read lock so we don't need to worry
about this changing the order in which descriptors are provided to callers.
---
stem/descriptor/reader.py | 28 +++++++++++++++-------------
1 files changed, 15 insertions(+), 13 deletions(-)
diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index bbedefc..66ed65a 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -302,6 +302,13 @@ class DescriptorReader:
with self._reader_thread_lock:
self._is_stopped.set()
self._iter_notice.set()
+
+ # clears our queue to unblock enqueue calls
+ try:
+ while True:
+ self._unreturned_descriptors.get_nowait()
+ except Queue.Empty: pass
+
self._reader_thread.join()
self._reader_thread = None
@@ -352,7 +359,10 @@ class DescriptorReader:
self._notify_skip_listeners(target, UnrecognizedType(target_type))
self._processed_files = new_processed_files
- self._enqueue_descriptor(FINISHED)
+
+ if not self._is_stopped.is_set():
+ self._unreturned_descriptors.put(FINISHED)
+
self._iter_notice.set()
def __iter__(self):
@@ -371,7 +381,8 @@ class DescriptorReader:
try:
with open(target) as target_file:
for desc in stem.descriptor.parse_file(target, target_file):
- self._enqueue_descriptor(desc)
+ if self._is_stopped.is_set(): return
+ self._unreturned_descriptors.put(desc)
self._iter_notice.set()
except TypeError, exc:
self._notify_skip_listeners(target, UnrecognizedType(None))
@@ -388,7 +399,8 @@ class DescriptorReader:
entry = tar_file.extractfile(tar_entry)
for desc in stem.descriptor.parse_file(target, entry):
- self._enqueue_descriptor(desc)
+ if self._is_stopped.is_set(): return
+ self._unreturned_descriptors.put(desc)
self._iter_notice.set()
entry.close()
@@ -397,16 +409,6 @@ class DescriptorReader:
except IOError, exc:
self._notify_skip_listeners(target, ReadFailed(exc))
- def _enqueue_descriptor(self, descriptor):
- # blocks until there is either room for the descriptor or we're stopped
-
- while True:
- try:
- self._unreturned_descriptors.put(descriptor, timeout = 0.1)
- return
- except Queue.Full:
- if self._is_stopped.is_set(): return
-
def _notify_skip_listeners(self, path, exception):
for listener in self._skip_listeners:
listener(path, exception)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits