[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r19644: {torflow} Track slice source timestamp information on a per-node basis (torflow/trunk/NetworkScanners/BwAuthority)
Author: mikeperry
Date: 2009-06-06 22:29:01 -0400 (Sat, 06 Jun 2009)
New Revision: 19644
Modified:
torflow/trunk/NetworkScanners/BwAuthority/aggregate.py
torflow/trunk/NetworkScanners/BwAuthority/run_scan.sh
Log:
Track slice source timestamp information on a per-node basis
instead of per file. We were missing nodes that switched
slices during scans.
Modified: torflow/trunk/NetworkScanners/BwAuthority/aggregate.py
===================================================================
--- torflow/trunk/NetworkScanners/BwAuthority/aggregate.py 2009-06-07 00:24:07 UTC (rev 19643)
+++ torflow/trunk/NetworkScanners/BwAuthority/aggregate.py 2009-06-07 02:29:01 UTC (rev 19644)
@@ -9,7 +9,8 @@
from TorCtl.TorUtil import plog
from TorCtl import TorCtl,TorUtil
-bw_files = {}
+bw_files = []
+timestamps = {}
nodes = {}
prev_consensus = {}
ALPHA = 0.3333 # Prev consensus values count for 1/3 of the avg
@@ -29,29 +30,51 @@
min_item = i
return min_item
+class NodeData:
+ def __init__(self, timestamp):
+ self.strm_bw = []
+ self.filt_bw = []
+ self.ns_bw = []
+ self.timestamp = timestamp
+
class Node:
def __init__(self):
+ self.node_data = {}
self.idhex = None
self.nick = None
- self.strm_bw = []
- self.filt_bw = []
- self.ns_bw = []
self.chosen_sbw = None
self.chosen_fbw = None
self.sbw_ratio = None
self.fbw_ratio = None
self.ratio = None
self.new_bw = None
+ self.strm_bw = []
+ self.filt_bw = []
+ self.ns_bw = []
def add_line(self, line):
if self.idhex and self.idhex != line.idhex:
raise Exception("Line mismatch")
self.idhex = line.idhex
self.nick = line.nick
- self.strm_bw.append(line.strm_bw)
- self.filt_bw.append(line.filt_bw)
- self.ns_bw.append(line.ns_bw)
+ if line.slice_file not in self.node_data \
+ or self.node_data[line.slice_file].timestamp < line.timestamp:
+ self.node_data[line.slice_file] = NodeData(line.timestamp)
+ nd = self.node_data[line.slice_file]
+ nd.strm_bw.append(line.strm_bw)
+ nd.filt_bw.append(line.filt_bw)
+ nd.ns_bw.append(line.ns_bw)
+
+ self.strm_bw = []
+ self.filt_bw = []
+ self.ns_bw = []
+
+ for nd in self.node_data.itervalues():
+ self.strm_bw.extend(nd.strm_bw)
+ self.filt_bw.extend(nd.filt_bw)
+ self.ns_bw.extend(nd.ns_bw)
+
def avg_strm_bw(self):
return sum(self.strm_bw)/float(len(self.strm_bw))
@@ -72,12 +95,14 @@
return self.chosen_fbw
class Line:
- def __init__(self, line):
+ def __init__(self, line, slice_file, timestamp):
self.idhex = re.search("[\s]*node_id=([\S]+)[\s]*", line).group(1)
self.nick = re.search("[\s]*nick=([\S]+)[\s]*", line).group(1)
self.strm_bw = int(re.search("[\s]*strm_bw=([\S]+)[\s]*", line).group(1))
self.filt_bw = int(re.search("[\s]*filt_bw=([\S]+)[\s]*", line).group(1))
self.ns_bw = int(re.search("[\s]*ns_bw=([\S]+)[\s]*", line).group(1))
+ self.slice_file = slice_file
+ self.timestamp = timestamp
def main(argv):
TorUtil.read_config(argv[1]+"/scanner.1/bwauthority.cfg")
@@ -85,44 +110,59 @@
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((TorUtil.control_host,TorUtil.control_port))
c = TorCtl.Connection(s)
+ c.debug(file(argv[1]+"/aggregate-control.log", "w", buffering=0))
c.authenticate_cookie(file(argv[1]+"/scanner.1/tor-data/control_auth_cookie",
- "r"))
+ "r"))
+
ns_list = c.get_network_status()
+ ns_list.sort(lambda x, y: x.bandwidth < y.bandwidth)
+ got_ns_bw = False
for n in ns_list:
if n.bandwidth == None:
- plog("ERROR", "Your Tor is not providing NS w bandwidths!")
- sys.exit(0)
+ plog("NOTICE", "Your Tor is not providing NS w bandwidths for "+n.idhex)
+ else:
+ got_ns_bw = True
+ n.measured = False
prev_consensus["$"+n.idhex] = n
+ if not got_ns_bw:
+ # Sometimes the consensus lacks a descriptor. In that case,
+ # Tor skips outputting the NS "w" bandwidth line for that router.
+ plog("ERROR", "Your Tor is not providing NS w bandwidths!")
+ sys.exit(0)
+
for da in argv[1:-1]:
# First, create a list of the most recent files in the
# scan dirs that are recent enough
for root, dirs, f in os.walk(da):
for ds in dirs:
- print ds
if re.match("^scanner.[\d+]$", ds):
- print ds
for sr, sd, files in os.walk(da+"/"+ds+"/scan-data"):
for f in files:
if re.search("^bws-[\S]+-done-", f):
- print sr+"/"+f
found_done = True
fp = file(sr+"/"+f, "r")
slicenum = sr+"/"+fp.readline()
timestamp = float(fp.readline())
fp.close()
- if slicenum not in bw_files \
- or bw_files[slicenum][0] < timestamp:
- bw_files[slicenum] = (timestamp, sr+"/"+f)
-
-
- for (t,f) in bw_files.itervalues():
+ bw_files.append((slicenum, timestamp, sr+"/"+f))
+ if slicenum not in timestamps or \
+ timestamps[slicenum] < timestamp:
+ timestamps[slicenum] = timestamp
+
+ # FIXME: There may be edge cases here where we have
+ # an extra slice number that is never scanned again (i.e., the network
+ # shrinks). This will leave us with really old timestamps.
+ oldest_timestamp = min(timestamps.itervalues())
+
+ # Only the most recent slice file for each node should be used.
+ for (s,t,f) in bw_files:
fp = file(f, "r")
fp.readline() # slicenum
fp.readline() # timestamp
for l in fp.readlines():
try:
- line = Line(l)
+ line = Line(l,s,t)
if line.idhex not in nodes:
n = Node()
nodes[line.idhex] = n
@@ -139,12 +179,12 @@
float(len(nodes))
for n in nodes.itervalues():
- n.choose_strm_bw(pre_strm_avg)
+ n.choose_strm_bw(pre_strm_avg)
n.choose_filt_bw(pre_filt_avg)
- true_strm_avg = sum(map(lambda n: n.strm_bw[n.chosen_sbw],
+ true_strm_avg = sum(map(lambda n: n.strm_bw[n.chosen_sbw],
nodes.itervalues()))/float(len(nodes))
- true_filt_avg = sum(map(lambda n: n.filt_bw[n.chosen_fbw],
+ true_filt_avg = sum(map(lambda n: n.filt_bw[n.chosen_fbw],
nodes.itervalues()))/float(len(nodes))
for n in nodes.itervalues():
@@ -156,13 +196,17 @@
else:
n.ratio = n.fbw_ratio
n.new_bw = n.ns_bw[n.chosen_fbw]*n.ratio
- if n.idhex in prev_consensus:
+ if n.idhex in prev_consensus and prev_consensus[n.idhex].bandwidth != None:
+ prev_consensus[n.idhex].measured = True
n.new_bw = ((prev_consensus[n.idhex].bandwidth*ALPHA + n.new_bw)/(ALPHA + 1))/1024.0
+ for n in prev_consensus.itervalues():
+ if not n.measured:
+ plog("INFO", "Didn't measure "+n.idhex+"="+n.nickname)
+
n_print = nodes.values()
n_print.sort(lambda x,y: int(x.new_bw) - int(y.new_bw))
- oldest_timestamp = min(map(lambda (t,f): t, bw_files.itervalues()))
out = file(argv[-1], "w")
out.write(str(int(round(oldest_timestamp,0)))+"\n")
for n in n_print:
Modified: torflow/trunk/NetworkScanners/BwAuthority/run_scan.sh
===================================================================
--- torflow/trunk/NetworkScanners/BwAuthority/run_scan.sh 2009-06-07 00:24:07 UTC (rev 19643)
+++ torflow/trunk/NetworkScanners/BwAuthority/run_scan.sh 2009-06-07 02:29:01 UTC (rev 19644)
@@ -7,7 +7,7 @@
# git branch --track rs-format-fix mikeperry/rs-format-fix
# git checkout rs-format-fix
TOR_EXE=../../../tor.git/src/or/tor
-#PYTHONPATH=../../../SQLAlchemy-0.5.4p2/lib
+PYTHONPATH=../../../SQLAlchemy-0.5.4p2/lib
for i in data/scanner.*
do