[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Resolve all DOCDOCs and most XXXX006s.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv16632/lib/mixminion/server
Modified Files:
MMTPServer.py Modules.py PacketHandler.py ServerConfig.py
ServerKeys.py ServerQueue.py
Log Message:
Resolve all DOCDOCs and most XXXX006s.
Additionally, tweak the list-servers interface a bit.
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.57
retrieving revision 1.58
diff -u -d -r1.57 -r1.58
--- MMTPServer.py 19 Nov 2003 09:48:10 -0000 1.57
+++ MMTPServer.py 24 Nov 2003 19:59:04 -0000 1.58
@@ -267,7 +267,7 @@
tls -- An underlying TLS connection.
serverMode -- If true, we start with a server-side negotiation;
otherwise, we start with a client-side negotiation.
- DOCDOC address
+ address -- A human-readable address for this server.
"""
self.__sock = sock
self.__con = tls
@@ -381,7 +381,6 @@
else:
LOG.trace("Shutdown returned zero -- entering read mode")
self.__awaitingShutdown = 1
- #DODOC is this right?
if 1:
self.finished = self.__readTooMuch
self.expectRead(128)
@@ -801,7 +800,8 @@
PROTOCOL_VERSIONS = [ '0.3' ]
def __init__(self, context, ip, port, keyID, packetList,
- finishedCallback=None, certCache=None):
+ finishedCallback=None, certCache=None,
+ address=None):
"""Create a connection to send packets to an MMTP server.
Raises socket.error if the connection fails.
@@ -817,6 +817,7 @@
connection is closed.
certCache -- an instance of PeerCertificateCache to use for
checking server certificates.
+ address -- a human-readable description of the destination server.
"""
# Generate junk before connecting to avoid timing attacks
self.junk = []
@@ -847,7 +848,9 @@
tls = context.sock(sock)
- SimpleTLSConnection.__init__(self, sock, tls, 0, "%s:%s"%(ip,port))
+ if address is None:
+ address = "%s:%s"%(ip,port)
+ SimpleTLSConnection.__init__(self, sock, tls, 0, address)
self.finished = self.__setupFinished
self.finishedCallback = finishedCallback
self.protocol = None
@@ -1066,9 +1069,13 @@
# clientConByAddr: A map from 3-tuples returned by MMTPClientConnection.
# getAddr, to MMTPClientConnection objects.
# certificateCache: A PeerCertificateCache object.
- # listener: A ListenConnection object.
+ # listeners: A list of ListenConnection objects.
# _timeout: The number of seconds of inactivity to allow on a connection
# before formally shutting it down.
+ # dnsCache: An instance of mixminion.server.DNSFarm.DNSCache.
+ # msgQueue: An instance of MessageQueue to receive notification from
+ # DNS threads. See _queueSendablePackets for more information.
+
def __init__(self, config, servercontext):
AsyncServer.__init__(self)
@@ -1094,7 +1101,7 @@
if port is None:
port = config['Incoming/MMTP']['Port']
- self.listeners = [] #DOCDOC
+ self.listeners = []
for (supported, addr, family) in [(ip4_supported,IP,AF_INET),
(ip6_supported,IP6,AF_INET6)]:
if not supported or not addr:
@@ -1105,14 +1112,16 @@
self.listeners.append(listener)
listener.register(self)
- #self.config = config
self._timeout = config['Server']['Timeout'].getSeconds()
self.clientConByAddr = {}
self.certificateCache = PeerCertificateCache()
- self.dnsCache = None #DOCDOC
- self.msgQueue = MessageQueue() #DOCDOC
+ self.dnsCache = None
+ self.msgQueue = MessageQueue()
def connectDNSCache(self, dnsCache):
+ """Use the DNSCache object 'dnsCache' to resolve DNS queries for
+ this server.
+ """
self.dnsCache = dnsCache
def setServerContext(self, servercontext):
@@ -1138,48 +1147,78 @@
return con
def stopListening(self):
+ """Shut down all the listeners for this server. Does not close open
+ connections.
+ """
for listener in self.listeners:
listener.shutdown()
+ self.listeners = []
def sendPacketsByRouting(self, routing, deliverable):
- """DOCDOC"""
+ """Given a RoutingInfo object (either an IPV4Info or an MMTPHostInfo),
+ and a list of DeliverableMessage objects, start sending all the
+ corresponding packets to the corresponding server, doing a DNS
+ lookup first if necessary.
+ """
+ serverName = displayServer(routing)
if isinstance(routing, IPV4Info):
- self.sendPackets(AF_INET, routing.ip, routing.port,
- routing.keyinfo, deliverable)
+ self._sendPackets(AF_INET, routing.ip, routing.port,
+ routing.keyinfo, deliverable, serverName)
else:
assert isinstance(routing, MMTPHostInfo)
+ # This function is a callback for when the DNS lookup is over.
def lookupDone(name, (family, addr, when),
- self=self, routing=routing, deliverable=deliverable):
+ self=self, routing=routing, deliverable=deliverable,
+ serverName=serverName):
if addr == "NOENT":
+ # The lookup failed, so tell all of the message objects.
for m in deliverable:
try:
m.failed(1)
except AttributeError:
pass
else:
- self.queueSendablePackets(family, addr,
+ # We've got an IP address: tell the MMTPServer to start
+ # sending the deliverable packets to that address.
+ self._queueSendablePackets(family, addr,
routing.port, routing.keyinfo,
- deliverable)
+ deliverable, serverName)
+ # Start looking up the hostname for the destination, and call
+ # 'lookupDone' when we're done. This is a little fiddly, since
+ # 'lookupDone' might get invoked from this thread (if the result
+ # is in the cache) or from a DNS thread.
self.dnsCache.lookup(routing.hostname, lookupDone)
- def queueSendablePackets(self, family, addr, port, keyID, deliverable):
- """DOCDOC"""
+ def _queueSendablePackets(self, family, addr, port, keyID, deliverable,
+ serverName):
+ """Helper function: insert the DNS lookup results and list of
+ deliverable packets onto self.msgQueue. Subsequent invocations
+ of _sendQueuedPackets will begin sending those packets to their
+ destination.
+
+ It is safe to call this function from any thread.
+ """
self.msgQueue.put((family,addr,port,keyID,deliverable))
- def sendQueuedPackets(self):
- """DOCDOC"""
+ def _sendQueuedPackets(self):
+ """Helper function: Find all DNS lookup results and packets in
+ self.msgQueue, and begin sending packets to the resulting servers.
+
+ This function should only be called from the main thread.
+ """
while 1:
try:
- family,addr,port,keyID,deliverable=self.msgQueue.get(block=0)
+ family,addr,port,keyID,deliverable,serverName = \
+ self.msgQueue.get(block=0)
except QueueEmpty:
return
- self.sendPackets(family,addr,port,keyID,deliverable)
+ self._sendPackets(family,addr,port,keyID,deliverable)
- def sendPackets(self, family, ip, port, keyID, deliverable):
+ def _sendPackets(self, family, ip, port, keyID, deliverable, serverName):
"""Begin sending a set of packets to a given server.
- deliverable is a list of objects obeying the DeliverableMessage
+ 'deliverable' is a list of objects obeying the DeliverableMessage
interface.
"""
try:
@@ -1203,10 +1242,11 @@
con = MMTPClientConnection(self.clientContext,
ip, port, keyID, deliverable,
finishedCallback=finished,
- certCache=self.certificateCache)
+ certCache=self.certificateCache,
+ address=serverName)
except socket.error, e:
- LOG.error("Unexpected socket error connecting to %s:%s: %s",
- ip, port, e)
+ LOG.error("Unexpected socket error connecting to %s: %s",
+ serverName, e)
EventStats.log.failedConnect() #FFFF addr
for m in deliverable:
try:
@@ -1233,6 +1273,8 @@
pass
def process(self, timeout):
- """DOCDOC overrides"""
- self.sendQueuedPackets()
+ """Overrides AsyncServer.process to call _sendQueuedPackets before
+ checking fd status.
+ """
+ self._sendQueuedPackets()
AsyncServer.process(self, timeout)
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.61
retrieving revision 1.62
diff -u -d -r1.61 -r1.62
--- Modules.py 20 Nov 2003 08:50:19 -0000 1.61
+++ Modules.py 24 Nov 2003 19:59:04 -0000 1.62
@@ -911,7 +911,10 @@
#----------------------------------------------------------------------
def _cleanMaxSize(sz,modname):
- """DOCDOC"""
+ """Given a 'Maximum-Size' configuration value, ensure that it's at least
+ 32KB, and round it up to the next highest 1KB increment. Use 'modname'
+ as the name of the module in warning messages.
+ """
if sz < 32*1024:
LOG.warn("Ignoring low maximum message size for %s",modname)
sz = 32*1024
Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.31
retrieving revision 1.32
diff -u -d -r1.31 -r1.32
--- PacketHandler.py 19 Nov 2003 09:48:10 -0000 1.31
+++ PacketHandler.py 24 Nov 2003 19:59:05 -0000 1.32
@@ -294,7 +294,7 @@
"""Construct a new DeliveryPacket."""
assert 0 <= routingType <= 0xFFFF
assert len(applicationKey) == 16
- #assert len(tag) == 20 #XXXX006 make tag system sane.
+ #assert len(tag) == 20 #XXXX007 make tag system sane.
assert len(tag) == 20 or routingType == Packet.FRAGMENT_TYPE
assert len(payload) == 28*1024
self.exitType = routingType
@@ -314,7 +314,8 @@
def __setstate__(self, state):
if type(state) == types.DictType:
- #XXXX006 remove this case.
+ #XXXX007 remove this case. (Not used since 0.0.5alpha)
+ LOG.warn("Found ancient packet format.")
self.__dict__.update(state)
if not hasattr(self, 'isfrag'):
self.isfrag = 0
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -d -r1.40 -r1.41
--- ServerConfig.py 10 Nov 2003 04:12:20 -0000 1.40
+++ ServerConfig.py 24 Nov 2003 19:59:05 -0000 1.41
@@ -298,7 +298,7 @@
"10 minutes",) },
# FFFF Generic multi-port listen/publish options.
'Incoming/MMTP' : { 'Enabled' : ('REQUIRE', "boolean", "no"),
- #XXXX007 deprecate or remove IP.
+ #XXXX007/8 deprecate or remove IP.
'IP' : ('ALLOW', "IP", "0.0.0.0"),
'Hostname' : ('ALLOW', "host", None),
'Port' : ('ALLOW', "int", "48099"),
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.57
retrieving revision 1.58
diff -u -d -r1.57 -r1.58
--- ServerKeys.py 10 Nov 2003 04:12:20 -0000 1.57
+++ ServerKeys.py 24 Nov 2003 19:59:05 -0000 1.58
@@ -623,7 +623,7 @@
os.rmdir(self.keydir)
def checkKeys(self):
- """DOCDOC"""
+ """Check whether all the required keys exist and are private."""
checkPrivateFile(self.packetKeyFile)
checkPrivateFile(self.mmtpKeyFile)
@@ -643,7 +643,7 @@
password)
def clear(self):
- """DOCDOC"""
+ """Stop holding the keys in memory."""
self.packetKey = self.mmtpKey = None
def getCertFileName(self): return self.certFile
@@ -1196,7 +1196,8 @@
LOG.warn("Hostname %r resolves to reserved address %s",
name,addr)
except socket.error, e:
- # XXXX006 Turn into a warning?
+ # ???? Turn this into a warning, if people have a real reason to
+ # ???? use a hostname that they themselves cannot resolve.
raise UIError("Cannot resolve hostname %r: %s"%(name,e))
_KNOWN_LOCAL_HOSTNAMES[name] = 1
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -d -r1.35 -r1.36
--- ServerQueue.py 10 Nov 2003 04:12:21 -0000 1.35
+++ ServerQueue.py 24 Nov 2003 19:59:05 -0000 1.36
@@ -85,8 +85,9 @@
self.lastAttempt = state[2]
self.address = state[3]
elif state[0] == "V0":
- #XXXX006 remove this case.
+ #XXXX007 remove this case.
# 0.0.4 used a format that didn't have an 'address' field.
+ LOG.warn("Encountered an ancient queued message format.")
self.queuedTime = state[1]
self.lastAttempt = state[2]
self.address = None