[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r19396: {} Add initial SQL event listener support using elixir. (torctl/trunk/python/TorCtl)
Author: mikeperry
Date: 2009-04-29 21:08:00 -0400 (Wed, 29 Apr 2009)
New Revision: 19396
Added:
torctl/trunk/python/TorCtl/SQLSupport.py
Log:
Add initial SQL event listener support using elixir.
Added: torctl/trunk/python/TorCtl/SQLSupport.py
===================================================================
--- torctl/trunk/python/TorCtl/SQLSupport.py (rev 0)
+++ torctl/trunk/python/TorCtl/SQLSupport.py 2009-04-30 01:08:00 UTC (rev 19396)
@@ -0,0 +1,646 @@
+#!/usr/bin/python
+
+"""
+
+Support classes for statisics gathering in SQL Databases
+
+DOCDOC
+
+"""
+
+import socket
+import sys
+import time
+import datetime
+
+import PathSupport, TorCtl
+from TorUtil import *
+from PathSupport import *
+from TorUtil import meta_port, meta_host, control_port, control_host, control_pass
+from TorCtl import EVENT_TYPE, EVENT_STATE, TorCtlError
+
+from sqlalchemy.orm import scoped_session, sessionmaker, eagerload, lazyload, eagerload_all
+from sqlalchemy import create_engine
+from sqlalchemy.schema import ThreadLocalMetaData,MetaData
+from elixir import *
+
+#################### Model #######################
+
+# In elixir, the session (DB connection) is a property of the model..
+# There can only be one for all of the listeners below that use it
+# See http://elixir.ematia.de/trac/wiki/Recipes/MultipleDatabases
+OP=None
+tc_metadata = MetaData()
+tc_metadata.echo=False
+tc_session = scoped_session(sessionmaker(autoflush=True))
+#__metadata__ = tc_metadata
+#__session__ = tc_session
+
+def setup_db(db_uri):
+ #session.close()
+ tc_engine = create_engine(db_uri, echo=False)
+ tc_metadata.bind = tc_engine
+ tc_metadata.echo = False
+
+ setup_all()
+ create_all()
+
+class Router(Entity):
+ using_options(order_by='-published', session=tc_session, metadata=tc_metadata)
+ using_mapper_options(save_on_init=False)
+ idhex = Field(CHAR(40), primary_key=True, index=True)
+ orhash = Field(CHAR(27))
+ published = Field(Time)
+ nick = Field(Text)
+
+ OS = Field(Text)
+ rate_limited = Field(Boolean)
+ guard = Field(Boolean)
+ exit = Field(Boolean)
+ stable = Field(Boolean)
+ v2dir = Field(Boolean)
+ v3dir = Field(Boolean)
+ hsdir = Field(Boolean)
+
+ bw = Field(Integer)
+ version = Field(Integer)
+ # FIXME: is mutable=False what we want? Do we care?
+ router = Field(PickleType(mutable=False))
+ circuits = ManyToMany('Circuit')
+ streams = ManyToMany('Stream')
+ bw_history = OneToMany('BwHistory')
+ stats = OneToOne('RouterStats', inverse="router")
+
+ def from_router(self, router):
+ self.published = router.published
+ self.bw = router.bw
+ self.idhex = router.idhex
+ self.orhash = router.orhash
+ self.nick = router.nickname
+ self.OS = router.os
+ self.rate_limited = router.rate_limited
+ self.guard = "Guard" in router.flags
+ self.exit = "Exit" in router.flags
+ self.stable = "Stable" in router.flags
+ self.v2dir = "V2Dir" in router.flags
+ self.v3dir = "V3Dir" in router.flags
+ self.hsdir = "HSDir" in router.flags
+ self.version = router.version.version
+ self.router = router #pickle.dumps(router)
+ return self
+
+class BwHistory(Entity):
+ using_options(session=tc_session, metadata=tc_metadata)
+ using_mapper_options(save_on_init=False)
+ router = ManyToOne('Router')
+ bw = Field(Integer)
+ rank = Field(Integer)
+ pub_time = Field(Time)
+
+class Circuit(Entity):
+ using_options(order_by='-launch_time', session=tc_session, metadata=tc_metadata)
+ using_mapper_options(save_on_init=False)
+ routers = ManyToMany('Router')
+ streams = OneToMany('Stream')
+ extensions = OneToMany('Extension', inverse='circ')
+ circ_id = Field(Integer, index=True)
+ launch_time = Field(Float)
+ last_extend = Field(Float)
+
+ def xfer_copy(self, circ):
+ rs = []
+ for r in circ.routers:
+ rs.append(r)
+ for r in rs:
+ #r.circuits.append(self)
+ rcl = len(r.circuits)
+ self.routers.append(r)
+ r.circuits.remove(circ)
+ assert rcl == len(r.circuits)
+ assert len(self.routers) == len(rs)
+
+ csl = []
+ for s in circ.streams:
+ csl.append(s)
+ for s in csl:
+ self.streams.append(s)
+ s.circ = self
+ assert len(csl) == len(self.streams)
+
+ cel = []
+ for e in circ.extensions:
+ cel.append(e)
+ for e in cel:
+ self.extensions.append(e)
+ e.circ = self
+ assert len(cel) == len(self.extensions)
+
+ self.circ_id = circ.circ_id
+ self.launch_time = circ.launch_time
+ self.last_extend = circ.last_extend
+ tc_session.delete(circ)
+ return self
+
+class FailedCircuit(Circuit):
+ using_mapper_options(save_on_init=False)
+ using_options(session=tc_session, metadata=tc_metadata)
+ #failed_extend = ManyToOne('Extension', inverse='circ')
+ fail_reason = Field(Text)
+ fail_time = Field(Float)
+
+class BuiltCircuit(Circuit):
+ using_options(session=tc_session, metadata=tc_metadata)
+ using_mapper_options(save_on_init=False)
+ built_time = Field(Float)
+ tot_delta = Field(Float)
+
+class DestroyedCircuit(Circuit):
+ using_options(session=tc_session, metadata=tc_metadata)
+ using_mapper_options(save_on_init=False)
+ destroy_reason = Field(Text)
+ destroy_time = Field(Float)
+
+class ClosedCircuit(BuiltCircuit):
+ using_options(session=tc_session, metadata=tc_metadata)
+ using_mapper_options(save_on_init=False)
+ closed_time = Field(Float)
+
+class Extension(Entity):
+ using_mapper_options(save_on_init=False)
+ using_options(order_by='-time', session=tc_session, metadata=tc_metadata)
+ circ = ManyToOne('Circuit', inverse='extensions')
+ from_node = ManyToOne('Router')
+ to_node = ManyToOne('Router')
+ hop = Field(Integer)
+ time = Field(Float)
+ delta = Field(Float)
+
+class FailedExtension(Extension):
+ using_options(session=tc_session, metadata=tc_metadata)
+ #failed_circ = ManyToOne('FailedCircuit', inverse='failed_extend')
+ using_mapper_options(save_on_init=False)
+ reason = Field(Text)
+
+class Stream(Entity):
+ using_options(session=tc_session, metadata=tc_metadata)
+ using_options(order_by='-start_time')
+ using_mapper_options(save_on_init=False)
+ circuit = ManyToOne('Circuit')
+ strm_id = Field(Integer, index=True)
+ start_time = Field(Float)
+ tot_bytes = Field(Integer)
+
+class FailedStream(Stream):
+ using_options(session=tc_session, metadata=tc_metadata)
+ using_mapper_options(save_on_init=False)
+ reason = Field(Text)
+ fail_time = Field(Float)
+
+class ClosedStream(Stream):
+ using_options(session=tc_session, metadata=tc_metadata)
+ using_mapper_options(save_on_init=False)
+ end_time = Field(Float)
+ bandwidth = Field(Float)
+
+class RouterStats(Entity):
+ using_options(session=tc_session, metadata=tc_metadata)
+ using_mapper_options(save_on_init=False)
+ router = ManyToOne('Router', inverse="stats")
+
+ # Unused
+ circ_used = Field(Integer) # Extended up to this node
+ circ_fail = Field(Integer) # Includes timeouts of priors
+
+ # Easily derived from BwHistory
+ min_rank = Field(Integer)
+ avg_rank = Field(Integer)
+ max_rank = Field(Integer)
+ avg_bw = Field(Float)
+
+ percentile = Field(Float)
+
+ # These can be derived with a single query over
+ # FailedExtension and Extension
+ circ_fail_to = Field(Integer)
+ circ_fail_from = Field(Integer)
+ circ_try_to = Field(Integer)
+ circ_try_from = Field(Integer)
+
+ circ_from_rate = Field(Float)
+ circ_to_rate = Field(Float)
+
+ circ_to_ratio = Field(Float)
+ circ_from_ratio = Field(Float)
+ circ_bi_ratio = Field(Float)
+
+ avg_first_ext = Field(Float)
+ first_ext_ratio = Field(Float)
+
+ avg_sbw = Field(Float)
+ sbw_ratio = Field(Float)
+
+ # FIXME: Figure out how to efficiently compute these..
+ filt_to_ratio = Field(Float)
+ filt_from_ratio = Field(Float)
+ filt_bi_ratio = Field(Float)
+ filt_sbw_ratio = Field(Float)
+
+ def _compute_stats():
+ # FIXME: Change loading method from lazy to eager here
+ # FIXME: Always?
+ for r in Router.query.all():
+ rs = r.stats
+ rs.circ_fail_to = 0
+ rs.circ_try_to = 0
+ rs.circ_fail_from = 0
+ rs.circ_try_from = 0
+ tot_extend_time = 0
+ tot_extends = 0
+ for c in r.circuits:
+ # FIXME: Should this be SQL-query based instead?
+ for e in c.extensions:
+ if e.to_node == r:
+ rs.circ_try_to += 1
+ if isinstance(e, FailedExtension):
+ rs.circ_fail_to += 1
+ elif e.hop == 0:
+ tot_extend_time += e.delta
+ tot_extends += 1
+ elif e.from_node == r:
+ rs.circ_try_from += 1
+ if isinstance(e, FailedExtension):
+ rs.circ_fail_from += 1
+
+ if isinstance(c, FailedCircuit):
+ pass
+ # TODO: Also count timeouts against earlier nodes?
+ elif isinstance(c, DestroyedCircuit):
+ pass # TODO: Count these somehow..
+
+ if rs.circ_try_from > 0:
+ rs.circ_from_rate = (1.0*rs.circ_fail_from/rs.circ_try_from)
+ if rs.circ_try_to > 0:
+ rs.circ_to_rate = (1.0*rs.circ_fail_to/rs.circ_try_to)
+
+ if tot_extends > 0:
+ rs.avg_first_ext = (1.0*tot_extend_time)/tot_extends
+ else:
+ rs.avg_first_ext = 0
+ #for s in r.streams:
+ # if isinstance(c, ClosedStream):
+ # elif isinstance(c, FailedStream):
+ tc_session.update(rs)
+ _compute_stats = Callable(_compute_stats)
+
+ def _compute_ranks():
+ # FIXME: Single query for this?
+ min_avg_rank = 0x7fffffff
+ max_avg_rank = 0
+ for r in Router.query.all():
+ if r.stats: tc_session.delete(r.stats)
+ rs = RouterStats()
+ rs.router = r
+ r.stats = rs
+
+ min_rank = 0x7fffffff
+ tot_rank = 0
+ tot_bw = 0
+ max_rank = 0
+ ranks = len(r.bw_history)
+ for h in r.bw_history:
+ tot_rank += h.rank
+ tot_bw += h.bw
+ if h.rank < min_rank:
+ min_rank = h.rank
+ if h.rank > max_rank:
+ max_rank = h.rank
+ rs.min_rank = min_rank
+ rs.avg_rank = (1.0*tot_rank)/ranks
+ if rs.avg_rank < min_avg_rank:
+ min_avg_rank = rs.avg_rank
+ if rs.avg_rank > max_avg_rank:
+ max_avg_rank = rs.avg_rank
+ rs.max_rank = max_rank
+ rs.avg_bw = (1.0*tot_bw)/ranks
+ tc_session.save_or_update(rs)
+ tc_session.update(r)
+ # FIXME: Single query for this?
+ for rs in RouterStats.query.all():
+ rs.percentile = (100.0*rs.avg_rank)/(max_avg_rank - min_avg_rank)
+ tc_session.update(rs)
+ _compute_ranks = Callable(_compute_ranks)
+
+ def _compute_ratios():
+ pass
+ _compute_ratios = Callable(_compute_ratios)
+
+ def _compute_filtered_ratios():
+ pass
+ _compute_filtered_ratios = Callable(_compute_filtered_ratios)
+
+ def reset():
+ for r in Router.query.all():
+ r.stats = None
+ tc_session.update(r)
+ RouterStats.table.drop()
+ RouterStats.table.create()
+ tc_session.commit()
+ reset = Callable(reset)
+
+ def compute(router_filter, stats_filter):
+ RouterStats._compute_ranks()
+ RouterStats._compute_stats()
+ RouterStats._compute_ratios()
+ RouterStats._compute_filtered_ratios()
+ tc_session.commit()
+ compute = Callable(compute)
+
+##################### End Model ####################
+
+class CircuitStatsBroker:
+ pass
+
+class StreamStatsBroker:
+ pass
+
+class RatioBroker:
+ pass
+
+#################### Model Support ################
+def reset_all_stats():
+ # Need to keep routers around..
+ for r in Router.query.all():
+ r.bw_history = [] # XXX: Is this sufficient/correct?
+ r.circuits = []
+ r.streams = []
+ r.stats = None
+ tc_session.update(r)
+
+ BwHistory.table.drop() # Will drop subclasses
+ Extension.table.drop()
+ Stream.table.drop()
+ Circuit.table.drop()
+ RouterStats.table.drop()
+
+ RouterStats.table.create()
+ BwHistory.table.create()
+ Extension.table.create()
+ Stream.table.create()
+ Circuit.table.create()
+
+ tc_session.commit()
+
+##################### End Model Support ####################
+
+class ConsensusTrackerListener(TorCtl.DualEventListener):
+ def __init__(self):
+ TorCtl.DualEventListener.__init__(self)
+ self.last_desc_at = time.time()
+ self.consensus = None
+
+ # XXX: What about non-running routers and uptime information?
+ def _update_rank_history(self, idlist):
+ for idhex in idlist:
+ if idhex not in self.consensus.routers: continue
+ rc = self.consensus.routers[idhex]
+ r = Router.query.options(eagerload('bw_history')).filter_by(
+ idhex=idhex).one()
+ bwh = BwHistory(router=r, rank=rc.list_rank, bw=rc.bw,
+ pub_time=r.published)
+ r.bw_history.append(bwh)
+ tc_session.save_or_update(bwh)
+ tc_session.update(r)
+ tc_session.commit()
+
+ def _update_db(self, idlist):
+ for idhex in idlist:
+ if idhex in self.consensus.routers:
+ rc = self.consensus.routers[idhex]
+ r = Router.query.filter_by(idhex=rc.idhex).first()
+
+ if r and r.orhash == rc.orhash:
+ # We already have it stored. (Possible spurious NEWDESC)
+ continue
+
+ if not r: r = Router()
+
+ r.from_router(rc)
+ tc_session.save_or_update(r)
+ tc_session.commit()
+
+ def update_consensus(self):
+ self.consensus = self.parent_handler.current_consensus()
+ self._update_db(self.consensus.ns_map.iterkeys())
+
+ def set_parent(self, parent_handler):
+ if not isinstance(parent_handler, TorCtl.ConsensusTracker):
+ raise TorCtlError("ConsensusTrackerListener can only be attached to ConsensusTracker instances")
+ TorCtl.DualEventListener.set_parent(self, parent_handler)
+
+ def heartbeat_event(self, e):
+ if e.state == EVENT_STATE.PRELISTEN:
+ if not self.consensus:
+ global OP
+ OP = Router.query.filter_by(
+ idhex="0000000000000000000000000000000000000000").first()
+ if not OP:
+ OP = Router(idhex="0000000000000000000000000000000000000000",
+ orhash="000000000000000000000000000",
+ nick="!!TorClient", published=datetime.datetime.utcnow())
+ tc_session.save_or_update(OP)
+ tc_session.commit()
+ self.update_consensus()
+ # So ghetto
+ if e.arrived_at - self.last_desc_at > 20.0:
+ plog("INFO", "Newdesc timer is up. Assuming we have full consensus now")
+ self.last_desc_at = 0x7fffffff
+ self._update_rank_history(self.consensus.ns_map.iterkeys())
+
+ def new_consensus_event(self, n):
+ if n.state == EVENT_STATE.POSTLISTEN:
+ self.last_desc_at = n.arrived_at
+ self.update_consensus()
+
+ def new_desc_event(self, d):
+ if d.state == EVENT_STATE.POSTLISTEN:
+ self.last_desc_at = d.arrived_at
+ self.consensus = self.parent_handler.current_consensus()
+ self._update_db(d.idlist)
+
+class CircuitListener(TorCtl.PreEventListener):
+ def set_parent(self, parent_handler):
+ if not filter(lambda f: f.__class__ == ConsensusTrackerListener,
+ parent_handler.post_listeners):
+ raise TorCtlError("CircuitListener needs a ConsensusTrackerListener")
+ TorCtl.PreEventListener.set_parent(self, parent_handler)
+ # TODO: This is really lame. We only know the extendee of a circuit
+ # if we have built the path ourselves. Otherwise, Tor keeps it a
+ # secret from us. This prevents us from properly tracking failures
+ # for normal Tor usage.
+ if isinstance(parent_handler, PathSupport.PathBuilder):
+ self.track_parent = True
+ else:
+ self.track_parent = False
+
+ def circ_status_event(self, c):
+ if self.track_parent and c.cird_id not in self.parent_handler.circuits:
+ return # Ignore circuits that aren't ours
+ # TODO: Hrmm, consider making this sane in TorCtl.
+ if c.reason: lreason = c.reason
+ else: lreason = "NONE"
+ if c.remote_reason: rreason = c.remote_reason
+ else: rreason = "NONE"
+ reason = c.event_name+":"+c.status+":"+lreason+":"+rreason
+
+ output = [str(c.arrived_at), str(time.time()-c.arrived_at), c.event_name, str(c.circ_id), c.status]
+ if c.path: output.append(",".join(c.path))
+ if c.reason: output.append("REASON=" + c.reason)
+ if c.remote_reason: output.append("REMOTE_REASON=" + c.remote_reason)
+ plog("DEBUG", " ".join(output))
+
+ if c.status == "LAUNCHED":
+ circ = Circuit(circ_id=c.circ_id,launch_time=c.arrived_at,
+ last_extend=c.arrived_at)
+ if self.track_parent:
+ for r in self.parent_handler.circuits[c.circ_id].path:
+ rq = Router.query.options(eagerload('circuits')).filter_by(
+ idhex=r.idhex).one()
+ circ.routers.append(rq)
+ rq.circuits.append(circ)
+ tc_session.update(rq)
+ tc_session.save_or_update(circ)
+ tc_session.commit()
+ elif c.status == "EXTENDED":
+ circ = Circuit.query.options(eagerload('extensions')).filter_by(
+ circ_id = c.circ_id).first()
+ if not circ: return # Skip circuits from before we came online
+
+ e = Extension(circ=circ, hop=len(c.path), time=c.arrived_at)
+
+ if len(c.path) == 1:
+ e.from_node = OP
+ else:
+ r_ext = c.path[-2]
+ if r_ext[0] != '$': r_ext = self.parent_handler.name_to_key[r_ext]
+ e.from_node = Router.query.filter_by(idhex=r_ext[1:]).one()
+
+ r_ext = c.path[-1]
+ if r_ext[0] != '$': r_ext = self.parent_handler.name_to_key[r_ext]
+
+ e.to_node = Router.query.filter_by(idhex=r_ext[1:]).one()
+ if not self.track_parent:
+ # XXX: Eager load here?
+ circ.routers.append(e.to_node)
+ e.to_node.circuits.append(circ)
+ tc_session.update(e.to_node)
+
+ e.delta = c.arrived_at - circ.last_extend
+ circ.last_extend = c.arrived_at
+ circ.extensions.append(e)
+ tc_session.save_or_update(e)
+ tc_session.update(circ)
+ tc_session.commit()
+ elif c.status == "FAILED":
+ circ = Circuit.query.options(eagerload_all('routers.circuits'),
+ eagerload('extensions'), eagerload('streams')).filter_by(
+ circ_id = c.circ_id).first()
+ if not circ: return # Skip circuits from before we came online
+
+ if isinstance(circ, BuiltCircuit):
+ circ = DestroyedCircuit().xfer_copy(circ)
+ circ.destroy_reason = reason
+ circ.destroy_time = c.arrived_at
+ else:
+ circ = FailedCircuit().xfer_copy(circ)
+ circ.fail_reason = reason
+ circ.fail_time = c.arrived_at
+ e = FailedExtension(circ=circ, hop=len(c.path)+1, time=c.arrived_at)
+
+ if len(c.path) == 0:
+ e.from_node = OP
+ else:
+ r_ext = c.path[-1]
+ if r_ext[0] != '$': r_ext = self.parent_handler.name_to_key[r_ext]
+
+ e.from_node = Router.query.filter_by(idhex=r_ext[1:]).one()
+
+ if self.track_parent:
+ r=self.parent_handler.circuits[c.circ_id].path[len(c.path)+1]
+ e.to_node = Router.query.filter_by(idhex=r.idhex).one()
+ else:
+ e.to_node = None # We have no idea..
+
+ e.delta = c.arrived_at - circ.last_extend
+ e.reason = reason
+ circ.extensions.append(e)
+ circ.fail_time = c.arrived_at
+ tc_session.save_or_update(e)
+
+ tc_session.save_or_update(circ)
+ tc_session.commit()
+ elif c.status == "BUILT":
+ circ = Circuit.query.options(eagerload_all('routers.circuits'),
+ eagerload('extensions'), eagerload('streams')).filter_by(
+ circ_id = c.circ_id).first()
+ if not circ: return # Skip circuits from before we came online
+
+ circ = BuiltCircuit().xfer_copy(circ)
+ circ.built_time = c.arrived_at
+ circ.tot_delta = c.arrived_at - circ.launch_time
+ tc_session.save_or_update(circ)
+ tc_session.commit()
+ elif c.status == "CLOSED":
+ circ = BuiltCircuit.query.options(eagerload_all('routers.circuits'),
+ eagerload('extensions'), eagerload('streams')).filter_by(
+ circ_id = c.circ_id).first()
+ if circ:
+ if lreason in ("REQUESTED", "FINISHED", "ORIGIN"):
+ circ = ClosedCircuit().xfer_copy(circ)
+ circ.closed_time = c.arrived_at
+ else:
+ circ = DestroyedCircuit().xfer_copy(circ)
+ circ.destroy_reason = reason
+ circ.destroy_time = c.arrived_at
+ tc_session.save_or_update(circ)
+ tc_session.commit()
+
+class StreamListener(CircuitListener):
+ def stream_bw_event(self, s):
+ pass
+ def stream_status_event(self, s):
+ pass
+
+def run_example(host, port):
+ """ Example of basic TorCtl usage. See PathSupport for more advanced
+ usage.
+ """
+ print "host is %s:%d"%(host,port)
+ setup_db("sqlite:///torflow.sqllite")
+ s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ s.connect((host,port))
+ c = Connection(s)
+ th = c.launch_thread()
+ c.authenticate(control_pass)
+ c.set_event_handler(TorCtl.ConsensusTracker(c))
+ c.add_event_listener(ConsensusTrackerListener())
+ c.add_event_listener(CircuitListener())
+
+ print `c.extend_circuit(0,["moria1"])`
+ try:
+ print `c.extend_circuit(0,[""])`
+ except TorCtl.ErrorReply: # wtf?
+ print "got error. good."
+ except:
+ print "Strange error", sys.exc_info()[0]
+
+ c.set_events([EVENT_TYPE.STREAM, EVENT_TYPE.CIRC,
+ EVENT_TYPE.NEWCONSENSUS, EVENT_TYPE.NEWDESC,
+ EVENT_TYPE.ORCONN, EVENT_TYPE.BW], True)
+
+ th.join()
+ return
+
+
+if __name__ == '__main__':
+ run_example(control_host,control_port)
+