[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] Helper function for resolution of inputs to fingerprints
commit fb6962ed28463a56d100c9cac7cc7517879dec99
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Sun May 4 19:14:35 2014 -0700
Helper function for resolution of inputs to fingerprints
Splitting a big chunk of do_info() into a helper. Much better for both
readability and testability.
---
stem/interpretor/commands.py | 199 ++++++++++++++++++++-----------------
stem/interpretor/help.py | 2 +-
test/unit/interpretor/commands.py | 64 +++++++++++-
3 files changed, 170 insertions(+), 95 deletions(-)
diff --git a/stem/interpretor/commands.py b/stem/interpretor/commands.py
index 2a9d402..faf9509 100644
--- a/stem/interpretor/commands.py
+++ b/stem/interpretor/commands.py
@@ -13,6 +13,70 @@ from stem.interpretor import STANDARD_OUTPUT, BOLD_OUTPUT, ERROR_OUTPUT, msg
from stem.util.term import format
+def _get_fingerprint(arg, controller):
+ """
+ Resolves user input into a relay fingerprint. This accepts...
+
+ * Fingerprints
+ * Nicknames
+ * IPv4 addresses, either with or without an ORPort
+ * Empty input, which is resolved to ourselves if we're a relay
+
+ :param str arg: input to be resolved to a relay fingerprint
+ :param stem.control.Controller controller: tor control connection
+
+ :returns: **str** for the relay fingerprint
+
+ :raises: **ValueError** if we're unable to resolve the input to a relay
+ """
+
+ if not arg:
+ try:
+ return controller.get_info('fingerprint')
+ except:
+ raise ValueError("We aren't a relay, no information to provide")
+ elif stem.util.tor_tools.is_valid_fingerprint(arg):
+ return arg
+ elif stem.util.tor_tools.is_valid_nickname(arg):
+ try:
+ return controller.get_network_status(arg).fingerprint
+ except:
+ raise ValueError("Unable to find a relay with the nickname of '%s'" % arg)
+ elif ':' in arg or stem.util.connection.is_valid_ipv4_address(arg):
+ if ':' in arg:
+ address, port = arg.split(':', 1)
+
+ if not stem.util.connection.is_valid_ipv4_address(address):
+ raise ValueError("'%s' isn't a valid IPv4 address" % address)
+ elif port and not stem.util.connection.is_valid_port(port):
+ raise ValueError("'%s' isn't a valid port" % port)
+
+ port = int(port)
+ else:
+ address, port = arg, None
+
+ matches = {}
+
+ for desc in controller.get_network_statuses():
+ if desc.address == address:
+ if not port or desc.or_port == port:
+ matches[desc.or_port] = desc.fingerprint
+
+ if len(matches) == 0:
+ raise ValueError('No relays found at %s' % arg)
+ elif len(matches) == 1:
+ return matches.values()[0]
+ else:
+ response = "There's multiple relays at %s, include a port to specify which.\n\n" % arg
+
+ for i, or_port in enumerate(matches):
+ response += " %i. %s:%s, fingerprint: %s\n" % (i + 1, address, or_port, matches[or_port])
+
+ raise ValueError(response)
+ else:
+ raise ValueError("'%s' isn't a fingerprint, nickname, or IP address" % arg)
+
+
class ControlInterpretor(object):
"""
Handles issuing requests and providing nicely formed responses, with support
@@ -60,117 +124,66 @@ class ControlInterpretor(object):
pretty fashion.
"""
- output, fingerprint = '', None
+ try:
+ fingerprint = _get_fingerprint(arg, self._controller)
+ except ValueError as exc:
+ return format(str(exc), *ERROR_OUTPUT)
- # determines the fingerprint, leaving it unset and adding an error message
- # if unsuccessful
+ micro_desc = self._controller.get_microdescriptor(fingerprint, None)
+ server_desc = self._controller.get_server_descriptor(fingerprint, None)
+ ns_desc = self._controller.get_network_status(fingerprint, None)
- if not arg:
- # uses our fingerprint if we're a relay, otherwise gives an error
+ # We'll mostly rely on the router status entry. Either the server
+ # descriptor or microdescriptor will be missing, so we'll treat them as
+ # being optional.
- fingerprint = self._controller.get_info('fingerprint', None)
+ if not ns_desc:
+ return format("Unable to find consensus information for %s" % fingerprint, *ERROR_OUTPUT)
- if not fingerprint:
- output += format("We aren't a relay, no information to provide", *ERROR_OUTPUT)
- elif stem.util.tor_tools.is_valid_fingerprint(arg):
- fingerprint = arg
- elif stem.util.tor_tools.is_valid_nickname(arg):
- desc = self._controller.get_network_status(arg, None)
+ locale = self._controller.get_info('ip-to-country/%s' % ns_desc.address, None)
+ locale_label = ' (%s)' % locale if locale else ''
- if desc:
- fingerprint = desc.fingerprint
- else:
- return format("Unable to find a relay with the nickname of '%s'" % arg, *ERROR_OUTPUT)
- elif ':' in arg or stem.util.connection.is_valid_ipv4_address(arg):
- # we got an address, so looking up the fingerprint
-
- if ':' in arg:
- address, port = arg.split(':', 1)
-
- if not stem.util.connection.is_valid_ipv4_address(address):
- return format("'%s' isn't a valid IPv4 address" % address, *ERROR_OUTPUT)
- elif port and not stem.util.connection.is_valid_port(port):
- return format("'%s' isn't a valid port" % port, *ERROR_OUTPUT)
-
- port = int(port)
- else:
- address, port = arg, None
-
- matches = {}
-
- for desc in self._controller.get_network_statuses():
- if desc.address == address:
- if not port or desc.or_port == port:
- matches[desc.or_port] = desc.fingerprint
-
- if len(matches) == 0:
- output += format('No relays found at %s' % arg, *ERROR_OUTPUT)
- elif len(matches) == 1:
- fingerprint = matches.values()[0]
- else:
- output += format("There's multiple relays at %s, include a port to specify which.\n\n" % arg, *ERROR_OUTPUT)
-
- for i, or_port in enumerate(matches):
- output += format(" %i. %s:%s, fingerprint: %s\n" % (i + 1, address, or_port, matches[or_port]), *ERROR_OUTPUT)
+ if server_desc:
+ exit_policy_label = server_desc.exit_policy.summary()
+ elif micro_desc:
+ exit_policy_label = micro_desc.exit_policy.summary()
else:
- return format("'%s' isn't a fingerprint, nickname, or IP address" % arg, *ERROR_OUTPUT)
-
- if fingerprint:
- micro_desc = self._controller.get_microdescriptor(fingerprint, None)
- server_desc = self._controller.get_server_descriptor(fingerprint, None)
- ns_desc = self._controller.get_network_status(fingerprint, None)
-
- # We'll mostly rely on the router status entry. Either the server
- # descriptor or microdescriptor will be missing, so we'll treat them as
- # being optional.
-
- if not ns_desc:
- return format("Unable to find consensus information for %s" % fingerprint, *ERROR_OUTPUT)
-
- locale = self._controller.get_info('ip-to-country/%s' % ns_desc.address, None)
- locale_label = ' (%s)' % locale if locale else ''
-
- if server_desc:
- exit_policy_label = server_desc.exit_policy.summary()
- elif micro_desc:
- exit_policy_label = micro_desc.exit_policy.summary()
- else:
- exit_policy_label = 'Unknown'
+ exit_policy_label = 'Unknown'
- output += '%s (%s)\n' % (ns_desc.nickname, fingerprint)
+ output = '%s (%s)\n' % (ns_desc.nickname, fingerprint)
- output += format('address: ', *BOLD_OUTPUT)
- output += '%s:%s%s\n' % (ns_desc.address, ns_desc.or_port, locale_label)
+ output += format('address: ', *BOLD_OUTPUT)
+ output += '%s:%s%s\n' % (ns_desc.address, ns_desc.or_port, locale_label)
- output += format('published: ', *BOLD_OUTPUT)
- output += ns_desc.published.strftime('%H:%M:%S %d/%m/%Y') + '\n'
+ output += format('published: ', *BOLD_OUTPUT)
+ output += ns_desc.published.strftime('%H:%M:%S %d/%m/%Y') + '\n'
- if server_desc:
- output += format('os: ', *BOLD_OUTPUT)
- output += server_desc.platform.decode('utf-8', 'replace') + '\n'
+ if server_desc:
+ output += format('os: ', *BOLD_OUTPUT)
+ output += server_desc.platform.decode('utf-8', 'replace') + '\n'
- output += format('version: ', *BOLD_OUTPUT)
- output += str(server_desc.tor_version) + '\n'
+ output += format('version: ', *BOLD_OUTPUT)
+ output += str(server_desc.tor_version) + '\n'
- output += format('flags: ', *BOLD_OUTPUT)
- output += ', '.join(ns_desc.flags) + '\n'
+ output += format('flags: ', *BOLD_OUTPUT)
+ output += ', '.join(ns_desc.flags) + '\n'
- output += format('exit policy: ', *BOLD_OUTPUT)
- output += exit_policy_label + '\n'
+ output += format('exit policy: ', *BOLD_OUTPUT)
+ output += exit_policy_label + '\n'
- if server_desc:
- contact = server_desc.contact
+ if server_desc:
+ contact = server_desc.contact
- # clears up some highly common obscuring
+ # clears up some highly common obscuring
- for alias in (' at ', ' AT '):
- contact = contact.replace(alias, '@')
+ for alias in (' at ', ' AT '):
+ contact = contact.replace(alias, '@')
- for alias in (' dot ', ' DOT '):
- contact = contact.replace(alias, '.')
+ for alias in (' dot ', ' DOT '):
+ contact = contact.replace(alias, '.')
- output += format('contact: ', *BOLD_OUTPUT)
- output += contact + '\n'
+ output += format('contact: ', *BOLD_OUTPUT)
+ output += contact + '\n'
return output.strip()
diff --git a/stem/interpretor/help.py b/stem/interpretor/help.py
index 20439d6..039bbef 100644
--- a/stem/interpretor/help.py
+++ b/stem/interpretor/help.py
@@ -23,7 +23,7 @@ def response(controller, arg):
"""
Provides our /help response.
- :param stem.Controller controller: tor control connection
+ :param stem.control.Controller controller: tor control connection
:param str arg: controller or interpretor command to provide help output for
:returns: **str** with our help response
diff --git a/test/unit/interpretor/commands.py b/test/unit/interpretor/commands.py
index db0a53b..1517e3a 100644
--- a/test/unit/interpretor/commands.py
+++ b/test/unit/interpretor/commands.py
@@ -1,20 +1,82 @@
import unittest
+import stem
import stem.response
-from stem.interpretor.commands import ControlInterpretor
+from stem.interpretor.commands import ControlInterpretor, _get_fingerprint
from test import mocking
from test.unit.interpretor import CONTROLLER
+try:
+ # added in python 3.3
+ from unittest.mock import Mock
+except ImportError:
+ from mock import Mock
+
EXPECTED_EVENTS_RESPONSE = """\
\x1b[34mBW 15 25\x1b[0m
\x1b[34mBW 758 570\x1b[0m
\x1b[34mDEBUG connection_edge_process_relay_cell(): Got an extended cell! Yay.\x1b[0m
"""
+FINGERPRINT = '9695DFC35FFEB861329B9F1AB04C46397020CE31'
+
class TestInterpretorCommands(unittest.TestCase):
+ def test_get_fingerprint_for_ourselves(self):
+ controller = Mock()
+
+ controller.get_info.side_effect = lambda arg: {
+ 'fingerprint': FINGERPRINT,
+ }[arg]
+
+ self.assertEqual(FINGERPRINT, _get_fingerprint('', controller))
+
+ controller.get_info.side_effect = stem.ControllerError
+ self.assertRaises(ValueError, _get_fingerprint, '', controller)
+
+ def test_get_fingerprint_for_fingerprint(self):
+ self.assertEqual(FINGERPRINT, _get_fingerprint(FINGERPRINT, Mock()))
+
+ def test_get_fingerprint_for_nickname(self):
+ controller, descriptor = Mock(), Mock()
+ descriptor.fingerprint = FINGERPRINT
+
+ controller.get_network_status.side_effect = lambda arg: {
+ 'moria1': descriptor,
+ }[arg]
+
+ self.assertEqual(FINGERPRINT, _get_fingerprint('moria1', controller))
+
+ controller.get_network_status.side_effect = stem.ControllerError
+ self.assertRaises(ValueError, _get_fingerprint, 'moria1', controller)
+
+ def test_get_fingerprint_for_address(self):
+ controller = Mock()
+
+ self.assertRaises(ValueError, _get_fingerprint, '127.0.0.1:-1', controller)
+ self.assertRaises(ValueError, _get_fingerprint, '127.0.0.901:80', controller)
+
+ descriptor = Mock()
+ descriptor.address = '127.0.0.1'
+ descriptor.or_port = 80
+ descriptor.fingerprint = FINGERPRINT
+
+ controller.get_network_statuses.return_value = [descriptor]
+
+ self.assertEqual(FINGERPRINT, _get_fingerprint('127.0.0.1', controller))
+ self.assertEqual(FINGERPRINT, _get_fingerprint('127.0.0.1:80', controller))
+ self.assertRaises(ValueError, _get_fingerprint, '127.0.0.1:81', controller)
+ self.assertRaises(ValueError, _get_fingerprint, '127.0.0.2', controller)
+
+ def test_get_fingerprint_for_unrecognized_inputs(self):
+ self.assertRaises(ValueError, _get_fingerprint, 'blarg!', Mock())
+
+ def test_quit(self):
+ interpretor = ControlInterpretor(CONTROLLER)
+ self.assertRaises(stem.SocketClosed, interpretor.run_command, '/quit')
+
def test_help(self):
interpretor = ControlInterpretor(CONTROLLER)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits