[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] Fix and test for DescriptorReader stop() method



commit 932ded67f961c67c515750bd6e1c9ddb735f8559
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date:   Sat Mar 10 23:30:28 2012 -0800

    Fix and test for DescriptorReader stop() method
    
    Adding an integraion test and some fixes for the stop() method of the
    DescriptorReader class.
---
 stem/descriptor/reader.py       |   19 +++++++++++--------
 test/integ/descriptor/reader.py |   28 ++++++++++++++++++++++++++++
 2 files changed, 39 insertions(+), 8 deletions(-)

diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index 0784ce4..c3bf1aa 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -66,7 +66,6 @@ import mimetypes
 import Queue
 
 # TODO: Unimplemented concurrency features...
-# - stop()
 # - restarting when __iter__ is called additional times
 # - maximum read-ahead
 
@@ -162,15 +161,16 @@ class DescriptorReader(threading.Thread):
     self._iter_lock = threading.RLock()
     self._iter_notice = threading.Event()
     
-    # targets that remain to be read in this iteration
+    # files or directories that remain to be read
     self._remaining_files = list(self._targets)
     
     # descriptors that we have read, but not yet provided to the caller
     self._unreturned_descriptors = Queue.Queue()
     
-    # TODO: implement
-    # flag that's set when we're done
-    self._stop_event = threading.Event()
+    # flag that's set when stop() is called
+    self._stop_called = threading.Event()
+    
+    # flag that's set when we're out of descriptor files to read
     self._finished_reading = threading.Event()
   
   def get_processed_files(self):
@@ -222,11 +222,11 @@ class DescriptorReader(threading.Thread):
     Stops further reading of descriptor files.
     """
     
-    self._stop_event.set()
+    self._stop_called.set()
     self._iter_notice.set()
   
   def run(self):
-    while self._remaining_files:
+    while self._remaining_files and not self._stop_called.isSet():
       target = self._remaining_files.pop(0)
       if not os.path.exists(target): continue
       
@@ -235,6 +235,9 @@ class DescriptorReader(threading.Thread):
         for root, _, files in os.walk(target, followlinks = self._follow_links):
           for filename in files:
             self._remaining_files.append(os.path.join(root, filename))
+          
+          # this can take a while if, say, we're including the root directory
+          if self._stop_called.isSet(): break
       else:
         # This is a file. Register it's last modified timestamp and check if
         # it's a file that we should skip.
@@ -272,7 +275,7 @@ class DescriptorReader(threading.Thread):
   
   def __iter__(self):
     with self._iter_lock:
-      while not self._stop_event.isSet():
+      while not self._stop_called.isSet():
         try:
           yield self._unreturned_descriptors.get_nowait()
         except Queue.Empty:
diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py
index ca77e26..5fa61b8 100644
--- a/test/integ/descriptor/reader.py
+++ b/test/integ/descriptor/reader.py
@@ -3,6 +3,8 @@ Integration tests for stem.descriptor.reader.
 """
 
 import os
+import time
+import signal
 import unittest
 
 import stem.descriptor.reader
@@ -134,4 +136,30 @@ class TestDescriptorReader(unittest.TestCase):
     
     # check that we've seen all of the descriptor_entries
     self.assertTrue(len(descriptor_entries) == 0)
+  
+  def test_stop(self):
+    """
+    Runs a DescriptorReader over the root directory, then checks that calling
+    stop() makes it terminate in a timely fashion.
+    """
+    
+    is_test_running = True
+    reader = stem.descriptor.reader.DescriptorReader(["/"])
+    
+    # Fails the test after a couple seconds if we don't finish successfully.
+    # Depending on what we're blocked on this might not work when the test
+    # fails, requiring that we give a manual kill to the test.
+    
+    def timeout_handler(signum, frame):
+      if is_test_running:
+        self.fail()
+    
+    signal.signal(signal.SIGALRM, timeout_handler)
+    signal.alarm(2)
+    
+    reader.start()
+    time.sleep(0.1)
+    reader.stop()
+    reader.join()
+    is_test_running = False
 



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits