[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[minion-cvs] Check in some long-pending tweaks from my laptop. Impl...



Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv18251/lib/mixminion/server

Modified Files:
	DNSFarm.py MMTPServer.py ServerConfig.py ServerKeys.py 
	ServerMain.py 
Log Message:
Check in some long-pending tweaks from my laptop.  Implements DNS, refactors.

NetUtils (New File):
- Added new module to hold general-purpose networking code.  Currently
  includes gethostname/getaddrinfo wrappers; timeout helper code; and
  IPv6-support testing.

BuildMessage:
- Finally remove the old build*Message calls and switch to the build*Packet
  messages.  The difference is that the old ones combined payload encoding
  with packet building, while the new ones take a complete (~28K) payload
  generated by encodeMessage.

ClientDirectory:
- Debug lots of new code.
- Turn timeout from global to argument
- Use new NetUtils timeout logic.

ClientDirectory,ClientMain:
- Make DROP packets work again.

Common, MMTPClient:
- Move TimeoutError into Common

Common:
- Add function to detect strings that look like hostnames
- Debug TimeoutQueue

Config:
- Add _parseHost and _parseIP6

MMTPClient:
- Handle DNS lookup for MMTPHostInfo code and (possible) IPv6 result.
- Use new NetUtils timeout code

Packet:
- Don't allow MMTPHostInfo objects when hostname is not a possible hostname
- Force hostnames to lower case

ServerInfo, ServerKeys:
- Rename Packet-Formats to Packet-Versions
- Deprecate IP, add Hostname.
- Deprecate Key-Digest.
- Add getIP, getHostname
- Make getKeyDigest compute digest of identity key; we don't need Key-Digest
  any more.
- Refactor protocol matchup code
- Add canStartAt function to tell whether we can connect directly to a given
  server.

test:
- Add test for new packet types and config types
- Change code to use new BuildMessage interfaces

DNSFarm:
- Handle panding requests correctly.
- Move getIP functions to NetUtils.

MMTPServer,ServerMain:
- Implement DNS farm hookup for MMTPHostInfo routing.

ServerConfig:
- Add hostnames to configuration.






Index: DNSFarm.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/DNSFarm.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- DNSFarm.py	13 Oct 2003 17:32:25 -0000	1.1
+++ DNSFarm.py	19 Oct 2003 03:12:02 -0000	1.2
@@ -8,6 +8,7 @@
 import time
 import sys
 from mixminion.Common import LOG, TimeoutQueue, QueueEmpty
+from mixminion.NetUtils import getIP
 
 __all__ = [ 'DNSCache' ]
 
@@ -15,16 +16,16 @@
     def __cmp__(self,o):
         return cmp(type(self), type(o))
 PENDING = _Pending
-NOENT = -1
 
 MIN_THREADS = 2
 MIN_FREE_THREADS = 1
 MAX_THREADS = 8
 MAX_THREAD_IDLE = 5*60
 MAX_ENTRY_TTL = 15*60
-PREFER_INET4 = 1
+
 
 class DNSCache:
+    """DOCDOC"""
     def __init__(self):
         self.cache = {} # name -> getIP return / PENDING
         self.callbacks = {}
@@ -44,12 +45,14 @@
         try:
             self.lock.acquire()
             v = self.cache.get(name)
-            if v is None:
+            if v is None or v is PENDING:
                 self.callbacks.setdefault(name, []).append(cb)
+            #XXXX006 We should check for literal addresses before we queue
+            if v is None:
                 self._beginLookup(name)
         finally:
             self.lock.release()
-        if v is not None:
+        if v is not None and v is not PENDING:
             cb(name,v)
     def shutdown(self, wait=0):
         try:
@@ -142,43 +145,4 @@
         finally:
             self.dnscache.adjLiveThreads(-1)
 
-if hasattr(socket, 'getaddrinfo'):
-    def getIP(name):
-        try:
-            r = socket.getaddrinfo(name, None)
-            inet4 = [ addr[4][0] for addr in r if addr[0] == socket.AF_INET ]
-            inet6 = [ addr[4][0] for addr in r if addr[0] == socket.AF_INET6 ]
-            if not (inet4 or inet6):
-                LOG.error("getaddrinfo returned no inet addresses!")
-                return (NOENT, "No inet addresses returned", time.time())
-            best4=best6=None
-            now=time.time()
-            if inet4: best4=(socket.AF_INET,inet4[0],now)
-            if inet6: best6=(socket.AF_INET,inet6[0],now)
-            if PREFER_INET4:
-                res = best4 or best6
-            else:
-                res = best6 or best4
-            protoname = (res[0] == socket.AF_INET) and "inet" or "inet6"
-            LOG.trace("Result for getaddrinfo(%r): %s:%s (%d others dropped)",
-                      name,protoname,res[1],len(r)-1)
-            return res
-        except socket.error, e:
-            LOG.trace("Result for getaddrinfo(%r): error:%r",name,e)
-            if len(e.args) == 2:
-                return (NOENT, str(e[1]), time.time())
-            else:
-                return (NOENT, str(e), time.time())            
-else:
-    def getIP(name):
-        '''return family/NOENT, address/error, time'''
-        try:
-            r = socket.gethostbyname(name)
-            LOG.trace("Result for gethostbyname(%r): inet:%r",name,r)
-            return (socket.AF_INET, r, time.time())
-        except socket.error, e:
-            LOG.trace("Result for gethostbyname(%r): error:%r",name,e)
-            if len(e.args) == 2:
-                return (NOENT, str(e[1]), time.time())
-            else:
-                return (NOENT, str(e), time.time())
+

Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.52
retrieving revision 1.53
diff -u -d -r1.52 -r1.53
--- MMTPServer.py	1 Oct 2003 01:34:50 -0000	1.52
+++ MMTPServer.py	19 Oct 2003 03:12:02 -0000	1.53
@@ -28,10 +28,11 @@
 
 import mixminion._minionlib as _ml
 from mixminion.Common import MixError, MixFatalError, MixProtocolError, \
-     LOG, stringContains
+     LOG, stringContains, MessageQueue, QueueEmpty
 from mixminion.Crypto import sha1, getCommonPRNG
-from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN
+from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN, IPV4Info, MMTPHostInfo
 from mixminion.MMTPClient import PeerCertificateCache
+from mixminion.NetUtils import IN_PROGRESS_ERRNOS
 import mixminion.server.EventStats as EventStats
 from mixminion.Filestore import CorruptedFile
 
@@ -44,13 +45,6 @@
 warn = LOG.warn
 error = LOG.error
 
-# For windows -- list of errno values that we can expect when blocking IO
-# blocks on a connect.
-IN_PROGRESS_ERRNOS = [ getattr(errno, ename) 
-   for ename in [ "EINPROGRESS", "WSAEWOULDBLOCK"]
-   if hasattr(errno,ename) ]
-del ename
-
 class AsyncServer:
     """AsyncServer is the core of a general-purpose asynchronous
        select-based server loop.  AsyncServer maintains two lists of
@@ -196,11 +190,11 @@
     # connectionFactory: a function that takes as input a socket from a
     #    newly received connection, and returns a Connection object to
     #    register with the async server.
-    def __init__(self, ip, port, backlog, connectionFactory):
+    def __init__(self, family, ip, port, backlog, connectionFactory):
         """Create a new ListenConnection"""
         self.ip = ip
         self.port = port
-        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock = socket.socket(family, socket.SOCK_STREAM)
         self.sock.setblocking(0)
         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         try:
@@ -1073,22 +1067,43 @@
         # FFFF Don't always listen; don't always retransmit!
         # FFFF Support listening on multiple IPs
 
-        IP = config['Incoming/MMTP'].get('ListenIP')
-        if IP is None:
-            IP = config['Incoming/MMTP']['IP']
-
+        ip4_supported, ip6_supported = mixminion.NetUtils.getProtocolSupport()
+        IP, IP6 = None, None
+        if ip4_supported:
+            IP = config['Incoming/MMTP'].get('ListenIP')
+            if IP is None:
+                IP = config['Incoming/MMTP'].get('IP')
+            if IP is None:
+                IP = "0.0.0.0"
+        if ip6_supported:
+            IP6 = config['Incoming/MMTP'].get('ListenIP6')
+            if IP6 is None:
+                IP6 = "::"
+            
         port =  config['Incoming/MMTP'].get('ListenPort')
         if port is None:
             port = config['Incoming/MMTP']['Port']
 
-        self.listener = ListenConnection(IP, port,
-                                         LISTEN_BACKLOG,
-                                         self._newMMTPConnection)
+        self.listeners = [] #DOCDOC
+        for (supported, addr, family) in [(ip4_supported,IP,socket.AF_INET),
+                                          (ip6_supported,IP6,socket.AF_INET6)]:
+            if not supported or not addr:
+                continue
+            listener = ListenConnection(family, addr, port,
+                                        LISTEN_BACKLOG,
+                                        self._newMMTPConnection)
+            self.listeners.append(listener)
+            listener.register(self)
+        
         #self.config = config
-        self.listener.register(self)
         self._timeout = config['Server']['Timeout'].getSeconds()
         self.clientConByAddr = {}
         self.certificateCache = PeerCertificateCache()
+        self.dnsCache = None #DOCDOC
+        self.msgQueue = MessageQueue() #DOCDOC
+
+    def connectDNSCache(self, dnsCache):
+        self.dnsCache = dnsCache
 
     def setContext(self, context):
         """Change the TLS context used for newly received connections.
@@ -1113,15 +1128,50 @@
         return con
 
     def stopListening(self):
-        self.listener.shutdown()
+        for listener in self.listeners:
+            listener.shutdown()
 
-    def sendMessages(self, ip, port, keyID, deliverable):
+    def sendMessagesByRouting(self, routing, deliverable):
+        """DOCDOC"""
+        if isinstance(routing, IPV4Info):
+            self.sendMessages(socket.AF_INET, routing.ip, routing.port,
+                              routing.keyinfo, deliverable)
+        else:
+            assert isinstance(routing, MMTPHostInfo)
+            def lookupDone(name, (family, addr, when),
+                          self=self, routing=routing, deliverable=deliverable):
+                if addr == "NOENT":
+                    for m in deliverable:
+                        try:
+                            m.failed(1)
+                        except AttributeError:
+                            pass
+                else:
+                    self.queueSendableMessages(family, addr,
+                                         routing.port, routing.keyinfo,
+                                         deliverable)
+
+            self.dnsCache.lookup(routing.hostname, lookupDone)
+
+    def queueSendableMessages(self, family, addr, port, keyID, deliverable):
+        """DOCDOC"""
+        self.msgQueue.put((family,addr,port,keyID,deliverable))
+
+    def sendQueuedMessages(self):
+        """DOCDOC"""
+        while 1:
+            try:
+                family,addr,port,keyID,deliverable=self.msgQueue.get(1)
+            except QueueEmpty:
+                return
+            self.sendMessages(family,addr,port,keyID,deliverable)
+
+    def sendMessages(self, family, ip, port, keyID, deliverable):
         """Begin sending a set of messages to a given server.
 
            deliverable is a list of objects obeying the DeliverableMessage
            interface.
         """
-
         try:
             # Is there an existing connection open to the right server?
             con = self.clientConByAddr[(ip,port,keyID)]
@@ -1171,3 +1221,8 @@
     def onMessageReceived(self, msg):
         """Abstract function.  Called when we get a message"""
         pass
+
+    def process(self, timeout):
+        """DOCDOC overrides"""
+        self.sendQueuedMessages()
+        AsyncServer.process(self, timeout)

Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.37
retrieving revision 1.38
diff -u -d -r1.37 -r1.38
--- ServerConfig.py	31 Aug 2003 19:29:29 -0000	1.37
+++ ServerConfig.py	19 Oct 2003 03:12:02 -0000	1.38
@@ -299,10 +299,13 @@
                                             "10 minutes",) },
         # FFFF Generic multi-port listen/publish options.
         'Incoming/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
