[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Adding tarball support to the DescriptorReader
commit 828d5dac1c0eda9db996438af45515c0dec0cef0
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Mon Mar 12 20:28:16 2012 -0700
Adding tarball support to the DescriptorReader
Adding support for reading directly from tarballs (which is how metrics are
commonly fetched). This supports all forms of compression that the tarfile
module does (gzip and bz2 among other). Including some tests and archives to
read against.
---
stem/descriptor/reader.py | 25 +++++---
test/integ/descriptor/data/descriptor_archive.tar | Bin 0 -> 20480 bytes
.../descriptor/data/descriptor_archive.tar.bz2 | Bin 0 -> 3322 bytes
.../descriptor/data/descriptor_archive.tar.gz | Bin 0 -> 2844 bytes
test/integ/descriptor/reader.py | 59 +++++++++++++++++++-
5 files changed, 74 insertions(+), 10 deletions(-)
diff --git a/stem/descriptor/reader.py b/stem/descriptor/reader.py
index 3b6e04a..198605b 100644
--- a/stem/descriptor/reader.py
+++ b/stem/descriptor/reader.py
@@ -67,6 +67,7 @@ FileSkipped - Base exception for a file that was skipped.
"""
import os
+import tarfile
import threading
import mimetypes
import Queue
@@ -328,10 +329,9 @@ class DescriptorReader:
if target_type[0] in (None, 'text/plain'):
# either '.txt' or an unknown type
self._handle_descriptor_file(target)
- elif target_type == ('application/x-tar', 'gzip'):
- self._handle_archive_gzip(target)
- elif target_type == ('application/x-tar', 'bzip2'):
- self._handle_archive_gzip(target)
+ elif tarfile.is_tarfile(target):
+ # handles gzip, bz2, and decompressed tarballs among others
+ self._handle_archive(target)
else:
self._notify_skip_listeners(target, UnrecognizedType(target_type))
@@ -355,15 +355,22 @@ class DescriptorReader:
# TODO: replace with actual descriptor parsing when we have it
target_file = open(target)
self._enqueue_descriptor(target_file.read())
+ target_file.close()
+
self._iter_notice.set()
except IOError, exc:
self._notify_skip_listeners(target, ReadFailed(exc))
- def _handle_archive_gzip(self, target):
- pass # TODO: implement
-
- def _handle_archive_bzip(self, target):
- pass # TODO: implement
+ def _handle_archive(self, target):
+ with tarfile.open(target) as tar_file:
+ for tar_entry in tar_file:
+ if tar_entry.isfile():
+ # TODO: replace with actual descriptor parsing when we have it
+ entry = tar_file.extractfile(tar_entry)
+ self._enqueue_descriptor(entry.read())
+ entry.close()
+
+ self._iter_notice.set()
def _enqueue_descriptor(self, descriptor):
# blocks until their is either room for the descriptor or we're stopped
diff --git a/test/integ/descriptor/data/descriptor_archive.tar b/test/integ/descriptor/data/descriptor_archive.tar
new file mode 100644
index 0000000..2c40716
Binary files /dev/null and b/test/integ/descriptor/data/descriptor_archive.tar differ
diff --git a/test/integ/descriptor/data/descriptor_archive.tar.bz2 b/test/integ/descriptor/data/descriptor_archive.tar.bz2
new file mode 100644
index 0000000..ba1f239
Binary files /dev/null and b/test/integ/descriptor/data/descriptor_archive.tar.bz2 differ
diff --git a/test/integ/descriptor/data/descriptor_archive.tar.gz b/test/integ/descriptor/data/descriptor_archive.tar.gz
new file mode 100644
index 0000000..63a6a57
Binary files /dev/null and b/test/integ/descriptor/data/descriptor_archive.tar.gz differ
diff --git a/test/integ/descriptor/reader.py b/test/integ/descriptor/reader.py
index d071db6..aa204e9 100644
--- a/test/integ/descriptor/reader.py
+++ b/test/integ/descriptor/reader.py
@@ -6,6 +6,7 @@ import os
import sys
import time
import signal
+import tarfile
import unittest
import stem.descriptor.reader
@@ -20,6 +21,8 @@ BASIC_LISTING = """
my_dir = os.path.dirname(__file__)
DESCRIPTOR_TEST_DATA = os.path.join(my_dir, "data")
+TAR_DESCRIPTORS = []
+
def _get_processed_files_path():
return os.path.join(test.runner.get_runner().get_test_dir(), "descriptor_processed_files")
@@ -37,6 +40,21 @@ def _make_processed_files_listing(contents):
return test_listing_path
+def _get_raw_tar_descriptors():
+ global TAR_DESCRIPTORS
+
+ if not TAR_DESCRIPTORS:
+ test_path = os.path.join(DESCRIPTOR_TEST_DATA, "descriptor_archive.tar")
+
+ with tarfile.open(test_path) as tar_file:
+ for tar_entry in tar_file:
+ if tar_entry.isfile():
+ entry = tar_file.extractfile(tar_entry)
+ TAR_DESCRIPTORS.append(entry.read())
+ entry.close()
+
+ return TAR_DESCRIPTORS
+
class SkipListener:
def __init__(self):
self.results = [] # (path, exception) tuples that we've received
@@ -133,7 +151,7 @@ class TestDescriptorReader(unittest.TestCase):
# running this test multiple times to flush out concurrency issues
for i in xrange(15):
- reader = stem.descriptor.reader.DescriptorReader([DESCRIPTOR_TEST_DATA])
+ reader = stem.descriptor.reader.DescriptorReader([descriptor_path])
remaining_entries = list(descriptor_entries)
with reader:
@@ -174,6 +192,45 @@ class TestDescriptorReader(unittest.TestCase):
with reader:
self.assertEquals(1, len(list(reader)))
+ def test_archived_uncompressed(self):
+ """
+ Checks that we can read descriptors from an uncompressed archive.
+ """
+
+ expected_results = _get_raw_tar_descriptors()
+ test_path = os.path.join(DESCRIPTOR_TEST_DATA, "descriptor_archive.tar")
+ reader = stem.descriptor.reader.DescriptorReader([test_path])
+
+ with reader:
+ read_descriptors = [str(desc) for desc in list(reader)]
+ self.assertEquals(expected_results, read_descriptors)
+
+ def test_archived_gzip(self):
+ """
+ Checks that we can read descriptors from a gzipped archive.
+ """
+
+ expected_results = _get_raw_tar_descriptors()
+ test_path = os.path.join(DESCRIPTOR_TEST_DATA, "descriptor_archive.tar.gz")
+ reader = stem.descriptor.reader.DescriptorReader([test_path])
+
+ with reader:
+ read_descriptors = [str(desc) for desc in list(reader)]
+ self.assertEquals(expected_results, read_descriptors)
+
+ def test_archived_bz2(self):
+ """
+ Checks that we can read descriptors from an bzipped archive.
+ """
+
+ expected_results = _get_raw_tar_descriptors()
+ test_path = os.path.join(DESCRIPTOR_TEST_DATA, "descriptor_archive.tar.bz2")
+ reader = stem.descriptor.reader.DescriptorReader([test_path])
+
+ with reader:
+ read_descriptors = [str(desc) for desc in list(reader)]
+ self.assertEquals(expected_results, read_descriptors)
+
def test_stop(self):
"""
Runs a DescriptorReader over the root directory, then checks that calling
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits