[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [flashproxy/master] simplify Endpoints a bit
commit e027db3d3348944c7f75cda52c41b020f02b2aba
Author: Ximin Luo <infinity0@xxxxxxx>
Date: Fri Oct 11 15:21:14 2013 +0100
simplify Endpoints a bit
- remove Endpoints.supports() and related code - the important thing actually is whether a proxy supports a transport, not whether we have relays that support it
- use defaultdict to get rid of some boilerplate, and populate _indexes unconditionally
---
facilitator/facilitator | 89 +++++++++++++++++-------------------------
facilitator/facilitator-test | 61 ++++++++---------------------
2 files changed, 53 insertions(+), 97 deletions(-)
diff --git a/facilitator/facilitator b/facilitator/facilitator
index d011013..9e11826 100755
--- a/facilitator/facilitator
+++ b/facilitator/facilitator
@@ -8,6 +8,7 @@ import sys
import threading
import time
import traceback
+from collections import defaultdict
import fac
from fac import Transport, Endpoint
@@ -26,6 +27,7 @@ CLIENT_TIMEOUT = 1.0
READLINE_MAX_LENGTH = 10240
MAX_PROXIES_PER_CLIENT = 5
+DEFAULT_OUTER_TRANSPORTS = ["websocket"]
LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
@@ -45,6 +47,7 @@ class options(object):
pid_filename = None
privdrop_username = None
safe_logging = True
+ outer_transports = DEFAULT_OUTER_TRANSPORTS
def usage(f = sys.stdout):
print >> f, """\
@@ -59,11 +62,15 @@ again. Listen on 127.0.0.1 and port PORT (by default %(port)d).
--pidfile FILENAME write PID to FILENAME after daemonizing.
--privdrop-user USER switch UID and GID to those of USER.
-r, --relay-file RELAY learn relays from FILE.
+ --outer-transports TRANSPORTS
+ comma-sep list of outer transports to accept proxies
+ for (by default %(outer-transports)s)
--unsafe-logging don't scrub IP addresses from logs.\
""" % {
"progname": sys.argv[0],
"port": DEFAULT_LISTEN_PORT,
"log": DEFAULT_LOG_FILENAME,
+ "outer-transports": ",".join(DEFAULT_OUTER_TRANSPORTS)
}
def safe_str(s):
@@ -87,16 +94,13 @@ class Endpoints(object):
matchingLock = threading.Condition()
- def __init__(self, af, maxserve=float("inf"), known_outer=("websocket",)):
+ def __init__(self, af, maxserve=float("inf")):
self.af = af
self._maxserve = maxserve
self._endpoints = {} # address -> transport
- self._indexes = {} # outer -> [ addresses ]
+ self._indexes = defaultdict(lambda: defaultdict(set)) # outer -> inner -> [ addresses ]
self._served = {} # address -> num_times_served
self._cv = threading.Condition()
- self.known_outer = set(known_outer)
- for outer in self.known_outer:
- self._ensureIndexForOuter(outer)
def getNumEndpoints(self):
""":returns: the number of endpoints known to us."""
@@ -119,9 +123,10 @@ class Endpoints(object):
transport = Transport.parse(transport)
with self._cv:
if addr in self._endpoints: return False
+ inner, outer = transport
self._endpoints[addr] = transport
self._served[addr] = 0
- self._addAddrIntoIndexes(addr)
+ self._indexes[outer][inner].add(addr)
self._cv.notify()
return True
@@ -133,43 +138,26 @@ class Endpoints(object):
"""
with self._cv:
if addr not in self._endpoints: return False
- self._delAddrFromIndexes(addr)
+ inner, outer = self._endpoints[addr]
+ self._indexes[outer][inner].remove(addr) # TODO(infinity0): maybe delete empty bins
del self._served[addr]
del self._endpoints[addr]
self._cv.notify()
return True
- def supports(self, transport):
- """
- Estimate whether we support the given transport. May give false
- positives, but doing a proper match later on will catch these.
-
- :returns: True if we know, or have met, proxies that might be able
- to satisfy the requested transport against our known endpoints.
- """
- transport = Transport.parse(transport)
- with self._cv:
- known_inner = self._findInnerForOuter(*self.known_outer).keys()
- inner, outer = transport.inner, transport.outer
- return inner in known_inner and outer in self.known_outer
-
def _findInnerForOuter(self, *supported_outer):
"""
:returns: { inner: [addr] }, where each address supports some outer
from supported_outer. TODO(infinity0): describe better
"""
- self.known_outer.update(supported_outer)
- inners = {}
- for outer in supported_outer:
- self._ensureIndexForOuter(outer)
- for addr in self._indexes[outer]:
- inner = self._endpoints[addr].inner
- inners.setdefault(inner, set()).add(addr)
+ inners = defaultdict(set)
+ for outer in set(supported_outer) & set(self._indexes.iterkeys()):
+ for inner, addrs in self._indexes[outer].iteritems():
+ if addrs:
+ # don't add empty bins, to avoid false-positive key checks
+ inners[inner].update(addrs)
return inners
- def _avServed(self, addrpool):
- return sum(self._served[a] for a in addrpool) / float(len(addrpool))
-
def _serveReg(self, addrpool):
"""
:param list addrpool: List of candidate addresses.
@@ -186,20 +174,6 @@ class Endpoints(object):
self.delEndpoint(prio_addr)
return prio_addr
- def _ensureIndexForOuter(self, outer):
- if outer in self._indexes: return
- addrs = set(addr for addr, transport in self._endpoints.iteritems()
- if transport.outer == outer)
- self._indexes[outer] = addrs
-
- def _addAddrIntoIndexes(self, addr):
- outer = self._endpoints[addr].outer
- if outer in self._indexes: self._indexes[outer].add(addr)
-
- def _delAddrFromIndexes(self, addr):
- outer = self._endpoints[addr].outer
- if outer in self._indexes: self._indexes[outer].remove(addr)
-
EMPTY_MATCH = (None, None)
@staticmethod
def match(ptsClient, ptsServer, supported_outer):
@@ -208,7 +182,9 @@ class Endpoints(object):
the available endpoints that can satisfy supported_outer.
"""
if ptsClient.af != ptsServer.af:
- raise ValueError("address family not equal!")
+ raise ValueError("address family not equal")
+ if ptsServer._maxserve < float("inf"):
+ raise ValueError("servers mustn't run out")
# need to operate on both structures
# so hold both locks plus a pair-wise lock
with Endpoints.matchingLock, ptsClient._cv, ptsServer._cv:
@@ -216,16 +192,19 @@ class Endpoints(object):
client_inner = ptsClient._findInnerForOuter(*supported_outer)
both = set(server_inner.keys()) & set(client_inner.keys())
if not both: return Endpoints.EMPTY_MATCH
- # pick the inner whose client address pool is least well-served
- # TODO: this may be manipulated by clients, needs research
- assert all(client_inner.itervalues()) # no pool is empty
- inner = min(both, key=lambda p: ptsClient._avServed(client_inner[p]))
- client_addr = ptsClient._serveReg(client_inner[inner])
+ # find a client to serve
+ client_pool = [addr for inner in both for addr in client_inner[inner]]
+ assert len(client_pool)
+ client_addr = ptsClient._serveReg(client_pool)
+ # find a server to serve that has the same inner transport
+ inner = ptsClient._endpoints[client_addr].inner
+ assert inner in server_inner and len(server_inner[inner])
server_addr = ptsServer._serveReg(server_inner[inner])
# assume servers never run out
client_transport = ptsClient._endpoints[client_addr]
server_transport = ptsServer._endpoints[server_addr]
- return Endpoint(client_addr, client_transport), Endpoint(server_addr, server_transport)
+ return (Endpoint(client_addr, client_transport),
+ Endpoint(server_addr, server_transport))
class Handler(SocketServer.StreamRequestHandler):
@@ -354,7 +333,7 @@ class Handler(SocketServer.StreamRequestHandler):
transport = Transport.parse(transport)
# See if we have relays that support this transport
- if all(not pts.supports(transport) for pts in SERVERS.itervalues()):
+ if transport.outer not in options.outer_transports:
return self.error(u"Unrecognized transport: %s" % transport)
client_spec = fac.param_first("CLIENT", params)
@@ -445,6 +424,8 @@ def parse_relay_file(servers, fp):
raise ValueError("Wrong line format: %s." % repr(line))
addr = fac.parse_addr_spec(addr_spec, defport=DEFAULT_RELAY_PORT, resolve=True)
transport = Transport.parse(transport_spec)
+ if transport.outer not in options.outer_transports:
+ raise ValueError(u"Unrecognized transport: %s" % transport)
af = addr_af(addr[0])
servers[af].addEndpoint(addr, transport)
@@ -468,6 +449,8 @@ def main():
options.privdrop_username = a
elif o == "-r" or o == "--relay-file":
options.relay_filename = a
+ elif o == "--outer-transports":
+ options.outer_transports = a.split(",")
elif o == "--unsafe-logging":
options.safe_logging = False
diff --git a/facilitator/facilitator-test b/facilitator/facilitator-test
index 8e06053..8143348 100755
--- a/facilitator/facilitator-test
+++ b/facilitator/facilitator-test
@@ -29,56 +29,29 @@ class EndpointsTest(unittest.TestCase):
def setUp(self):
self.pts = Endpoints(af=socket.AF_INET)
- def _observeProxySupporting(self, *supported_outer):
- # semantically observe the existence of a proxy, to make our intent
- # a bit clearer than simply calling _findInnerForOuter
- self.pts._findInnerForOuter(*supported_outer)
-
def test_addEndpoints_twice(self):
self.pts.addEndpoint("A", "a|b|p")
self.assertFalse(self.pts.addEndpoint("A", "zzz"))
self.assertEquals(self.pts._endpoints["A"], Transport("a|b", "p"))
- def test_addEndpoints_lazy_indexing(self):
+ def test_delEndpoints_twice(self):
+ self.pts.addEndpoint("A", "a|b|p")
+ self.assertTrue(self.pts.delEndpoint("A"))
+ self.assertFalse(self.pts.delEndpoint("A"))
+ self.assertEquals(self.pts._endpoints.get("A"), None)
+
+ def test_Endpoints_indexing(self):
+ self.assertEquals(self.pts._indexes.get("p"), None)
+ # test defaultdict works as expected
+ self.assertEquals(self.pts._indexes["p"]["a|b"], set(""))
self.pts.addEndpoint("A", "a|b|p")
- default_index = {"websocket": set()} # we always index known_outer
-
- # no index until we've asked for it
- self.assertEquals(self.pts._indexes, default_index)
- self._observeProxySupporting("p")
- self.assertEquals(self.pts._indexes["p"], set("A"))
-
- # indexes are updated correctly after observing new addresses
- self.pts.addEndpoint("B", "c|p")
- self.assertEquals(self.pts._indexes["p"], set("AB"))
-
- # indexes are updated correctly after observing new proxies
- self.pts.addEndpoint("C", "a|q")
- self._observeProxySupporting("q")
- self.assertEquals(self.pts._indexes["q"], set("C"))
-
- def test_supports_default(self):
- # we know there are websocket-capable proxies out there;
- # support them implicitly without needing to see a proxy.
- self.pts.addEndpoint("A", "obfs3|websocket")
- self.assertTrue(self.pts.supports("obfs3|websocket"))
- self.assertFalse(self.pts.supports("xxx|websocket"))
- self.assertFalse(self.pts.supports("websocket"))
- self.assertFalse(self.pts.supports("unknownwhat"))
- # doesn't matter what the first part is
- self.pts.addEndpoint("B", "xxx|websocket")
- self.assertTrue(self.pts.supports("xxx|websocket"))
-
- def test_supports_seen_proxy(self):
- # OTOH if some 3rd-party proxy decides to implement its own transport
- # we are fully capable of supporting them too, but only if we have
- # an endpoint that also speaks it.
- self.assertFalse(self.pts.supports("obfs3|unknownwhat"))
- self._observeProxySupporting("unknownwhat")
- self.assertFalse(self.pts.supports("obfs3|unknownwhat"))
- self.pts.addEndpoint("A", "obfs3|unknownwhat")
- self.assertTrue(self.pts.supports("obfs3|unknownwhat"))
- self.assertFalse(self.pts.supports("obfs2|unknownwhat"))
+ self.assertEquals(self.pts._indexes["p"]["a|b"], set("A"))
+ self.pts.addEndpoint("B", "a|b|p")
+ self.assertEquals(self.pts._indexes["p"]["a|b"], set("AB"))
+ self.pts.delEndpoint("A")
+ self.assertEquals(self.pts._indexes["p"]["a|b"], set("B"))
+ self.pts.delEndpoint("B")
+ self.assertEquals(self.pts._indexes["p"]["a|b"], set(""))
def test_serveReg_maxserve_infinite_roundrobin(self):
# case for servers, they never exhaust
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits