[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Finish switching pinger code over to identity-based (not nickname-based) server tracking



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria:/tmp/cvs-serv13027/lib/mixminion/server

Modified Files:
	Pinger.py 
Log Message:
Finish switching pinger code over to identity-based (not nickname-based) server tracking. (I think.)

Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.27
retrieving revision 1.28
diff -u -d -r1.27 -r1.28
--- Pinger.py	9 Aug 2005 15:51:32 -0000	1.27
+++ Pinger.py	3 Nov 2005 21:12:20 -0000	1.28
@@ -241,6 +241,22 @@
             self._theCursor.execute(stmt, (keyVals+valVals))
         return fn
 
+    def encodeIdentity(self, identity):
+        """DOCDOC"""
+        if identity == '<self>':
+            return '<self>'
+        else:
+            assert len(identity) == mixminion.Crypto.DIGEST_LEN
+            return binascii.b2a_hex(identity)
+
+    def decodeIdentity(self, hexid):
+        """DOCDOC"""
+        if hexid == '<self>':
+            return '<self>'
+        else:
+            assert len(hexid) == mixminion.Crypto.DIGEST_LEN*2
+            return binascii.a2b_hex(hexid)
+
 class PingLog:
     """A PingLog stores a series of pinging-related events to a
        persistant relational database, and calculates server statistics based
@@ -259,7 +275,7 @@
     #    server reliability (a float between 0 and 1).
     # _intervals: An instance of IntervalSchedule.
     # _brokenChains, _interestingChains: Maps from comma-separated
-    #   lowercase nicknames for servers in chains that we believe to be
+    #   lowercase hex ids for servers in chains that we believe to be
     #   broken or "interesting" to 1.
     # _startTime: The 'startup' time for the current myLifespan row.
     # _lastRecalculation: The last time this process recomputed all
@@ -316,11 +332,10 @@
                                   ("sentat",   "timestamp",    "not null"),
                                   ("received", "timestamp")])
 
-            # Holds lowercased nicknames for all the servers we know about.
+            # Holds identity digests for all the servers we know about.
             self._db.createTable("server",
-                                 [("id",   "integer",     "primary key"),
-                                  ("name", "varchar(32)", "unique not null"),
-                                  ("identity", "varchar(40)", "unique not null")])
+                              [("id",   "integer",     "primary key"),
+                               ("identity", "varchar(40)", "unique not null")])
 
             # Holds information about our attempts to launch MMTP connections
             # to other servers.  A row in connectionAttempt means: We tried to
@@ -413,7 +428,6 @@
 
             #### Indices.
 
-            self._db.createIndex("serverName", "server", ["name"], unique=1)
             self._db.createIndex("serverIdentity", "server",
                                  ["identity"], unique=1)
             self._db.createIndex("statsIntervalSE", "statsInterval",
@@ -459,20 +473,20 @@
         res = cur.fetchall()
         serverIDs = {}
         for idnum,identity in res:
-            serverIDs[binascii.a2b_hex(identity)] = idnum
+            serverIDs[self._db.decodeIdentity(identity)] = idnum
 
         serverReliability = {}
-        cur.execute("SELECT name, reliability FROM "
-                    "echolotCurrentOneHopResult, server "
-                    "WHERE server.id = echolotCurrentOneHopResult.server")
+        cur.execute("SELECT identity, reliability FROM "
+                    "echolotCurrentOneHopResult, server WHERE "
+                    "echolotCurrentOneHopResult.server = server.id")
         res = cur.fetchall()
-        for name,rel in res:
-            serverReliability[name]=rel
+        for hexid,rel in res:
+            serverReliability[self._db.decodeIdentity(hexid)]=rel
 
-        cur.execute("SELECT s1.name, s2.name, broken, interesting FROM "
-                    "echolotCurrentTwoHopResult, server as S1, server as S2 "
-                    "WHERE (interesting = 1 OR broken = 1) AND "
-                    "S1.id = server1 AND S2.id = server2")
+        cur.execute("SELECT S1.identity, S2.identity,broken,interesting FROM"
+                    " echolotCurrentTwoHopResult,server AS S1,server AS S2 "
+                    "WHERE (interesting = 1 OR broken = 1) "
+                    " AND S1.id = server1 AND S2.id = server2")
         res = cur.fetchall()
         broken = {}
         interesting = {}
@@ -491,14 +505,14 @@
 
     def updateServers(self, descriptorSource):
         """Add the names 'descriptorSource' to the database, if they
-        aren't there already.
+           aren't there already.
         """
         for s in descriptorInfo.getServerList():
-            self._getServerID(s.getIdentityDigest(), s.getNickname())
+            self._getServerID(s.getIdentityDigest())
         self._db.getConnection().commit()
 
-    def _getServerID(self, identity, name=None):
-        """Helper: Return the database ID for the server whose hex
+    def _getServerID(self, identity):
+        """Helper: Return the database ID for the server whose
            identity digest is 'identity'.  If the database doesn't
            know about the server yet, add it.  Does not commit the
            current transaction.
@@ -511,14 +525,13 @@
                 self._serverIDs[identity] = 1
         finally:
             self._lock.release()
+
         cur = self._db.getCursor()
 
-        if name is None: name = binascii.b2a_hex(identity)
+        hexid = self._db.encodeIdentity(identity)
 
-        cur.execute("INSERT INTO server (name,identity) VALUES (%s,%s)",
-                    name.lower(), binascii.b2a_hex(identity))
-        cur.execute("SELECT id FROM server WHERE identity = %s",
-                    binascii.b2a_hex(identity))
+        cur.execute("INSERT INTO server (identity) VALUES (%s)", hexid)
+        cur.execute("SELECT id FROM server WHERE identity = %s", hexid)
         #XXXX catch errors!
         ident, = cur.fetchone()
         self._serverIDs[identity]=ident
@@ -612,7 +625,7 @@
     _CONNECTED = ("INSERT INTO connectionAttempt (at, server, success) "
                   "VALUES (%s,%s,%s)")
     def connected(self, identity, success=1, now=None):
-        """Note that we attempted to connect to the server named 'nickname'.
+        """Note that we attempted to connect to the server with 'identity'.
            We successfully negotiated a protocol iff success is true.
         """
         serverID = self._getServerID(identity)
@@ -748,7 +761,7 @@
                     "AND %s >= startat AND %s <= endat",
                     (self._db.time(startAt), self._db.time(endAt)))
         for s,e,i,u in cur:
