[minion-cvs] Improve documentation; refactor random filename generation into Crypto.RNG
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv20294/lib/mixminion/server
Modified Files:
ServerQueue.py
Log Message:
Improve documentation; refactor random filename generation into Crypto.RNG.
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- ServerQueue.py 17 Jan 2003 06:18:06 -0000 1.4
+++ ServerQueue.py 4 Feb 2003 02:08:37 -0000 1.5
@@ -7,7 +7,6 @@
"""
import os
-import base64
import time
import stat
import cPickle
@@ -40,8 +39,10 @@
rmv_HANDLE (A message waiting to be deleted)
msg_HANDLE (A message waiting in the queue.)
inp_HANDLE (An incomplete message being created.)
- (Where HANDLE is a randomly chosen 12-character selection from the
- characters 'A-Za-z0-9+-'. [Collision probability is negligable.])
+
+ (Where HANDLE is an 8-character string chosen at random from the
+ characters 'A-Za-z0-9+-'. [Collision probability is negligible, and
+ collisions are detected.])
Threading notes: Although Queue itself is threadsafe, you'll want
to synchronize around any multistep operations that you want to
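
As a back-of-the-envelope check on the revised claim: 8 characters over
a 64-symbol alphabet carry 48 bits of randomness, so by the birthday
bound the chance of a collision among n handles is roughly n**2 / 2**49,
crossing 0.1% somewhere near 750,000 messages in a single queue -- hence
the new note that collisions are detected rather than merely improbable.
A minimal sketch of how such a handle could be produced, patterned on
the __newHandle helper removed further down (the actual Crypto.RNG code
may differ):

    import base64

    def newHandle(rng):
        # 6 random bytes base64-encode to exactly 8 characters with no
        # padding; replacing "/" with "-" yields the 'A-Za-z0-9+-' set.
        return base64.encodestring(rng.getBytes(6)).strip().replace("/", "-")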
@@ -51,18 +52,8 @@
or more than one consumer ... so synchronization turns out to be
fairly easy.
"""
- # How negligible are the chances of collision? A back-of-the-envelope
- # approximation: The chance of a collision reaches .1% when you have 3e9
- # messages in a single queue. If Alice somehow manages to accumulate a
- # 96 gigabyte backlog, we'll have bigger problems than name
- # collision... such as the fact that most Unices behave badly when
- # confronted with 3 billion files in the same directory... or the fact
- # that, at today's processor speeds, it will take Alice 3 or 4 CPU-years
- # to clear her backlog.
- #
# Threading: If we ever get more than one producer, we're fine. With
# more than one consumer, we'll need to modify DeliveryQueue below.
-
# Fields: dir--the location of the queue.
# n_entries: the number of complete messages in the queue.
@@ -225,11 +216,9 @@
"""Returns (file, handle) tuple to create a new message. Once
you're done writing, you must call finishMessage to
commit your changes, or abortMessage to reject them."""
- handle = self.__newHandle()
- fname = os.path.join(self.dir, "inp_"+handle)
- fd = os.open(fname, _NEW_MESSAGE_FLAGS, 0600)
- return os.fdopen(fd, 'wb'), handle
-
+ file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
+ return file, handle
+
def finishMessage(self, f, handle):
"""Given a file and a corresponding handle, closes the file
and commits the corresponding message."""
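
The call site above implies that openNewFile takes a directory, a
filename prefix, and a binary flag, and returns an open file object
together with the new handle. A sketch of what the refactored helper
plausibly looks like, reusing the O_EXCL pattern from the removed code;
everything here apart from the openNewFile and getBytes names is
guessed, and the real Crypto.RNG code may differ:

    import os, base64

    _NEW_FILE_FLAGS = os.O_WRONLY | os.O_CREAT | os.O_EXCL

    def openNewFile(self, dir, prefix, binary=1):
        while 1:
            handle = base64.encodestring(
                self.getBytes(6)).strip().replace("/", "-")
            fname = os.path.join(dir, prefix + handle)
            try:
                fd = os.open(fname, _NEW_FILE_FLAGS, 0600)
            except OSError:
                # Handle collision with an existing file: try again.
                continue
            return os.fdopen(fd, binary and 'wb' or 'w'), handle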
@@ -245,10 +234,12 @@
def cleanQueue(self, secureDeleteFn=None):
"""Removes all timed-out or trash messages from the queue.
+ If secureDeleteFn is provided, it is called with a list of
+ filenames to be removed. Otherwise, files are removed using
+ secureDelete.
+
Returns 1 if a clean is already in progress; otherwise
returns 0.
-
- DOCDOC secureDeleteFn
"""
# We don't need to hold the lock here; we synchronize via the
# filesystem.
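
A hypothetical caller using the new secureDeleteFn hook, e.g. to batch
removals for a background thread (queue and pendingDeletes are
stand-ins, not names from this module):

    pendingDeletes = []

    def deleteLater(fnames):
        # Defer deletion; something else empties pendingDeletes later.
        pendingDeletes.extend(fnames)

    queue.cleanQueue(secureDeleteFn=deleteLater)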
@@ -294,16 +285,14 @@
finally:
self._lock.release()
- def __newHandle(self):
- """Helper method: creates a new random handle."""
- junk = getCommonPRNG().getBytes(9)
- return base64.encodestring(junk).strip().replace("/","-")
-
class DeliveryQueue(Queue):
"""A DeliveryQueue implements a queue that greedily sends messages
to outgoing streams that occasionally fail. Messages in a
DeliveryQueue are no longer unstructured text, but rather
- tuples of: (n_retries, addressing info, msg).
+ tuples of: (n_retries, None, message, nextAttempt), where
+ n_retries is the number of delivery attempts so far,
+ message is an arbitrary pickleable object, and nextAttempt is a time
+ before which no further delivery should be attempted.
This class is abstract. Implementors of this class should
subclass it to add a _deliverMessages method. Multiple
@@ -320,10 +309,12 @@
"""
###
# Fields:
- # sendable -- A list of handles for all messages
- # that we're not currently sending.
- # pending -- Dict from handle->1, for all messages that we're
+ # sendable -- A list of handles for all messages that we're not
+ # currently sending.
+ # pending -- Dict from handle->time_sent, for all messages that we're
# currently sending.
+ # retrySchedule -- a list of intervals at which delivery of messages
+ # should be reattempted, as described in "setRetrySchedule".
def __init__(self, location, retrySchedule=None):
Queue.__init__(self, location, create=1, scrub=1)
@@ -333,7 +324,22 @@
else:
self.retrySchedule = retrySchedule[:]
- def setRetrySchedule(self, schedule):#DOCDOC
+ def setRetrySchedule(self, schedule):
+ """Set the retry schedule for this queue. A retry schedule is
+ a list of integers, each representing a number of seconds.
+ For example, a schedule of [ 120, 120, 3600, 3600 ] will
+ cause undeliverable messages to be retried after 2 minutes,
+ then 2 minutes later, then 1 hour later, then 1 hour later.
+
+ Retry schedules are not strictly guaranteed, for two reasons:
+ 1) Message delivery can fail _unretriably_, in which case
+ no further attempts are made.
+ 2) Retries are only actually attempted when sendReadyMessages
+ is called. If the schedule specifies retry attempts at
+ 10-second intervals, but sendReadyMessages is invoked only
+ every 30 minutes, messages will only me retried once every
+ 30 minutes.
+ """
self.retrySchedule = schedule[:]
def _rescan(self):
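
To make the docstring's example concrete: with that schedule, a message
whose first attempt fails at time t becomes eligible for redelivery at
the offsets below, and (per reason 2) an attempt actually happens only
at the next sendReadyMessages call after each offset. A standalone
illustration, not code from this file:

    schedule = [ 120, 120, 3600, 3600 ]
    nextAttempt = 0                 # offsets relative to time t
    for interval in schedule:
        nextAttempt += interval
        print nextAttempt           # 120, 240, 3840, 7440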
@@ -349,12 +355,13 @@
def queueMessage(self, msg):
if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
- def queueDeliveryMessage(self, msg, retry=0, nextAttempt=0):
+ def queueDeliveryMessage(self, msg, retry=0, nextAttempt=0):
"""Schedule a message for delivery.
- addr -- An object to indicate the message's destination
- msg -- the message itself
+ msg -- the message. This can be any pickleable object
retry -- how many times so far have we tried to send?
- DOCDOC"""
+ nextAttempt -- A time before which no further attempts to
+ deliver should be made.
+ """
try:
self._lock.acquire()
#DOCDOC nextAttempt
@@ -366,7 +373,7 @@
return handle
def get(self,handle):
- """Returns a (n_retries, addr, msg, nextAttempt?) payload for a given
+ """Returns a (n_retries, msg, nextAttempt) tuple for a given
message handle."""
o = self.getObject(handle)
if len(o) == 3:# XXXX For legacy queues; delete after 0.0.3
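
The hunk ends before the legacy branch, but the two data shapes are
clear from the docstrings above: pre-0.0.3 queues stored
(n_retries, addr, msg), while this revision stores
(n_retries, None, msg, nextAttempt). A hypothetical normalizer showing
what get() has to do with each shape; treating a missing nextAttempt as
0 ("retry immediately") is an assumption, though it matches the
nextAttempt == 0 test in deliveryFailed below:

    def normalize(o):
        if len(o) == 3:
            # Legacy entry: no scheduled time was stored.
            n_retries, addr, msg = o
            return n_retries, msg, 0
        else:
            n_retries, _, msg, nextAttempt = o
            return n_retries, msg, nextAttempt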
@@ -438,21 +445,22 @@
retries, msg, schedAttempt = self.get(handle)
# Multiple retry intervals may have passed in between the most
- # recent failed delivery attempt (lastAttempt) and the time
- # it was schedule (schedAttempt). Increment 'retries' and
- # efore it (prevAttempt). Increment 'retries' to reflect the
- # number of retry intervals that have passed between first
- # sending the message and nextAttempt.
- #DOCDOC
+ # recent failed delivery attempt (lastAttempt) and the time it
+ # was scheduled (schedAttempt). Increment 'retries' to
+ # reflect the number of retry intervals that have passed
+ # between first sending the message and nextAttempt.
if self.retrySchedule and retries < len(self.retrySchedule):
nextAttempt = schedAttempt
if nextAttempt == 0:
nextAttempt = lastAttempt
+ # Increment nextAttempt and retries according to the
+ # retry schedule, until nextAttempt is after lastAttempt.
while retries < len(self.retrySchedule):
nextAttempt += self.retrySchedule[retries]
retries += 1
if nextAttempt > lastAttempt:
break
+ # If there are more attempts to be made, queue the message.
if retries <= len(self.retrySchedule):
self.queueDeliveryMessage(msg, retries, nextAttempt)
elif not self.retrySchedule:
@@ -461,7 +469,8 @@
nextAttempt = 0
if retries < 10:
self.queueDeliveryMessage(msg, retries, nextAttempt)
-
+
+ # Now, it's okay to remove the failed message.
self.removeMessage(handle)
finally:
self._lock.release()
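
A worked run of the loop above, using the schedule from
setRetrySchedule's docstring: say the message was scheduled at
schedAttempt = 1000 and the failed attempt happened at
lastAttempt = 1500. The two 120-second intervals are already in the
past, so they are consumed without producing a retry time (a standalone
rendering of the same logic):

    retrySchedule = [ 120, 120, 3600, 3600 ]
    schedAttempt, lastAttempt, retries = 1000, 1500, 0

    nextAttempt = schedAttempt
    while retries < len(retrySchedule):
        nextAttempt += retrySchedule[retries]
        retries += 1
        if nextAttempt > lastAttempt:
            break
    print retries, nextAttempt      # 3, 4840

Since retries (3) is still <= len(retrySchedule) (4), the message is
requeued with nextAttempt = 4840; had the schedule been exhausted, it
would have been removed without requeueing.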
@@ -508,7 +517,6 @@
that there is still a matter of some controversy whether it ever
makes sense to set minSend != 1.)
"""
-
# Note that there was a bit of confusion here: earlier versions
# implemented an algorithm called "mixmaster" that wasn't actually the
# mixmaster algorithm. I picked up the other algorithm from an early