[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] All known tasks for 0.0.5 are done. Only testing is left before 0.0.5rc1.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv16597/lib/mixminion/server
Modified Files:
Modules.py PacketHandler.py ServerConfig.py ServerKeys.py
ServerMain.py ServerQueue.py
Log Message:
All known tasks for 0.0.5 are done. Only testing is left before 0.0.5rc1.
BuildMessage, ClientMain:
- Debug SURBLog
- Return SURBLog backward compatibility.
Packet:
- Debug ServerSideFragmentedMessage.pack
test:
- Add tests for random-length paths.
- Tests for clearable queues.
- Tests for reassembly module.
- Tests for maximum message sizes.
- Tests for paths with filenames containing commas and colons.
- Tests for SURB logs
Module:
- Activate modules in a fixed order.
ServerKeys:
- Don't crash when the directory server is down.
ServerMain:
- Even in TRACE mode, don't link incoming messages to pool messages.
*:
- Add all waiting documentation, resolve all XXXX005's.
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.53
retrieving revision 1.54
diff -u -d -r1.53 -r1.54
--- Modules.py 28 Aug 2003 01:40:08 -0000 1.53
+++ Modules.py 31 Aug 2003 19:29:29 -0000 1.54
@@ -172,6 +172,13 @@
# There is no underlying queue to worry about here; do nothing.
pass
+ def getPriority(self):
+ """Return the order at which this queue should be flushed. Queues
+ are flushed from lowest-valued priority to highest. Most modules
+ should use priority 0. Modules which insert messages into other
+ modules should use priority <0."""
+ return 0
+
class SimpleModuleDeliveryQueue(mixminion.server.ServerQueue.DeliveryQueue):
"""Helper class used as a default delivery queue for modules that
don't care about batching messages to like addresses."""
@@ -181,7 +188,9 @@
mixminion.server.ServerQueue.DeliveryQueue.__init__(self, directory,
retrySchedule)
self.module = module
-
+
+ def getPriority(self):
+ return 0
def _deliverMessages(self, msgList):
for handle in msgList:
@@ -457,7 +466,10 @@
"""Actual implementation of message delivery. Tells every module's
queue to send pending messages. This is called directly if
we aren't threading, and from the delivery thread if we are."""
- for name, queue in self.queues.items():
+ queuelist = [ (queue.getPriority(), queue)
+ for queue in self.queues.values() ]
+ queuelist.sort()
+ for _, queue in queuelist:
queue.sendReadyMessages()
def getServerInfoBlocks(self):
@@ -502,7 +514,21 @@
#----------------------------------------------------------------------
class FragmentModule(DeliveryModule):
- """DOCDOC"""
+ """Module used to handle server-side reassembly of fragmented payloads.
+
+ When a message is fragmented for reassembly by the exit node, it
+ is sent in packets of exit type FRAGMENT. The actual exit type and
+ delivery address are encoded at the start of the reassembled message.
+ """
+ ##
+ # _queue: An instance of FragmentDeliveryQueue, or None
+ # manager: A pointer back to the module manager. Used to insert
+ # reassembled messages into other modules' queues.
+ # maxMessageSize: The largest allowable message size. (In bytes,
+ # after defragmentation, before uncompression.)
+ # maxInterval: The longest we hold onto a fragment of a message before
+ # we give up on receiving the whole message. (In seconds.)
+ # maxFragments: The largest allowable message size, in fragments.
def __init__(self):
DeliveryModule.__init__(self)
self._queue = None
@@ -531,7 +557,6 @@
self.maxFragments = fp.nChunks * fp.n
self.manager = manager
manager.enableModule(self)
-
def getServerInfoBlock(self):
return """[Delivery/Fragmented]
Version: 0.1
@@ -553,12 +578,25 @@
self._queue = None
class FragmentDeliveryQueue:
+ """Delivery queue for FragmentModule.
+
+ Wraps mixminion.fragments.FragmentPool."""
+ ##Fields:
+ # module: the FragmentModule.
+ # directory: location used for the FragmentPool
+ # pool: instance of FragmentPool
def __init__(self, module, directory, manager):
self.module = module
self.directory = directory
self.manager = manager
self.pool = mixminion.Fragments.FragmentPool(self.directory)
+ def getPriority(self):
+ # We want to make sure that fragmented messages get reassembled
+ # before any other modules deliver their messages. This way,
+ # reassembled messages get delivered as soon as they're ready.
+ return -1
+
def queueDeliveryMessage(self, packet, retry=0, lastAttempt=0):
if packet.isError():
LOG.warn("Dropping FRAGMENT packet with decoding error: %s",
@@ -566,7 +604,6 @@
return
elif not packet.isFragment():
LOG.warn("Dropping FRAGMENT packet with non-fragment payload.")
- print packet.type, packet.isfrag
return
elif packet.getAddress():
LOG.warn("Dropping FRAGMENT packet with spurious addressing info.")
@@ -595,19 +632,25 @@
LOG.warn("Dropping over-long fragmented message")
self.pool.markMessageCompleted(msgid, rejected=1)
continue
-
+
fm = _FragmentedDeliveryMessage(ssfm)
self.manager.queueDecodedMessage(fm)
self.pool.markMessageCompleted(msgid)
cutoff = previousMidnight(time.time()) - self.module.maxInterval
self.pool.expireMessages(cutoff)
-
class _FragmentedDeliveryMessage:
"""Helper class: obeys the interface of mixminion.server.PacketHandler.
DeliveryMessage, but contains a long message reassembled from
fragments."""
+ ##Fields:
+ # m: an instance of ServerSideFragmentedMessage.
+ # exitType, address: the routing type and routing info for this message
+ # contents: None, or the uncompressed contents of the message if it's
+ # been decoded.
+ # headers: None, or a dict of the message's headers.
+ # tp: 'plain' or 'err' or 'long'.
def __init__(self, ssfm):
"""Create a _FragmentedDeliveryMessage object from an instance of
mixminion.Packet.ServerSideFragmentedMessage."""
@@ -665,7 +708,7 @@
mixminion.Packet.parseMessageAndHeaders(c)
self.tp = 'plain'
except CompressedDataTooLong:
- self.contents = self.m.uncompressedContents
+ self.contents = self.m.compressedContents
self.tp = 'long'
self.headers = {}
return
Index: PacketHandler.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/PacketHandler.py,v
retrieving revision 1.24
retrieving revision 1.25
diff -u -d -r1.24 -r1.25
--- PacketHandler.py 28 Aug 2003 01:40:08 -0000 1.24
+++ PacketHandler.py 31 Aug 2003 19:29:29 -0000 1.25
@@ -268,6 +268,8 @@
# isfrag -- Is this packet a fragment of a complete message? If so, the
# type must be 'plain'.
# dPayload -- An instance of mixminion.Packet.Payload for this object.
+ # error -- None, or a string containing an error encountered while trying
+ # to decode the payload.
def __init__(self, routingType, routingInfo, applicationKey,
tag, payload):
"""Construct a new DeliveryPacket."""
@@ -286,7 +288,7 @@
self.headers = None
self.isfrag = 0
self.dPayload = None
- self.error = None #DOCDOC
+ self.error = None
def __setstate__(self, state):
self.__dict__.update(state)
@@ -383,7 +385,7 @@
self.headers = {}
except MixError, e:
self.contents = message
- self.error = str(e) #DOCDOC
+ self.error = str(e)
self.type = 'err'
self.headers = {}
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.36
retrieving revision 1.37
diff -u -d -r1.36 -r1.37
--- ServerConfig.py 13 Jul 2003 03:45:35 -0000 1.36
+++ ServerConfig.py 31 Aug 2003 19:29:29 -0000 1.37
@@ -145,7 +145,8 @@
return reasons
def getConfigurationSummary(self):
- """DOCDOC"""
+ """Return a human-readable description of this server's configuration,
+ for inclusion in the testing section of the server descriptor."""
res = []
for section,entries in [
("Server", ['LogLevel', 'LogStats', 'StatsInterval',
@@ -153,8 +154,14 @@
'MixInterval', 'MixPoolRate', 'MixPoolMinSize',
'Timeout',]),
("Outgoing/MMTP", ['Retry']),
- ("Delivery/SMTP", ['Enabled','Retry']),
- ("Delivery/SMTP-Via-Mixmaster", ['Enabled', 'Retry', 'Server']),
+ ("Delivery/SMTP",
+ ['Enabled', 'Retry', 'SMTPServer', 'ReturnAddress', 'FromTag',
+ 'SubjectLine', 'MaximumSize']),
+ ("Delivery/SMTP-Via-Mixmaster",
+ ['Enabled', 'Retry', 'Server', 'ReturnAddress', 'FromTag',
+ 'SubjectLine', 'MaximumSize']),
+ ("Delivery/Fragmented",
+ ['Enabled', 'MaximumSize','MaximumInterval']),
]:
sec = self[section]
for k in entries:
@@ -281,6 +288,7 @@
'MixPoolRate' : ('ALLOW', _parseFraction, "60%"),
'MixPoolMinSize' : ('ALLOW', C._parseInt, "5"),
'Timeout' : ('ALLOW', C._parseInterval, "5 min"),
+ #XXXX006 remove this.
'__DEBUG_GC' : ('ALLOW', C._parseBoolean, "no"),
},
'DirectoryServers' : { # '__SECTION__' : ('REQUIRE', None, None),
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.49
retrieving revision 1.50
diff -u -d -r1.49 -r1.50
--- ServerKeys.py 21 Aug 2003 21:34:03 -0000 1.49
+++ ServerKeys.py 31 Aug 2003 19:29:29 -0000 1.50
@@ -674,17 +674,22 @@
fname = self.getDescriptorFileName()
descriptor = readFile(fname)
fields = urllib.urlencode({"desc" : descriptor})
+ f = None
try:
try:
f = urllib2.urlopen(url, fields)
info = f.info()
reply = f.read()
+ except IOError, e:
+ LOG.error("Error while publishing server descriptor: %s",e)
+ return 'error'
except:
LOG.error_exc(sys.exc_info(),
"Error publishing server descriptor")
return 'error'
finally:
- f.close()
+ if f is not None:
+ f.close()
if info.get('Content-Type') != 'text/plain':
LOG.error("Bad content type %s from directory"%info.get(
@@ -1143,7 +1148,7 @@
writeFile(filename, certText+identityCertText, 0600)
def getPlatformSummary():
- """XXXX005 move; DOCDOC"""
+ """Return a string describing the current software and platform."""
if hasattr(os, "uname"):
uname = " ".join(os.uname())
else:
@@ -1151,4 +1156,3 @@
return "Mixminion %s; Python %r on %r" % (
mixminion.__version__, sys.version, uname)
-
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.91
retrieving revision 1.92
diff -u -d -r1.91 -r1.92
--- ServerMain.py 28 Aug 2003 01:40:08 -0000 1.91
+++ ServerMain.py 31 Aug 2003 19:29:29 -0000 1.92
@@ -170,8 +170,8 @@
h2 = self.mixPool.queueObject(res)
self.removeMessage(handle)
- LOG.debug("Processed message IN:%s; inserting into mix pool as MIX:%s",
- handle, h2)
+ LOG.debug("Processed message IN:%s; inserting into mix pool",
+ handle)
except mixminion.Crypto.CryptoError, e:
LOG.warn("Invalid PK or misencrypted header in message IN:%s: %s",
handle, e)
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -d -r1.32 -r1.33
--- ServerQueue.py 25 Aug 2003 21:05:34 -0000 1.32
+++ ServerQueue.py 31 Aug 2003 19:29:29 -0000 1.33
@@ -202,17 +202,18 @@
"""
###
# Fields:
- # retrySchedule -- a list of intervals at which delivery of messages
- # should be reattempted, as described in "setRetrySchedule".
- #
- # XXXX Refactor as many of these fields as possible into _DeliveryState.
- #
- # DOCDOC list of fields is now inaccurate -- qname, store, _lock
-
+ # store -- An ObjectMetadataStore to back this queue. The objects
+ # are instances of whatever deliverable object this queue contains;
+ # the metadata are instances of _DeliveryState.
+ # retrySchedule -- a list of intervals at which delivery of messages
+ # should be reattempted, as described in "setRetrySchedule".
+ # _lock -- a reference to the RLock used to control access to the
+ # store.
def __init__(self, location, retrySchedule=None, now=None, name=None):
"""Create a new DeliveryQueue object that stores its files in
<location>. If retrySchedule is provided, it is interpreted as
- in setRetrySchedule. DOCDOC name"""
+ in setRetrySchedule. Name, if present, is a human-readable
+ name used in log messages."""
self.store = mixminion.Filestore.ObjectMetadataStore(
location,create=1,scrub=1)
self._lock = self.store._lock