[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Get locking right: since sqlite does database locking s...
Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv7675/lib/mixminion
Modified Files:
ThreadUtils.py test.py
Log Message:
Get locking right: since sqlite does database locking so poorly, but other dbs have good transaction systems, make all database access happen in a separate thread when we are using sqlite, and allow for saner stuff with better DBs.
Index: ThreadUtils.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/ThreadUtils.py,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- ThreadUtils.py 27 Jul 2004 03:10:55 -0000 1.4
+++ ThreadUtils.py 12 Dec 2004 02:48:16 -0000 1.5
@@ -8,7 +8,7 @@
"""
__all__ = [ 'MessageQueue', 'QueueEmpty', 'ClearableQueue', 'TimeoutQueue',
- 'RWLock' ]
+ 'RWLock', 'ProcessingThread', 'BackgroundingDecorator' ]
import threading
import time
@@ -247,3 +247,68 @@
self.readOK.notifyAll()
finally:
self.rwOK.release()
+
+#----------------------------------------------------------------------
+# Processing threads
+#DOCDOC
+#XXXX008 add tests for these.
+
+class ProcessingThread(threading.Thread):
+ """Background thread to handle CPU-intensive functions.
+
+ Currently used to process packets in the background."""
+ # Fields:
+ # mqueue: a ClearableQueue of callable objects.
+ class _Shutdown:
+ """Callable that raises itself when called. Inserted into the
+ queue when it's time to shut down."""
+ def __call__(self):
+ raise self
+
+ def __init__(self):
+ """Create a new processing thread."""
+ threading.Thread.__init__(self, name="processing thread")
+ self.mqueue = ClearableQueue()
+ self.threadName = name
+
+ def shutdown(self):
+ LOG.info("Telling %s to shut down.", self.name)
+ self.mqueue.clear()
+ self.mqueue.put(ProcessingThread._Shutdown())
+
+ def addJob(self, job):
+ """Adds a job to the message queue. A job is a callable object
+ to be invoked by the processing thread. If the job raises
+ ProcessingThread._Shutdown, the processing thread stops running."""
+ self.mqueue.put(job)
+
+ def run(self):
+ try:
+ while 1:
+ job = self.mqueue.get()
+ job()
+ except ProcessingThread._Shutdown:
+ LOG.info("Shutting down %s",self.name)
+ return
+ except:
+ LOG.error_exc(sys.exc_info(),
+ "Exception in %s; shutting down thread.",self.name)
+
+class BackgroundingDecorator:
+ """DOCDOC"""
+ class _AddJob:
+ def __init__(self, processingThread, fn):
+ self.thread = processingThread
+ self.fn = fn
+ def __call__(self, *args, **kwargs):
+ def callback(self=self, args=args, kwargs=kwargs):
+ self.fn(*args, **kwargs)
+ self.thread.addJob(callback)
+
+ def __init__(self, processingThread, obj):
+ self._thread = processingThread
+ self._baseObject = obj
+
+ def __getattr__(self, attr):
+ fn = getattr(self._baseObject,attr)
+ return self._AddJob(self._thread,fn)
Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.209
retrieving revision 1.210
diff -u -d -r1.209 -r1.210
--- test.py 11 Dec 2004 18:00:41 -0000 1.209
+++ test.py 12 Dec 2004 02:48:16 -0000 1.210
@@ -7740,7 +7740,7 @@
os.mkdir(d)
loc = os.path.join(d, "db")
t = previousMidnight(time.time())+3600
- log = P.openPingLog(loc)
+ log = P.openPingLog(None,location=loc)
log.startup(now=t)
log.heartbeat(now=t+1)
log.heartbeat(now=t+20)
@@ -7760,7 +7760,12 @@
log.connected("Foobar",now=t+90)
log.gotPing("\x00Z"*10, now=t+130)
log.gotPing("BN"*10, now=t+150)
- log.gotPing("BL"*10, now=t+160) #Never sent.
+ suspendLog()
+ try:
+ log.gotPing("BL"*10, now=t+160) #Never sent.
+ finally:
+ s = resumeLog()
+ self.assertEndsWith(s, "Received ping with no record of its hash\n")
log.gotPing("''"*10, now=t+161.1)
log.rotate(now=t+160)
log.heartbeat(t+200)
@@ -7785,7 +7790,7 @@
log.shutdown()
#log.calculateDailyResults( ) #XXXX TEST
log.close()
- log = P.openPingLog(loc)
+ log = P.openPingLog(None,location=loc)
t += 3600
log.startup(now=t)
log.calculateUptimes(t-3600, t+100, now=t+100)
@@ -7834,7 +7839,7 @@
loader = unittest.TestLoader()
tc = loader.loadTestsFromTestCase
- if 1:
+ if 0:
suite.addTest(tc(PingerTests))
return suite
testClasses = [MiscTests,