[minion-cvs] [HALFTESTED] Make the server slightly multithreaded to k...



Update of /home/minion/cvsroot/src/minion/lib/mixminion
In directory moria.mit.edu:/tmp/cvs-serv10668/lib/mixminion

Modified Files:
	BuildMessage.py Common.py Crypto.py benchmark.py test.py 
Log Message:
[HALFTESTED] Make the server slightly multithreaded to keep it from stalling.

Previously, we'd become unresponsive to the network while processing
packets, overwriting files, or sending messages via SMTP or Mixmaster.
Now we have a separate thread for each of those tasks, so we will no
longer see stalls ranging from 30ms up to 5 minutes (!!).

The ideal is still for networking code to use the async server in
the main thread, whenever convenient and possible.

(NOTE: THIS CODE HAS WORKED FOR ME ON MY LAPTOP.  It is not tested or
documented enough to convince me that it is correct.  It has not been
tested on Linux. There are subtle issues about synchronizing access to
server queues that I need to hash out with myself before I trust this
code.  By all means feel free to play with it.... but if it breaks [as
Linus likes to say], you get to keep both pieces.)

BuildMessage:
- Make use of new Crypto.getTrueRNG.

Common:
- Import 'Queue' from the Python distribution here.
- Make blocking secureDelete work correctly with the SIGCHLD handler.
- Add 'blocking' option to waitForChildren.
- Remove unused, unusable signal stuff.
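The blocking secureDelete change always spawns the shredder with P_NOWAIT and then waits on the returned pid, ignoring the error raised when the SIGCHLD handler has already reaped that child. A minimal Python 3 sketch of the pattern follows, POSIX only. The names spawn_and_wait and _reap_any are hypothetical, and the Python interpreter stands in for the shred command; this is not Mixminion's code.

```python
import os
import signal
import sys


def _reap_any(signum, frame):
    """Hypothetical SIGCHLD handler: reap every exited child without blocking."""
    while True:
        try:
            pid, _ = os.waitpid(-1, os.WNOHANG)
        except ChildProcessError:
            return          # no children left at all
        if pid == 0:
            return          # children exist, but none has exited yet


def spawn_and_wait(argv, blocking=True):
    """Spawn argv[0] without blocking; optionally wait for it afterwards.

    Always uses P_NOWAIT, then calls waitpid() only if the caller asked
    for a blocking call.  If a SIGCHLD handler has already reaped the
    child, waitpid() raises OSError (ChildProcessError on Python 3),
    which is deliberately ignored.
    """
    pid = os.spawnv(os.P_NOWAIT, argv[0], argv)
    if blocking:
        try:
            os.waitpid(pid, 0)
        except OSError:
            pass            # the SIGCHLD handler got to the pid first
    return pid


if __name__ == "__main__":
    signal.signal(signal.SIGCHLD, _reap_any)
    child = spawn_and_wait([sys.executable, "-c", "pass"])
    print("child", child, "finished")
```

Whichever side reaps the child first, the blocking call returns only after the child has exited.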

Crypto:
- Optimize getCommonPRNG for the common case (PRNG already created for
  this thread).
- Add a lock to the TRNG.
- Add an accessor function: getTrueRNG.
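The getCommonPRNG change replaces a hasattr() test with try/except AttributeError. In the common case, where this thread already has its PRNG, that costs a single attribute lookup. The sketch below shows the same per-thread pattern under two stated assumptions: threading.local replaces storing an attribute on the Thread object, and random.Random is a non-cryptographic stand-in for AESCounterPRNG.

```python
import random
import threading

# Per-thread storage.  The committed code hangs the PRNG off the current
# Thread object instead; threading.local is the stdlib equivalent.
_per_thread = threading.local()


def get_common_prng():
    """Return this thread's PRNG, creating it on first use.

    Fast path: one attribute read.  The AttributeError branch runs only
    once per thread.  random.Random is a hypothetical stand-in for
    AESCounterPRNG and is NOT suitable for cryptographic use.
    """
    try:
        return _per_thread.prng
    except AttributeError:
        _per_thread.prng = random.Random()
        return _per_thread.prng
```

Because each thread gets its own instance, these per-thread PRNGs need no lock. Only the single shared TRNG does.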

benchmark:
- Add note about limitations of hashlog benchmarks.

test, server.Queue<deleted>, server.ServerQueue<new>:
- Rename Queue.py to ServerQueue.py to avoid conflicting with Queue from
  the standard Python distribution.
- Add locking to StandardQueue.  (The locking is possibly not completely
  right, and certainly not well documented.)
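The log does not say exactly what StandardQueue's lock protects. One plausible reading, offered here only as an assumption, is that queueing and picking messages must be atomic so that two threads never take the same message. The sketch below uses a hypothetical in-memory class, LockedMessageQueue. The real StandardQueue stores messages in a directory.

```python
import threading


class LockedMessageQueue:
    """Hypothetical in-memory stand-in for a server message queue.

    Every method takes the same lock, so each handle is handed to at most
    one consumer even when several threads drain the queue concurrently.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._messages = {}      # handle -> message
        self._next_handle = 0

    def queue_message(self, msg):
        with self._lock:
            handle = self._next_handle
            self._next_handle += 1
            self._messages[handle] = msg
            return handle

    def pick_batch(self, n):
        """Atomically remove and return up to n (handle, message) pairs."""
        with self._lock:
            handles = sorted(self._messages)[:n]
            return [(h, self._messages.pop(h)) for h in handles]

    def count(self):
        with self._lock:
            return len(self._messages)
```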

HashLog:
- Add locks to HashLog.
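A hash log is a replay check. Once several threads process packets, "has this hash been seen?" and "log this hash" must happen as one atomic step. Otherwise two threads could both accept the same replayed packet. The sketch below is hypothetical and in-memory. The real HashLog is opened on a file and exposes logHash, as the benchmark hunk shows.

```python
import threading


class LockedHashLog:
    """Hypothetical in-memory replay log guarded by a single lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._seen = set()

    def check_and_log(self, digest):
        """Record digest; return True only the first time it is seen.

        Testing and inserting under one lock closes the race in which two
        threads both see 'not logged yet' and both accept a replay.
        """
        with self._lock:
            if digest in self._seen:
                return False
            self._seen.add(digest)
            return True
```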

ServerMain, Modules:
- The big threading patch, part 1.  Go from a single-threaded model to one
  with 4 threads:
        * Main thread that runs the async server
        * Processing thread that handles all the crypto and packet magic
        * Cleaning thread that overwrites dead files
        * Delivery thread that handles all outgoing messages not bound
          for the main server.  (Right now, that's everything but MMTP,
          including SMTP and SMTP-via-Mixmaster.)
  The code works for me, but needs some rethinking and cleanup in places.
- Catch HUP and TERM; take appropriate action on TERM.
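In the four-thread layout, the main thread runs the async server and hands slow jobs to dedicated threads, so those jobs never block the network loop. The sketch below shows one way to build that hand-off. WorkerThread, add_job and shutdown are hypothetical names, not Mixminion's API. It uses Python 3's queue module, which was named Queue in Python 2 (the module Common.py now imports).

```python
import queue
import threading


class WorkerThread(threading.Thread):
    """Hypothetical worker that runs jobs pulled from its own queue.

    The main (async networking) thread only enqueues work, so a slow job
    such as an SMTP delivery never stalls the network loop.
    """

    _STOP = object()    # sentinel telling the worker to exit

    def __init__(self, name):
        super().__init__(name=name, daemon=True)
        self.jobs = queue.Queue()

    def add_job(self, fn, *args):
        self.jobs.put((fn, args))

    def shutdown(self):
        self.jobs.put(self._STOP)

    def run(self):
        while True:
            item = self.jobs.get()
            if item is self._STOP:
                return
            fn, args = item
            try:
                fn(*args)
            except Exception as exc:    # keep the worker alive on job errors
                print("%s: job failed: %r" % (self.name, exc))


if __name__ == "__main__":
    # One thread per slow activity, mirroring the commit (names illustrative).
    workers = [WorkerThread(n) for n in ("processing", "cleaning", "delivery")]
    for w in workers:
        w.start()
    workers[2].add_job(print, "pretend to deliver a message via SMTP")
    # On SIGTERM, a server would stop and join its workers before exiting.
    for w in workers:
        w.shutdown()
    for w in workers:
        w.join()
```

The sentinel is queued behind any pending jobs, so shutdown() lets each worker finish the work it has already accepted.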



Index: BuildMessage.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/BuildMessage.py,v
retrieving revision 1.33
retrieving revision 1.34
diff -u -d -r1.33 -r1.34
--- BuildMessage.py	5 Jan 2003 13:19:53 -0000	1.33
+++ BuildMessage.py	9 Jan 2003 06:28:58 -0000	1.34
@@ -388,7 +388,7 @@
     if paranoia:
         nHops = len(path1)
         if path2: nHops += len(path2)
-        secretRNG = Crypto.TrueRNG(SECRET_LEN*len(nHops))
+        secretRNG = Crypto.getTrueRNG()
     else:
         secretRNG = paddingPRNG
 

Index: Common.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Common.py,v
retrieving revision 1.49
retrieving revision 1.50
diff -u -d -r1.49 -r1.50
--- Common.py	8 Jan 2003 08:04:25 -0000	1.49
+++ Common.py	9 Jan 2003 06:28:58 -0000	1.50
@@ -8,10 +8,10 @@
 __all__ = [ 'IntervalSet', 'LOG', 'LogStream', 'MixError', 'MixFatalError',
             'MixProtocolError', 'ceilDiv', 'checkPrivateDir',
             'createPrivateDir', 'floorDiv', 'formatBase64', 'formatDate',
-            'formatFnameTime', 'formatTime', 'installSignalHandlers',
-            'isSMTPMailbox', 'onReset', 'onTerminate', 'openUnique',
-            'previousMidnight', 'readPossiblyGzippedFile', 'secureDelete',
-            'stringContains', 'waitForChildren' ]
+            'formatFnameTime', 'formatTime', 'installSIGCHLDHandler',
+            'isSMTPMailbox', 'openUnique', 'previousMidnight',
+            'readPossiblyGzippedFile', 'secureDelete', 'stringContains',
+            'waitForChildren' ]
 
 import base64
 import bisect
@@ -26,6 +26,9 @@
 import threading
 import time
 import traceback
+# Imported here so we can get it in mixminion.server without being shadowed
+# by the old Queue.py file.
+import Queue
 
 from types import StringType
 
@@ -271,15 +274,17 @@
             os.unlink(f)
         return None
 
-    if blocking:
-        mode = os.P_WAIT
-    else:
-        mode = os.P_NOWAIT
-
     # Some systems are unhappy when you call them with too many options.
     for i in xrange(0, len(fnames), 250-len(_SHRED_OPTS)):
         files = fnames[i:i+250-len(_SHRED_OPTS)]
-        os.spawnl(mode, _SHRED_CMD, _SHRED_CMD, *(_SHRED_OPTS+files))
+        pid = os.spawnl(os.P_NOWAIT,
+                        _SHRED_CMD, _SHRED_CMD, *(_SHRED_OPTS+files))
+        if blocking:
+            try:
+                os.waitpid(pid, 0)
+            except OSError:
+                # sigchild handler might get to the pid first.
+                pass
 
 #----------------------------------------------------------------------
 # Logging
@@ -749,28 +754,16 @@
 #----------------------------------------------------------------------
 # Signal handling
 
-# List of 0-argument functions to call on SIGHUP
-resetHooks = []
-
-# List of 0-argument functions to call on SIGTERM
-terminateHooks = []
-
-def onReset(fn):
-    """Given a 0-argument function fn, cause fn to be invoked when
-       this process next receives a SIGHUP."""
-    resetHooks.append(fn)
-
-def onTerminate(fn):
-    """Given a 0-argument function fn, cause fn to be invoked when
-       this process next receives a SIGTERM."""
-    terminateHooks.append(fn)
-
-def waitForChildren(onceOnly=0):
+def waitForChildren(onceOnly=0, blocking=1):
     """Wait until all subprocesses have finished.  Useful for testing."""
+    if blocking:
+        options = 0
+    else:
+        options = os.WNOHANG
     while 1:
         try:
             # FFFF This won't work on Windows.  What to do?
-            pid, status = os.waitpid(0, 0)
+            pid, status = os.waitpid(0, options)
         except OSError, e:
             break
         except e:
@@ -796,27 +789,9 @@
     #outcome, core, sig = status & 0xff00, status & 0x0080, status & 0x7f
     # FFFF Log if outcome wasn't as expected.
 
-def _sigHandler(signal_num, _):
-    '''(Signal handler for SIGTERM and SIGHUP)'''
-    signal.signal(signal_num, _sigHandler)
-    if signal_num == signal.SIGTERM:
-        for hook in terminateHooks:
-            hook()
-        sys.exit(1)
-    else:
-        for hook in resetHooks:
-            hook()
-
-def installSignalHandlers(child=1,hup=1,term=1):
-    '''Register signal handlers for this process.  If 'child', registers
-       a handler for SIGCHLD.  If 'hup', registers a handler for SIGHUP.
-       If 'term', registes a handler for SIGTERM.'''
-    if child:
-        signal.signal(signal.SIGCHLD, _sigChldHandler)
-    if hup:
-        signal.signal(signal.SIGHUP, _sigHandler)
-    if term:
-        signal.signal(signal.SIGTERM, _sigHandler)
+def installSIGCHLDHandler():
+    '''Register sigchld handler for this process.'''
+    signal.signal(signal.SIGCHLD, _sigChldHandler)
 
 #----------------------------------------------------------------------
 # File helpers.
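The new 'blocking' option passes os.WNOHANG to waitpid(). In that mode, waitpid() returns (0, 0) when children exist but none has exited yet, so a non-blocking reap loop must treat pid 0 as a stopping condition. The hunk above ends before the code that handles this. The Python 3 sketch below (POSIX only, wait_for_children is a hypothetical name) shows a loop that handles this case explicitly. It waits on -1 (any child), whereas the committed code passes 0 (any child in the caller's process group).

```python
import os
import sys


def wait_for_children(once_only=False, blocking=True):
    """Reap exited children and return a list of (pid, status) pairs.

    With blocking=False, waitpid() returns (0, 0) while children are still
    running; that case ends the loop instead of spinning forever.
    """
    options = 0 if blocking else os.WNOHANG
    reaped = []
    while True:
        try:
            pid, status = os.waitpid(-1, options)
        except ChildProcessError:
            break                 # no children left to wait for
        if pid == 0:
            break                 # non-blocking: nothing has exited yet
        reaped.append((pid, status))
        if once_only:
            break
    return reaped


if __name__ == "__main__":
    os.spawnv(os.P_NOWAIT, sys.executable, [sys.executable, "-c", "pass"])
    print("non-blocking:", wait_for_children(blocking=False))
    print("blocking:", wait_for_children(blocking=True))
```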

Index: Crypto.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/Crypto.py,v
retrieving revision 1.36
retrieving revision 1.37
diff -u -d -r1.36 -r1.37
--- Crypto.py	8 Jan 2003 08:04:27 -0000	1.36
+++ Crypto.py	9 Jan 2003 06:28:58 -0000	1.37
@@ -479,6 +479,7 @@
            bytes at a time."""
         self.bytes = ""
         self.chunksize = chunksize
+        
     def getBytes(self, n):
         """Returns a string of 'n' random bytes."""
 
@@ -593,9 +594,11 @@
     '''Returns a general-use AESCounterPRNG, initializing it if necessary.'''
     # We create one PRNG per thread.
     thisThread = threading.currentThread()
-    if not hasattr(thisThread, "minion_shared_PRNG"):
+    try:
+        return thisThread.minion_shared_PRNG
+    except AttributeError:
         thisThread.minion_shared_PRNG = AESCounterPRNG()
-    return thisThread.minion_shared_PRNG
+        return thisThread.minion_shared_PRNG
 
 #----------------------------------------------------------------------
 # TRNG implementation
@@ -667,6 +670,8 @@
         """Creates a TrueRNG to retrieve data from our underlying RNG 'n'
            bytes at a time"""
         RNG.__init__(self,n)
+        self.__lock = threading.Lock()
+        
     def _prng(self,n):
         "Returns n fresh bytes from our true RNG."
         if _TRNG_FILENAME is None:
@@ -677,6 +682,19 @@
         f.close()
         return d
 
+    def getBytes(self, n):
+        # We need to synchronize this method, since a single TRNG instance
+        # is shared by all threads.
+        self.__lock.acquire()
+        b = RNG.getBytes(self, n)
+        self.__lock.release()
+        return b
+        
+
 # Global _TrueRNG instance, for use by trng().
 _theTrueRNG = _TrueRNG(1024)
 
+# Return the shared instance of the true RNG.
+def getTrueRNG():
+    """Return the shared instance of the true RNG."""
+    return _theTrueRNG
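The committed getBytes calls acquire(), then RNG.getBytes, then release(), with no try/finally around the middle call. If the underlying read raised (for example, an I/O error opening the random device), the shared lock would stay held and every later caller would block. The sketch below keeps the same design, a single shared instance guarded by a lock, but uses a with-statement. Before Python 2.5, the equivalent is acquire() followed by try/finally. SharedTrueRNG and get_true_rng are hypothetical names, and os.urandom stands in for the device read.

```python
import os
import threading


class SharedTrueRNG:
    """Hypothetical stand-in for the single shared _TrueRNG instance.

    Buffers OS randomness in 'chunksize' pieces.  Because every thread
    shares one instance, the buffer manipulation must run under a lock.
    """

    def __init__(self, chunksize=1024):
        self._chunksize = chunksize
        self._buf = b""
        self._lock = threading.Lock()

    def _fresh_bytes(self, n):
        # The committed _prng() reads a device file; os.urandom is an
        # assumption here so the sketch is portable.
        return os.urandom(n)

    def get_bytes(self, n):
        # 'with' releases the lock even if _fresh_bytes() raises, which
        # a bare acquire()/release() pair does not guarantee.
        with self._lock:
            while len(self._buf) < n:
                self._buf += self._fresh_bytes(max(n, self._chunksize))
            out, self._buf = self._buf[:n], self._buf[n:]
            return out


_the_true_rng = SharedTrueRNG(1024)


def get_true_rng():
    """Accessor for the one shared instance, in the style of getTrueRNG()."""
    return _the_true_rng
```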

Index: benchmark.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/benchmark.py,v
retrieving revision 1.26
retrieving revision 1.27
diff -u -d -r1.26 -r1.27
--- benchmark.py	5 Jan 2003 13:19:53 -0000	1.26
+++ benchmark.py	9 Jan 2003 06:28:58 -0000	1.27
@@ -21,7 +21,7 @@
 import mixminion._minionlib as _ml
 from mixminion.BuildMessage import _buildHeader, buildForwardMessage, \
      compressData, uncompressData
-from mixminion.Common import secureDelete, installSignalHandlers, \
+from mixminion.Common import secureDelete, installSIGCHLDHandler, \
      waitForChildren, formatBase64
 from mixminion.Crypto import *
 from mixminion.Crypto import OAEP_PARAMETER
@@ -320,6 +320,7 @@
     h = HashLog(fname, "A")
     hashes = [ prng.getBytes(20) for _ in xrange(load) ]
 
+    # XXXX Check under different circumstances -- different sync patterns.
     t = time()
     for n in xrange(len(hashes)):
         h.logHash(hashes[n])
@@ -580,9 +581,8 @@
 
 def fileOpsTiming():
     print "#================= File ops ====================="
-    installSignalHandlers(child=1,hup=0,term=0)
+    installSIGCHLDHandler()
     dname = mix_mktemp(".d")
-
 
     os.mkdir(dname)
     for i in xrange(200):

Index: test.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/test.py,v
retrieving revision 1.69
retrieving revision 1.70
diff -u -d -r1.69 -r1.70
--- test.py	7 Jan 2003 01:41:20 -0000	1.69
+++ test.py	9 Jan 2003 06:28:58 -0000	1.70
@@ -61,7 +61,7 @@
 from mixminion.server.HashLog import HashLog
 from mixminion.server.Modules import *
 from mixminion.server.PacketHandler import *
-from mixminion.server.Queue import *
+from mixminion.server.ServerQueue import *
 from mixminion.server.ServerKeys import generateServerDescriptorAndKeys
 
 # Set this flag to 1 in order to have all RSA keys and diffie-hellman params
@@ -2121,7 +2121,7 @@
 
 class QueueTests(unittest.TestCase):
     def setUp(self):
-        mixminion.Common.installSignalHandlers(child=1,hup=0,term=0)
+        mixminion.Common.installSIGCHLDHandler()
         self.d1 = mix_mktemp("q1")
         self.d2 = mix_mktemp("q2")
         self.d3 = mix_mktemp("q3")
@@ -4210,12 +4210,12 @@
         # Test pool configuration
         pool = MixPool(configTimed, mixDir)
         self.assert_(isinstance(pool.queue,
-                                mixminion.server.Queue.TimedMixQueue))
+                                TimedMixQueue))
         self.assertEquals(pool.getNextMixTime(100), 100+2*60*60)
 
         pool = MixPool(configCottrell, mixDir)
         self.assert_(isinstance(pool.queue,
-                                mixminion.server.Queue.CottrellMixQueue))
+                                CottrellMixQueue))
         self.assertEquals(pool.getNextMixTime(100), 100+12*60*60)
         self.assertEquals(pool.queue.minPool, 10)
         self.assertEquals(pool.queue.minSend, 1)
@@ -4223,7 +4223,7 @@
 
         pool = MixPool(configBCottrell, mixDir)
         self.assert_(isinstance(pool.queue,
-                             mixminion.server.Queue.BinomialCottrellMixQueue))
+                                BinomialCottrellMixQueue))
         self.assertEquals(pool.getNextMixTime(100), 100+6*60*60)
         self.assertEquals(pool.queue.minPool, 10)
         self.assertEquals(pool.queue.minSend, 1)