[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Maint branch: change Filestore to raise an exception on...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv27233/lib/mixminion/server

Modified Files:
      Tag: mixminion-v0-0-5-patches
	MMTPServer.py Modules.py ServerMain.py ServerQueue.py 
Log Message:
Maint branch: change Filestore to raise an exception on failed cPickle.load.

Change users of Filestore to respond appropriately.


Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.50
retrieving revision 1.50.2.1
diff -u -d -r1.50 -r1.50.2.1
--- MMTPServer.py	4 Sep 2003 16:11:22 -0000	1.50
+++ MMTPServer.py	28 Sep 2003 03:57:33 -0000	1.50.2.1
@@ -33,6 +33,7 @@
 from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN
 from mixminion.MMTPClient import PeerCertificateCache
 import mixminion.server.EventStats as EventStats
+from mixminion.Filestore import CorruptedFile
 
 __all__ = [ 'AsyncServer', 'ListenConnection', 'MMTPServerConnection',
             'MMTPClientConnection' ]
@@ -928,11 +929,11 @@
 
     def beginNextMessage(self):
         """Start writing a message to the connection."""
-        if not self.messageList:
+        self._getNextMessage()
+        if not self._curMessage:
             self.shutdown(0)
             return
-        
-        self._getNextMessage()
+
         msg = self._curMessage
         if msg == 'RENEGOTIATE':
             self.finished = self.beginNextMessage
@@ -958,14 +959,21 @@
     def _getNextMessage(self):
         """Helper function: pull the next _curHandle, _curMessage pair from
            self.messageList."""
-        m = self.messageList[0]
-        del self.messageList[0]
-        if hasattr(m, 'getContents'):
-            self._curHandle = m
-            self._curMessage = m.getContents()
-        else:
-            self._curHandle = None
-            self._curMessage = m
+        while self.messageList:
+            m = self.messageList[0]
+            del self.messageList[0]
+            if hasattr(m, 'getContents'):
+                self._curHandle = m
+                try:
+                    self._curMessage = m.getContents()
+                except CorruptedFile:
+                    pass
+                return
+            else:
+                self._curHandle = None
+                self._curMessage = m
+                return
+        self._curHandle = self._curMessage = None
 
     def __sentMessage(self):
         """Called when we're done sending a message.  Begins reading the

Index: Modules.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Modules.py,v
retrieving revision 1.54.2.1
retrieving revision 1.54.2.2
diff -u -d -r1.54.2.1 -r1.54.2.2
--- Modules.py	12 Sep 2003 15:35:47 -0000	1.54.2.1
+++ Modules.py	28 Sep 2003 03:57:33 -0000	1.54.2.2
@@ -197,7 +197,10 @@
             try:
                 dh = handle.getHandle() # display handle
                 EventStats.log.attemptedDelivery() #FFFF
-                packet = handle.getMessage()
+                try:
+                    packet = handle.getMessage()
+                except mixminion.Filestore.CorruptedFile:
+                    continue
                 result = self.module.processMessage(packet)
                 if result == DELIVER_OK:
                     LOG.debug("Successfully delivered message MOD:%s", dh)

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.92
retrieving revision 1.92.2.1
diff -u -d -r1.92 -r1.92.2.1
--- ServerMain.py	31 Aug 2003 19:29:29 -0000	1.92
+++ ServerMain.py	28 Sep 2003 03:57:33 -0000	1.92.2.1
@@ -257,7 +257,10 @@
                   self.queue.count(), len(handles))
 
         for h in handles:
-            packet = self.queue.getObject(h)
+            try:
+                packet = self.queue.getObject(h)
+            except mixminion.Filestore.CorruptedFile:
+                continue
             if packet.isDelivery():
                 h2 = self.moduleManager.queueDecodedMessage(packet)
                 if h2:
@@ -315,9 +318,12 @@
         # Map from addr -> [ (handle, msg) ... ]
         msgs = {}
         for pending in msgList:
-            addr = pending.getAddress()
-            if addr is None:
-                addr = pending.getMessage().getAddress()
+            try:
+                addr = pending.getAddress()
+                if addr is None:
+                    addr = pending.getMessage().getAddress()
+            except mixminion.Filestore.CorruptedFile:
+                continue
             msgs.setdefault(addr, []).append(pending)
         for addr, messages in msgs.items():
             if self.addr[:2] == (addr.ip, addr.port):

Index: ServerQueue.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerQueue.py,v
retrieving revision 1.33
retrieving revision 1.33.2.1
diff -u -d -r1.33 -r1.33.2.1
--- ServerQueue.py	31 Aug 2003 19:29:29 -0000	1.33
+++ ServerQueue.py	28 Sep 2003 03:57:33 -0000	1.33.2.1
@@ -18,6 +18,7 @@
 from mixminion.Common import MixError, MixFatalError, secureDelete, LOG, \
      createPrivateDir, readPickled, writePickled, formatTime, readFile
 from mixminion.Crypto import getCommonPRNG
+from mixminion.Filestore import CorruptedFile
 
 __all__ = [ 'DeliveryQueue', 'TimedMixPool', 'CottrellMixPool',
             'BinomialCottrellMixPool' ]
@@ -174,11 +175,9 @@
 
     def getMessage(self):
         """Return the underlying object stored in the delivery queue, loading
-           it from disk if necessary."""
+           it from disk if necessary. May raise CorruptedFile."""
         if self.message is None:
             self.message = self.queue.store.getObject(self.handle)
-            #XXXX There's an error case where getObject returns none
-            #XXXX if the data is corrupt on disk.
         return self.message
 
 class DeliveryQueue:
@@ -358,10 +357,11 @@
             self._lock.acquire()
             messages = []
             for h in self.store._metadata_cache.keys():
-                state = self.store.getMetadata(h)
-                if state is None:
+                try:
+                    state = self.store.getMetadata(h)
+                except CorruptedFile:
                     continue
-                elif state.isPending():
+                if state.isPending():
                     LOG.trace("     [%s] is pending delivery", h)
                     continue
                 elif state and state.isRemovable():
@@ -438,6 +438,8 @@
                 ds = self.store.getMetadata(handle)
             except KeyError:
                 ds = None
+            except CorruptedFile:
+                return
 
             if ds is None:
                 # This should never happen