[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Merge fixes from maint branch



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv28051/lib/mixminion

Modified Files:
	ClientMain.py Filestore.py 
Log Message:
Merge fixes from maint branch

Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.115
retrieving revision 1.116
diff -u -d -r1.115 -r1.116
--- ClientMain.py	24 Sep 2003 08:17:27 -0000	1.115
+++ ClientMain.py	28 Sep 2003 04:12:29 -0000	1.116
@@ -1215,7 +1215,8 @@
     def getPacket(self, handle):
         """Given a handle, return a 3-tuple of the corresponding
            32K packet, IPV4Info, and time of first queueing.  (The time
-           is rounded down to the closest midnight GMT.)"""
+           is rounded down to the closest midnight GMT.)  May raise 
+           CorruptedFile."""
         obj = self.store.getObject(handle)
         try:
             magic, message, routing, when = obj
@@ -1247,7 +1248,10 @@
             return
         timesByServer = {}
         for h in handles:
-            _, routing, when = self.getPacket(h)
+            try:
+                _, routing, when = self.getPacket(h)
+            except mixminion.Filestore.CorruptedFile:
+                continue
             timesByServer.setdefault(routing, []).append(when)
         for s in timesByServer.keys():
             count = len(timesByServer[s])
@@ -1266,7 +1270,10 @@
         cutoff = now - maxAge
         remove = []
         for h in self.getHandles():
-            when = self.getPacket(h)[2]
+            try:
+                when = self.getPacket(h)[2]
+            except mixminion.Filestore.CorruptedFile:
+                continue
             if when < cutoff:
                 remove.append(h)
         LOG.info("Removing %s old messages from queue", len(remove))
@@ -1524,7 +1531,10 @@
             LOG.info("Flushing %s", len(handles))
             messagesByServer = {}
             for h in handles:
-                message, routing, _ = self.queue.getPacket(h)
+                try:
+                    message, routing, _ = self.queue.getPacket(h)
+                except mixminion.Filestore.CorruptedFile: 
+                    continue
                 messagesByServer.setdefault(routing, []).append((message, h))
         finally:
             clientUnlock()

Index: Filestore.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Filestore.py,v
retrieving revision 1.11
retrieving revision 1.12
diff -u -d -r1.11 -r1.12
--- Filestore.py	26 Sep 2003 02:17:38 -0000	1.11
+++ Filestore.py	28 Sep 2003 04:12:29 -0000	1.12
@@ -23,16 +23,20 @@
 import threading
 import time
 
-from mixminion.Common import MixFatalError, secureDelete, LOG, \
+from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
      createPrivateDir, readFile, tryUnlink
 from mixminion.Crypto import getCommonPRNG
 
 __all__ = [ "StringStore", "StringMetadataStore",
             "ObjectStore", "ObjectMetadataStore", 
             "MixedStore", "MixedMetadataStore",
-            "DBBase", "JournaledDBBase", "BooleanJournaledDBBase"
+            "DBBase", "JournaledDBBase", "BooleanJournaledDBBase",
+            "CorruptedFile",
             ]
 
+class CorruptedFile(MixError):
+    """Raised when a pickled object cannot be properly decoded."""
+    pass
 
 # ======================================================================
 # Filestores.
@@ -65,6 +69,7 @@
              rmv_HANDLE  (A message waiting to be deleted)
              msg_HANDLE  (A message waiting in the queue.
              inp_HANDLE  (An incomplete message being created.)
+             crp_HANDLE  (A corrupted message awaiting debugging analysis)
        (Where HANDLE is a randomly chosen 8-character string of characters
        chosen from 'A-Za-z0-9+-'.  [Collision probability is negligible, and
        collisions are detected.])
@@ -73,6 +78,7 @@
              rmvm_HANDLE
              meta_HANDLE
              inpm_HANDLE
+             crpm_HANDLE
  
        Threading notes:  Although BaseStore itself is threadsafe, you'll want
        to synchronize around any multistep operations that you want to
@@ -161,10 +167,17 @@
            'handle'."""
         return os.path.exists(os.path.join(self.dir, "msg_"+handle))
 
+    def _doRemove(self, handle, newState):
+        self._changeState(handle, "msg", newState)
+        
+    def _preserveCorrupted(self, handle):
+        """Given a handle, change the message state to 'crp'."""
+        self._doRemove(handle, "crp")
+
     def removeMessage(self, handle):
         """Given a handle, removes the corresponding message from the
            filestore.  """
-        self._changeState(handle, "msg", "rmv") # handles locking.
+        self._doRemove(handle, "rmv") # handles locking.
 
     def removeAll(self, secureDeleteFn=None):
         """Removes all messages from this filestore."""
@@ -309,21 +322,22 @@
     """
     def __init__(self): pass
     def getObject(self, handle):
-        """Given a message handle, read and unpickle the contents of the
-           corresponding message.  In rare error cases, defaults to 'None'.
+        """Given a message handle, read and unpickle the contents of
+           the corresponding message.  In rare error cases, raises
+           CorruptedFile.
            """
         try:
             self._lock.acquire()
             f = open(os.path.join(self.dir, "msg_"+handle), 'rb')
             try:
                 res = cPickle.load(f)
+                f.close()
+                return res
             except cPickle.UnpicklingError, e:
                 LOG.error("Found damaged object %s in filestore %s: %s",
                           handle, self.dir, str(e))
-                self.removeMessage(handle)
-                res = None
-            f.close()
-            return res
+                self._preserveCorrupted(handle)
+                raise CorruptedFile()
         finally:
             self._lock.release()
 
@@ -389,11 +403,14 @@
                 except KeyError:
                     LOG.warn("Missing metadata for file %s",h)
                     self.setMetadata(h, newDataFn(h))
+                except CorruptedFile:
+                    continue
         finally:
             self._lock.release()
 
     def getMetadata(self, handle):
-        """Return the metadata associated with a given handle."""
+        """Return the metadata associated with a given handle.  If the
+           metadata is damaged, may raise CorruptedFile."""
         fname = os.path.join(self.dir, "meta_"+handle)
         if not os.path.exists(fname):
             raise KeyError(handle)
@@ -409,8 +426,8 @@
             except cPickle.UnpicklingError, e:
                 LOG.error("Found damaged metadata for %s in filestore %s: %s",
                           handle, self.dir, str(e))
-                self.removeMessage(handle)
-                return None
+                self._preserveCorrupted(handle)
+                raise CorruptedFile()
             f.close()
             self._metadata_cache[handle] = res
             return res
@@ -430,16 +447,14 @@
         finally:
             self._lock.release()
 
-    def removeMessage(self, handle):
-        """Given a handle, removes the corresponding message from the
-           filestore.  """
+    def _doRemove(self, handle, newState):
         try:
             self._lock.acquire()
             # Remove the message before the metadata, so we don't have
             # a message without metadata.
-            BaseStore.removeMessage(self, handle)
+            BaseStore._doRemove(self, handle, newState)
             if os.path.exists(os.path.join(self.dir, "meta_"+handle)):
-                self._changeState(handle, "meta", "rmvm")
+                self._changeState(handle, "meta", newState+"m")
 
             try:
                 del self._metadata_cache[handle]