[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Finish switching pinger code over to identity-based (not nickname-based) server tracking
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria:/tmp/cvs-serv13027/lib/mixminion/server
Modified Files:
Pinger.py
Log Message:
Finish switching pinger code over to identity-based (not nickname-based) server tracking. (I think.)
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -d -r1.27 -r1.28
--- Pinger.py 9 Aug 2005 15:51:32 -0000 1.27
+++ Pinger.py 3 Nov 2005 21:12:20 -0000 1.28
@@ -241,6 +241,22 @@
self._theCursor.execute(stmt, (keyVals+valVals))
return fn
+ def encodeIdentity(self, identity):
+ """DOCDOC"""
+ if identity == '<self>':
+ return '<self>'
+ else:
+ assert len(identity) == mixminion.Crypto.DIGEST_LEN
+ return binascii.b2a_hex(identity)
+
+ def decodeIdentity(self, hexid):
+ """DOCDOC"""
+ if hexid == '<self>':
+ return '<self>'
+ else:
+ assert len(hexid) == mixminion.Crypto.DIGEST_LEN*2
+ return binascii.a2b_hex(hexid)
+
class PingLog:
"""A PingLog stores a series of pinging-related events to a
persistent relational database, and calculates server statistics based
@@ -259,7 +275,7 @@
# server reliability (a float between 0 and 1).
# _intervals: An instance of IntervalSchedule.
# _brokenChains, _interestingChains: Maps from comma-separated
- # lowercase nicknames for servers in chains that we believe to be
+ # lowercase hex ids for servers in chains that we believe to be
# broken or "interesting" to 1.
# _startTime: The 'startup' time for the current myLifespan row.
# _lastRecalculation: The last time this process recomputed all
@@ -316,11 +332,10 @@
("sentat", "timestamp", "not null"),
("received", "timestamp")])
- # Holds lowercased nicknames for all the servers we know about.
+ # Holds identity digests for all the servers we know about.
self._db.createTable("server",
- [("id", "integer", "primary key"),
- ("name", "varchar(32)", "unique not null"),
- ("identity", "varchar(40)", "unique not null")])
+ [("id", "integer", "primary key"),
+ ("identity", "varchar(40)", "unique not null")])
# Holds information about our attempts to launch MMTP connections
# to other servers. A row in connectionAttempt means: We tried to
@@ -413,7 +428,6 @@
#### Indices.
- self._db.createIndex("serverName", "server", ["name"], unique=1)
self._db.createIndex("serverIdentity", "server",
["identity"], unique=1)
self._db.createIndex("statsIntervalSE", "statsInterval",
@@ -459,20 +473,20 @@
res = cur.fetchall()
serverIDs = {}
for idnum,identity in res:
- serverIDs[binascii.a2b_hex(identity)] = idnum
+ serverIDs[self._db.decodeIdentity(identity)] = idnum
serverReliability = {}
- cur.execute("SELECT name, reliability FROM "
- "echolotCurrentOneHopResult, server "
- "WHERE server.id = echolotCurrentOneHopResult.server")
+ cur.execute("SELECT identity, reliability FROM "
+ "echolotCurrentOneHopResult, server WHERE "
+ "echolotCurrentOneHopResult.server = server.id")
res = cur.fetchall()
- for name,rel in res:
- serverReliability[name]=rel
+ for hexid,rel in res:
+ serverReliability[self._db.decodeIdentity(hexid)]=rel
- cur.execute("SELECT s1.name, s2.name, broken, interesting FROM "
- "echolotCurrentTwoHopResult, server as S1, server as S2 "
- "WHERE (interesting = 1 OR broken = 1) AND "
- "S1.id = server1 AND S2.id = server2")
+ cur.execute("SELECT S1.identity, S2.identity,broken,interesting FROM"
+ " echolotCurrentTwoHopResult,server AS S1,server AS S2 "
+ "WHERE (interesting = 1 OR broken = 1) "
+ " AND S1.id = server1 AND S2.id = server2")
res = cur.fetchall()
broken = {}
interesting = {}
@@ -491,14 +505,14 @@
def updateServers(self, descriptorSource):
"""Add the names 'descriptorSource' to the database, if they
- aren't there already.
+ aren't there already.
"""
for s in descriptorInfo.getServerList():
- self._getServerID(s.getIdentityDigest(), s.getNickname())
+ self._getServerID(s.getIdentityDigest())
self._db.getConnection().commit()
- def _getServerID(self, identity, name=None):
- """Helper: Return the database ID for the server whose hex
+ def _getServerID(self, identity):
+ """Helper: Return the database ID for the server whose
identity digest is 'identity'. If the database doesn't
know about the server yet, add it. Does not commit the
current transaction.
@@ -511,14 +525,13 @@
self._serverIDs[identity] = 1
finally:
self._lock.release()
+
cur = self._db.getCursor()
- if name is None: name = binascii.b2a_hex(identity)
+ hexid = self._db.encodeIdentity(identity)
- cur.execute("INSERT INTO server (name,identity) VALUES (%s,%s)",
- name.lower(), binascii.b2a_hex(identity))
- cur.execute("SELECT id FROM server WHERE identity = %s",
- binascii.b2a_hex(identity))
+ cur.execute("INSERT INTO server (identity) VALUES (%s)", hexid)
+ cur.execute("SELECT id FROM server WHERE identity = %s", hexid)
#XXXX catch errors!
ident, = cur.fetchone()
self._serverIDs[identity]=ident
@@ -612,7 +625,7 @@
_CONNECTED = ("INSERT INTO connectionAttempt (at, server, success) "
"VALUES (%s,%s,%s)")
def connected(self, identity, success=1, now=None):
- """Note that we attempted to connect to the server named 'nickname'.
+ """Note that we attempted to connect to the server with 'identity'.
We successfully negotiated a protocol iff success is true.
"""
serverID = self._getServerID(identity)
@@ -748,7 +761,7 @@
"AND %s >= startat AND %s <= endat",
(self._db.time(startAt), self._db.time(endAt)))
for s,e,i,u in cur:
- result.setdefault((s,e), {})[binascii.a2b_hex(i)] = u
+ result.setdefault((s,e), {})[self._db.decodeIdentity(i)] = u
self._db.getConnection().commit()
return result
@@ -1020,22 +1033,23 @@
if now is None: now = time.time()
print >>f, "# List of all currently tracked servers."
- print >>f, "KNOWN_SERVERS =",serverIdentities
+ print >>f, "KNOWN_SERVERS =", [
+ self._db.encodeIdentity(i) for i in serverIdentities]
cur = self._db.getCursor()
print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
print >>f, "SERVER_UPTIMES = {"
- cur.execute("SELECT startAt,endAt,identity,name,uptime FROM uptime, server, statsInterval "
+ cur.execute("SELECT startAt,endAt,identity,uptime FROM uptime, server, statsInterval "
"WHERE startAt >= %s AND startAt <= %s "
"AND uptime.server = server.id "
"AND uptime.interval = statsInterval.id "
- "ORDER BY name, startAt", (since, now))
+ "ORDER BY identity, startAt", (since, now))
lastServer = "---"
- for s,e,i,n,u in cur:
+ for s,e,i,u in cur:
if i != lastServer:
if lastServer != '---': print >>f, " ],"
lastServer = i
- print >>f, " (%r,%r) : [" % (i,n)
+ print >>f, " %r : [" % i
print >>f, " (%s,%s,%.04f),"%(s,e,u)
if lastServer != '---': print >>f, " ]"
print >>f, "}"
@@ -1045,37 +1059,38 @@
# # of those pings received, median latency on those pings (sec),
# weighted reliability)"""
print >>f, "SERVER_DAILY_PING_STATUS = {"
- cur.execute("SELECT identity,name,startAt,endAt,nSent,nReceived,"
+ cur.execute("SELECT identity,startAt,endAt,nSent,nReceived,"
" latency,reliability "
- "FROM echolotOneHopResult, server, statsInterval "
+ "FROM echolotOneHopResult, statsInterval, server "
"WHERE startat >= %s AND startat <= %s"
"AND echolotOneHopResult.server = server.id "
"AND echolotOneHopResult.interval = statsInterval.id "
- "ORDER BY name, startat", (since, now))
+ "ORDER BY identity, startat", (since, now))
lastServer = "---"
- for i,n,s,e,nS,nR,lat,r in cur:
+ for i,s,e,nS,nR,lat,r in cur:
if s == '<self>': continue
if i != lastServer:
if lastServer != '---': print >>f, " ],"
lastServer = i
- print >>f, " (%r,%r) : [" % (i,n)
+ print >>f, " (%r) : [" % i
print >>f, " (%s,%s,%s,%s,%s,%.04f),"%(s,e,nS,nR,lat,r)
if lastServer != '---': print >>f, " ]"
print >>f, "}"
- print >>f, "\n# Map from server-name to current (avg latency, avg reliability)"
+ print >>f, "\n# Map from server identity to current (avg latency, avg reliability)"
print >>f, "SERVER_CUR_PING_STATUS = {"
- cur.execute("SELECT name,latency,reliability FROM "
+ cur.execute("SELECT identity,latency,reliability FROM "
"echolotCurrentOneHopResult, server WHERE "
"echolotCurrentOneHopResult.server = server.id")
- for n,lat,r in cur:
- if n == '<self>': continue
- print >>f, " %r : (%s,%.04f)," %(n,lat,r)
+ for i,lat,r in cur:
+ if i == '<self>': continue
+ print >>f, " %r : (%s,%.04f)," %(i,lat,r)
print >>f, "}"
print >>f, "\n# Chains that we want to know more about"
print >>f, "INTERESTING_CHAINS = ["
- cur.execute("SELECT S1.name, S2.name FROM echolotCurrentTwoHopResult, "
+ cur.execute("SELECT S1.identity, S2.identity "
+ "FROM echolotCurrentTwoHopResult, "
" server as S1, server as S2 WHERE "
"interesting = 1 AND S1.id = server1 AND S2.id = server2")
for s1,s2 in cur:
@@ -1085,7 +1100,8 @@
print >>f, "\n# Chains that are more unreliable than we'd expect"
print >>f, "BROKEN_CHAINS = ["
- cur.execute("SELECT S1.name, S2.name FROM echolotCurrentTwoHopResult, "
+ cur.execute("SELECT S1.identity, S2.identity "
+ "FROM echolotCurrentTwoHopResult, "
" server as S1, server as S2 WHERE "
"broken = 1 AND S1.id = server1 AND S2.id = server2")
for s1,s2 in cur:
@@ -1125,13 +1141,11 @@
# pingLog: an instance of PingLog
# outgoingQueue: an instance of outgoingQueue, if we're going to send
# pings
- # myNickname: the nickname of this server
def __init__(self, config):
"""Create a new PingGenerator with a given configuration"""
self.directory = None
self.pingLog = None
self.outgoingQueue = None
- self.myNickname = config['Server']['Nickname']
def connect(self, directory, outgoingQueue, pingLog, keyring):
"""Use the provided directory/queue/pingLog/keyring as needed.
@@ -1178,7 +1192,8 @@
Return 1 if we are able to queue the ping, 0 otherwise.
"""
assert path1 and path2
- assert path2[-1].getNickname().lower() == self.myNickname.lower()
+ assert (path2[-1].getIdentityDigest() ==
+ self.keyring.getIdentityKeyDigest())
try:
p1 = self.directory.getPath(path1)
p2 = self.directory.getPath(path2)
@@ -1208,7 +1223,8 @@
from the start of the period by a random-looking amount.
"""
## Fields:
- # nextPingTime: map from path to the next time we plan to ping it.
+ # nextPingTime: map from path (list of IDs) to the next time we plan to
+ # ping it.
# seed: a secret random value used to compute perturbations.
def __init__(self):
self.nextPingTime = {}#path->when
@@ -1262,7 +1278,8 @@
self.nextPingTime[path] = t
if oldTime != t:
LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
- ",".join(path), formatTime(t,1))
+ ",".join([binascii.b2a_hex(p) for p in path]),
+ formatTime(t,1))
#LOG.trace("(Period starts at %s; period is %s days; interval is %s sec; perturbation is %s sec)",
# formatTime(periodStart,1), self._period_length/ONE_DAY, interval, perturbation)
return t
@@ -1300,15 +1317,16 @@
def scheduleAllPings(self, now=None):
if now is None: now = int(time.time())
servers = self.directory.getAllServers()
- nicknames = {}
+ identities = {}
for s in servers:
- nicknames[s.getNickname().lower()]=1
- for (n,) in self.nextPingTime.keys():
- if not nicknames.has_key(n):
- LOG.trace("Unscheduling 1-hop ping for %s", n)
- del self.nextPingTime[(n,)]
- for n in nicknames.keys():
- self._schedulePing((n,), now)
+ identities[s.getIdentityDigest()]=1
+ for (i,) in self.nextPingTime.keys():
+ if not identities.has_key(i):
+ LOG.trace("Unscheduling 1-hop ping for %s",
+ binascii.b2a_hex(i))
+ del self.nextPingTime[(i,)]
+ for i in identities.keys():
+ self._schedulePing((i,), now)
def _getPingInterval(self, path):
return self._ping_interval
@@ -1316,27 +1334,28 @@
def sendPings(self, now=None):
if now is None: now = time.time()
servers = self.directory.getAllServers()
- nicknames = {}
+ identities = {}
for s in servers:
- nicknames[s.getNickname().lower()] = 1
+ identities[s.getIdentityDigest()] = s
pingable = []
- for n in nicknames.keys():
- when = self.nextPingTime.get((n,))
+ for i in identities.keys():
+ when = self.nextPingTime.get((i,))
if when is None:
# No ping scheduled; server must be new to directory.
- self._schedulePing((n,),now)
+ self._schedulePing((i,),now)
continue
elif when > now: # Not yet.
continue
else:
# Time for a ping!
- pingable.append(n)
+ pingable.append(i)
myDescriptor = self.keyring.getCurrentDescriptor()
- for n in pingable:
- if self._sendOnePing([n], [myDescriptor]):
- self._schedulePing((n,), now+60)
+ for i in pingable:
+ s = identities[i]
+ if self._sendOnePing([s], [myDescriptor]):
+ self._schedulePing((i,), now+60)
else:
- del self.nextPingTime[(n,)]
+ del self.nextPingTime[(i,)]
class TwoHopPingGenerator(_PingScheduler, PingGenerator):
"""A TwoHopPingGenerator uses the Echolot ping algorithm to schedule
@@ -1360,19 +1379,19 @@
def scheduleAllPings(self, now=None):
if now is None: now = time.time()
servers = self.directory.getAllServers()
- nicknames = {}
+ identities = {}
for s in servers:
- nicknames[s.getNickname().lower()]=1
- for n1,n2 in self.nextPingTime.keys():
- if not (nicknames.has_key(n1) and nicknames.has_key(n2)):
+ identities[s.getIdentityDigest()]=1
+ for id1,id2 in self.nextPingTime.keys():
+ if not (identities.has_key(id1) and identities.has_key(id2)):
LOG.trace("Unscheduling 2-hop ping for %s,%s",n1,n2)
- del self.nextPingTime[(n1,n2)]
- for n1 in nicknames.keys():
- for n2 in nicknames.keys():
- self._schedulePing((n1,n2),now)
+ del self.nextPingTime[(id1,id2)]
+ for id1 in identities.keys():
+ for id2 in identities.keys():
+ self._schedulePing((id1,id2),now)
def _getPingInterval(self, path):
- p = ",".join([ s.lower() for s in path])
+ p = ",".join([self.pingLog._db.encodeIdentity(i) for i in path])
if self.pingLog._interestingChains.get(p, 0):
#LOG.trace("While scheduling, I decided that %s was interesting",p)
return self._interesting_interval
@@ -1383,28 +1402,30 @@
def sendPings(self, now=None):
if now is None: now = time.time()
servers = self.directory.getAllServers()
- nicknames = {}
+ identities = {}
for s in servers:
- nicknames[s.getNickname().lower()] = 1
+ identities[s.getIdentityDigest()]=s
pingable = []
- for n1 in nicknames.keys():
- for n2 in nicknames.keys():
- when = self.nextPingTime.get((n1,n2))
+ for id1 in identities.keys():
+ for id2 in identities.keys():
+ when = self.nextPingTime.get((id1,id2))
if when is None:
# No ping scheduled; server must be new to directory.
- self._schedulePing((n1,n2),now)
+ self._schedulePing((id1,id2),now)
continue
elif when > now: # Not yet.
continue
else:
# Time for a ping!
- pingable.append((n1,n2))
+ pingable.append((id1,id2))
myDescriptor = self.keyring.getCurrentDescriptor()
- for n1, n2 in pingable:
- if self._sendOnePing([n1,n2], [myDescriptor]):
- self._schedulePing((n1,n2), now+60)
+ for id1, id2 in pingable:
+ s1 = identities[id1]
+ s2 = identities[id2]
+ if self._sendOnePing([s1,s2], [myDescriptor]):
+ self._schedulePing((id1,id2), now+60)
else:
- del self.nextPingTime[(n1,n2)]
+ del self.nextPingTime[(id1,id2)]
class TestLinkPaddingGenerator(PingGenerator):
"""A PingGenerator to ensure that we randomly probe all known server