[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[minion-cvs] Make batching algorithm configurable



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv3408/lib/mixminion/server

Modified Files:
	ServerConfig.py ServerMain.py 
Log Message:
Make batching algorithm configurable

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -d -r1.3 -r1.4
--- ServerConfig.py	16 Dec 2002 02:40:11 -0000	1.3
+++ ServerConfig.py	20 Dec 2002 23:52:07 -0000	1.4
@@ -32,6 +32,9 @@
         mixminion.Config._ConfigFile.__init__(self, fname, string)
 
     def validate(self, sections, entries, lines, contents):
+        def _haveEntry(entries, section, ent):
+            return len([e for e in entries[section] if e[0] == ent]) != 0
+
         # Pre-emptively configure the log before validation, so we don't
         # write to the terminal if we've been asked not to.
         if not sections['Server'].get("EchoMessages", 0):
@@ -42,6 +45,7 @@
         mixminion.Config._validateHostSection(sections.get('Host', {}))
         # Server section
         server = sections['Server']
+        serverents = entries['Server']
         bits = server['IdentityKeyBits']
         if not (2048 <= bits <= 4096):
             raise ConfigError("IdentityKeyBits must be between 2048 and 4096")
@@ -53,9 +57,24 @@
             raise ConfigError("PublicKeyLifetime must be at least 1 day.")
         if server['PublicKeySloppiness'][2] > 20*60:
             raise ConfigError("PublicKeySloppiness must be <= 20 minutes.")
-        if [e for e in entries['Server'] if e[0]=='Mode']:
+        if _haveEntry(entries, 'Server', 'Mode'):
             LOG.warn("Mode specification is not yet supported.")
 
+        if server['MixInterval'][2] < 30*60:
+            LOG.warn("Dangerously low MixInterval")
+        if server['MixAlgorithm'] == 'TimedMixQueue':
+            if _haveEntry(entries, 'Server', 'MixPoolRate'):
+                LOG.warn("Option MixPoolRate is not used for Timed mixing.")
+            if _haveEntry(entries, 'Server', 'MixPoolMinSize'):
+                LOG.warn("Option MixPoolMinSize is not used for Timed mixing.")
+        else:
+            rate = server['MixPoolRate']
+            minSize = server['MixPoolMinSize']
+            if rate < 0.05:
+                LOG.warn("Unusually low MixPoolRate %s", rate)
+            if minSize < 0:
+                raise ConfigError("MixPoolMinSize %s must be nonnegative.")
+
         if not sections['Incoming/MMTP'].get('Enabled'):
             LOG.warn("Disabling incoming MMTP is not yet supported.")
         if [e for e in entries['Incoming/MMTP'] if e[0] in ('Allow', 'Deny')]:
@@ -83,6 +102,42 @@
         "Return the module manager initialized by this server."
         return self.moduleManager
 
+#======================================================================
+
+_MIX_RULE_NAMES = {
+    'timed' : "TimedMixQueue",
+    'cottrell'     : "CottrellMixQueue",
+    'mixmaster'    : "CottrellMixQueue",
+    'dynamicpool'  : "CottrellMixQueue",
+    'binomial'            : "BinomialCottrellMixQueue",
+    'binomialcottrell'    : "BinomialCottrellMixQueue",
+    'binomialdynamicpool' : "BinomialCottrellMixQueue",
+}
+
+def _parseMixRule(s):
+    """Validation function.  Given a string representation of a mixing
+       algorithm, return the name of the Mix queue class to be used."""
+    name = s.strip().lower()
+    v = _MIX_RULE_NAMES.get(name)
+    if not v:
+        raise ConfigError("Unrecognized mix algorithm %s"%s)
+    return v
+
+def _parseFraction(frac):
+    """Validation function.  Converts a percentage or a number into a 
+       number between 0 and 1."""
+    s = frac.strip().lower()
+    try:
+        if s.endswith("%"):
+            ratio = float(s[:-1].strip())/100.0
+        else:
+            ratio = float(s)
+    except ValueError:
+        raise ConfigError("%s is not a fraction" %frac)
+    if not 0 <= ratio <= 1:
+        raise ConfigError("%s is not in range (between 0%% and 100%%)"%frac)
+    return ratio
+
 # alias to make the syntax more terse.
 C = mixminion.Config
 
@@ -107,6 +162,10 @@
                      'Comments': ('ALLOW', None, None),
                      'ModulePath': ('ALLOW', None, None),
                      'Module': ('ALLOW*', None, None),
+                     'MixAlgorithm' : ('ALLOW', _parseMixRule, "Cottrell"),
+                     'MixInterval' : ('ALLOW', C._parseInterval, "30 min"),
+                     'MixPoolRate' : ('ALLOW', _parseFraction, "60%"),
+                     'MixPoolMinSize' : ('ALLOW', C._parseInt, "5"),
                      },
         'DirectoryServers' : { 'ServerURL' : ('ALLOW*', None, None),
                                'Publish' : ('ALLOW', C._parseBoolean, "no"),

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- ServerMain.py	16 Dec 2002 04:01:14 -0000	1.7
+++ ServerMain.py	20 Dec 2002 23:52:07 -0000	1.8
@@ -77,9 +77,28 @@
 class MixPool:
     """Wraps a mixminion.server.Queue.*MixQueue to send messages to an exit
        queue and a delivery queue."""
-    def __init__(self, queue):
-        """Create a new MixPool to wrap a given *MixQueue."""
-        self.queue = queue
+    def __init__(self, config, queueDir):
+        """Create a new MixPool, based on this server's configuration and
+           queue location."""
+
+        server = config['Server']
+        interval = server['MixInterval'][2]
+        if server['MixAlgorithm'] == 'TimedMixQueue':
+            self.queue = mixminion.server.Queue.TimedMixQueue(
+                location=queueDir, interval=interval)
+        elif server['MixAlgorithm'] == 'CottrellMixQueue':
+            self.queue = mixminion.server.Queue.CottrellMixQueue(
+                location=queueDir, interval=interval, 
+                minPool=server.get("MixPoolMinSize", 5),
+                sendRate=server.get("MixPoolRate", 0.6))
+        elif server['MixAlgorithm'] == 'BinomialCottrellMixQueue':
+            self.queue = mixminion.server.Queue.BinomialCottrellMixQueue(
+                location=queueDir, interval=interval, 
+                minPool=server.get("MixPoolMinSize", 5),
+                sendRate=server.get("MixPoolRate", 0.6))
+        else:
+            raise MixFatalError("Got impossible mix queue type from config")
+
         self.outgoingQueue = None
         self.moduleManager = None
 
@@ -118,6 +137,11 @@
                 self.outgoingQueue.queueDeliveryMessage(ipv4, msg)
             self.queue.removeMessage(h)
 
+    def getNextMixTime(self, now):
+        """Given the current time, return the time at which we should next
+           mix."""
+        return now + self.queue.getInterval()
+
 class OutgoingQueue(mixminion.server.Queue.DeliveryQueue):
     """DeliveryQueue to send messages via outgoing MMTP connections."""
     def __init__(self, location):
@@ -235,7 +259,7 @@
         mixDir = os.path.join(queueDir, "mix")
         # FFFF The choice of mix algorithm should be configurable
         LOG.trace("Initializing Mix pool")
-        self.mixPool =MixPool(mixminion.server.Queue.TimedMixQueue(mixDir, 60))
+        self.mixPool = MixPool(config, mixDir)
         LOG.trace("Found %d pending messages in Mix pool",
                        self.mixPool.count())
 
@@ -266,8 +290,7 @@
         self.cleanQueues()
 
         now = time.time()
-        MIX_INTERVAL = 600  # FFFF Configurable!
-        nextMix = now + MIX_INTERVAL
+        nextMix = self.mixPool.getNextMixTime(now)
         nextShred = now + 6000
         #FFFF Unused
         #nextRotate = self.keyring.getNextKeyRotation()
@@ -297,7 +320,7 @@
 
             # Choose next mix interval
             now = time.time()
-            nextMix = now + MIX_INTERVAL
+            nextMix = self.mixPool.getNextMixTime(now)
 
             if now > nextShred:
                 # FFFF Configurable shred interval