[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[minion-cvs] Make client pooling work; make client MMTP errors more consistent; refactor a bit
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv19364/lib/mixminion
Modified Files:
ClientMain.py Crypto.py MMTPClient.py Main.py Packet.py
test.py
Log Message:
Make client pooling work; make client MMTP errors more consistent; refactor a bit
Index: ClientMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ClientMain.py,v
retrieving revision 1.50
retrieving revision 1.51
diff -u -d -r1.50 -r1.51
--- ClientMain.py 9 Feb 2003 22:30:58 -0000 1.50
+++ ClientMain.py 11 Feb 2003 22:18:03 -0000 1.51
@@ -15,7 +15,6 @@
import getopt
import getpass
import os
-import socket
import stat
import sys
import time
@@ -26,11 +25,11 @@
import mixminion.Crypto
import mixminion.MMTPClient
from mixminion.Common import IntervalSet, LOG, floorDiv, MixError, \
- MixFatalError, ceilDiv, createPrivateDir, isSMTPMailbox, formatDate, \
- formatFnameTime, formatTime, Lockfile, openUnique, previousMidnight, \
- readPossiblyGzippedFile, secureDelete, stringContains, succeedingMidnight
+ MixFatalError, MixProtocolError, ceilDiv, createPrivateDir, \
+ isSMTPMailbox, formatDate, formatFnameTime, formatTime, Lockfile, \
+ openUnique, previousMidnight, readPossiblyGzippedFile, secureDelete, \
+ stringContains, succeedingMidnight
from mixminion.Crypto import sha1, ctr_crypt, trng
-
from mixminion.Config import ClientConfig, ConfigError
from mixminion.ServerInfo import ServerInfo, ServerDirectory
from mixminion.Packet import ParseError, parseMBOXInfo, parseReplyBlocks, \
@@ -1035,16 +1034,16 @@
handles = self.getHandles()
timesByServer = {}
for h in handles:
- _, server, when = self.getPacket(h)
- timesByServer.setdefault(server, []).append(when)
+ _, routing, when = self.getPacket(h)
+ timesByServer.setdefault(routing, []).append(when)
for s in timesByServer.keys():
count = len(timesByServer[s])
oldest = min(timesByServer[s])
- days = (now - oldest) / (24*60*60)
+ days = floorDiv(now - oldest, 24*60*60)
if days < 1:
days = "<1"
- print "%2d messages for server %s:%s (oldest is %s days old)"%(
- count, s.getAddr(), s.getPort(), days)
+ print "%2d messages for server at %s:%s (oldest is %s days old)"%(
+ count, s.ip, s.port, days)
class MixminionClient:
"""Access point for client functionality. Currently, this is limited
@@ -1083,10 +1082,12 @@
self.generateForwardMessage(address, payload,
servers1, servers2)
+ routing = firstHop.getRoutingInfo()
+
if forcePool:
- self.poolMessages([message], firstHop)
+ self.poolMessages([message], routing)
else:
- self.sendMessages([message], firstHop, noPool=forceNoPool)
+ self.sendMessages([message], routing, noPool=forceNoPool)
def sendReplyMessage(self, payload, servers, surbList, forcePool=0,
forceNoPool=0):
@@ -1096,11 +1097,13 @@
#XXXX003 testme
message, firstHop = \
self.generateReplyMessage(payload, servers, surbList)
+
+ routing = firstHop.getRoutingInfo()
if forcePool:
- self.poolMessages([message], firstHop)
+ self.poolMessages([message], routing)
else:
- self.sendMessages([message], firstHop, noPool=forceNoPool)
+ self.sendMessages([message], routing, noPool=forceNoPool)
def generateReplyBlock(self, address, servers, expiryTime=0):
@@ -1170,10 +1173,13 @@
surbLog.close()
clientUnlock()
- def sendMessages(self, msgList, server, noPool=0, lazyPool=0,
+ def sendMessages(self, msgList, routingInfo, noPool=0, lazyPool=0,
warnIfLost=1):
"""Given a list of packets and a ServerInfo object, sends the
- packets to the server via MMTP"""
+ packets to the server via MMTP
+
+ DOCDOC ServerInfo or IPV4Info...
+ """
#XXXX003 testme
LOG.info("Connecting...")
timeout = self.config['Network'].get('ConnectionTimeout')
@@ -1183,21 +1189,23 @@
if noPool or lazyPool:
handles = []
else:
- handles = self.poolMessages(msgList, server)
+ handles = self.poolMessages(msgList, routingInfo)
try:
try:
# May raise TimeoutError
- mixminion.MMTPClient.sendMessages(server.getAddr(),
- server.getPort(),
- server.getKeyID(),
+ mixminion.MMTPClient.sendMessages(routingInfo,
msgList,
timeout)
except:
if noPool and warnIfLost:
LOG.error("Error with pooling disabled: message lost")
elif lazyPool:
- self.poolMessages(msgList, server)
+ self.poolMessages(msgList, routingInfo)
+ #XXXX003 Log that error occurred, but is okay.
+ else:
+ #XXXX003 Log that error occurred, but is okay.
+ pass
raise
try:
clientLock()
@@ -1206,8 +1214,8 @@
self.pool.removePacket(h)
finally:
clientUnlock()
- except socket.error, e:
- raise MixError("Error sending packets: %s" % e)
+ except MixProtocolError, e:
+ raise UIError(str(e))
def flushPool(self):
"""
@@ -1222,19 +1230,19 @@
LOG.info("Found %s pending messages", len(handles))
messagesByServer = {}
for h in handles:
- message, server, _ = self.pool.getPacket(h)
- messagesByServer.setdefault(server, []).append((message, h))
+ message, routing, _ = self.pool.getPacket(h)
+ messagesByServer.setdefault(routing, []).append((message, h))
finally:
clientUnlock()
- for server in messagesByServer.keys():
- LOG.debug("Sending %s messages to %s...",
- len(messagesByServer[server]), server.getAddr())
- msgs = [ m for m, _ in messagesByServer[server] ]
- handles = [ h for _, h in messagesByServer[server] ]
+ for routing in messagesByServer.keys():
+ LOG.info("Sending %s messages to %s:%s...",
+ len(messagesByServer[routing]), routing.ip, routing.port)
+ msgs = [ m for m, _ in messagesByServer[routing] ]
+ handles = [ h for _, h in messagesByServer[routing] ]
try:
- self.sendMessages(msgs, server, noPool=1, warnIfLost=0)
- LOG.debug("... messages sent.")
+ self.sendMessages(msgs, routing, noPool=1, warnIfLost=0)
+ LOG.info("... messages sent.")
try:
clientLock()
for h in handles:
@@ -1244,10 +1252,10 @@
clientUnlock()
except MixError:
LOG.error("Can't deliver messages to %s:%s; leaving in pool",
- server.getAddr(), server.getPort())
+ routing.ip, routing.port)
LOG.info("Pool flushed")
- def poolMessages(self, msgList, server):
+ def poolMessages(self, msgList, routing):
"""
DOCDOC
"""
@@ -1257,11 +1265,14 @@
try:
clientLock()
for msg in msgList:
- h = self.pool.poolPacket(msg, server)
+ h = self.pool.poolPacket(msg, routing)
handles.append(h)
finally:
clientUnlock()
- LOG.trace("Messages pooled")
+ if len(msgList) > 1:
+ LOG.info("Messages pooled")
+ else:
+ LOG.info("Message pooled")
return handles
def decodeMessage(self, s, force=0):
@@ -1486,7 +1497,15 @@
else:
LOG.setMinSeverity("INFO")
mixminion.Common.configureShredCommand(self.config)
- mixminion.Crypto.init_crypto(self.config)
+ if not self.verbose:
+ try:
+ LOG.setMinSeverity("WARN")
+ mixminion.Crypto.init_crypto(self.config)
+ finally:
+ LOG.setMinSeverity("INFO")
+ else:
+ mixminion.Crypto.init_crypto(self.config)
+
userdir = os.path.expanduser(self.config['User']['UserDir'])
configureClientLock(os.path.join(userdir, "lock"))
else:
@@ -1725,8 +1744,6 @@
else:
client.sendForwardMessage(address, payload, path1, path2,
forcePool, forceNoPool)
-
- LOG.info("Message sent")
_IMPORT_SERVER_USAGE = """\
Usage: %s [options] <filename> ...
Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.39
retrieving revision 1.40
diff -u -d -r1.39 -r1.40
--- Crypto.py 5 Feb 2003 05:34:55 -0000 1.39
+++ Crypto.py 11 Feb 2003 22:18:10 -0000 1.40
@@ -591,6 +591,7 @@
# If the file exists (a rare event!) we pass through, and
# try again. This paranoia is brought to you by user
# request. :)
+ raise MixFatalError("Unreachable") # appease pychecker.
def _prng(self, n):
"""Abstract method: Must be overridden to return n bytes of fresh
Index: MMTPClient.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/MMTPClient.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -d -r1.21 -r1.22
--- MMTPClient.py 9 Feb 2003 22:30:58 -0000 1.21
+++ MMTPClient.py 11 Feb 2003 22:18:13 -0000 1.22
@@ -17,6 +17,8 @@
__all__ = [ "BlockingClientConnection", "sendMessages" ]
+#DOCDOC MixProtocolError pattern
+
import errno
import signal
import socket
@@ -57,6 +59,23 @@
self.sock = None
def connect(self, connectTimeout=None):
+ "DOCDOC"
+ try:
+ self._connect(connectTimeout)
+ except (socket.error, _ml.TLSError), e:
+ self._raise(e, "connecting")
+
+ def _raise(self, err, action):
+ if isinstance(err, socket.error):
+ tp = "Socket"
+ elif isinstance(err, _ml.TLSError):
+ tp = "TLS"
+ else:
+ tp = str(type(err))
+ raise MixProtocolError("%s error while %s to %s:%s: %s",
+ tp, action, self.targetIP, self.targetPort, err)
+
+ def _connect(self, connectTimeout=None):
"""Negotiate the handshake and protocol."""
#DOCDOC connectTimeout
# FFFF There should be a way to specify timeout for communication.
@@ -80,7 +99,7 @@
if e[0] == errno.EINTR:
raise TimeoutError("Connection timed out")
else:
- raise e
+ raise MixProtocolError("Error connecting: %s" % e)
finally:
if connectTimeout:
signal.alarm(0)
@@ -122,8 +141,11 @@
LOG.debug("MMTP protocol negotated: version %s", self.protocol)
def renegotiate(self):
- self.tls.renegotiate()
- self.tls.do_handshake()
+ try:
+ self.tls.renegotiate()
+ self.tls.do_handshake()
+ except (socket.error, _ml.TLSError), e:
+ self._raise(e, "renegotiating connection")
def sendPacket(self, packet,
control="SEND\r\n", serverControl="RECEIVED\r\n",
@@ -131,19 +153,22 @@
"""Send a single packet to a server."""
assert len(packet) == 1<<15
LOG.debug("Sending packet")
- ##
- # We write: "SEND\r\n", 28KB of data, and sha1(packet|"SEND").
- self.tls.write(control)
- self.tls.write(packet)
- self.tls.write(sha1(packet+hashExtra))
- LOG.debug("Packet sent; waiting for ACK")
-
- # And we expect, "RECEIVED\r\n", and sha1(packet|"RECEIVED")
- inp = self.tls.read(len(serverControl)+20)
- if inp != serverControl+sha1(packet+serverHashExtra):
- raise MixProtocolError("Bad ACK received")
- LOG.debug("ACK received; packet successfully delivered")
+ try:
+ ##
+ # We write: "SEND\r\n", 28KB of data, and sha1(packet|"SEND").
+ self.tls.write(control)
+ self.tls.write(packet)
+ self.tls.write(sha1(packet+hashExtra))
+ LOG.debug("Packet sent; waiting for ACK")
+ # And we expect, "RECEIVED\r\n", and sha1(packet|"RECEIVED")
+ inp = self.tls.read(len(serverControl)+20)
+ if inp != serverControl+sha1(packet+serverHashExtra):
+ raise MixProtocolError("Bad ACK received")
+ LOG.debug("ACK received; packet successfully delivered")
+ except (socket.error, _ml.TLSError), e:
+ self._raise(e, "sending packet")
+
def sendJunkPacket(self, packet):
if self.protocol == '0.1':
LOG.debug("Not sending junk to a v0.1 server")
@@ -156,14 +181,16 @@
"""Close this connection."""
LOG.debug("Shutting down connection to %s:%s",
self.targetIP, self.targetPort)
- if self.tls is not None:
- self.tls.shutdown()
- if self.sock is not None:
- self.sock.close()
+ try:
+ if self.tls is not None:
+ self.tls.shutdown()
+ if self.sock is not None:
+ self.sock.close()
+ except (socket.error, _ml.TLSError), e:
+ self._raise(e, "closing connection")
LOG.debug("Connection closed")
-def sendMessages(targetIP, targetPort, targetKeyID, packetList,
- connectTimeout=None):
+def sendMessages(routing, packetList, connectTimeout=None, callback=None):
"""Sends a list of messages to a server.
targetIP -- the address to connect to, in dotted-quad format.
@@ -173,6 +200,8 @@
packetList -- a list of 32KB packets and control strings. Control
strings must be one of "JUNK" to send a 32KB padding chunk,
or "RENEGOTIATE" to renegotiate the connection key.
+
+ DOCDOC args are wrong
"""
# Generate junk before opening connection to avoid timing attacks
packets = []
@@ -183,16 +212,19 @@
packets.append(("RENEGOTIATE", None))
else:
packets.append(("MSG", p))
-
- con = BlockingClientConnection(targetIP, targetPort, targetKeyID)
+
+ con = BlockingClientConnection(routing.ip,routing.port,routing.keyinfo)
try:
con.connect(connectTimeout=connectTimeout)
- for t,p in packets:
+ for idx in xrange(len(packets)):
+ t,p = packets[idx]
if t == "JUNK":
con.sendJunkPacket(p)
elif t == "RENEGOTIATE":
con.renegotiate()
else:
con.sendPacket(p)
+ if callback is not None:
+ callback(idx)
finally:
con.shutdown()
Index: Main.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Main.py,v
retrieving revision 1.28
retrieving revision 1.29
diff -u -d -r1.28 -r1.29
--- Main.py 9 Feb 2003 22:30:58 -0000 1.28
+++ Main.py 11 Feb 2003 22:18:13 -0000 1.29
@@ -117,6 +117,7 @@
"benchmarks" : ( 'mixminion.benchmark', 'timeAll' ),
"send" : ( 'mixminion.ClientMain', 'runClient' ),
"client" : ( 'mixminion.ClientMain', 'runClient' ),
+ "pool" : ( 'mixminion.ClientMain', 'runClient' ),
"import-server" : ( 'mixminion.ClientMain', 'importServer' ),
"list-servers" : ( 'mixminion.ClientMain', 'listServers' ),
"update-servers" : ( 'mixminion.ClientMain', 'updateServers' ),
Index: Packet.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Packet.py,v
retrieving revision 1.29
retrieving revision 1.30
diff -u -d -r1.29 -r1.30
--- Packet.py 9 Feb 2003 22:30:58 -0000 1.29
+++ Packet.py 11 Feb 2003 22:18:13 -0000 1.30
@@ -556,11 +556,15 @@
return "%s:%s (keyid=%s)"%(self.ip, self.port,
binascii.b2a_hex(self.keyinfo))
+
def pack(self):
"""Return the routing info for this address"""
assert len(self.keyinfo) == DIGEST_LEN
return struct.pack(IPV4_PAT, inet_aton(self.ip),
self.port, self.keyinfo)
+
+ def __repr__(self):
+ return "IPV4Info(%r, %r, %r)"%(self.ip, self.port, self.keyinfo)
def __hash__(self):
return hash(self.pack())
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.82
retrieving revision 1.83
diff -u -d -r1.82 -r1.83
--- test.py 9 Feb 2003 22:30:58 -0000 1.82
+++ test.py 11 Feb 2003 22:18:16 -0000 1.83
@@ -2855,9 +2855,10 @@
# Send m1, then junk, then renegotiate, then m2.
server.process(0.1)
+ routing = IPV4Info("127.0.0.1", TEST_PORT, keyid)
t = threading.Thread(None,
mixminion.MMTPClient.sendMessages,
- args=("127.0.0.1", TEST_PORT, keyid,
+ args=(routing,
[messages[0],"JUNK","RENEGOTIATE",messages[1]]))
t.start()
while len(messagesIn) < 2:
@@ -2871,11 +2872,12 @@
self.assertEquals(1, server.nJunkPackets)
# Now, with bad keyid.
+ routing = IPV4Info("127.0.0.1", TEST_PORT, "Z"*20)
t = threading.Thread(None,
self.failUnlessRaises,
args=(MixProtocolError,
mixminion.MMTPClient.sendMessages,
- "127.0.0.1", TEST_PORT, "Z"*20, messages))
+ routing, messages))
t.start()
while t.isAlive():
server.process(0.1)
@@ -2904,9 +2906,9 @@
timedout = 0
try:
try:
- mixminion.MMTPClient.sendMessages("127.0.0.1",
- TEST_PORT, "Z"*20, ["JUNK"],
- connectTimeout=1)
+ routing = IPV4Info("127.0.0.1", TEST_PORT, "Z"*20)
+ mixminion.MMTPClient.sendMessages(routing, ["JUNK"],
+ connectTimeout=1)
timedout = 0
except mixminion.MMTPClient.TimeoutError:
timedout = 1