[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r15380: Statistics functions and classes formerly in metatroller.py, (torflow/branches/gsoc2008)
Author: fallon
Date: 2008-06-20 18:42:04 -0400 (Fri, 20 Jun 2008)
New Revision: 15380
Added:
torflow/branches/gsoc2008/StatsSupport.py
Log:
Statistics functions and classes formerly in metatroller.py, now reside here.
Added: torflow/branches/gsoc2008/StatsSupport.py
===================================================================
--- torflow/branches/gsoc2008/StatsSupport.py (rev 0)
+++ torflow/branches/gsoc2008/StatsSupport.py 2008-06-20 22:42:04 UTC (rev 15380)
@@ -0,0 +1,641 @@
+#StatsSupport.py - functions and classes useful for calculating stream/circuit statistics
+
+import sys
+import re
+import random
+import copy
+import time
+import math
+
+from TorCtl import TorUtil, PathSupport, TorCtl
+from TorCtl.TorUtil import *
+from TorCtl.PathSupport import *
+from TorCtl.TorUtil import meta_port, meta_host, control_port, control_host
+
+class ReasonRouterList:
+ "Helper class to track which Routers have failed for a given reason"
+ def __init__(self, reason):
+ self.reason = reason
+ self.rlist = {}
+
+ def sort_list(self): raise NotImplemented()
+
+ def write_list(self, f):
+ "Write the list of failure counts for this reason 'f'"
+ rlist = self.sort_list()
+ for r in rlist:
+ susp = 0
+ tot_failed = r.circ_failed+r.strm_failed
+ tot_susp = tot_failed+r.circ_suspected+r.strm_suspected
+ f.write(r.idhex+" ("+r.nickname+") F=")
+ if self.reason in r.reason_failed:
+ susp = r.reason_failed[self.reason]
+ f.write(str(susp)+"/"+str(tot_failed))
+ f.write(" S=")
+ if self.reason in r.reason_suspected:
+ susp += r.reason_suspected[self.reason]
+ f.write(str(susp)+"/"+str(tot_susp)+"\n")
+
+ def add_r(self, r):
+ "Add a router to the list for this reason"
+ self.rlist[r] = 1
+
+ def total_suspected(self):
+ "Get a list of total suspected failures for this reason"
+ # suspected is disjoint from failed. The failed table
+ # may not have an entry
+ def notlambda(x, y):
+ if self.reason in y.reason_suspected:
+ if self.reason in y.reason_failed:
+ return (x + y.reason_suspected[self.reason]
+ + y.reason_failed[self.reason])
+ else:
+ return (x + y.reason_suspected[self.reason])
+ else:
+ if self.reason in y.reason_failed:
+ return (x + y.reason_failed[self.reason])
+ else: return x
+ return reduce(notlambda, self.rlist.iterkeys(), 0)
+
+ def total_failed(self):
+ "Get a list of total failures for this reason"
+ def notlambda(x, y):
+ if self.reason in y.reason_failed:
+ return (x + y.reason_failed[self.reason])
+ else: return x
+ return reduce(notlambda, self.rlist.iterkeys(), 0)
+
+class SuspectRouterList(ReasonRouterList):
+ """Helper class to track all routers suspected of failing for a given
+ reason. The main difference between this and the normal
+ ReasonRouterList is the sort order and the verification."""
+ def __init__(self, reason): ReasonRouterList.__init__(self,reason)
+
+ def sort_list(self):
+ rlist = self.rlist.keys()
+ rlist.sort(lambda x, y: cmp(y.reason_suspected[self.reason],
+ x.reason_suspected[self.reason]))
+ return rlist
+
+ def _verify_suspected(self):
+ return reduce(lambda x, y: x + y.reason_suspected[self.reason],
+ self.rlist.iterkeys(), 0)
+
+class FailedRouterList(ReasonRouterList):
+ """Helper class to track all routers that failed for a given
+ reason. The main difference between this and the normal
+ ReasonRouterList is the sort order and the verification."""
+ def __init__(self, reason): ReasonRouterList.__init__(self,reason)
+
+ def sort_list(self):
+ rlist = self.rlist.keys()
+ rlist.sort(lambda x, y: cmp(y.reason_failed[self.reason],
+ x.reason_failed[self.reason]))
+ return rlist
+
+ def _verify_failed(self):
+ return reduce(lambda x, y: x + y.reason_failed[self.reason],
+ self.rlist.iterkeys(), 0)
+class BandwidthStats:
+ "Class that manages observed bandwidth through a Router"
+ def __init__(self):
+ self.byte_list = []
+ self.duration_list = []
+ self.min_bw = 1e10
+ self.max_bw = 0
+ self.mean = 0
+ self.dev = 0
+
+ def _exp(self): # Weighted avg
+ "Expectation - weighted average of the bandwidth through this node"
+ tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+ EX = 0.0
+ for i in xrange(len(self.byte_list)):
+ EX += (self.byte_list[i]*self.byte_list[i])/self.duration_list[i]
+ if tot_bw == 0.0: return 0.0
+ EX /= tot_bw
+ return EX
+
+ def _exp2(self): # E[X^2]
+ "Second moment of the bandwidth"
+ tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+ EX = 0.0
+ for i in xrange(len(self.byte_list)):
+ EX += (self.byte_list[i]**3)/(self.duration_list[i]**2)
+ if tot_bw == 0.0: return 0.0
+ EX /= tot_bw
+ return EX
+
+ def _dev(self): # Weighted dev
+ "Standard deviation of bandwidth"
+ EX = self.mean
+ EX2 = self._exp2()
+ arg = EX2 - (EX*EX)
+ if arg < -0.05:
+ plog("WARN", "Diff of "+str(EX2)+" and "+str(EX)+"^2 is "+str(arg))
+ return math.sqrt(abs(arg))
+
+ def add_bw(self, bytes, duration):
+ "Add an observed transfer of 'bytes' for 'duration' seconds"
+ if not bytes: plog("WARN", "No bytes for bandwidth")
+ bytes /= 1024.
+ self.byte_list.append(bytes)
+ self.duration_list.append(duration)
+ bw = bytes/duration
+ plog("DEBUG", "Got bandwidth "+str(bw))
+ if self.min_bw > bw: self.min_bw = bw
+ if self.max_bw < bw: self.max_bw = bw
+ self.mean = self._exp()
+ self.dev = self._dev()
+
+
+
+class StatsRouter(TorCtl.Router):
+ "Extended Router to handle statistics markup"
+ def __init__(self, router): # Promotion constructor :)
+ """'Promotion Constructor' that converts a Router directly into a
+ StatsRouter without a copy."""
+ self.__dict__ = router.__dict__
+ self.reset()
+
+ def reset(self):
+ "Reset all stats on this Router"
+ self.circ_uncounted = 0
+ self.circ_failed = 0
+ self.circ_succeeded = 0 # disjoint from failed
+ self.circ_suspected = 0
+ self.circ_chosen = 0 # above 4 should add to this
+ self.strm_failed = 0 # Only exits should have these
+ self.strm_succeeded = 0
+ self.strm_suspected = 0 # disjoint from failed
+ self.strm_uncounted = 0
+ self.strm_chosen = 0 # above 4 should add to this
+ self.reason_suspected = {}
+ self.reason_failed = {}
+ self.first_seen = time.time()
+ if "Running" in self.flags:
+ self.became_active_at = self.first_seen
+ self.hibernated_at = 0
+ else:
+ self.became_active_at = 0
+ self.hibernated_at = self.first_seen
+ self.total_hibernation_time = 0
+ self.total_active_uptime = 0
+ self.total_extend_time = 0
+ self.total_extended = 0
+ self.bwstats = BandwidthStats()
+ self.z_ratio = 0
+ self.prob_zr = 0
+ self.z_bw = 0
+ self.prob_zb = 0
+
+ def avg_extend_time(self):
+ """Return the average amount of time it took for this router
+ to extend a circuit one hop"""
+ if self.total_extended:
+ return self.total_extend_time/self.total_extended
+ else: return 0
+
+ def bw_ratio(self):
+ """Return the ratio of the Router's advertised bandwidth to its
+ observed average stream bandwidth"""
+ bw = self.bwstats.mean
+ if bw == 0.0: return 0
+ else: return self.bw/(1024.*bw)
+
+ def current_uptime(self):
+ if self.became_active_at:
+ ret = (self.total_active_uptime+(time.time()-self.became_active_at))
+ else:
+ ret = self.total_active_uptime
+ if ret == 0: return 0.000005 # eh..
+ else: return ret
+
+ def failed_per_hour(self):
+ """Return the number of circuit extend failures per hour for this
+ Router"""
+ return (3600.*(self.circ_failed+self.strm_failed))/self.current_uptime()
+
+ # XXX: Seperate suspected from failed in totals
+ def suspected_per_hour(self):
+ """Return the number of circuits that failed with this router as an
+ earlier hop"""
+ return (3600.*(self.circ_suspected+self.strm_suspected
+ +self.circ_failed+self.strm_failed))/self.current_uptime()
+
+ # These four are for sanity checking
+ def _suspected_per_hour(self):
+ return (3600.*(self.circ_suspected+self.strm_suspected))/self.current_uptime()
+
+ def _uncounted_per_hour(self):
+ return (3600.*(self.circ_uncounted+self.strm_uncounted))/self.current_uptime()
+
+ def _chosen_per_hour(self):
+ return (3600.*(self.circ_chosen+self.strm_chosen))/self.current_uptime()
+
+ def _succeeded_per_hour(self):
+ return (3600.*(self.circ_succeeded+self.strm_succeeded))/self.current_uptime()
+
+ key = """Metatroller Statistics:
+ CC=Circuits Chosen CF=Circuits Failed CS=Circuit Suspected
+ SC=Streams Chosen SF=Streams Failed SS=Streams Suspected
+ FH=Failed per Hour SH=Suspected per Hour ET=avg circuit Extend Time (s)
+ EB=mean BW (K) BD=BW std Dev (K) BR=Ratio of observed to avg BW
+ ZB=BW z-test value PB=Probability(z-bw) ZR=Ratio z-test value
+ PR=Prob(z-ratio) U=Uptime (h)\n"""
+
+ def __str__(self):
+ return (self.idhex+" ("+self.nickname+")\n"
+ +" CC="+str(self.circ_chosen)
+ +" CF="+str(self.circ_failed)
+ +" CS="+str(self.circ_suspected+self.circ_failed)
+ +" SC="+str(self.strm_chosen)
+ +" SF="+str(self.strm_failed)
+ +" SS="+str(self.strm_suspected+self.strm_failed)
+ +" FH="+str(round(self.failed_per_hour(),1))
+ +" SH="+str(round(self.suspected_per_hour(),1))+"\n"
+ +" ET="+str(round(self.avg_extend_time(),1))
+ +" EB="+str(round(self.bwstats.mean,1))
+ +" BD="+str(round(self.bwstats.dev,1))
+ +" ZB="+str(round(self.z_bw,1))
+ +" PB="+(str(round(self.prob_zb,3))[1:])
+ +" BR="+str(round(self.bw_ratio(),1))
+ +" ZR="+str(round(self.z_ratio,1))
+ +" PR="+(str(round(self.prob_zr,3))[1:])
+ +" U="+str(round(self.current_uptime()/3600, 1))+"\n")
+
+ def sanity_check(self):
+ "Makes sure all stats are self-consistent"
+ if (self.circ_failed + self.circ_succeeded + self.circ_suspected
+ + self.circ_uncounted != self.circ_chosen):
+ plog("ERROR", self.nickname+" does not add up for circs")
+ if (self.strm_failed + self.strm_succeeded + self.strm_suspected
+ + self.strm_uncounted != self.strm_chosen):
+ plog("ERROR", self.nickname+" does not add up for streams")
+ def check_reasons(reasons, expected, which, rtype):
+ count = 0
+ for rs in reasons.iterkeys():
+ if re.search(r"^"+which, rs): count += reasons[rs]
+ if count != expected:
+ plog("ERROR", "Mismatch "+which+" "+rtype+" for "+self.nickname)
+ check_reasons(self.reason_suspected,self.strm_suspected,"STREAM","susp")
+ check_reasons(self.reason_suspected,self.circ_suspected,"CIRC","susp")
+ check_reasons(self.reason_failed,self.strm_failed,"STREAM","failed")
+ check_reasons(self.reason_failed,self.circ_failed,"CIRC","failed")
+ now = time.time()
+ tot_hib_time = self.total_hibernation_time
+ tot_uptime = self.total_active_uptime
+ if self.hibernated_at: tot_hib_time += now - self.hibernated_at
+ if self.became_active_at: tot_uptime += now - self.became_active_at
+ if round(tot_hib_time+tot_uptime) != round(now-self.first_seen):
+ plog("ERROR", "Mismatch of uptimes for "+self.nickname)
+
+ per_hour_tot = round(self._uncounted_per_hour()+self.failed_per_hour()+
+ self._suspected_per_hour()+self._succeeded_per_hour(), 2)
+ chosen_tot = round(self._chosen_per_hour(), 2)
+ if per_hour_tot != chosen_tot:
+ plog("ERROR", self.nickname+" has mismatch of per hour counts: "
+ +str(per_hour_tot) +" vs "+str(chosen_tot))
+
+
+
+class StatsHandler(PathSupport.PathBuilder):
+ """An extension of PathSupport.PathBuilder that keeps track of
+ router statistics for every circuit and stream"""
+ def __init__(self, c, slmgr):
+ PathBuilder.__init__(self, c, slmgr, StatsRouter)
+ self.circ_count = 0
+ self.strm_count = 0
+ self.strm_failed = 0
+ self.circ_failed = 0
+ self.failed_reasons = {}
+ self.suspect_reasons = {}
+
+ def run_zbtest(self): # Unweighted z-test
+ """Run unweighted z-test to calculate the probabilities of a node
+ having a given stream bandwidth based on the Normal distribution"""
+ n = reduce(lambda x, y: x+(y.bwstats.mean > 0), self.sorted_r, 0)
+ if n == 0: return (0, 0)
+ avg = reduce(lambda x, y: x+y.bwstats.mean, self.sorted_r, 0)/float(n)
+ def notlambda(x, y):
+ if y.bwstats.mean <= 0: return x+0
+ else: return x+(y.bwstats.mean-avg)*(y.bwstats.mean-avg)
+ stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+ if not stddev: return (avg, stddev)
+ for r in self.sorted_r:
+ if r.bwstats.mean > 0:
+ r.z_bw = abs((r.bwstats.mean-avg)/stddev)
+ r.prob_zb = TorUtil.zprob(-r.z_bw)
+ return (avg, stddev)
+
+ def run_zrtest(self): # Unweighted z-test
+ """Run unweighted z-test to calculate the probabilities of a node
+ having a given ratio of stream bandwidth to advertised bandwidth
+ based on the Normal distribution"""
+ n = reduce(lambda x, y: x+(y.bw_ratio() > 0), self.sorted_r, 0)
+ if n == 0: return (0, 0)
+ avg = reduce(lambda x, y: x+y.bw_ratio(), self.sorted_r, 0)/float(n)
+ def notlambda(x, y):
+ if y.bw_ratio() <= 0: return x+0
+ else: return x+(y.bw_ratio()-avg)*(y.bw_ratio()-avg)
+ stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+ if not stddev: return (avg, stddev)
+ for r in self.sorted_r:
+ if r.bw_ratio() > 0:
+ r.z_ratio = abs((r.bw_ratio()-avg)/stddev)
+ r.prob_zr = TorUtil.zprob(-r.z_ratio)
+ return (avg, stddev)
+
+ def write_reasons(self, f, reasons, name):
+ "Write out all the failure reasons and statistics for all Routers"
+ f.write("\n\n\t----------------- "+name+" -----------------\n")
+ for rsn in reasons:
+ f.write("\n"+rsn.reason+". Failed: "+str(rsn.total_failed())
+ +", Suspected: "+str(rsn.total_suspected())+"\n")
+ rsn.write_list(f)
+
+ def write_routers(self, f, rlist, name):
+ "Write out all the usage statistics for all Routers"
+ f.write("\n\n\t----------------- "+name+" -----------------\n\n")
+ for r in rlist:
+ # only print it if we've used it.
+ if r.circ_chosen+r.strm_chosen > 0: f.write(str(r))
+
+ def write_stats(self, filename):
+ "Write out all the statistics the StatsHandler has gathered"
+ # TODO: all this shit should be configurable. Some of it only makes
+ # sense when scanning in certain modes.
+ plog("DEBUG", "Writing stats")
+ # Sanity check routers
+ for r in self.sorted_r: r.sanity_check()
+
+ # Sanity check the router reason lists.
+ for r in self.sorted_r:
+ for rsn in r.reason_failed:
+ if r not in self.failed_reasons[rsn].rlist:
+ plog("ERROR", "Router missing from reason table")
+ for rsn in r.reason_suspected:
+ if r not in self.suspect_reasons[rsn].rlist:
+ plog("ERROR", "Router missing from reason table")
+
+ # Sanity check the lists the other way
+ for rsn in self.failed_reasons.itervalues(): rsn._verify_failed()
+ for rsn in self.suspect_reasons.itervalues(): rsn._verify_suspected()
+
+ f = file(filename, "w")
+ f.write(StatsRouter.key)
+ (avg, dev) = self.run_zbtest()
+ f.write("\n\nBW stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
+
+ (avg, dev) = self.run_zrtest()
+ f.write("BW ratio stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
+
+ # Circ, strm infoz
+ f.write("Circ failure ratio: "+str(self.circ_failed)
+ +"/"+str(self.circ_count)+"\n")
+
+ f.write("Stream failure ratio: "+str(self.strm_failed)
+ +"/"+str(self.strm_count)+"\n")
+
+ # Extend times
+ n = 0.01+reduce(lambda x, y: x+(y.avg_extend_time() > 0), self.sorted_r, 0)
+ avg_extend = reduce(lambda x, y: x+y.avg_extend_time(), self.sorted_r, 0)/n
+ def notlambda(x, y):
+ return x+(y.avg_extend_time()-avg_extend)*(y.avg_extend_time()-avg_extend)
+ dev_extend = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+
+ f.write("Extend time: u="+str(round(avg_extend,1))
+ +" s="+str(round(dev_extend,1)))
+
+ # sort+print by bandwidth
+ bw_rate = copy.copy(self.sorted_r)
+ bw_rate.sort(lambda x, y: cmp(y.bw_ratio(), x.bw_ratio()))
+ self.write_routers(f, bw_rate, "Bandwidth Ratios")
+
+ failed = copy.copy(self.sorted_r)
+ failed.sort(lambda x, y:
+ cmp(y.circ_failed+y.strm_failed,
+ x.circ_failed+x.strm_failed))
+ self.write_routers(f, failed, "Failed Counts")
+
+ suspected = copy.copy(self.sorted_r)
+ suspected.sort(lambda x, y: # Suspected includes failed
+ cmp(y.circ_failed+y.strm_failed+y.circ_suspected+y.strm_suspected,
+ x.circ_failed+x.strm_failed+x.circ_suspected+x.strm_suspected))
+ self.write_routers(f, suspected, "Suspected Counts")
+
+ fail_rate = copy.copy(failed)
+ fail_rate.sort(lambda x, y: cmp(y.failed_per_hour(), x.failed_per_hour()))
+ self.write_routers(f, fail_rate, "Fail Rates")
+
+ suspect_rate = copy.copy(suspected)
+ suspect_rate.sort(lambda x, y:
+ cmp(y.suspected_per_hour(), x.suspected_per_hour()))
+ self.write_routers(f, suspect_rate, "Suspect Rates")
+
+ # TODO: Sort by failed/selected and suspect/selected ratios
+ # if we ever want to do non-uniform scanning..
+
+ # FIXME: Add failed in here somehow..
+ susp_reasons = self.suspect_reasons.values()
+ susp_reasons.sort(lambda x, y:
+ cmp(y.total_suspected(), x.total_suspected()))
+ self.write_reasons(f, susp_reasons, "Suspect Reasons")
+
+ fail_reasons = self.failed_reasons.values()
+ fail_reasons.sort(lambda x, y:
+ cmp(y.total_failed(), x.total_failed()))
+ self.write_reasons(f, fail_reasons, "Failed Reasons")
+ f.close()
+
+ # FIXME: sort+print by circ extend time
+
+ def reset_stats(self):
+ plog("DEBUG", "Resetting stats")
+ self.circ_count = 0
+ self.strm_count = 0
+ self.strm_failed = 0
+ self.circ_failed = 0
+ self.suspect_reasons.clear()
+ self.failed_reasons.clear()
+ for r in self.sorted_r: r.reset()
+
+ def circ_status_event(self, c):
+ if c.circ_id in self.circuits:
+ # TODO: Hrmm, consider making this sane in TorCtl.
+ if c.reason: lreason = c.reason
+ else: lreason = "NONE"
+ if c.remote_reason: rreason = c.remote_reason
+ else: rreason = "NONE"
+ reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
+ if c.status == "LAUNCHED":
+ # Update circ_chosen count
+ self.circ_count += 1
+ elif c.status == "EXTENDED":
+ delta = c.arrived_at - self.circuits[c.circ_id].last_extended_at
+ r_ext = c.path[-1]
+ if r_ext[0] != '$': r_ext = self.name_to_key[r_ext]
+ self.routers[r_ext[1:]].total_extend_time += delta
+ self.routers[r_ext[1:]].total_extended += 1
+ elif c.status == "FAILED":
+ for r in self.circuits[c.circ_id].path: r.circ_chosen += 1
+
+ if len(c.path)-1 < 0: start_f = 0
+ else: start_f = len(c.path)-1
+
+ # Count failed
+ self.circ_failed += 1
+ # XXX: Differentiate between extender and extendee
+ for r in self.circuits[c.circ_id].path[start_f:len(c.path)+1]:
+ r.circ_failed += 1
+ if not reason in r.reason_failed:
+ r.reason_failed[reason] = 1
+ else: r.reason_failed[reason]+=1
+ if reason not in self.failed_reasons:
+ self.failed_reasons[reason] = FailedRouterList(reason)
+ self.failed_reasons[reason].add_r(r)
+
+ for r in self.circuits[c.circ_id].path[len(c.path)+1:]:
+ r.circ_uncounted += 1
+
+ # Don't count if failed was set this round, don't set
+ # suspected..
+ for r in self.circuits[c.circ_id].path[:start_f]:
+ r.circ_suspected += 1
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason]+=1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ elif c.status == "CLOSED":
+ # Since PathBuilder deletes the circuit on a failed,
+ # we only get this for a clean close
+ for r in self.circuits[c.circ_id].path:
+ r.circ_chosen += 1
+ if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
+ r.circ_succeeded += 1
+ else:
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason] += 1
+ r.circ_suspected+= 1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ PathBuilder.circ_status_event(self, c)
+
+ def count_stream_reason_failed(self, s, reason):
+ "Count the routers involved in a failure"
+ # Update failed count,reason_failed for exit
+ r = self.circuits[s.circ_id].exit
+ if not reason in r.reason_failed: r.reason_failed[reason] = 1
+ else: r.reason_failed[reason]+=1
+ r.strm_failed += 1
+ if reason not in self.failed_reasons:
+ self.failed_reasons[reason] = FailedRouterList(reason)
+ self.failed_reasons[reason].add_r(r)
+
+ def count_stream_suspects(self, s, lreason, reason):
+ "Count the routers 'suspected' of being involved in a failure"
+ if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL" "DESTROY"):
+ for r in self.circuits[s.circ_id].path[:-1]:
+ r.strm_suspected += 1
+ if not reason in r.reason_suspected:
+ r.reason_suspected[reason] = 1
+ else: r.reason_suspected[reason]+=1
+ if reason not in self.suspect_reasons:
+ self.suspect_reasons[reason] = SuspectRouterList(reason)
+ self.suspect_reasons[reason].add_r(r)
+ else:
+ for r in self.circuits[s.circ_id].path[:-1]:
+ r.strm_uncounted += 1
+
+ def stream_status_event(self, s):
+ if s.strm_id in self.streams and not self.streams[s.strm_id].ignored:
+ # TODO: Hrmm, consider making this sane in TorCtl.
+ if s.reason: lreason = s.reason
+ else: lreason = "NONE"
+ if s.remote_reason: rreason = s.remote_reason
+ else: rreason = "NONE"
+ reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
+ if (s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED")
+ and not s.circ_id):
+ # XXX: REMAPs can do this (normal). Also REASON=DESTROY (bug?)
+ # Also timeouts.. Those should use the pending circ instead
+ # of returning..
+ plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
+ PathBuilder.stream_status_event(self, s)
+ return
+
+ # Verify circ id matches stream.circ
+ if s.status not in ("NEW", "NEWRESOLVE", "REMAP"):
+ circ = self.streams[s.strm_id].circ
+ if not circ: circ = self.streams[s.strm_id].pending_circ
+ if circ and circ.circ_id != s.circ_id:
+ plog("WARN", str(s.strm_id) + " has mismatch of "
+ +str(s.circ_id)+" v "+str(circ.circ_id))
+ if s.circ_id and s.circ_id not in self.circuits:
+ plog("NOTICE", "Unknown circuit "+str(s.circ_id)
+ +" for stream "+str(s.strm_id))
+ return
+
+ if s.status == "DETACHED":
+ if self.streams[s.strm_id].attached_at:
+ plog("WARN", str(s.strm_id)+" detached after succeeded")
+ # Update strm_chosen count
+ self.strm_count += 1
+ for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+ self.strm_failed += 1
+ self.count_stream_suspects(s, lreason, reason)
+ self.count_stream_reason_failed(s, reason)
+ elif s.status == "FAILED":
+ # HACK. We get both failed and closed for the same stream,
+ # with different reasons. Might as well record both, since they
+ # often differ.
+ self.streams[s.strm_id].failed_reason = reason
+ elif s.status == "CLOSED":
+ # Always get both a closed and a failed..
+ # - Check if the circuit exists still
+ # Update strm_chosen count
+ self.strm_count += 1
+ for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+
+ # Update bw stats. XXX: Don't do this for resolve streams
+ if self.streams[s.strm_id].attached_at:
+ lifespan = self.streams[s.strm_id].lifespan(s.arrived_at)
+ for r in self.streams[s.strm_id].circ.path:
+ r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
+ self.streams[s.strm_id].bytes_read, lifespan)
+
+ if self.streams[s.strm_id].failed:
+ reason = self.streams[s.strm_id].failed_reason+":"+lreason+":"+rreason
+
+ self.count_stream_suspects(s, lreason, reason)
+
+ r = self.circuits[s.circ_id].exit
+ if (not self.streams[s.strm_id].failed
+ and (lreason == "DONE" or (lreason == "END" and rreason == "DONE"))):
+ r.strm_succeeded += 1
+ else:
+ self.strm_failed += 1
+ self.count_stream_reason_failed(s, reason)
+ PathBuilder.stream_status_event(self, s)
+
+ def ns_event(self, n):
+ PathBuilder.ns_event(self, n)
+ now = n.arrived_at
+ for ns in n.nslist:
+ if not ns.idhex in self.routers:
+ continue
+ r = self.routers[ns.idhex]
+ if "Running" in ns.flags:
+ if not r.became_active_at:
+ r.became_active_at = now
+ r.total_hibernation_time += now - r.hibernated_at
+ r.hibernated_at = 0
+ else:
+ if not r.hibernated_at:
+ r.hibernated_at = now
+ r.total_active_uptime += now - r.became_active_at
+ r.became_active_at = 0