-            result.setdefault((s,e), {})[binascii.a2b_hex(i)] = u
+            result.setdefault((s,e), {})[self._db.decodeIdentity(i)] = u
         self._db.getConnection().commit()
         return result
 
@@ -1020,22 +1033,23 @@
 
         if now is None: now = time.time()
         print >>f, "# List of all currently tracked servers."
-        print >>f, "KNOWN_SERVERS =",serverIdentities
+        print >>f, "KNOWN_SERVERS =", [
+            self._db.encodeIdentity(i) for i in serverIdentities]
         cur = self._db.getCursor()
 
         print >>f, "\n# Map from server to list of (period-start, period-end, fractional uptime"
         print >>f, "SERVER_UPTIMES = {"
-        cur.execute("SELECT startAt,endAt,identity,name,uptime FROM uptime, server, statsInterval "
+        cur.execute("SELECT startAt,endAt,identity,uptime FROM uptime, server, statsInterval "
                     "WHERE startAt >= %s AND startAt <= %s "
                     "AND uptime.server = server.id "
                     "AND uptime.interval = statsInterval.id "
-                    "ORDER BY name, startAt", (since, now))
+                    "ORDER BY identity, startAt", (since, now))
         lastServer = "---"
-        for s,e,i,n,u in cur:
+        for s,e,i,u in cur:
             if i != lastServer:
                 if lastServer != '---': print >>f, "   ],"
                 lastServer = i
-                print >>f, "   (%r,%r) : [" % (i,n)
+                print >>f, "   %r : [" % i
             print >>f, "      (%s,%s,%.04f),"%(s,e,u)
         if lastServer != '---': print >>f, "   ]"
         print >>f, "}"
@@ -1045,37 +1059,38 @@
 #      # of those pings received, median latency on those pings (sec),
 #      weighted reliability)"""
         print >>f, "SERVER_DAILY_PING_STATUS = {"
-        cur.execute("SELECT identity,name,startAt,endAt,nSent,nReceived,"
+        cur.execute("SELECT identity,startAt,endAt,nSent,nReceived,"
                    "  latency,reliability "
-                   "FROM echolotOneHopResult, server, statsInterval "
+                   "FROM echolotOneHopResult, statsInterval, server "
                    "WHERE startat >= %s AND startat <= %s"
                    "AND echolotOneHopResult.server = server.id "
                    "AND echolotOneHopResult.interval = statsInterval.id "
-                   "ORDER BY name, startat", (since, now))
+                   "ORDER BY identity, startat", (since, now))
         lastServer = "---"
-        for i,n,s,e,nS,nR,lat,r in cur:
+        for i,s,e,nS,nR,lat,r in cur:
             if s == '<self>': continue
             if i != lastServer:
                 if lastServer != '---': print >>f, "   ],"
                 lastServer = i
