[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Get locking right: since sqlite does database locking s...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv7675/lib/mixminion

Modified Files:
	ThreadUtils.py test.py 
Log Message:
Get locking right: since sqlite does database locking so poorly, but other dbs have good transaction systems, make all database access happen in a separate thread when we are using sqlite, and allow for saner stuff with better DBs.

Index: ThreadUtils.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ThreadUtils.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- ThreadUtils.py	27 Jul 2004 03:10:55 -0000	1.4
+++ ThreadUtils.py	12 Dec 2004 02:48:16 -0000	1.5
@@ -8,7 +8,7 @@
    """
 
 __all__ = [ 'MessageQueue', 'QueueEmpty', 'ClearableQueue', 'TimeoutQueue',
-            'RWLock' ]
+            'RWLock', 'ProcessingThread', 'BackgroundingDecorator' ]
 
 import threading
 import time
@@ -247,3 +247,68 @@
                 self.readOK.notifyAll()
         finally:
             self.rwOK.release()
+
+#----------------------------------------------------------------------
+# Processing threads
+#DOCDOC
+#XXXX008 add tests for these.
+
+class ProcessingThread(threading.Thread):
+    """Background thread to handle CPU-intensive functions.
+
+       Currently used to process packets in the background."""
+    # Fields:
+    #   mqueue: a ClearableQueue of callable objects.
+    class _Shutdown:
+        """Callable that raises itself when called.  Inserted into the
+           queue when it's time to shut down."""
+        def __call__(self):
+            raise self
+
+    def __init__(self):
+        """Create a new processing thread."""
+        threading.Thread.__init__(self, name="processing thread")
+        self.mqueue = ClearableQueue()
+        self.threadName = name
+
+    def shutdown(self):
+        LOG.info("Telling %s to shut down.", self.name)
+        self.mqueue.clear()
+        self.mqueue.put(ProcessingThread._Shutdown())
+
+    def addJob(self, job):
+        """Adds a job to the message queue.  A job is a callable object
+           to be invoked by the processing thread.  If the job raises
+           ProcessingThread._Shutdown, the processing thread stops running."""
+        self.mqueue.put(job)
+
+    def run(self):
+        try:
+            while 1:
+                job = self.mqueue.get()
+                job()
+        except ProcessingThread._Shutdown:
+            LOG.info("Shutting down %s",self.name)
+            return
+        except:
+            LOG.error_exc(sys.exc_info(),
+                          "Exception in %s; shutting down thread.",self.name)
+
+class BackgroundingDecorator:
+    """DOCDOC"""
+    class _AddJob:
+        def __init__(self, processingThread, fn):
+            self.thread = processingThread
+            self.fn = fn
+        def __call__(self, *args, **kwargs):
+            def callback(self=self, args=args, kwargs=kwargs):
+                self.fn(*args, **kwargs)
+            self.thread.addJob(callback)
+
+    def __init__(self, processingThread, obj):
+        self._thread = processingThread
+        self._baseObject = obj
+
+    def __getattr__(self, attr):
+        fn = getattr(self._baseObject,attr)
+        return self._AddJob(self._thread,fn)

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.209
retrieving revision 1.210
diff -u -d -r1.209 -r1.210
--- test.py	11 Dec 2004 18:00:41 -0000	1.209
+++ test.py	12 Dec 2004 02:48:16 -0000	1.210
@@ -7740,7 +7740,7 @@
         os.mkdir(d)
         loc = os.path.join(d, "db")
         t = previousMidnight(time.time())+3600
-        log = P.openPingLog(loc)
+        log = P.openPingLog(None,location=loc)
         log.startup(now=t)
         log.heartbeat(now=t+1)
         log.heartbeat(now=t+20)
@@ -7760,7 +7760,12 @@
         log.connected("Foobar",now=t+90)
         log.gotPing("\x00Z"*10, now=t+130)
         log.gotPing("BN"*10, now=t+150)
-        log.gotPing("BL"*10, now=t+160) #Never sent.
+        suspendLog()
+        try:
+            log.gotPing("BL"*10, now=t+160) #Never sent.
+        finally:
+            s = resumeLog()
+        self.assertEndsWith(s, "Received ping with no record of its hash\n")
         log.gotPing("''"*10, now=t+161.1)
         log.rotate(now=t+160)
         log.heartbeat(t+200)
@@ -7785,7 +7790,7 @@
         log.shutdown()
         #log.calculateDailyResults( ) #XXXX TEST
         log.close()
-        log = P.openPingLog(loc)
+        log = P.openPingLog(None,location=loc)
         t += 3600
         log.startup(now=t)
         log.calculateUptimes(t-3600, t+100, now=t+100)
@@ -7834,7 +7839,7 @@
     loader = unittest.TestLoader()
     tc = loader.loadTestsFromTestCase
 
-    if 1:
+    if 0:
         suite.addTest(tc(PingerTests))
         return suite
     testClasses = [MiscTests,