[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Break and fix server repeatedly until DNS-based routing...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv21642/lib/mixminion/server
Modified Files:
DNSFarm.py MMTPServer.py PacketHandler.py ServerConfig.py
ServerKeys.py ServerMain.py ServerQueue.py
Log Message:
Break and fix server repeatedly until DNS-based routing works. Hurray.
TODO:
- Reflect state of work
- Defer client-side fragment reassembly
setup.py
- Bump version to 0.0.6alpha2
BuildMessage.py
- Choose relay types and routing info based on server capabilities -- don't
just assume that IPV4 is right for everyone.
ClientDirectory.py
- Document everything.
- Remove spurious isSURB field from ExitAddress
- Fix theoretical bug that would crash path generation with non-mixminion
servers.
- Deprecate unadorned '*' in paths.
- Note bug with path length checking.
ClientMain.py
- Deprecate -H; use -P foo instead.
- Make list-servers work
Config.py, NetUtils.py:
- Refactor _parseIP and _parseIP6 validation functions into NetUtils.
Config.py
- Add documentation
NetUtils.py
- Add documentation
- Debug getIP
- Add function to detect static IP4 or IP6 addresses.
Packet.py
- Debug parseRelayInfoByType
- Documentation
ServerInfo.py:
- Documentation
- Disable hostname-based routing with 0.0.6alpha1 servers. (I don't
want to break Tonga and peertech.)
test.py:
- Add tests for DNS functionality
- Add tests for DNS farm functionality
- Add tests for new ServerInfo methods
DNSFarm.py
- Add documentation
- Make DNSCache.shutdown() more failsafe, and make shutdown(wait=1) not
deadlock the server.
- Add special-case test to skip hostname lookup for static IP addresses
MMTPServer.py
- Make sure that we get real RelayPacket objects.
- Choose whether to use IP4 or IP6 connections in MMTPClientConnection
PacketHandler.py
- Accept HOST relay types and MMTPHostInfo routinginfo.
ServerConfig.py:
- Remove obsolete __DEBUG_GC option.
ServerMain.py:
- Change same-server detection mechanism to only look at key ID.
ServerQueue.py:
- Add assert to catch weird bug case.
Index: DNSFarm.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/DNSFarm.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- DNSFarm.py 7 Nov 2003 08:11:36 -0000 1.3
+++ DNSFarm.py 10 Nov 2003 04:12:20 -0000 1.4
@@ -1,86 +1,137 @@
# Copyright 2003 Nick Mathewson. See LICENSE for licensing information.
# $Id$
-"""mixminion.server.DNSFarm DOCDOC"""
+"""mixminion.server.DNSFarm: code to implement asynchronous DNS resolves with
+ background threads and cache the results.
+ """
import socket
import threading
import time
import sys
from mixminion.Common import LOG, TimeoutQueue, QueueEmpty
-from mixminion.NetUtils import getIP
+from mixminion.NetUtils import getIP, nameIsStaticIP
__all__ = [ 'DNSCache' ]
class _Pending:
+ """Class to represent resolves that we're waiting for an answer on."""
def __cmp__(self,o):
return cmp(type(self), type(o))
-PENDING = _Pending
+PENDING = _Pending()
+# We never shutdown threads if doing so would leave fewer than MIN_THREADS.
MIN_THREADS = 2
+# We never shutdown threads if doing so would leave fewer than MIN_FREE_THREADS
+# idle threads.
MIN_FREE_THREADS = 1
+# We never start a new thread if doing so would make more than MAX_THREADS.
MAX_THREADS = 8
+# Subject to MIN_THREADS and MIN_FREE_THREADS, we shutdown threads when
+# they're idle for more than MAX_THREAD_IDLE seconds.
MAX_THREAD_IDLE = 5*60
-MAX_ENTRY_TTL = 15*60
-
+# We clear entries from the DNS cache when they're more than MAX_ENTRY_TTL
+# seconds old.
+MAX_ENTRY_TTL = 30*60
class DNSCache:
- """DOCDOC"""
+ """Class to cache answers to DNS requests and manage DNS threads."""
+ ## Fields:
+ # _isShutdown: boolean: are the threads shutting down? (While the
+ # threads are shutting down, we don't answer any requests.)
+ # cache: map from name to PENDING or getIP result.
+ # callbacks: map from name to list of callback functions. (See lookup
+ # for definition of callback.)
+ # lock: Lock to control access to this class's shared state.
+ # nBusyThreads: Number of threads that are currently handling requests.
+ # nLiveThreads: Number of threads that are currently running.
+ # queue: Instance of TimeoutQueue that holds either names to resolve,
+ # or instances of None to shutdown threads.
+ # threads: List of DNSThreads, some of which may be dead.
def __init__(self):
- self.cache = {} # name -> getIP return / PENDING
+ """Create a new DNSCache"""
+ self.cache = {}
self.callbacks = {}
self.lock = threading.RLock()
self.queue = TimeoutQueue()
self.threads = []
self.nLiveThreads = 0
self.nBusyThreads = 0
+ self._isShutdown = 0
self.cleanCache()
def getNonblocking(self, name):
+ """Return the cached result for the lookup of name. If we're
+ waiting for an answer, return PENDING. If there is no cached
+ result, return None.
+ """
try:
self.lock.acquire()
return self.cache.get(name)
finally:
self.lock.release()
def lookup(self,name,cb):
+ """Look up the name 'name', and pass the result to the callback
+ function 'cb' when we're done. The result will be of the
+ same form as the return value of NetUtils.getIP: either
+ (Family, Address, Time) or ('NOENT', Reason, Time).
+
+ Note: The callback may be invoked from a different thread. Either
+ this thread or a DNS thread will block until the callback finishes,
+ so it shouldn't be especially time-consuming.
+ """
+ # Check for a static IP first; no need to resolve that.
+ v = nameIsStaticIP(name)
+ if v is not None:
+ cb(name,v)
+ return
try:
self.lock.acquire()
v = self.cache.get(name)
+ # If we don't have a cached answer, add cb to self.callbacks
if v is None or v is PENDING:
self.callbacks.setdefault(name, []).append(cb)
- #XXXX006 We should check for literal addresses before we queue
+ # If we aren't looking up the answer, start looking it up.
if v is None:
self._beginLookup(name)
finally:
self.lock.release()
+ # If we _did_ have an answer, invoke the callback now.
if v is not None and v is not PENDING:
cb(name,v)
+
def shutdown(self, wait=0):
+ """Tell all the DNS threads to shut down. If 'wait' is true,
+ don't return until all the threads have completed."""
try:
self.lock.acquire()
+ self._isShutdown = 1
self.queue.clear()
for _ in xrange(self.nLiveThreads*2):
self.queue.put(None)
- if wait:
- for thr in self.threads:
- thr.join()
finally:
self.lock.release()
- def cleanCache(self):
+
+ if wait:
+ for thr in self.threads:
+ thr.join()
+
+ def cleanCache(self,now=None):
+ """Remove all expired entries from the cache."""
+ if now is None:
+ now = time.time()
try:
self.lock.acquire()
- # Purge old entries
- now = time.time()
- cache = self.cache
+ # Purge old entries from the cache.
+ cache = self.cache
for name in cache.keys():
v = cache[name]
if v is PENDING: continue
if now-v[2] > MAX_ENTRY_TTL:
del cache[name]
- # Remove dead threads from self.threads
- self.threads = [ thr for thr in self.threads
- if thr.isLive() ]
+ # Remove dead threads from self.threads.
+ self.threads = [ thr for thr in self.threads if thr.isAlive() ]
# Make sure we have enough threads.
if len(self.threads) < MIN_THREADS:
@@ -89,18 +140,35 @@
self.threads[-1].start()
finally:
self.lock.release()
+
def _beginLookup(self,name):
- # Must hold lock
+ """Helper function: Begin looking up 'name'.
+
+ Caller must hold self.lock
+ """
self.cache[name] = PENDING
+ if self._isShutdown:
+ # If we've shut down the threads, don't queue the request at
+ # all; it'll stay pending indefinitely.
+ return
+ # Queue the request.
self.queue.put(name)
+ # If there aren't enough idle threads, and if we haven't maxed
+ # out the threads, start a new one.
if (self.nLiveThreads < self.nBusyThreads + MIN_FREE_THREADS
and self.nLiveThreads < MAX_THREADS):
- self.threads.append(DNSThread(self))
- self.threads[-1].start()
+ thread = DNSThread(self)
+ thread.start()
+ self.threads.append(thread)
def _lookupDone(self,name,val):
+ """Helper function: invoked when we get the answer 'val' for
+ a lookup of 'name'.
+ """
try:
self.lock.acquire()
+ # Insert the value in the cache.
self.cache[name]=val
+ # Get the callbacks for the name, if any.
cbs = self.callbacks.get(name,[])
try:
del self.callbacks[name]
@@ -108,34 +176,51 @@
pass
finally:
self.lock.release()
+ # Now that we've released the lock, invoke the callbacks.
for cb in cbs:
cb(name,val)
def _adjLiveThreads(self,n):
+ """Helper: adjust the number of live threads by n"""
self.lock.acquire()
self.nLiveThreads += n
self.lock.release()
def _adjBusyThreads(self,n):
+ """Helper: adjust the number of busy threads by n"""
self.lock.acquire()
self.nBusyThreads += n
self.lock.release()
class DNSThread(threading.Thread):
+ """Helper class: used by DNSCache to implement name resolution."""
+ ## Fields:
+ # dnscache: The DNSCache object that should receive our answers
def __init__(self, dnscache):
+ """Create a new DNSThread"""
threading.Thread.__init__(self)
- self.dnscache = dnscache
- self.setDaemon(1)
+ self.dnscache = dnscache
+ self.setDaemon(1) # When the process exits, don't wait for this thread.
def run(self):
- self.dnscache._adjLiveThreads(1)
+ """Thread body: pull questions from the DNS thread queue and
+ answer them."""
+ queue = self.dnscache.queue
+ _lookupDone = self.dnscache._lookupDone
+ _adjBusyThreads = self.dnscache._adjBusyThreads
+ _adjLiveThreads = self.dnscache._adjLiveThreads
try:
+ _adjLiveThreads(1)
try:
while 1:
- hostname = self.dnscache.queue.get(timeout=MAX_THREAD_IDLE)
+ # Get a question from the queue, but don't wait more than
+ # MAX_THREAD_IDLE seconds
+ hostname = queue.get(timeout=MAX_THREAD_IDLE)
+ # If the question is None, shutdown.
if hostname is None:
return
- self.dnscache._adjBusyThreads(1)
+ # Else, resolve the IP and send the answer to the dnscache
+ _adjBusyThreads(1)
result = getIP(hostname)
- self.dnscache._lookupDone(hostname, result)
- self.dnscache._adjBusyThreads(-1)
+ _lookupDone(hostname, result)
+ _adjBusyThreads(-1)
except QueueEmpty:
LOG.debug("DNS thread shutting down: idle for %s seconds.",
MAX_THREAD_IDLE)
@@ -143,6 +228,5 @@
LOG.error_exc(sys.exc_info(),
"Exception in DNS thread; shutting down.")
finally:
- self.dnscache.adjLiveThreads(-1)
-
+ _adjLiveThreads(-1)
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.55
retrieving revision 1.56
diff -u -d -r1.55 -r1.56
--- MMTPServer.py 7 Nov 2003 08:11:36 -0000 1.55
+++ MMTPServer.py 10 Nov 2003 04:12:20 -0000 1.56
@@ -32,7 +32,7 @@
from mixminion.Crypto import sha1, getCommonPRNG
from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN, IPV4Info, MMTPHostInfo
from mixminion.MMTPClient import PeerCertificateCache
-from mixminion.NetUtils import IN_PROGRESS_ERRNOS, getProtocolSupport
+from mixminion.NetUtils import IN_PROGRESS_ERRNOS, getProtocolSupport, AF_INET, AF_INET6
import mixminion.server.EventStats as EventStats
from mixminion.Filestore import CorruptedFile
@@ -763,6 +763,9 @@
PacketHandler.RelayPacket objects."""
def __init__(self, pending):
DeliverableMessage.__init__(self)
+ assert hasattr(pending, 'succeeded')
+ assert hasattr(pending, 'failed')
+ assert hasattr(pending, 'getMessage')
self.pending = pending
def succeeded(self):
self.pending.succeeded()
@@ -823,8 +826,11 @@
if certCache is None:
certCache = PeerCertificateCache()
self.certCache = certCache
-
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ if ':' in ip:
+ family = AF_INET6
+ else:
+ family = AF_INET
+ sock = socket.socket(family, socket.SOCK_STREAM)
sock.setblocking(0)
self.keyID = keyID
self.ip = ip
@@ -1087,8 +1093,8 @@
port = config['Incoming/MMTP']['Port']
self.listeners = [] #DOCDOC
- for (supported, addr, family) in [(ip4_supported,IP,socket.AF_INET),
- (ip6_supported,IP6,socket.AF_INET6)]:
+ for (supported, addr, family) in [(ip4_supported,IP,AF_INET),
+ (ip6_supported,IP6,AF_INET6)]:
if not supported or not addr:
continue
listener = ListenConnection(family, addr, port,
@@ -1136,7 +1142,7 @@
def sendMessagesByRouting(self, routing, deliverable):
"""DOCDOC"""
if isinstance(routing, IPV4Info):
- self.sendMessages(socket.AF_INET, routing.ip, routing.port,
+ self.sendMessages(AF_INET, routing.ip, routing.port,
routing.keyinfo, deliverable)
else:
assert isinstance(routing, MMTPHostInfo)
Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -d -r1.29 -r1.30
--- PacketHandler.py 13 Oct 2003 17:30:24 -0000 1.29
+++ PacketHandler.py 10 Nov 2003 04:12:20 -0000 1.30
@@ -212,7 +212,8 @@
# If we're not an exit node, make sure that what we recognize our
# routing type.
- if rt not in (Packet.SWAP_FWD_IPV4_TYPE, Packet.FWD_IPV4_TYPE):
+ if rt not in (Packet.SWAP_FWD_IPV4_TYPE, Packet.FWD_IPV4_TYPE,
+ Packet.SWAP_FWD_HOST_TYPE, Packet.FWD_HOST_TYPE):
raise ContentError("Unrecognized Mixminion routing type")
# Decrypt header 2.
@@ -232,7 +233,7 @@
header1, header2 = header2, header1
# Build the address object for the next hop
- address = Packet.parseIPV4Info(subh.routinginfo)
+ address = Packet.parseRelayInfoByType(rt, subh.routinginfo)
# Construct the message for the next hop.
pkt = Packet.Packet(header1, header2, payload).pack()
@@ -246,9 +247,9 @@
# address -- an instance of IPV4Info
# msg -- a 32K packet.
def __init__(self, address, msg):
- """Create a new packet, given an instance of IPV4Info and a 32K
- packet."""
- assert isinstance(address, Packet.IPV4Info)
+ """Create a new packet, given an instance of IPV4Info or
+ MMTPHostInfo and a 32K packet."""
+ assert isinstance(address, Packet.IPV4Info) or isinstance(address, Packet.MMTPHostInfo)
assert len(msg) == 1<<15
self.address = address
self.msg = msg
@@ -258,8 +259,8 @@
return 0
def getAddress(self):
- """Return an instance of IPV4Info indicating the address where this
- packet is to be delivered."""
+ """Return an instance of IPV4Info or MMTPHostInfo indicating
+ the address where this packet is to be delivered."""
return self.address
def getPacket(self):
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -d -r1.39 -r1.40
--- ServerConfig.py 7 Nov 2003 07:03:28 -0000 1.39
+++ ServerConfig.py 10 Nov 2003 04:12:20 -0000 1.40
@@ -289,8 +289,6 @@
'MixPoolRate' : ('ALLOW', "fraction", "60%"),
'MixPoolMinSize' : ('ALLOW', "int", "5"),
'Timeout' : ('ALLOW', "interval", "5 min"),
- #XXXX006 remove this.
- '__DEBUG_GC' : ('ALLOW', "boolean", "no"),
},
'DirectoryServers' : { # '__SECTION__' : ('REQUIRE', None, None),
'ServerURL' : ('ALLOW*', None, None),
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.56
retrieving revision 1.57
diff -u -d -r1.56 -r1.57
--- ServerKeys.py 7 Nov 2003 08:11:36 -0000 1.56
+++ ServerKeys.py 10 Nov 2003 04:12:20 -0000 1.57
@@ -1196,6 +1196,7 @@
LOG.warn("Hostname %r resolves to reserved address %s",
name,addr)
except socket.error, e:
+ # XXXX006 Turn into a warning?
raise UIError("Cannot resolve hostname %r: %s"%(name,e))
_KNOWN_LOCAL_HOSTNAMES[name] = 1
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.99
retrieving revision 1.100
diff -u -d -r1.99 -r1.100
--- ServerMain.py 7 Nov 2003 08:11:36 -0000 1.99
+++ ServerMain.py 10 Nov 2003 04:12:20 -0000 1.100
@@ -300,7 +300,7 @@
mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
self.server = None
self.incomingQueue = None
- self.addr = (ip,port,keyid) #XXXX006 need to detect same host.
+ self.keyID = keyid
def configure(self, config):
"""Set up this queue according to a ServerConfig object."""
@@ -327,9 +327,7 @@
continue
msgs.setdefault(addr, []).append(pending)
for routing, messages in msgs.items():
- if self.addr[:2] == (routing.ip, routing.port): #XXX006 detect host
- if self.addr[2] != routing.keyinfo:
- LOG.warn("Delivering messages to myself with bad KeyID")
+ if self.keyID == routing.keyinfo:
for pending in messages:
LOG.trace("Delivering message OUT:%s to myself.",
pending.getHandle())
@@ -1056,12 +1054,6 @@
LOG.fatal_exc(info,"Exception while configuring server")
LOG.fatal("Shutting down because of exception: %s", info[0])
sys.exit(1)
-
- # Undocumented feature to cajole python into dumping gc info.
- if config['Server']['__DEBUG_GC']:
- import gc
- gc.set_debug(gc.DEBUG_STATS|gc.DEBUG_COLLECTABLE|gc.DEBUG_UNCOLLECTABLE
- |gc.DEBUG_INSTANCES|gc.DEBUG_OBJECTS)
if daemonMode:
LOG.info("Starting server in the background")
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.34
retrieving revision 1.35
diff -u -d -r1.34 -r1.35
--- ServerQueue.py 28 Sep 2003 04:12:29 -0000 1.34
+++ ServerQueue.py 10 Nov 2003 04:12:21 -0000 1.35
@@ -176,6 +176,7 @@
def getMessage(self):
"""Return the underlying object stored in the delivery queue, loading
it from disk if necessary. May raise CorruptedFile."""
+ assert self.handle is not None
if self.message is None:
self.message = self.queue.store.getObject(self.handle)
return self.message