-                print >>f, "   (%r,%r) : [" % (i,n)
+                print >>f, "   (%r) : [" % i
             print >>f, "      (%s,%s,%s,%s,%s,%.04f),"%(s,e,nS,nR,lat,r)
         if lastServer != '---': print >>f, "   ]"
         print >>f, "}"
 
-        print >>f, "\n# Map from server-name to current (avg latency, avg reliability)"
+        print >>f, "\n# Map from server identity to current (avg latency, avg reliability)"
         print >>f, "SERVER_CUR_PING_STATUS = {"
-        cur.execute("SELECT name,latency,reliability FROM "
+        cur.execute("SELECT identity,latency,reliability FROM "
                     "echolotCurrentOneHopResult, server WHERE "
                     "echolotCurrentOneHopResult.server = server.id")
-        for n,lat,r in cur:
-            if n == '<self>': continue
-            print >>f, "   %r : (%s,%.04f)," %(n,lat,r)
+        for i,lat,r in cur:
+            if i == '<self>': continue
+            print >>f, "   %r : (%s,%.04f)," %(i,lat,r)
         print >>f, "}"
 
         print >>f, "\n# Chains that we want to know more about"
         print >>f, "INTERESTING_CHAINS = ["
-        cur.execute("SELECT S1.name, S2.name FROM echolotCurrentTwoHopResult, "
+        cur.execute("SELECT S1.identity, S2.identity "
+                    "FROM echolotCurrentTwoHopResult, "
                     "   server as S1, server as S2 WHERE "
                     "interesting = 1 AND S1.id = server1 AND S2.id = server2")
         for s1,s2 in cur:
@@ -1085,7 +1100,8 @@
 
         print >>f, "\n# Chains that are more unreliable than we'd expect"
         print >>f, "BROKEN_CHAINS = ["
-        cur.execute("SELECT S1.name, S2.name FROM echolotCurrentTwoHopResult, "
+        cur.execute("SELECT S1.identity, S2.identity "
+                    "FROM echolotCurrentTwoHopResult, "
                     "   server as S1, server as S2 WHERE "
                     "broken = 1 AND S1.id = server1 AND S2.id = server2")
         for s1,s2 in cur:
@@ -1125,13 +1141,11 @@
     # pingLog: an instance of PingLog
     # outgoingQueue: an instance of outgoingQueue, if we're going to send
     #   pings
-    # myNickname: the nickname of this server
     def __init__(self, config):
         """Create a new PingGenerator with a given configuration"""
         self.directory = None
         self.pingLog = None
         self.outgoingQueue = None
-        self.myNickname = config['Server']['Nickname']
 
     def connect(self, directory, outgoingQueue, pingLog, keyring):
         """Use the provided directory/queue/pingLog/keyring as needed.
@@ -1178,7 +1192,8 @@
            Return 1 if we are able to queue the ping, 0 otherwise.
         """
         assert path1 and path2
-        assert path2[-1].getNickname().lower() == self.myNickname.lower()
+        assert (path2[-1].getIdentityDigest() ==
+                self.keyring.getIdentityKeyDigest())
         try:
             p1 = self.directory.getPath(path1)
             p2 = self.directory.getPath(path2)
@@ -1208,7 +1223,8 @@
        from the start of the period by a random-looking amount.
     """
     ## Fields:
-    # nextPingTime: map from path to the next time we plan to ping it.
+    # nextPingTime: map from path (list of IDs) to the next time we plan to
+    #      ping it.
     # seed: a secret random value used to computer perturbations.
     def __init__(self):
         self.nextPingTime = {}#path->when
@@ -1262,7 +1278,8 @@
         self.nextPingTime[path] = t
         if oldTime != t:
             LOG.trace("Scheduling %d-hop ping for %s at %s", len(path),
-                      ",".join(path), formatTime(t,1))
+                      ",".join([binascii.b2a_hex(p) for p in path]),
+                      formatTime(t,1))
             #LOG.trace("(Period starts at %s; period is %s days; interval is %s sec; perturbation is %s sec)",
             #          formatTime(periodStart,1), self._period_length/ONE_DAY, interval, perturbation)
         return t
@@ -1300,15 +1317,16 @@
     def scheduleAllPings(self, now=None):
         if now is None: now = int(time.time())
         servers = self.directory.getAllServers()
-        nicknames = {}
+        identities = {}
         for s in servers:
-            nicknames[s.getNickname().lower()]=1
-        for (n,) in self.nextPingTime.keys():
-            if not nicknames.has_key(n):
-                LOG.trace("Unscheduling 1-hop ping for %s", n)
-                del self.nextPingTime[(n,)]
-        for n in nicknames.keys():
-            self._schedulePing((n,), now)
+            identities[s.getIdentityDigest()]=1
+        for (i,) in self.nextPingTime.keys():
+            if not identities.has_key(i):
+                LOG.trace("Unscheduling 1-hop ping for %s",
+                          binascii.b2a_hex(i))
+                del self.nextPingTime[(i,)]
+        for i in identities.keys():
+            self._schedulePing((i,), now)
 
     def _getPingInterval(self, path):
         return self._ping_interval
@@ -1316,27 +1334,28 @@
     def sendPings(self, now=None):
         if now is None: now = time.time()
         servers = self.directory.getAllServers()
-        nicknames = {}
+        identities = {}
         for s in servers:
-            nicknames[s.getNickname().lower()] = 1
+            identities[s.getIdentityDigest()] = s
         pingable = []
-        for n in nicknames.keys():
-            when = self.nextPingTime.get((n,))
+        for i in identities.keys():
+            when = self.nextPingTime.get((i,))
             if when is None:
                 # No ping scheduled; server must be new to directory.
-                self._schedulePing((n,),now)
+                self._schedulePing((i,),now)
                 continue
             elif when > now: # Not yet.
                 continue
             else:
                 # Time for a ping!
-                pingable.append(n)
+                pingable.append(i)
         myDescriptor = self.keyring.getCurrentDescriptor()
-        for n in pingable:
-            if self._sendOnePing([n], [myDescriptor]):
-                self._schedulePing((n,), now+60)
+        for i in pingable:
+            s = identities[i]
+            if self._sendOnePing([s], [myDescriptor]):
+                self._schedulePing((i,), now+60)
             else:
-                del self.nextPingTime[(n,)]
+                del self.nextPingTime[(i,)]
 
 class TwoHopPingGenerator(_PingScheduler, PingGenerator):
     """A TwoHopPingGenerator uses the Echolot ping algorithm to schedule
@@ -1360,19 +1379,19 @@
     def scheduleAllPings(self, now=None):
         if now is None: now = time.time()
         servers = self.directory.getAllServers()
-        nicknames = {}
+        identities = {}
         for s in servers:
-            nicknames[s.getNickname().lower()]=1
-        for n1,n2 in self.nextPingTime.keys():
-            if not (nicknames.has_key(n1) and nicknames.has_key(n2)):
+            identities[s.getIdentityDigest()]=1
+        for id1,id2 in self.nextPingTime.keys():
+            if not (identities.has_key(id1) and identities.has_key(id2)):
                 LOG.trace("Unscheduling 2-hop ping for %s,%s",n1,n2)
-                del self.nextPingTime[(n1,n2)]
-        for n1 in nicknames.keys():
-            for n2 in nicknames.keys():
-                self._schedulePing((n1,n2),now)
+                del self.nextPingTime[(id1,id2)]
+        for id1 in identities.keys():
+            for id2 in identities.keys():
+                self._schedulePing((id1,id2),now)
 
     def _getPingInterval(self, path):
-        p = ",".join([ s.lower() for s in path])
+        p = ",".join([self.pingLog._db.encodeIdentity(i) for i in path])
         if self.pingLog._interestingChains.get(p, 0):
             #LOG.trace("While scheduling, I decided that %s was interesting",p)
             return self._interesting_interval
@@ -1383,28 +1402,30 @@
     def sendPings(self, now=None):
         if now is None: now = time.time()
         servers = self.directory.getAllServers()
-        nicknames = {}
+        identities = {}
         for s in servers:
-            nicknames[s.getNickname().lower()] = 1
+            identities[s.getIdentityDigest()]=s
         pingable = []
-        for n1 in nicknames.keys():
-            for n2 in nicknames.keys():
-                when = self.nextPingTime.get((n1,n2))
+        for id1 in identities.keys():
+            for id2 in identities.keys():
+                when = self.nextPingTime.get((id1,id2))
                 if when is None:
                     # No ping scheduled; server must be new to directory.
-                    self._schedulePing((n1,n2),now)
+                    self._schedulePing((id1,id2),now)
                     continue
                 elif when > now: # Not yet.
                     continue
                 else:
                     # Time for a ping!
-                    pingable.append((n1,n2))
+                    pingable.append((id1,id2))
         myDescriptor = self.keyring.getCurrentDescriptor()
-        for n1, n2 in pingable:
-            if self._sendOnePing([n1,n2], [myDescriptor]):
-                self._schedulePing((n1,n2), now+60)
+        for id1, id2 in pingable:
+            s1 = identities[id1]
+            s2 = identities[id2]
+            if self._sendOnePing([s1,s2], [myDescriptor]):
+                self._schedulePing((id1,id2), now+60)
             else:
-                del self.nextPingTime[(n1,n2)]
+                del self.nextPingTime[(id1,id2)]
 
 class TestLinkPaddingGenerator(PingGenerator):
     """A PingGenerator to ensure that we randomly probe all known server