[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r19582: {torflow} Add range header to output files. Add function to wait for c (torflow/trunk/NetworkScanners/BwAuthority)



Author: mikeperry
Date: 2009-05-29 09:10:27 -0400 (Fri, 29 May 2009)
New Revision: 19582

Modified:
   torflow/trunk/NetworkScanners/BwAuthority/bwauthority.py
Log:

Add range header to output files. Add function to wait for consensus
computation (still needs work).



Modified: torflow/trunk/NetworkScanners/BwAuthority/bwauthority.py
===================================================================
--- torflow/trunk/NetworkScanners/BwAuthority/bwauthority.py	2009-05-29 10:49:48 UTC (rev 19581)
+++ torflow/trunk/NetworkScanners/BwAuthority/bwauthority.py	2009-05-29 13:10:27 UTC (rev 19582)
@@ -11,12 +11,13 @@
 
 import atexit
 import socket
-from time import time,strftime
+import time
 import sys
 import urllib2
 import os
 import traceback
 import copy
+import shutil
 import threading
 import ConfigParser
 
@@ -33,6 +34,9 @@
 
 user_agent = "Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; .NET CLR 1.0.3705; .NET CLR 1.1.4322)"
 
+# Note these urls should be https due to caching considerations.
+# If you really must make them http, be sure to change exit_ports to [80]
+# below, or else the scan will not finish.
 #          cutoff percent                URL
 urls =         [(10,          "https://128.174.236.117/4096k"),
                 (20,          "https://128.174.236.117/2048k"),
@@ -51,10 +55,11 @@
       percent_fast=100,
       percent_skip=0,
       min_bw=1024,
-      use_all_exits=False, # XXX: need to fix conserve_exits to ensure 443
+      use_all_exits=False,
       uniform=True,
       use_exit=None,
-      use_guards=False)
+      use_guards=False,
+      exit_ports=[443])
 
 def read_config(filename):
   config = ConfigParser.SafeConfigParser()
@@ -89,7 +94,8 @@
   def attach_sql_listener(self, db_uri):
     plog("DEBUG", "Got sqlite: "+db_uri)
     SQLSupport.setup_db(db_uri, echo=False, drop=True)
-    self.add_event_listener(SQLSupport.ConsensusTrackerListener())
+    self.sql_consensus_listener = SQLSupport.ConsensusTrackerListener()
+    self.add_event_listener(self.sql_consensus_listener)
     self.add_event_listener(SQLSupport.StreamListener())
 
   def write_sql_stats(self, percent_skip, percent_fast, rfilename=None):
@@ -113,12 +119,16 @@
     if not rfilename:
       rfilename="./data/stats/bws-"+time.strftime("20%y-%m-%d-%H:%M:%S")
     cond = threading.Condition()
-    def notlambda(h):
+    def notlambda(this):
       cond.acquire()
