[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] Do not block communications loop while processing events, since they may want to call functions that wait for a reply.



Update of /home/or/cvsroot/control/python
In directory moria:/tmp/cvs-serv8280

Modified Files:
	TorCtl1.py 
Log Message:
Do not block the communications loop while processing events, since event handlers may want to call functions that wait for a reply.

Index: TorCtl1.py
===================================================================
RCS file: /home/or/cvsroot/control/python/TorCtl1.py,v
retrieving revision 1.20
retrieving revision 1.21
diff -u -d -r1.20 -r1.21
--- TorCtl1.py	18 Nov 2005 01:20:53 -0000	1.20
+++ TorCtl1.py	18 Nov 2005 20:12:42 -0000	1.21
@@ -124,6 +124,8 @@
         self._closedEx = None
         self._closed = 0
         self._closeHandler = None
+        self._eventThread = None
+        self._eventQueue = Queue.Queue()
 
     def debug(self, f):
         """DOCDOC"""
@@ -150,14 +152,21 @@
             t.setDaemon(daemon)
         t.start()
         self._thread = t
-        return t
+        t = threading.Thread(target=self._eventLoop)
+        if daemon:
+            t.setDaemon(daemon)
+        t.start()
+        self._eventThread = t
+        return self._thread
 
     def close(self):
         """Shut down this controller connection"""
         self._sendLock.acquire()
         try:
             self._queue.put("CLOSE")
+            self._eventQueue.put("CLOSE")
             self._s.close()
+            self._s = None
             self._closed = 1
         finally:
             self._sendLock.release()
@@ -171,34 +180,53 @@
             try:
                 lines = _read_reply(self._s,self._debugFile)
             except:
-                tp, ex, tb = sys.exc_info()
-
-            if ex is not None:
-                self._sendLock.acquire()
-                try:
-                    self._closedEx = ex
-                    self._closed = 1
-                finally:
-                    self._sendLock.release()
-                while 1:
-                    try:
-                        cb = self._queue.get(timeout=0)
-                        if cb != "CLOSE":
-                            cb("EXCEPTION")
-                    except Queue.Empty:
-                        break
-                if self._closeHandler is not None:
-                    self._closeHandler(ex)
+                self._err(sys.exc_info())
                 return
 
             assert lines
             if lines[0][0][0] == "6":
                 if self._handler is not None:
-                    self._handler.handle1(lines)
+                    self._eventQueue.put(lines)
             else:
                 cb = self._queue.get()
                 cb(lines)
 
+
+    def _err(self, (tp, ex, tb), fromEventLoop=0):
+        """DOCDOC"""
+        if self._s:
+            try:
+                self.close()
+            except:
+                pass
+        self._sendLock.acquire()
+        try:
+            self._closedEx = ex
+            self._closed = 1
+        finally:
+            self._sendLock.release()
+        while 1:
+            try:
+                cb = self._queue.get(timeout=0)
+                if cb != "CLOSE":
+                    cb("EXCEPTION")
+            except Queue.Empty:
+                break
+        if self._closeHandler is not None:
+            self._closeHandler(ex)
+        return
+
+    def _eventLoop(self):
+        while 1:
+            lines = self._eventQueue.get()
+            if lines == "CLOSE":
+                return
+            try:
+                self._handler.handle1(lines)
+            except:
+                self._err(sys.exc_info(), 1)
+                return
+
     def _sendAndRecv(self, msg="", expectedTypes=("250", "251")):
         """Helper: Send a command 'msg' to Tor, and wait for a command
            in response.  If the response type is in expectedTypes,