[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Improve documentation; refactor random filename generat...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv20294/lib/mixminion/server

Modified Files:
	ServerQueue.py 
Log Message:
Improve documentation; refactor random filename generation into Crypto.RNG.

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- ServerQueue.py	17 Jan 2003 06:18:06 -0000	1.4
+++ ServerQueue.py	4 Feb 2003 02:08:37 -0000	1.5
@@ -7,7 +7,6 @@
    """
 
 import os
-import base64
 import time
 import stat
 import cPickle
@@ -40,8 +39,10 @@
              rmv_HANDLE  (A message waiting to be deleted)
              msg_HANDLE  (A message waiting in the queue.
              inp_HANDLE  (An incomplete message being created.)
-       (Where HANDLE is a randomly chosen 12-character selection from the
-       characters 'A-Za-z0-9+-'.  [Collision probability is negligable.])
+
+       (Where HANDLE is a randomly chosen 8-character string of characters
+       chosen from 'A-Za-z0-9+-'.  [Collision probability is negligible, and
+       collisions are detected.])
 
        Threading notes:  Although Queue itself is threadsafe, you'll want
        to synchronize around any multistep operations that you want to
@@ -51,18 +52,8 @@
        or more than one consumer ... so synchronization turns out to be
        fairly easy.
        """
-       # How negligible are the chances of collision?  A back-of-the-envelope
-       # approximation: The chance of a collision reaches .1% when you have 3e9
-       # messages in a single queue.  If Alice somehow manages to accumulate a
-       # 96 gigabyte backlog, we'll have bigger problems than name
-       # collision... such as the fact that most Unices behave badly when
-       # confronted with 3 billion files in the same directory... or the fact
-       # that, at today's processor speeds, it will take Alice 3 or 4 CPU-years
-       # to clear her backlog.
-       #
        # Threading: If we ever get more than one producer, we're fine.  With
        #    more than one consumer, we'll need to modify DeliveryQueue below.
-       
 
     # Fields:   dir--the location of the queue.
     #           n_entries: the number of complete messages in the queue.
@@ -225,11 +216,9 @@
         """Returns (file, handle) tuple to create a new message.  Once
            you're done writing, you must call finishMessage to
            commit your changes, or abortMessage to reject them."""
-        handle = self.__newHandle()
-        fname = os.path.join(self.dir, "inp_"+handle)
-        fd = os.open(fname, _NEW_MESSAGE_FLAGS, 0600)
-        return os.fdopen(fd, 'wb'), handle
-
+        file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
+        return file, handle
+    
     def finishMessage(self, f, handle):
         """Given a file and a corresponding handle, closes the file
            commits the corresponding message."""
@@ -245,10 +234,12 @@
     def cleanQueue(self, secureDeleteFn=None):
         """Removes all timed-out or trash messages from the queue.
 
+           If secureDeleteFn is provided, it is called with a list of 
+           filenames to be removed.  Otherwise, files are removed using
+           secureDelete.
+
            Returns 1 if a clean is already in progress; otherwise
            returns 0.
-
-           DOCDOC secureDeleteFn
         """
         # We don't need to hold the lock here; we synchronize via the
         # filesystem.
@@ -294,16 +285,14 @@
         finally:
             self._lock.release()
 
-    def __newHandle(self):
-        """Helper method: creates a new random handle."""
-        junk = getCommonPRNG().getBytes(9)
-        return base64.encodestring(junk).strip().replace("/","-")
-
 class DeliveryQueue(Queue):
     """A DeliveryQueue implements a queue that greedily sends messages
        to outgoing streams that occasionally fail.  Messages in a
        DeliveryQueue are no longer unstructured text, but rather
-       tuples of: (n_retries, addressing info, msg).
+       tuples of: (n_retries, None, message, nextAttempt), where
+       n_retries is the number of delivery attempts so far,
+       the message is an arbitrary pickled object, and nextAttempt is a time
+       before which no further delivery should be attempted.
 
        This class is abstract. Implementors of this class should
        subclass it to add a _deliverMessages method.  Multiple
@@ -320,10 +309,12 @@
     """
     ###
     # Fields:
-    #    sendable -- A list of handles for all messages
-    #           that we're not currently sending.
-    #    pending -- Dict from handle->1, for all messages that we're
+    #    sendable -- A list of handles for all messages that we're not
+    #           currently sending.
+    #    pending -- Dict from handle->time_sent, for all messages that we're
     #           currently sending.
+    #    retrySchedule: a list of intervals at which delivery of messages
+    #           should be reattempted, as described in "setRetrySchedule".
 
     def __init__(self, location, retrySchedule=None):
         Queue.__init__(self, location, create=1, scrub=1)
@@ -333,7 +324,22 @@
         else:
             self.retrySchedule = retrySchedule[:]
 
-    def setRetrySchedule(self, schedule):#DOCDOC
+    def setRetrySchedule(self, schedule):
+        """Set the retry schedule for this queue.  A retry schedule is
+           a list of integers, each representing a number of seconds.
+           For example, a schedule of [ 120, 120, 3600, 3600 ] will
+           cause undeliverable messages to be retried after 2 minutes,
+           then 2 minutes later, then 1 hour later, then 1 hour later.
+
+           Retry schedules are not strictly guaranteed, for two reasons:
+             1) Message delivery can fail _unretriably_, in which case
+                no further attempts are made.
+             2) Retries are only actually attempted when sendReadyMessages
+                is called.  If the schedule specifies retry attempts at
+                10-second intervals, but sendReadyMessages is invoked only
+                every 30 minutes, messages will only be retried once every
+                30 minutes.
+        """              
         self.retrySchedule = schedule[:]
 
     def _rescan(self):
@@ -349,12 +355,13 @@
     def queueMessage(self, msg):
         if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
 
-    def queueDeliveryMessage(self,  msg, retry=0, nextAttempt=0):
+    def queueDeliveryMessage(self, msg, retry=0, nextAttempt=0):
         """Schedule a message for delivery.
-             addr -- An object to indicate the message's destination
-             msg -- the message itself
+             msg -- the message.  This can be any pickleable object
              retry -- how many times so far have we tried to send?
-             DOCDOC"""
+             nextAttempt -- A time before which no further attempts to
+                  deliver should be made.
+        """ 
         try:
             self._lock.acquire()
             #DOCDOC nextAttempt 
@@ -366,7 +373,7 @@
         return handle
 
     def get(self,handle):
-        """Returns a (n_retries, addr, msg, nextAttempt?) payload for a given
+        """Returns a (n_retries, msg, nextAttempt) tuple for a given
            message handle."""
         o = self.getObject(handle)
         if len(o) == 3:# XXXX For legacy queues; delete after 0.0.3
@@ -438,21 +445,22 @@
                 retries, msg, schedAttempt = self.get(handle)
 
                 # Multiple retry intervals may have passed in between the most
-                # recent failed delivery attempt (lastAttempt) and the time
-                # it was schedule (schedAttempt).  Increment 'retries' and
-                # efore it (prevAttempt).  Increment 'retries' to reflect the
-                # number of retry intervals that have passed between first
-                # sending the message and nextAttempt.
-                #DOCDOC
+                # recent failed delivery attempt (lastAttempt) and the time it
+                # was scheduled (schedAttempt).  Increment 'retries' to
+                # reflect the number of retry intervals that have passed
+                # between first sending the message and nextAttempt.
                 if self.retrySchedule and retries < len(self.retrySchedule):
                     nextAttempt = schedAttempt
                     if nextAttempt == 0:
                         nextAttempt = lastAttempt
+                    # Increment nextAttempt and retries according to the
+                    # retry schedule, until nextAttempt is after lastAttempt.
                     while retries < len(self.retrySchedule):
                         nextAttempt += self.retrySchedule[retries]
                         retries += 1
                         if nextAttempt > lastAttempt:
                             break
+                    # If there are more attempts to be made, queue the message.
                     if retries <= len(self.retrySchedule):
                         self.queueDeliveryMessage(msg, retries, nextAttempt)
                 elif not self.retrySchedule:
@@ -461,7 +469,8 @@
                     nextAttempt = 0
                     if retries < 10:
                         self.queueDeliveryMessage(msg, retries, nextAttempt)
-                                              
+
+            # Now, it's okay to remove the failed message.
             self.removeMessage(handle)
         finally:
             self._lock.release()
@@ -508,7 +517,6 @@
            that there is still a matter of some controversy whether it ever
            makes sense to set minSend != 1.)
            """
-
         # Note that there was a bit of confusion here: earlier versions
         # implemented an algorithm called "mixmaster" that wasn't actually the
         # mixmaster algorithm.  I picked up the other algorithm from an early