[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] Removing timeout from descriptor reader enqueue calls



commit b56bb55187d6baa00510b98c6ae1c91739476acf
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date:   Fri Apr 13 20:36:33 2012 -0700

    Removing timeout from descriptor reader enqueue calls
    
    The reader had a timeout for enqueue operations so it could periodically check
    if we were stopped while waiting to insert a descriptor that we've read. As
    Karsten thought, we are able to do better. Dropped the timeout by clearing
    our queue on shutdown (to unblock any enqueue calls in progress), and we now
    skip further enqueuing while we're shutting down.
    
    Both iterating and stopping are under a read lock so we don't need to worry
    about this changing the order in which descriptors are provided to callers.
---
 stem/descriptor/reader.py |   28 +++++++++++++++-------------
 1 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index bbedefc..66ed65a 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -302,6 +302,13 @@ class DescriptorReader:
     with self._reader_thread_lock:
       self._is_stopped.set()
       self._iter_notice.set()
+      
+      # clears our queue to unblock enqueue calls
+      try:
+        while True:
+          self._unreturned_descriptors.get_nowait()
+      except Queue.Empty: pass
+      
       self._reader_thread.join()
       self._reader_thread = None
   
@@ -352,7 +359,10 @@ class DescriptorReader:
           self._notify_skip_listeners(target, UnrecognizedType(target_type))
     
     self._processed_files = new_processed_files
-    self._enqueue_descriptor(FINISHED)
+    
+    if not self._is_stopped.is_set():
+      self._unreturned_descriptors.put(FINISHED)
+    
     self._iter_notice.set()
   
   def __iter__(self):
@@ -371,7 +381,8 @@ class DescriptorReader:
     try:
       with open(target) as target_file:
         for desc in stem.descriptor.parse_file(target, target_file):
-          self._enqueue_descriptor(desc)
+          if self._is_stopped.is_set(): return
+          self._unreturned_descriptors.put(desc)
           self._iter_notice.set()
     except TypeError, exc:
       self._notify_skip_listeners(target, UnrecognizedType(None))
@@ -388,7 +399,8 @@ class DescriptorReader:
             entry = tar_file.extractfile(tar_entry)
             
             for desc in stem.descriptor.parse_file(target, entry):
-              self._enqueue_descriptor(desc)
+              if self._is_stopped.is_set(): return
+              self._unreturned_descriptors.put(desc)
               self._iter_notice.set()
             
             entry.close()
@@ -397,16 +409,6 @@ class DescriptorReader:
     except IOError, exc:
       self._notify_skip_listeners(target, ReadFailed(exc))
   
-  def _enqueue_descriptor(self, descriptor):
-    # blocks until there is either room for the descriptor or we're stopped
-    
-    while True:
-      try:
-        self._unreturned_descriptors.put(descriptor, timeout = 0.1)
-        return
-      except Queue.Full:
-        if self._is_stopped.is_set(): return
-  
   def _notify_skip_listeners(self, path, exception):
     for listener in self._skip_listeners:
       listener(path, exception)



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits