[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r19576: {torctl} Add support for parsing w line in NS dox, create RankRestric (torctl/trunk/python/TorCtl)
Author: mikeperry
Date: 2009-05-28 07:50:55 -0400 (Thu, 28 May 2009)
New Revision: 19576
Modified:
torctl/trunk/python/TorCtl/PathSupport.py
torctl/trunk/python/TorCtl/SQLSupport.py
torctl/trunk/python/TorCtl/TorCtl.py
Log:
Add support for parsing w line in NS dox, create
RankRestriction (currently unused), add average router bw to
bws stats.
Modified: torctl/trunk/python/TorCtl/PathSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/PathSupport.py 2009-05-28 11:49:06 UTC (rev 19575)
+++ torctl/trunk/python/TorCtl/PathSupport.py 2009-05-28 11:50:55 UTC (rev 19576)
@@ -222,7 +222,23 @@
def __str__(self):
return self.__class__.__name__+"("+str(self.pct_skip)+","+str(self.pct_fast)+")"
+
+class RankRestriction(NodeRestriction):
+ """Restriction to cut out a list-rank slice of the network."""
+ def __init__(self, rank_skip, rank_stop):
+ self.rank_skip = rank_skip
+ self.rank_stop = rank_stop
+
+ def r_is_ok(self, r):
+ "Returns true if r is in the boundaries (by rank)"
+ if r.list_rank < self.rank_skip: return False
+ elif r.list_rank > self.rank_stop: return False
+ return True
+
+ def __str__(self):
+ return self.__class__.__name__+"("+str(self.rank_skip)+","+str(self.rank_stop)+")"
+
class OSRestriction(NodeRestriction):
"Restriction based on operating system"
def __init__(self, ok, bad=[]):
@@ -1281,6 +1297,18 @@
for g in xrange(0, len(r._generated)):
r._generated[g] = 0
+ def is_urgent_event(event):
+ # If event is stream:NEW*/DETACHED or circ BUILT/FAILED,
+ # it is high priority and requires immediate action.
+ if isinstance(event, TorCtl.CircuitEvent):
+ if event.status in ("BUILT", "FAILED", "CLOSED"):
+ return True
+ elif isinstance(event, TorCtl.StreamEvent):
+ if event.status in ("NEW", "NEWRESOLVE", "DETACHED"):
+ return True
+ return False
+ is_urgent_event = Callable(is_urgent_event)
+
def schedule_selmgr(self, job):
"""
Schedules an immediate job to be run before the next event is
@@ -1311,16 +1339,11 @@
imm_job = self.low_prio_jobs.get_nowait()
imm_job(self)
return
-
+
# If event is stream:NEW*/DETACHED or circ BUILT/FAILED,
# don't run low prio jobs.. No need to delay streams for them.
- if isinstance(event, TorCtl.CircuitEvent):
- if event.status in ("BUILT", "FAILED"):
- return
- elif isinstance(event, TorCtl.StreamEvent):
- if event.status in ("NEW", "NEWRESOLVE", "DETACHED"):
- return
-
+ if PathBuilder.is_urgent_event(event): return
+
# Do the low prio jobs one at a time in case a
# higher priority event is queued
if not self.low_prio_jobs.empty():
Modified: torctl/trunk/python/TorCtl/SQLSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/SQLSupport.py 2009-05-28 11:49:06 UTC (rev 19575)
+++ torctl/trunk/python/TorCtl/SQLSupport.py 2009-05-28 11:50:55 UTC (rev 19576)
@@ -462,7 +462,7 @@
def reset():
RouterStats.table.drop()
RouterStats.table.create()
- for r in Router.query.all(): # Is this needed?
+ for r in Router.query.all():
rs = RouterStats()
rs.router = r
r.stats = rs
@@ -587,13 +587,18 @@
f.write(str(int(time.time()))+"\n")
- # XXX: print out avg consensus bw too.. Hrmm, but only
- # during active scan period..
+ def cvt(a,b,c=1):
+ if type(a) == float: return round(a/c,b)
+ elif type(a) == int: return a
+ elif type(a) == type(None): return "None"
+ else: return type(a)
+
for s in RouterStats.query.filter(pct_clause).filter(stat_clause).\
order_by(order_by).all():
f.write("node_id=$"+s.router.idhex+" nick="+s.router.nickname)
- f.write(" strm_bw="+str(s.sbw))
- f.write(" filt_bw="+str(s.filt_sbw)+"\n")
+ f.write(" strm_bw="+str(int(cvt(s.sbw,0))))
+ f.write(" filt_bw="+str(int(cvt(s.filt_sbw,0))))
+ f.write(" ns_bw="+str(int(cvt(s.avg_bw,0)))+"\n")
f.flush()
write_bws = Callable(write_bws)
@@ -604,12 +609,12 @@
#################### Model Support ################
def reset_all():
# Need to keep routers around..
- for r in Router.query.all():
- r.bw_history = [] # XXX: Is this sufficient/correct/necessary?
- r.circuits = []
- r.streams = []
- r.stats = None
- tc_session.add(r)
+ #for r in Router.query.all():
+ # r.bw_history = [] # XXX: Is this sufficient/correct/necessary?
+ # r.circuits = []
+ # r.streams = []
+ # r.stats = None
+ # tc_session.add(r)
BwHistory.table.drop() # Will drop subclasses
Extension.table.drop()
@@ -623,6 +628,7 @@
Stream.table.create()
Circuit.table.create()
+ tc_session.clear()
tc_session.commit()
##################### End Model Support ####################
@@ -689,11 +695,26 @@
tc_session.add(OP)
tc_session.commit()
self.update_consensus()
- # So ghetto
+ # XXX: This hack exists because update_rank_history is expensive.
+ # However, even if we delay it till the end of the consensus update,
+ # it still delays event processing for up to 30 seconds on a fast
+ # machine.
+ #
+ # The correct way to do this is give SQL processing
+ # to a dedicated worker thread that pulls events off of a secondary
+ # queue, that way we don't block stream handling on this processing.
+ # The problem is we are pretty heavily burdened with the need to
+ # stay in sync with our parent event handler. A queue will break this
+ # coupling (even if we could get all the locking right).
+ #
+ # A lighterweight hack might be to just make the scanners pause
+ # on a condition used to signal we are doing this (and other) heavy
+ # lifting. We could have them possibly check self.last_desc_at..
if e.arrived_at - self.last_desc_at > 30.0:
- plog("INFO", "Newdesc timer is up. Assuming we have full consensus now")
- self.last_desc_at = 0x7fffffff
- self._update_rank_history(self.consensus.ns_map.iterkeys())
+ if not PathSupport.PathBuilder.is_urgent_event(e):
+ plog("INFO", "Newdesc timer is up. Assuming we have full consensus")
+ self.last_desc_at = 0x7fffffff
+ self._update_rank_history(self.consensus.ns_map.iterkeys())
def new_consensus_event(self, n):
if n.state == EVENT_STATE.POSTLISTEN:
Modified: torctl/trunk/python/TorCtl/TorCtl.py
===================================================================
--- torctl/trunk/python/TorCtl/TorCtl.py 2009-05-28 11:49:06 UTC (rev 19575)
+++ torctl/trunk/python/TorCtl/TorCtl.py 2009-05-28 11:50:55 UTC (rev 19576)
@@ -89,7 +89,7 @@
class NetworkStatus:
"Filled in during NS events"
- def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags):
+ def __init__(self, nickname, idhash, orhash, updated, ip, orport, dirport, flags, bandwidth=None):
self.nickname = nickname
self.idhash = idhash
self.orhash = orhash
@@ -98,6 +98,7 @@
self.dirport = int(dirport)
self.flags = flags
self.idhex = (self.idhash + "=").decode("base64").encode("hex").upper()
+ self.bandwidth = bandwidth
m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
self.updated = datetime.datetime(*map(int, m.groups()))
@@ -275,10 +276,13 @@
self.__dict__[i] = copy.deepcopy(args[0].__dict__[i])
return
else:
- (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime, published, contact, rate_limited, orhash) = args
+ (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime, published, contact, rate_limited, orhash, ns_bandwidth) = args
self.idhex = idhex
self.nickname = name
- self.bw = bw
+ if ns_bandwidth != None:
+ self.bw = ns_bandwidth
+ else:
+ self.bw = bw
self.exitpolicy = exitpolicy
self.flags = flags # Technicaly from NS doc
self.down = down
@@ -366,7 +370,7 @@
plog("INFO", "No version and/or OS for router " + ns.nickname)
return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
ns.flags, ip, version, os, uptime, published, contact, rate_limited,
- ns.orhash)
+ ns.orhash, ns.bandwidth)
build_from_desc = Callable(build_from_desc)
def update_to(self, new):
@@ -917,8 +921,12 @@
m = re.search(r"^s((?:\s\S*)+)", nsline, re.M)
flags = m.groups()
flags = flags[0].strip().split(" ")
- m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
- nslist.append(NetworkStatus(*(m.groups() + (flags,))))
+ m = re.match(r"(\S+)\s(\S+)\s(\S+)\s(\S+\s\S+)\s(\S+)\s(\d+)\s(\d+)", nsline)
+ w = re.search(r"^w Bandwidth=(\d+)", nsline, re.M)
+ if w:
+ nslist.append(NetworkStatus(*(m.groups() + (flags,) + int(w.group(0)))))
+ else:
+ nslist.append(NetworkStatus(*(m.groups() + (flags,))))
return nslist
class EventSink: