[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Begin refactoring ServerQueue.
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv16558/lib/mixminion
Added Files:
Filestore.py
Log Message:
Begin refactoring ServerQueue.
(Client queues and fragment pools both behave enough like "generic
filestore with metadata" that it doesn't make much sense to have a
separate implementation any more.)
--- NEW FILE: Filestore.py ---
# Copyright 2002-2003 Nick Mathewson. See LICENSE for licensing information.
# $Id: Filestore.py,v 1.1 2003/07/24 03:22:56 nickm Exp $
"""mixminion.Filestore
Common code for directory-based, security conscious, threadsafe
unordered file stores.
"""
# Formerly, this was all in mixminion.server.ServerQueue. But
# ClientMain had already grown a minimal version for client file
# queues, and we needed another for fragment stores anyway. So it was
# time to refactor the common code.
import os
import time
import stat
import cPickle
import threading
from mixminion.Common import MixFatalError, secureDelete, LOG, \
createPrivateDir, readFile
from mixminion.Crypto import getCommonPRNG
__all__ = [ "StringStore", "StringMetadataStore",
"ObjectStore", "ObjectMetadataStore",
"MixedStore", "MixedMetadataStore" ]
# Mode to pass to open(2) for creating a new file, and dying if it already
# exists.
_NEW_FILE_FLAGS = os.O_WRONLY+os.O_CREAT+os.O_EXCL
# On windows or (old-school) mac, binary != text.
_NEW_FILE_FLAGS += getattr(os, 'O_BINARY', 0)
# Any inp_* files older than INPUT_TIMEOUT seconds old are assumed to be
# trash.
INPUT_TIMEOUT = 6000
class BaseStore:
"""A BaseStore is an unordered collection of files with secure insert,
move, and delete operations.
This class is not for direct use; combine it with one of the
mixin classes below.
Abstractly, a BaseStore is a consistent collection of 'things'
with (optional) persistant metadata. The 'things' support
insert, move, and delete operations. The metadata supports
modification.
Implementation: a BaseStore is a directory of 'message' files.
Each filename in the directory has a name in one of the
following formats:
rmv_HANDLE (A message waiting to be deleted)
msg_HANDLE (A message waiting in the queue.
inp_HANDLE (An incomplete message being created.)
(Where HANDLE is a randomly chosen 8-character string of characters
chosen from 'A-Za-z0-9+-'. [Collision probability is negligible, and
collisions are detected.])
If metadata is present, is has a names with the analogous
rmvm_HANDLE
meta_HANDLE
inpm_HANDLE
Threading notes: Although BaseStore itself is threadsafe, you'll want
to synchronize around any multistep operations that you want to
run atomically. Use BaseStore.lock() and BaseStore.unlock() for this.
In the Mixminion server, no queue currently has more than one producer
or more than one consumer ... so synchronization turns out to be
fairly easy.
"""
# Fields: dir--the location of the file store.
# n_entries: the number of complete messages in the queue.
# <0 if we haven't counted yet.
# _lock: A lock that must be held while modifying or accessing
# the queue object. Filesystem operations are allowed
# without holding the lock, but they must not be visible
# to users of the queue.
def __init__(self, location, create=0, scrub=0):
"""Creates a file store object for a given directory, 'location'. If
'create' is true, creates the directory if necessary. If 'scrub'
is true, removes any incomplete or invalidated messages from the
store."""
secureDelete([]) # Make sure secureDelete is configured. HACK!
self._lock = threading.RLock()
self.dir = location
if not os.path.isabs(location):
LOG.warn("Directory path %s isn't absolute.", location)
if os.path.exists(location) and not os.path.isdir(location):
raise MixFatalError("%s is not a directory" % location)
createPrivateDir(location, nocreate=(not create))
if scrub:
self.cleanQueue()
# Count messages on first time through.
self.n_entries = -1
def lock(self):
"""Prevent access to this filestore from other threads."""
self._lock.acquire()
def unlock(self):
"""Release the lock on this filestore."""
self._lock.release()
def count(self, recount=0):
"""Returns the number of complete messages in the filestore."""
try:
self._lock.acquire()
if self.n_entries >= 0 and not recount:
return self.n_entries
else:
res = 0
for fn in os.listdir(self.dir):
if fn.startswith("msg_"):
res += 1
self.n_entries = res
return res
finally:
self._lock.release()
def pickRandom(self, count=None):
"""Returns a list of 'count' handles to messages in this filestore.
The messages are chosen randomly, and returned in a random order.
If there are fewer than 'count' messages in the filestore,
all the messages will be included."""
handles = self.getAllMessages() # handles locking
return getCommonPRNG().shuffle(handles, count)
def getAllMessages(self):
"""Returns handles for all messages currently in the filestore.
Note: this ordering is not guaranteed to be random."""
self._lock.acquire()
hs = [fn[4:] for fn in os.listdir(self.dir) if fn.startswith("msg_")]
self._lock.release()
return hs
def removeMessage(self, handle):
"""Given a handle, removes the corresponding message from the
filestore. """
self._changeState(handle, "msg", "rmv") # handles locking.
def removeAll(self, secureDeleteFn=None):
"""Removes all messages from this filestore."""
try:
self._lock.acquire()
for m in os.listdir(self.dir):
if m[:4] in ('inp_', 'msg_'):
self._changeState(m[4:], m[:3], "rmv")
elif m[:4] in ('inpm_', 'meta_'):
self._changeState(m[5:], m[:4], "rmvm")
self.n_entries = 0
self.cleanQueue(secureDeleteFn)
finally:
self._lock.release()
def moveMessage(self, handle, other):
"""Given a handle and a queue, moves the corresponding message
from this filestore to the filestore provided. Returns a
new handle for the message in the destination queue."""
# Since we're switching handles, we don't want to just rename;
# We really want to copy and delete the old file.
try:
self._lock.acquire()
newHandle = other.queueMessage(self.messageContents(handle))
self.removeMessage(handle)
finally:
self._lock.release()
return newHandle
def getMessagePath(self, handle):
"""Given a handle for an existing message, return the name of the
file that contains that message."""
# We don't need to lock here: the handle is still valid, or it isn't.
return os.path.join(self.dir, "msg_"+handle)
def openMessage(self, handle):
"""Given a handle for an existing message, returns a file descriptor
open to read that message."""
# We don't need to lock here; the handle is still valid, or it isn't.
return open(os.path.join(self.dir, "msg_"+handle), 'rb')
def openNewMessage(self):
"""Returns (file, handle) tuple to create a new message. Once
you're done writing, you must call finishMessage to
commit your changes, or abortMessage to reject them."""
file, handle = getCommonPRNG().openNewFile(self.dir, "inp_", 1)
return file, handle
def finishMessage(self, f, handle, ismeta=0):
"""Given a file and a corresponding handle, closes the file
commits the corresponding message."""
f.close()
if ismeta:
self._changeState(handle, "inpm", "meta")
else:
self._changeState(handle, "inp", "msg")
def abortMessage(self, f, handle, ismeta=0):
"""Given a file and a corresponding handle, closes the file
rejects the corresponding message."""
f.close()
if ismeta:
self._changeState(handle, "inpm", "rmvm")
else:
self._changeState(handle, "inp", "rmv")
def cleanQueue(self, secureDeleteFn=None):
"""Removes all timed-out or trash messages from the filestore.
If secureDeleteFn is provided, it is called with a list of
filenames to be removed. Otherwise, files are removed using
secureDelete.
Returns 1 if a clean is already in progress; otherwise
returns 0.
"""
# We don't need to hold the lock here; we synchronize via the
# filesystem.
rmv = []
allowedTime = int(time.time()) - INPUT_TIMEOUT
for m in os.listdir(self.dir):
if m.startswith("rmv_") or m.startswith("rmvm_"):
rmv.append(os.path.join(self.dir, m))
elif m.startswith("inp_"):
try:
s = os.stat(m)
if s[stat.ST_MTIME] < allowedTime:
self._changeState(m[4:], "inp", "rmv")
rmv.append(os.path.join(self.dir, m))
except OSError:
pass
if secureDeleteFn:
secureDeleteFn(rmv)
else:
secureDelete(rmv, blocking=1)
return 0
def _changeState(self, handle, s1, s2):
"""Helper method: changes the state of message 'handle' from 's1'
to 's2', and changes the internal count."""
try:
self._lock.acquire()
try:
os.rename(os.path.join(self.dir, s1+"_"+handle),
os.path.join(self.dir, s2+"_"+handle))
except OSError, e:
contents = os.listdir(self.dir)
LOG.error("Error while trying to change %s from %s to %s: %s",
handle, s1, s2, e)
LOG.error("Directory %s contains: %s", self.dir, contents)
self.count(1)
return
if self.n_entries < 0:
return
if s1 == 'msg' and s2 != 'msg':
self.n_entries -= 1
elif s1 != 'msg' and s2 == 'msg':
self.n_entries += 1
finally:
self._lock.release()
class StringStoreMixin:
def __init__(self): pass
def messageContents(self, handle):
"""Given a message handle, returns the contents of the corresponding
message."""
try:
self._lock.acquire()
return readFile(os.path.join(self.dir, "msg_"+handle), 1)
finally:
self._lock.release()
def queueMessage(self, contents):
"""Creates a new message in the filestore whose contents are
'contents', and returns a handle to that message."""
f, handle = self.openNewMessage()
f.write(contents)
self.finishMessage(f, handle) # handles locking
return handle
class ObjectStoreMixin:
def __init__(self): pass
def getObject(self, handle):
"""Given a message handle, read and unpickle the contents of the
corresponding message."""
try:
self._lock.acquire()
f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
res = cPickle.load(f)
f.close()
return res
finally:
self._lock.release()
def queueObject(self, object):
"""Queue an object using cPickle, and return a handle to that
object."""
f, handle = self.openNewMessage()
cPickle.dump(object, f, 1)
self.finishMessage(f, handle) # handles locking
return handle
class BaseMetadataStore(BaseStore):
def __init__(self, location, create=0, scrub=0):
BaseStore.__init__(self, location=location, create=create, scrub=scrub)
self._metadata_cache = {}
def loadAllMetadata(self, newDataFn):
try:
self._lock.acquire()
self._metadata_cache = {}
for h in self.getAllMessages():
try:
self.getMetadata(h)
except KeyError:
LOG.warn("Missing metadata for file %s",h)
self.setMetadata(h, newDataFn(h))
finally:
self._lock.release()
def getMetadata(self, handle):
fname = os.path.join(self.dir, "meta_"+handle)
if not os.path.exists(fname):
raise KeyError(handle)
try:
self._lock.acquire()
try:
return self._metadata_cache[handle]
except KeyError:
pass
f = open(fname, 'rb')
res = cPickle.load(f)
f.close()
self._metadata_cache[handle] = res
return res
finally:
self._lock.release()
def setMetadata(self, handle, object):
"""DOCDOC"""
try:
self._lock.acquire()
fname = os.path.join(self.dir, "inpm_"+handle)
f = os.fdopen(os.open(fname, _NEW_FILE_FLAGS, 0600), "wb")
cPickle.dump(object, f, 1)
self.finishMessage(f, handle, ismeta=1)
self._metadata_cache[handle] = object
return handle
finally:
self._lock.release()
def removeMessage(self, handle):
try:
self._lock.acquire()
BaseStore.removeMessage(self, handle)
if os.path.exists(os.path.join(self.dir, "meta_"+handle)):
self._changeState(handle, "meta", "rmvm")
try:
del self._metadata_cache[handle]
except KeyError:
pass
finally:
self._lock.release()
class StringStore(BaseStore, StringStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseStore.__init__(self, location, create, scrub)
StringStoreMixin.__init__(self)
class StringMetadataStore(BaseMetadataStore, StringStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseMetadataStore.__init__(self, location, create, scrub)
StringStoreMixin.__init__(self)
class ObjectStore(BaseStore, ObjectStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseStore.__init__(self, location, create, scrub)
ObjectStoreMixin.__init__(self)
class ObjectMetadataStore(BaseMetadataStore, ObjectStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseMetadataStore.__init__(self, location, create, scrub)
ObjectStoreMixin.__init__(self)
class MixedStore(BaseStore, StringStoreMixin, ObjectStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseStore.__init__(self, location, create, scrub)
StringStoreMixin.__init__(self)
ObjectStoreMixin.__init__(self)
class MixedMetadataStore(BaseMetadataStore, StringStoreMixin,
ObjectStoreMixin):
def __init__(self, location, create=0, scrub=0):
BaseMetadataStore.__init__(self, location, create, scrub)
StringStoreMixin.__init__(self)
ObjectStoreMixin.__init__(self)