-      SQLSupport.RouterStats.write_bws(file(rfilename, "w"),
-                            percent_skip, percent_fast,
+      f=file(rfilename, "w")
+      f.write("low="+str(int(round((percent_skip*len(this.sorted_r))/100.0,0)))
+             +" hi="+str(int(round((percent_fast*len(this.sorted_r))/100.0,0)))
+             +"\n")
+      SQLSupport.RouterStats.write_bws(f, percent_skip, percent_fast,
                             order_by=SQLSupport.RouterStats.sbw,
                             recompute=False) # XXX: Careful here..
+      f.close()
       cond.notify()
       cond.release()
     cond.acquire()
@@ -143,10 +153,10 @@
       cond.acquire()
       SQLSupport.tc_session.close()
       try:
-        os.rename(sql_file, new_file)
+        shutil.copy(sql_file, new_file)
       except Exception,e:
         plog("WARN", "Error moving sql file: "+str(e))
-      SQLSupport.setup_db('sqlite:////'+sql_file, echo=False, drop=True)
+      SQLSupport.reset_all()
       cond.notify()
       cond.release()
     cond.acquire()
@@ -155,11 +165,32 @@
     cond.release()
 
   def commit(self):
-    # FIXME: This needs two stages+condition to really be correct
-    def notlambda(this): 
+    cond1 = threading.Condition()
+    cond2 = threading.Condition()
+    def notlambda2(this):
+      plog("INFO", "Commit done.")
+      cond2.notify()
+      cond2.release()
+
+    def notlambda1(this):
+      cond1.acquire()
+      cond2.acquire()
+      plog("INFO", "Committing jobs...")
       this.run_all_jobs = True
-    self.schedule_immediate(notlambda)
+      self.schedule_low_prio(notlambda2)
+      cond1.notify()
+      cond1.release()
 
+    cond1.acquire()
+    cond2.acquire()
+    self.schedule_immediate(notlambda1)
+
+    cond1.wait()
+    cond1.release()
+    cond2.wait()
+    cond2.release()
+    plog("INFO", "Scanner commit done.")
+
   def close_circuits(self):
     cond = threading.Condition()
     def notlambda(this):
@@ -188,7 +219,7 @@
     cond = threading.Condition()
     def notlambda(this):
       cond.acquire()
-      this.new_nym = True # GIL hack
+      this.new_nym = True
       lines = this.c.sendAndRecv("SIGNAL CLEARDNSCACHE\r\n")
       for _,msg,more in lines:
         plog("DEBUG", msg)
@@ -204,11 +235,19 @@
     cond._finished = True # lol python haxx. Could make subclass, but why?? :)
     def notlambda(this):
       cond.acquire()
-      for r in this.sorted_r:
-        if len(r._generated) > position:
-          if r._generated[position] < count:
-            cond._finished = False
-            break
+      # TODO: Using the entry_gen router list is somewhat ghetto..
+      for r in this.selmgr.path_selector.entry_gen.rstr_routers:
+        if r._generated[position] < count:
+          cond._finished = False
+          plog("DEBUG", "Entry router "+r.idhex+"="+r.nickname+" not done: "+str(r._generated[position])+", down: "+str(r.down)+", OK: "+str(this.selmgr.path_selector.entry_gen.rstr_list.r_is_ok(r))+", sorted_r: "+str(r in this.sorted_r))
+          # XXX:
+          #break
+      for r in this.selmgr.path_selector.exit_gen.rstr_routers:
+        if r._generated[position] < count:
+          cond._finished = False
+          plog("DEBUG", "Exit router "+r.idhex+"="+r.nickname+" not done: "+str(r._generated[position])+", down: "+str(r.down)+", OK: "+str(this.selmgr.path_selector.exit_gen.rstr_list.r_is_ok(r))+", sorted_r: "+str(r in this.sorted_r))
+          # XXX:
+          #break
       cond.notify()
       cond.release()
     cond.acquire()
@@ -243,6 +282,25 @@
     cond.release()
     return cond._rank
 
+  def wait_for_consensus(self):
+    cond = threading.Condition()
+    # XXX: We want to set a flag that says "Don't compute consensus db
+    # until we get called again plz". Locks and semaphores are off limits
+    # because this is the same thread+codepath
+    def notlambda(this):
+      if this.sql_consensus_listener.last_desc_at \
+                 != SQLSupport.ConsensusTrackerListener.CONSENSUS_DONE:
+        plog("INFO", "Waiting on consensus result: "+str(this.run_all_jobs))
+        this.schedule_low_prio(notlambda)
+      else:
+        cond.acquire()
+        cond.notify()
+        cond.release()
+    cond.acquire()
+    self.schedule_low_prio(notlambda)
+    cond.wait()
+    cond.release()
+
 def http_request(address):
   ''' perform an http GET-request and return 1 for success or 0 for failure '''
 
@@ -275,19 +333,29 @@
   attempt = 0
   successful = 0
   while not hdlr.is_count_met(circs_per_node):
+    hdlr.wait_for_consensus()
+
+    # Check local time. Do not scan between 01:30 and 05:30 local time
+    # XXX: -> config file?
+    lt = time.localtime()
+    sleep_start = time.mktime(lt[0:3]+(1,30,0,0,0)+(lt[-1],))
+    sleep_stop = time.mktime(lt[0:3]+(5,30,0,0,0)+(lt[-1],))
+    t0 = time.time()
+    if sleep_start <= t0 and t0 <= sleep_stop:
+      plog("NOTICE", "It's bedtime. Sleeping for "+str(round((sleep_stop-t0)/3600.0,1))+"h")
+      #time.sleep(sleep_stop - t0)
+
     hdlr.new_exit()
-    
     attempt += 1
     
-    t0 = time()
-    # XXX: This noise is due to a difficult to find Tor bug that
+    # FIXME: This noise is due to a difficult to find Tor bug that
     # causes some exits to hang forever on streams :(
     timer = threading.Timer(max_fetch_time, lambda: hdlr.close_streams(7))
     timer.start()
     ret = http_request(choose_url(start_pct))
     timer.cancel()
 
-    delta_build = time() - t0
+    delta_build = time.time() - t0
     if delta_build >= max_fetch_time:
       plog('WARN', 'Timer exceeded limit: ' + str(delta_build) + '\n')
 
@@ -299,9 +367,9 @@
       plog('DEBUG', str(start_pct)+'-'+str(stop_pct)+'% circuit build+fetch failed for ' + str(build_exit))
 
     if save_every and ret and successful and (successful % save_every) == 0:
-      race_time = strftime("20%y-%m-%d-%H:%M:%S")
+      race_time = time.strftime("20%y-%m-%d-%H:%M:%S")
+      hdlr.close_circuits()
       hdlr.commit()
-      hdlr.close_circuits()
       lo = str(round(start_pct,1))
       hi = str(round(stop_pct,1))
       hdlr.write_sql_stats(start_pct, stop_pct, os.getcwd()+'/'+out_dir+'/sql-'+lo+':'+hi+"-"+str(successful)+"-"+race_time)
@@ -340,17 +408,17 @@
                 max_fetch_time)
 
       plog('DEBUG', 'speedroced')
