[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Begin refactoring ServerQueue.



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv16558/lib/mixminion/server

Modified Files:
	ServerQueue.py 
Log Message:
Begin refactoring ServerQueue.

(Client queues and fragment pools both behave enough like "generic
filestore with metadata" that it doesn't make much sense to have a
separate implementation any more.)


Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -d -r1.25 -r1.26
--- ServerQueue.py	13 Jul 2003 03:45:35 -0000	1.25
+++ ServerQueue.py	24 Jul 2003 03:22:57 -0000	1.26
@@ -13,6 +13,8 @@
 import cPickle
 import threading
 
+import mixminion.Filestore
+
 from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
      createPrivateDir, readPickled, writePickled, formatTime, readFile
 from mixminion.Crypto import getCommonPRNG
@@ -30,261 +32,7 @@
 # trash.
 INPUT_TIMEOUT = 6000
 
-class Queue:
-    """A Queue is an unordered collection of files with secure insert, move,
-       and delete operations.
-
-       Implementation: a queue is a directory of 'messages'.  Each
-       filename in the directory has a name in one of the following
-       formats:
-             rmv_HANDLE  (A message waiting to be deleted)
-             msg_HANDLE  (A message waiting in the queue.
-             inp_HANDLE  (An incomplete message being created.)
-
-       (Where HANDLE is a randomly chosen 8-character string of characters
-       chosen from 'A-Za-z0-9+-'.  [Collision probability is negligible, and
-       collisions are detected.])
-
-       Threading notes:  Although Queue itself is threadsafe, you'll want
-       to synchronize around any multistep operations that you want to
-       run atomically.  Use Queue.lock() and Queue.unlock() for this.
-
-       In the Mixminion server, no queue currently has more than one producer
-       or more than one consumer ... so synchronization turns out to be
-       fairly easy.
-       """
-       # Threading: If we ever get more than one producer, we're fine.  With
-       #    more than one consumer, we'll need to modify DeliveryQueue below.
-
-    # Fields:   dir--the location of the queue.
-    #           n_entries: the number of complete messages in the queue.
-    #                 <0 if we haven't counted yet.
-    #           _lock: A lock that must be held while modifying or accessing
-    #                 the queue object.  Filesystem operations are allowed
-    #                 without holding the lock, but they must not be visible
-    #                 to users of the queue.
-    def __init__(self, location, create=0, scrub=0):
-        """Creates a queue object for a given directory, 'location'.  If
-           'create' is true, creates the directory if necessary.  If 'scrub'
-           is true, removes any incomplete or invalidated messages from the
-           Queue."""
-
-        secureDelete([]) # Make sure secureDelete is configured. HACK!
-
-        self._lock = threading.RLock()
-        self.dir = location
-
-        if not os.path.isabs(location):
-            LOG.warn("Queue path %s isn't absolute.", location)
-
-        if os.path.exists(location) and not os.path.isdir(location):
-            raise MixFatalError("%s is not a directory" % location)
-
-        createPrivateDir(location, nocreate=(not create))
-
-        if scrub:
-            self.cleanQueue()
-
-        # Count messages on first time through.
-        self.n_entries = -1
-
-    def lock(self):
-        """Prevent access to this queue from other threads."""
-        self._lock.acquire()
-
-    def unlock(self):
-        """Release the lock on this queue."""
-        self._lock.release()
-
-    def queueMessage(self, contents):
-        """Creates a new message in the queue whose contents are 'contents',
-           and returns a handle to that message."""
-        f, handle = self.openNewMessage()
-        f.write(contents)
-        self.finishMessage(f, handle) # handles locking
-        return handle
-
-    def queueObject(self, object):
-        """Queue an object using cPickle, and return a handle to that
-           object."""
-        f, handle = self.openNewMessage()
-        cPickle.dump(object, f, 1)
-        self.finishMessage(f, handle) # handles locking
-        return handle
-
-    def count(self, recount=0):
-        """Returns the number of complete messages in the queue."""
-        try:
-            self._lock.acquire()
-            if self.n_entries >= 0 and not recount:
-                return self.n_entries
-            else:
-                res = 0
-                for fn in os.listdir(self.dir):
-                    if fn.startswith("msg_"):
-                        res += 1
-                self.n_entries = res
-                return res
-        finally:
-            self._lock.release()
-
-    def pickRandom(self, count=None):
-        """Returns a list of 'count' handles to messages in this queue.
-           The messages are chosen randomly, and returned in a random order.
-
-           If there are fewer than 'count' messages in the queue, all the
-           messages will be retained."""
-        handles = self.getAllMessages() # handles locking
-
-        return getCommonPRNG().shuffle(handles, count)
-
-    def getAllMessages(self):
-        """Returns handles for all messages currently in the queue.
-           Note: this ordering is not guaranteed to be random"""
-        self._lock.acquire()
-        hs = [fn[4:] for fn in os.listdir(self.dir) if fn.startswith("msg_")]
-        self._lock.release()
-        return hs
-
-    def removeMessage(self, handle):
-        """Given a handle, removes the corresponding message from the queue."""
-        self.__changeState(handle, "msg", "rmv") # handles locking.
-
-    def removeAll(self, secureDeleteFn=None):
-        """Removes all messages from this queue."""
-        try:
-            self._lock.acquire()
-            for m in os.listdir(self.dir):
-                if m[:4] in ('inp_', 'msg_'):
-                    self.__changeState(m[4:], m[:3], "rmv")
-            self.n_entries = 0
-            self.cleanQueue(secureDeleteFn)
-        finally:
-            self._lock.release()
-
-    def moveMessage(self, handle, queue):
-        """Given a handle and a queue, moves the corresponding message from
-           this queue to the queue provided.  Returns a new handle for
-           the message in the destination queue."""
-        # Since we're switching handles, we don't want to just rename;
-        # We really want to copy and delete the old file.
-        try:
-            self._lock.acquire()
-            newHandle = queue.queueMessage(self.messageContents(handle))
-            self.removeMessage(handle)
-        finally:
-            self._lock.release()
-
-        return newHandle
-
-    def getMessagePath(self, handle):
-        """Given a handle for an existing message, return the name of the
-           file that contains that message."""
-        # We don't need to lock here: the handle is still valid, or it isn't.
-        return os.path.join(self.dir, "msg_"+handle)
-
-    def openMessage(self, handle):
-        """Given a handle for an existing message, returns a file descriptor
-           open to read that message."""
-        # We don't need to lock here; the handle is still valid, or it isn't.
-        return open(os.path.join(self.dir, "msg_"+handle), 'rb')
-
-    def messageContents(self, handle):
-        """Given a message handle, returns the contents of the corresponding
-           message."""
-        try:
-            self._lock.acquire()
-            return readFile(os.path.join(self.dir, "msg_"+handle), 1)
-        finally:
-            self._lock.release()
-
-    def getObject(self, handle):
-        """Given a message handle, read and unpickle the contents of the
-           corresponding message."""
-        try:
-            self._lock.acquire()
-            f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
-            res = cPickle.load(f)
-            f.close()
-            return res
-        finally:
-            self._lock.release()
-
-    def openNewMessage(self):
-        """Returns (file, handle) tuple to create a new message.  Once
-           you're done writing, you must call finishMessage to
-           commit your changes, or abortMessage to reject them."""
-        file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
-        return file, handle
-
-    def finishMessage(self, f, handle):
-        """Given a file and a corresponding handle, closes the file
-           commits the corresponding message."""
-        f.close()
-        self.__changeState(handle, "inp", "msg")
-
-    def abortMessage(self, f, handle):
-        """Given a file and a corresponding handle, closes the file
-           rejects the corresponding message."""
-        f.close()
-        self.__changeState(handle, "inp", "rmv")
-
-    def cleanQueue(self, secureDeleteFn=None):
-        """Removes all timed-out or trash messages from the queue.
-
-           If secureDeleteFn is provided, it is called with a list of
-           filenames to be removed.  Otherwise, files are removed using
-           secureDelete.
-
-           Returns 1 if a clean is already in progress; otherwise
-           returns 0.
-        """
-        # We don't need to hold the lock here; we synchronize via the
-        # filesystem.
-
-        rmv = []
-        allowedTime = int(time.time()) - INPUT_TIMEOUT
-        for m in os.listdir(self.dir):
-            if m.startswith("rmv_"):
-                rmv.append(os.path.join(self.dir, m))
-            elif m.startswith("inp_"):
-                try:
-                    s = os.stat(m)
-                    if s[stat.ST_MTIME] < allowedTime:
-                        self.__changeState(m[4:], "inp", "rmv")
-                        rmv.append(os.path.join(self.dir, m))
-                except OSError:
-                    pass
-        if secureDeleteFn:
-            secureDeleteFn(rmv)
-        else:
-            secureDelete(rmv, blocking=1)
-        return 0
-
-    def __changeState(self, handle, s1, s2):
-        """Helper method: changes the state of message 'handle' from 's1'
-           to 's2', and changes the internal count."""
-        try:
-            self._lock.acquire()
-            try:
-                os.rename(os.path.join(self.dir, s1+"_"+handle),
-                          os.path.join(self.dir, s2+"_"+handle))
-            except OSError, e:
-                contents = os.listdir(self.dir)
-                LOG.error("Error while trying to change %s from %s to %s: %s",
-                          handle, s1, s2, e)
-                LOG.error("Directory %s contains: %s", self.dir, contents)
-                self.count(1)
-                return
-
-            if self.n_entries < 0:
-                return
-            if s1 == 'msg' and s2 != 'msg':
-                self.n_entries -= 1
-            elif s1 != 'msg' and s2 == 'msg':
-                self.n_entries += 1
-        finally:
-            self._lock.release()
+Queue = mixminion.Filestore.MixedStore
 
 class _DeliveryState:
     """Helper class: holds the state needed to schedule delivery or
@@ -406,7 +154,7 @@
             self.message = self.queue.getObject(self.handle)
         return self.message
 
-class DeliveryQueue(Queue):
+class XDeliveryQueue(mixminion.Filestore.ObjectMetadataStore):
     """A DeliveryQueue implements a queue that greedily sends messages to
        outgoing streams that occasionally fail.  All underlying messages
        are pickled objects.  Additionally, we store metadata about
@@ -433,8 +181,6 @@
     #           currently sending.
     #    retrySchedule -- a list of intervals at which delivery of messages
     #           should be reattempted, as described in "setRetrySchedule".
-    #    deliveryState -- a dict from handle->_DeliveryState object for
-    #           all handles.
     #    nextAttempt -- a dict from handle->time-of-next-scheduled-delivery,
     #           for all handles.  Not meaningful for handles in 'pending'.
     #           If the time is in the past, delivery can be tried now.
@@ -451,7 +197,8 @@
         """Create a new DeliveryQueue object that stores its files in
            <location>.  If retrySchedule is provided, it is interpreted as
            in setRetrySchedule."""
-        Queue.__init__(self, location, create=1, scrub=1)
+        mixminion.Filestore.ObjectMetadataStore.__init__(
+            self, location, create=1, scrub=1)
         self.retrySchedule = None
         self._rescan()
         if retrySchedule is not None:
@@ -497,39 +244,12 @@
         finally:
             self._lock.release()
 
+    def _getDeliveryState(self,h):
+        return self.getMetadata(h)
+
     def _loadState(self):
         """Read all DeliveryState objects from the disk."""
-        # must hold lock.
-        self.deliveryState = {}
-        for h in self.getAllMessages():
-            fn = os.path.join(self.dir, "meta_"+h)
-            if os.path.exists(fn):
-                self.deliveryState[h] = readPickled(fn)
-            else:
-                LOG.warn("No metadata for file handle %s", h)
-                self.deliveryState[h] = _DeliveryState()
-                self._writeState(h)
-
-        for fn in os.listdir(self.dir):
-            if fn.startswith("meta_"):
-                h = fn[5:]
-                if not self.deliveryState.has_key(h):
-                    LOG.warn("Metadata for nonexistent handle %s", h)
-                os.unlink(os.path.join(self.dir, fn))
-
-    def _writeState(self, h):
-        """Helper method: writes out the metadata for handle 'h'.  If that
-           handle has been removed, removes the metadata.
-        """
-        fn = os.path.join(self.dir, "meta_"+h)
-        ds = self.deliveryState.get(h)
-        if ds is not None:
-            writePickled(fn, self.deliveryState[h])
-        else:
-            try:
-                os.rename(fn, os.path.join(self.dir, "rmv_meta_"+h))
-            except OSError:
-                pass
+        self.loadAllMetadata(lambda h: _DeliveryState())
 
     def _rebuildNextAttempt(self, now=None):
         """Helper: Reconstruct self.nextAttempt from self.retrySchedule and
@@ -543,7 +263,7 @@
             rs = self.retrySchedule
 
         nextAttempt = {}
-        for h, ds in self.deliveryState.items():
+        for h,ds in self._metadata_cache.items():
             nextAttempt[h] = ds.getNextAttempt(rs, now)
         self.nextAttempt = nextAttempt
         self._repOk()
@@ -563,7 +283,7 @@
             if allHandles != knownHandles:
                 LOG.error("_repOK: %s != %s", allHandles, knownHandles)
                 assert allHandles == knownHandles
-            dsHandles = self.deliveryState.keys()
+            dsHandles = self._metadata_cache.keys()
             naHandles = self.nextAttempt.keys()
             dsHandles.sort()
             naHandles.sort()
@@ -572,9 +292,6 @@
         finally:
             self._lock.release()
 
-    def queueMessage(self, msg):
-        if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
-
     def queueDeliveryMessage(self, msg, address=None, now=None):
         """Schedule a message for delivery.
              msg -- the message.  This can be any pickleable object.
@@ -584,12 +301,13 @@
             self._lock.acquire()
             handle = self.queueObject(msg)
             self.sendable.append(handle)
-            ds = self.deliveryState[handle] = _DeliveryState(now,None,address)
+            
+            ds = _DeliveryState(now,None,address)
+            self.setMetadata(handle, ds)
             self.nextAttempt[handle] = \
                      ds.getNextAttempt(self.retrySchedule, now)
             LOG.trace("ServerQueue got message %s for %s",
                       handle, self.dir)
-            self._writeState(handle)
         finally:
             self._lock.release()
 
@@ -600,10 +318,8 @@
            for a given message handle.  For testing. """
         self._repOk()
         o = self.getObject(handle)
-        return (o,
-                self.deliveryState[handle].queuedTime,
-                self.deliveryState[handle].lastAttempt,
-                self.nextAttempt[handle])
+        ds = self._getDeliveryState(handle)
+        return (o, ds.queuedTime, ds.lastAttempt, self.nextAttempt[handle])
 
     def removeExpiredMessages(self, now=None):
         """Remove every message expired in this queue according to the
@@ -643,7 +359,7 @@
                     self.removeMessage(h)
                 elif next <= now:
                     LOG.trace("     [%s] is ready for delivery", h)
-                    state = self.deliveryState.get(h)
+                    state = self._getDeliveryState(h)
                     if state is None:
                         addr = None
                     else:
@@ -678,7 +394,7 @@
     def removeMessage(self, handle):
         try:
             self._lock.acquire()
-            Queue.removeMessage(self, handle)
+            mixminion.Filestore.BaseMetadataStore.removeMessage(self, handle)
             try:
                 del self.pending[handle]
                 pending = 1
@@ -686,10 +402,6 @@
                 pending = 0
 
             try:
-                del self.deliveryState[handle]
-            except KeyError:
-                LOG.error("Removing message %s with no delivery state", handle)
-            try:
                 del self.nextAttempt[handle]
             except KeyError:
                 LOG.error("Removing message %s with no nextAttempt", handle)
@@ -700,20 +412,14 @@
                 if not pending:
                     LOG.error("Removing message %s in neither "
                               "'sendable' nor 'pending' list", handle)
-
-            self._writeState(handle)
         finally:
             self._lock.release()
 
     def removeAll(self, secureDeleteFn=None):
         try:
             self._lock.acquire()
-            for m in os.listdir(self.dir):
-                if m[:5] == 'meta_':
-                    os.rename(os.path.join(self.dir, m),
-                              os.path.join(self.dir, "rmv_"+m))
-            Queue.removeAll(self, secureDeleteFn)
-            self.deliveryState = {}
+            mixminion.Filestore.ObjectMetadataStore.removeAll(self,
+                                                              secureDeleteFn)
             self.pending = {}
             self.nextAttempt = {}
             self.sendable = []
@@ -721,7 +427,6 @@
         finally:
             self._lock.release()
 
-
     def deliverySucceeded(self, handle):
         """Removes a message from the outgoing queue.  This method
            should be invoked after the corresponding message has been
@@ -756,12 +461,13 @@
                 # with the most recent attempt, and see if there's another
                 # attempt in the future.
                 try:
-                    ds = self.deliveryState[handle]
+                    ds = self._getDeliveryState(handle)
                 except KeyError:
                     # This should never happen
                     LOG.error_exc(sys.exc_info(),
                                   "Handle %s had no state", handle)
-                    ds = self.deliveryState[handle] = _DeliveryState(now)
+                    ds = _DeliveryState(now)
+                    self.setMetadata(handle, ds)
 
                 ds.setLastAttempt(lastAttempt)
                 nextAttempt = ds.getNextAttempt(self.retrySchedule, now)
@@ -777,7 +483,7 @@
                     except KeyError:
                         LOG.error("Handle %s was not pending", handle)
 
-                    self._writeState(handle)
+                    self.setMetadata(handle, ds)
                     return
 
                 # Otherwise, fallthrough.