+                            #XXXX007 deprecate or remove IP.
                             'IP' : ('ALLOW', C._parseIP, "0.0.0.0"),
+                          'Hostname' : ('ALLOW', C._parseHost, None),
                           'Port' : ('ALLOW', C._parseInt, "48099"),
                           'ListenIP' : ('ALLOW', C._parseIP, None),
                           'ListenPort' : ('ALLOW', C._parseInt, None),
+                          'ListenIP6' : ('ALLOW', C._parseIP6, None),
   		          'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
                           'Deny' : ('ALLOW*', C._parseAddressSet_deny, None)
 			 },

Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.50
retrieving revision 1.51
diff -u -d -r1.50 -r1.51
--- ServerKeys.py	31 Aug 2003 19:29:29 -0000	1.50
+++ ServerKeys.py	19 Oct 2003 03:12:02 -0000	1.51
@@ -23,6 +23,7 @@
 import mixminion._minionlib
 import mixminion.Crypto
 import mixminion.Packet
+import mixminion.server.DNSFarm
 import mixminion.server.HashLog
 import mixminion.server.MMTPServer
 import mixminion.server.ServerMain
@@ -519,7 +520,7 @@
         desc = keys.getServerDescriptor()
         return (desc['Incoming/MMTP']['IP'],
                 desc['Incoming/MMTP']['Port'],
-                desc['Incoming/MMTP']['Key-Digest'])
+                desc.getKeyDigest())
 
     def lock(self, blocking=1):
         return self._lock.acquire(blocking)