+      hdlr.close_circuits()
       hdlr.commit()
-      hdlr.close_circuits()
 
       lo = str(round(pct,1))
       hi = str(round(pct+pct_step,1))
       
-      hdlr.write_sql_stats(pct, pct+pct_step, os.getcwd()+'/'+out_dir+'/sql-'+lo+':'+hi+"-done-"+strftime("20%y-%m-%d-%H:%M:%S"))
-      hdlr.write_strm_bws(pct, pct+pct_step, os.getcwd()+'/'+out_dir+'/bws-'+lo+':'+hi+"-done-"+strftime("20%y-%m-%d-%H:%M:%S"))
+      hdlr.write_sql_stats(pct, pct+pct_step, os.getcwd()+'/'+out_dir+'/sql-'+lo+':'+hi+"-done-"+time.strftime("20%y-%m-%d-%H:%M:%S"))
+      hdlr.write_strm_bws(pct, pct+pct_step, os.getcwd()+'/'+out_dir+'/bws-'+lo+':'+hi+"-done-"+time.strftime("20%y-%m-%d-%H:%M:%S"))
       plog('DEBUG', 'Wrote stats')
       pct += pct_step
-      hdlr.save_sql_file(sql_file, "db-"+str(lo)+":"+str(hi)+"-"+strftime("20%y-%m-%d-%H:%M:%S")+".sqlite")
+      hdlr.save_sql_file(sql_file, "db-"+str(lo)+":"+str(hi)+"-"+time.strftime("20%y-%m-%d-%H:%M:%S")+".sqlite")
 
 def cleanup(c, f):
   plog("INFO", "Resetting __LeaveStreamsUnattached=0 and FetchUselessDescriptors="+f)
@@ -370,6 +438,7 @@
   h = BwScanHandler(c, __selmgr)
 
   c.set_event_handler(h)
+  #c.set_periodic_timer(2.0, "PULSE")
 
   c.set_events([TorCtl.EVENT_TYPE.STREAM,
           TorCtl.EVENT_TYPE.BW,