[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] Fixing sinister concurrency issue



commit 664552e72719776f17cd654928dbf5d0a12e48d9
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date:   Sun Mar 11 13:41:48 2012 -0700

    Fixing sinister concurrency issue
    
    Replacing the _is_reading event flag with a 'FINISHED' entry in the
    _unreturned_descriptors queue. This is because Python's queues have no
    notion of flushing, so there's no way for me to reliably check whether
    'the reading thread is finished AND the queue is empty'. Even after
    calling 'put', with a provably non-empty queue, I couldn't get that check
    to work reliably. That is... frustrating. >:(
---
 stem/descriptor/reader.py |   25 ++++++++++++++-----------
 1 files changed, 14 insertions(+), 11 deletions(-)

diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index a8f3284..9be6e8f 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -81,6 +81,9 @@ import Queue
 
 MAX_STORED_DESCRIPTORS = 20
 
+# flag to indicate when the reader thread is out of descriptor files to read
+FINISHED = "DONE"
+
 def load_processed_files(path):
   """
   Loads a dictionary of 'path => last modified timestamp' mappings, as
@@ -169,10 +172,11 @@ class DescriptorReader(threading.Thread):
     
     self._iter_lock = threading.RLock()
     self._iter_notice = threading.Event()
-    self._is_reading = threading.Event()
     self._is_stopped = threading.Event()
     
-    # descriptors that we have read, but not yet provided to the caller
+    # Descriptors that we have read but not yet provided to the caller. A
+    # FINISHED entry is used by the reading thread to indicate the end.
+    
     self._unreturned_descriptors = Queue.Queue()
   
   def get_processed_files(self):
@@ -229,10 +233,9 @@ class DescriptorReader(threading.Thread):
     self._iter_notice.set()
   
   def run(self):
-    self._is_reading.set()
     remaining_files = list(self._targets)
     
-    while remaining_files and not self._is_stopped.isSet():
+    while remaining_files and not self._is_stopped.is_set():
       target = remaining_files.pop(0)
       if not os.path.exists(target): continue
       
@@ -243,7 +246,7 @@ class DescriptorReader(threading.Thread):
             remaining_files.append(os.path.join(root, filename))
           
           # this can take a while if, say, we're including the root directory
-          if self._is_stopped.isSet(): break
+          if self._is_stopped.is_set(): break
       else:
         # This is a file. Register it's last modified timestamp and check if
         # it's a file that we should skip.
@@ -277,18 +280,18 @@ class DescriptorReader(threading.Thread):
           elif target_type[1] == 'bzip2':
             pass # TODO: implement
     
-    self._is_reading.clear()
+    self._unreturned_descriptors.put(FINISHED)
     self._iter_notice.set()
   
   def __iter__(self):
     with self._iter_lock:
-      while not self._is_stopped.isSet():
+      while not self._is_stopped.is_set():
         try:
-          yield self._unreturned_descriptors.get_nowait()
-        except Queue.Empty:
-          # if we've finished and there aren't any descriptors then we're done
-          if not self._is_reading.isSet() and self._unreturned_descriptors.empty(): break
+          descriptor = self._unreturned_descriptors.get_nowait()
           
+          if descriptor == FINISHED: break
+          else: yield descriptor
+        except Queue.Empty:
           self._iter_notice.wait()
           self._iter_notice.clear()
   



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits