[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Finish ServerQueue->Filestore refactoring
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv30795/lib/mixminion/server
Modified Files:
Modules.py ServerMain.py ServerQueue.py
Log Message:
Finish ServerQueue->Filestore refactoring
Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.48
retrieving revision 1.49
diff -u -d -r1.48 -r1.49
--- Modules.py 13 Jul 2003 03:45:35 -0000 1.48
+++ Modules.py 24 Jul 2003 17:37:16 -0000 1.49
@@ -27,6 +27,7 @@
import mixminion.BuildMessage
import mixminion.Config
+import mixminion.Filestore
import mixminion.Packet
import mixminion.server.ServerQueue
import mixminion.server.ServerConfig
@@ -949,7 +950,7 @@
def createDeliveryQueue(self, queueDir):
# We create a temporary queue so we can hold files there for a little
# while before passing their names to mixmaster.
- self.tmpQueue = mixminion.server.ServerQueue.Queue(queueDir+"_tmp", 1, 1)
+ self.tmpQueue = mixminion.Filestore.StringStore(queueDir+"_tmp", 1, 1)
self.tmpQueue.removeAll()
return _MixmasterSMTPModuleDeliveryQueue(self, queueDir)
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.86
retrieving revision 1.87
diff -u -d -r1.86 -r1.87
--- ServerMain.py 15 Jul 2003 15:30:56 -0000 1.86
+++ ServerMain.py 24 Jul 2003 17:37:16 -0000 1.87
@@ -52,6 +52,7 @@
import mixminion.Config
import mixminion.Crypto
+import mixminion.Filestore
import mixminion.server.MMTPServer
import mixminion.server.Modules
import mixminion.server.PacketHandler
@@ -118,7 +119,7 @@
return 1
-class IncomingQueue(mixminion.server.ServerQueue.Queue):
+class IncomingQueue(mixminion.Filestore.StringStore):
"""A Queue to accept packets from incoming MMTP connections,
and hold them until they can be processed. As packets arrive, and
are stored to disk, we notify a message queue so that another thread
@@ -130,7 +131,7 @@
def __init__(self, location, packetHandler):
"""Create an IncomingQueue that stores its messages in <location>
and processes them through <packetHandler>."""
- mixminion.server.ServerQueue.Queue.__init__(self, location, create=1)
+ mixminion.Filestore.StringStore.__init__(self, location, create=1)
self.packetHandler = packetHandler
self.mixPool = None
Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -d -r1.27 -r1.28
--- ServerQueue.py 24 Jul 2003 03:36:59 -0000 1.27
+++ ServerQueue.py 24 Jul 2003 17:37:16 -0000 1.28
@@ -4,6 +4,7 @@
"""mixminion.server.ServerQueue
Facility for fairly secure, directory-based, unordered queues.
+ DOCDOC not any more.
"""
import os
@@ -19,7 +20,7 @@
createPrivateDir, readPickled, writePickled, formatTime, readFile
from mixminion.Crypto import getCommonPRNG
-__all__ = [ 'Queue', 'DeliveryQueue', 'TimedMixPool', 'CottrellMixPool',
+__all__ = [ 'DeliveryQueue', 'TimedMixPool', 'CottrellMixPool',
'BinomialCottrellMixPool' ]
# Mode to pass to open(2) for creating a new file, and dying if it already
@@ -32,8 +33,6 @@
# trash.
INPUT_TIMEOUT = 6000
-Queue = mixminion.Filestore.MixedStore
-
class _DeliveryState:
"""Helper class: holds the state needed to schedule delivery or
eventual abandonment of a message in a DeliveryQueue."""
@@ -45,6 +44,12 @@
# address: Pickleable object holding address information. Delivery
# code uses this field to group messages by address before loading
# them all from disk.
+ # pendingAt: None (if we're not sending this message), or a time
+ # at which we begain sending this message.
+ # nextAttempt: None, or the time at which we'll next try to send
+ # this message. This field is invalid until someone calls
+ # setNextAttempt. If the time is in the past, delivery can
+ # be tried now. If None, the message may be removable.
def __init__(self, queuedTime=None, lastAttempt=None, address=None):
"""Create a new _DeliveryState for a message received at
queuedTime (default now), whose last delivery attempt was
@@ -54,7 +59,28 @@
self.queuedTime = queuedTime
self.lastAttempt = lastAttempt
self.address = address
+ self.pending = None
+ self.nextAttempt = None
+ self.remove = 0
+
+ def isPending(self):
+ """DOCDOC"""
+ return self.pending is not None
+
+ def setPending(self, now=None):
+ """DOCDOC"""
+ if now is None:
+ now = time.time()
+ self.pending = now
+ def setNonPending(self):
+ """DOCDOC"""
+ self.pending = None
+
+ def isRemovable(self):
+ """DOCDOC"""
+ return self.remove
+
def __getstate__(self):
# For pickling. All future versions of deliverystate will pickle
# to a tuple, whose first element will be a version string.
@@ -75,7 +101,11 @@
else:
raise MixFatalError("Unrecognized delivery state")
- def getNextAttempt(self, retrySchedule, now=None):
+ self.pending = None
+ self.nextAttempt = None
+ self.remove = 0
+
+ def setNextAttempt(self, retrySchedule, now=None):
"""Return the next time when we should try to deliver this message
according to the provided retrySchedule. If the time returned
is in the past, then immediate delivery is okay. If the time
@@ -84,12 +114,14 @@
if not now:
now = time.time()
+ self.remove = 0
last = self.lastAttempt
# If we've never tried to deliver the message, it's ready to
# go immediately.
if last is None:
- return now
+ self.nextAttempt = now
+ return
# Otherwise, we count from the time the message was first queued,
# until we find a scheduled delivery that falls after the last
@@ -102,11 +134,13 @@
for interval in retrySchedule:
attempt += interval
if attempt > last:
- return attempt
+ self.nextAttempt = attempt
+ return
# Oops: there are no scheduled deliveries after the last delivery.
# Time to drop this message.
- return None
+ self.nextAttempt = None
+ self.remove = 1
def setLastAttempt(self, when):
"""Update time of the last attempted delivery."""
@@ -151,10 +185,10 @@
"""Return the underlying object stored in the delivery queue, loading
it from disk if necessary."""
if self.message is None:
- self.message = self.queue.getObject(self.handle)
+ self.message = self.queue.store.getObject(self.handle)
return self.message
-class DeliveryQueue(mixminion.Filestore.ObjectMetadataStore):
+class DeliveryQueue:
"""A DeliveryQueue implements a queue that greedily sends messages to
outgoing streams that occasionally fail. All underlying messages
are pickled objects. Additionally, we store metadata about
@@ -175,30 +209,25 @@
"""
###
# Fields:
- # sendable -- A list of handles for all messages that we're not
- # currently sending.
- # pending -- Dict from handle->time_sent, for all messages that we're
- # currently sending.
# retrySchedule -- a list of intervals at which delivery of messages
# should be reattempted, as described in "setRetrySchedule".
- # nextAttempt -- a dict from handle->time-of-next-scheduled-delivery,
- # for all handles. Not meaningful for handles in 'pending'.
- # If the time is in the past, delivery can be tried now.
- # If None, the message may be removable.
#
# XXXX Refactor as many of these fields as possible into _DeliveryState.
#
- # Files:
- # meta_* : a pickled _DeliveryState object for each message in the
- # queue.
- # rmv_meta_*: a dead metafile, waiting for removal.
-
- def __init__(self, location, retrySchedule=None, now=None):
+ # DOCDOC list of fields is now inaccurate -- qname, store, _lock
+
+ def __init__(self, location, retrySchedule=None, now=None, name=None):
"""Create a new DeliveryQueue object that stores its files in
<location>. If retrySchedule is provided, it is interpreted as
- in setRetrySchedule."""
- mixminion.Filestore.ObjectMetadataStore.__init__(
- self, location, create=1, scrub=1)
+ in setRetrySchedule. DOCDOC name"""
+ self.store = mixminion.Filestore.ObjectMetadataStore(
+ location,create=1,scrub=1)
+ self._lock = self.store._lock
+ if name is None:
+ self.qname = os.path.split(location)[1]
+ else:
+ self.qname = name
+
self.retrySchedule = None
self._rescan()
if retrySchedule is not None:
@@ -232,24 +261,22 @@
def _rescan(self, now=None):
"""Helper: Rebuild the internal state of this queue from the
- underlying directory. Trashes 'pending' and 'sendable'."""
+ underlying directory. DOCDOC trashes .pending and .sendable."""
try:
self._lock.acquire()
- self.pending = {}
- self.nextAttempt = {}
- self.sendable = self.getAllMessages()
- self._loadState()
+ self.store.loadAllMetadata(lambda h: _DeliveryState())
self._rebuildNextAttempt(now)
self._repOk()
finally:
self._lock.release()
- def _getDeliveryState(self,h):
- return self.getMetadata(h)
+ def getAllMessages(self):
+ """DOCDOC"""
+ return self.store.getAllMessages()
- def _loadState(self):
- """Read all DeliveryState objects from the disk."""
- self.loadAllMetadata(lambda h: _DeliveryState())
+ def count(self):
+ """DOCDOC"""
+ return self.store.count()
def _rebuildNextAttempt(self, now=None):
"""Helper: Reconstruct self.nextAttempt from self.retrySchedule and
@@ -262,10 +289,8 @@
else:
rs = self.retrySchedule
- nextAttempt = {}
- for h,ds in self._metadata_cache.items():
- nextAttempt[h] = ds.getNextAttempt(rs, now)
- self.nextAttempt = nextAttempt
+ for ds in self.store._metadata_cache.values():
+ ds.setNextAttempt(rs, now)
self._repOk()
def _repOk(self):
@@ -276,19 +301,11 @@
try:
self._lock.acquire()
- allHandles = self.getAllMessages()
- knownHandles = self.pending.keys() + self.sendable
+ allHandles = self.store.getAllMessages()
allHandles.sort()
- knownHandles.sort()
- if allHandles != knownHandles:
- LOG.error("_repOK: %s != %s", allHandles, knownHandles)
- assert allHandles == knownHandles
- dsHandles = self._metadata_cache.keys()
- naHandles = self.nextAttempt.keys()
+ dsHandles = self.store._metadata_cache.keys()
dsHandles.sort()
- naHandles.sort()
assert allHandles == dsHandles
- assert allHandles == naHandles
finally:
self._lock.release()
@@ -299,15 +316,12 @@
assert self.retrySchedule is not None
try:
self._lock.acquire()
- handle = self.queueObject(msg)
- self.sendable.append(handle)
-
+ handle = self.store.queueObject(msg)
ds = _DeliveryState(now,None,address)
- self.setMetadata(handle, ds)
- self.nextAttempt[handle] = \
- ds.getNextAttempt(self.retrySchedule, now)
- LOG.trace("ServerQueue got message %s for %s",
- handle, self.dir)
+ ds.setNextAttempt(self.retrySchedule, now)
+ self.store.setMetadata(handle, ds)
+ LOG.trace("DeliveryQueue got message %s for %s",
+ handle, self.qname)
finally:
self._lock.release()
@@ -317,9 +331,9 @@
"""Returns a (msg, inserted, lastAttempt, nextAttempt) tuple
for a given message handle. For testing. """
self._repOk()
- o = self.getObject(handle)
- ds = self._getDeliveryState(handle)
- return (o, ds.queuedTime, ds.lastAttempt, self.nextAttempt[handle])
+ o = self.store.getObject(handle)
+ ds = self.store.getMetadata(handle)
+ return (o, ds.queuedTime, ds.lastAttempt, ds.nextAttempt)
def removeExpiredMessages(self, now=None):
"""Remove every message expired in this queue according to the
@@ -329,8 +343,9 @@
this method useful."""
try:
self._lock.acquire()
- for h in self.sendable:
- if self.nextAttempt[h] is None:
+ #XXXX
+ for h, ds in self._metadata_cache.items():
+ if ds.isRemovable():
self.removeMessage(h)
finally:
self._lock.release()
@@ -342,33 +357,29 @@
self._repOk()
if now is None:
now = time.time()
- LOG.trace("ServerQueue checking for deliverable messages in %s",
- self.dir)
+ LOG.trace("DeliveryQueue checking for deliverable messages in %s",
+ self.qname)
try:
self._lock.acquire()
- handles = self.sendable
messages = []
- self.sendable = []
- for h in self.pending.keys():
- LOG.trace(" [%s] is pending delivery", h)
- for h in handles:
- assert not self.pending.has_key(h)
- next = self.nextAttempt[h]
- if next is None:
+ for h in self.store._metadata_cache.keys():
+ state = self.store.getMetadata(h)
+ if state.isPending():
+ LOG.trace(" [%s] is pending delivery", h)
+ continue
+ elif state and state.isRemovable():
LOG.trace(" [%s] is expired", h)
self.removeMessage(h)
- elif next <= now:
+ elif (not state) or state.nextAttempt <= now:
LOG.trace(" [%s] is ready for delivery", h)
- state = self._getDeliveryState(h)
if state is None:
addr = None
else:
addr = state.address
messages.append(PendingMessage(h,self,addr))
- self.pending[h] = now
+ state.setPending(now)
else:
LOG.trace(" [%s] is not yet ready for redelivery", h)
- self.sendable.append(h)
finally:
self._lock.release()
@@ -392,37 +403,15 @@
raise NotImplementedError("_deliverMessages")
def removeMessage(self, handle):
- try:
- self._lock.acquire()
- mixminion.Filestore.BaseMetadataStore.removeMessage(self, handle)
- try:
- del self.pending[handle]
- pending = 1
- except KeyError:
- pending = 0
-
- try:
- del self.nextAttempt[handle]
- except KeyError:
- LOG.error("Removing message %s with no nextAttempt", handle)
+ self.store.removeMessage(handle)
- try:
- del self.sendable[self.sendable.index(handle)]
- except ValueError:
- if not pending:
- LOG.error("Removing message %s in neither "
- "'sendable' nor 'pending' list", handle)
- finally:
- self._lock.release()
+ def cleanQueue(self, secureDeleteFn=None):
+ self.store.cleanQueue(secureDeleteFn)
def removeAll(self, secureDeleteFn=None):
try:
self._lock.acquire()
- mixminion.Filestore.ObjectMetadataStore.removeAll(self,
- secureDeleteFn)
- self.pending = {}
- self.nextAttempt = {}
- self.sendable = []
+ self.store.removeAll(secureDeleteFn)
self.cleanQueue()
finally:
self._lock.release()
@@ -434,8 +423,8 @@
"""
assert self.retrySchedule is not None
- LOG.trace("ServerQueue got successful delivery for %s from %s",
- handle, self.dir)
+ LOG.trace("DeliveryQueue got successful delivery for %s from %s",
+ handle, self.qname)
self.removeMessage(handle)
def deliveryFailed(self, handle, retriable=0, now=None):
@@ -444,57 +433,55 @@
invoked after the corresponding message has been
unsuccessfully delivered."""
assert self.retrySchedule is not None
- LOG.trace("ServerQueue failed to deliver %s from %s",
- handle, self.dir)
+ LOG.trace("DeliveryQueue failed to deliver %s from %s",
+ handle, self.qname)
try:
self._lock.acquire()
try:
- lastAttempt = self.pending[handle]
+ ds = self.store.getMetadata(handle)
except KeyError:
# This should never happen
LOG.error_exc(sys.exc_info(),
- "Handle %s was not pending", handle)
+ "Handle %s had no state", handle)
+ ds = _DeliveryState(now)
+ ds.setNextAttempt(self.retrySchedule, now)
+ self.store.setMetadata(handle, ds)
+ return
+
+ if not ds.isPending():
+ LOG.error("Handle %s was not pending", handle)
return
+ last = ds.pending
+ ds.setNonPending()
+
if retriable:
# If we can retry the message, update the deliveryState
# with the most recent attempt, and see if there's another
# attempt in the future.
- try:
- ds = self._getDeliveryState(handle)
- except KeyError:
- # This should never happen
- LOG.error_exc(sys.exc_info(),
- "Handle %s had no state", handle)
- ds = _DeliveryState(now)
- self.setMetadata(handle, ds)
-
- ds.setLastAttempt(lastAttempt)
- nextAttempt = ds.getNextAttempt(self.retrySchedule, now)
- if nextAttempt is not None:
- LOG.trace(" (We'll try %s again at %s)", handle,
- formatTime(nextAttempt, 1))
+ ds.setLastAttempt(last)
+ ds.setNextAttempt(self.retrySchedule, now)
+ if ds.nextAttempt is not None:
# There is another scheduled delivery attempt. Remember
# it, mark the message sendable again, and save our state.
- self.nextAttempt[handle] = nextAttempt
- self.sendable.append(handle)
- try:
- del self.pending[handle]
- except KeyError:
- LOG.error("Handle %s was not pending", handle)
+ LOG.trace(" (We'll try %s again at %s)", handle,
+ formatTime(ds.nextAttempt, 1))
- self.setMetadata(handle, ds)
+ self.store.setMetadata(handle, ds)
return
-
+ else:
+ assert ds.isRemovable()
# Otherwise, fallthrough.
- # If we reach this point, the message is undeliverable.
+ # If we reach this point, the message is undeliverable, either
+ # because 'retriable' is false, or because we've run out of
+ # retries.
LOG.trace(" (Giving up on %s)", handle)
self.removeMessage(handle)
finally:
self._lock.release()
-class TimedMixPool(Queue):
+class TimedMixPool(mixminion.Filestore.ObjectStore):
"""A TimedMixPool holds a group of files, and returns some of them
as requested, according to a mixing algorithm that sends a batch
of messages every N seconds."""
@@ -503,7 +490,8 @@
def __init__(self, location, interval=600):
"""Create a TimedMixPool that sends its entire batch of messages
every 'interval' seconds."""
- Queue.__init__(self, location, create=1, scrub=1)
+ mixminion.Filestore.ObjectStore.__init__(
+ self, location, create=1, scrub=1)
self.interval = interval
def getBatch(self):