@@ -794,7 +795,7 @@
         warn("Mismatched ports: %s configured; %s published.",
              config_im['Port'], info_im['Port'])
 
-    info_ip = info['Incoming/MMTP']['IP']
+    info_ip = info_im.get('IP',None)
     if config_im['IP'] == '0.0.0.0':
         guessed = _guessLocalIP()
         if guessed != info_ip:
@@ -804,6 +805,17 @@
         warn("Mismatched IPs: %s configured; %s published.",
              config_im['IP'], info_ip)
 
+    info_host = info_im.get('Hostname',None)
+    config_host = config_im['Hostname']
+    if config_host is None:
+        guessed = socket.getfqdn()
+        if guessed != info_host:
+            warn("Mismatched hostnames: %s guessed; %s published",
+                 guessed, info_host)
+    elif config_host != info_host:
+        warn("Mismatched hostnames: %s configured, %s published",
+             config_host, info_host)
+
     if config_im['Enabled'] and not info_im.get('Version'):
         warn("Incoming MMTP enabled but not published.")
     elif not config_im['Enabled'] and info_im.get('Version'):
@@ -915,12 +927,14 @@
     mmtpProtocolsIn = ",".join(mmtpProtocolsIn)
     mmtpProtocolsOut = ",".join(mmtpProtocolsOut)
 
