[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [chutney/master] Refactor Source and Sink to not know about their data



commit 1bbe2079c40ec6d6517c2125274e9b9510c09197
Author: Nick Mathewson <nickm@xxxxxxxxxxxxxx>
Date:   Fri May 10 10:12:00 2019 -0400

    Refactor Source and Sink to not know about their data
    
    Now data is generated by a DataSource type, and consumed by a
    DataChecker type that compares its incoming data against the data
    in a DataSource.
---
 lib/chutney/Traffic.py | 113 +++++++++++++++++++++++++++++--------------------
 1 file changed, 68 insertions(+), 45 deletions(-)

diff --git a/lib/chutney/Traffic.py b/lib/chutney/Traffic.py
index f877452..72b9ca3 100755
--- a/lib/chutney/Traffic.py
+++ b/lib/chutney/Traffic.py
@@ -107,7 +107,6 @@ class TestSuite(object):
         return('%s: %d/%d/%d' % (self.tests, self.not_done, self.successes,
                                  self.failures))
 
-
 class Listener(asyncore.dispatcher):
     "A TCP listener, binding, listening and accepting new connections."
 
@@ -132,14 +131,69 @@ class Listener(asyncore.dispatcher):
     def fileno(self):
         return self.socket.fileno()
 
+class DataSource(object):
+    """A data source generates some number of bytes of data, and then
+       returns None.
+
+       For convenience, it conforms to the 'producer' api.
+    """
+    def __init__(self, data, repetitions=1):
+        self.data = data
+        self.repetitions = repetitions
+        self.sent_any = False
+
+    def copy(self):
+        assert not self.sent_any
+        return DataSource(self.data, self.repetitions)
+
+    def more(self):
+        self.sent_any = True
+        if self.repetitions > 0:
+            self.repetitions -= 1
+            return self.data
+
+        return None
+
+class DataChecker(object):
+    """A data checker verifies its input against bytes in a stream."""
+    def __init__(self, source):
+        self.source = source
+        self.pending = b""
+        self.succeeded = False
+        self.failed = False
+
+    def consume(self, inp):
+        if self.failed:
+            return
+        if self.succeeded and len(inp):
+            self.succeeded = False
+            self.failed = True
+            return
+
+        while len(inp):
+            n = min(len(inp), len(self.pending))
+            if inp[:n] != self.pending[:n]:
+                self.failed = True
+                return
+            inp = inp[n:]
+            self.pending = self.pending[n:]
+            if not self.pending:
+                self.pending = self.source.more()
+
+                if self.pending is None:
+                    if len(inp):
+                        self.failed = True
+                    else:
+                        self.succeeded = True
+                    return
+
 class Sink(asynchat.async_chat):
     "A data sink, reading from its peer and verifying the data."
     def __init__(self, sock, tt):
         asynchat.async_chat.__init__(self, sock)
-        self.inbuf = b""
         self.set_terminator(None)
         self.tt = tt
-        self.repetitions = tt.repetitions
+        self.data_checker = DataChecker(tt.data_source.copy())
         self.testname = "recv-data%s"%id(self)
 
     def get_test_names(self):
@@ -148,33 +202,16 @@ class Sink(asynchat.async_chat):
     def collect_incoming_data(self, inp):
         # shortcut read when we don't ever expect any data
 
-        self.inbuf += inp
-        data = self.tt.data
-        debug("successfully received (bytes=%d)" % len(self.inbuf))
-        while len(self.inbuf) >= len(data):
-            assert(len(self.inbuf) <= len(data) or self.repetitions > 1)
-            if self.inbuf[:len(data)] != data:
-                debug("receive comparison failed (bytes=%d)" % len(data))
-                self.tt.failure(self.testname)
-                self.close()
-            # if we're not debugging, print a dot every dot_repetitions reps
-            elif (not debug_flag and self.tt.dot_repetitions > 0 and
-                  self.repetitions % self.tt.dot_repetitions == 0):
-                sys.stdout.write('.')
-                sys.stdout.flush()
-            # repeatedly check data against self.inbuf if required
-            debug("receive comparison success (bytes=%d)" % len(data))
-            self.inbuf = self.inbuf[len(data):]
-            debug("receive leftover bytes (bytes=%d)" % len(self.inbuf))
-            self.repetitions -= 1
-            debug("receive remaining repetitions (reps=%d)" % self.repetitions)
-        if self.repetitions == 0 and len(self.inbuf) == 0:
+        debug("successfully received (bytes=%d)" % len(inp))
+        self.data_checker.consume(inp)
+        if self.data_checker.succeeded:
             debug("successful verification")
             self.close()
             self.tt.success(self.testname)
-        # calculate the actual length of data remaining, including reps
-        debug("receive remaining bytes (bytes=%d)"
-              % (self.repetitions*len(data) - len(self.inbuf)))
+        elif self.data_checker.failed:
+            debug("receive comparison failed")
+            self.tt.failure(self.testname)
+            self.close()
 
     def fileno(self):
         return self.socket.fileno()
@@ -197,22 +234,13 @@ class Source(asynchat.async_chat):
 
     def __init__(self, tt, server, buf, proxy=None, repetitions=1):
         asynchat.async_chat.__init__(self)
-        self.data = buf
-        self.outbuf = b''
+        self.data_source = DataSource(buf, repetitions)
         self.inbuf = b''
         self.proxy = proxy
         self.server = server
-        self.repetitions = repetitions
-        self._sent_no_bytes = 0
         self.tt = tt
         self.testname = "send-data%s"%id(self)
 
-        # sanity checks
-        if len(self.data) == 0:
-            self.repetitions = 0
-        if self.repetitions == 0:
-            self.data = b""
-
         self.set_terminator(None)
         dest = (self.proxy or self.server)
         self.create_socket(addr_to_family(dest[0]), socket.SOCK_STREAM)
@@ -251,8 +279,7 @@ class Source(asynchat.async_chat):
                     self.close()
 
     def push_output(self):
-        for _ in range(self.repetitions):
-            self.push_with_producer(asynchat.simple_producer(self.data))
+        self.push_with_producer(self.data_source)
 
         self.push_with_producer(CloseSourceProducer(self))
         self.close_when_done()
@@ -279,13 +306,9 @@ class TrafficTester(object):
         self.pending_close = []
         self.timeout = timeout
         self.tests = TestSuite()
-        self.data = data
-        self.repetitions = repetitions
+        self.data_source = DataSource(data, repetitions)
+
         # sanity checks
-        if len(self.data) == 0:
-            self.repetitions = 0
-        if self.repetitions == 0:
-            self.data = b""
         self.dot_repetitions = dot_repetitions
         debug("listener fd=%d" % self.listener.fileno())
 



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits