[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Begin refactoring ServerQueue.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv16558/lib/mixminion/server
Modified Files:
ServerQueue.py
Log Message:
Begin refactoring ServerQueue.
(Client queues and fragment pools both behave enough like "generic
filestore with metadata" that it doesn't make much sense to have a
separate implementation any more.)
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -d -r1.25 -r1.26
--- ServerQueue.py 13 Jul 2003 03:45:35 -0000 1.25
+++ ServerQueue.py 24 Jul 2003 03:22:57 -0000 1.26
@@ -13,6 +13,8 @@
import cPickle
import threading
+import mixminion.Filestore
+
from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
createPrivateDir, readPickled, writePickled, formatTime, readFile
from mixminion.Crypto import getCommonPRNG
@@ -30,261 +32,7 @@
# trash.
INPUT_TIMEOUT = 6000
-class Queue:
- """A Queue is an unordered collection of files with secure insert, move,
- and delete operations.
-
- Implementation: a queue is a directory of 'messages'. Each
- filename in the directory has a name in one of the following
- formats:
- rmv_HANDLE (A message waiting to be deleted)
- msg_HANDLE (A message waiting in the queue.)
- inp_HANDLE (An incomplete message being created.)
-
- (Where HANDLE is a randomly chosen 8-character string of characters
- chosen from 'A-Za-z0-9+-'. [Collision probability is negligible, and
- collisions are detected.])
-
- Threading notes: Although Queue itself is threadsafe, you'll want
- to synchronize around any multistep operations that you want to
- run atomically. Use Queue.lock() and Queue.unlock() for this.
-
- In the Mixminion server, no queue currently has more than one producer
- or more than one consumer ... so synchronization turns out to be
- fairly easy.
- """
- # Threading: If we ever get more than one producer, we're fine. With
- # more than one consumer, we'll need to modify DeliveryQueue below.
-
- # Fields: dir--the location of the queue.
- # n_entries: the number of complete messages in the queue.
- # <0 if we haven't counted yet.
- # _lock: A lock that must be held while modifying or accessing
- # the queue object. Filesystem operations are allowed
- # without holding the lock, but they must not be visible
- # to users of the queue.
- def __init__(self, location, create=0, scrub=0):
- """Creates a queue object for a given directory, 'location'. If
- 'create' is true, creates the directory if necessary. If 'scrub'
- is true, removes any incomplete or invalidated messages from the
- Queue."""
-
- secureDelete([]) # Make sure secureDelete is configured. HACK!
-
- self._lock = threading.RLock()
- self.dir = location
-
- if not os.path.isabs(location):
- LOG.warn("Queue path %s isn't absolute.", location)
-
- if os.path.exists(location) and not os.path.isdir(location):
- raise MixFatalError("%s is not a directory" % location)
-
- createPrivateDir(location, nocreate=(not create))
-
- if scrub:
- self.cleanQueue()
-
- # Count messages on first time through.
- self.n_entries = -1
-
- def lock(self):
- """Prevent access to this queue from other threads."""
- self._lock.acquire()
-
- def unlock(self):
- """Release the lock on this queue."""
- self._lock.release()
-
- def queueMessage(self, contents):
- """Creates a new message in the queue whose contents are 'contents',
- and returns a handle to that message."""
- f, handle = self.openNewMessage()
- f.write(contents)
- self.finishMessage(f, handle) # handles locking
- return handle
-
- def queueObject(self, object):
- """Queue an object using cPickle, and return a handle to that
- object."""
- f, handle = self.openNewMessage()
- cPickle.dump(object, f, 1)
- self.finishMessage(f, handle) # handles locking
- return handle
-
- def count(self, recount=0):
- """Returns the number of complete messages in the queue."""
- try:
- self._lock.acquire()
- if self.n_entries >= 0 and not recount:
- return self.n_entries
- else:
- res = 0
- for fn in os.listdir(self.dir):
- if fn.startswith("msg_"):
- res += 1
- self.n_entries = res
- return res
- finally:
- self._lock.release()
-
- def pickRandom(self, count=None):
- """Returns a list of 'count' handles to messages in this queue.
- The messages are chosen randomly, and returned in a random order.
-
- If there are fewer than 'count' messages in the queue, all the
- messages will be returned."""
- handles = self.getAllMessages() # handles locking
-
- return getCommonPRNG().shuffle(handles, count)
-
- def getAllMessages(self):
- """Returns handles for all messages currently in the queue.
- Note: this ordering is not guaranteed to be random"""
- self._lock.acquire()
- hs = [fn[4:] for fn in os.listdir(self.dir) if fn.startswith("msg_")]
- self._lock.release()
- return hs
-
- def removeMessage(self, handle):
- """Given a handle, removes the corresponding message from the queue."""
- self.__changeState(handle, "msg", "rmv") # handles locking.
-
- def removeAll(self, secureDeleteFn=None):
- """Removes all messages from this queue."""
- try:
- self._lock.acquire()
- for m in os.listdir(self.dir):
- if m[:4] in ('inp_', 'msg_'):
- self.__changeState(m[4:], m[:3], "rmv")
- self.n_entries = 0
- self.cleanQueue(secureDeleteFn)
- finally:
- self._lock.release()
-
- def moveMessage(self, handle, queue):
- """Given a handle and a queue, moves the corresponding message from
- this queue to the queue provided. Returns a new handle for
- the message in the destination queue."""
- # Since we're switching handles, we don't want to just rename;
- # We really want to copy and delete the old file.
- try:
- self._lock.acquire()
- newHandle = queue.queueMessage(self.messageContents(handle))
- self.removeMessage(handle)
- finally:
- self._lock.release()
-
- return newHandle
-
- def getMessagePath(self, handle):
- """Given a handle for an existing message, return the name of the
- file that contains that message."""
- # We don't need to lock here: the handle is still valid, or it isn't.
- return os.path.join(self.dir, "msg_"+handle)
-
- def openMessage(self, handle):
- """Given a handle for an existing message, returns a file descriptor
- open to read that message."""
- # We don't need to lock here; the handle is still valid, or it isn't.
- return open(os.path.join(self.dir, "msg_"+handle), 'rb')
-
- def messageContents(self, handle):
- """Given a message handle, returns the contents of the corresponding
- message."""
- try:
- self._lock.acquire()
- return readFile(os.path.join(self.dir, "msg_"+handle), 1)
- finally:
- self._lock.release()
-
- def getObject(self, handle):
- """Given a message handle, read and unpickle the contents of the
- corresponding message."""
- try:
- self._lock.acquire()
- f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
- res = cPickle.load(f)
- f.close()
- return res
- finally:
- self._lock.release()
-
- def openNewMessage(self):
- """Returns (file, handle) tuple to create a new message. Once
- you're done writing, you must call finishMessage to
- commit your changes, or abortMessage to reject them."""
- file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
- return file, handle
-
- def finishMessage(self, f, handle):
- """Given a file and a corresponding handle, closes the file
- and commits the corresponding message."""
- f.close()
- self.__changeState(handle, "inp", "msg")
-
- def abortMessage(self, f, handle):
- """Given a file and a corresponding handle, closes the file
- and rejects the corresponding message."""
- f.close()
- self.__changeState(handle, "inp", "rmv")
-
- def cleanQueue(self, secureDeleteFn=None):
- """Removes all timed-out or trash messages from the queue.
-
- If secureDeleteFn is provided, it is called with a list of
- filenames to be removed. Otherwise, files are removed using
- secureDelete.
-
- Returns 1 if a clean is already in progress; otherwise
- returns 0.
- """
- # We don't need to hold the lock here; we synchronize via the
- # filesystem.
-
- rmv = []
- allowedTime = int(time.time()) - INPUT_TIMEOUT
- for m in os.listdir(self.dir):
- if m.startswith("rmv_"):
- rmv.append(os.path.join(self.dir, m))
- elif m.startswith("inp_"):
- try:
- s = os.stat(m)
- if s[stat.ST_MTIME] < allowedTime:
- self.__changeState(m[4:], "inp", "rmv")
- rmv.append(os.path.join(self.dir, m))
- except OSError:
- pass
- if secureDeleteFn:
- secureDeleteFn(rmv)
- else:
- secureDelete(rmv, blocking=1)
- return 0
-
- def __changeState(self, handle, s1, s2):
- """Helper method: changes the state of message 'handle' from 's1'
- to 's2', and changes the internal count."""
- try:
- self._lock.acquire()
- try:
- os.rename(os.path.join(self.dir, s1+"_"+handle),
- os.path.join(self.dir, s2+"_"+handle))
- except OSError, e:
- contents = os.listdir(self.dir)
- LOG.error("Error while trying to change %s from %s to %s: %s",
- handle, s1, s2, e)
- LOG.error("Directory %s contains: %s", self.dir, contents)
- self.count(1)
- return
-
- if self.n_entries < 0:
- return
- if s1 == 'msg' and s2 != 'msg':
- self.n_entries -= 1
- elif s1 != 'msg' and s2 == 'msg':
- self.n_entries += 1
- finally:
- self._lock.release()
+Queue = mixminion.Filestore.MixedStore
class _DeliveryState:
"""Helper class: holds the state needed to schedule delivery or
@@ -406,7 +154,7 @@
self.message = self.queue.getObject(self.handle)
return self.message
-class DeliveryQueue(Queue):
+class XDeliveryQueue(mixminion.Filestore.ObjectMetadataStore):
"""A DeliveryQueue implements a queue that greedily sends messages to
outgoing streams that occasionally fail. All underlying messages
are pickled objects. Additionally, we store metadata about
@@ -433,8 +181,6 @@
# currently sending.
# retrySchedule -- a list of intervals at which delivery of messages
# should be reattempted, as described in "setRetrySchedule".
- # deliveryState -- a dict from handle->_DeliveryState object for
- # all handles.
# nextAttempt -- a dict from handle->time-of-next-scheduled-delivery,
# for all handles. Not meaningful for handles in 'pending'.
# If the time is in the past, delivery can be tried now.
@@ -451,7 +197,8 @@
"""Create a new DeliveryQueue object that stores its files in
<location>. If retrySchedule is provided, it is interpreted as
in setRetrySchedule."""
- Queue.__init__(self, location, create=1, scrub=1)
+ mixminion.Filestore.ObjectMetadataStore.__init__(
+ self, location, create=1, scrub=1)
self.retrySchedule = None
self._rescan()
if retrySchedule is not None:
@@ -497,39 +244,12 @@
finally:
self._lock.release()
+ def _getDeliveryState(self,h):
+ return self.getMetadata(h)
+
def _loadState(self):
"""Read all DeliveryState objects from the disk."""
- # must hold lock.
- self.deliveryState = {}
- for h in self.getAllMessages():
- fn = os.path.join(self.dir, "meta_"+h)
- if os.path.exists(fn):
- self.deliveryState[h] = readPickled(fn)
- else:
- LOG.warn("No metadata for file handle %s", h)
- self.deliveryState[h] = _DeliveryState()
- self._writeState(h)
-
- for fn in os.listdir(self.dir):
- if fn.startswith("meta_"):
- h = fn[5:]
- if not self.deliveryState.has_key(h):
- LOG.warn("Metadata for nonexistent handle %s", h)
- os.unlink(os.path.join(self.dir, fn))
-
- def _writeState(self, h):
- """Helper method: writes out the metadata for handle 'h'. If that
- handle has been removed, removes the metadata.
- """
- fn = os.path.join(self.dir, "meta_"+h)
- ds = self.deliveryState.get(h)
- if ds is not None:
- writePickled(fn, self.deliveryState[h])
- else:
- try:
- os.rename(fn, os.path.join(self.dir, "rmv_meta_"+h))
- except OSError:
- pass
+ self.loadAllMetadata(lambda h: _DeliveryState())
def _rebuildNextAttempt(self, now=None):
"""Helper: Reconstruct self.nextAttempt from self.retrySchedule and
@@ -543,7 +263,7 @@
rs = self.retrySchedule
nextAttempt = {}
- for h, ds in self.deliveryState.items():
+ for h,ds in self._metadata_cache.items():
nextAttempt[h] = ds.getNextAttempt(rs, now)
self.nextAttempt = nextAttempt
self._repOk()
@@ -563,7 +283,7 @@
if allHandles != knownHandles:
LOG.error("_repOK: %s != %s", allHandles, knownHandles)
assert allHandles == knownHandles
- dsHandles = self.deliveryState.keys()
+ dsHandles = self._metadata_cache.keys()
naHandles = self.nextAttempt.keys()
dsHandles.sort()
naHandles.sort()
@@ -572,9 +292,6 @@
finally:
self._lock.release()
- def queueMessage(self, msg):
- if 1: raise MixError("Tried to call DeliveryQueue.queueMessage.")
-
def queueDeliveryMessage(self, msg, address=None, now=None):
"""Schedule a message for delivery.
msg -- the message. This can be any pickleable object.
@@ -584,12 +301,13 @@
self._lock.acquire()
handle = self.queueObject(msg)
self.sendable.append(handle)
- ds = self.deliveryState[handle] = _DeliveryState(now,None,address)
+
+ ds = _DeliveryState(now,None,address)
+ self.setMetadata(handle, ds)
self.nextAttempt[handle] = \
ds.getNextAttempt(self.retrySchedule, now)
LOG.trace("ServerQueue got message %s for %s",
handle, self.dir)
- self._writeState(handle)
finally:
self._lock.release()
@@ -600,10 +318,8 @@
for a given message handle. For testing. """
self._repOk()
o = self.getObject(handle)
- return (o,
- self.deliveryState[handle].queuedTime,
- self.deliveryState[handle].lastAttempt,
- self.nextAttempt[handle])
+ ds = self._getDeliveryState(handle)
+ return (o, ds.queuedTime, ds.lastAttempt, self.nextAttempt[handle])
def removeExpiredMessages(self, now=None):
"""Remove every message expired in this queue according to the
@@ -643,7 +359,7 @@
self.removeMessage(h)
elif next <= now:
LOG.trace(" [%s] is ready for delivery", h)
- state = self.deliveryState.get(h)
+ state = self._getDeliveryState(h)
if state is None:
addr = None
else:
@@ -678,7 +394,7 @@
def removeMessage(self, handle):
try:
self._lock.acquire()
- Queue.removeMessage(self, handle)
+ mixminion.Filestore.BaseMetadataStore.removeMessage(self, handle)
try:
del self.pending[handle]
pending = 1
@@ -686,10 +402,6 @@
pending = 0
try:
- del self.deliveryState[handle]
- except KeyError:
- LOG.error("Removing message %s with no delivery state", handle)
- try:
del self.nextAttempt[handle]
except KeyError:
LOG.error("Removing message %s with no nextAttempt", handle)
@@ -700,20 +412,14 @@
if not pending:
LOG.error("Removing message %s in neither "
"'sendable' nor 'pending' list", handle)
-
- self._writeState(handle)
finally:
self._lock.release()
def removeAll(self, secureDeleteFn=None):
try:
self._lock.acquire()
- for m in os.listdir(self.dir):
- if m[:5] == 'meta_':
- os.rename(os.path.join(self.dir, m),
- os.path.join(self.dir, "rmv_"+m))
- Queue.removeAll(self, secureDeleteFn)
- self.deliveryState = {}
+ mixminion.Filestore.ObjectMetadataStore.removeAll(self,
+ secureDeleteFn)
self.pending = {}
self.nextAttempt = {}
self.sendable = []
@@ -721,7 +427,6 @@
finally:
self._lock.release()
-
def deliverySucceeded(self, handle):
"""Removes a message from the outgoing queue. This method
should be invoked after the corresponding message has been
@@ -756,12 +461,13 @@
# with the most recent attempt, and see if there's another
# attempt in the future.
try:
- ds = self.deliveryState[handle]
+ ds = self._getDeliveryState(handle)
except KeyError:
# This should never happen
LOG.error_exc(sys.exc_info(),
"Handle %s had no state", handle)
- ds = self.deliveryState[handle] = _DeliveryState(now)
+ ds = _DeliveryState(now)
+ self.setMetadata(handle, ds)
ds.setLastAttempt(lastAttempt)
nextAttempt = ds.getNextAttempt(self.retrySchedule, now)
@@ -777,7 +483,7 @@
except KeyError:
LOG.error("Handle %s was not pending", handle)
- self._writeState(handle)
+ self.setMetadata(handle, ds)
return
# Otherwise, fallthrough.