+    #XXXX007 remove
     identityKeyID = formatBase64(
                       mixminion.Crypto.sha1(
                           mixminion.Crypto.pk_encode_public_key(identityKey)))
 
     fields = {
         "IP": config['Incoming/MMTP'].get('IP', "0.0.0.0"),
+        "Hostname": config['Incoming/MMTP'].get('Hostname', None),
         "Port": config['Incoming/MMTP'].get('Port', 0),
         "Nickname": nickname,
         "Identity":
@@ -940,13 +954,18 @@
         }
 
     # If we don't know our IP address, try to guess
-    if fields['IP'] == '0.0.0.0':
+    if fields['IP'] == '0.0.0.0': #XXXX007 remove
         try:
             fields['IP'] = _guessLocalIP()
             LOG.warn("No IP configured; guessing %s",fields['IP'])
         except IPGuessError, e:
             LOG.error("Can't guess IP: %s", str(e))
             raise UIError("Can't guess IP: %s" % str(e))
+    # If we don't know our Hostname, try to guess
+    if fields['Hostname'] is None:
+        fields['Hostname'] = socket.getfqdn()
+        LOG.warn("No Hostname configured; guessing %s",fields['Hostname'])
+    _checkHostnameIsLocal(fields['Hostname'])
 
     # Fill in a stock server descriptor.  Note the empty Digest: and
     # Signature: lines.
@@ -961,7 +980,7 @@
         Valid-After: %(ValidAfter)s
         Valid-Until: %(ValidUntil)s
         Packet-Key: %(PacketKey)s
