[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[minion-cvs] Check in some long-pending tweaks from my laptop. Impl...
Update of /home/minion/cvsroot/src/minion/lib/mixminion/server
In directory moria.mit.edu:/tmp/cvs-serv18251/lib/mixminion/server
Modified Files:
DNSFarm.py MMTPServer.py ServerConfig.py ServerKeys.py
ServerMain.py
Log Message:
Check in some long-pending tweaks from my laptop. Implements DNS, refactors.
NetUtils (New File):
- Added new module to hold general-purpose networking code. Currently
includes gethostname/getaddrinfo wrappers; timeout helper code; and
IPv6-support testing.
BuildMessage:
- Finally remove the old build*Message calls and switch to the build*Packet
messages. The difference is that the old ones combined payload encoding
with packet building, while the new ones take a complete (~28K) payload
generated by encodeMessage.
ClientDirectory:
- Debug lots of new code.
- Turn timeout from global to argument
- Use new NetUtils timeout logic.
ClientDirectory,ClientMain:
- Make DROP packets work again.
Common, MMTPClient:
- Move TimeoutError into Common
Common:
- Add function to detect strings that look like hostnames
- Debug TimeoutQueue
Config:
- Add _parseHost and _parseIP6
MMTPClient:
- Handle DNS lookup for MMTPHostInfo code and (possible) IPv6 result.
- Use new NetUtils timeout code
Packet:
- Don't allow MMTPHostInfo objects when hostname is not a possible hostname
- Force hostnames to lower case
ServerInfo, ServerKeys:
- Rename Packet-Formats to Packet-Versions
- Deprecate IP, add Hostname.
- Deprecate Key-Digest.
- Add getIP, getHostname
- Make getKeyDigest compute digest of identity key; we don't need Key-Digest
any more.
- Refactor protocol matchup code
- Add canStartAt function to tell whether we can connect directly to a given
server.
test:
- Add test for new packet types and config types
- Change code to use new BuildMessage interfaces
DNSFarm:
- Handle panding requests correctly.
- Move getIP functions to NetUtils.
MMTPServer,ServerMain:
- Implement DNS farm hookup for MMTPHostInfo routing.
ServerConfig:
- Add hostnames to configuration.
Index: DNSFarm.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/DNSFarm.py,v
retrieving revision 1.1
retrieving revision 1.2
diff -u -d -r1.1 -r1.2
--- DNSFarm.py 13 Oct 2003 17:32:25 -0000 1.1
+++ DNSFarm.py 19 Oct 2003 03:12:02 -0000 1.2
@@ -8,6 +8,7 @@
import time
import sys
from mixminion.Common import LOG, TimeoutQueue, QueueEmpty
+from mixminion.NetUtils import getIP
__all__ = [ 'DNSCache' ]
@@ -15,16 +16,16 @@
def __cmp__(self,o):
return cmp(type(self), type(o))
PENDING = _Pending
-NOENT = -1
MIN_THREADS = 2
MIN_FREE_THREADS = 1
MAX_THREADS = 8
MAX_THREAD_IDLE = 5*60
MAX_ENTRY_TTL = 15*60
-PREFER_INET4 = 1
+
class DNSCache:
+ """DOCDOC"""
def __init__(self):
self.cache = {} # name -> getIP return / PENDING
self.callbacks = {}
@@ -44,12 +45,14 @@
try:
self.lock.acquire()
v = self.cache.get(name)
- if v is None:
+ if v is None or v is PENDING:
self.callbacks.setdefault(name, []).append(cb)
+ #XXXX006 We should check for literal addresses before we queue
+ if v is None:
self._beginLookup(name)
finally:
self.lock.release()
- if v is not None:
+ if v is not None and v is not PENDING:
cb(name,v)
def shutdown(self, wait=0):
try:
@@ -142,43 +145,4 @@
finally:
self.dnscache.adjLiveThreads(-1)
-if hasattr(socket, 'getaddrinfo'):
- def getIP(name):
- try:
- r = socket.getaddrinfo(name, None)
- inet4 = [ addr[4][0] for addr in r if addr[0] == socket.AF_INET ]
- inet6 = [ addr[4][0] for addr in r if addr[0] == socket.AF_INET6 ]
- if not (inet4 or inet6):
- LOG.error("getaddrinfo returned no inet addresses!")
- return (NOENT, "No inet addresses returned", time.time())
- best4=best6=None
- now=time.time()
- if inet4: best4=(socket.AF_INET,inet4[0],now)
- if inet6: best6=(socket.AF_INET,inet6[0],now)
- if PREFER_INET4:
- res = best4 or best6
- else:
- res = best6 or best4
- protoname = (res[0] == socket.AF_INET) and "inet" or "inet6"
- LOG.trace("Result for getaddrinfo(%r): %s:%s (%d others dropped)",
- name,protoname,res[1],len(r)-1)
- return res
- except socket.error, e:
- LOG.trace("Result for getaddrinfo(%r): error:%r",name,e)
- if len(e.args) == 2:
- return (NOENT, str(e[1]), time.time())
- else:
- return (NOENT, str(e), time.time())
-else:
- def getIP(name):
- '''return family/NOENT, address/error, time'''
- try:
- r = socket.gethostbyname(name)
- LOG.trace("Result for gethostbyname(%r): inet:%r",name,r)
- return (socket.AF_INET, r, time.time())
- except socket.error, e:
- LOG.trace("Result for gethostbyname(%r): error:%r",name,e)
- if len(e.args) == 2:
- return (NOENT, str(e[1]), time.time())
- else:
- return (NOENT, str(e), time.time())
+
Index: MMTPServer.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/MMTPServer.py,v
retrieving revision 1.52
retrieving revision 1.53
diff -u -d -r1.52 -r1.53
--- MMTPServer.py 1 Oct 2003 01:34:50 -0000 1.52
+++ MMTPServer.py 19 Oct 2003 03:12:02 -0000 1.53
@@ -28,10 +28,11 @@
import mixminion._minionlib as _ml
from mixminion.Common import MixError, MixFatalError, MixProtocolError, \
- LOG, stringContains
+ LOG, stringContains, MessageQueue, QueueEmpty
from mixminion.Crypto import sha1, getCommonPRNG
-from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN
+from mixminion.Packet import MESSAGE_LEN, DIGEST_LEN, IPV4Info, MMTPHostInfo
from mixminion.MMTPClient import PeerCertificateCache
+from mixminion.NetUtils import IN_PROGRESS_ERRNOS
import mixminion.server.EventStats as EventStats
from mixminion.Filestore import CorruptedFile
@@ -44,13 +45,6 @@
warn = LOG.warn
error = LOG.error
-# For windows -- list of errno values that we can expect when blocking IO
-# blocks on a connect.
-IN_PROGRESS_ERRNOS = [ getattr(errno, ename)
- for ename in [ "EINPROGRESS", "WSAEWOULDBLOCK"]
- if hasattr(errno,ename) ]
-del ename
-
class AsyncServer:
"""AsyncServer is the core of a general-purpose asynchronous
select-based server loop. AsyncServer maintains two lists of
@@ -196,11 +190,11 @@
# connectionFactory: a function that takes as input a socket from a
# newly received connection, and returns a Connection object to
# register with the async server.
- def __init__(self, ip, port, backlog, connectionFactory):
+ def __init__(self, family, ip, port, backlog, connectionFactory):
"""Create a new ListenConnection"""
self.ip = ip
self.port = port
- self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock = socket.socket(family, socket.SOCK_STREAM)
self.sock.setblocking(0)
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
@@ -1073,22 +1067,43 @@
# FFFF Don't always listen; don't always retransmit!
# FFFF Support listening on multiple IPs
- IP = config['Incoming/MMTP'].get('ListenIP')
- if IP is None:
- IP = config['Incoming/MMTP']['IP']
-
+ ip4_supported, ip6_supported = mixminion.NetUtils.getProtocolSupport()
+ IP, IP6 = None, None
+ if ip4_supported:
+ IP = config['Incoming/MMTP'].get('ListenIP')
+ if IP is None:
+ IP = config['Incoming/MMTP'].get('IP')
+ if IP is None:
+ IP = "0.0.0.0"
+ if ip6_supported:
+ IP6 = config['Incoming/MMTP'].get('ListenIP6')
+ if IP6 is None:
+ IP6 = "::"
+
port = config['Incoming/MMTP'].get('ListenPort')
if port is None:
port = config['Incoming/MMTP']['Port']
- self.listener = ListenConnection(IP, port,
- LISTEN_BACKLOG,
- self._newMMTPConnection)
+ self.listeners = [] #DOCDOC
+ for (supported, addr, family) in [(ip4_supported,IP,socket.AF_INET),
+ (ip6_supported,IP6,socket.AF_INET6)]:
+ if not supported or not addr:
+ continue
+ listener = ListenConnection(family, addr, port,
+ LISTEN_BACKLOG,
+ self._newMMTPConnection)
+ self.listeners.append(listener)
+ listener.register(self)
+
#self.config = config
- self.listener.register(self)
self._timeout = config['Server']['Timeout'].getSeconds()
self.clientConByAddr = {}
self.certificateCache = PeerCertificateCache()
+ self.dnsCache = None #DOCDOC
+ self.msgQueue = MessageQueue() #DOCDOC
+
+ def connectDNSCache(self, dnsCache):
+ self.dnsCache = dnsCache
def setContext(self, context):
"""Change the TLS context used for newly received connections.
@@ -1113,15 +1128,50 @@
return con
def stopListening(self):
- self.listener.shutdown()
+ for listener in self.listeners:
+ listener.shutdown()
- def sendMessages(self, ip, port, keyID, deliverable):
+ def sendMessagesByRouting(self, routing, deliverable):
+ """DOCDOC"""
+ if isinstance(routing, IPV4Info):
+ self.sendMessages(socket.AF_INET, routing.ip, routing.port,
+ routing.keyinfo, deliverable)
+ else:
+ assert isinstance(routing, MMTPHostInfo)
+ def lookupDone(name, (family, addr, when),
+ self=self, routing=routing, deliverable=deliverable):
+ if addr == "NOENT":
+ for m in deliverable:
+ try:
+ m.failed(1)
+ except AttributeError:
+ pass
+ else:
+ self.queueSendableMessages(family, addr,
+ routing.port, routing.keyinfo,
+ deliverable)
+
+ self.dnsCache.lookup(routing.hostname, lookupDone)
+
+ def queueSendableMessages(self, family, addr, port, keyID, deliverable):
+ """DOCDOC"""
+ self.msgQueue.put((family,addr,port,keyID,deliverable))
+
+ def sendQueuedMessages(self):
+ """DOCDOC"""
+ while 1:
+ try:
+ family,addr,port,keyID,deliverable=self.msgQueue.get(1)
+ except QueueEmpty:
+ return
+ self.sendMessages(family,addr,port,keyID,deliverable)
+
+ def sendMessages(self, family, ip, port, keyID, deliverable):
"""Begin sending a set of messages to a given server.
deliverable is a list of objects obeying the DeliverableMessage
interface.
"""
-
try:
# Is there an existing connection open to the right server?
con = self.clientConByAddr[(ip,port,keyID)]
@@ -1171,3 +1221,8 @@
def onMessageReceived(self, msg):
"""Abstract function. Called when we get a message"""
pass
+
+ def process(self, timeout):
+ """DOCDOC overrides"""
+ self.sendQueuedMessages()
+ AsyncServer.process(self, timeout)
Index: ServerConfig.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerConfig.py,v
retrieving revision 1.37
retrieving revision 1.38
diff -u -d -r1.37 -r1.38
--- ServerConfig.py 31 Aug 2003 19:29:29 -0000 1.37
+++ ServerConfig.py 19 Oct 2003 03:12:02 -0000 1.38
@@ -299,10 +299,13 @@
"10 minutes",) },
# FFFF Generic multi-port listen/publish options.
'Incoming/MMTP' : { 'Enabled' : ('REQUIRE', C._parseBoolean, "no"),
+ #XXXX007 deprecate or remove IP.
'IP' : ('ALLOW', C._parseIP, "0.0.0.0"),
+ 'Hostname' : ('ALLOW', C._parseHost, None),
'Port' : ('ALLOW', C._parseInt, "48099"),
'ListenIP' : ('ALLOW', C._parseIP, None),
'ListenPort' : ('ALLOW', C._parseInt, None),
+ 'ListenIP6' : ('ALLOW', C._parseIP6, None),
'Allow' : ('ALLOW*', C._parseAddressSet_allow, None),
'Deny' : ('ALLOW*', C._parseAddressSet_deny, None)
},
Index: ServerKeys.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerKeys.py,v
retrieving revision 1.50
retrieving revision 1.51
diff -u -d -r1.50 -r1.51
--- ServerKeys.py 31 Aug 2003 19:29:29 -0000 1.50
+++ ServerKeys.py 19 Oct 2003 03:12:02 -0000 1.51
@@ -23,6 +23,7 @@
import mixminion._minionlib
import mixminion.Crypto
import mixminion.Packet
+import mixminion.server.DNSFarm
import mixminion.server.HashLog
import mixminion.server.MMTPServer
import mixminion.server.ServerMain
@@ -519,7 +520,7 @@
desc = keys.getServerDescriptor()
return (desc['Incoming/MMTP']['IP'],
desc['Incoming/MMTP']['Port'],
- desc['Incoming/MMTP']['Key-Digest'])
+ desc.getKeyDigest())
def lock(self, blocking=1):
return self._lock.acquire(blocking)
@@ -794,7 +795,7 @@
warn("Mismatched ports: %s configured; %s published.",
config_im['Port'], info_im['Port'])
- info_ip = info['Incoming/MMTP']['IP']
+ info_ip = info_im.get('IP',None)
if config_im['IP'] == '0.0.0.0':
guessed = _guessLocalIP()
if guessed != info_ip:
@@ -804,6 +805,17 @@
warn("Mismatched IPs: %s configured; %s published.",
config_im['IP'], info_ip)
+ info_host = info_im.get('Hostname',None)
+ config_host = config_im['Hostname']
+ if config_host is None:
+ guessed = socket.getfqdn()
+ if guessed != info_host:
+ warn("Mismatched hostnames: %s guessed; %s published",
+ guessed, info_host)
+ elif config_host != info_host:
+ warn("Mismatched hostnames: %s configured, %s published",
+ config_host, info_host)
+
if config_im['Enabled'] and not info_im.get('Version'):
warn("Incoming MMTP enabled but not published.")
elif not config_im['Enabled'] and info_im.get('Version'):
@@ -915,12 +927,14 @@
mmtpProtocolsIn = ",".join(mmtpProtocolsIn)
mmtpProtocolsOut = ",".join(mmtpProtocolsOut)
+ #XXXX007 remove
identityKeyID = formatBase64(
mixminion.Crypto.sha1(
mixminion.Crypto.pk_encode_public_key(identityKey)))
fields = {
"IP": config['Incoming/MMTP'].get('IP', "0.0.0.0"),
+ "Hostname": config['Incoming/MMTP'].get('Hostname', None),
"Port": config['Incoming/MMTP'].get('Port', 0),
"Nickname": nickname,
"Identity":
@@ -940,13 +954,18 @@
}
# If we don't know our IP address, try to guess
- if fields['IP'] == '0.0.0.0':
+ if fields['IP'] == '0.0.0.0': #XXXX007 remove
try:
fields['IP'] = _guessLocalIP()
LOG.warn("No IP configured; guessing %s",fields['IP'])
except IPGuessError, e:
LOG.error("Can't guess IP: %s", str(e))
raise UIError("Can't guess IP: %s" % str(e))
+ # If we don't know our Hostname, try to guess
+ if fields['Hostname'] is None:
+ fields['Hostname'] = socket.getfqdn()
+ LOG.warn("No Hostname configured; guessing %s",fields['Hostname'])
+ _checkHostnameIsLocal(fields['Hostname'])
# Fill in a stock server descriptor. Note the empty Digest: and
# Signature: lines.
@@ -961,7 +980,7 @@
Valid-After: %(ValidAfter)s
Valid-Until: %(ValidUntil)s
Packet-Key: %(PacketKey)s
- Packet-Formats: %(PacketFormat)s
+ Packet-Versions: %(PacketFormat)s
Software: Mixminion %(mm_version)s
Secure-Configuration: %(Secure)s
""" % fields
@@ -978,6 +997,7 @@
[Incoming/MMTP]
Version: 0.1
IP: %(IP)s
+ Hostname: %(Hostname)s
Port: %(Port)s
Key-Digest: %(KeyID)s
Protocols: %(MMTPProtocolsIn)s
@@ -1118,6 +1138,26 @@
raise IPGuessError("Only address found is in a private IP block")
return IP
+
+_KNOWN_LOCAL_HOSTNAMES = {}
+
+def _checkHostnameIsLocal(name):
+ if _KNOWN_LOCAL_HOSTNAMES.has_key(name):
+ return
+ try:
+ r = mixminion.server.DNSFarm.getIPs(name)
+ for family, addr, _ in r:
+ if family == mixminion.server.DNSFarm.AF_INET:
+ if addr.startswith("127.") or addr.startswith("0."):
+ raise UIError("Hostname %r resolves to reserved address %s"
+ %(name, addr))
+ else:
+ if addr in ("::", "::1"):
+ raise UIError("Hostname %r resolves to reserved address %s"
+ %(name,addr))
+ except socket.error, e:
+ raise UIError("Cannot resolve hostname %r: %s"%(name,e))
+ _KNOWN_LOCAL_HOSTNAMES[name] = 1
def generateCertChain(filename, mmtpKey, identityKey, nickname,
certStarts, certEnds):
Index: ServerMain.py
===================================================================
RCS file: /home/minion/cvsroot/src/minion/lib/mixminion/server/ServerMain.py,v
retrieving revision 1.94
retrieving revision 1.95
diff -u -d -r1.94 -r1.95
--- ServerMain.py 13 Oct 2003 17:30:24 -0000 1.94
+++ ServerMain.py 19 Oct 2003 03:12:02 -0000 1.95
@@ -53,6 +53,7 @@
import mixminion.Config
import mixminion.Crypto
import mixminion.Filestore
+import mixminion.server.DNSFarm
import mixminion.server.MMTPServer
import mixminion.server.Modules
import mixminion.server.PacketHandler
@@ -299,7 +300,7 @@
mixminion.server.ServerQueue.DeliveryQueue.__init__(self, location)
self.server = None
self.incomingQueue = None
- self.addr = (ip,port,keyid)
+ self.addr = (ip,port,keyid) #XXXX006 need to detect same host.
def configure(self, config):
"""Set up this queue according to a ServerConfig object."""
@@ -325,9 +326,9 @@
except mixminion.Filestore.CorruptedFile:
continue
msgs.setdefault(addr, []).append(pending)
- for addr, messages in msgs.items():
- if self.addr[:2] == (addr.ip, addr.port):
- if self.addr[2] != addr.keyinfo:
+ for routing, messages in msgs.items():
+ if self.addr[:2] == (routing.ip, routing.port): #XXX006 detect host
+ if self.addr[2] != routing.keyinfo:
LOG.warn("Delivering messages to myself with bad KeyID")
for pending in messages:
LOG.trace("Delivering message OUT:%s to myself.",
@@ -340,11 +341,10 @@
deliverable = [
mixminion.server.MMTPServer.DeliverablePacket(pending)
for pending in messages ]
- LOG.trace("Delivering messages OUT:[%s] to %s:%s",
+ LOG.trace("Delivering messages OUT:[%s] to %s",
" ".join([p.getHandle() for p in messages]),
- addr.ip, addr.port)
- self.server.sendMessages(addr.ip, addr.port, addr.keyinfo,
- deliverable)
+ routing)
+ self.server.sendMessagesByRouting(routing, deliverable)
class _MMTPServer(mixminion.server.MMTPServer.MMTPAsyncServer):
"""Implementation of mixminion.server.MMTPServer that knows about
@@ -709,6 +709,8 @@
self.cleaningThread = CleaningThread()
self.processingThread = ProcessingThread()
+ self.dnsCache = mixminion.server.DNSFarm.DNSCache()
+
LOG.debug("Connecting queues")
self.incomingQueue.connectQueues(mixPool=self.mixPool,
processingThread=self.processingThread)
@@ -718,6 +720,7 @@
incoming=self.incomingQueue)
self.mmtpServer.connectQueues(incoming=self.incomingQueue,
outgoing=self.outgoingQueue)
+ self.mmtpServer.connectDNSCache(self.dnsCache)
self.cleaningThread.start()
self.processingThread.start()
@@ -791,6 +794,7 @@
def _tryTimeout(self=self):
self.mmtpServer.tryTimeout()
+ self.dnsCache.cleanCache()
return self.mmtpServer.getNextTimeoutTime()
self.scheduleRecurringComplex(self.mmtpServer.getNextTimeoutTime(now),
@@ -1352,4 +1356,3 @@
LOG.info("Telling server to publish descriptors")
_signalServer(config, reload=1)
-