[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Deprecate, document, and extend. Implement faster shut...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv25437/minion/lib/mixminion/server
Modified Files:
MMTPServer.py ServerKeys.py ServerMain.py ServerQueue.py
Log Message:
Deprecate, document, and extend. Implement faster shutdown.
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.40
retrieving revision 1.41
diff -u -d -r1.40 -r1.41
--- MMTPServer.py 13 Jun 2003 01:03:46 -0000 1.40
+++ MMTPServer.py 26 Jun 2003 03:23:53 -0000 1.41
@@ -700,6 +700,7 @@
NULL_KEYID = "\x00"*20
class DeliverableMessage:
+ """Interface to be impemented by messages deliverable by MMTP """
def __init__(self):
pass
def getContents(self):
@@ -710,6 +711,10 @@
raise NotImplementedError
class DeliverablePacket(DeliverableMessage):
+ """Implementation of DelierableMessage.
+
+ Wraps a ServerQueue.PendingMessage object for a queue holding
+ PacketHandler.RelayPacket objects."""
def __init__(self, pending):
DeliverableMessage.__init__(self)
self.pending = pending
@@ -724,12 +729,13 @@
"""Asynchronious implementation of the sending ("client") side of a
mixminion connection."""
## Fields:
- # ip, port, keyID, messageList, handleList, sendCallback, failCallback,
- # finishedCallback, certCache:
+ # ip, port, keyID, messageList, finishedCallback, certCache:
# As described in the docstring for __init__ below. We remove entries
# from the front of messageList/handleList as we begin sending them.
- # _curMessage, _curHandle: Correspond to the message and handle
- # that we are currently trying to deliver.
+ # _curMessage, _curHandle: Correspond to the message that we are
+ # currently trying to deliver. If _curHandle is None, _curMessage
+ # is a control string. If _curHandle is a DeliverableMessage,
+ # _curMessage is the corresponding 32KB string.
# junk: A list of 32KB padding chunks that we're going to send. We
# pregenerate these to avoid timing attacks. They correspond to
# the 'JUNK' entries in messageList.
@@ -752,8 +758,10 @@
port -- The port to connect to.
keyID -- None, or the expected SHA1 hash of the server's public key
messageList -- a list of message payloads and control strings.
- The control string "JUNK" sends 32KB of padding; the control
- string "RENEGOTIATE" renegotiates the connection key.
+ Message payloads must implement the DeliverableMessage
+ interface above; allowable control strings are either
+ string "JUNK", which sends 32KB of padding; or the control
+ string "RENEGOTIATE" which renegotiates the connection key.
finishedCallback -- None, or a function to be called when this
connection is closed.
certCache -- an instance of PeerCertificateCache to use for
@@ -761,11 +769,9 @@
"""
# Generate junk before connecting to avoid timing attacks
self.junk = []
- #DOCDOC
self.messageList = []
self.active = 1
- #DOCDOC
self.addMessages(messageList)
if certCache is None:
@@ -802,9 +808,10 @@
return self.active
def addMessages(self, messages):
- """Given a list of messages and handles, as given to
+ """Given a list of messages and control strings, as given to
MMTPServer.__init__, cause this connection to deliver that new
- set of messages after it's done with those it's currently sending.
+ set of messages after it's done with those it's currently
+ sending.
"""
assert self.active
for m in messages:
@@ -866,7 +873,7 @@
if not self.messageList:
self.shutdown(0)
return
-
+
self._getNextMessage()
msg = self._curMessage
if msg == 'RENEGOTIATE':
@@ -892,7 +899,8 @@
self.finished = self.__sentMessage
def _getNextMessage(self):
- """DOCDOC"""
+ """Helper function: pull the next _curHandle, _curMessage pair from
+ self.messageList."""
m = self.messageList[0]
del self.messageList[0]
if hasattr(m, 'getContents'):
@@ -1039,7 +1047,11 @@
self.listener.shutdown()
def sendMessages(self, ip, port, keyID, deliverable):
- """Begin sending a set of messages to a given server."""
+ """Begin sending a set of messages to a given server.
+
+ deliverable is a list of objects obeying the DeliverableMessage
+ lsit.
+ """
try:
# Is there an existing connection open to the right server?
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.44
retrieving revision 1.45
diff -u -d -r1.44 -r1.45
--- ServerKeys.py 10 Jun 2003 10:40:21 -0000 1.44
+++ ServerKeys.py 26 Jun 2003 03:23:53 -0000 1.45
@@ -352,8 +352,9 @@
self.checkKeys()
def getDeadKeys(self,now=None):
- """DOCDOC
- doesn't checkKeys.
+ """Helper function: return a list of (informative-message, keyset
+ object) for each expired keyset in the keystore. Does not rescan
+ the keystore or remove dead keys.
"""
if now is None:
now = time.time()
@@ -580,6 +581,7 @@
files = [self.packetKeyFile,
self.mmtpKeyFile,
self.certFile,
+ self.certFile+"_tmp",
self.descFile,
self.publishedFile,
self.hashlogFile ]
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.80
retrieving revision 1.81
diff -u -d -r1.80 -r1.81
--- ServerMain.py 13 Jun 2003 01:03:46 -0000 1.80
+++ ServerMain.py 26 Jun 2003 03:23:53 -0000 1.81
@@ -729,6 +729,25 @@
finally:
if lock: self.keyring.unlock()
+ def scheduleRecurringComplexBackground(self, first, name, cb):
+ """DOCDOC"""
+ #????005 Is this worth it?
+ backgroundJob = [None]
+ scheduler = [None]
+
+ def _bg(cb=cb, self=self, scheduler=scheduler, name=name):
+ next = cb()
+ if next is not None:
+ self.scheduleOnce(next, name, scheduler[0])
+
+ def _scheduler(backgroundJob=backgroundJob, self=self):
+ self.processingThread.addJob(backgroundJob[0])
+
+ backgroundJob[0] = _bg
+ scheduler[0] = _scheduler
+
+ self.scheduleOnce(first,name,_scheduler)
+
def generateKeys(self):
"""Callback used to schedule key-generation"""
@@ -791,8 +810,8 @@
nextMix = self.mixPool.getNextMixTime(now)
LOG.debug("First mix at %s", formatTime(nextMix,1))
- self.scheduleOnce(self.mixPool.getNextMixTime(now),
- "MIX", self.doMix)
+ self.scheduleRecurringComplex(self.mixPool.getNextMixTime(now),
+ "MIX", self.doMix)
LOG.info("Entering main loop: Mixminion %s", mixminion.__version__)
@@ -868,11 +887,10 @@
# Send exit messages
self.moduleManager.sendReadyMessages()
- #XXXX005 use new schedulerecurringcomplex interface
# Choose next mix interval
nextMix = self.mixPool.getNextMixTime(now)
- self.scheduleOnce(nextMix, "MIX", self.doMix)
LOG.trace("Next mix at %s", formatTime(nextMix,1))
+ return nextMix
def cleanQueues(self):
"""Remove all deleted messages from queues"""
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.23
retrieving revision 1.24
diff -u -d -r1.23 -r1.24
--- ServerQueue.py 13 Jun 2003 01:03:46 -0000 1.23
+++ ServerQueue.py 26 Jun 2003 03:23:53 -0000 1.24
@@ -294,7 +294,9 @@
# inserted into the queue.
# lastAttempt: The most recent time at which we attempted to
# deliver the message. (None means 'never').
- # address: Pickleable object holding address information. DOCDOC
+ # address: Pickleable object holding address information. Delivery
+ # code uses this field to group messages by address before loading
+ # them all from disk.
def __init__(self, queuedTime=None, lastAttempt=None, address=None):
"""Create a new _DeliveryState for a message received at
queuedTime (default now), whose last delivery attempt was
@@ -303,7 +305,7 @@
queuedTime = time.time()
self.queuedTime = queuedTime
self.lastAttempt = lastAttempt
- self.address = None
+ self.address = address
def __getstate__(self):
# For pickling. All future versions of deliverystate will pickle
@@ -318,6 +320,7 @@
self.address = state[3]
elif state[0] == "V0":
#XXXX006 remove this case.
+ # 0.0.4 used a format that didn't have an 'address' field.
self.queuedTime = state[1]
self.lastAttempt = state[2]
self.address = None
@@ -362,7 +365,16 @@
self.lastAttempt = when
class PendingMessage:
- """DOCDOC"""
+ """PendingMessage represents a message in a DeliveryQueue, for delivery
+ to a specific address. See DeliveryQueue._deliverMessages for more
+ information about the interface."""
+ ##
+ # queue: the deliveryqueue holding this message
+ # handle: the handle for this message in the queue
+ # address: The address passed to queueDeliveryMessage for this message,
+ # or None
+ # message: The object queued as this message, or None if the object
+ # has not yet been loaded.
def __init__(self, handle, queue, address, message=None):
self.handle = handle
self.queue = queue
@@ -376,19 +388,24 @@
return self.handle
def succeeded(self):
+ """Mark this message as having been successfully deleted, removing
+ it from the queue."""
self.queue.deliverySucceeded(self.handle)
self.queue = self.message = None
def failed(self, retriable=0, now=None):
+ """Mark this message as has having failed delivery, either rescheduling
+ it or removing it from the queue."""
self.queue.deliveryFailed(self.handle, retriable, now=now)
self.queue = self.message = None
def getMessage(self):
+ """Return the underlying object stored in the delivery queue, loading
+ it from disk if necessary."""
if self.message is None:
self.message = self.queue.getObject(self.handle)
return self.message
-
class DeliveryQueue(Queue):
"""A DeliveryQueue implements a queue that greedily sends messages to
outgoing streams that occasionally fail. All underlying messages
@@ -644,14 +661,13 @@
self._repOk()
def _deliverMessages(self, msgList):
- """Abstract method; Invoked with a list of (handle, message)
- tuples every time we have a batch of messages to send.
-
- For every handle in the list, delierySucceeded or deliveryFailed
- should eventually be called, or the message will sit in the queue
- indefinitely, without being retried."""
+ """Abstract method; Invoked with a list of PendingMessage objects
+ every time we have a batch of messages to send.
- #DOCDOC
+ For every PendingMessage object on the list, the object's
+ .succeeded() or .failed() method should eventually be called, or
+ the message will sit in the queue indefinitely, without being
+ retried."""
# We could implement this as a single _deliverMessage(h,addr)
# method, but that wouldn't allow implementations to batch