
[tor-commits] [stem/master] Limiting the number of buffered descriptors



commit adbc1991fdae353825b893f5e90aee6ee882e0a4
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date:   Mon Mar 12 19:17:33 2012 -0700

    Limiting the number of buffered descriptors
    
    Preventing the DescriptorReader from having unbounded memory usage by limiting
    the number of descriptors that we'll store before we wait for our caller to
    request some. This doesn't technically make our memory usage bounded since a
    single descriptor doesn't have a limited size, but if one descriptor can
    trigger the OOM killer then we have a problem. :)
    
    This isn't yet tested because we only have a single descriptor in our test data
    (we need at least two before we can test this). Adding a todo note for now.
---
 stem/descriptor/reader.py       |   34 ++++++++++++++++++++--------------
 test/integ/descriptor/reader.py |    1 +
 2 files changed, 21 insertions(+), 14 deletions(-)
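
The core of the change is a bounded queue whose producer retries its put with
a short timeout, so it can periodically notice a stop request instead of
hanging forever on a full queue. Below is a minimal standalone sketch of that
pattern, using Python 2's Queue module as stem did at the time; the class and
names are illustrative, not stem's API.

  import Queue
  import threading

  class BoundedProducer(object):
    def __init__(self, buffer_size = 100):
      # Queue.Queue treats a maxsize of zero as unlimited, which is how a
      # buffer_size of zero disables the bound.
      self._queue = Queue.Queue(buffer_size)
      self._is_stopped = threading.Event()

    def enqueue(self, item):
      # Blocks until there's room for the item or we're asked to stop. The
      # short timeout lets this thread re-check the stop flag rather than
      # blocking indefinitely when the queue is full.
      while True:
        try:
          self._queue.put(item, timeout = 0.1)
          return
        except Queue.Full:
          if self._is_stopped.is_set():
            return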

diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index 6026408..6af2fc6 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -71,16 +71,6 @@ import threading
 import mimetypes
 import Queue
 
-# TODO: Remianing impementation items...
-# - implement gzip and bz2 reading
-# - maximum read-ahead
-
-# Maximum number of descriptors that we'll read ahead before waiting for our
-# caller to fetch some of them. This is included to avoid unbounded memory
-# usage. This condition will be removed if set to zero.
-
-MAX_STORED_DESCRIPTORS = 20
-
 # flag to indicate when the reader thread is out of descriptor files to read
 FINISHED = "DONE"
 
@@ -189,13 +179,19 @@ class DescriptorReader:
   Iterator for the descriptor data on the local file system. This can process
   text files, tarball archives (gzip or bzip2), or recurse directories.
   
+  By default this limits the number of descriptors that we'll read ahead before
+  waiting for our caller to fetch some of them. This is included to avoid
+  unbounded memory usage.
+  
   Arguments:
     targets (list)      - paths for files or directories to be read from
    follow_links (bool) - determines if we'll follow symlinks when traversing
                           directories
+    buffer_size (int)   - descriptors we'll buffer before waiting for some to
+                          be read; this is unbounded if zero
   """
   
-  def __init__(self, targets, follow_links = False):
+  def __init__(self, targets, follow_links = False, buffer_size = 100):
     self._targets = targets
     self._follow_links = follow_links
     self._skip_listeners = []
@@ -213,7 +209,7 @@ class DescriptorReader:
     # Descriptors that we have read but not yet provided to the caller. A
     # FINISHED entry is used by the reading thread to indicate the end.
     
-    self._unreturned_descriptors = Queue.Queue()
+    self._unreturned_descriptors = Queue.Queue(buffer_size)
   
   def get_processed_files(self):
     """
@@ -335,7 +331,7 @@ class DescriptorReader:
         else:
           self._notify_skip_listeners(target, UnrecognizedType(target_type))
     
-    self._unreturned_descriptors.put(FINISHED)
+    self._enqueue_descriptor(FINISHED)
     self._iter_notice.set()
   
   def __iter__(self):
@@ -354,7 +350,7 @@ class DescriptorReader:
     try:
       # TODO: replace with actual descriptor parsing when we have it
       target_file = open(target)
-      self._unreturned_descriptors.put(target_file.read())
+      self._enqueue_descriptor(target_file.read())
       self._iter_notice.set()
     except IOError, exc:
       self._notify_skip_listeners(target, ReadFailed(exc))
@@ -365,6 +361,16 @@ class DescriptorReader:
   def _handle_archive_bzip(self, target):
     pass # TODO: implement
   
+  def _enqueue_descriptor(self, descriptor):
+    # blocks until there is either room for the descriptor or we're stopped
+    
+    while True:
+      try:
+        self._unreturned_descriptors.put(descriptor, timeout = 0.1)
+        return
+      except Queue.Full:
+        if self._is_stopped.is_set(): return
+  
   def _notify_skip_listeners(self, path, exception):
     for listener in self._skip_listeners:
       listener(path, exception)
diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py
index a2383ee..d071db6 100644
--- a/test/integ/descriptor/reader.py
+++ b/test/integ/descriptor/reader.py
@@ -44,6 +44,7 @@ class SkipListener:
   def listener(self, path, exception):
     self.results.append((path, exception))
 
+# TODO: test buffer_size when we have more descriptor examples
 class TestDescriptorReader(unittest.TestCase):
   def tearDown(self):
     # cleans up 'processed file' listings that we made
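
On the consuming side, the handoff ends with a sentinel: the reading thread
enqueues FINISHED once it is out of descriptor files, and iteration drains the
queue until it sees that entry. Each get() frees a slot, unblocking the reading
thread if it was waiting. A sketch of that half of the pattern; the drain
helper is illustrative, not stem's actual __iter__ implementation.

  import Queue

  FINISHED = "DONE"  # sentinel the reading thread enqueues when it is done

  def drain(descriptor_queue):
    # Yields buffered descriptors until the reading thread signals the end.
    # Each get() removes an entry from the bounded queue, making room for
    # the producer to enqueue another descriptor.
    while True:
      descriptor = descriptor_queue.get()
      if descriptor == FINISHED:
        break
      yield descriptor

  # example usage with a queue bounded at 100 entries
  descriptor_queue = Queue.Queue(100)
  descriptor_queue.put("descriptor contents")
  descriptor_queue.put(FINISHED)

  for desc in drain(descriptor_queue):
    print desc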


