[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r19404: {torctl} Optimize stat computation and solve the inheritance promotion/upgrade problem (torctl/trunk/python/TorCtl)
Author: mikeperry
Date: 2009-04-30 23:45:10 -0400 (Thu, 30 Apr 2009)
New Revision: 19404
Modified:
torctl/trunk/python/TorCtl/SQLSupport.py
Log:
Optimize stat computation and solve the inheritance
promotion/upgrade problem by updating the row_type directly.
Modified: torctl/trunk/python/TorCtl/SQLSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/SQLSupport.py 2009-04-30 18:39:22 UTC (rev 19403)
+++ torctl/trunk/python/TorCtl/SQLSupport.py 2009-05-01 03:45:10 UTC (rev 19404)
@@ -33,11 +33,8 @@
tc_metadata = MetaData()
tc_metadata.echo=False
tc_session = scoped_session(sessionmaker(autoflush=True))
-#__metadata__ = tc_metadata
-#__session__ = tc_session
def setup_db(db_uri):
- #session.close()
tc_engine = create_engine(db_uri, echo=False)
tc_metadata.bind = tc_engine
tc_metadata.echo = False
@@ -107,40 +104,6 @@
launch_time = Field(Float)
last_extend = Field(Float)
- def xfer_copy(self, circ):
- rs = []
- for r in circ.routers:
- rs.append(r)
- for r in rs:
- #r.circuits.append(self)
- rcl = len(r.circuits)
- self.routers.append(r)
- r.circuits.remove(circ)
- assert rcl == len(r.circuits)
- assert len(self.routers) == len(rs)
-
- csl = []
- for s in circ.streams:
- csl.append(s)
- for s in csl:
- self.streams.append(s)
- s.circ = self
- assert len(csl) == len(self.streams)
-
- cel = []
- for e in circ.extensions:
- cel.append(e)
- for e in cel:
- self.extensions.append(e)
- e.circ = self
- assert len(cel) == len(self.extensions)
-
- self.circ_id = circ.circ_id
- self.launch_time = circ.launch_time
- self.last_extend = circ.last_extend
- tc_session.delete(circ)
- return self
-
class FailedCircuit(Circuit):
using_mapper_options(save_on_init=False)
using_options(session=tc_session, metadata=tc_metadata)
@@ -228,13 +191,14 @@
circ_from_rate = Field(Float)
circ_to_rate = Field(Float)
+ circ_bi_rate = Field(Float)
circ_to_ratio = Field(Float)
circ_from_ratio = Field(Float)
circ_bi_ratio = Field(Float)
avg_first_ext = Field(Float)
- first_ext_ratio = Field(Float)
+ ext_ratio = Field(Float)
avg_sbw = Field(Float)
sbw_ratio = Field(Float)
@@ -245,97 +209,126 @@
filt_bi_ratio = Field(Float)
filt_sbw_ratio = Field(Float)
+ def _compute_stats_relation(r):
+ rs = r.stats
+ rs.circ_fail_to = 0
+ rs.circ_try_to = 0
+ rs.circ_fail_from = 0
+ rs.circ_try_from = 0
+ tot_extend_time = 0
+ tot_extends = 0
+ for c in r.circuits:
+ for e in c.extensions:
+ if e.to_node == r:
+ rs.circ_try_to += 1
+ if isinstance(e, FailedExtension):
+ rs.circ_fail_to += 1
+ elif e.hop == 0:
+ tot_extend_time += e.delta
+ tot_extends += 1
+ elif e.from_node == r:
+ rs.circ_try_from += 1
+ if isinstance(e, FailedExtension):
+ rs.circ_fail_from += 1
+
+ if isinstance(c, FailedCircuit):
+ pass # TODO: Also count timeouts against earlier nodes?
+ elif isinstance(c, DestroyedCircuit):
+ pass # TODO: Count these somehow..
+
+ if tot_extends > 0: rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
+ else: rs.avg_first_ext = 0
+ _compute_stats_relation = Callable(_compute_stats_relation)
+
+ def _compute_stats_query(r):
+ rs = r.stats
+ to_r = Extension.query.filter_by(to_node=r)
+ rs.circ_try_to = to_r.count()
+ rs.circ_try_from = Extension.query.filter_by(from_node=r).count()
+ rs.circ_fail_to = FailedExtension.query.filter_by(to_node=r).count()
+ rs.circ_fail_from = FailedExtension.query.filter_by(from_node=r).count()
+ rs.avg_first_ext = to_r.filter_by(hop=0,row_type='extension').avg(Extension.delta)
+ _compute_stats_query = Callable(_compute_stats_query)
+
+
def _compute_stats():
- # FIXME: Change loading method from lazy to eager here
- # FIXME: Always?
- for r in Router.query.all():
+ for r in Router.query.\
+ options(eagerload_all('circuits.extensions')).\
+ all():
+ RouterStats._compute_stats_relation(r)
+ #RouterStats._compute_stats_query(r) # Remove options if this is used
rs = r.stats
- rs.circ_fail_to = 0
- rs.circ_try_to = 0
- rs.circ_fail_from = 0
- rs.circ_try_from = 0
- tot_extend_time = 0
- tot_extends = 0
- for c in r.circuits:
- # FIXME: Should this be SQL-query based instead?
- for e in c.extensions:
- if e.to_node == r:
- rs.circ_try_to += 1
- if isinstance(e, FailedExtension):
- rs.circ_fail_to += 1
- elif e.hop == 0:
- tot_extend_time += e.delta
- tot_extends += 1
- elif e.from_node == r:
- rs.circ_try_from += 1
- if isinstance(e, FailedExtension):
- rs.circ_fail_from += 1
-
- if isinstance(c, FailedCircuit):
- pass
- # TODO: Also count timeouts against earlier nodes?
- elif isinstance(c, DestroyedCircuit):
- pass # TODO: Count these somehow..
-
if rs.circ_try_from > 0:
rs.circ_from_rate = (1.0*rs.circ_fail_from/rs.circ_try_from)
if rs.circ_try_to > 0:
rs.circ_to_rate = (1.0*rs.circ_fail_to/rs.circ_try_to)
+ if rs.circ_try_to+rs.circ_try_from > 0:
+ rs.circ_bi_rate = (1.0*rs.circ_fail_to+rs.circ_fail_from)/(rs.circ_try_to+rs.circ_try_from)
- if tot_extends > 0:
- rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
- else:
- rs.avg_first_ext = 0
#for s in r.streams:
# if isinstance(c, ClosedStream):
# elif isinstance(c, FailedStream):
tc_session.update(rs)
_compute_stats = Callable(_compute_stats)
-
+
def _compute_ranks():
- # FIXME: Single query for this?
min_avg_rank = 0x7fffffff
max_avg_rank = 0
- for r in Router.query.all():
+ # TODO: Can we optimize this further into one query/update?
+ for r in Router.query.all():
if r.stats: tc_session.delete(r.stats)
rs = RouterStats()
rs.router = r
r.stats = rs
-
- min_rank = 0x7fffffff
- tot_rank = 0
- tot_bw = 0
- max_rank = 0
- ranks = len(r.bw_history)
- for h in r.bw_history:
- tot_rank += h.rank
- tot_bw += h.bw
- if h.rank < min_rank:
- min_rank = h.rank
- if h.rank > max_rank:
- max_rank = h.rank
- rs.min_rank = min_rank
- rs.avg_rank = (1.0*tot_rank)/ranks
- if rs.avg_rank < min_avg_rank:
- min_avg_rank = rs.avg_rank
- if rs.avg_rank > max_avg_rank:
- max_avg_rank = rs.avg_rank
- rs.max_rank = max_rank
- rs.avg_bw = (1.0*tot_bw)/ranks
- tc_session.save_or_update(rs)
- tc_session.update(r)
- # FIXME: Single query for this?
+ rank_q = BwHistory.query.filter_by(router=r)
+ rs.min_rank = rank_q.min(BwHistory.rank)
+ rs.avg_rank = rank_q.avg(BwHistory.rank)
+ rs.max_rank = rank_q.max(BwHistory.rank)
+ rs.avg_bw = rank_q.avg(BwHistory.bw)
+ min_avg_rank = RouterStats.query.filer('1=1').min(RouterStats.avg_rank)
+ max_avg_rank = RouterStats.query.filer('1=1').max(RouterStats.avg_rank)
for rs in RouterStats.query.all():
rs.percentile = (100.0*rs.avg_rank)/(max_avg_rank - min_avg_rank)
- tc_session.update(rs)
+
+ tc_session.update(rs)
+ tc_session.update(r)
_compute_ranks = Callable(_compute_ranks)
- def _compute_ratios():
- pass
+ def _compute_ratios(filter):
+ sliceq = RouterStats.query.filter(filter)
+ avg_circ_from_rate = sliceq.avg(RouterStats.circ_from_rate)
+ avg_circ_to_rate = sliceq.avg(RouterStats.circ_to_rate)
+ avg_circ_bi_rate = sliceq.avg(RouterStats.circ_bi_rate)
+ avg_ext = sliceq.avg(RouterStats.avg_first_ext)
+ for rs in sliceq.all():
+ rs.circ_from_ratio = rs.circ_from_rate/avg_circ_from_rate
+ rs.circ_to_ratio = rs.circ_to_rate/avg_circ_to_rate
+ rs.circ_bi_ratio = rs.circ_bi_rate/avg_circ_bi_rate
+ rs.ext_ratio = rs.avg_first_ext/avg_ext
+ rs.filt_from_ratio = rs.circ_from_ratio
+ rs.filt_to_ratio = rs.circ_to_ratio
+ rs.filt_bi_ratio = rs.circ_bi_ratio
_compute_ratios = Callable(_compute_ratios)
- def _compute_filtered_ratios():
- pass
+ def _compute_filtered_ratios(filter, min_ratio):
+ # XXX: Actually, we should start off simple and only
+ # do filtering for stream ratios
+ badrouters = RouterStats.query.filter(
+ RouterStats.circ_from_rate < min_ratio).column(RouterStats.router).all()
+
+ extnq = Circuit.query
+ for r in badrouters:
+ extnq.filter(not_(Circuit.routers.contains(r)))
+ extnq = sliceq.column(Circuit.extensions)
+
+ #sliceq = RouterStats.query.filter(filter)
+ #for r in badrouters:
+ # sliceq = sliceq.filter(not_(Circuits.routers.contains(r)))
+
+ avg_rate = sliceq.avg(RouterStats.circ_from_rate)
+
+ for rs in sliceq.all():
+ rs.filt_from_ratio = rs.circ_from_rate/avg_rate
_compute_filtered_ratios = Callable(_compute_filtered_ratios)
def reset():
@@ -541,17 +534,23 @@
tc_session.update(circ)
tc_session.commit()
elif c.status == "FAILED":
- circ = Circuit.query.options(eagerload_all('routers.circuits'),
- eagerload('extensions'), eagerload('streams')).filter_by(
- circ_id = c.circ_id).first()
+ circ = Circuit.query.filter_by(circ_id = c.circ_id).first()
if not circ: return # Skip circuits from before we came online
-
+
+ circ.expunge()
if isinstance(circ, BuiltCircuit):
- circ = DestroyedCircuit().xfer_copy(circ)
+ # Convert to destroyed circuit
+ Circuit.table.update(Circuit.id ==
+ circ.id).execute(row_type='destroyedcircuit')
+ circ = DestroyedCircuit.query.filter_by(id=circ.id).one()
circ.destroy_reason = reason
circ.destroy_time = c.arrived_at
else:
- circ = FailedCircuit().xfer_copy(circ)
+ # Convert to failed circuit
+ Circuit.table.update(Circuit.id ==
+ circ.id).execute(row_type='failedcircuit')
+ circ = FailedCircuit.query.options(
+ eagerload('extensions')).filter_by(id=circ.id).one()
circ.fail_reason = reason
circ.fail_time = c.arrived_at
e = FailedExtension(circ=circ, hop=len(c.path)+1, time=c.arrived_at)
@@ -579,26 +578,35 @@
tc_session.save_or_update(circ)
tc_session.commit()
elif c.status == "BUILT":
- circ = Circuit.query.options(eagerload_all('routers.circuits'),
- eagerload('extensions'), eagerload('streams')).filter_by(
+ circ = Circuit.query.filter_by(
circ_id = c.circ_id).first()
if not circ: return # Skip circuits from before we came online
+
+ circ.expunge()
+ # Convert to built circuit
+ Circuit.table.update(Circuit.id ==
+ circ.id).execute(row_type='builtcircuit')
+ circ = BuiltCircuit.query.filter_by(id=circ.id).one()
- circ = BuiltCircuit().xfer_copy(circ)
circ.built_time = c.arrived_at
circ.tot_delta = c.arrived_at - circ.launch_time
tc_session.save_or_update(circ)
tc_session.commit()
elif c.status == "CLOSED":
- circ = BuiltCircuit.query.options(eagerload_all('routers.circuits'),
- eagerload('extensions'), eagerload('streams')).filter_by(
- circ_id = c.circ_id).first()
+ circ = BuiltCircuit.query.filter_by(circ_id = c.circ_id).first()
if circ:
+ circ.expunge()
if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
- circ = ClosedCircuit().xfer_copy(circ)
+ # Convert to closed circuit
+ Circuit.table.update(Circuit.id ==
+ circ.id).execute(row_type='closedcircuit')
+ circ = ClosedCircuit.query.filter_by(id=circ.id).one()
circ.closed_time = c.arrived_at
else:
- circ = DestroyedCircuit().xfer_copy(circ)
+ # Convert to destroyed circuit
+ Circuit.table.update(Circuit.id ==
+ circ.id).execute(row_type='destroyedcircuit')
+ circ = DestroyedCircuit.query.filter_by(id=circ.id).one()
circ.destroy_reason = reason
circ.destroy_time = c.arrived_at
tc_session.save_or_update(circ)