[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Make client pooling work; make client MMTP errors more ...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv19364/lib/mixminion

Modified Files:
	ClientMain.py Crypto.py MMTPClient.py Main.py Packet.py 
	test.py 
Log Message:
Make client pooling work; make client MMTP errors more consistent; refactor a bit

Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.50
retrieving revision 1.51
diff -u -d -r1.50 -r1.51
--- ClientMain.py	9 Feb 2003 22:30:58 -0000	1.50
+++ ClientMain.py	11 Feb 2003 22:18:03 -0000	1.51
@@ -15,7 +15,6 @@
 import getopt
 import getpass
 import os
-import socket
 import stat
 import sys
 import time
@@ -26,11 +25,11 @@
 import mixminion.Crypto
 import mixminion.MMTPClient
 from mixminion.Common import IntervalSet, LOG, floorDiv, MixError, \
-     MixFatalError, ceilDiv, createPrivateDir, isSMTPMailbox, formatDate, \
-     formatFnameTime, formatTime, Lockfile, openUnique, previousMidnight, \
-     readPossiblyGzippedFile, secureDelete, stringContains, succeedingMidnight
+     MixFatalError, MixProtocolError, ceilDiv, createPrivateDir, \
+     isSMTPMailbox, formatDate, formatFnameTime, formatTime, Lockfile, \
+     openUnique, previousMidnight, readPossiblyGzippedFile, secureDelete, \
+     stringContains, succeedingMidnight
 from mixminion.Crypto import sha1, ctr_crypt, trng
-
 from mixminion.Config import ClientConfig, ConfigError
 from mixminion.ServerInfo import ServerInfo, ServerDirectory
 from mixminion.Packet import ParseError, parseMBOXInfo, parseReplyBlocks, \
@@ -1035,16 +1034,16 @@
         handles = self.getHandles()
         timesByServer = {}
         for h in handles:
-            _, server, when = self.getPacket(h)
-            timesByServer.setdefault(server, []).append(when)
+            _, routing, when = self.getPacket(h)
+            timesByServer.setdefault(routing, []).append(when)
         for s in timesByServer.keys():
             count = len(timesByServer[s])
             oldest = min(timesByServer[s])
-            days = (now - oldest) / (24*60*60)
+            days = floorDiv(now - oldest, 24*60*60)
             if days < 1:
                 days = "<1"
-            print "%2d messages for server %s:%s (oldest is %s days old)"%(
-                count, s.getAddr(), s.getPort(), days)
+            print "%2d messages for server at %s:%s (oldest is %s days old)"%(
+                count, s.ip, s.port, days)
 
 class MixminionClient:
     """Access point for client functionality.  Currently, this is limited
@@ -1083,10 +1082,12 @@
                  self.generateForwardMessage(address, payload,
                                              servers1, servers2)
 
+        routing = firstHop.getRoutingInfo()
+
         if forcePool:
-            self.poolMessages([message], firstHop)
+            self.poolMessages([message], routing)
         else:
-            self.sendMessages([message], firstHop, noPool=forceNoPool)
+            self.sendMessages([message], routing, noPool=forceNoPool)
 
     def sendReplyMessage(self, payload, servers, surbList, forcePool=0,
                          forceNoPool=0):
@@ -1096,11 +1097,13 @@
         #XXXX003 testme
         message, firstHop = \
                  self.generateReplyMessage(payload, servers, surbList)
+
+        routing = firstHop.getRoutingInfo()
         
         if forcePool:
-            self.poolMessages([message], firstHop)
+            self.poolMessages([message], routing)
         else:
-            self.sendMessages([message], firstHop, noPool=forceNoPool)
+            self.sendMessages([message], routing, noPool=forceNoPool)
 
 
     def generateReplyBlock(self, address, servers, expiryTime=0):
@@ -1170,10 +1173,13 @@
             surbLog.close()
             clientUnlock()
 
-    def sendMessages(self, msgList, server, noPool=0, lazyPool=0,
+    def sendMessages(self, msgList, routingInfo, noPool=0, lazyPool=0,
                      warnIfLost=1):
         """Given a list of packets and a ServerInfo object, sends the
-           packets to the server via MMTP"""
+           packets to the server via MMTP
+
+           DOCDOC ServerInfo or IPV4Info...
+           """
         #XXXX003 testme
         LOG.info("Connecting...")
         timeout = self.config['Network'].get('ConnectionTimeout')
@@ -1183,21 +1189,23 @@
         if noPool or lazyPool: 
             handles = []
         else:
-            handles = self.poolMessages(msgList, server)
+            handles = self.poolMessages(msgList, routingInfo)
 
         try:
             try:
                 # May raise TimeoutError
-                mixminion.MMTPClient.sendMessages(server.getAddr(),
-                                                  server.getPort(),
-                                                  server.getKeyID(),
+                mixminion.MMTPClient.sendMessages(routingInfo,
                                                   msgList,
                                                   timeout)
             except:
                 if noPool and warnIfLost:
                     LOG.error("Error with pooling disabled: message lost")
                 elif lazyPool:
-                    self.poolMessages(msgList, server)
+                    self.poolMessages(msgList, routingInfo)
+                    #XXXX003 Log that error occurred, but is okay.
+                else:
+                    #XXXX003 Log that error occurred, but is okay.
+                    pass
                 raise
             try:
                 clientLock()
@@ -1206,8 +1214,8 @@
                         self.pool.removePacket(h)
             finally:
                 clientUnlock()
-        except socket.error, e:
-            raise MixError("Error sending packets: %s" % e)
+        except MixProtocolError, e:
+            raise UIError(str(e))
             
     def flushPool(self):
         """
@@ -1222,19 +1230,19 @@
             LOG.info("Found %s pending messages", len(handles))
             messagesByServer = {}
             for h in handles:
-                message, server, _ = self.pool.getPacket(h)
-                messagesByServer.setdefault(server, []).append((message, h))
+                message, routing, _ = self.pool.getPacket(h)
+                messagesByServer.setdefault(routing, []).append((message, h))
         finally:
             clientUnlock()
             
-        for server in messagesByServer.keys():
-            LOG.debug("Sending %s messages to %s...",
-                      len(messagesByServer[server]), server.getAddr())
-            msgs = [ m for m, _ in messagesByServer[server] ]
-            handles = [ h for _, h in messagesByServer[server] ] 
+        for routing in messagesByServer.keys():
+            LOG.info("Sending %s messages to %s:%s...",
+                     len(messagesByServer[routing]), routing.ip, routing.port)
+            msgs = [ m for m, _ in messagesByServer[routing] ]
+            handles = [ h for _, h in messagesByServer[routing] ] 
             try:
-                self.sendMessages(msgs, server, noPool=1, warnIfLost=0)
-                LOG.debug("... messages sent.")
+                self.sendMessages(msgs, routing, noPool=1, warnIfLost=0)
+                LOG.info("... messages sent.")
                 try:
                     clientLock()
                     for h in handles:
@@ -1244,10 +1252,10 @@
                     clientUnlock()
             except MixError:
                 LOG.error("Can't deliver messages to %s:%s; leaving in pool",
-                          server.getAddr(), server.getPort())
+                          routing.ip, routing.port)
         LOG.info("Pool flushed")
 
-    def poolMessages(self, msgList, server):
+    def poolMessages(self, msgList, routing):
         """
         DOCDOC
         """
@@ -1257,11 +1265,14 @@
         try:
             clientLock()
             for msg in msgList:
-                h = self.pool.poolPacket(msg, server)
+                h = self.pool.poolPacket(msg, routing)
                 handles.append(h)
         finally:
             clientUnlock()
-        LOG.trace("Messages pooled")
+        if len(msgList) > 1:
+            LOG.info("Messages pooled")
+        else:
+            LOG.info("Message pooled")
         return handles
 
     def decodeMessage(self, s, force=0):
@@ -1486,7 +1497,15 @@
                 else:
                     LOG.setMinSeverity("INFO")
             mixminion.Common.configureShredCommand(self.config)
-            mixminion.Crypto.init_crypto(self.config)
+            if not self.verbose:
+                try:
+                    LOG.setMinSeverity("WARN")
+                    mixminion.Crypto.init_crypto(self.config)
+                finally:
+                    LOG.setMinSeverity("INFO")
+            else:
+                mixminion.Crypto.init_crypto(self.config)
+                
             userdir = os.path.expanduser(self.config['User']['UserDir'])
             configureClientLock(os.path.join(userdir, "lock"))
         else:
@@ -1725,8 +1744,6 @@
     else:
         client.sendForwardMessage(address, payload, path1, path2,
                                   forcePool, forceNoPool)
-
-    LOG.info("Message sent")
 
 _IMPORT_SERVER_USAGE = """\
 Usage: %s [options] <filename> ...

Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -d -r1.39 -r1.40
--- Crypto.py	5 Feb 2003 05:34:55 -0000	1.39
+++ Crypto.py	11 Feb 2003 22:18:10 -0000	1.40
@@ -591,6 +591,7 @@
                 # If the file exists (a rare event!) we pass through, and
                 # try again.  This paranoia is brought to you by user
                 # request. :)
+        raise MixFatalError("Unreachable") # appease pychecker.
 
     def _prng(self, n):
         """Abstract method: Must be overridden to return n bytes of fresh

Index: MMTPClient.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPClient.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -d -r1.21 -r1.22
--- MMTPClient.py	9 Feb 2003 22:30:58 -0000	1.21
+++ MMTPClient.py	11 Feb 2003 22:18:13 -0000	1.22
@@ -17,6 +17,8 @@
 
 __all__ = [ "BlockingClientConnection", "sendMessages" ]
 
+#DOCDOC MixProtocolError pattern
+
 import errno
 import signal
 import socket
@@ -57,6 +59,23 @@
         self.sock = None
 
     def connect(self, connectTimeout=None):
+        "DOCDOC"
+        try:
+            self._connect(connectTimeout)
+        except (socket.error, _ml.TLSError), e:
+            self._raise(e, "connecting")
+
+    def _raise(self, err, action):
+        if isinstance(err, socket.error):
+            tp = "Socket"
+        elif isinstance(err, _ml.TLSError):
+            tp = "TLS"
+        else:
+            tp = str(type(err))
+        raise MixProtocolError("%s error while %s to %s:%s: %s",
+             tp, action, self.targetIP, self.targetPort, err)
+
+    def _connect(self, connectTimeout=None):
         """Negotiate the handshake and protocol."""
         #DOCDOC connectTimeout
         # FFFF There should be a way to specify timeout for communication.
@@ -80,7 +99,7 @@
                 if e[0] == errno.EINTR:
                     raise TimeoutError("Connection timed out")
                 else:
-                    raise e
+                    raise MixProtocolError("Error connecting: %s" % e)
         finally:
             if connectTimeout:
                 signal.alarm(0)
@@ -122,8 +141,11 @@
         LOG.debug("MMTP protocol negotated: version %s", self.protocol)
 
     def renegotiate(self):
-        self.tls.renegotiate()
-        self.tls.do_handshake()
+        try:
+            self.tls.renegotiate()
+            self.tls.do_handshake()
+        except (socket.error, _ml.TLSError), e:
+            self._raise(e, "renegotiating connection")
 
     def sendPacket(self, packet,
                    control="SEND\r\n", serverControl="RECEIVED\r\n",
@@ -131,19 +153,22 @@
         """Send a single packet to a server."""
         assert len(packet) == 1<<15
         LOG.debug("Sending packet")
-        ##
-        # We write: "SEND\r\n", 28KB of data, and sha1(packet|"SEND").
-        self.tls.write(control)
-        self.tls.write(packet)
-        self.tls.write(sha1(packet+hashExtra))
-        LOG.debug("Packet sent; waiting for ACK")
-
-        # And we expect, "RECEIVED\r\n", and sha1(packet|"RECEIVED")
-        inp = self.tls.read(len(serverControl)+20)
-        if inp != serverControl+sha1(packet+serverHashExtra):
-            raise MixProtocolError("Bad ACK received")
-        LOG.debug("ACK received; packet successfully delivered")
+        try:
+            ##
+            # We write: "SEND\r\n", 28KB of data, and sha1(packet|"SEND").
+            self.tls.write(control)
+            self.tls.write(packet)
+            self.tls.write(sha1(packet+hashExtra))
+            LOG.debug("Packet sent; waiting for ACK")
 
+            # And we expect, "RECEIVED\r\n", and sha1(packet|"RECEIVED")
+            inp = self.tls.read(len(serverControl)+20)
+            if inp != serverControl+sha1(packet+serverHashExtra):
+                raise MixProtocolError("Bad ACK received")
+            LOG.debug("ACK received; packet successfully delivered")
+        except (socket.error, _ml.TLSError), e:
+            self._raise(e, "sending packet")
+            
     def sendJunkPacket(self, packet):
         if self.protocol == '0.1':
             LOG.debug("Not sending junk to a v0.1 server")
@@ -156,14 +181,16 @@
         """Close this connection."""
         LOG.debug("Shutting down connection to %s:%s",
                        self.targetIP, self.targetPort)
-        if self.tls is not None:
-            self.tls.shutdown()
-        if self.sock is not None:
-            self.sock.close()
+        try:
+            if self.tls is not None:
+                self.tls.shutdown()
+            if self.sock is not None:
+                self.sock.close()
+        except (socket.error, _ml.TLSError), e:
+            self._raise(e, "closing connection")
         LOG.debug("Connection closed")
 
-def sendMessages(targetIP, targetPort, targetKeyID, packetList,
-                 connectTimeout=None):
+def sendMessages(routing, packetList, connectTimeout=None, callback=None):
     """Sends a list of messages to a server.
 
        targetIP -- the address to connect to, in dotted-quad format.
@@ -173,6 +200,8 @@
        packetList -- a list of 32KB packets and control strings.  Control
            strings must be one of "JUNK" to send a 32KB padding chunk,
            or "RENEGOTIATE" to renegotiate the connection key.
+
+       DOCDOC args are wrong
     """
     # Generate junk before opening connection to avoid timing attacks
     packets = []
@@ -183,16 +212,19 @@
             packets.append(("RENEGOTIATE", None))
         else:
             packets.append(("MSG", p))
-    
-    con = BlockingClientConnection(targetIP, targetPort, targetKeyID)
+
+    con = BlockingClientConnection(routing.ip,routing.port,routing.keyinfo)
     try:
         con.connect(connectTimeout=connectTimeout)
-        for t,p in packets:
+        for idx in xrange(len(packets)):
+            t,p = packets[idx]
             if t == "JUNK":
                 con.sendJunkPacket(p)
             elif t == "RENEGOTIATE":
                 con.renegotiate()
             else:
                 con.sendPacket(p)
+            if callback is not None:
+                callback(idx)
     finally:
         con.shutdown()

Index: Main.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Main.py,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -d -r1.28 -r1.29
--- Main.py	9 Feb 2003 22:30:58 -0000	1.28
+++ Main.py	11 Feb 2003 22:18:13 -0000	1.29
@@ -117,6 +117,7 @@
     "benchmarks" :     ( 'mixminion.benchmark',  'timeAll' ),
     "send" :           ( 'mixminion.ClientMain', 'runClient' ),
     "client" :         ( 'mixminion.ClientMain', 'runClient' ),
+    "pool" :           ( 'mixminion.ClientMain', 'runClient' ),
     "import-server" :  ( 'mixminion.ClientMain', 'importServer' ),
     "list-servers" :   ( 'mixminion.ClientMain', 'listServers' ),
     "update-servers" : ( 'mixminion.ClientMain', 'updateServers' ),

Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -d -r1.29 -r1.30
--- Packet.py	9 Feb 2003 22:30:58 -0000	1.29
+++ Packet.py	11 Feb 2003 22:18:13 -0000	1.30
@@ -556,11 +556,15 @@
         return "%s:%s (keyid=%s)"%(self.ip, self.port,
                                    binascii.b2a_hex(self.keyinfo))
 
+
     def pack(self):
         """Return the routing info for this address"""
         assert len(self.keyinfo) == DIGEST_LEN
         return struct.pack(IPV4_PAT, inet_aton(self.ip),
                            self.port, self.keyinfo)
+
+    def __repr__(self):
+        return "IPV4Info(%r, %r, %r)"%(self.ip, self.port, self.keyinfo)
 
     def __hash__(self):
         return hash(self.pack())

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.82
retrieving revision 1.83
diff -u -d -r1.82 -r1.83
--- test.py	9 Feb 2003 22:30:58 -0000	1.82
+++ test.py	11 Feb 2003 22:18:16 -0000	1.83
@@ -2855,9 +2855,10 @@
 
         # Send m1, then junk, then renegotiate, then m2.
         server.process(0.1)
+        routing = IPV4Info("127.0.0.1", TEST_PORT, keyid)
         t = threading.Thread(None,
                              mixminion.MMTPClient.sendMessages,
-                             args=("127.0.0.1", TEST_PORT, keyid,
+                             args=(routing,
                              [messages[0],"JUNK","RENEGOTIATE",messages[1]]))
         t.start()
         while len(messagesIn) < 2:
@@ -2871,11 +2872,12 @@
         self.assertEquals(1, server.nJunkPackets)
 
         # Now, with bad keyid.
+        routing = IPV4Info("127.0.0.1", TEST_PORT, "Z"*20)
         t = threading.Thread(None,
                              self.failUnlessRaises,
                              args=(MixProtocolError,
                                    mixminion.MMTPClient.sendMessages,
-                                   "127.0.0.1", TEST_PORT, "Z"*20, messages))
+                                   routing, messages))
         t.start()
         while t.isAlive():
             server.process(0.1)
@@ -2904,9 +2906,9 @@
         timedout = 0
         try:
             try:
-                mixminion.MMTPClient.sendMessages("127.0.0.1",
-                                                  TEST_PORT, "Z"*20, ["JUNK"],
-                                                  connectTimeout=1)
+                routing = IPV4Info("127.0.0.1", TEST_PORT, "Z"*20)
+                mixminion.MMTPClient.sendMessages(routing, ["JUNK"],
+                                                   connectTimeout=1)
                 timedout = 0
             except mixminion.MMTPClient.TimeoutError:
                 timedout = 1