[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Limiting the number of buffered descriptors
commit adbc1991fdae353825b893f5e90aee6ee882e0a4
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Mon Mar 12 19:17:33 2012 -0700
Limiting the number of buffered descriptors
Preventing the DescriptorReader from having unbounded memory usage by limiting
the number of descriptors that we'll store before we wait for our caller to
request some. This doesn't technically make our memory usage bounded since a
single descriptor doesn't have a limited size, but if one descriptor can
trigger the OOM killer then we have a problem. :)
This isn't yet tested because we only have a single descriptor in our test data
(we need at least two before we can test this). Adding a todo note for now.
---
stem/descriptor/reader.py | 34 ++++++++++++++++++++--------------
test/integ/descriptor/reader.py | 1 +
2 files changed, 21 insertions(+), 14 deletions(-)
diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index 6026408..6af2fc6 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -71,16 +71,6 @@ import threading
import mimetypes
import Queue
-# TODO: Remianing impementation items...
-# - implement gzip and bz2 reading
-# - maximum read-ahead
-
-# Maximum number of descriptors that we'll read ahead before waiting for our
-# caller to fetch some of them. This is included to avoid unbounded memory
-# usage. This condition will be removed if set to zero.
-
-MAX_STORED_DESCRIPTORS = 20
-
# flag to indicate when the reader thread is out of descriptor files to read
FINISHED = "DONE"
@@ -189,13 +179,19 @@ class DescriptorReader:
Iterator for the descriptor data on the local file system. This can process
text files, tarball archives (gzip or bzip2), or recurse directories.
+ By default this limits the number of descriptors that we'll read ahead before
+ waiting for our caller to fetch some of them. This is included to avoid
+ unbounded memory usage.
+
Arguments:
targets (list) - paths for files or directories to be read from
follow_links (bool) - determines if we'll follow symlinks when transversing
directories
+ buffer_size (int) - descriptors we'll buffer before waiting for some to
+ be read, this is unbounded if zero
"""
- def __init__(self, targets, follow_links = False):
+ def __init__(self, targets, follow_links = False, buffer_size = 100):
self._targets = targets
self._follow_links = follow_links
self._skip_listeners = []
@@ -213,7 +209,7 @@ class DescriptorReader:
# Descriptors that we have read but not yet provided to the caller. A
# FINISHED entry is used by the reading thread to indicate the end.
- self._unreturned_descriptors = Queue.Queue()
+ self._unreturned_descriptors = Queue.Queue(buffer_size)
def get_processed_files(self):
"""
@@ -335,7 +331,7 @@ class DescriptorReader:
else:
self._notify_skip_listeners(target, UnrecognizedType(target_type))
- self._unreturned_descriptors.put(FINISHED)
+ self._enqueue_descriptor(FINISHED)
self._iter_notice.set()
def __iter__(self):
@@ -354,7 +350,7 @@ class DescriptorReader:
try:
# TODO: replace with actual descriptor parsing when we have it
target_file = open(target)
- self._unreturned_descriptors.put(target_file.read())
+ self._enqueue_descriptor(target_file.read())
self._iter_notice.set()
except IOError, exc:
self._notify_skip_listeners(target, ReadFailed(exc))
@@ -365,6 +361,16 @@ class DescriptorReader:
def _handle_archive_bzip(self, target):
pass # TODO: implement
+ def _enqueue_descriptor(self, descriptor):
+ # blocks until there is either room for the descriptor or we're stopped
+
+ while True:
+ try:
+ self._unreturned_descriptors.put(descriptor, timeout = 0.1)
+ return
+ except Queue.Full:
+ if self._is_stopped.is_set(): return
+
def _notify_skip_listeners(self, path, exception):
for listener in self._skip_listeners:
listener(path, exception)
diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py
index a2383ee..d071db6 100644
--- a/test/integ/descriptor/reader.py
+++ b/test/integ/descriptor/reader.py
@@ -44,6 +44,7 @@ class SkipListener:
def listener(self, path, exception):
self.results.append((path, exception))
+# TODO: test buffer_size when we have more descriptor examples
class TestDescriptorReader(unittest.TestCase):
def tearDown(self):
# cleans up 'processed file' listings that we made
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits