[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Timeouts, descriptor parsing, refactoring, more tests.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv22424/lib/mixminion/server
Modified Files:
ServerConfig.py ServerKeys.py ServerMain.py ServerQueue.py
Log Message:
Timeouts, descriptor parsing, refactoring, more tests.
ClientMain, MMTPClient, Config:
- Support configurable client timeout
Common:
- Make stdout/stderr wrappers not suck
Config, ServerInfo:
- Add support for ServerInfo and directory parsing to ignore unrecognized
keys
Packet, ServerList, ..., ServerKeys:
- Note work for 0.0.3
ServerMain:
- Change ProcessingThread to be callable-based, not packet-based.
test:
- Tests for log streams
- Tests for client timeouts
- Tests for interval list warnings
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.16
retrieving revision 1.17
diff -u -d -r1.16 -r1.17
--- ServerConfig.py 14 Jan 2003 09:20:18 -0000 1.16
+++ ServerConfig.py 17 Jan 2003 06:18:06 -0000 1.17
@@ -238,5 +238,4 @@
'Deny' : ('ALLOW*', C._parseAddressSet_deny, None) },
# FFFF Missing: Queue-Size / Queue config options
# FFFF listen timeout??
- # FFFF Retry options
}
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -d -r1.9 -r1.10
--- ServerKeys.py 13 Jan 2003 06:18:55 -0000 1.9
+++ ServerKeys.py 17 Jan 2003 06:18:06 -0000 1.10
@@ -498,6 +498,10 @@
Valid-Until: %(ValidUntil)s
Packet-Key: %(PacketKey)s
""" % fields
+ # XXXX003 add 'packet-formats'
+ # Packet-Formats: 0.2
+ # XXXX003 add 'software'
+ # Software: Mixminion %(version)s
if contact:
info += "Contact: %s\n"%contact
if comments:
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.32
retrieving revision 1.33
diff -u -d -r1.32 -r1.33
--- ServerMain.py 13 Jan 2003 06:35:52 -0000 1.32
+++ ServerMain.py 17 Jan 2003 06:18:06 -0000 1.33
@@ -48,15 +48,16 @@
self.packetHandler = packetHandler
self.mixPool = None
self.moduleManager = None
- self._queue = MessageQueue()
- for h in self.getAllMessages():
- assert h is not None
- self._queue.put(h)
- def connectQueues(self, mixPool, manager):
+ def connectQueues(self, mixPool, manager, processingThread):
"""Sets the target mix queue"""
self.mixPool = mixPool
self.moduleManager = manager #XXXX003 refactor.
+ self.processingThread = processingThread
+ for h in self.getAllMessages():
+ assert h is not None
+ self.processingThread.addJob(
+ lambda self=self, h=h: self.__deliverMessage(h))
def queueMessage(self, msg):
"""Add a message for delivery"""
@@ -64,12 +65,10 @@
formatBase64(msg[:8]))
h = mixminion.server.ServerQueue.Queue.queueMessage(self, msg)
assert h is not None
- self._queue.put(h)
-
- def getMessageQueue(self):
- return self._queue
+ self.processingThread.addJob(
+ lambda self=self, h=h: self.__deliverMessage(h))
- def deliverMessage(self, handle):
+ def __deliverMessage(self, handle):
"""Process a single message with a given handle, and insert it into
the Mix pool. This function is called from within the processing
thread."""
@@ -147,7 +146,7 @@
def queueObject(self, obj):
"""Insert an object into the queue."""
- obj.isDelivery() #XXXX remove this implicit typecheck.
+ obj.isDelivery() #XXXX003 remove this implicit typecheck.
self.queue.queueObject(obj)
def count(self):
@@ -172,7 +171,7 @@
for h in handles:
packet = self.queue.getObject(h)
- #XXXX remove the first case
+ #XXXX remove the first case after 0.0.3
if type(packet) == type(()):
LOG.debug(" (skipping message %s in obsolete format)", h)
elif packet.isDelivery():
@@ -216,7 +215,7 @@
"Implementation of abstract method from DeliveryQueue."
# Map from addr -> [ (handle, msg) ... ]
msgs = {}
- # XXXX SKIP DEAD MESSAGES!!!!
+ # XXXX003 SKIP DEAD MESSAGES!!!!
for handle, packet, n_retries in msgList:
addr = packet.getAddress()
message = packet.getPacket()
@@ -284,26 +283,31 @@
LOG.error_exc(sys.exc_info(),
"Exception while cleaning; shutting down thread.")
-class PacketProcessingThread(threading.Thread):
+class ProcessingThread(threading.Thread):
+ class _Shutdown:
+ def __call__(self):
+ raise self
+
#DOCDOC
def __init__(self, incomingQueue):
threading.Thread.__init__(self)
- # Clean up logic; maybe refactor. ????
- self.incomingQueue = incomingQueue
- self.mqueue = incomingQueue.getMessageQueue()
+ self.mqueue = MessageQueue()
def shutdown(self):
LOG.info("Telling processing thread to shut down.")
- self.mqueue.put(None)
+ self.mqueue.put(ProcessingThread._Shutdown())
+
+ def addJob(self, job):
+ self.mqueue.put(job)
def run(self):
try:
while 1:
- handle = self.mqueue.get()
- if handle is None:
- LOG.info("Processing thread shutting down.")
- return
- self.incomingQueue.deliverMessage(handle)
+ job = self.mqueue.get()
+ job()
+ except ProcessingThread._Shutdown:
+ LOG.info("Processing thread shutting down.")
+ return
except:
LOG.error_exc(sys.exc_info(),
"Exception while processing; shutting down thread.")
@@ -417,19 +421,20 @@
LOG.debug("Found %d pending messages in outgoing queue",
self.outgoingQueue.count())
+ self.cleaningThread = CleaningThread()
+ self.processingThread = ProcessingThread(self.incomingQueue)
+
LOG.debug("Connecting queues")
self.incomingQueue.connectQueues(mixPool=self.mixPool,
- manager=self.moduleManager)
+ manager=self.moduleManager,
+ processingThread=self.processingThread)
self.mixPool.connectQueues(outgoing=self.outgoingQueue,
manager=self.moduleManager)
self.outgoingQueue.connectQueues(server=self.mmtpServer)
self.mmtpServer.connectQueues(incoming=self.incomingQueue,
outgoing=self.outgoingQueue)
-
- self.cleaningThread = CleaningThread()
self.cleaningThread.start()
- self.processingThread = PacketProcessingThread(self.incomingQueue)
self.processingThread.start()
self.moduleManager.startThreading()
@@ -478,13 +483,6 @@
LOG.fatal("One of our threads has halted; shutting down.")
return
-## # Process any new messages that have come in, placing them
-## # into the mix pool.
-## self.incomingQueue.sendReadyMessages()
-## ##Prevent child processes from turning into zombies.
-## #???? I think we should just install a SIGCHLD handler.
-## waitForChildren(onceOnly=1,blocking=0)
-
# Calculate remaining time.
now = time.time()
timeLeft = nextEventTime - now
@@ -581,7 +579,7 @@
if pid != 0:
os._exit(0)
# Chdir to / so that we don't hold the CWD unnecessarily.
- os.chdir(os.path.normpath("/")) #???? Is this right on Win32?
+ os.chdir(os.path.normpath("/")) # WIN32 Is this right on Windows?
# Set umask to 000 so that we drop any (possibly nutty) umasks that
# our users had before.
os.umask(0000)
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- ServerQueue.py 13 Jan 2003 06:35:52 -0000 1.3
+++ ServerQueue.py 17 Jan 2003 06:18:06 -0000 1.4
@@ -318,8 +318,6 @@
won't play nice if multiple instances are looking at the same
directory.
"""
- # XXXX separating addr was a mistake.
-
###
# Fields:
# sendable -- A list of handles for all messages
@@ -472,7 +470,6 @@
"""A TimedMixQueue holds a group of files, and returns some of them
as requested, according to a mixing algorithm that sends a batch
of messages every N seconds."""
- # FFFF : interval is unused.
## Fields:
# interval: scanning interval, in seconds.
def __init__(self, location, interval=600):
@@ -493,7 +490,6 @@
"""A CottrellMixQueue holds a group of files, and returns some of them
as requested, according the Cottrell (timed dynamic-pool) mixing
algorithm from Mixmaster."""
- # FFFF : interval is unused.
## Fields:
# interval: scanning interval, in seconds.
# minPool: Minimum number of messages to keep in pool.
- Prev by Date:
[minion-cvs] Timeouts, descriptor parsing, refactoring, more tests.
- Next by Date:
[minion-cvs] Timeouts, descriptor parsing, refactoring, more tests.
- Previous by thread:
[minion-cvs] Timeouts, descriptor parsing, refactoring, more tests.
- Next by thread:
[minion-cvs] Timeouts, descriptor parsing, refactoring, more tests.
- Index(es):