[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r19582: {torflow} Add range header to output files. Add function to wait for consensus computation (torflow/trunk/NetworkScanners/BwAuthority)
Author: mikeperry
Date: 2009-05-29 09:10:27 -0400 (Fri, 29 May 2009)
New Revision: 19582
Modified:
torflow/trunk/NetworkScanners/BwAuthority/bwauthority.py
Log:
Add range header to output files. Add function to wait for consensus
computation (still needs work).
Modified: torflow/trunk/NetworkScanners/BwAuthority/bwauthority.py
===================================================================
--- torflow/trunk/NetworkScanners/BwAuthority/bwauthority.py 2009-05-29 10:49:48 UTC (rev 19581)
+++ torflow/trunk/NetworkScanners/BwAuthority/bwauthority.py 2009-05-29 13:10:27 UTC (rev 19582)
@@ -11,12 +11,13 @@
import atexit
import socket
-from time import time,strftime
+import time
import sys
import urllib2
import os
import traceback
import copy
+import shutil
import threading
import ConfigParser
@@ -33,6 +34,9 @@
user_agent = "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; .NET CLR 1.0.3705; .NET CLR 1.1.4322)"
+# Note these urls should be https due to caching considerations.
+# If you really must make them http, be sure to change exit_ports to [80]
+# below, or else the scan will not finish.
# cutoff percent URL
urls = [(10, "https://128.174.236.117/4096k"),
(20, "https://128.174.236.117/2048k"),
@@ -51,10 +55,11 @@
percent_fast=100,
percent_skip=0,
min_bw=1024,
- use_all_exits=False, # XXX: need to fix conserve_exits to ensure 443
+ use_all_exits=False,
uniform=True,
use_exit=None,
- use_guards=False)
+ use_guards=False,
+ exit_ports=[443])
def read_config(filename):
config = ConfigParser.SafeConfigParser()
@@ -89,7 +94,8 @@
def attach_sql_listener(self, db_uri):
plog("DEBUG", "Got sqlite: "+db_uri)
SQLSupport.setup_db(db_uri, echo=False, drop=True)
- self.add_event_listener(SQLSupport.ConsensusTrackerListener())
+ self.sql_consensus_listener = SQLSupport.ConsensusTrackerListener()
+ self.add_event_listener(self.sql_consensus_listener)
self.add_event_listener(SQLSupport.StreamListener())
def write_sql_stats(self, percent_skip, percent_fast, rfilename=None):
@@ -113,12 +119,16 @@
if not rfilename:
rfilename="./data/stats/bws-"+time.strftime("20%y-%m-%d-%H:%M:%S")
cond = threading.Condition()
- def notlambda(h):
+ def notlambda(this):
cond.acquire()
- SQLSupport.RouterStats.write_bws(file(rfilename, "w"),
- percent_skip, percent_fast,
+ f=file(rfilename, "w")
+ f.write("low="+str(int(round((percent_skip*len(this.sorted_r))/100.0,0)))
+ +" hi="+str(int(round((percent_fast*len(this.sorted_r))/100.0,0)))
+ +"\n")
+ SQLSupport.RouterStats.write_bws(f, percent_skip, percent_fast,
order_by=SQLSupport.RouterStats.sbw,
recompute=False) # XXX: Careful here..
+ f.close()
cond.notify()
cond.release()
cond.acquire()
@@ -143,10 +153,10 @@
cond.acquire()
SQLSupport.tc_session.close()
try:
- os.rename(sql_file, new_file)
+ shutil.copy(sql_file, new_file)
except Exception,e:
plog("WARN", "Error moving sql file: "+str(e))
- SQLSupport.setup_db('sqlite:////'+sql_file, echo=False, drop=True)
+ SQLSupport.reset_all()
cond.notify()
cond.release()
cond.acquire()
@@ -155,11 +165,32 @@
cond.release()
def commit(self):
- # FIXME: This needs two stages+condition to really be correct
- def notlambda(this):
+ cond1 = threading.Condition()
+ cond2 = threading.Condition()
+ def notlambda2(this):
+ plog("INFO", "Commit done.")
+ cond2.notify()
+ cond2.release()
+
+ def notlambda1(this):
+ cond1.acquire()
+ cond2.acquire()
+ plog("INFO", "Committing jobs...")
this.run_all_jobs = True
- self.schedule_immediate(notlambda)
+ self.schedule_low_prio(notlambda2)
+ cond1.notify()
+ cond1.release()
+ cond1.acquire()
+ cond2.acquire()
+ self.schedule_immediate(notlambda1)
+
+ cond1.wait()
+ cond1.release()
+ cond2.wait()
+ cond2.release()
+ plog("INFO", "Scanner commit done.")
+
def close_circuits(self):
cond = threading.Condition()
def notlambda(this):
@@ -188,7 +219,7 @@
cond = threading.Condition()
def notlambda(this):
cond.acquire()
- this.new_nym = True # GIL hack
+ this.new_nym = True
lines = this.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
for _,msg,more in lines:
plog("DEBUG", msg)
@@ -204,11 +235,19 @@
cond._finished = True # lol python haxx. Could make subclass, but why?? :)
def notlambda(this):
cond.acquire()
- for r in this.sorted_r:
- if len(r._generated) > position:
- if r._generated[position] < count:
- cond._finished = False
- break
+ # TODO: Using the entry_gen router list is somewhat ghetto..
+ for r in this.selmgr.path_selector.entry_gen.rstr_routers:
+ if r._generated[position] < count:
+ cond._finished = False
+ plog("DEBUG", "Entry router "+r.idhex+"="+r.nickname+" not done: "+str(r._generated[position])+", down: "+str(r.down)+", OK: "+str(this.selmgr.path_selector.entry_gen.rstr_list.r_is_ok(r))+", sorted_r: "+str(r in this.sorted_r))
+ # XXX:
+ #break
+ for r in this.selmgr.path_selector.exit_gen.rstr_routers:
+ if r._generated[position] < count:
+ cond._finished = False
+ plog("DEBUG", "Exit router "+r.idhex+"="+r.nickname+" not done: "+str(r._generated[position])+", down: "+str(r.down)+", OK: "+str(this.selmgr.path_selector.exit_gen.rstr_list.r_is_ok(r))+", sorted_r: "+str(r in this.sorted_r))
+ # XXX:
+ #break
cond.notify()
cond.release()
cond.acquire()
@@ -243,6 +282,25 @@
cond.release()
return cond._rank
+ def wait_for_consensus(self):
+ cond = threading.Condition()
+ # XXX: We want to set a flag that says "Don't compute consensus db
+ # until we get called again plz". Locks and semaphores are off limits
+ # because this is the same thread+codepath
+ def notlambda(this):
+ if this.sql_consensus_listener.last_desc_at \
+ != SQLSupport.ConsensusTrackerListener.CONSENSUS_DONE:
+ plog("INFO", "Waiting on consensus result: "+str(this.run_all_jobs))
+ this.schedule_low_prio(notlambda)
+ else:
+ cond.acquire()
+ cond.notify()
+ cond.release()
+ cond.acquire()
+ self.schedule_low_prio(notlambda)
+ cond.wait()
+ cond.release()
+
def http_request(address):
''' perform an http GET-request and return 1 for success or 0 for failure '''
@@ -275,19 +333,29 @@
attempt = 0
successful = 0
while not hdlr.is_count_met(circs_per_node):
+ hdlr.wait_for_consensus()
+
+ # Check local time. Do not scan between 01:30 and 05:30 local time
+ # XXX: -> config file?
+ lt = time.localtime()
+ sleep_start = time.mktime(lt[0:3]+(1,30,0,0,0)+(lt[-1],))
+ sleep_stop = time.mktime(lt[0:3]+(5,30,0,0,0)+(lt[-1],))
+ t0 = time.time()
+ if sleep_start <= t0 and t0 <= sleep_stop:
+ plog("NOTICE", "It's bedtime. Sleeping for "+str(round((sleep_stop-t0)/3600.0,1))+"h")
+ #time.sleep(sleep_stop - t0)
+
hdlr.new_exit()
-
attempt += 1
- t0 = time()
- # XXX: This noise is due to a difficult to find Tor bug that
+ # FIXME: This noise is due to a difficult to find Tor bug that
# causes some exits to hang forever on streams :(
timer = threading.Timer(max_fetch_time, lambda: hdlr.close_streams(7))
timer.start()
ret = http_request(choose_url(start_pct))
timer.cancel()
- delta_build = time() - t0
+ delta_build = time.time() - t0
if delta_build >= max_fetch_time:
plog('WARN', 'Timer exceeded limit: ' + str(delta_build) + '\n')
@@ -299,9 +367,9 @@
plog('DEBUG', str(start_pct)+'-'+str(stop_pct)+'% circuit build+fetch failed for ' + str(build_exit))
if save_every and ret and successful and (successful % save_every) == 0:
- race_time = strftime("20%y-%m-%d-%H:%M:%S")
+ race_time = time.strftime("20%y-%m-%d-%H:%M:%S")
+ hdlr.close_circuits()
hdlr.commit()
- hdlr.close_circuits()
lo = str(round(start_pct,1))
hi = str(round(stop_pct,1))
hdlr.write_sql_stats(start_pct, stop_pct, os.getcwd()+'/'+out_dir+'/sql-'+lo+':'+hi+"-"+str(successful)+"-"+race_time)
@@ -340,17 +408,17 @@
max_fetch_time)
plog('DEBUG', 'speedroced')
+ hdlr.close_circuits()
hdlr.commit()
- hdlr.close_circuits()
lo = str(round(pct,1))
hi = str(round(pct+pct_step,1))
- hdlr.write_sql_stats(pct, pct+pct_step, os.getcwd()+'/'+out_dir+'/sql-'+lo+':'+hi+"-done-"+strftime("20%y-%m-%d-%H:%M:%S"))
- hdlr.write_strm_bws(pct, pct+pct_step, os.getcwd()+'/'+out_dir+'/bws-'+lo+':'+hi+"-done-"+strftime("20%y-%m-%d-%H:%M:%S"))
+ hdlr.write_sql_stats(pct, pct+pct_step, os.getcwd()+'/'+out_dir+'/sql-'+lo+':'+hi+"-done-"+time.strftime("20%y-%m-%d-%H:%M:%S"))
+ hdlr.write_strm_bws(pct, pct+pct_step, os.getcwd()+'/'+out_dir+'/bws-'+lo+':'+hi+"-done-"+time.strftime("20%y-%m-%d-%H:%M:%S"))
plog('DEBUG', 'Wrote stats')
pct += pct_step
- hdlr.save_sql_file(sql_file, "db-"+str(lo)+":"+str(hi)+"-"+strftime("20%y-%m-%d-%H:%M:%S")+".sqlite")
+ hdlr.save_sql_file(sql_file, "db-"+str(lo)+":"+str(hi)+"-"+time.strftime("20%y-%m-%d-%H:%M:%S")+".sqlite")
def cleanup(c, f):
plog("INFO", "Resetting __LeaveStreamsUnattached=0 and FetchUselessDescriptors="+f)
@@ -370,6 +438,7 @@
h = BwScanHandler(c, __selmgr)
c.set_event_handler(h)
+ #c.set_periodic_timer(2.0, "PULSE")
c.set_events([TorCtl.EVENT_TYPE.STREAM,
TorCtl.EVENT_TYPE.BW,