[or-cvs] r9831: Z-tests written for stream bandwidth and observed bandwidth (in torflow/trunk: . TorCtl)
Author: mikeperry
Date: 2007-03-15 14:58:51 -0400 (Thu, 15 Mar 2007)
New Revision: 9831
Added:
torflow/trunk/speedracer.pl
Modified:
torflow/trunk/TorCtl/PathSupport.py
torflow/trunk/TorCtl/TorCtl.py
torflow/trunk/TorCtl/TorUtil.py
torflow/trunk/metatroller.py
torflow/trunk/soat.pl
Log:
Z-tests written for stream bandwidth and for the observed bandwidth/stream bandwidth ratio.
Still a little flaky, unfortunately. Events really need to be timestamped as
soon as they are read, and some stats are self-consistent in their insanity
rather than actually sane. But at least no regressions are present in this
code, so I'm committing it. Bugfixes to follow.
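
For context on the new tests: run_zbtest()/run_zrtest() in metatroller.py below
are plain unweighted z-tests across the router population. Each router's mean
measured stream bandwidth (run_zbtest) or its bw_ratio() (run_zrtest) is
compared against the mean and standard deviation over all routers that have a
sample, and TorUtil.zprob() turns the score into a one-tailed probability. A
minimal sketch of the same computation (plain Python; the list of per-router
means is a hypothetical stand-in for StatsRouter.bwstats.mean):

    import math

    def z_scores(mean_bws):
        # Only routers with at least one bandwidth sample participate.
        samples = [bw for bw in mean_bws if bw > 0]
        if not samples:
            return {}
        avg = sum(samples) / float(len(samples))
        dev = math.sqrt(sum((bw - avg) ** 2 for bw in samples) / float(len(samples)))
        if dev == 0:
            return {}
        # abs() z-score per entry; zprob(-z) gives the corresponding
        # one-tailed probability of a deviation at least this large.
        return dict((i, abs(bw - avg) / dev)
                    for i, bw in enumerate(mean_bws) if bw > 0)
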
Modified: torflow/trunk/TorCtl/PathSupport.py
===================================================================
--- torflow/trunk/TorCtl/PathSupport.py 2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/TorCtl/PathSupport.py 2007-03-15 18:58:51 UTC (rev 9831)
@@ -6,8 +6,8 @@
import random
import socket
import copy
-import datetime
import Queue
+import time
from TorUtil import *
__all__ = ["NodeRestrictionList", "PathRestrictionList",
@@ -124,7 +124,7 @@
class Connection(TorCtl.Connection):
def build_circuit(self, pathlen, path_sel):
- circ = TorCtl.Circuit()
+ circ = Circuit()
if pathlen == 1:
circ.exit = path_sel.exit_chooser(circ.path)
circ.path = [circ.exit]
@@ -136,7 +136,6 @@
circ.exit = path_sel.exit_chooser(circ.path)
circ.path.append(circ.exit)
circ.cid = self.extend_circuit(0, circ.id_path())
- circ.created_at = datetime.datetime.now()
return circ
######################## Node Restrictions ########################
@@ -144,22 +143,22 @@
# TODO: We still need more path support implementations
# - BwWeightedGenerator
# - NodeRestrictions:
-# - Uptime/LongLivedPorts (Does/should hibernation count?)
-# - Published/Updated
-# - GeoIP
-# - NodeCountry
-# - PathRestrictions
-# - Family
-# - GeoIP:
-# - OceanPhobicRestrictor (avoids Pacific Ocean or two atlantic crossings)
-# or ContinentRestrictor (avoids doing more than N continent crossings)
-# - Mathematical/empirical study of predecessor expectation
-# - If middle node on the same continent as exit, exit learns nothing
-# - else, exit has a bias on the continent of origin of user
-# - Language and browser accept string determine this anyway
-# - EchelonPhobicRestrictor
-# - Does not cross international boundaries for client->Entry or
-# Exit->destination hops
+# - Uptime/LongLivedPorts (Does/should hibernation count?)
+# - Published/Updated
+# - GeoIP
+# - NodeCountry
+# - PathRestrictions:
+# - Family
+# - GeoIP:
+# - OceanPhobicRestrictor (avoids Pacific Ocean or two atlantic crossings)
+# or ContinentRestrictor (avoids doing more than N continent crossings)
+# - Mathematical/empirical study of predecessor expectation
+# - If middle node on the same continent as exit, exit learns nothing
+# - else, exit has a bias on the continent of origin of user
+# - Language and browser accept string determine this anyway
+# - EchelonPhobicRestrictor
+# - Does not cross international boundaries for client->Entry or
+# Exit->destination hops
class PercentileRestriction(NodeRestriction):
"""If used, this restriction MUST be FIRST in the RestrictionList."""
@@ -173,6 +172,7 @@
self.sorted_r = r_list
self.position = 0
+ # XXX: Don't count non-running routers in this
def r_is_ok(self, r):
ret = True
if self.position == len(self.sorted_r):
@@ -255,7 +255,6 @@
return True
return False
-
class VersionExcludeRestriction(NodeRestriction):
def __init__(self, exclude):
self.exclude = map(TorCtl.RouterVersion, exclude)
@@ -272,7 +271,6 @@
if less_eq: self.less_eq = TorCtl.RouterVersion(less_eq)
else: self.less_eq = None
-
def r_is_ok(self, router):
return (not self.gr_eq or router.version >= self.gr_eq) and \
(not self.less_eq or router.version <= self.less_eq)
@@ -344,6 +342,7 @@
self.mark_chosen(r)
yield r
+# XXX: Either this is busted or the ExitPolicyRestriction is..
class OrderedExitGenerator(NodeGenerator):
def __init__(self, restriction_list, to_port):
self.to_port = to_port
@@ -501,15 +500,17 @@
self.mid_rstr.update_routers(new_rlist)
self.exit_rstr.update_routers(new_rlist)
-class Circuit(TorCtl.Circuit):
- def __init__(self, circuit): # Promotion constructor
- # perf shortcut since we don't care about the 'circuit'
- # instance after this
- self.__dict__ = circuit.__dict__
+class Circuit:
+ def __init__(self):
+ self.cid = 0
+ self.path = [] # routers
+ self.exit = None
self.built = False
self.detached_cnt = 0
- self.created_at = datetime.datetime.now()
+ self.last_extended_at = time.time()
self.pending_streams = [] # Which stream IDs are pending us
+
+ def id_path(self): return map(lambda r: r.idhex, self.path)
class Stream:
def __init__(self, sid, host, port, kind):
@@ -520,7 +521,14 @@
self.host = host
self.port = port
self.kind = kind
+ self.attached_at = 0
+ self.bytes_read = 0
+ self.bytes_written = 0
+ # XXX: Use event timestamps
+ def lifespan(self): return time.time()-self.attached_at
+ def write_bw(self): return self.bytes_written/self.lifespan()
+
# TODO: Make passive "PathWatcher" so people can get aggregate
# node reliability stats for normal usage without us attaching streams
@@ -543,6 +551,7 @@
self.num_circuits = 1
self.RouterClass = RouterClass
self.sorted_r = []
+ self.name_to_key = {}
self.routers = {}
self.circuits = {}
self.streams = {}
@@ -551,13 +560,72 @@
self.selmgr.reconfigure(self.sorted_r)
self.imm_jobs = Queue.Queue()
self.low_prio_jobs = Queue.Queue()
+ self.run_all_jobs = False
self.do_reconfigure = False
plog("INFO", "Read "+str(len(self.sorted_r))+"/"+str(len(nslist))+" routers")
+ def schedule_immediate(self, job):
+ """
+ Schedules an immediate job to be run before the next event is
+ processed.
+ """
+ self.imm_jobs.put(job)
+
+ def schedule_low_prio(self, job):
+ """
+ Schedules a job to be run when a non-time critical event arrives.
+ """
+ self.low_prio_jobs.put(job)
+
+ def schedule_selmgr(self, job):
+ """
+ Schedules an immediate job to be run before the next event is
+ processed. Also notifies the selection manager that it needs
+ to update itself.
+ """
+ def notlambda(this):
+ job(this.selmgr)
+ this.do_reconfigure = True
+ self.schedule_immediate(notlambda)
+
+
+ def heartbeat_event(self, event):
+ while not self.imm_jobs.empty():
+ imm_job = self.imm_jobs.get_nowait()
+ imm_job(self)
+
+ if self.do_reconfigure:
+ self.selmgr.reconfigure(self.sorted_r)
+ self.do_reconfigure = False
+
+ if self.run_all_jobs:
+ self.run_all_jobs = False
+ while not self.low_prio_jobs.empty():
+ imm_job = self.low_prio_jobs.get_nowait()
+ imm_job(self)
+ return
+
+ # If event is stream:NEW*/DETACHED or circ BUILT/FAILED,
+ # don't run low prio jobs.. No need to delay streams or delay bandwidth
+ # counting for them.
+ if isinstance(event, TorCtl.CircuitEvent):
+ if event.status in ("BUILT", "FAILED", "EXTENDED"):
+ return
+ elif isinstance(event, TorCtl.StreamEvent):
+ if event.status in ("NEW", "NEWRESOLVE", "DETACHED", "FAILED", "CLOSED"):
+ return
+
+ # Do the low prio jobs one at a time in case a
+ # higher priority event is queued
+ if not self.low_prio_jobs.empty():
+ delay_job = self.low_prio_jobs.get_nowait()
+ delay_job(self)
+
def read_routers(self, nslist):
routers = self.c.read_routers(nslist)
new_routers = []
for r in routers:
+ self.name_to_key[r.nickname] = "$"+r.idhex
if r.idhex in self.routers:
if self.routers[r.idhex].nickname != r.nickname:
plog("NOTICE", "Router "+r.idhex+" changed names from "
@@ -567,7 +635,7 @@
self.routers[r.idhex].update_to(r)
else:
rc = self.RouterClass(r)
- self.routers[r.idhex] = rc
+ self.routers[rc.idhex] = rc
new_routers.append(rc)
self.sorted_r.extend(new_routers)
self.sorted_r.sort(lambda x, y: cmp(y.bw, x.bw))
@@ -585,6 +653,7 @@
+" pending streams")
unattached_streams.extend(self.circuits[key].pending_streams)
# FIXME: Consider actually closing circ if no streams.
+ # XXX: Circ chosen&failed count before doing this?
del self.circuits[key]
for circ in self.circuits.itervalues():
@@ -605,9 +674,9 @@
while circ == None:
self.selmgr.set_target(stream.host, stream.port)
try:
- circ = Circuit(self.c.build_circuit(
+ circ = self.c.build_circuit(
self.selmgr.pathlen,
- self.selmgr.path_selector))
+ self.selmgr.path_selector)
except TorCtl.ErrorReply, e:
# FIXME: How come some routers are non-existent? Shouldn't
# we have gotten an NS event to notify us they
@@ -621,53 +690,6 @@
self.circuits[circ.cid] = circ
self.last_exit = circ.exit
-
- def schedule_immediate(self, job):
- """
- Schedules an immediate job to be run before the next event is
- processed.
- """
- self.imm_jobs.put(job)
-
- def schedule_low_prio(self, job):
- """
- Schedules a job to be run when a non-time critical event arrives.
- """
- self.low_prio_jobs.put(job)
-
- def schedule_selmgr(self, job):
- """
- Schedules an immediate job to be run before the next event is
- processed. Also notifies the selection manager that it needs
- to update itself.
- """
- def notlambda(this):
- job(this.selmgr)
- this.do_reconfigure = True
- self.schedule_immediate(notlambda)
-
- def heartbeat_event(self, event):
- while not self.imm_jobs.empty():
- imm_job = self.imm_jobs.get_nowait()
- imm_job(self)
-
- if self.do_reconfigure:
- self.selmgr.reconfigure(self.sorted_r)
- self.do_reconfigure = False
-
- # If event is stream:NEW*/DETACHED or circ BUILT/FAILED,
- # don't run low prio jobs.. No need to delay streams on them.
- if isinstance(event, TorCtl.CircuitEvent):
- if event.status in ("BUILT", "FAILED"): return
- elif isinstance(event, TorCtl.StreamEvent):
- if event.status in ("NEW", "NEWRESOLVE", "DETACHED"): return
-
- # Do the low prio jobs one at a time in case a
- # higher priority event is queued
- if not self.low_prio_jobs.empty():
- delay_job = self.low_prio_jobs.get_nowait()
- delay_job(self)
-
def circ_status_event(self, c):
output = [c.event_name, str(c.circ_id), c.status]
if c.path: output.append(",".join(c.path))
@@ -678,7 +700,9 @@
if c.circ_id not in self.circuits:
plog("DEBUG", "Ignoring circ " + str(c.circ_id))
return
- if c.status == "FAILED" or c.status == "CLOSED":
+ if c.status == "EXTENDED":
+ self.circuits[c.circ_id].last_extended_at = time.time()
+ elif c.status == "FAILED" or c.status == "CLOSED":
circ = self.circuits[c.circ_id]
del self.circuits[c.circ_id]
for stream in circ.pending_streams:
@@ -720,7 +744,6 @@
plog("WARN", "Stream "+str(s.strm_id)+" detached from no circuit!")
else:
self.streams[s.strm_id].detached_from.append(s.circ_id)
-
if self.streams[s.strm_id] in self.streams[s.strm_id].pending_circ.pending_streams:
self.streams[s.strm_id].pending_circ.pending_streams.remove(self.streams[s.strm_id])
@@ -734,6 +757,7 @@
self.streams[s.strm_id].circ = self.streams[s.strm_id].pending_circ
self.streams[s.strm_id].circ.pending_streams.remove(self.streams[s.strm_id])
self.streams[s.strm_id].pending_circ = None
+ self.streams[s.strm_id].attached_at = time.time()
elif s.status == "FAILED" or s.status == "CLOSED":
# FIXME stats
if s.strm_id not in self.streams:
@@ -748,7 +772,8 @@
# (FIXME: be careful about double stats)
if s.status == "FAILED":
# Avoid busted circuits that will not resolve or carry
- # traffic. FIXME: Failed count before doing this?
+ # traffic.
+ # XXX: Circ chosen&failed count before doing this?
if s.circ_id in self.circuits: del self.circuits[s.circ_id]
else: plog("WARN","Failed stream on unknown circ "+str(s.circ_id))
return
@@ -767,7 +792,16 @@
self.streams[s.strm_id].host = s.target_host
self.streams[s.strm_id].port = s.target_port
-
+ def stream_bw_event(self, s):
+ output = [s.event_name, str(s.strm_id), str(s.bytes_read),
+ str(s.bytes_written)]
+ plog("DEBUG", " ".join(output))
+ if not s.strm_id in self.streams:
+ plog("WARN", "BW event for unknown stream id: "+str(s.strm_id))
+ else:
+ self.streams[s.strm_id].bytes_read += s.bytes_read
+ self.streams[s.strm_id].bytes_written += s.bytes_written
+
def ns_event(self, n):
self.read_routers(n.nslist)
plog("DEBUG", "Read " + str(len(n.nslist))+" NS => "
@@ -781,6 +815,8 @@
+ str(len(self.sorted_r)) + " routers")
self.selmgr.update_routers(self.sorted_r)
+ def bandwidth_event(self, b): pass # For heartbeat only..
+
########################## Unit tests ##########################
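
The scheduler hooks that moved to the top of PathBuilder above
(schedule_immediate, schedule_low_prio, schedule_selmgr) plus the new
run_all_jobs flag are how external threads poke the event loop: immediate jobs
run before the next event, low-priority jobs wait for a non-time-critical
event, and setting run_all_jobs makes the next heartbeat drain the whole
low-priority queue (this is what the new COMMIT command in metatroller.py
uses). A usage sketch mirroring the RESETSTATS/COMMIT handlers below
('handler' is a stand-in for the live PathBuilder/StatsHandler instance):

    def reset_job(builder):
        # Deferred: runs when the next non-time-critical event arrives.
        builder.reset_stats()
    handler.schedule_low_prio(reset_job)

    def flush_job(builder):
        # Runs before the next event; the flag makes that heartbeat drain
        # the entire low-priority queue in one go.
        builder.run_all_jobs = True
    handler.schedule_immediate(flush_job)
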
Modified: torflow/trunk/TorCtl/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl/TorCtl.py 2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/TorCtl/TorCtl.py 2007-03-15 18:58:51 UTC (rev 9831)
@@ -28,6 +28,7 @@
CIRC="CIRC",
STREAM="STREAM",
ORCONN="ORCONN",
+ STREAM_BW="STREAM_BW",
BW="BW",
NS="NS",
NEWDESC="NEWDESC",
@@ -111,18 +112,24 @@
self.reason = reason
self.ncircs = ncircs
+class StreamBwEvent:
+ def __init__(self, event_name, strm_id, read, written):
+ self.event_name = event_name
+ self.strm_id = int(strm_id)
+ self.bytes_read = int(read)
+ self.bytes_written = int(written)
+
class LogEvent:
def __init__(self, level, msg):
self.event_name = self.level = level
self.msg = msg
class AddrMapEvent:
- def __init__(self, event_name, from_addr, to_addr, when, by_exit):
+ def __init__(self, event_name, from_addr, to_addr, when):
self.event_name = event_name
self.from_addr = from_addr
self.to_addr = to_addr
self.when = when
- self.by_exit = by_exit # XOXOXOX <3 ;) @ nickm
class BWEvent:
def __init__(self, event_name, read, written):
@@ -214,16 +221,6 @@
plog("NOTICE", "No matching exit line for "+self.nickname)
return False
-class Circuit:
- def __init__(self):
- self.cid = 0
- self.created_at = 0 # time
- self.path = [] # routers
- self.exit = 0
-
- def id_path(self): return map(lambda r: r.idhex, self.path)
-
-
class Connection:
"""A Connection represents a connection to the Tor process."""
def __init__(self, sock):
@@ -289,7 +286,7 @@
self._err(sys.exc_info())
return
- if isEvent:
+ if isEvent: # XXX: timestamp these, and pass timestamp to EventHandler
if self._handler is not None:
self._eventQueue.put(reply)
else:
@@ -691,6 +688,7 @@
"CIRC" : self.circ_status_event,
"STREAM" : self.stream_status_event,
"ORCONN" : self.or_conn_status_event,
+ "STREAM_BW" : self.stream_bw_event,
"BW" : self.bandwidth_event,
"DEBUG" : self.msg_event,
"INFO" : self.msg_event,
@@ -763,6 +761,11 @@
else: wrote = 0
event = ORConnEvent(evtype, status, target, age, read, wrote,
reason, ncircs)
+ elif evtype == "STREAM_BW":
+ m = re.match(r"(\d+)\s+(\d+)\s+(\d+)", body)
+ if not m:
+ raise ProtocolError("STREAM_BW event misformatted.")
+ event = StreamBwEvent(evtype, *m.groups())
elif evtype == "BW":
m = re.match(r"(\d+)\s+(\d+)", body)
if not m:
@@ -783,7 +786,7 @@
else:
when = time.localtime(
time.strptime(when[1:-1], "%Y-%m-%d %H:%M:%S"))
- event = AddrMapEvent(evtype, fromaddr, toaddr, when, "Unknown")
+ event = AddrMapEvent(evtype, fromaddr, toaddr, when)
elif evtype == "NS":
event = NetworkStatusEvent(evtype, parse_ns_body(data))
else:
@@ -805,29 +808,24 @@
def circ_status_event(self, event):
"""Called when a circuit status changes if listening to CIRCSTATUS
- events. 'status' is a member of CIRC_STATUS; circID is a numeric
- circuit ID, and 'path' is the circuit's path so far as a list of
- names.
- """
+ events."""
raise NotImplemented()
def stream_status_event(self, event):
"""Called when a stream status changes if listening to STREAMSTATUS
- events. 'status' is a member of STREAM_STATUS; streamID is a
- numeric stream ID, and 'target' is the destination of the stream.
- """
+ events. """
raise NotImplemented()
+ def stream_bw_event(self, event):
+ raise NotImplemented()
+
def or_conn_status_event(self, event):
"""Called when an OR connection's status changes if listening to
- ORCONNSTATUS events. 'status' is a member of OR_CONN_STATUS; target
- is the OR in question.
- """
+ ORCONNSTATUS events."""
raise NotImplemented()
def bandwidth_event(self, event):
- """Called once a second if listening to BANDWIDTH events. 'read' is
- the number of bytes read; 'written' is the number of bytes written.
+ """Called once a second if listening to BANDWIDTH events.
"""
raise NotImplemented()
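
To actually receive the new STREAM_BW events, a controller subclasses
EventHandler (or PathBuilder, as metatroller.py does) and lists the event type
in set_events(). A minimal sketch, assuming an already-authenticated
TorCtl.Connection named conn; the BwLogger class is purely illustrative:

    from TorCtl import TorCtl

    class BwLogger(TorCtl.EventHandler):
        def heartbeat_event(self, event):
            pass  # hook PathBuilder uses for its job queues; no-op here

        def stream_bw_event(self, event):
            # Fields come from the StreamBwEvent class added above.
            print "stream", event.strm_id, \
                "read", event.bytes_read, "written", event.bytes_written

    conn.set_event_handler(BwLogger())
    conn.set_events([TorCtl.EVENT_TYPE.STREAM_BW], True)
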
Modified: torflow/trunk/TorCtl/TorUtil.py
===================================================================
--- torflow/trunk/TorCtl/TorUtil.py 2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/TorCtl/TorUtil.py 2007-03-15 18:58:51 UTC (rev 9831)
@@ -13,10 +13,11 @@
import socket
import binascii
import sha
+import math
__all__ = ["Enum", "Enum2", "quote", "escape_dots", "unescape_dots",
"BufSock", "secret_to_key", "urandom_rng", "s2k_gen", "s2k_check",
- "plog", "ListenSocket"]
+ "plog", "ListenSocket", "zprob"]
class Enum:
# Helper: define an ordered dense name-to-number 1-1 mapping.
@@ -191,3 +192,46 @@
print level + ": " + msg
sys.stdout.flush()
+# Stolen from
+# http://www.nmr.mgh.harvard.edu/Neural_Systems_Group/gary/python/stats.py
+def zprob(z):
+ """
+Returns the area under the normal curve 'to the left of' the given z value.
+Thus,
+ for z<0, zprob(z) = 1-tail probability
+ for z>0, 1.0-zprob(z) = 1-tail probability
+ for any z, 2.0*(1.0-zprob(abs(z))) = 2-tail probability
+Adapted from z.c in Gary Perlman's |Stat.
+
+Usage: lzprob(z)
+"""
+ Z_MAX = 6.0 # maximum meaningful z-value
+ if z == 0.0:
+ x = 0.0
+ else:
+ y = 0.5 * math.fabs(z)
+ if y >= (Z_MAX*0.5):
+ x = 1.0
+ elif (y < 1.0):
+ w = y*y
+ x = ((((((((0.000124818987 * w
+ -0.001075204047) * w +0.005198775019) * w
+ -0.019198292004) * w +0.059054035642) * w
+ -0.151968751364) * w +0.319152932694) * w
+ -0.531923007300) * w +0.797884560593) * y * 2.0
+ else:
+ y = y - 2.0
+ x = (((((((((((((-0.000045255659 * y
+ +0.000152529290) * y -0.000019538132) * y
+ -0.000676904986) * y +0.001390604284) * y
+ -0.000794620820) * y -0.002034254874) * y
+ +0.006549791214) * y -0.010557625006) * y
+ +0.011630447319) * y -0.009279453341) * y
+ +0.005353579108) * y -0.002141268741) * y
+ +0.000535310849) * y +0.999936657524
+ if z > 0.0:
+ prob = ((x+1.0)*0.5)
+ else:
+ prob = ((1.0-x)*0.5)
+ return prob
+
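
Since zprob() returns the area under the normal curve to the left of z, the
callers in metatroller.py pass a negative z-score to get the one-tailed
probability of a deviation at least that large. A quick sanity check against
the usual normal-table values (accurate to the precision of the polynomial
fit):

    from TorCtl.TorUtil import zprob

    z = 1.96
    print round(zprob(-z), 3)               # one-tailed, ~0.025
    print round(2.0 * (1.0 - zprob(z)), 3)  # two-tailed, ~0.05
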
Modified: torflow/trunk/metatroller.py
===================================================================
--- torflow/trunk/metatroller.py 2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/metatroller.py 2007-03-15 18:58:51 UTC (rev 9831)
@@ -11,11 +11,11 @@
import traceback
import re
import random
-import datetime
import threading
import struct
import copy
import time
+import math
from TorCtl import *
from TorCtl.TorUtil import *
from TorCtl.PathSupport import *
@@ -31,7 +31,7 @@
max_detach = 3
# Do NOT modify this object directly after it is handed to PathBuilder
-# Use PathBuilder.schedule_reconfigure instead.
+# Use PathBuilder.schedule_selmgr instead.
# (Modifying the arguments here is OK)
__selmgr = PathSupport.SelectionManager(
pathlen=3,
@@ -44,7 +44,53 @@
use_exit=None,
use_guards=False)
+class BandwidthStats:
+ def __init__(self):
+ self.byte_list = []
+ self.duration_list = []
+ self.min_bw = 1e10
+ self.max_bw = 0
+ self.mean = 0
+ self.dev = 0
+ def _exp(self): # Weighted avg
+ tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+ EX = 0.0
+ for i in xrange(len(self.byte_list)):
+ EX += (self.byte_list[i]*self.byte_list[i])/self.duration_list[i]
+ if tot_bw == 0.0: return 0.0
+ EX /= tot_bw
+ return EX
+
+ def _exp2(self): # E[X^2]
+ tot_bw = reduce(lambda x, y: x+y, self.byte_list, 0.0)
+ EX = 0.0
+ for i in xrange(len(self.byte_list)):
+ EX += (self.byte_list[i]**3)/(self.duration_list[i]**2)
+ if tot_bw == 0.0: return 0.0
+ EX /= tot_bw
+ return EX
+
+ def _dev(self): # Weighted dev
+ EX = self.mean
+ EX2 = self._exp2()
+ arg = EX2 - (EX*EX)
+ if arg < -0.05:
+ plog("WARN", "Diff of "+str(EX2)+" and "+str(EX)+"^2 is "+str(arg))
+ return math.sqrt(abs(arg))
+
+ def add_bw(self, bytes, duration):
+ if not bytes: plog("WARN", "No bytes for bandwidth")
+ bytes /= 1024.
+ self.byte_list.append(bytes)
+ self.duration_list.append(duration)
+ bw = bytes/duration
+ plog("DEBUG", "Got bandwidth "+str(bw))
+ if self.min_bw > bw: self.min_bw = bw
+ if self.max_bw < bw: self.max_bw = bw
+ self.mean = self._exp()
+ self.dev = self._dev()
+
# Technically we could just add member vars as we need them, but this
# is a bit more clear
class StatsRouter(TorCtl.Router):
@@ -62,7 +108,7 @@
self.strm_succeeded = 0
self.strm_suspected = 0 # disjoint from failed (for verification only)
self.strm_uncounted = 0
- self.strm_chosen = 0 # above 3 should add to this
+ self.strm_chosen = 0 # above 4 should add to this
self.reason_suspected = {}
self.reason_failed = {}
self.first_seen = time.time()
@@ -74,10 +120,24 @@
self.hibernated_at = self.first_seen
self.total_hibernation_time = 0
self.total_active_uptime = 0
- self.max_bw = 0
- self.min_bw = 0
- self.avg_bw = 0
+ self.total_extend_time = 0
+ self.total_extended = 0
+ self.bwstats = BandwidthStats()
+ self.z_ratio = 0
+ self.prob_zr = 0
+ self.z_bw = 0
+ self.prob_zb = 0
+ def avg_extend_time(self):
+ if self.total_extended:
+ return self.total_extend_time/self.total_extended
+ else: return 0
+
+ def bw_ratio(self):
+ bw = self.bwstats.mean
+ if bw == 0.0: return 0
+ else: return self.bw/(1024.*bw)
+
def current_uptime(self):
if self.became_active_at:
ret = (self.total_active_uptime+(time.time()-self.became_active_at))
@@ -105,25 +165,41 @@
def _succeeded_per_hour(self):
return (3600.*(self.circ_succeeded+self.strm_succeeded))/self.current_uptime()
-
+
+ key = """Metatroller Statistics:
+ CC=Circuits Chosen CF=Circuits Failed CS=Circuit Suspected
+ SC=Streams Chosen SF=Streams Failed SS=Streams Suspected
+ FH=Failed per Hour SH=Suspected per Hour ET=avg circuit Extend Time (s)
+ EB=mean BW (K) BD=BW std Dev (K) BR=Ratio of observed to avg BW
+ ZB=BW z-test value PB=Probability(z-bw) ZR=Ratio z-test value
+ PR=Prob(z-ratio) U=Uptime (h)\n"""
+
def __str__(self):
- return (self.idhex+" ("+self.nickname+")\n\t"
+ return (self.idhex+" ("+self.nickname+")\n"
+ +" CC="+str(self.circ_chosen)
+" CF="+str(self.circ_failed)
+" CS="+str(self.circ_suspected+self.circ_failed)
- +" CC="+str(self.circ_chosen)
+ +" SC="+str(self.strm_chosen)
+" SF="+str(self.strm_failed)
+" SS="+str(self.strm_suspected+self.strm_failed)
- +" SC="+str(self.strm_chosen)
- +" FH="+str(round(self.failed_per_hour(),2))
- +" SH="+str(round(self.suspected_per_hour(),2))
- +" Up="+str(round(self.current_uptime()/3600, 1))+"h\n")
+ +" FH="+str(round(self.failed_per_hour(),1))
+ +" SH="+str(round(self.suspected_per_hour(),1))+"\n"
+ +" ET="+str(round(self.avg_extend_time(),1))
+ +" EB="+str(round(self.bwstats.mean,1))
+ +" BD="+str(round(self.bwstats.dev,1))
+ +" ZB="+str(round(self.z_bw,1))
+ +" PB="+(str(round(self.prob_zb,3))[1:])
+ +" BR="+str(round(self.bw_ratio(),1))
+ +" ZR="+str(round(self.z_ratio,1))
+ +" PR="+(str(round(self.prob_zr,3))[1:])
+ +" U="+str(round(self.current_uptime()/3600, 1))+"\n")
def sanity_check(self):
- if self.circ_failed + self.circ_succeeded + self.circ_suspected \
- + self.circ_uncounted != self.circ_chosen:
+ if (self.circ_failed + self.circ_succeeded + self.circ_suspected
+ + self.circ_uncounted != self.circ_chosen):
plog("ERROR", self.nickname+" does not add up for circs")
- if self.strm_failed + self.strm_succeeded + self.strm_suspected \
- + self.strm_uncounted != self.strm_chosen:
+ if (self.strm_failed + self.strm_succeeded + self.strm_suspected
+ + self.strm_uncounted != self.strm_chosen):
plog("ERROR", self.nickname+" does not add up for streams")
def check_reasons(reasons, expected, which, rtype):
count = 0
@@ -147,7 +223,8 @@
self._suspected_per_hour()+self._succeeded_per_hour(), 2)
chosen_tot = round(self._chosen_per_hour(), 2)
if per_hour_tot != chosen_tot:
- plog("ERROR", self.nickname+" has mismatch of per hour counts: "+str(per_hour_tot) +" vs "+str(chosen_tot))
+ plog("ERROR", self.nickname+" has mismatch of per hour counts: "
+ +str(per_hour_tot) +" vs "+str(chosen_tot))
class ReasonRouterList:
"Helper class to track which reasons are in which routers."
@@ -160,14 +237,15 @@
def write_list(self, f):
rlist = self.sort_list()
for r in rlist:
+ susp = 0
f.write(r.idhex+" ("+r.nickname+") Fail=")
if self.reason in r.reason_failed:
- f.write(str(r.reason_failed[self.reason]))
- else: f.write("0")
+ susp = r.reason_failed[self.reason]
+ f.write(str(susp))
f.write(" Susp=")
if self.reason in r.reason_suspected:
- f.write(str(r.reason_suspected[self.reason])+"\n")
- else: f.write("0\n")
+ susp += r.reason_suspected[self.reason]
+ f.write(str(susp)+"\n")
def add_r(self, r):
self.rlist[r] = 1
@@ -221,23 +299,53 @@
return reduce(lambda x, y: x + y.reason_failed[self.reason],
self.rlist.iterkeys(), 0)
+
class StatsHandler(PathSupport.PathBuilder):
def __init__(self, c, slmgr):
PathBuilder.__init__(self, c, slmgr, StatsRouter)
self.failed_reasons = {}
self.suspect_reasons = {}
+ def run_zbtest(self): # Unweighted z-test
+ n = reduce(lambda x, y: x+(y.bwstats.mean > 0), self.sorted_r, 0)
+ if n == 0: return
+ avg = reduce(lambda x, y: x+y.bwstats.mean, self.sorted_r, 0)/float(n)
+ def notlambda(x, y):
+ if y.bwstats.mean <= 0: return x+0
+ else: return x+(y.bwstats.mean-avg)*(y.bwstats.mean-avg)
+ stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+ for r in self.sorted_r:
+ if r.bwstats.mean > 0:
+ r.z_bw = abs((r.bwstats.mean-avg)/stddev)
+ r.prob_zb = TorUtil.zprob(-r.z_bw)
+ return (avg, stddev)
+
+ def run_zrtest(self): # Unweighted z-test
+ n = reduce(lambda x, y: x+(y.bw_ratio() > 0), self.sorted_r, 0)
+ if n == 0: return
+ avg = reduce(lambda x, y: x+y.bw_ratio(), self.sorted_r, 0)/float(n)
+ def notlambda(x, y):
+ if y.bw_ratio() <= 0: return x+0
+ else: return x+(y.bw_ratio()-avg)*(y.bw_ratio()-avg)
+ stddev = math.sqrt(reduce(notlambda, self.sorted_r, 0)/float(n))
+ for r in self.sorted_r:
+ if r.bw_ratio() > 0:
+ r.z_ratio = abs((r.bw_ratio()-avg)/stddev)
+ r.prob_zr = TorUtil.zprob(-r.z_ratio)
+ return (avg, stddev)
+
def write_reasons(self, f, reasons, name):
- f.write("\n\n\t------------------- "+name+" -------------------\n")
+ f.write("\n\n\t----------------- "+name+" -----------------\n")
for rsn in reasons:
f.write("\nReason="+rsn.reason+". Failed: "+str(rsn.total_failed())
+", Suspected: "+str(rsn.total_suspected())+"\n")
rsn.write_list(f)
def write_routers(self, f, rlist, name):
- f.write("\n\n\t------------------- "+name+" -------------------\n\n")
+ f.write("\n\n\t----------------- "+name+" -----------------\n\n")
for r in rlist:
- f.write(str(r))
+ # only print it if we've used it.
+ if r.circ_chosen+r.strm_chosen > 0: f.write(str(r))
def write_stats(self, filename):
plog("DEBUG", "Writing stats")
@@ -259,7 +367,18 @@
for rsn in self.suspect_reasons.itervalues(): rsn._verify_suspected()
f = file(filename, "w")
+ f.write(StatsRouter.key)
+ (avg, dev) = self.run_zbtest()
+ f.write("\n\nBW stats: u="+str(round(avg,1))+" s="+str(round(dev,1))+"\n")
+ (avg, dev) = self.run_zrtest()
+ f.write("BW ratio stats: u="+str(round(avg,1)) +" s="+str(round(dev,1)))
+
+ # sort+print by bandwidth
+ bw_rate = copy.copy(self.sorted_r)
+ bw_rate.sort(lambda x, y: cmp(y.bw_ratio(), x.bw_ratio()))
+ self.write_routers(f, bw_rate, "Bandwidth Ratios")
+
# FIXME: Print out key/legend header
failed = copy.copy(self.sorted_r)
failed.sort(lambda x, y:
@@ -274,18 +393,18 @@
self.write_routers(f, suspected, "Suspected Counts")
fail_rate = copy.copy(failed)
- fail_rate.sort(lambda x, y:
- cmp(y.failed_per_hour(), x.failed_per_hour()))
+ fail_rate.sort(lambda x, y: cmp(y.failed_per_hour(), x.failed_per_hour()))
self.write_routers(f, fail_rate, "Fail Rates")
suspect_rate = copy.copy(suspected)
suspect_rate.sort(lambda x, y:
cmp(y.suspected_per_hour(), x.suspected_per_hour()))
self.write_routers(f, suspect_rate, "Suspect Rates")
-
+
# TODO: Sort by failed/selected and suspect/selected ratios
# if we ever want to do non-uniform scanning..
+ # XXX: Add failed in here somehow..
susp_reasons = self.suspect_reasons.values()
susp_reasons.sort(lambda x, y:
cmp(y.total_suspected(), x.total_suspected()))
@@ -297,9 +416,13 @@
self.write_reasons(f, fail_reasons, "Failed Reasons")
f.close()
+ # FIXME: sort+print by circ extend time
+
def reset_stats(self):
- for r in self.sorted_r:
- r.reset()
+ plog("DEBUG", "Resetting stats")
+ self.suspect_reasons.clear()
+ self.failed_reasons.clear()
+ for r in self.sorted_r: r.reset()
# TODO: Use stream bandwidth events to implement reputation system
# from
@@ -316,7 +439,13 @@
if c.remote_reason: rreason = c.remote_reason
else: rreason = "NONE"
reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
- if c.status == "FAILED":
+ if c.status == "EXTENDED":
+ delta = time.time() - self.circuits[c.circ_id].last_extended_at
+ r_ext = c.path[-1]
+ if r_ext[0] != '$': r_ext = self.name_to_key[r_ext]
+ self.routers[r_ext[1:]].total_extend_time += delta
+ self.routers[r_ext[1:]].total_extended += 1
+ elif c.status == "FAILED":
# update selection count
for r in self.circuits[c.circ_id].path: r.circ_chosen += 1
@@ -366,6 +495,7 @@
PathBuilder.circ_status_event(self, c)
def stream_status_event(self, s):
+ # XXX: Verify circ id matches stream.circ
if s.strm_id in self.streams:
# TODO: Hrmm, consider making this sane in TorCtl.
if s.reason: lreason = s.reason
@@ -373,8 +503,8 @@
if s.remote_reason: rreason = s.remote_reason
else: rreason = "NONE"
reason = s.event_name+":"+s.status+":"+lreason+":"+rreason+":"+self.streams[s.strm_id].kind
- if s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED") \
- and not s.circ_id:
+ if (s.status in ("DETACHED", "FAILED", "CLOSED", "SUCCEEDED")
+ and not s.circ_id):
# XXX: REMAPs can do this (normal). Also REASON=DESTROY (bug?)
# Also timeouts.. Those should use the pending circ instead
# of returning..
@@ -385,6 +515,17 @@
# Update strm_chosen count
# FIXME: use SENTRESOLVE/SENTCONNECT instead?
for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+
+ # Update bw stats
+ if self.streams[s.strm_id].attached_at:
+ if s.status == "DETACHED":
+ plog("WARN", str(s.strm_id)+" detached after succeeded")
+ lifespan = self.streams[s.strm_id].lifespan()
+ for r in self.streams[s.strm_id].circ.path:
+ r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
+ self.streams[s.strm_id].bytes_read,
+ lifespan)
+
# Update failed count,reason_failed for exit
r = self.circuits[s.circ_id].exit
if not reason in r.reason_failed: r.reason_failed[reason] = 1
@@ -411,9 +552,19 @@
# Always get both a closed and a failed..
# - Check if the circuit exists still
# XXX: Save both closed and failed reason in stream object
+ # and rely on a flag instead of this
if s.circ_id in self.circuits:
# Update strm_chosen count
for r in self.circuits[s.circ_id].path: r.strm_chosen += 1
+
+ # Update bw stats
+ if self.streams[s.strm_id].attached_at:
+ lifespan = self.streams[s.strm_id].lifespan()
+ for r in self.streams[s.strm_id].circ.path:
+ r.bwstats.add_bw(self.streams[s.strm_id].bytes_written+
+ self.streams[s.strm_id].bytes_read,
+ lifespan)
+
if lreason in ("TIMEOUT", "INTERNAL", "TORPROTOCOL", "DESTROY"):
for r in self.circuits[s.circ_id].path[:-1]:
r.strm_suspected += 1
@@ -428,7 +579,7 @@
r.strm_uncounted += 1
r = self.circuits[s.circ_id].exit
- if lreason == "DONE":
+ if lreason == "DONE" or (lreason == "END" and rreason == "DONE"):
r.strm_succeeded += 1
else:
if not reason in r.reason_failed:
@@ -491,8 +642,6 @@
s.write("250 NEWNYM OK\r\n")
elif command == "GETDNSEXIT":
pass # TODO: Takes a hostname? Or prints most recent?
- elif command == "RESETSTATS":
- s.write("250 OK\r\n")
elif command == "ORDEREXITS":
try:
if arg:
@@ -586,9 +735,15 @@
h.schedule_low_prio(notlambda)
s.write("250 OK\r\n")
elif command == "RESETSTATS":
+ plog("DEBUG", "Got resetstats")
def notlambda(this): this.reset_stats()
h.schedule_low_prio(notlambda)
s.write("250 OK\r\n")
+ elif command == "COMMIT":
+ plog("DEBUG", "Got commit")
+ def notlambda(this): this.run_all_jobs = True
+ h.schedule_immediate(notlambda)
+ s.write("250 OK\r\n")
elif command == "HELP":
s.write("250 OK\r\n")
else:
@@ -619,8 +774,10 @@
h = StatsHandler(c, __selmgr)
c.set_event_handler(h)
c.set_events([TorCtl.EVENT_TYPE.STREAM,
+ TorCtl.EVENT_TYPE.BW,
TorCtl.EVENT_TYPE.NS,
TorCtl.EVENT_TYPE.CIRC,
+ TorCtl.EVENT_TYPE.STREAM_BW,
TorCtl.EVENT_TYPE.NEWDESC], True)
c.set_option("__LeaveStreamsUnattached", "1")
return (c,h)
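
One note on BandwidthStats above: the mean it keeps is weighted by the number
of bytes each stream carried, i.e. E[X] = sum(bytes_i * bw_i) / sum(bytes_i)
with bw_i = bytes_i/duration_i, and the deviation comes from the matching
weighted E[X^2]. The same arithmetic as a self-contained sketch (the two
samples are invented, in KBytes and seconds to match add_bw()):

    import math

    samples = [(512.0, 4.0), (2048.0, 8.0)]  # (kbytes, seconds) -- made-up data
    tot = sum(b for b, d in samples)
    mean = sum(b * (b / d) for b, d in samples) / tot      # bytes-weighted E[X]
    ex2 = sum(b * (b / d) ** 2 for b, d in samples) / tot  # bytes-weighted E[X^2]
    dev = math.sqrt(max(ex2 - mean * mean, 0.0))
    print "weighted mean %.1f KB/s, dev %.1f KB/s" % (mean, dev)
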
Modified: torflow/trunk/soat.pl
===================================================================
--- torflow/trunk/soat.pl 2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/soat.pl 2007-03-15 18:58:51 UTC (rev 9831)
@@ -60,32 +60,6 @@
my $LOG_LEVEL = "DEBUG";
my %log_levels = ("DEBUG", 0, "INFO", 1, "NOTICE", 2, "WARN", 3, "ERROR", 4);
-
-my %mt_circ_sel_counts;
-my %mt_strm_sel_counts;
-my %mt_reason_counts;
-my %mt_fail_counts;
-my %mt_fail_totals; # actually 5 vars in metatroller
-
-my %mt_suspect_circ_sel_counts;
-my %mt_suspect_strm_sel_counts;
-my %mt_suspect_reason_counts;
-my %mt_suspect_counts;
-
-my %total_circ_sel_counts;
-my %total_strm_sel_counts;
-my %total_reason_counts;
-my %total_fail_counts;
-my %total_fail_totals; # actually 5 vars in metatroller
-
-my %total_suspect_circ_sel_counts;
-my %total_suspect_strm_sel_counts;
-my %total_suspect_counts;
-my %total_suspect_reason_counts;
-
-
-my %key_names;
-
sub plog
{
my $level = shift;
@@ -681,321 +655,6 @@
plog "INFO", "Checked all ssh hosts\n";
}
-sub parse_failrates
-{
- my $file = shift;
- my $hash_fail = shift;
- my $hash_circ = shift;
- my $hash_strm = shift;
- my $hash_totals = shift;
-
- while(1) {
- my $line = <$file>;
-
- if($line =~ /^250 FAILCOUNT/) {
- $line =~ /FAILCOUNT ([\S]+) \(([\S]+)\) = ([\d]+)\//;
- my $key = $1;
- my $name = $2;
- my $fail = $3;
- # Hack because for reason failure lines there is an extra element
- # (middle=total failures)
- $line =~ /\/([\d]+)\+([\d]+)/;
- my $circ_sel = $1;
- my $strm_sel = $2;
-
- if($key_names{$key} and (not $key_names{$key} eq $name)) {
- plog "NOTICE", "$name for $key is not the same as $key_names{$key}\n";
- }
- $key_names{$key} = $name;
- if($$hash_fail{$key}) {
- $$hash_fail{$key} += $fail;
- } else {
- $$hash_fail{$key} = $fail;
- }
-
- if($hash_circ) {
- if($$hash_circ{$key}) {
- $$hash_circ{$key} += $circ_sel;
- } else {
- $$hash_circ{$key} = $circ_sel;
- }
- }
- if($hash_strm) {
- if($$hash_strm{$key}) {
- $$hash_strm{$key} += $strm_sel;
- } else {
- $$hash_strm{$key} = $strm_sel;
- }
- }
-
- } else {
- if($hash_totals) {
- if($line =~ /^250 FAILTOTALS ([\d]+)\/([\d]+) ([\d]+)\+([\d]+)\/([\d]+)/) {
- $$hash_totals{"CIRCUITFAILED"} = $1;
- $$hash_totals{"CIRCUITCOUNTS"} = $2;
- $$hash_totals{"STREAMDETACHED"} = $3;
- $$hash_totals{"STREAMFAILURES"} = $4;
- $$hash_totals{"STREAMCOUNTS"} = $5;
- plog "DEBUG", "Got fail totals\n";
- }
- }
- last;
- }
- }
-}
-
-sub read_failrates
-{
- my $mcp = shift;
- my $file = shift;
- my $cmd = shift;
- my $mt_fcnt = shift;
- my $tot_fcnt = shift;
- my $mt_circ_sel_cnt = shift;
- my $tot_circ_sel_cnt = shift;
- my $mt_strm_sel_cnt = shift;
- my $tot_strm_sel_cnt = shift;
- my $mt_totals = shift;
- my $tot_totals = shift;
-
- return;
- # Hack to avoid counting previous run
- print $mcp "$cmd\r\n";
- parse_failrates($mcp, $mt_fcnt, $mt_circ_sel_cnt, $mt_strm_sel_cnt, $mt_totals);
-
- if(not open(FAILFILE, '<', $file)) {
- return;
- }
-
- parse_failrates(*FAILFILE, $tot_fcnt, $tot_circ_sel_cnt, $tot_strm_sel_cnt, $tot_totals);
- close(FAILFILE);
-}
-
-sub write_failrates
-{
- my $mcp = shift;
- my $file = shift;
- my $cmd = shift;
- my $mt_fcnt = shift;
- my $tot_fcnt = shift;
- my $mt_circ_sel_cnt = shift;
- my $tot_circ_sel_cnt = shift;
- my $mt_strm_sel_cnt = shift;
- my $tot_strm_sel_cnt = shift;
- my $mt_totals = shift;
- my $tot_totals = shift;
- my %fail_counts;
- my %circ_counts;
- my %strm_counts;
- my %rate_totals;
- return;
-
- print $mcp "$cmd\r\n";
- parse_failrates($mcp, \%fail_counts, \%circ_counts, \%strm_counts, \%rate_totals);
-
- # Get delta
- foreach(keys(%fail_counts)) {
- if(not $$tot_fcnt{$_}) {
- $$tot_circ_sel_cnt{$_} = 0;
- $$tot_strm_sel_cnt{$_} = 0;
- $$tot_fcnt{$_} = 0;
- }
-
- if(not $$mt_fcnt{$_}) {
- $$mt_fcnt{$_} = 0;
- $$mt_circ_sel_cnt{$_} = 0;
- $$mt_strm_sel_cnt{$_} = 0;
- }
-
- # Update our totals only if the change is positive (ie no restart)
- if(($fail_counts{$_} - $$mt_fcnt{$_}) > 0) {
- $$tot_fcnt{$_} += ($fail_counts{$_} - $$mt_fcnt{$_});
- $$tot_circ_sel_cnt{$_} += ($circ_counts{$_} - $$mt_circ_sel_cnt{$_});
- $$tot_strm_sel_cnt{$_} += ($strm_counts{$_} - $$mt_strm_sel_cnt{$_});
- }
-
- # Store MT totals
- $$mt_fcnt{$_} = $fail_counts{$_};
- $$mt_circ_sel_cnt{$_} = $circ_counts{$_};
- $$mt_strm_sel_cnt{$_} = $strm_counts{$_};
- }
-
- if($tot_totals) {
-
- if(($rate_totals{"STREAMCOUNTS"} - $$mt_totals{"STREAMCOUNTS"}) > 0) {
-
- $$tot_totals{"CIRCUITFAILED"} +=
- $rate_totals{"CIRCUITFAILED"} - $$mt_totals{"CIRCUITFAILED"};
- $$tot_totals{"CIRCUITCOUNTS"} +=
- $rate_totals{"CIRCUITCOUNTS"} - $$mt_totals{"CIRCUITCOUNTS"};
- $$tot_totals{"STREAMDETACHED"} +=
- $rate_totals{"STREAMDETACHED"} - $$mt_totals{"STREAMDETACHED"};
- $$tot_totals{"STREAMFAILURES"} +=
- $rate_totals{"STREAMFAILURES"} - $$mt_totals{"STREAMFAILURES"};
- $$tot_totals{"STREAMCOUNTS"} +=
- $rate_totals{"STREAMCOUNTS"} - $$mt_totals{"STREAMCOUNTS"};
-
- }
- $$mt_totals{"CIRCUITFAILED"} = $rate_totals{"CIRCUITFAILED"};
- $$mt_totals{"CIRCUITCOUNTS"} = $rate_totals{"CIRCUITCOUNTS"};
- $$mt_totals{"STREAMDETACHED"} = $rate_totals{"STREAMDETACHED"};
- $$mt_totals{"STREAMFAILURES"} = $rate_totals{"STREAMFAILURES"};
- $$mt_totals{"STREAMCOUNTS"} = $rate_totals{"STREAMCOUNTS"};
-
- }
-
- # Use global, not arg (which may be undef)
- my @sorted_r = sort {
- $$tot_fcnt{$b} <=> $$tot_fcnt{$a}
- } keys %$tot_fcnt;
-
- if(not open(FAILFILE, '>', $file)) {
- die "Can't open $file to save fail rate table";
- }
-
- foreach(@sorted_r) {
- print FAILFILE "250 FAILCOUNT $_ ($key_names{$_}) = $$tot_fcnt{$_}/$$tot_circ_sel_cnt{$_}+$$tot_strm_sel_cnt{$_}\n";
- }
-
- if($tot_totals) {
- print FAILFILE "250 FAILTOTALS " . $$tot_totals{"CIRCUITFAILED"} ."/".
- $$tot_totals{"CIRCUITCOUNTS"} ." ". $$tot_totals{"STREAMDETACHED"} ."+".
- $$tot_totals{"STREAMFAILURES"} ."/". $$tot_totals{"STREAMCOUNTS"} ." OK\n";
- } else {
- print FAILFILE "250 OK\n";
- }
-
- close(FAILFILE);
- plog "INFO", "Updated failure counts\n";
-}
-
-
-sub parse_reasons
-{
- my $file = shift;
- my $hash = shift;
-
- while( 1 ) {
- my $line = <$file>;
- my $reason;
- if($line =~ /^250 REASON=([\S]+)/) {
- $reason = $1;
- } elsif($line =~ /^250 OK/) {
- plog "DEBUG", "End reason parsing on $line\n";
- return;
- } else {
- plog "NOTICE", "Weird end line $line\n";
- return;
- }
-
- if(not $$hash{$reason}) {
- $$hash{$reason} = {};
- }
-
- parse_failrates($file, $$hash{$reason});
- }
-}
-
-sub read_reasons
-{
- my $mcp = shift;
- my $file = shift;
- my $cmd = shift;
- my $mt_rc = shift;
- my $tot_rc = shift;
- return;
-
- # Hack to avoid double-counting
- print $mcp "$cmd\r\n";
- parse_reasons($mcp, $mt_rc);
-
- if(not open(FAILFILE, '<', $file)) {
- return;
- }
-
- parse_reasons(*FAILFILE, $tot_rc);
- close(FAILFILE);
-}
-
-sub write_reasons
-{
- my $mcp = shift;
- my $file = shift;
- my $cmd = shift;
- my $mt_rcnt = shift;
- my $tot_rcnt = shift;
- my $tot_fcnt = shift;
- my $tot_circ_sel_cnt = shift;
- my $tot_strm_sel_cnt = shift;
- my %curr_reason_counts;
-
- return;
- print $mcp "$cmd\r\n";
- parse_reasons($mcp, \%curr_reason_counts);
-
- # Get delta
- foreach(keys(%curr_reason_counts)) {
- if(not $$tot_rcnt{$_}) {
- $$tot_rcnt{$_} = {};
- }
- if(not $$mt_rcnt{$_}) {
- $$mt_rcnt{$_} = {};
- }
- my $curr_hash = $curr_reason_counts{$_};
- my $total_hash = $$tot_rcnt{$_};
- my $mt_hash = $$mt_rcnt{$_};
-
- foreach(keys(%$curr_hash)) {
-
- if(not $$total_hash{$_}) {
- $$total_hash{$_} = 0;
- }
-
- if(not $$mt_hash{$_}) {
- $$mt_hash{$_} = 0;
- }
-
- # Update our totals
- if(($$curr_hash{$_} - $$mt_hash{$_}) > 0) {
- $$total_hash{$_} += ($$curr_hash{$_} - $$mt_hash{$_});
- }
-
- # Store MT totals
- $$mt_hash{$_} = $$curr_hash{$_};
- }
- }
-
- my $failed_total = 0;
- foreach(keys(%$tot_fcnt)) {
- $failed_total += $$tot_fcnt{$_};
- }
-
- if(not open(FAILFILE, '>', $file)) {
- die "Can't open $file to save fail rate table";
- }
-
- foreach(keys(%$tot_rcnt)) {
- print FAILFILE "250 REASON=$_\r\n";
- my $reason_hash = $$tot_rcnt{$_};
- my $reason_total = 0;
-
- my @sorted_r = sort {
- $$reason_hash{$b} <=> $$reason_hash{$a}
- } keys %$reason_hash;
-
- foreach(@sorted_r) {
- print FAILFILE "250 FAILCOUNT $_ ($key_names{$_}) = $$reason_hash{$_}/$$tot_fcnt{$_}/$$tot_circ_sel_cnt{$_}+$$tot_strm_sel_cnt{$_}\r\n";
- $reason_total += $$reason_hash{$_};
- }
- print FAILFILE "250 REASONTOTAL $reason_total/$failed_total\r\n";
- }
- print FAILFILE "250 OK\r\n";
-
- close(FAILFILE);
- plog "INFO", "Updated failure counts\n";
-}
-
-
sub main
{
my $mcp = IO::Socket::INET->new(
@@ -1035,22 +694,6 @@
delete $ENV{"HTTP_PROXY"};
delete $ENV{"proxy"};
delete $ENV{"PROXY"};
-
- read_failrates($mcp, $DOC_DIR . "/naive_fail_rates",
- "FAILRATES", \%mt_fail_counts, \%total_fail_counts,
- \%mt_circ_sel_counts, \%total_circ_sel_counts,
- \%mt_strm_sel_counts, \%total_strm_sel_counts,
- \%mt_fail_totals, \%total_fail_totals);
- read_failrates($mcp, $DOC_DIR . "/suspected_rates",
- "SUSPECTRATES", \%mt_suspect_counts, \%total_suspect_counts,
- \%mt_suspect_circ_sel_counts, \%total_suspect_circ_sel_counts,
- \%mt_suspect_strm_sel_counts, \%total_suspect_strm_sel_counts);
-
- read_reasons($mcp, $DOC_DIR . "/naive_fail_reasons",
- "FAILREASONS", \%mt_reason_counts, \%total_reason_counts);
- read_reasons($mcp, $DOC_DIR . "/suspected_reasons",
- "SUSPECTREASONS", \%mt_suspect_reason_counts,
- \%total_suspect_reason_counts);
if(is_in("urls", \@TO_SCAN)) {
@DOC_URLS = (@DOC_URLS, get_doc_list());
@@ -1089,25 +732,6 @@
print $mcp "SAVESTATS\r\n";
$line = <$mcp>;
die "Error saving stats: $line" if (not $line =~ /^250/);
-
- write_failrates($mcp, $DOC_DIR . "/naive_fail_rates",
- "FAILRATES", \%mt_fail_counts, \%total_fail_counts,
- \%mt_circ_sel_counts, \%total_circ_sel_counts,
- \%mt_strm_sel_counts, \%total_strm_sel_counts,
- \%mt_fail_totals, \%total_fail_totals);
- write_failrates($mcp, $DOC_DIR . "/suspected_rates",
- "SUSPECTRATES", \%mt_suspect_counts, \%total_suspect_counts,
- \%mt_suspect_circ_sel_counts, \%total_suspect_circ_sel_counts,
- \%mt_suspect_strm_sel_counts, \%total_suspect_strm_sel_counts);
-
- write_reasons($mcp, $DOC_DIR . "/naive_fail_reasons",
- "FAILREASONS", \%mt_reason_counts, \%total_reason_counts,
- \%total_fail_counts, \%total_circ_sel_counts, \%total_strm_sel_counts);
- write_reasons($mcp, $DOC_DIR . "/suspected_reasons",
- "SUSPECTREASONS", \%mt_suspect_reason_counts,
- \%total_suspect_reason_counts, \%total_suspect_counts,
- \%total_suspect_circ_sel_counts,
- \%total_suspect_strm_sel_counts);
}
}
Added: torflow/trunk/speedracer.pl
===================================================================
--- torflow/trunk/speedracer.pl 2007-03-15 16:28:01 UTC (rev 9830)
+++ torflow/trunk/speedracer.pl 2007-03-15 18:58:51 UTC (rev 9831)
@@ -0,0 +1,251 @@
+#!/usr/bin/perl -w
+
+
+use strict;
+use IO::Socket;
+use IO::Socket::INET;
+use Time::HiRes qw( usleep ualarm gettimeofday tv_interval );
+
+my $META_PORT = "9052";
+my $META_HOST = "127.0.0.1";
+
+my $USER_AGENT = "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; .NET CLR 1.0.3705; .NET CLR 1.1.4322)";
+
+my $SOCKS_PROXY = "127.0.0.1:9060";
+my $CURL_PROXY = "--socks5 $SOCKS_PROXY";
+
+# http://bitter.stalin.se/torfile
+# http://www.sigma.su.se/~who/torfile
+my $URL = "http://130.237.152.195/~who/torfile";
+my $COUNT = 400;
+my $START_PCT = 0;
+my $STOP_PCT = 21;
+my $PCT_STEP = 7;
+
+my $LOG_LEVEL = "DEBUG";
+my %log_levels = ("DEBUG", 0, "INFO", 1, "NOTICE", 2, "WARN", 3, "ERROR", 4);
+
+
+sub plog
+{
+ my $level = shift;
+ my $msg = shift;
+ my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time);
+
+ $year += 1900; # lame.
+ $mon += 1;
+
+ print "$level \[" . localtime() . "\]: " . $msg if($log_levels{$level} >= $log_levels{$LOG_LEVEL})
+ #print "$level\[$year-$mon-$mday $hour:$min:$sec\]: " . $msg if($log_levels{$level} >= $log_levels{$LOG_LEVEL})
+}
+
+sub is_in
+{
+ my $element = shift;
+ my $ary = shift;
+ my $is_there = 0;
+ foreach (@$ary) {
+ if ($_ eq $element) {
+ $is_there = 1;
+ last;
+ }
+ }
+
+ return $is_there;
+}
+
+sub compare_arrays {
+ my ($first, $second) = @_;
+ no warnings; # silence spurious -w undef complaints
+ return 0 unless @$first == @$second;
+ for (my $i = 0; $i < @$first; $i++) {
+ return 0 if $first->[$i] ne $second->[$i];
+ }
+ return 1;
+}
+
+sub query_exit
+{
+ my $mcp = shift;
+ my $line;
+ print $mcp "GETLASTEXIT\r\n";
+ $line = <$mcp>;
+ $line =~ /LASTEXIT=([\S]+)/;
+
+ return $1;
+}
+
+
+sub speedrace
+{
+ my $mcp = shift;
+ my $skip = shift;
+ my $pct = shift;
+ my @build_times;
+ my @fetch_times;
+ my $tot_fetch_time = 0;
+ my $tot_build_time = 0;
+ my $i = 0;
+ my $line;
+
+ # Weak new-nym
+ print $mcp "PERCENTSKIP $skip\r\n";
+ $line = <$mcp>;
+ die "Error setting percentskip: $line" if (not $line =~ /^250/);
+
+ print $mcp "PERCENTFAST $pct\r\n";
+ $line = <$mcp>;
+ die "Error setting percentfast: $line" if (not $line =~ /^250/);
+
+ # So this is a really big hack. Since metatroller builds circuits on
+ # the fly where as tor has a pool of pre-built circuits to use,
+ # we want to get it to build a circuit for us but not count
+ # that construction time. The way we do this is to issue
+ # a NEWNYM and then get the url TWICE.
+
+ while($#build_times+1 < $COUNT) {
+ my $t0;
+ my $delta_build;
+ my $delta_fetch;
+ my $fetch_exit;
+ my $build_exit;
+ my $ret;
+
+ print $mcp "NEWNYM\r\n";
+ $line = <$mcp>;
+ die "Error sending NEWNYM: $line" if (not $line =~ /^250/);
+
+ # Build the circuit...
+ do {
+ $i++;
+
+ $t0 = [gettimeofday()];
+ $ret =
+# system("tsocks wget -U \"$USER_AGENT\" \'$URL\' -O - >& /dev/null");
+ system("curl $CURL_PROXY -m 600 -A \"$USER_AGENT\" \'$URL\' >& /dev/null");
+
+ if($ret == 2) {
+ plog "NOTICE", "wget got Sigint. Dying\n";
+ exit;
+ }
+ plog "NOTICE", "wget failed with ret=$ret.. Retrying...\n"
+ if($ret != 0);
+ $delta_build = tv_interval $t0;
+ plog "NOTICE", "Timer exceeded limit: $delta_build\n"
+ if($delta_build >= 550.0);
+ } while($ret != 0 || $delta_build >= 550.0);
+
+ $build_exit = query_exit($mcp);
+
+ plog "DEBUG", "Got 1st via $build_exit\n";
+
+ # Now do it for real
+
+ do {
+ $i++;
+ $t0 = [gettimeofday()];
+ $ret =
+# system("tsocks wget -U \"$USER_AGENT\" \'$URL\' -O - >& /dev/null");
+ system("curl $CURL_PROXY -m 600 -A \"$USER_AGENT\" \'$URL\' >& /dev/null");
+
+ if($ret == 2) {
+ plog "NOTICE", "wget got Sigint. Dying\n";
+ exit;
+ }
+ plog "NOTICE", "wget failed with ret=$ret.. Retrying with clock still running\n"
+ if($ret != 0);
+ $delta_fetch = tv_interval $t0;
+ plog "NOTICE", "Timer exceeded limit: $delta_fetch\n"
+ if($delta_fetch >= 550.0);
+ } while($ret != 0 || $delta_fetch >= 550.0);
+
+ $fetch_exit = query_exit($mcp);
+
+ if($fetch_exit eq $build_exit) {
+ $tot_build_time += $delta_build;
+ push(@build_times, $delta_build);
+ plog "DEBUG", "$skip-$pct% circuit build+fetch took $delta_build for $fetch_exit\n";
+
+ push(@fetch_times, $delta_fetch);
+ $tot_fetch_time += $delta_fetch;
+ plog "DEBUG", "$skip-$pct% fetch took $delta_fetch for $fetch_exit\n";
+ } else {
+ plog "NOTICE", "Ignoring strange exit swap $build_exit -> $fetch_exit. Circuit failure?\n";
+ }
+ }
+ my $avg_build_time = $tot_build_time/($#build_times+1);
+ my $build_dev = 0;
+ foreach(@build_times) {
+ $build_dev +=
+ ($_ - $avg_build_time)*($_ - $avg_build_time);
+ }
+ $build_dev = sqrt($build_dev / ($#build_times+1));
+
+ my $avg_fetch_time = $tot_fetch_time/($#fetch_times+1);
+ my $fetch_dev = 0;
+ foreach(@fetch_times) {
+ $fetch_dev +=
+ ($_ - $avg_fetch_time)*($_ - $avg_fetch_time);
+ }
+ $fetch_dev = sqrt($fetch_dev / ($#fetch_times+1));
+ plog "INFO", "RANGE $skip-$pct " . ($#build_times+1) . " build+fetches: avg=$avg_build_time, dev=$build_dev\n";
+ plog "INFO", "RANGE $skip-$pct " . ($#fetch_times+1) . " fetches: avg=$avg_fetch_time, dev=$fetch_dev\n";
+ plog "INFO", " " . ($COUNT*2) . " fetches took $i tries\n";
+}
+
+sub main
+{
+ my $mcp = IO::Socket::INET->new(
+ Proto => "tcp",
+ PeerAddr => $META_HOST,
+ PeerPort => $META_PORT)
+ or die "The Metatroller is not enabled";
+ my $line = <$mcp>;
+ $line = <$mcp>;
+
+ delete $ENV{"http_proxy"};
+ delete $ENV{"HTTP_PROXY"};
+ delete $ENV{"proxy"};
+ delete $ENV{"PROXY"};
+
+ print $mcp "GUARDNODES 0\r\n";
+ $line = <$mcp>;
+ die "Error setting Guard Nodes: $line" if (not $line =~ /^250/);
+
+ print $mcp "UNIFORM 1\r\n";
+ $line = <$mcp>;
+ die "Error setting UNIFORM: $line" if (not $line =~ /^250/);
+
+ print $mcp "ORDEREXITS 1\r\n";
+ $line = <$mcp>;
+ die "Error setting ORDEREXITS: $line" if (not $line =~ /^250/);
+
+ print $mcp "PATHLEN 2\r\n";
+ $line = <$mcp>;
+ die "Error setting PATHLEN: $line" if (not $line =~ /^250/);
+
+ my $pct = $START_PCT;
+ plog "INFO", "Beginning time loop\n";
+
+ while($pct < $STOP_PCT) {
+ print $mcp "RESETSTATS\r\n";
+ $line = <$mcp>;
+ die "Error on RESETSTATS: $line" if (not $line =~ /^250/);
+ print $mcp "COMMIT\r\n";
+ $line = <$mcp>;
+ die "Error on COMMIT: $line" if (not $line =~ /^250/);
+ plog "DEBUG", "Reset stats\n";
+ speedrace($mcp, $pct, $pct+$PCT_STEP);
+ plog "DEBUG", "speedroced\n";
+ print $mcp "SAVESTATS ./speed/stats-$pct:".($pct+$PCT_STEP)."\r\n";
+ $line = <$mcp>;
+ die "Error on SAVESTATS: $line" if (not $line =~ /^250/);
+ plog "DEBUG", "Wrote stats\n";
+ $pct += $PCT_STEP;
+ print $mcp "COMMIT\r\n";
+ $line = <$mcp>;
+ die "Error on COMMIT: $line" if (not $line =~ /^250/);
+ }
+}
+
+main();
Property changes on: torflow/trunk/speedracer.pl
___________________________________________________________________
Name: svn:executable
+ *