[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r19404: {torctl} Optimize stat computation and solve the inheritance promotio (torctl/trunk/python/TorCtl)



Author: mikeperry
Date: 2009-04-30 23:45:10 -0400 (Thu, 30 Apr 2009)
New Revision: 19404

Modified:
   torctl/trunk/python/TorCtl/SQLSupport.py
Log:

Optimize stat computation and solve the inheritance
promotion/upgrade problem by updating the row_type directly.



Modified: torctl/trunk/python/TorCtl/SQLSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/SQLSupport.py	2009-04-30 18:39:22 UTC (rev 19403)
+++ torctl/trunk/python/TorCtl/SQLSupport.py	2009-05-01 03:45:10 UTC (rev 19404)
@@ -33,11 +33,8 @@
 tc_metadata = MetaData()
 tc_metadata.echo=False
 tc_session = scoped_session(sessionmaker(autoflush=True))
-#__metadata__ = tc_metadata
-#__session__ = tc_session
 
 def setup_db(db_uri):
-  #session.close()
   tc_engine = create_engine(db_uri, echo=False)
   tc_metadata.bind = tc_engine
   tc_metadata.echo = False
@@ -107,40 +104,6 @@
   launch_time = Field(Float)
   last_extend = Field(Float)
 
-  def xfer_copy(self, circ):
-    rs = []
-    for r in circ.routers:
-      rs.append(r)
-    for r in rs:
-      #r.circuits.append(self)
-      rcl = len(r.circuits)
-      self.routers.append(r)
-      r.circuits.remove(circ)
-      assert rcl == len(r.circuits)
-    assert len(self.routers) == len(rs)
-
-    csl = []
-    for s in circ.streams:
-      csl.append(s)
-    for s in csl:
-      self.streams.append(s)
-      s.circ = self
-    assert len(csl) == len(self.streams)
-
-    cel = []
-    for e in circ.extensions:
-      cel.append(e)
-    for e in cel: 
-      self.extensions.append(e)
-      e.circ = self
-    assert len(cel) == len(self.extensions)
-
-    self.circ_id = circ.circ_id
-    self.launch_time = circ.launch_time
-    self.last_extend = circ.last_extend
-    tc_session.delete(circ)
-    return self
-
 class FailedCircuit(Circuit):
   using_mapper_options(save_on_init=False)
   using_options(session=tc_session, metadata=tc_metadata)
@@ -228,13 +191,14 @@
 
   circ_from_rate = Field(Float)
   circ_to_rate = Field(Float)
+  circ_bi_rate = Field(Float)
 
   circ_to_ratio = Field(Float)
   circ_from_ratio = Field(Float)
   circ_bi_ratio = Field(Float)
 
   avg_first_ext = Field(Float)
-  first_ext_ratio = Field(Float)
+  ext_ratio = Field(Float)
   
   avg_sbw = Field(Float)
   sbw_ratio = Field(Float)
@@ -245,97 +209,126 @@
   filt_bi_ratio = Field(Float)
   filt_sbw_ratio = Field(Float)
 
+  def _compute_stats_relation(r):
+    rs = r.stats
+    rs.circ_fail_to = 0
+    rs.circ_try_to = 0
+    rs.circ_fail_from = 0
+    rs.circ_try_from = 0
+    tot_extend_time = 0
+    tot_extends = 0
+    for c in r.circuits: 
+      for e in c.extensions: 
+        if e.to_node == r:
+          rs.circ_try_to += 1
+          if isinstance(e, FailedExtension):
+            rs.circ_fail_to += 1
+          elif e.hop == 0:
+            tot_extend_time += e.delta
+            tot_extends += 1
+        elif e.from_node == r:
+          rs.circ_try_from += 1
+          if isinstance(e, FailedExtension):
+            rs.circ_fail_from += 1
+          
+      if isinstance(c, FailedCircuit):
+        pass # TODO: Also count timeouts against earlier nodes?
+      elif isinstance(c, DestroyedCircuit):
+        pass # TODO: Count these somehow..
+
+    if tot_extends > 0: rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
+    else: rs.avg_first_ext = 0
+  _compute_stats_relation = Callable(_compute_stats_relation)
+
+  def _compute_stats_query(r):
+    rs = r.stats
+    to_r = Extension.query.filter_by(to_node=r)
+    rs.circ_try_to = to_r.count()
+    rs.circ_try_from = Extension.query.filter_by(from_node=r).count()
+    rs.circ_fail_to = FailedExtension.query.filter_by(to_node=r).count()
+    rs.circ_fail_from = FailedExtension.query.filter_by(from_node=r).count()
+    rs.avg_first_ext = to_r.filter_by(hop=0,row_type='extension').avg(Extension.delta)
+  _compute_stats_query = Callable(_compute_stats_query)
+
+
   def _compute_stats():
-    # FIXME: Change loading method from lazy to eager here
-    # FIXME: Always?
-    for r in Router.query.all():
+    for r in Router.query.\
+                   options(eagerload_all('circuits.extensions')).\
+                   all():
+      RouterStats._compute_stats_relation(r)
+      #RouterStats._compute_stats_query(r) # Remove options if this is used
       rs = r.stats
-      rs.circ_fail_to = 0
-      rs.circ_try_to = 0
-      rs.circ_fail_from = 0
-      rs.circ_try_from = 0
-      tot_extend_time = 0
-      tot_extends = 0
-      for c in r.circuits: 
-        # FIXME: Should this be SQL-query based instead?
-        for e in c.extensions: 
-          if e.to_node == r:
-            rs.circ_try_to += 1
-            if isinstance(e, FailedExtension):
-              rs.circ_fail_to += 1
-            elif e.hop == 0:
-              tot_extend_time += e.delta
-              tot_extends += 1
-          elif e.from_node == r:
-            rs.circ_try_from += 1
-            if isinstance(e, FailedExtension):
-              rs.circ_fail_from += 1
-            
-        if isinstance(c, FailedCircuit):
-          pass
-          # TODO: Also count timeouts against earlier nodes?
-        elif isinstance(c, DestroyedCircuit):
-          pass # TODO: Count these somehow..
-
       if rs.circ_try_from > 0:
         rs.circ_from_rate = (1.0*rs.circ_fail_from/rs.circ_try_from)
       if rs.circ_try_to > 0:
         rs.circ_to_rate = (1.0*rs.circ_fail_to/rs.circ_try_to)
+      if rs.circ_try_to+rs.circ_try_from > 0:
+        rs.circ_bi_rate = (1.0*rs.circ_fail_to+rs.circ_fail_from)/(rs.circ_try_to+rs.circ_try_from)
 
-      if tot_extends > 0:
-        rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
-      else:
-        rs.avg_first_ext = 0
       #for s in r.streams:
       #  if isinstance(c, ClosedStream):
       #  elif isinstance(c, FailedStream):
       tc_session.update(rs)
   _compute_stats = Callable(_compute_stats)
- 
+
   def _compute_ranks():
-    # FIXME: Single query for this?
     min_avg_rank = 0x7fffffff
     max_avg_rank = 0
-    for r in Router.query.all():
+    # TODO: Can we optimize this further into one query/update?
+    for r in Router.query.all(): 
       if r.stats: tc_session.delete(r.stats)
       rs = RouterStats()
       rs.router = r
       r.stats = rs
-
-      min_rank = 0x7fffffff
-      tot_rank = 0
-      tot_bw = 0
-      max_rank = 0
-      ranks = len(r.bw_history)
-      for h in r.bw_history:
-        tot_rank += h.rank
-        tot_bw += h.bw
-        if h.rank < min_rank:
-          min_rank = h.rank
-        if h.rank > max_rank:
-          max_rank = h.rank
-      rs.min_rank = min_rank
-      rs.avg_rank = (1.0*tot_rank)/ranks
-      if rs.avg_rank < min_avg_rank:
-        min_avg_rank = rs.avg_rank
-      if rs.avg_rank > max_avg_rank:
-        max_avg_rank = rs.avg_rank
-      rs.max_rank = max_rank
-      rs.avg_bw = (1.0*tot_bw)/ranks
-      tc_session.save_or_update(rs)
-      tc_session.update(r)
-    # FIXME: Single query for this?
+      rank_q = BwHistory.query.filter_by(router=r)
+      rs.min_rank = rank_q.min(BwHistory.rank)
+      rs.avg_rank = rank_q.avg(BwHistory.rank)
+      rs.max_rank = rank_q.max(BwHistory.rank)
+      rs.avg_bw = rank_q.avg(BwHistory.bw)
+    min_avg_rank = RouterStats.query.filer('1=1').min(RouterStats.avg_rank)
+    max_avg_rank = RouterStats.query.filer('1=1').max(RouterStats.avg_rank)
     for rs in RouterStats.query.all():
       rs.percentile = (100.0*rs.avg_rank)/(max_avg_rank - min_avg_rank)
-      tc_session.update(rs)
+
+    tc_session.update(rs)
+    tc_session.update(r)
   _compute_ranks = Callable(_compute_ranks)
 
-  def _compute_ratios():
-    pass
+  def _compute_ratios(filter):
+    sliceq = RouterStats.query.filter(filter)
+    avg_circ_from_rate = sliceq.avg(RouterStats.circ_from_rate)
+    avg_circ_to_rate = sliceq.avg(RouterStats.circ_to_rate)
+    avg_circ_bi_rate = sliceq.avg(RouterStats.circ_bi_rate)
+    avg_ext = sliceq.avg(RouterStats.avg_first_ext)
+    for rs in sliceq.all():
+      rs.circ_from_ratio = rs.circ_from_rate/avg_circ_from_rate
+      rs.circ_to_ratio = rs.circ_to_rate/avg_circ_to_rate
+      rs.circ_bi_ratio = rs.circ_bi_rate/avg_circ_bi_rate
+      rs.ext_ratio = rs.avg_first_ext/avg_ext
+      rs.filt_from_ratio = rs.circ_from_ratio
+      rs.filt_to_ratio = rs.circ_to_ratio
+      rs.filt_bi_ratio = rs.circ_bi_ratio
   _compute_ratios = Callable(_compute_ratios)
 
-  def _compute_filtered_ratios():
-    pass
+  def _compute_filtered_ratios(filter, min_ratio):
+    # XXX: Actually, we should start off simple and only
+    # do filtering for stream ratios
+    badrouters = RouterStats.query.filter(
+       RouterStats.circ_from_rate < min_ratio).column(RouterStats.router).all()
+
+    extnq = Circuit.query
+    for r in badrouters:
+      extnq.filter(not_(Circuit.routers.contains(r)))
+    extnq = sliceq.column(Circuit.extensions)
+    
+    #sliceq = RouterStats.query.filter(filter)
+    #for r in badrouters:
+    #  sliceq = sliceq.filter(not_(Circuits.routers.contains(r)))
+
+    avg_rate = sliceq.avg(RouterStats.circ_from_rate)
+
+    for rs in sliceq.all():
+      rs.filt_from_ratio = rs.circ_from_rate/avg_rate
   _compute_filtered_ratios = Callable(_compute_filtered_ratios)
 
   def reset():
@@ -541,17 +534,23 @@
       tc_session.update(circ)
       tc_session.commit()
     elif c.status == "FAILED":
-      circ = Circuit.query.options(eagerload_all('routers.circuits'), 
-               eagerload('extensions'), eagerload('streams')).filter_by(
-                      circ_id = c.circ_id).first()
+      circ = Circuit.query.filter_by(circ_id = c.circ_id).first()
       if not circ: return # Skip circuits from before we came online
-
+        
+      circ.expunge()
       if isinstance(circ, BuiltCircuit):
-        circ = DestroyedCircuit().xfer_copy(circ) 
+        # Convert to destroyed circuit
+        Circuit.table.update(Circuit.id ==
+                  circ.id).execute(row_type='destroyedcircuit')
+        circ = DestroyedCircuit.query.filter_by(id=circ.id).one()
         circ.destroy_reason = reason
         circ.destroy_time = c.arrived_at
       else:
-        circ = FailedCircuit().xfer_copy(circ)
+        # Convert to failed circuit
+        Circuit.table.update(Circuit.id ==
+                  circ.id).execute(row_type='failedcircuit')
+        circ = FailedCircuit.query.options(
+                  eagerload('extensions')).filter_by(id=circ.id).one()
         circ.fail_reason = reason
         circ.fail_time = c.arrived_at
         e = FailedExtension(circ=circ, hop=len(c.path)+1, time=c.arrived_at)
@@ -579,26 +578,35 @@
       tc_session.save_or_update(circ)
       tc_session.commit()
     elif c.status == "BUILT":
-      circ = Circuit.query.options(eagerload_all('routers.circuits'), 
-               eagerload('extensions'), eagerload('streams')).filter_by(
+      circ = Circuit.query.filter_by(
                      circ_id = c.circ_id).first()
       if not circ: return # Skip circuits from before we came online
+
+      circ.expunge()
+      # Convert to built circuit
+      Circuit.table.update(Circuit.id ==
+                circ.id).execute(row_type='builtcircuit')
+      circ = BuiltCircuit.query.filter_by(id=circ.id).one()
       
-      circ = BuiltCircuit().xfer_copy(circ)
       circ.built_time = c.arrived_at
       circ.tot_delta = c.arrived_at - circ.launch_time
       tc_session.save_or_update(circ)
       tc_session.commit()
     elif c.status == "CLOSED":
-      circ = BuiltCircuit.query.options(eagerload_all('routers.circuits'), 
-               eagerload('extensions'), eagerload('streams')).filter_by(
-                   circ_id = c.circ_id).first()
+      circ = BuiltCircuit.query.filter_by(circ_id = c.circ_id).first()
       if circ:
+        circ.expunge()
         if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
-          circ = ClosedCircuit().xfer_copy(circ)
+          # Convert to closed circuit
+          Circuit.table.update(Circuit.id ==
+                    circ.id).execute(row_type='closedcircuit')
+          circ = ClosedCircuit.query.filter_by(id=circ.id).one()
           circ.closed_time = c.arrived_at
         else:
-          circ = DestroyedCircuit().xfer_copy(circ)
+          # Convert to destroyed circuit
+          Circuit.table.update(Circuit.id ==
+                    circ.id).execute(row_type='destroyedcircuit')
+          circ = DestroyedCircuit.query.filter_by(id=circ.id).one()
           circ.destroy_reason = reason
           circ.destroy_time = c.arrived_at
         tc_session.save_or_update(circ)