[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Merge fixes from maint branch
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv28051/lib/mixminion
Modified Files:
ClientMain.py Filestore.py
Log Message:
Merge fixes from maint branch
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.115
retrieving revision 1.116
diff -u -d -r1.115 -r1.116
--- ClientMain.py 24 Sep 2003 08:17:27 -0000 1.115
+++ ClientMain.py 28 Sep 2003 04:12:29 -0000 1.116
@@ -1215,7 +1215,8 @@
def getPacket(self, handle):
"""Given a handle, return a 3-tuple of the corresponding
32K packet, IPV4Info, and time of first queueing. (The time
- is rounded down to the closest midnight GMT.)"""
+ is rounded down to the closest midnight GMT.) May raise
+ CorruptedFile."""
obj = self.store.getObject(handle)
try:
magic, message, routing, when = obj
@@ -1247,7 +1248,10 @@
return
timesByServer = {}
for h in handles:
- _, routing, when = self.getPacket(h)
+ try:
+ _, routing, when = self.getPacket(h)
+ except mixminion.Filestore.CorruptedFile:
+ continue
timesByServer.setdefault(routing, []).append(when)
for s in timesByServer.keys():
count = len(timesByServer[s])
@@ -1266,7 +1270,10 @@
cutoff = now - maxAge
remove = []
for h in self.getHandles():
- when = self.getPacket(h)[2]
+ try:
+ when = self.getPacket(h)[2]
+ except mixminion.Filestore.CorruptedFile:
+ continue
if when < cutoff:
remove.append(h)
LOG.info("Removing %s old messages from queue", len(remove))
@@ -1524,7 +1531,10 @@
LOG.info("Flushing %s", len(handles))
messagesByServer = {}
for h in handles:
- message, routing, _ = self.queue.getPacket(h)
+ try:
+ message, routing, _ = self.queue.getPacket(h)
+ except mixminion.Filestore.CorruptedFile:
+ continue
messagesByServer.setdefault(routing, []).append((message, h))
finally:
clientUnlock()
Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Filestore.py 26 Sep 2003 02:17:38 -0000 1.11
+++ Filestore.py 28 Sep 2003 04:12:29 -0000 1.12
@@ -23,16 +23,20 @@
import threading
import time
-from mixminion.Common import MixFatalError, secureDelete, LOG, \
+from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
createPrivateDir, readFile, tryUnlink
from mixminion.Crypto import getCommonPRNG
__all__ = [ "StringStore", "StringMetadataStore",
"ObjectStore", "ObjectMetadataStore",
"MixedStore", "MixedMetadataStore",
- "DBBase", "JournaledDBBase", "BooleanJournaledDBBase"
+ "DBBase", "JournaledDBBase", "BooleanJournaledDBBase",
+ "CorruptedFile",
]
+class CorruptedFile(MixError):
+ """Raised when a pickled object cannot be properly decoded."""
+ pass
# ======================================================================
# Filestores.
@@ -65,6 +69,7 @@
rmv_HANDLE (A message waiting to be deleted)
msg_HANDLE (A message waiting in the queue.
inp_HANDLE (An incomplete message being created.)
+ crp_HANDLE (A corrupted message awaiting debugging analysis)
(Where HANDLE is a randomly chosen 8-character string of characters
chosen from 'A-Za-z0-9+-'. [Collision probability is negligible, and
collisions are detected.])
@@ -73,6 +78,7 @@
rmvm_HANDLE
meta_HANDLE
inpm_HANDLE
+ crpm_HANDLE
Threading notes: Although BaseStore itself is threadsafe, you'll want
to synchronize around any multistep operations that you want to
@@ -161,10 +167,17 @@
'handle'."""
return os.path.exists(os.path.join(self.dir, "msg_"+handle))
+ def _doRemove(self, handle, newState):
+ self._changeState(handle, "msg", newState)
+
+ def _preserveCorrupted(self, handle):
+ """Given a handle, change the message state to 'crp'."""
+ self._doRemove(handle, "crp")
+
def removeMessage(self, handle):
"""Given a handle, removes the corresponding message from the
filestore. """
- self._changeState(handle, "msg", "rmv") # handles locking.
+ self._doRemove(handle, "rmv") # handles locking.
def removeAll(self, secureDeleteFn=None):
"""Removes all messages from this filestore."""
@@ -309,21 +322,22 @@
"""
def __init__(self): pass
def getObject(self, handle):
- """Given a message handle, read and unpickle the contents of the
- corresponding message. In rare error cases, defaults to 'None'.
+ """Given a message handle, read and unpickle the contents of
+ the corresponding message. In rare error cases, raises
+ CorruptedFile.
"""
try:
self._lock.acquire()
f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
try:
res = cPickle.load(f)
+ f.close()
+ return res
except cPickle.UnpicklingError, e:
LOG.error("Found damaged object %s in filestore %s: %s",
handle, self.dir, str(e))
- self.removeMessage(handle)
- res = None
- f.close()
- return res
+ self._preserveCorrupted(handle)
+ raise CorruptedFile()
finally:
self._lock.release()
@@ -389,11 +403,14 @@
except KeyError:
LOG.warn("Missing metadata for file %s",h)
self.setMetadata(h, newDataFn(h))
+ except CorruptedFile:
+ continue
finally:
self._lock.release()
def getMetadata(self, handle):
- """Return the metadata associated with a given handle."""
+ """Return the metadata associated with a given handle. If the
+ metadata is damaged, may raise CorruptedFile."""
fname = os.path.join(self.dir, "meta_"+handle)
if not os.path.exists(fname):
raise KeyError(handle)
@@ -409,8 +426,8 @@
except cPickle.UnpicklingError, e:
LOG.error("Found damaged metadata for %s in filestore %s: %s",
handle, self.dir, str(e))
- self.removeMessage(handle)
- return None
+ self._preserveCorrupted(handle)
+ raise CorruptedFile()
f.close()
self._metadata_cache[handle] = res
return res
@@ -430,16 +447,14 @@
finally:
self._lock.release()
- def removeMessage(self, handle):
- """Given a handle, removes the corresponding message from the
- filestore. """
+ def _doRemove(self, handle, newState):
try:
self._lock.acquire()
# Remove the message before the metadata, so we don't have
# a message without metadata.
- BaseStore.removeMessage(self, handle)
+ BaseStore._doRemove(self, handle, newState)
if os.path.exists(os.path.join(self.dir, "meta_"+handle)):
- self._changeState(handle, "meta", "rmvm")
+ self._changeState(handle, "meta", newState+"m")
try:
del self._metadata_cache[handle]