[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] Move most of the queue and thread logic into a new common b...
Update of /home/or/cvsroot/control/python
In directory moria:/tmp/cvs-serv4235
Modified Files:
TorCtl.py TorCtl0.py TorCtl1.py
Log Message:
Move most of the queue and thread logic into a new common base class in TorCtl; this means that v0 control connections are smart too.
Index: TorCtl.py
===================================================================
RCS file: /home/or/cvsroot/control/python/TorCtl.py,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- TorCtl.py 12 Nov 2005 21:42:27 -0000 1.12
+++ TorCtl.py 19 Nov 2005 19:42:31 -0000 1.13
@@ -11,6 +11,8 @@
import re
import struct
import sys
+import threading
+import Queue
class TorCtlError(Exception):
"Generic error raised by TorControl code."
@@ -216,6 +218,165 @@
"""
raise NotImplemented
+class _ConnectionBase:
+ def __init__(self):
+ self._s = None
+ self._handler = None
+ self._handleFn = None
+ self._sendLock = threading.RLock()
+ self._queue = Queue.Queue()
+ self._thread = None
+ self._closedEx = None
+ self._closed = 0
+ self._closeHandler = None
+ self._eventThread = None
+ self._eventQueue = Queue.Queue()
+
+ def set_event_handler(self, handler):
+ """Cause future events from the Tor process to be sent to 'handler'.
+ """
+ raise NotImplemented
+
+ def set_close_handler(self, handler):
+ """Call 'handler' when the Tor process has closed its connection or
+ given us an exception. If we close normally, no arguments are
+ provided; otherwise, it will be called with an exception as its
+ argument.
+ """
+ self._closeHandler = handler
+
+ def close(self):
+ """Shut down this controller connection"""
+ self._sendLock.acquire()
+ try:
+ self._queue.put("CLOSE")
+ self._eventQueue.put("CLOSE")
+ self._s.close()
+ self._s = None
+ self._closed = 1
+ finally:
+ self._sendLock.release()
+
+ def _read_reply(self):
+ """DOCDOC"""
+ raise NotImplementd
+
+ def launch_thread(self, daemon=1):
+ """Launch a background thread to handle messages from the Tor process."""
+ assert self._thread is None
+ t = threading.Thread(target=self._loop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._thread = t
+ t = threading.Thread(target=self._eventLoop)
+ if daemon:
+ t.setDaemon(daemon)
+ t.start()
+ self._eventThread = t
+ return self._thread
+
+ def _loop(self):
+ """Main subthread loop: Read commands from Tor, and handle them either
+ as events or as responses to other commands.
+ """
+ while 1:
+ ex = None
+ try:
+ isEvent, reply = self._read_reply()
+ except:
+ self._err(sys.exc_info())
+ return
+
+ if isEvent:
+ if self._handler is not None:
+ self._eventQueue.put(reply)
+ else:
+ cb = self._queue.get()
+ cb(reply)
+
+ def _err(self, (tp, ex, tb), fromEventLoop=0):
+ """DOCDOC"""
+ if self._s:
+ try:
+ self.close()
+ except:
+ pass
+ self._sendLock.acquire()
+ try:
+ self._closedEx = ex
+ self._closed = 1
+ finally:
+ self._sendLock.release()
+ while 1:
+ try:
+ cb = self._queue.get(timeout=0)
+ if cb != "CLOSE":
+ cb("EXCEPTION")
+ except Queue.Empty:
+ break
+ if self._closeHandler is not None:
+ self._closeHandler(ex)
+ return
+
+ def _eventLoop(self):
+ """DOCDOC"""
+ while 1:
+ reply = self._eventQueue.get()
+ if reply == "CLOSE":
+ return
+ try:
+ self._handleFn(reply)
+ except:
+ self._err(sys.exc_info(), 1)
+ return
+
+ def _sendImpl(self, sendFn, msg):
+ """DOCDOC"""
+ if self._thread is None:
+ self.launch_thread(1)
+ # This condition will get notified when we've got a result...
+ condition = threading.Condition()
+ # Here's where the result goes...
+ result = []
+
+ if self._closedEx is not None:
+ raise self._closedEx
+ elif self._closed:
+ raise TorCtl.TorCtlClosed()
+
+ def cb(reply,condition=condition,result=result):
+ condition.acquire()
+ try:
+ result.append(reply)
+ condition.notify()
+ finally:
+ condition.release()
+
+ # Sends a message to Tor...
+ self._sendLock.acquire()
+ try:
+ self._queue.put(cb)
+ sendFn(msg)
+ finally:
+ self._sendLock.release()
+
+ # Now wait till the answer is in...
+ condition.acquire()
+ try:
+ while not result:
+ condition.wait()
+ finally:
+ condition.release()
+
+ # ...And handle the answer appropriately.
+ assert len(result) == 1
+ reply = result[0]
+ if reply == "EXCEPTION":
+ raise self._closedEx
+
+ return reply
+
class DebugEventHandler(EventHandler):
"""Trivial event handler: dumps all events to stdout."""
def __init__(self, out=None):
Index: TorCtl0.py
===================================================================
RCS file: /home/or/cvsroot/control/python/TorCtl0.py,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -d -r1.6 -r1.7
--- TorCtl0.py 12 Nov 2005 21:42:27 -0000 1.6
+++ TorCtl0.py 19 Nov 2005 19:42:31 -0000 1.7
@@ -13,8 +13,7 @@
import socket
import struct
import sys
-import threading
-import Queue
+import TorCtl
__all__ = [
"MSG_TYPE", "EVENT_TYPE", "CIRC_STATUS", "STREAM_STATUS",
@@ -270,54 +269,29 @@
else:
return s
-class Connection:
+class Connection(TorCtl._ConnectionBase):
"""A Connection represents a connection to the Tor process."""
def __init__(self, sock):
"""Create a Connection to communicate with the Tor process over the
socket 'sock'.
"""
+ TorCtl._ConnectionBase.__init__(self)
self._s = sock
- self._handler = None
- self._sendLock = threading.RLock()
- self._queue = Queue.Queue()
- self._thread = None
def set_event_handler(self, handler):
"""Cause future events from the Tor process to be sent to 'handler'.
"""
self._handler = handler
+ self._handleFn = handler.handle0
- def launch_thread(self, daemon=1):
- """Launch a background thread to handle messages from the Tor process."""
- assert self._thread is None
- t = threading.Thread(target=self._loop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._thread = t
- return t
-
- def _send(self, type, body=""):
+ def _doSend(self, (type, body)):
"""Helper: Deliver a command of type 'type' and body 'body' to Tor.
"""
self._s.sendall(pack_message(type, body))
- def _loop(self):
- """Main subthread loop: Read commands from Tor, and handle them either
- as events or as responses to other commands.
- """
- while 1:
- try:
- length, tp, body = _receive_message(self._s)
- except (OSError, socket.error), _:
- if self._queue.get(timeout=0) != "CLOSE":
- raise
- if tp == MSG_TYPE.EVENT:
- if self._handler is not None:
- self._handler.handle0(body)
- else:
- cb = self._queue.get()
- cb(tp, body)
+ def _read_reply(self):
+ length, tp, body = _receive_message(self._s)
+ return (tp == MSG_TYPE.EVENT), (tp, body)
def _sendAndRecv(self, tp, msg="", expectedTypes=(MSG_TYPE.DONE,)):
"""Helper: Send a command of type 'tp' and body 'msg' to Tor,
@@ -325,40 +299,8 @@
in expectedTypes, return a (tp,body) tuple. If it is an error,
raise ErrorReply. Otherwise, raise ProtocolError.
"""
- if self._thread is None:
- self.launch_thread(1)
- # This condition will get notified when we've got a result...
- condition = threading.Condition()
- # Here's where the result goes...
- result = []
-
- def cb(tp,body,condition=condition,result=result):
- condition.acquire()
- try:
- result.append((tp, body))
- condition.notify()
- finally:
- condition.release()
- # Sends a message to Tor...
- self._sendLock.acquire()
- try:
- self._queue.put(cb)
- self._send(tp, msg)
- finally:
- self._sendLock.release()
-
- # Now wait till the answer is in...
- condition.acquire()
- try:
- while not result:
- condition.wait()
- finally:
- condition.release()
-
- # ...And handle the answer appropriately.
- assert len(result) == 1
- tp, msg = result[0]
+ tp, msg = self.sendImpl(self._doSend, (tp, msg))
if tp in expectedTypes:
return tp, msg
elif tp == MSG_TYPE.ERROR:
@@ -371,15 +313,6 @@
else:
raise ProtocolError("Unexpectd message type 0x%04x"%tp)
- def close(self):
- """Shut down this controller connection"""
- self._sendLock.acquire()
- try:
- self._queue.put("CLOSE")
- self._s.close()
- finally:
- self._sendLock.release()
-
def authenticate(self, secret=""):
"""Send an authenticating secret to Tor. You'll need to call
this method before other commands. You need to use a
@@ -511,4 +444,3 @@
def post_descriptor(self, descriptor):
"""Tell Tor about a new descriptor in 'descriptor'."""
self._sendAndRecv(MSG_TYPE.POSTDESCRIPTOR,descriptor)
-
Index: TorCtl1.py
===================================================================
RCS file: /home/or/cvsroot/control/python/TorCtl1.py,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -d -r1.21 -r1.22
--- TorCtl1.py 18 Nov 2005 20:12:42 -0000 1.21
+++ TorCtl1.py 19 Nov 2005 19:42:31 -0000 1.22
@@ -8,9 +8,7 @@
import re
import socket
import sys
-import threading
import types
-import Queue
import TorCtl
def _quote(s):
@@ -109,23 +107,15 @@
more.append(line)
lines.append((code, s, _unescape_dots("".join(more))))
-class Connection:
+class Connection(TorCtl._ConnectionBase):
"""A Connection represents a connection to the Tor process."""
def __init__(self, sock):
"""Create a Connection to communicate with the Tor process over the
socket 'sock'.
"""
+ TorCtl._ConnectionBase.__init__(self)
self._s = _BufSock(sock)
self._debugFile = None
- self._handler = None
- self._sendLock = threading.RLock()
- self._queue = Queue.Queue()
- self._thread = None
- self._closedEx = None
- self._closed = 0
- self._closeHandler = None
- self._eventThread = None
- self._eventQueue = Queue.Queue()
def debug(self, f):
"""DOCDOC"""
@@ -135,97 +125,17 @@
"""Cause future events from the Tor process to be sent to 'handler'.
"""
self._handler = handler
+ self._handleFn = handler.handle1
- def set_close_handler(self, handler):
- """Call 'handler' when the Tor process has closed its connection or
- given us an exception. If we close normally, no arguments are
- provided; otherwise, it will be called with an exception as its
- argument.
- """
- self._closeHandler = handler
-
- def launch_thread(self, daemon=1):
- """Launch a background thread to handle messages from the Tor process."""
- assert self._thread is None
- t = threading.Thread(target=self._loop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._thread = t
- t = threading.Thread(target=self._eventLoop)
- if daemon:
- t.setDaemon(daemon)
- t.start()
- self._eventThread = t
- return self._thread
-
- def close(self):
- """Shut down this controller connection"""
- self._sendLock.acquire()
- try:
- self._queue.put("CLOSE")
- self._eventQueue.put("CLOSE")
- self._s.close()
- self._s = None
- self._closed = 1
- finally:
- self._sendLock.release()
-
- def _loop(self):
- """Main subthread loop: Read commands from Tor, and handle them either
- as events or as responses to other commands.
- """
- while 1:
- ex = None
- try:
- lines = _read_reply(self._s,self._debugFile)
- except:
- self._err(sys.exc_info())
- return
-
- assert lines
- if lines[0][0][0] == "6":
- if self._handler is not None:
- self._eventQueue.put(lines)
- else:
- cb = self._queue.get()
- cb(lines)
-
-
- def _err(self, (tp, ex, tb), fromEventLoop=0):
- """DOCDOC"""
- if self._s:
- try:
- self.close()
- except:
- pass
- self._sendLock.acquire()
- try:
- self._closedEx = ex
- self._closed = 1
- finally:
- self._sendLock.release()
- while 1:
- try:
- cb = self._queue.get(timeout=0)
- if cb != "CLOSE":
- cb("EXCEPTION")
- except Queue.Empty:
- break
- if self._closeHandler is not None:
- self._closeHandler(ex)
- return
+ def _read_reply(self):
+ lines = _read_reply(self._s, self._debugFile)
+ isEvent = (lines and lines[0][0][0] == '6')
+ return isEvent, lines
- def _eventLoop(self):
- while 1:
- lines = self._eventQueue.get()
- if lines == "CLOSE":
- return
- try:
- self._handler.handle1(lines)
- except:
- self._err(sys.exc_info(), 1)
- return
+ def _doSend(self, msg):
+ if self._debugFile:
+ self._debugFile.write(">>> %s" % msg)
+ self._s.write(msg)
def _sendAndRecv(self, msg="", expectedTypes=("250", "251")):
"""Helper: Send a command 'msg' to Tor, and wait for a command
@@ -233,54 +143,12 @@
return a list of (tp,body,extra) tuples. If it is an
error, raise ErrorReply. Otherwise, raise TorCtl.ProtocolError.
"""
- if self._thread is None:
- self.launch_thread(1)
- # This condition will get notified when we've got a result...
- condition = threading.Condition()
- # Here's where the result goes...
- result = []
-
- if self._closedEx is not None:
- raise self._closedEx
- elif self._closed:
- raise TorCtl.TorCtlClosed()
-
- def cb(lines,condition=condition,result=result):
- condition.acquire()
- try:
- result.append((lines))
- condition.notify()
- finally:
- condition.release()
-
if type(msg) == types.ListType:
msg = "".join(msg)
-
assert msg.endswith("\r\n")
- # Sends a message to Tor...
- self._sendLock.acquire()
- try:
- self._queue.put(cb)
- if self._debugFile:
- self._debugFile.write(">>> %s" % msg)
- self._s.write(msg)
- finally:
- self._sendLock.release()
-
- # Now wait till the answer is in...
- condition.acquire()
- try:
- while not result:
- condition.wait()
- finally:
- condition.release()
-
- # ...And handle the answer appropriately.
- assert len(result) == 1
- lines = result[0]
- if lines == "EXCEPTION":
- raise self._closedEx
+ lines = self._sendImpl(self._doSend, msg)
+ print lines
for tp, msg, _ in lines:
if tp[0] in '45':
raise TorCtl.ErrorReply("%s %s"%(tp, msg))