[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [stem/master] RouterStatusEntry unit tests

commit 239d9642bfc800b4f720880f359cdc92a713e63f
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date:   Tue Aug 21 16:52:56 2012 -0700

    RouterStatusEntry unit tests
    Unit tests for the RouterStatusEntry use cases that come to mind. As normal
    they uncovered some bugs with the class.
 stem/descriptor/networkstatus.py      |   44 +++-
 test/unit/descriptor/networkstatus.py |  354 +++++++++++++++++++++++++++++++++
 2 files changed, 386 insertions(+), 12 deletions(-)

diff --git a/stem/descriptor/networkstatus.py b/stem/descriptor/networkstatus.py
index 96f10ce..021ac4c 100644
--- a/stem/descriptor/networkstatus.py
+++ b/stem/descriptor/networkstatus.py
@@ -531,7 +531,7 @@ class RouterStatusEntry(stem.descriptor.Descriptor):
         r_comp = value.split(" ")
-        if len(r_comp) < 5:
+        if len(r_comp) < 8:
           if not validate: continue
           raise ValueError("Router status entry's 'r' line line must have eight values: %s" % line)
@@ -564,7 +564,17 @@ class RouterStatusEntry(stem.descriptor.Descriptor):
         # "s" Flags
         # s Named Running Stable Valid
-        self.flags = value.split(" ")
+        if value == "":
+          self.flags = []
+        else:
+          self.flags = value.split(" ")
+        if validate:
+          for flag in self.flags:
+            if self.flags.count(flag) > 1:
+              raise ValueError("Router status entry had duplicate flags: %s" % line)
+            elif flag == "":
+              raise ValueError("Router status entry had extra whitespace on its 's' line: %s" % line)
       elif keyword == 'v':
         # "v" version
         # v Tor
@@ -595,16 +605,19 @@ class RouterStatusEntry(stem.descriptor.Descriptor):
           raise ValueError("Router status entry's 'w' line needs to start with a 'Bandwidth=' entry: %s" % line)
         for w_entry in w_comp:
-          w_key, w_value = w_entry.split('=', 1)
+          if '=' in w_entry:
+            w_key, w_value = w_entry.split('=', 1)
+          else:
+            w_key, w_value = w_entry, None
           if w_key == "Bandwidth":
-            if not w_value.isdigit():
+            if not (w_value and w_value.isdigit()):
               if not validate: continue
               raise ValueError("Router status entry's 'Bandwidth=' entry needs to have a numeric value: %s" % line)
             self.bandwidth = int(w_value)
           elif w_key == "Measured":
-            if not w_value.isdigit():
+            if not (w_value and w_value.isdigit()):
               if not validate: continue
               raise ValueError("Router status entry's 'Measured=' entry needs to have a numeric value: %s" % line)
@@ -627,9 +640,11 @@ class RouterStatusEntry(stem.descriptor.Descriptor):
         m_comp = value.split(" ")
-        if self.document.vote_status != "vote":
+        if not (self.document and self.document.vote_status == "vote"):
           if not validate: continue
-          raise ValueError("Router status entry's 'm' line should only appear in votes (appeared in a %s): %s" % (self.document.vote_status, line))
+          vote_status = self.document.vote_status if self.document else "<undefined document>"
+          raise ValueError("Router status entry's 'm' line should only appear in votes (appeared in a %s): %s" % (vote_status, line))
         elif len(m_comp) < 1:
           if not validate: continue
           raise ValueError("Router status entry's 'm' line needs to start with a series of methods: %s" % line)
@@ -884,7 +899,14 @@ def _decode_fingerprint(identity, validate):
   identity += "=" * missing_padding
   fingerprint = ""
-  for char in base64.b64decode(identity):
+  try:
+    identity_decoded = base64.b64decode(identity)
+  except TypeError, exc:
+    if not validate: return None
+    raise ValueError("Unable to decode identity string '%s'" % identity)
+  for char in identity_decoded:
     # Individual characters are either standard ascii or hex encoded, and each
     # represent two hex digits. For instnace...
@@ -898,10 +920,8 @@ def _decode_fingerprint(identity, validate):
     fingerprint += hex(ord(char))[2:].zfill(2).upper()
   if not stem.util.tor_tools.is_valid_fingerprint(fingerprint):
-    if validate:
-      raise ValueError("Decoded '%s' to be '%s', which isn't a valid fingerprint" % (identity, fingerprint))
-    else:
-      return None
+    if not validate: return None
+    raise ValueError("Decoded '%s' to be '%s', which isn't a valid fingerprint" % (identity, fingerprint))
   return fingerprint
diff --git a/test/unit/descriptor/networkstatus.py b/test/unit/descriptor/networkstatus.py
index e6a8514..c7672e6 100644
--- a/test/unit/descriptor/networkstatus.py
+++ b/test/unit/descriptor/networkstatus.py
@@ -6,6 +6,8 @@ import datetime
 import unittest
 from stem.descriptor.networkstatus import Flag, RouterStatusEntry, _decode_fingerprint
+from stem.version import Version
+from stem.exit_policy import MicrodescriptorExitPolicy
   ("r", "caerSidi p1aag7VwarGxqctS7/fS0y5FU+s oQZFLYe9e4A7bOkWKR7TaNxb0JE 2012-08-06 11:19:31 9001 0"),
@@ -90,4 +92,356 @@ class TestNetworkStatus(unittest.TestCase):
     self.assertEqual(None, entry.exit_policy)
     self.assertEqual(None, entry.microdescriptor_hashes)
     self.assertEqual([], entry.get_unrecognized_lines())
+  def test_rse_missing_fields(self):
+    """
+    Parses a router status entry that's missing fields.
+    """
+    content = get_router_status_entry(exclude = ('r', 's'))
+    self._expect_invalid_rse_attr(content, "address")
+    content = get_router_status_entry(exclude = ('r',))
+    self._expect_invalid_rse_attr(content, "address")
+    content = get_router_status_entry(exclude = ('s',))
+    self._expect_invalid_rse_attr(content, "flags")
+  def test_rse_unrecognized_lines(self):
+    """
+    Parses a router status entry with new keywords.
+    """
+    content = get_router_status_entry({'z': 'New tor feature: sparkly unicorns!'})
+    entry = RouterStatusEntry(content, None)
+    self.assertEquals(['z New tor feature: sparkly unicorns!'], entry.get_unrecognized_lines())
+  def test_rse_proceeding_line(self):
+    """
+    Includes content prior to the 'r' line.
+    """
+    content = 'z some stuff\n' + get_router_status_entry()
+    self._expect_invalid_rse_attr(content, "_unrecognized_lines", ['z some stuff'])
+  def test_rse_blank_lines(self):
+    """
+    Includes blank lines, which should be ignored.
+    """
+    content = get_router_status_entry() + "\n\nv Tor\n\n"
+    entry = RouterStatusEntry(content, None)
+    self.assertEqual("Tor", entry.version_line)
+  def test_rse_missing_r_field(self):
+    """
+    Excludes fields from the 'r' line.
+    """
+    components = (
+      ('nickname', 'caerSidi'),
+      ('fingerprint', 'p1aag7VwarGxqctS7/fS0y5FU+s'),
+      ('digest', 'oQZFLYe9e4A7bOkWKR7TaNxb0JE'),
+      ('published', '2012-08-06 11:19:31'),
+      ('address', ''),
+      ('or_port', '9001'),
+      ('dir_port', '0'),
+    )
+    for attr, value in components:
+      # construct the 'r' line without this field
+      test_components = [comp[1] for comp in components]
+      test_components.remove(value)
+      r_line = ' '.join(test_components)
+      content = get_router_status_entry({'r': r_line})
+      self._expect_invalid_rse_attr(content, attr)
+  def test_rse_malformed_nickname(self):
+    """
+    Parses an 'r' line with a malformed nickname.
+    """
+    test_values = (
+      "",
+      "saberrider2008ReallyLongNickname", # too long
+      "$aberrider2008", # invalid characters
+    )
+    for value in test_values:
+      r_line = ROUTER_STATUS_ENTRY_ATTR[0][1].replace("caerSidi", value)
+      content = get_router_status_entry({'r': r_line})
+      # TODO: Initial whitespace is consumed as part of the keyword/value
+      # divider. This is a bug in the case of V3 router status entries, but
+      # proper behavior for V2 router status entries and server/extrainfo
+      # descriptors.
+      #
+      # I'm inclined to leave this as-is for the moment since fixing it
+      # requires special KEYWORD_LINE handling, and the only result of this bug
+      # is that our validation doesn't catch the new SP restriction on V3
+      # entries.
+      if value == "": value = None
+      self._expect_invalid_rse_attr(content, "nickname", value)
+  def test_rse_malformed_fingerprint(self):
+    """
+    Parses an 'r' line with a malformed fingerprint.
+    """
+    test_values = (
+      "",
+      "zzzzz",
+      "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
+    )
+    for value in test_values:
+      r_line = ROUTER_STATUS_ENTRY_ATTR[0][1].replace("p1aag7VwarGxqctS7/fS0y5FU+s", value)
+      content = get_router_status_entry({'r': r_line})
+      self._expect_invalid_rse_attr(content, "fingerprint")
+  def test_rse_malformed_published_date(self):
+    """
+    Parses an 'r' line with a malformed published date.
+    """
+    test_values = (
+      "",
+      "2012-08-06 11:19:",
+      "2012-08-06 11:19:71",
+      "2012-08-06 11::31",
+      "2012-08-06 11:79:31",
+      "2012-08-06 :19:31",
+      "2012-08-06 41:19:31",
+      "2012-08- 11:19:31",
+      "2012-08-86 11:19:31",
+      "2012--06 11:19:31",
+      "2012-38-06 11:19:31",
+      "-08-06 11:19:31",
+      "2012-08-06   11:19:31",
+    )
+    for value in test_values:
+      r_line = ROUTER_STATUS_ENTRY_ATTR[0][1].replace("2012-08-06 11:19:31", value)
+      content = get_router_status_entry({'r': r_line})
+      self._expect_invalid_rse_attr(content, "published")
+  def test_rse_malformed_address(self):
+    """
+    Parses an 'r' line with a malformed address.
+    """
+    test_values = (
+      "",
+      "71.35.150.",
+      "71.35..29",
+      "71.35.150",
+      "",
+    )
+    for value in test_values:
+      r_line = ROUTER_STATUS_ENTRY_ATTR[0][1].replace("", value)
+      content = get_router_status_entry({'r': r_line})
+      self._expect_invalid_rse_attr(content, "address", value)
+  def test_rse_malformed_port(self):
+    """
+    Parses an 'r' line with a malformed ORPort or DirPort.
+    """
+    test_values = (
+      "",
+      "-1",
+      "399482",
+      "blarg",
+    )
+    for value in test_values:
+      for include_or_port in (False, True):
+        for include_dir_port in (False, True):
+          if not include_or_port and not include_dir_port:
+            continue
+          r_line = ROUTER_STATUS_ENTRY_ATTR[0][1]
+          if include_or_port:
+            r_line = r_line.replace(" 9001 ", " %s " % value)
+          if include_dir_port:
+            r_line = r_line[:-1] + value
+          attr = "or_port" if include_or_port else "dir_port"
+          expected = int(value) if value.isdigit() else None
+          content = get_router_status_entry({'r': r_line})
+          self._expect_invalid_rse_attr(content, attr, expected)
+  def test_rse_flags(self):
+    """
+    Handles a variety of flag inputs.
+    """
+    test_values = {
+      "": [],
+      "Fast": [Flag.FAST],
+      "Fast Valid": [Flag.FAST, Flag.VALID],
+      "Ugabuga": ["Ugabuga"],
+    }
+    for s_line, expected in test_values.items():
+      content = get_router_status_entry({'s': s_line})
+      entry = RouterStatusEntry(content, None)
+      self.assertEquals(expected, entry.flags)
+    # tries some invalid inputs
+    test_values = {
+      "Fast   ": [Flag.FAST, "", "", ""],
+      "Fast  Valid": [Flag.FAST, "", Flag.VALID],
+      "Fast Fast": [Flag.FAST, Flag.FAST],
+    }
+    for s_line, expected in test_values.items():
+      content = get_router_status_entry({'s': s_line})
+      self._expect_invalid_rse_attr(content, "flags", expected)
+  def test_rse_versions(self):
+    """
+    Handles a variety of version inputs.
+    """
+    test_values = {
+      "Tor": Version(""),
+      "Tor 0.1.2": Version("0.1.2"),
+      "Torr new_stuff": None,
+      "new_stuff and stuff": None,
+    }
+    for v_line, expected in test_values.items():
+      content = get_router_status_entry({'v': v_line})
+      entry = RouterStatusEntry(content, None)
+      self.assertEquals(expected, entry.version)
+      self.assertEquals(v_line, entry.version_line)
+    # tries an invalid input
+    content = get_router_status_entry({'v': "Tor ugabuga"})
+    self._expect_invalid_rse_attr(content, "version")
+  def test_rse_bandwidth(self):
+    """
+    Handles a variety of 'w' lines.
+    """
+    test_values = {
+      "Bandwidth=0": (0, None, []),
+      "Bandwidth=63138": (63138, None, []),
+      "Bandwidth=11111 Measured=482": (11111, 482, []),
+      "Bandwidth=11111 Measured=482 Blarg!": (11111, 482, ["Blarg!"]),
+    }
+    for w_line, expected in test_values.items():
+      content = get_router_status_entry({'w': w_line})
+      entry = RouterStatusEntry(content, None)
+      self.assertEquals(expected[0], entry.bandwidth)
+      self.assertEquals(expected[1], entry.measured)
+      self.assertEquals(expected[2], entry.unrecognized_bandwidth_entries)
+    # tries some invalid inputs
+    test_values = (
+      "",
+      "blarg",
+      "Bandwidth",
+      "Bandwidth=",
+      "Bandwidth:0",
+      "Bandwidth 0",
+      "Bandwidth=-10",
+      "Bandwidth=10 Measured",
+      "Bandwidth=10 Measured=",
+      "Bandwidth=10 Measured=-50",
+    )
+    for w_line in test_values:
+      content = get_router_status_entry({'w': w_line})
+      self._expect_invalid_rse_attr(content)
+  def test_rse_exit_policy(self):
+    """
+    Handles a variety of 'p' lines.
+    """
+    test_values = {
+      "reject 1-65535": MicrodescriptorExitPolicy("reject 1-65535"),
+      "accept 80,110,143,443": MicrodescriptorExitPolicy("accept 80,110,143,443"),
+    }
+    for p_line, expected in test_values.items():
+      content = get_router_status_entry({'p': p_line})
+      entry = RouterStatusEntry(content, None)
+      self.assertEquals(expected, entry.exit_policy)
+    # tries some invalid inputs
+    test_values = (
+      "",
+      "blarg",
+      "reject -50",
+      "accept 80,",
+    )
+    for p_line in test_values:
+      content = get_router_status_entry({'p': p_line})
+      self._expect_invalid_rse_attr(content, "exit_policy")
+  def test_rse_microdescriptor_hashes(self):
+    """
+    Handles a variety of 'm' lines.
+    """
+    test_values = {
+      "8,9,10,11,12":
+        [([8, 9, 10, 11, 12], {})],
+      "8,9,10,11,12 sha256=g1vx9si329muxV3tquWIXXySNOIwRGMeAESKs/v4DWs":
+        [([8, 9, 10, 11, 12], {"sha256": "g1vx9si329muxV3tquWIXXySNOIwRGMeAESKs/v4DWs"})],
+      "8,9,10,11,12 sha256=g1vx9si329muxV md5=3tquWIXXySNOIwRGMeAESKs/v4DWs":
+        [([8, 9, 10, 11, 12], {"sha256": "g1vx9si329muxV", "md5": "3tquWIXXySNOIwRGMeAESKs/v4DWs"})],
+    }
+    # we need a document that's a vote
+    mock_document = lambda x: x # just need anything with a __dict__
+    mock_document.__dict__["vote_status"] = "vote"
+    for m_line, expected in test_values.items():
+      content = get_router_status_entry({'m': m_line})
+      entry = RouterStatusEntry(content, mock_document)
+      self.assertEquals(expected, entry.microdescriptor_hashes)
+    # try without a document
+    content = get_router_status_entry({'m': "8,9,10,11,12"})
+    self._expect_invalid_rse_attr(content, "microdescriptor_hashes")
+    # tries some invalid inputs
+    test_values = (
+      "",
+      "4,a,2",
+      "1,2,3 stuff",
+    )
+    for m_line in test_values:
+      content = get_router_status_entry({'m': m_line})
+      self.assertRaises(ValueError, RouterStatusEntry, content, mock_document)
+  def _expect_invalid_rse_attr(self, content, attr = None, expected_value = None):
+    """
+    Asserts that construction will fail due to content having a malformed
+    attribute. If an attr is provided then we check that it matches an expected
+    value when we're constructed without validation.
+    """
+    self.assertRaises(ValueError, RouterStatusEntry, content, None)
+    entry = RouterStatusEntry(content, None, False)
+    if attr:
+      self.assertEquals(expected_value, getattr(entry, attr))
+    else:
+      self.assertEquals("caerSidi", entry.nickname)

tor-commits mailing list