[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [stem/master] ExitPolicyRule class
commit 0c069727eb5337cc7c3f422387e3f4d0ad5caa9c
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Sat Jul 14 18:32:50 2012 -0700
ExitPolicyRule class
Adding a class that conforms with the dir-spec's exitpattern entity. Plus lots
'o unit tests that told me over and over and over again that I was being
stupid. Hopefully this version is at least kinda sorta close to being right...
---
run_tests.py | 6 +-
stem/exit_policy.py | 363 +++++++++++++++++++++++++++++++++----
stem/util/connection.py | 17 ++-
test/unit/exit_policy.py | 110 -----------
test/unit/exit_policy/__init__.py | 6 +
test/unit/exit_policy/policy.py | 107 +++++++++++
test/unit/exit_policy/rule.py | 327 +++++++++++++++++++++++++++++++++
7 files changed, 780 insertions(+), 156 deletions(-)
diff --git a/run_tests.py b/run_tests.py
index 23f993e..3b36046 100755
--- a/run_tests.py
+++ b/run_tests.py
@@ -32,7 +32,8 @@ import test.unit.util.enum
import test.unit.util.proc
import test.unit.util.system
import test.unit.util.tor_tools
-import test.unit.exit_policy
+import test.unit.exit_policy.policy
+import test.unit.exit_policy.rule
import test.unit.version
import test.integ.connection.authentication
import test.integ.connection.connect
@@ -109,7 +110,8 @@ UNIT_TESTS = (
test.unit.descriptor.reader.TestDescriptorReader,
test.unit.descriptor.server_descriptor.TestServerDescriptor,
test.unit.descriptor.extrainfo_descriptor.TestExtraInfoDescriptor,
- test.unit.exit_policy.TestExitPolicy,
+ test.unit.exit_policy.rule.TestExitPolicyRule,
+ test.unit.exit_policy.policy.TestExitPolicy,
test.unit.version.TestVersion,
test.unit.response.control_message.TestControlMessage,
test.unit.response.control_line.TestControlLine,
diff --git a/stem/exit_policy.py b/stem/exit_policy.py
index 33f5cc1..7040563 100644
--- a/stem/exit_policy.py
+++ b/stem/exit_policy.py
@@ -1,61 +1,342 @@
"""
-Tor Exit Policy information and requirements for its features. These can be
-easily parsed and compared, for instance...
+Representation of tor exit policies. These can be easily used to check if
+exiting to a destination is permissible or not. For instance...
->>> exit_policies = stem.exit_policy.ExitPolicy()
->>> exit_policies.add("accept *:80")
->>> exit_policies.add("accept *:443")
->>> exit_policies.add("reject *:*")
->>> print exit_policies
-accept *:80 , accept *:443, reject *:*
->>> print exit_policies.get_summary()
-accept 80, 443
->>> exit_policies.check("www.google.com", 80)
-True
+::
->>> microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443")
->>> print microdesc_exit_policy
-accept 80,443
->>> microdesc_exit_policy.check("www.google.com", 80)
-True
->>> microdesc_exit_policy.check(80)
-True
+ >>> exit_policies = stem.exit_policy.ExitPolicy()
+ >>> exit_policies.add("accept *:80")
+ >>> exit_policies.add("accept *:443")
+ >>> exit_policies.add("reject *:*")
+ >>> print exit_policies
+ accept *:80 , accept *:443, reject *:*
+ >>> print exit_policies.get_summary()
+ accept 80, 443
+ >>> exit_policies.check("www.google.com", 80)
+ True
+
+ >>> microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443")
+ >>> print microdesc_exit_policy
+ accept 80,443
+ >>> microdesc_exit_policy.check("www.google.com", 80)
+ True
+ >>> microdesc_exit_policy.check(80)
+ True
+
+::
+
+ ExitPolicyRule - Single rule of an exit policy
+ |- is_address_wildcard - checks if we'll accept any address for our type
+ |- is_port_wildcard - checks if we'll accept any port
+ |- is_match - checks if we match a given destination
+ +- __str__ - string representation for this rule
-ExitPolicyLine - Single rule from the exit policy
- |- __str__ - string representation
- +- check - check if exiting to this ip is allowed
+ ExitPolicyLine - Single rule from the exit policy
+ |- __str__ - string representation
+ +- check - check if exiting to this ip is allowed
-ExitPolicy - List of ExitPolicyLine objects
- |- __str__ - string representation
- |- __iter__ - ExitPolicyLine entries for the exit policy
- |- check - check if exiting to this ip is allowed
- |- add - add new rule to the exit policy
- |- get_summary - provides a summary description of the policy chain
- +- is_exiting_allowed - check if exit node
+ ExitPolicy - List of ExitPolicyLine objects
+ |- __str__ - string representation
+ |- __iter__ - ExitPolicyLine entries for the exit policy
+ |- check - check if exiting to this ip is allowed
+ |- add - add new rule to the exit policy
+ |- get_summary - provides a summary description of the policy chain
+ +- is_exiting_allowed - check if exit node
-MicrodescriptorExitPolicy - Microdescriptor exit policy
- |- check - check if exiting to this port is allowed
- |- ports - returns a list of ports
- |- is_accept - check if it's a list of accepted/rejected ports
- +- __str__ - return the summary
+ MicrodescriptorExitPolicy - Microdescriptor exit policy
+ |- check - check if exiting to this port is allowed
+ |- ports - returns a list of ports
+ |- is_accept - check if it's a list of accepted/rejected ports
+ +- __str__ - return the summary
"""
import stem.util.connection
+import stem.util.enum
+AddressType = stem.util.enum.Enum(("WILDCARD", "Wildcard"), ("IPv4", "IPv4"), ("IPv6", "IPv6"))
# ip address ranges substituted by the 'private' keyword
-PRIVATE_IP_RANGES = ("0.0.0.0/8", "169.254.0.0/16", "127.0.0.0/8", "192.168.0.0/16", "10.0.0.0/8", "172.16.0.0/12")
+PRIVATE_IP_RANGES = ("0.0.0.0/8", "169.254.0.0/16", "127.0.0.0/8",
+ "192.168.0.0/16", "10.0.0.0/8", "172.16.0.0/12")
+
+class ExitPolicyRule:
+ """
+ Single rule from the user's exit policy. These rules are chained together to
+ form complete policies that describe where a relay will and will not allow
+ traffic to exit.
+
+ The format of these rules is formally described in the dir-spec as an
+ "exitpattern". Note that while these are similar to tor's man page entry for
+ ExitPolicies, they're not exactly the same. An exitpattern is better defined and
+ stricter in what it'll accept. For instance, ports are not optional and it
+ does not contain the 'private' alias.
+
+ :var str rule: rule that we were created from
+ :var bool is_accept: indicates if exiting is allowed or disallowed
+
+ :var AddressType address_type: type of address that we have
+ :var str address: address that this rule is for
+ :var str mask: subnet mask for the address (ex. "255.255.255.0")
+ :var int masked_bits: number of bits the subnet mask represents, None if the mask can't have a bit representation
+
+ :var int min_port: lower end of the port range that we include (inclusive)
+ :var int max_port: upper end of the port range that we include (inclusive)
+
+ :param str rule: exit policy rule to be parsed
+
+ :raises: ValueError if input isn't a valid tor exit policy rule
+ """
+
+ # TODO: Exitpatterns are used everywhere except the torrc. This is fine for
+ # now, but we should add a subclass to handle those slight differences later
+ # if we want to provide the ability to parse torrcs.
+
+ def __init__(self, rule):
+ self.rule = rule
+
+ # policy ::= "accept" exitpattern | "reject" exitpattern
+ # exitpattern ::= addrspec ":" portspec
+
+ if rule.startswith("accept"):
+ self.is_accept = True
+ elif rule.startswith("reject"):
+ self.is_accept = False
+ else:
+ raise ValueError("An exit policy must start with either 'accept' or 'reject': %s" % rule)
+
+ exitpattern = rule[6:]
+
+ if not exitpattern.startswith(" ") or (len(exitpattern) - 1 != len(exitpattern.lstrip())) :
+ raise ValueError("An exit policy should have a space separating its accept/reject from the exit pattern: %s" % rule)
+
+ exitpattern = exitpattern[1:]
+
+ if not ":" in exitpattern:
+ raise ValueError("An exitpattern must be of the form 'addrspec:portspec': %s" % rule)
+
+ addrspec, portspec = exitpattern.rsplit(":", 1)
+ self._addr_bin = self._mask_bin = None
+
+ # Parses the addrspec...
+ # addrspec ::= "*" | ip4spec | ip6spec
+
+ if "/" in addrspec:
+ self.address, addr_extra = addrspec.split("/", 1)
+ else:
+ self.address, addr_extra = addrspec, None
+
+ if addrspec == "*":
+ self.address_type = AddressType.WILDCARD
+ self.address = self.mask = self.masked_bits = None
+ elif stem.util.connection.is_valid_ip_address(self.address):
+ # ipv4spec ::= ip4 | ip4 "/" num_ip4_bits | ip4 "/" ip4mask
+ # ip4 ::= an IPv4 address in dotted-quad format
+ # ip4mask ::= an IPv4 mask in dotted-quad format
+ # num_ip4_bits ::= an integer between 0 and 32
+
+ self.address_type = AddressType.IPv4
+
+ if addr_extra is None:
+ self.mask = stem.util.connection.FULL_IPv4_MASK
+ self.masked_bits = 32
+ elif stem.util.connection.is_valid_ip_address(addr_extra):
+ # provided with an ip4mask
+ self.mask = addr_extra
+
+ try:
+ self.masked_bits = stem.util.connection.get_masked_bits(addr_extra)
+ except ValueError:
+ # mask can't be represented as a number of bits (ex. "255.255.0.255")
+ self.masked_bits = None
+ elif addr_extra.isdigit():
+ # provided with a num_ip4_bits
+ self.mask = stem.util.connection.get_mask(int(addr_extra))
+ self.masked_bits = int(addr_extra)
+ else:
+ raise ValueError("The '%s' isn't a mask nor number of bits: %s" % (addr_extra, rule))
+ elif self.address.startswith("[") and self.address.endswith("]") and \
+ stem.util.connection.is_valid_ipv6_address(self.address[1:-1]):
+ # ip6spec ::= ip6 | ip6 "/" num_ip6_bits
+ # ip6 ::= an IPv6 address, surrounded by square brackets.
+ # num_ip6_bits ::= an integer between 0 and 128
+
+ self.address = stem.util.connection.expand_ipv6_address(self.address[1:-1].upper())
+ self.address_type = AddressType.IPv6
+
+ if addr_extra is None:
+ self.mask = stem.util.connection.FULL_IPv6_MASK
+ self.masked_bits = 128
+ elif addr_extra.isdigit():
+ # provided with a num_ip6_bits
+ self.mask = stem.util.connection.get_mask_ipv6(int(addr_extra))
+ self.masked_bits = int(addr_extra)
+ else:
+ raise ValueError("The '%s' isn't a number of bits: %s" % (addr_extra, rule))
+ else:
+ raise ValueError("Address isn't a wildcard, IPv4, or IPv6 address: %s" % rule)
+
+ # Parses the portspec...
+ # portspec ::= "*" | port | port "-" port
+ # port ::= an integer between 1 and 65535, inclusive.
+ #
+ # Due to a tor bug the spec says that we should accept a port of zero, but
+ # connections to port zero are never permitted.
+
+ if portspec == "*":
+ self.min_port, self.max_port = 1, 65535
+ elif portspec.isdigit():
+ # provided with a single port
+ if stem.util.connection.is_valid_port(portspec, allow_zero = True):
+ self.min_port = self.max_port = int(portspec)
+ else:
+ raise ValueError("'%s' isn't within a valid port range: %s" % (portspec, rule))
+ elif "-" in portspec:
+ # provided with a port range
+ port_comp = portspec.split("-", 1)
+
+ if stem.util.connection.is_valid_port(port_comp, allow_zero = True):
+ self.min_port = int(port_comp[0])
+ self.max_port = int(port_comp[1])
+
+ if self.min_port > self.max_port:
+ raise ValueError("Port range has a lower bound that's greater than its upper bound: %s" % rule)
+ else:
+ raise ValueError("Malformed port range: %s" % rule)
+ else:
+ raise ValueError("Port value isn't a wildcard, integer, or range: %s" % rule)
+
+ # Pre-calculating the integer representation of our mask and masked
+ # address. These are used by our is_match() method to compare ourselves to
+ # other addresses.
+
+ if self.address_type == AddressType.WILDCARD:
+ # is_match() will short circuit so these are unused
+ self._mask_bin = self._addr_bin = None
+ else:
+ self._mask_bin = int(stem.util.connection.get_address_binary(self.mask), 2)
+ self._addr_bin = int(stem.util.connection.get_address_binary(self.address), 2) & self._mask_bin
+
+ self._str_representation = None
+
+ def is_address_wildcard(self):
+ """
+ True if we'll match against any address for our type, False otherwise.
+
+ :returns: bool for if our address matching is a wildcard
+ """
+
+ return self.address_type == AddressType.WILDCARD or self.masked_bits == 0
+
+ def is_port_wildcard(self):
+ """
+ True if we'll match against any port, False otherwise.
+
+ :returns: bool for if our port matching is a wildcard
+ """
+
+ return self.min_port in (0, 1) and self.max_port == 65535
+
+ def is_match(self, address = None, port = None):
+ """
+ True if we match against the given destination, False otherwise. If the
+ address or port is omitted then that'll only match against a wildcard.
+
+ :param str address: IPv4 or IPv6 address (with or without brackets)
+ :param int port: port number
+
+ :returns: bool indicating if we match against this destination
+
+ :raises: ValueError if provided with a malformed address or port
+ """
+
+ # validate our input and check if the argument doesn't match our address type
+ if address != None:
+ if stem.util.connection.is_valid_ip_address(address):
+ if self.address_type == AddressType.IPv6: return False
+ elif stem.util.connection.is_valid_ipv6_address(address, allow_brackets = True):
+ if self.address_type == AddressType.IPv4: return False
+
+ address = address.lstrip("[").rstrip("]")
+ else:
+ raise ValueError("'%s' isn't a valid ipv4 or ipv6 address" % address)
+
+ if port != None and not stem.util.connection.is_valid_port(port):
+ raise ValueError("'%s' isn't a valid port" % port)
+
+ if address is None:
+ if self.address_type != AddressType.WILDCARD:
+ return False
+ elif not self.is_address_wildcard():
+ # Already got the integer representation of our mask and our address
+ # with the mask applied. Just need to check if this address with the
+ # mask applied matches.
+
+ comparison_addr_bin = int(stem.util.connection.get_address_binary(address), 2)
+ comparison_addr_bin &= self._mask_bin
+ if self._addr_bin != comparison_addr_bin: return False
+
+ if not self.is_port_wildcard():
+ if port is None:
+ return False
+ elif port < self.min_port or port > self.max_port:
+ return False
+
+ return True
+
+ def __str__(self):
+ """
+ Provides the string representation of our policy. This does not
+ necessarily match the rule that we were constructed from (due to things
+ like IPv6 address collapsing or the multiple representations that our mask
+ can have). However, it is a valid rule that would be accepted by our constructor
+ to re-create this rule.
+ """
+
+ if self._str_representation is None:
+ label = "accept " if self.is_accept else "reject "
+
+ if self.address_type == AddressType.WILDCARD:
+ label += "*:"
+ else:
+ if self.address_type == AddressType.IPv4:
+ label += self.address
+ else:
+ label += "[%s]" % self.address
+
+ # Including our mask label as follows...
+ # - exclude our mask if it doesn't do anything
+ # - use our masked bit count if we can
+ # - use the mask itself otherwise
+
+ if self.mask in (stem.util.connection.FULL_IPv4_MASK, stem.util.connection.FULL_IPv6_MASK):
+ label += ":"
+ elif not self.masked_bits is None:
+ label += "/%i:" % self.masked_bits
+ else:
+ label += "/%s:" % self.mask
+
+ if self.is_port_wildcard():
+ label += "*"
+ elif self.min_port == self.max_port:
+ label += str(self.min_port)
+ else:
+ label += "%i-%i" % (self.min_port, self.max_port)
+
+ self._str_representation = label
+
+ return self._str_representation
class ExitPolicyLine:
"""
Single rule from the user's exit policy. These are chained together to form
complete policies.
"""
-
+
def __init__(self, rule_entry):
"""
Exit Policy line constructor.
"""
+
# sanitize the input a bit, cleaning up tabs and stripping quotes
rule_entry = rule_entry.replace("\\t", " ").replace("\"", "")
@@ -83,8 +364,10 @@ class ExitPolicyLine:
# constructs the binary address just in case of comparison with a mask
if self.ip_address != "*":
- if not (stem.util.connection.is_valid_ip_address(self.ip_address) and stem.util.connection.is_valid_ipv6_address(self.ip_address)):
- raise ExitPolicyError
+ if not stem.util.connection.is_valid_ip_address(self.ip_address) and \
+ not stem.util.connection.is_valid_ipv6_address(self.ip_address):
+ raise ExitPolicyError()
+
self.ip_address_bin = ""
for octet in self.ip_address.split("."):
# Converts the int to a binary string, padded with zeros. Source:
@@ -92,7 +375,7 @@ class ExitPolicyLine:
self.ip_address_bin += "".join([str((int(octet) >> y) & 1) for y in range(7, -1, -1)])
else:
self.ip_address_bin = "0" * 32
-
+
# sets the port component
self.min_port, self.max_port = 0, 0
self.is_port_wildcard = entry_port == "*"
@@ -109,7 +392,7 @@ class ExitPolicyLine:
raise ExitPolicyError
self.min_port = int(entry_port)
self.max_port = int(entry_port)
-
+
def __str__(self):
# This provides the actual policy rather than the entry used to construct
# it so the 'private' keyword is expanded.
diff --git a/stem/util/connection.py b/stem/util/connection.py
index a70389e..e1e8890 100644
--- a/stem/util/connection.py
+++ b/stem/util/connection.py
@@ -7,7 +7,7 @@ but for now just moving the parts we need.
::
is_valid_ip_address - checks if a string is a valid IPv4 address
- is_valid_ip_ipv6_address - checks if a string is a valid IPv6 address
+ is_valid_ipv6_address - checks if a string is a valid IPv6 address
is_valid_port - checks if something is a valid representation for a port
expand_ipv6_address - provides an IPv6 address with its collapsed portions expanded
get_mask - provides the mask representation for a given number of bits
@@ -24,6 +24,9 @@ import hashlib
CRYPTOVARIABLE_EQUALITY_COMPARISON_NONCE = os.urandom(32)
+FULL_IPv4_MASK = "255.255.255.255"
+FULL_IPv6_MASK = "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF"
+
def is_valid_ip_address(address):
"""
Checks if a string is a valid IPv4 address.
@@ -45,15 +48,20 @@ def is_valid_ip_address(address):
return True
-def is_valid_ipv6_address(address):
+def is_valid_ipv6_address(address, allow_brackets = False):
"""
Checks if a string is a valid IPv6 address.
:param str address: string to be checked
+ :param bool allow_brackets: ignore brackets which form '[address]'
:returns: True if input is a valid IPv6 address, False otherwise
"""
+ if allow_brackets:
+ if address.startswith("[") and address.endswith("]"):
+ address = address[1:-1]
+
# addresses are made up of eight colon separated groups of four hex digits
# with leading zeros being optional
# https://en.wikipedia.org/wiki/IPv6#Address_format
@@ -85,9 +93,10 @@ def is_valid_port(entry, allow_zero = False):
if isinstance(entry, list):
for port in entry:
- if not is_valid_port(port):
+ if not is_valid_port(port, allow_zero):
return False
-
+
+ return True
elif isinstance(entry, str):
if not entry.isdigit():
return False
diff --git a/test/unit/exit_policy.py b/test/unit/exit_policy.py
deleted file mode 100644
index b12cd9a..0000000
--- a/test/unit/exit_policy.py
+++ /dev/null
@@ -1,110 +0,0 @@
-"""
-Unit tests for the stem.exit_policy.ExitPolicy parsing and class.
-"""
-
-import unittest
-import stem.exit_policy
-import stem.util.system
-
-import test.mocking as mocking
-
-class TestExitPolicy(unittest.TestCase):
- def tearDown(self):
- pass
-
- def test_parsing(self):
- """
- Tests parsing by the ExitPolicy class constructor.
- """
-
- exit_policies = stem.exit_policy.ExitPolicy()
- exit_policies.add("accept *:80")
- exit_policies.add("accept *:443")
- exit_policies.add("reject *:*")
- self.assertEqual(str(exit_policies), "accept *:80, accept *:443, reject *:*")
-
- exit_policies = stem.exit_policy.ExitPolicy()
-
- # check ip address
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 256.255.255.255:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -10.255.255.255:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.-10.255.255:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.255.-10.255:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -255.255.255.-10:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept a.b.c.d:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.255.255:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -255.255:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -:80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept :80")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept ...:80")
-
- # check input string
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "foo 255.255.255.255:80")
-
- # check ports
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:0001")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:0")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:-1")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:+1")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:+1-1")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:a")
- self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:70000")
-
- def test_check(self):
- """
- Tests if exiting to this ip is allowed.
- """
-
- exit_policies = stem.exit_policy.ExitPolicy()
- exit_policies = stem.exit_policy.ExitPolicy()
- exit_policies.add("accept *:80")
- exit_policies.add("accept *:443")
- exit_policies.add("reject *:*")
-
- self.assertTrue(exit_policies.check("www.google.com", 80))
- self.assertTrue(exit_policies.check("www.atagar.com", 443))
-
- self.assertFalse(exit_policies.check("www.atagar.com", 22))
- self.assertFalse(exit_policies.check("www.atagar.com", 8118))
-
- def test_is_exiting_allowed(self):
- """
- Tests if this is an exit node
- """
-
- exit_policies = stem.exit_policy.ExitPolicy()
- exit_policies = stem.exit_policy.ExitPolicy()
- exit_policies.add("accept *:80")
- exit_policies.add("accept *:443")
- exit_policies.add("reject *:*")
-
- self.assertTrue(exit_policies.is_exiting_allowed())
-
- exit_policies = stem.exit_policy.ExitPolicy()
- exit_policies = stem.exit_policy.ExitPolicy()
- exit_policies.add("reject *:*")
-
- self.assertFalse(exit_policies.is_exiting_allowed())
-
- def test_microdesc_exit_parsing(self):
- microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443")
-
- self.assertEqual(str(microdesc_exit_policy),"accept 80,443")
-
- self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,-443")
- self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,+443")
- self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,66666")
- self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "reject 80,foo")
- self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "bar 80,foo")
- self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "foo")
- self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "bar 80-foo")
-
- def test_micodesc_exit_check(self):
- microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443")
-
- self.assertTrue(microdesc_exit_policy.check(80))
- self.assertTrue(microdesc_exit_policy.check("www.atagar.com", 443))
-
- self.assertFalse(microdesc_exit_policy.check(22))
- self.assertFalse(microdesc_exit_policy.check("www.atagar.com", 8118))
diff --git a/test/unit/exit_policy/__init__.py b/test/unit/exit_policy/__init__.py
new file mode 100644
index 0000000..99a4651
--- /dev/null
+++ b/test/unit/exit_policy/__init__.py
@@ -0,0 +1,6 @@
+"""
+Unit tests for stem.exit_policy.py contents.
+"""
+
+__all__ = ["policy", "rule"]
+
diff --git a/test/unit/exit_policy/policy.py b/test/unit/exit_policy/policy.py
new file mode 100644
index 0000000..10088e2
--- /dev/null
+++ b/test/unit/exit_policy/policy.py
@@ -0,0 +1,107 @@
+"""
+Unit tests for the stem.exit_policy.ExitPolicy parsing and class.
+"""
+
+import unittest
+import stem.exit_policy
+import stem.util.system
+
+import test.mocking as mocking
+
+class TestExitPolicy(unittest.TestCase):
+ def test_parsing(self):
+ """
+ Tests parsing by the ExitPolicy class constructor.
+ """
+
+ exit_policies = stem.exit_policy.ExitPolicy()
+ exit_policies.add("accept *:80")
+ exit_policies.add("accept *:443")
+ exit_policies.add("reject *:*")
+ self.assertEqual(str(exit_policies), "accept *:80, accept *:443, reject *:*")
+
+ exit_policies = stem.exit_policy.ExitPolicy()
+
+ # check ip address
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 256.255.255.255:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -10.255.255.255:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.-10.255.255:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.255.-10.255:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -255.255.255.-10:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept a.b.c.d:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255.255.255:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -255.255:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept 255:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept -:80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept :80")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept ...:80")
+
+ # check input string
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "foo 255.255.255.255:80")
+
+ # check ports
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:0001")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:0")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:-1")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:+1")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:+1-1")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:a")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, exit_policies.add, "accept *:70000")
+
+ def test_check(self):
+ """
+ Tests if exiting to this ip is allowed.
+ """
+
+ exit_policies = stem.exit_policy.ExitPolicy()
+ exit_policies = stem.exit_policy.ExitPolicy()
+ exit_policies.add("accept *:80")
+ exit_policies.add("accept *:443")
+ exit_policies.add("reject *:*")
+
+ self.assertTrue(exit_policies.check("www.google.com", 80))
+ self.assertTrue(exit_policies.check("www.atagar.com", 443))
+
+ self.assertFalse(exit_policies.check("www.atagar.com", 22))
+ self.assertFalse(exit_policies.check("www.atagar.com", 8118))
+
+ def test_is_exiting_allowed(self):
+ """
+ Tests if this is an exit node
+ """
+
+ exit_policies = stem.exit_policy.ExitPolicy()
+ exit_policies = stem.exit_policy.ExitPolicy()
+ exit_policies.add("accept *:80")
+ exit_policies.add("accept *:443")
+ exit_policies.add("reject *:*")
+
+ self.assertTrue(exit_policies.is_exiting_allowed())
+
+ exit_policies = stem.exit_policy.ExitPolicy()
+ exit_policies = stem.exit_policy.ExitPolicy()
+ exit_policies.add("reject *:*")
+
+ self.assertFalse(exit_policies.is_exiting_allowed())
+
+ def test_microdesc_exit_parsing(self):
+ microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443")
+
+ self.assertEqual(str(microdesc_exit_policy),"accept 80,443")
+
+ self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,-443")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,+443")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "accept 80,66666")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "reject 80,foo")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "bar 80,foo")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "foo")
+ self.assertRaises(stem.exit_policy.ExitPolicyError, stem.exit_policy.MicrodescriptorExitPolicy, "bar 80-foo")
+
+ def test_micodesc_exit_check(self):
+ microdesc_exit_policy = stem.exit_policy.MicrodescriptorExitPolicy("accept 80,443")
+
+ self.assertTrue(microdesc_exit_policy.check(80))
+ self.assertTrue(microdesc_exit_policy.check("www.atagar.com", 443))
+
+ self.assertFalse(microdesc_exit_policy.check(22))
+ self.assertFalse(microdesc_exit_policy.check("www.atagar.com", 8118))
diff --git a/test/unit/exit_policy/rule.py b/test/unit/exit_policy/rule.py
new file mode 100644
index 0000000..13a414d
--- /dev/null
+++ b/test/unit/exit_policy/rule.py
@@ -0,0 +1,327 @@
+"""
+Unit tests for the stem.exit_policy.ExitPolicyRule class.
+"""
+
+import unittest
+
+from stem.exit_policy import AddressType, ExitPolicyRule
+
+class TestExitPolicyRule(unittest.TestCase):
+ def test_accept_or_reject(self):
+ self.assertTrue(ExitPolicyRule("accept *:*").is_accept)
+ self.assertFalse(ExitPolicyRule("reject *:*").is_accept)
+
+ invalid_inputs = (
+ "accept",
+ "reject",
+ "accept  *:*",
+ "accept\t*:*",
+ "accept\n*:*",
+ "acceptt *:*",
+ "rejectt *:*",
+ "blarg *:*",
+ " *:*",
+ "*:*",
+ "",
+ )
+
+ for rule_arg in invalid_inputs:
+ self.assertRaises(ValueError, ExitPolicyRule, rule_arg)
+
+ def test_str_unchanged(self):
+ # provides a series of test inputs where the str() representation should
+ # match the input rule
+
+ test_inputs = (
+ "accept *:*",
+ "reject *:*",
+ "accept *:80",
+ "accept *:80-443",
+ "accept 127.0.0.1:80",
+ "accept 87.0.0.1/24:80",
+ "accept 156.5.38.3/255.255.0.255:80",
+ "accept [FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF]:80",
+ "accept [FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF]/32:80",
+ )
+
+ for rule_arg in test_inputs:
+ rule = ExitPolicyRule(rule_arg)
+ self.assertEquals(rule_arg, rule.rule)
+ self.assertEquals(rule_arg, str(rule))
+
+ def test_str_changed(self):
+ # some instances where our rule is valid but won't match our str() representation
+ test_inputs = {
+ "accept 10.0.0.1/32:80": "accept 10.0.0.1:80",
+ "accept 192.168.0.1/255.255.255.0:80": "accept 192.168.0.1/24:80",
+ "accept [::]/32:*": "accept [0000:0000:0000:0000:0000:0000:0000:0000]/32:*",
+ "accept [::]/128:*": "accept [0000:0000:0000:0000:0000:0000:0000:0000]:*",
+ }
+
+ for rule_arg, expected_str in test_inputs.items():
+ rule = ExitPolicyRule(rule_arg)
+ self.assertEquals(rule_arg, rule.rule)
+ self.assertEquals(expected_str, str(rule))
+
+ def test_valid_wildcard(self):
+ test_inputs = {
+ "reject *:*": (True, True),
+ "reject *:80": (True, False),
+ "accept 192.168.0.1:*": (False, True),
+ "accept 192.168.0.1:80": (False, False),
+
+ "reject 127.0.0.1/0:*": (True, True),
+ "reject 127.0.0.1/16:*": (False, True),
+ "reject 127.0.0.1/32:*": (False, True),
+ "reject [0000:0000:0000:0000:0000:0000:0000:0000]/0:80": (True, False),
+ "reject [0000:0000:0000:0000:0000:0000:0000:0000]/64:80": (False, False),
+ "reject [0000:0000:0000:0000:0000:0000:0000:0000]/128:80": (False, False),
+
+ "accept 192.168.0.1:0-65535": (False, True),
+ "accept 192.168.0.1:1-65535": (False, True),
+ "accept 192.168.0.1:2-65535": (False, False),
+ "accept 192.168.0.1:1-65534": (False, False),
+ }
+
+ for rule_arg, attr in test_inputs.items():
+ is_address_wildcard, is_port_wildcard = attr
+
+ rule = ExitPolicyRule(rule_arg)
+ self.assertEquals(is_address_wildcard, rule.is_address_wildcard())
+ self.assertEquals(is_port_wildcard, rule.is_port_wildcard())
+
+ def test_invalid_wildcard(self):
+ test_inputs = (
+ "reject */16:*",
+ "reject 127.0.0.1/*:*",
+ "reject *:0-*",
+ "reject *:*-15",
+ )
+
+ for rule_arg in test_inputs:
+ self.assertRaises(ValueError, ExitPolicyRule, rule_arg)
+
+ def test_wildcard_attributes(self):
+ rule = ExitPolicyRule("reject *:*")
+ self.assertEquals(AddressType.WILDCARD, rule.address_type)
+ self.assertEquals(None, rule.address)
+ self.assertEquals(None, rule.mask)
+ self.assertEquals(None, rule.masked_bits)
+ self.assertEquals(1, rule.min_port)
+ self.assertEquals(65535, rule.max_port)
+
+ def test_valid_ipv4_addresses(self):
+ test_inputs = {
+ "0.0.0.0": ("0.0.0.0", "255.255.255.255", 32),
+ "127.0.0.1/32": ("127.0.0.1", "255.255.255.255", 32),
+ "192.168.0.50/24": ("192.168.0.50", "255.255.255.0", 24),
+ "255.255.255.255/0": ("255.255.255.255", "0.0.0.0", 0),
+ }
+
+ for rule_addr, attr in test_inputs.items():
+ address, mask, masked_bits = attr
+
+ rule = ExitPolicyRule("accept %s:*" % rule_addr)
+ self.assertEquals(AddressType.IPv4, rule.address_type)
+ self.assertEquals(address, rule.address)
+ self.assertEquals(mask, rule.mask)
+ self.assertEquals(masked_bits, rule.masked_bits)
+
+ def test_invalid_ipv4_addresses(self):
+ test_inputs = {
+ "256.0.0.0",
+ "-1.0.0.0",
+ "0.0.0",
+ "0.0.0.",
+ "0.0.0.a",
+ "127.0.0.1/-1",
+ "127.0.0.1/33",
+ }
+
+ for rule_addr in test_inputs:
+ self.assertRaises(ValueError, ExitPolicyRule, "accept %s:*" % rule_addr)
+
+ def test_valid_ipv6_addresses(self):
+ test_inputs = {
+ "[fe80:0000:0000:0000:0202:b3ff:fe1e:8329]":
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329",
+ "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 128),
+ "[FE80::0202:b3ff:fe1e:8329]":
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329",
+ "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 128),
+ "[0000:0000:0000:0000:0000:0000:0000:0000]/0":
+ ("0000:0000:0000:0000:0000:0000:0000:0000",
+ "0000:0000:0000:0000:0000:0000:0000:0000", 0),
+ "[::]":
+ ("0000:0000:0000:0000:0000:0000:0000:0000",
+ "FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF:FFFF", 128),
+ }
+
+ for rule_addr, attr in test_inputs.items():
+ address, mask, masked_bits = attr
+
+ rule = ExitPolicyRule("accept %s:*" % rule_addr)
+ self.assertEquals(AddressType.IPv6, rule.address_type)
+ self.assertEquals(address, rule.address)
+ self.assertEquals(mask, rule.mask)
+ self.assertEquals(masked_bits, rule.masked_bits)
+
+ def test_invalid_ipv6_addresses(self):
+ test_inputs = (
+ "fe80::0202:b3ff:fe1e:8329",
+ "[fe80::0202:b3ff:fe1e:8329",
+ "fe80::0202:b3ff:fe1e:8329]",
+ "[fe80::0202:b3ff:fe1e:832g]",
+ "[fe80:::b3ff:fe1e:8329]",
+ "[fe80::b3ff::fe1e:8329]",
+ "[fe80::0202:b3ff:fe1e:8329]/-1",
+ "[fe80::0202:b3ff:fe1e:8329]/129",
+ )
+
+ for rule_addr in test_inputs:
+ self.assertRaises(ValueError, ExitPolicyRule, "accept %s:*" % rule_addr)
+
+ def test_valid_ports(self):
+ test_inputs = {
+ "0": (0, 0),
+ "1": (1, 1),
+ "80": (80, 80),
+ "80-443": (80, 443),
+ }
+
+ for rule_port, attr in test_inputs.items():
+ min_port, max_port = attr
+
+ rule = ExitPolicyRule("accept 127.0.0.1:%s" % rule_port)
+ self.assertEquals(min_port, rule.min_port)
+ self.assertEquals(max_port, rule.max_port)
+
+ def test_invalid_ports(self):
+ test_inputs = (
+ "65536",
+ "a",
+ "5-3",
+ "5-",
+ "-3",
+ )
+
+ for rule_port in test_inputs:
+ self.assertRaises(ValueError, ExitPolicyRule, "accept 127.0.0.1:%s" % rule_port)
+
+ def test_is_match_wildcard(self):
+ test_inputs = {
+ "reject *:*": {
+ ("192.168.0.1", 80): True,
+ ("0.0.0.0", 80): True,
+ ("255.255.255.255", 80): True,
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", 80): True,
+ ("[FE80:0000:0000:0000:0202:B3FF:FE1E:8329]", 80): True,
+ ("192.168.0.1", None): True,
+ (None, 80): True,
+ (None, None): True,
+ },
+ "reject 255.255.255.255/0:*": {
+ ("192.168.0.1", 80): True,
+ ("0.0.0.0", 80): True,
+ ("255.255.255.255", 80): True,
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", 80): False,
+ ("[FE80:0000:0000:0000:0202:B3FF:FE1E:8329]", 80): False,
+ ("192.168.0.1", None): True,
+ (None, 80): False,
+ (None, None): False,
+ },
+ }
+
+ for rule_arg, matches in test_inputs.items():
+ rule = ExitPolicyRule(rule_arg)
+
+ for match_args, expected_result in matches.items():
+ self.assertEquals(expected_result, rule.is_match(*match_args))
+
+ # port zero is special in that exit policies can include it, but it's not
+ # something that we can match against
+
+ rule = ExitPolicyRule("reject *:*")
+ self.assertRaises(ValueError, rule.is_match, "127.0.0.1", 0)
+
+ def test_is_match_ipv4(self):
+ test_inputs = {
+ "reject 192.168.0.50:*": {
+ ("192.168.0.50", 80): True,
+ ("192.168.0.51", 80): False,
+ ("192.168.0.49", 80): False,
+ (None, 80): False,
+ ("192.168.0.50", None): True,
+ },
+ "reject 0.0.0.0/24:*": {
+ ("0.0.0.0", 80): True,
+ ("0.0.0.1", 80): True,
+ ("0.0.0.255", 80): True,
+ ("0.0.1.0", 80): False,
+ ("0.1.0.0", 80): False,
+ ("1.0.0.0", 80): False,
+ (None, 80): False,
+ ("0.0.0.0", None): True,
+ },
+ }
+
+ for rule_arg, matches in test_inputs.items():
+ rule = ExitPolicyRule(rule_arg)
+
+ for match_args, expected_result in matches.items():
+ self.assertEquals(expected_result, rule.is_match(*match_args))
+
+ def test_is_match_ipv6(self):
+ test_inputs = {
+ "reject [FE80:0000:0000:0000:0202:B3FF:FE1E:8329]:*": {
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", 80): True,
+ ("fe80:0000:0000:0000:0202:b3ff:fe1e:8329", 80): True,
+ ("[FE80:0000:0000:0000:0202:B3FF:FE1E:8329]", 80): True,
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8330", 80): False,
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8328", 80): False,
+ (None, 80): False,
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", None): True,
+ },
+ "reject [FE80:0000:0000:0000:0202:B3FF:FE1E:8329]/112:*": {
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", 80): True,
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:0000", 80): True,
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:FFFF", 80): True,
+ ("FE80:0000:0000:0000:0202:B3FF:FE1F:8329", 80): False,
+ ("FE81:0000:0000:0000:0202:B3FF:FE1E:8329", 80): False,
+ (None, 80): False,
+ ("FE80:0000:0000:0000:0202:B3FF:FE1E:8329", None): True,
+ },
+ }
+
+ for rule_arg, matches in test_inputs.items():
+ rule = ExitPolicyRule(rule_arg)
+
+ for match_args, expected_result in matches.items():
+ self.assertEquals(expected_result, rule.is_match(*match_args))
+
+ def test_is_match_port(self):
+ test_inputs = {
+ "reject *:80": {
+ ("192.168.0.50", 80): True,
+ ("192.168.0.50", 81): False,
+ ("192.168.0.50", 79): False,
+ (None, 80): True,
+ ("192.168.0.50", None): False,
+ },
+ "reject *:80-85": {
+ ("192.168.0.50", 79): False,
+ ("192.168.0.50", 80): True,
+ ("192.168.0.50", 83): True,
+ ("192.168.0.50", 85): True,
+ ("192.168.0.50", 86): False,
+ (None, 83): True,
+ ("192.168.0.50", None): False,
+ },
+ }
+
+ for rule_arg, matches in test_inputs.items():
+ rule = ExitPolicyRule(rule_arg)
+
+ for match_args, expected_result in matches.items():
+ self.assertEquals(expected_result, rule.is_match(*match_args))
+
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits