[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r19395: {torctl} Add 'EventListener' support to TorCtl, so that multiple list (torctl/trunk/python/TorCtl)
Author: mikeperry
Date: 2009-04-29 21:07:00 -0400 (Wed, 29 Apr 2009)
New Revision: 19395
Modified:
torctl/trunk/python/TorCtl/TorCtl.py
torctl/trunk/python/TorCtl/__init__.py
Log:
Add 'EventListener' support to TorCtl, so that multiple listeners can be
attached independent of being full EventHandlers or part of the PathSupport
inheritance hierarchy.
Modified: torctl/trunk/python/TorCtl/TorCtl.py
===================================================================
--- torctl/trunk/python/TorCtl/TorCtl.py 2009-04-28 13:08:47 UTC (rev 19394)
+++ torctl/trunk/python/TorCtl/TorCtl.py 2009-04-30 01:07:00 UTC (rev 19395)
@@ -27,7 +27,7 @@
"EventHandler", "DebugEventHandler", "NetworkStatusEvent",
"NewDescEvent", "CircuitEvent", "StreamEvent", "ORConnEvent",
"StreamBwEvent", "LogEvent", "AddrMapEvent", "BWEvent",
- "UnknownEvent", "ConsensusTracker" ]
+ "UnknownEvent", "ConsensusTracker", "EventListener", "EVENT_STATE" ]
import os
import re
@@ -63,6 +63,14 @@
WARN="WARN",
ERR="ERR")
+EVENT_STATE = Enum2(
+ PRISTINE="PRISTINE",
+ PRELISTEN="PRELISTEN",
+ HEARTBEAT="HEARTBEAT",
+ HANDLING="HANDLING",
+ POSTLISTEN="POSTLISTEN",
+ DONE="DONE")
+
class TorCtlError(Exception):
"Generic error raised by TorControl code."
pass
@@ -93,26 +101,29 @@
m = re.search(r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)", updated)
self.updated = datetime.datetime(*map(int, m.groups()))
-class NetworkStatusEvent:
- def __init__(self, event_name, nslist):
+class Event:
+ def __init__(self, event_name):
self.event_name = event_name
self.arrived_at = 0
+ self.state = EVENT_STATE.PRISTINE
+
+class NetworkStatusEvent(Event):
+ def __init__(self, event_name, nslist):
+ Event.__init__(self, event_name)
self.nslist = nslist # List of NetworkStatus objects
class NewConsensusEvent(NetworkStatusEvent):
pass
-class NewDescEvent:
+class NewDescEvent(Event):
def __init__(self, event_name, idlist):
- self.event_name = event_name
- self.arrived_at = 0
+ Event.__init__(self, event_name)
self.idlist = idlist
-class CircuitEvent:
+class CircuitEvent(Event):
def __init__(self, event_name, circ_id, status, path, purpose,
reason, remote_reason):
- self.event_name = event_name
- self.arrived_at = 0
+ Event.__init__(self, event_name)
self.circ_id = circ_id
self.status = status
self.path = path
@@ -120,11 +131,10 @@
self.reason = reason
self.remote_reason = remote_reason
-class StreamEvent:
+class StreamEvent(Event):
def __init__(self, event_name, strm_id, status, circ_id, target_host,
target_port, reason, remote_reason, source, source_addr, purpose):
- self.event_name = event_name
- self.arrived_at = 0
+ Event.__init__(self, event_name)
self.strm_id = strm_id
self.status = status
self.circ_id = circ_id
@@ -136,11 +146,10 @@
self.source_addr = source_addr
self.purpose = purpose
-class ORConnEvent:
+class ORConnEvent(Event):
def __init__(self, event_name, status, endpoint, age, read_bytes,
wrote_bytes, reason, ncircs):
- self.event_name = event_name
- self.arrived_at = 0
+ Event.__init__(self, event_name)
self.status = status
self.endpoint = endpoint
self.age = age
@@ -149,21 +158,22 @@
self.reason = reason
self.ncircs = ncircs
-class StreamBwEvent:
+class StreamBwEvent(Event):
def __init__(self, event_name, strm_id, read, written):
- self.event_name = event_name
+ Event.__init__(self, event_name)
self.strm_id = int(strm_id)
self.bytes_read = int(read)
self.bytes_written = int(written)
-class LogEvent:
+class LogEvent(Event):
def __init__(self, level, msg):
- self.event_name = self.level = level
+ Event.__init__(self, level)
+ self.level = level
self.msg = msg
-class AddrMapEvent:
+class AddrMapEvent(Event):
def __init__(self, event_name, from_addr, to_addr, when):
- self.event_name = event_name
+ Event.__init__(self, event_name)
self.from_addr = from_addr
self.to_addr = to_addr
self.when = when
@@ -174,15 +184,15 @@
self.to_addr = to_addr
self.when = when
-class BWEvent:
+class BWEvent(Event):
def __init__(self, event_name, read, written):
- self.event_name = event_name
+ Event.__init__(self, event_name)
self.read = read
self.written = written
-class UnknownEvent:
+class UnknownEvent(Event):
def __init__(self, event_name, event_string):
- self.event_name = event_name
+ Event.__init__(self, event_name)
self.event_string = event_string
class ExitPolicyLine:
@@ -265,7 +275,7 @@
self.__dict__[i] = copy.deepcopy(args[0].__dict__[i])
return
else:
- (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime, published, contact, rate_limited) = args
+ (idhex, name, bw, down, exitpolicy, flags, ip, version, os, uptime, published, contact, rate_limited, orhash) = args
self.idhex = idhex
self.nickname = name
self.bw = bw
@@ -283,6 +293,7 @@
self.deleted = False # Has Tor already deleted this descriptor?
self.contact = contact
self.rate_limited = rate_limited
+ self.orhash = orhash
def __str__(self):
s = self.idhex, self.nickname
@@ -353,7 +364,8 @@
if not version or not os:
plog("INFO", "No version and/or OS for router " + ns.nickname)
return Router(ns.idhex, ns.nickname, bw_observed, dead, exitpolicy,
- ns.flags, ip, version, os, uptime, published, contact, rate_limited)
+ ns.flags, ip, version, os, uptime, published, contact, rate_limited,
+ ns.orhash)
build_from_desc = Callable(build_from_desc)
def update_to(self, new):
@@ -562,9 +574,18 @@
def set_event_handler(self, handler):
"""Cause future events from the Tor process to be sent to 'handler'.
"""
+ if self._handler:
+ handler.pre_listeners = self._handler.pre_listeners
+ handler.post_listeners = self._handler.post_listeners
self._handler = handler
self._handleFn = handler._handle1
+ def add_event_listener(self, listener):
+ if not self._handler:
+ self._handler = EventHandler()
+ self._handleFn = self._handler._handle1
+ self._handler.add_event_listener(listener)
+
def _read_reply(self):
lines = []
while 1:
@@ -887,7 +908,68 @@
nslist.append(NetworkStatus(*(m.groups() + (flags,))))
return nslist
-class EventHandler:
+class EventSink:
+ def heartbeat_event(self, event): pass
+ def unknown_event(self, event): pass
+ def circ_status_event(self, event): pass
+ def stream_status_event(self, event): pass
+ def stream_bw_event(self, event): pass
+ def or_conn_status_event(self, event): pass
+ def bandwidth_event(self, event): pass
+ def new_desc_event(self, event): pass
+ def msg_event(self, event): pass
+ def ns_event(self, event): pass
+ def new_consensus_event(self, event): pass
+ def address_mapped_event(self, event): pass
+
+class EventListener(EventSink):
+ """An 'EventListener' is a passive sink for parsed Tor events. It
+ implements the same interface as EventHandler, but it should
+ not alter Tor's behavior as a result of these events.
+
+ Do not extend from this class. Instead, extend from one of
+ Pre, Post, or Dual event listener, to get events
+ before, after, or before and after the EventHandler handles them.
+ """
+ def __init__(self):
+ """Create a new EventHandler."""
+ self._map1 = {
+ "CIRC" : self.circ_status_event,
+ "STREAM" : self.stream_status_event,
+ "ORCONN" : self.or_conn_status_event,
+ "STREAM_BW" : self.stream_bw_event,
+ "BW" : self.bandwidth_event,
+ "DEBUG" : self.msg_event,
+ "INFO" : self.msg_event,
+ "NOTICE" : self.msg_event,
+ "WARN" : self.msg_event,
+ "ERR" : self.msg_event,
+ "NEWDESC" : self.new_desc_event,
+ "ADDRMAP" : self.address_mapped_event,
+ "NS" : self.ns_event,
+ "NEWCONSENSUS" : self.new_consensus_event
+ }
+ self.parent_handler = None
+ self._sabotage()
+
+ def _sabotage(self):
+ raise TorCtlError("Error: Do not extend from EventListener directly! Use Pre, Post or DualEventListener instead.")
+
+ def listen(self, event):
+ self.heartbeat_event(event)
+ self._map1.get(event.event_name, self.unknown_event)(event)
+
+ def set_parent(self, parent_handler):
+ self.parent_handler = parent_handler
+
+class PreEventListener(EventListener):
+ def _sabotage(self): pass
+class PostEventListener(EventListener):
+ def _sabotage(self): pass
+class DualEventListener(PreEventListener,PostEventListener):
+ def _sabotage(self): pass
+
+class EventHandler(EventSink):
"""An 'EventHandler' wraps callbacks for the events Tor can return.
Each event argument is an instance of the corresponding event
class."""
@@ -909,14 +991,24 @@
"NS" : self.ns_event,
"NEWCONSENSUS" : self.new_consensus_event
}
+ self.pre_listeners = []
+ self.post_listeners = []
def _handle1(self, timestamp, lines):
"""Dispatcher: called from Connection when an event is received."""
for code, msg, data in lines:
event = self._decode1(msg, data)
event.arrived_at = timestamp
+ event.state=EVENT_STATE.PRELISTEN
+ for l in self.pre_listeners:
+ l.listen(event)
+ event.state=EVENT_STATE.HEARTBEAT
self.heartbeat_event(event)
+ event.state=EVENT_STATE.HANDLING
self._map1.get(event.event_name, self.unknown_event)(event)
+ event.state=EVENT_STATE.POSTLISTEN
+ for l in self.post_listeners:
+ l.listen(event)
def _decode1(self, body, data):
"""Unpack an event message into a type/arguments-tuple tuple."""
@@ -1027,6 +1119,13 @@
return event
+ def add_event_listener(self, evlistener):
+ if isinstance(evlistener, PreEventListener):
+ self.pre_listeners.append(evlistener)
+ if isinstance(evlistener, PostEventListener):
+ self.post_listeners.append(evlistener)
+ evlistener.set_parent(self)
+
def heartbeat_event(self, event):
"""Called before any event is received. Convenience function
for any cleanup/setup/reconfiguration you may need to do.
@@ -1037,53 +1136,53 @@
"""Called when we get an event type we don't recognize. This
is almost alwyas an error.
"""
- raise NotImplemented()
+ pass
def circ_status_event(self, event):
"""Called when a circuit status changes if listening to CIRCSTATUS
events."""
- raise NotImplemented()
+ pass
def stream_status_event(self, event):
"""Called when a stream status changes if listening to STREAMSTATUS
events. """
- raise NotImplemented()
+ pass
def stream_bw_event(self, event):
- raise NotImplemented()
+ pass
def or_conn_status_event(self, event):
"""Called when an OR connection's status changes if listening to
ORCONNSTATUS events."""
- raise NotImplemented()
+ pass
def bandwidth_event(self, event):
"""Called once a second if listening to BANDWIDTH events.
"""
- raise NotImplemented()
+ pass
def new_desc_event(self, event):
"""Called when Tor learns a new server descriptor if listenting to
NEWDESC events.
"""
- raise NotImplemented()
+ pass
def msg_event(self, event):
"""Called when a log message of a given severity arrives if listening
to INFO_MSG, NOTICE_MSG, WARN_MSG, or ERR_MSG events."""
- raise NotImplemented()
+ pass
def ns_event(self, event):
- raise NotImplemented()
+ pass
def new_consensus_event(self, event):
- raise NotImplemented()
+ pass
def address_mapped_event(self, event):
"""Called when Tor adds a mapping for an address if listening
to ADDRESSMAPPED events.
"""
- raise NotImplemented()
+ pass
class Consensus:
"""
Modified: torctl/trunk/python/TorCtl/__init__.py
===================================================================
--- torctl/trunk/python/TorCtl/__init__.py 2009-04-28 13:08:47 UTC (rev 19394)
+++ torctl/trunk/python/TorCtl/__init__.py 2009-04-30 01:07:00 UTC (rev 19395)
@@ -24,4 +24,5 @@
creation, stream bandwidth, and circuit failure information.
"""
-__all__ = ["TorUtil", "GeoIPSupport", "PathSupport", "TorCtl", "StatsSupport"]
+__all__ = ["TorUtil", "GeoIPSupport", "PathSupport", "TorCtl", "StatsSupport",
+ "SQLSupport"]