[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Simplify uptime processing in pinger; make it run without crashing.
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv8972/lib/mixminion/server
Modified Files:
Pinger.py
Log Message:
Simplify uptime processing in pinger; make it run without crashing.
Index: Pinger.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/Pinger.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- Pinger.py 2 Dec 2004 06:47:08 -0000 1.7
+++ Pinger.py 2 Dec 2004 23:39:02 -0000 1.8
@@ -37,8 +37,8 @@
import mixminion.server.MMTPServer
from mixminion.Common import MixError, ceilDiv, createPrivateDir, \
- floorDiv, formatBase64, formatFnameDate, formatTime, LOG, parseFnameDate,\
- previousMidnight, readPickled, secureDelete, writePickled
+ floorDiv, formatBase64, formatFnameDate, formatTime, IntervalSet, LOG, \
+ parseFnameDate, previousMidnight, readPickled, secureDelete, writePickled
try:
import sqlite
@@ -50,24 +50,26 @@
HEARTBEAT_INTERVAL = 30*60
ONE_DAY = 24*60*60
-REALTYPES = { 'time' : 'integer',
+# Map from logical type to type used in (sqlite) database.
+REALTYPES = { 'time' : 'integer',
'boolean' : 'integer',
'integer' : 'integer',
- 'float' : 'float',
+ 'float' : 'float',
'varchar' : 'varchar',
}
+
+_MERGE_ITERS_CODE = """
_MERGE_DONE = object()
-def _wrapNext(next):
+def _wrapNext(next,DONE=_MERGE_DONE):
def fn():
try:
return next()
except StopIteration:
- return _MERGE_DONE
+ return DONE
return fn
-
-_MERGE_ITERS = """
-def mergeIters(iterable1, iterable2):
+def _mergeIters(iterable1, iterable2):
+ DONE = _MERGE_DONE
n1 = _wrapNext(iter(iterable1).next)
n2 = _wrapNext(iter(iterable2).next)
peek1 = n1()
@@ -87,49 +89,8 @@
peek2 = n2()
"""
-class _MergeIters:
- """Given two iterators yielding their items in order, yield the ordered
- merge of their items.
- """
- _DONE = object()
- def __init__(self, iterable1, iterable2):
- self.n1 = iter(iterable1).next
- self.n2 = iter(iterable2).next
- try:
- self.peek1 = self.n1()
- except StopIteration:
- self.peek1 = self._DONE
- try:
- self.peek2 = self.n2()
- except StopIteration:
- self.peek2 = self._DONE()
- def __iter__(self):
- return self
- def __next__(self):
- if self.peek1 is self._DONE:
- if self.peek2 is self._DONE:
- raise StopIteration
- else:
- p1 = 0
- elif self.peek2 is self._DONE:
- p1 = 1
- else:
- p1 = self.peek1 < self.peek2
-
- if p1:
- r = self.peek1
- try:
- self.peek1 = self.n1()
- except StopIteration:
- self.peek1 = self._DONE
- else:
- try:
- self.peek2 = self.n2()
- except StopIteration:
- self.peek2 = self._DONE
-
- return r
-
+if sys.version_info[:2] >= (2,2):
+ exec _MERGE_ITERS_CODE
WEIGHT_AGE_PERIOD = 24*60*60
WEIGHT_AGE = [ 5, 10, 10, 10, 10, 9, 8, 5, 3, 2, 2, 1, 0, 0, 0, 0, 0 ]
@@ -167,7 +128,6 @@
body.append("%s %s %s"%(r[0],REALTYPES[r[1]],r[2]))
stmt = "CREATE TABLE %s (%s)" % (name,", ".join(body))
- #print "about to execute <<%s>>"%stmt
self.cursor.execute(stmt)
self.connection.commit()
@@ -182,7 +142,6 @@
u = ""
stmt = "CREATE %sINDEX %s ON %s (%s)"%(
u, name, tablename, ", ".join(columns))
- #print "About to execute <<%s>>"%stmt
self.cursor.execute(stmt)
self.connection.commit()
@@ -242,10 +201,11 @@
def _addServer(self, name):
# hold lock.
+ name = name.lower()
if self.serverNames.has_key(name):
return
- print "!!!!",name
self.cursor.execute("INSERT INTO servers (name) VALUES (%s)", name)
+ self.serverNames[name] = 1
def rotate(self, now=None):
self.lock.acquire()
@@ -278,54 +238,64 @@
def _execute(self, sql, args):
self.lock.acquire()
try:
- #print "about to execute %s %s"%(sql,args)
self.cursor.execute(sql, args)
finally:
self.lock.release()
- def _time(self, t):
- return long(t)
-
- def _now(self):
- return long(time.time())
+ def _time(self, t=None):
+ return long(t or time.time())
_STARTUP = "INSERT INTO lifetimes (up, stillup, shutdown) VALUES (%s,%s, 0)"
- def startup(self):
- self._startTime = now = self._now()
+ def startup(self,now=None):
+ self._startTime = now = self._time(now)
self._execute(self._STARTUP, (now,now))
_SHUTDOWN = "UPDATE lifetimes SET stillup = %s, shutdown = %s WHERE up = %s"
- def shutdown(self):
+ def shutdown(self, now=None):
if self._startTime is None: self.startup()
- self._execute(self._SHUTDOWN, (self._now(), self._now(), self._startTime))
+ now = self._time(now)
+ self._execute(self._SHUTDOWN, (now, now, self._startTime))
_HEARTBEAT = "UPDATE lifetimes SET stillup = %s WHERE up = %s"
- def heartbeat(self):
+ def heartbeat(self, now=None):
if self._startTime is None: self.startup()
- self._execute(self._HEARTBEAT, (self._now(), self._startTime))
+ self._execute(self._HEARTBEAT, (self._time(now), self._startTime))
def rotated(self):
pass
_CONNECTED = ("INSERT INTO connects (at, server, success) "
"VALUES (%s,%s,%s)")
- def connected(self, nickname, success=1):
- self._execute(self._CONNECTED, (self._now(), nickname.lower(), success))
+ def connected(self, nickname, success=1, now=None):
+ self.lock.acquire()
+ try:
+ self._addServer(nickname)
+ self._execute(self._CONNECTED,
+ (self._time(now), nickname.lower(), success))
+ finally:
+ self.lock.release()
def connectFailed(self, nickname):
self.connected(nickname, 0)
_QUEUED_PING = ("INSERT INTO pings (hash, path, sentat, received)"
"VALUES (%s,%s,%s,%s)")
- def queuedPing(self, hash, path):
+ def queuedPing(self, hash, path, now=None):
assert len(hash) == mixminion.Crypto.DIGEST_LEN
- self._execute(self._QUEUED_PING,
- (formatBase64(hash), path, self._now(), 0))
+ self.lock.acquire()
+ try:
+ path = path.lower()
+ for s in path.split(","):
+ self._addServer(s)
+ self._execute(self._QUEUED_PING,
+ (formatBase64(hash), path, self._time(now), 0))
+ finally:
+ self.lock.release()
_GOT_PING = "UPDATE pings SET received = %s WHERE hash = %s"
- def gotPing(self, hash):
+ def gotPing(self, hash, now=None):
assert len(hash) == mixminion.Crypto.DIGEST_LEN
- n = self._execute(self._GOT_PING, (self._now(), formatBase64(hash)))
+ n = self._execute(self._GOT_PING, (self._time(now), formatBase64(hash)))
if n == 0:
LOG.warn("Received ping with no record of its hash")
elif n > 1:
@@ -339,84 +309,83 @@
cur.execute("DELETE FROM uptimes WHERE start = %s AND end = %s",
startTime, endTime)
- lifetimeEvents = []
# First, calculate my own uptime.
self.heartbeat()
- lifetimeEvents = []
- cur.execute("SELECT MAX(up) FROM lifetimes WHERE up < %s",startTime)
- res = cur.fetchall()
- if res:
- cur.execute("SELECT up,stillup FROM lifetimes WHERE up >= %s"
- " AND up <= %s", res[0][0], endTime)
- else:
- cur.execute("SELECT up,stillup FROM lifetimes WHERE"
- " up <= %s", endTime)
- events = cur.fetchall()
+ timespan = IntervalSet( [(startTime, endTime)] )
+
+ cur.execute("SELECT up, stillup, shutdown FROM lifetimes WHERE "
+ "up <= %s AND stillup >= %s",
+ self._time(endTime), self._time(startTime))
myUptime = 0
- for start,end in events:
- lifetimeEvents.append((start,'up'))
- lifetimeEvents.append((end,'down'))
- if start < startTime: start = startTime
- if end > endTime: end = endTime
- myUptime += end-start
+ myIntervals = IntervalSet([ (start, max(end,shutdown))
+ for start,end,shutdown in cur ])
+ myIntervals *= timespan
+ myUptime = myIntervals.spanLength()
cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
" VALUES (%s, %s, '<self>', %s)",
(self._time(startTime), self._time(endTime),
float(myUptime)/(endTime-startTime)))
- lifetimeEvents.sort()
- lifetimeEvents.append((sys.maxint,1))
# Okay, now everybody else.
for s in self.serverNames.keys():
- cur.execute("SELECT at, success FROM events"
+ cur.execute("SELECT at, success FROM connects"
" WHERE server = %s AND at >= %s AND at <= %s"
" ORDER BY at",
s, startTime, endTime)
- myStatus = None
- mySince = None
- hisStatus = None
- hisLast = None
- hisTimes = [ 0,0 ]
- for at, event in _MixIters(lifetimeEvents, iter(cur)):
- if event == 'up':
- if not myStatus: mySince = at
- myStatus = 1
- elif event == 'down':
- if myStatus: mySince = at
- myStatus = 0
- else:
- # event is 0 or 1; I tried to connect at
- # 'at' and succeeded or failed.
- if hisLast and hisLast > mySince:
- # The last thing that happened: I tried and
- # succeeeded or failed.
- t = (at-hisLast)/2
- hisTimes[hisStatus] += t
- hisTimes[event] += t
- elif myStatus:
- # The last thing that happened: I came online.
- hisTimes[event] += at-mySince
- else:
- # The last thing that happened: I went offline??
- LOG.warn("Internal error in calculateUptimes")
- hisStatus = event
- hisLast = at
- if hisTimes == [0,0]:
+
+ lastStatus = None
+ lastTime = None
+ times = [ 0, 0] # uptime, downtime
+ for at, success in cur:
+ assert success in (0,1)
+ upAt, downAt = myIntervals.getIntervalContaining(at)
+ if upAt == None:
+ # Event at edge of interval
+ #XXXX008 warn better?
+ LOG.warn("Event out of bounds")
+ continue
+ if lastTime is None or upAt > lastTime:
+ lastTime = upAt
+ lastStatus = None
+ if lastStatus is not None:
+ t = (at-lastTime)/2.0
+ times[success] += t
+ times[lastStatus] += t
+
+ if times == [0,0]:
continue
- fraction = float(hisTimes[1])/(hisTimes[0]+hisTimes[1])
+ fraction = float(times[1])/(times[0]+times[1])
+ print "Adding ",name,fraction
cur.execute("INSERT INTO uptimes (start, end, name, uptime)"
- " VALUES (%s, %s, %s, %s",
+ " VALUES (%s, %s, %s, %s)",
(startTime, endTime, s, fraction))
self.connection.commit()
finally:
self.lock.release()
+ def getUptimes(self, startAt, endAt):
+ """DODOC: uptimes for all servers overlapping [startAt, endAt],
+ as mapping from (start,end) to nickname to fraction.
+ """
+ result = {}
+ cur = self.cursor
+ self.lock.acquire()
+ try:
+ cur.execute("SELECT start, end, name, uptime FROM uptimes "
+ "WHERE %s >= start AND %s <= end",
+ (self._time(startAt), self._time(endAt)))
+ for s,e,n,u in cur:
+ result.setdefault((s,e), {})[n] = u
+ return result
+ finally:
+ self.lock.release()
+
def _calculateOneHopResults(self, serverName, startTime, now=None):
# hold lock; commit when done.
cur = self.cursor
- GRANULARITY = 24*60*60
+ GRANULARITY = PING_GRANULARITY
if now is None:
now = time.time()
nPeriods = ceilDiv(now-startTime, GRANULARITY)