-        Packet-Formats: %(PacketFormat)s
+        Packet-Versions: %(PacketFormat)s
         Software: Mixminion %(mm_version)s
         Secure-Configuration: %(Secure)s
         """ % fields
@@ -978,6 +997,7 @@
             [Incoming/MMTP]
             Version: 0.1
             IP: %(IP)s
+            Hostname: %(Hostname)s
             Port: %(Port)s
             Key-Digest: %(KeyID)s
             Protocols: %(MMTPProtocolsIn)s
@@ -1118,6 +1138,26 @@
         raise IPGuessError("Only address found is in a private IP block")
 
     return IP
+
+_KNOWN_LOCAL_HOSTNAMES = {}
+
+def _checkHostnameIsLocal(name):
+    if _KNOWN_LOCAL_HOSTNAMES.has_key(name):
+        return
+    try:
+        r = mixminion.server.DNSFarm.getIPs(name)
+        for family, addr, _ in r:
+            if family == mixminion.server.DNSFarm.AF_INET:
+                if addr.startswith("127.") or addr.startswith("0."):
+                    raise UIError("Hostname %r resolves to reserved address %s"
+                                  %(name, addr))
+            else:
+                if addr in ("::", "::1"):
+                    raise UIError("Hostname %r resolves to reserved address %s"
+                                  %(name,addr))
+    except socket.error, e:
+        raise UIError("Cannot resolve hostname %r: %s"%(name,e))
+    _KNOWN_LOCAL_HOSTNAMES[name] = 1
 
 def generateCertChain(filename, mmtpKey, identityKey, nickname,
                       certStarts, certEnds):

Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.94
retrieving revision 1.95
diff -u -d -r1.94 -r1.95
--- ServerMain.py	13 Oct 2003 17:30:24 -0000	1.94
+++ ServerMain.py	19 Oct 2003 03:12:02 -0000	1.95
@@ -53,6 +53,7 @@
 import mixminion.Config
 import mixminion.Crypto
 import mixminion.Filestore
+import mixminion.server.DNSFarm
 import mixminion.server.MMTPServer
 import mixminion.server.Modules
 import mixminion.server.PacketHandler
@@ -299,7 +300,7 @@
         mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
         self.server = None
         self.incomingQueue = None
-        self.addr = (ip,port,keyid)
+        self.addr = (ip,port,keyid) #XXXX006 need to detect same host.
 
     def configure(self, config):
         """Set up this queue according to a ServerConfig object."""
@@ -325,9 +326,9 @@
             except mixminion.Filestore.CorruptedFile:
                 continue
             msgs.setdefault(addr, []).append(pending)
-        for addr, messages in msgs.items():
-            if self.addr[:2] == (addr.ip, addr.port):
-                if self.addr[2] != addr.keyinfo:
+        for routing, messages in msgs.items():
+            if self.addr[:2] == (routing.ip, routing.port): #XXX006 detect host
+                if self.addr[2] != routing.keyinfo:
                     LOG.warn("Delivering messages to myself with bad KeyID")
                 for pending in messages:
                     LOG.trace("Delivering message OUT:%s to myself.",
@@ -340,11 +341,10 @@
             deliverable = [
                 mixminion.server.MMTPServer.DeliverablePacket(pending)
                 for pending in messages ]
-            LOG.trace("Delivering messages OUT:[%s] to %s:%s",
+            LOG.trace("Delivering messages OUT:[%s] to %s",
                       " ".join([p.getHandle() for p in messages]),
-                      addr.ip, addr.port)
-            self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
-                                     deliverable)
+                      routing)
+            self.server.sendMessagesByRouting(routing, deliverable)
 
 class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
     """Implementation of mixminion.server.MMTPServer that knows about
@@ -709,6 +709,8 @@
         self.cleaningThread = CleaningThread()
         self.processingThread = ProcessingThread()
 
+        self.dnsCache = mixminion.server.DNSFarm.DNSCache()
+
         LOG.debug("Connecting queues")
         self.incomingQueue.connectQueues(mixPool=self.mixPool,
                                        processingThread=self.processingThread)
@@ -718,6 +720,7 @@
                                          incoming=self.incomingQueue)
         self.mmtpServer.connectQueues(incoming=self.incomingQueue,
                                       outgoing=self.outgoingQueue)
+        self.mmtpServer.connectDNSCache(self.dnsCache)
 
         self.cleaningThread.start()
         self.processingThread.start()
@@ -791,6 +794,7 @@
 
         def _tryTimeout(self=self):
             self.mmtpServer.tryTimeout()
+            self.dnsCache.cleanCache()
             return self.mmtpServer.getNextTimeoutTime()
 
         self.scheduleRecurringComplex(self.mmtpServer.getNextTimeoutTime(now),
@@ -1352,4 +1356,3 @@
     LOG.info("Telling server to publish descriptors")
 
     _signalServer(config, reload=1)
-