[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r11949: Adding weather to svn... (weather)



Author: pde
Date: 2007-10-15 15:05:46 -0400 (Mon, 15 Oct 2007)
New Revision: 11949

Added:
   weather/config.py
   weather/poll.py
   weather/subscribe.html
   weather/weather.py
Log:
Adding weather to svn...



Added: weather/config.py
===================================================================
--- weather/config.py	                        (rev 0)
+++ weather/config.py	2007-10-15 19:05:46 UTC (rev 11949)
@@ -0,0 +1,17 @@
+#!/usr/bin/env python2.5
+
+authenticator = ""  # customise this
+
+#URLbase = "http://weather.torpoject.org";
+URLbase = "http://ip-adress:port";
+
+weather_email = "no-reply@xxxxxxxxxxxxxx"
+
+# these respond to pings (for now!) and are geographically dispersed
+
+ping_targets = ["google.com", "telstra.com.au", "yahoo.co.uk"]
+
+failure_threshold = 4 # this number of failures in a row counts as being
+                      # down
+
+poll_period = 1800 # try to wait this number of seconds in between polling

Added: weather/poll.py
===================================================================
--- weather/poll.py	                        (rev 0)
+++ weather/poll.py	2007-10-15 19:05:46 UTC (rev 11949)
@@ -0,0 +1,242 @@
+#!/usr/bin/env python2.5
+import socket
+import sys
+import os
+import gdbm
+import re
+import time
+import threading
+from datetime import datetime
+from traceback import print_exception
+from subprocess import Popen, PIPE
+import TorCtl.TorCtl as TorCtl
+
+from config import authenticator, URLbase, weather_email, failure_threshold
+from config import poll_period, ping_targets
+from weather import parse_subscriptions
+
+# set non-zero to print progress and diagnostics to stdout
+debug = 0
+
+
+# Log file handed to TorCtl's debug() by every TorPing.  Opening it here
+# truncates ./torctl-debug as soon as this module is imported.
+debugfile = open("torctl-debug","w")
+class TorPing:
+  "Check to see if various tor nodes respond to SSL hanshakes"
+  def __init__(self, control_host = "127.0.0.1", control_port = 9051):
+    "Keep the connection to the control port lying around"
+    # need to know this hasn't gone down!
+    self.control_host = control_host
+    self.control_port = control_port
+    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
+    self.sock.connect((control_host,control_port)) 
+    self.control = TorCtl.Connection(self.sock)
+    self.control.authenticate(authenticator)
+    self.control.debug(debugfile)
+
+  def __del__(self):
+    self.sock.close()
+    del self.sock
+    self.sock = None                # prevents double deletion exceptions
+
+    # it would be better to fix TorCtl!
+    try:
+      self.control.close()
+    except:
+      pass
+
+    del self.control
+    self.control = None
+
+  def ping(self, node_id):
+    "Let's see if this tor node is up."
+    string = "ns/id/" + node_id
+    info = self.control.get_info(string)
+    # info looks like this:
+    # {'ns/id/FFCB46DB1339DA84674C70D7CB586434C4370441': 'r moria1 /8tG2xM52oRnTHDXy1hkNMQ3BEE pavoLDqxMvw+T1VHR5hmmgpr9self 2007-10-10 21:12:08 128.31.0.34 9001 9031\ns Authority Fast Named Running Valid V2Dir\n'}
+    ip,port = info[string].split()[6:8]
+    # throw exceptions like confetti if this isn't legit
+    socket.inet_aton(ip)
+    # port 0 is not kosher
+    assert int(port) > 0
+
+    if debug: print "contacting node at %s:%s" % (ip,port)
+
+    # XXX check: could non-blocking io be used to make us safe from
+    # answer-very-slowly DOSes?  or do we need to spawn threads here?
+
+    cmd = ["openssl", "s_client",  "-connect", ip + ':' + port]
+    ssl_handshake = Popen( args = cmd, stdout = PIPE, stderr = PIPE, stdin=PIPE)
+    ssl_handshake.stdin.close()
+    safe_from_DOS = 10000    # moria1's response is ~1500 chars long
+    output = ssl_handshake.stdout.read(safe_from_DOS)
+    n = output.find("Server public key is 1024 bit")
+    if n > 0:
+      return True
+    else:
+      return False
+
+  def test(self):
+    "Check that the connection to the Tor Control port is still okay."
+    try:
+      self.control.get_info("version")
+      return True
+    except:
+      if debug: print "Respawning control port connection..."
+      self.__del__()
+      try:
+        self.__init__(self.control_host, self.control_port)
+        return True
+      except:
+        if debug: print "Respawn failed"
+        return False
+
+
+# Body of the failure report mailed to each subscriber; send_failure_email
+# fills the three %s slots with (node id, unsubscribe URL, error text).
+report_text = \
+"""This is a Tor Weather report.
+
+It appears that a tor node you elected to monitor,
+
+(node id: %s)
+
+has been uncontactable through the Tor network for a while.  You may wish 
+to look at it to see why.  The last error message from our code while trying to
+contact it is included below.  You may or may not find it helpful!
+
+(You can unsubscribe from these reports at any time by visiting the 
+following url:
+
+%s )
+
+The last error message was as follows:
+--------------------------------------
+%s"""
+    
+class WeatherPoller(threading.Thread):
+  "This thread sits around, checking to see if tor nodes are up."
+
+  def __init__(self, subscriptions, lock):
+    #self.subscriptions = gdbm.open("subscriptions")
+    self.gdbm_lock = lock
+    self.subscriptions = subscriptions
+    self.failure_counts = gdbm.open("failures.gdbm", "cs") 
+    self.failure_counts.reorganize() # just in case
+    if debug:
+      print "failure counts"
+      for node in self.failure_counts.keys():
+        print node, self.failure_counts[node]
+    self.tp = TorPing()
+    threading.Thread.__init__(self)
+
+  def run(self):
+    "Keep polling nodes... forever."
+    while True:
+      stamp = time.time()
+      self.ping_all()
+      offset = time.time() - stamp
+      if offset < poll_period:
+        time.sleep(poll_period - offset)
+      
+  def ping_all(self):
+    if debug: print "starting a new round of polls"
+    #self.tp = TorPing()
+    if not self.tp.test():
+      return False
+    print 'Timestamp', datetime.now().isoformat('-')
+    self.gdbm_lock.acquire()
+    node = self.subscriptions.firstkey()
+    while node != None:
+      # nodes stay in the subscription db even if nobody is subscribed to them
+      # anymore
+      if self.subscriptions[node] != "":
+        self.gdbm_lock.release()
+        self.ping(node)       # this is time consuming ; don't hold the lock
+        self.gdbm_lock.acquire()
+
+      node = self.subscriptions.nextkey(node)
+    self.gdbm_lock.release()
+    #del self.tp   # this minimises the chance of confusion a local tor control
+                  # port crash with a remote node being down
+    if debug: print "Ping_all finished"
+ 
+  def ping(self, node):
+    if debug: print "pinging", node
+    try: 
+      assert self.tp.ping(node)
+      # Okay we can see this node.  Zero its count, if it has one
+      if debug: print node, "is okay"
+      try:
+        if int(self.failure_counts[node]) != 0:
+          self.failure_counts[node] = "0"
+      except KeyError:
+        pass
+    except:
+      # for /some/ reason, we can't contact this tor node
+      ex1,ex2,ex3 = sys.exc_info()
+      if self.internet_looks_okay():
+        # But we can ping the net.  That's bad.
+        reason = print_exception(ex1,ex2,ex3)
+        if (debug):
+          print "logging a strike against node", node, "because of:"
+          print reason
+        self.strike_against(node, reason)
+      else:
+        print "I would have concluded that tor node", node, "was down;"
+        print "The problem looked like this:"
+        print print_exception(ex1,ex2,ex3)
+        print "But I couldn't ping %s!" % (self.ping_failure)
+ 
+  good_ping = re.compile("0% packet loss")
+
+  def internet_looks_okay(self):
+    cmd = ["ping", "-c", "3", "x"]
+    pings = []
+    for host in ping_targets:
+      cmd[3] = host
+      pings.append((Popen(args=cmd,stdout=PIPE,stdin=PIPE,stderr=PIPE), host))
+    for ping,host in pings:
+      output = ping.stdout.read()
+      ping.stdin.close()
+      if not self.good_ping.search(output):
+        self.ping_failure = host
+        return False
+    return True
+      
+  def strike_against(self, node, reason):
+    "Increment the failure count for this node"
+    # gdbm is string based
+    if not self.failure_counts.has_key(node):
+      self.failure_counts[node] = "1"
+    else:
+      count = int(self.failure_counts[node]) + 1
+      self.failure_counts[node] = "%d" %(count)
+      if count == failure_threshold:
+        self.send_failure_email(node, reason)
+
+  def send_failure_email(self, node, reason):
+    import smtplib
+    from email.mime.text import MIMEText
+
+    # Send the message via our own SMTP server, but don"t include the
+    # envelope header.
+    s = smtplib.SMTP()
+    s.connect()
+    self.gdbm_lock.acquire()
+    list = parse_subscriptions(node,self.subscriptions)
+    self.gdbm_lock.release()
+    for address, unsub_token in list:
+ 
+      unsub_url = URLbase+"/unsubscribe/" + unsub_token
+      msg= MIMEText(report_text % (node, unsub_url, reason))
+
+      sender = weather_email
+      msg["Subject"] = "Tor weather report"
+      msg["From"] = sender
+      msg["To"] = address
+      msg["List-Unsubscribe"] = unsub_url
+      s.sendmail(sender, [address], msg.as_string())
+    s.close()
+        
+def ping_test():
+  x = NodePoller()
+  print x.internet_looks_okay()
+  x.send_failure_email()
+

Added: weather/subscribe.html
===================================================================
--- weather/subscribe.html	                        (rev 0)
+++ weather/subscribe.html	2007-10-15 19:05:46 UTC (rev 11949)
@@ -0,0 +1,15 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
+<html><head><title>Tor Weather</title></head><body>
+<form method="post" action="/subscribe"> <!-- handled by subscribe.POST in weather.py -->
+You can use this form to request status updates to tell you when a particular
+Tor node has become unreachable for a sustained period of time.
+<p>
+<input type="text" name="email" size="18" maxlength="255" value="Enter email address" onclick="if (this.value == 'Enter email address') {this.value = ''}">
+</p><p>
+<input type="text" name="node" size="18" maxlength="255" value="Enter Tor node ID" onclick="if (this.value == 'Enter Tor node ID') {this.value = ''}">
+</p><p>
+<input type="submit" class="submit" value="Subscribe to Tor Weather" name="sa">
+</p>
+<p><i>Please note that while we won't ever intentionally publish them, the address/node pairs sent to this server are not protected against SMTP eavesdropping, hacking, or lawyers.</i></p>
+</form>
+</body></html>

Added: weather/weather.py
===================================================================
--- weather/weather.py	                        (rev 0)
+++ weather/weather.py	2007-10-15 19:05:46 UTC (rev 11949)
@@ -0,0 +1,302 @@
+#!/usr/bin/env python2.5
+import web
+import string
+import socket
+import DNS
+import re
+import random
+import sys
+import gdbm
+import time
+import threading
+import signal # does this help with keyboard interrupts?
+
+from config import URLbase, weather_email
+
+debug = 0
+
+DNS.ParseResolvConf()
+
+urls = (
+'/subscribe', 'subscribe', 
+'/confirm-subscribe/(.*)', 'confirm',
+'/unsubscribe/(.*)', 'unsubscribe'
+)
+
+# Should do something more elegant with this!
+if __name__ == "__main__": 
+
+# This is a single lock for all the gdbm write rights, to ensure that
+# different web.py threads aren't trying to write at the same time.
+# poll.py has its own gdbm objects, but they only read from these databases.
+
+  gdbm_lock = threading.RLock()
+
+  requests = gdbm.open("requests.gdbm","cs")
+  print "requests:"
+  for s in requests.keys():
+    print s, requests[s]
+  subscriptions = gdbm.open("subscriptions.gdbm","cs")
+  print "subscriptions:"
+  for s in subscriptions.keys():
+    print s, '"'+subscriptions[s]+'"'
+  unsubscriptions = gdbm.open("unsubscriptions.gdbm","cs")
+  print "unsubscriptions:"
+  for s in unsubscriptions.keys():
+    print s, unsubscriptions[s]
+
+# these may or may not be better than storing pickles with gdbm
+
+class DatabaseError(Exception):
+  pass
+
+def parse_subscriptions(node, subs):
+  "Turn a string in the db back into a list of pairs"
+  words = subs[node].split()
+  if (len(words) % 2 != 0):
+    raise DatabaseError, words
+
+  results = []
+  while True:
+    try:
+      auth = words.pop()
+    except IndexError:
+      break
+    email = words.pop()
+    results.append((email, auth))
+
+  return results
+
+def delete_sub(pair, sub, node):
+  "Craziness to delete pair from a string in the subscriptions db"
+  # regexps probably aren't easily made safe here
+  words = sub[node].split()
+  if (len(words) % 2 != 0):
+    raise DatabaseError, words
+  for n in range(len(words) / 2):
+    if pair[0] == words[n*2] and pair[1] == words[n*2 + 1]:
+      sub[node] = " ".join(words[:n*2] + words[n*2 + 2:])
+      break
+  else:
+    raise DatabaseError, pair
+  sub.sync()
+      
+random.seed()
+def randstring():
+  "Produce a random alphanumeric string for authentication"
+  # This gives us a 190.5 bit random space
+  return "".join([random.choice(chars) for x in xrange(32)])
+
+# Body of the confirmation-request email; send_confirmation_email fills the
+# two %s slots with (node id, confirmation URL).
+subscribe_text = \
+"""Dear human, this is the Tor Weather Report system.  
+
+Somebody (possibly you) has requested that status monitoring information about 
+a tor node (id: %s) 
+be sent to this email address.
+
+If you wish to confirm this request, please visit the following link:
+
+%s 
+
+If you do *not* wish to receive Tor Weather Reports, you do not need to do 
+anything."""
+
+class subscribe:
+  
+  def GET(self):
+    print open("subscribe.html").read()
+
+  whitespace = re.compile("\s*")
+  def POST(self):
+    i = web.input(node="none",email="none")
+    if not self.check_email(i.email):
+      print 'That email address looks fishy to our refined sensibilities!'
+      if debug: print "(" + self.email_error + ")"
+      return True # XXX temp
+
+    node_cleaned = self.whitespace.sub("", i.node) 
+    if not self.check_node_id(node_cleaned):
+      print "That doesn't look like a proper Tor node ID."
+      return True
+
+    if not self.already_subscribed(i.email, node_cleaned):
+      self.send_confirmation_email(i.email, node_cleaned)
+    elif debug:
+      print "Sorry, I'm not subscribing you twice."
+
+  
+  # node ids are 40 digit hexidecimal numbers
+  node_okay = re.compile("(0x)?[a-fA-F0-9]{40}\Z")
+
+  def check_node_id(self, node):
+    if self.node_okay.match(node):
+      return True
+    else:
+      return False
+
+  def already_subscribed(self, address, node):
+    gdbm_lock.acquire()
+
+    try:
+      words = subscriptions[node].split()
+      if address in words:
+        already = True
+      else:
+        already = False
+    except KeyError:
+      already = False
+
+    gdbm_lock.release()
+    return already
+    
+  def send_confirmation_email(self, address, node):
+    authstring = randstring()
+
+    gdbm_lock.acquire()
+    requests[authstring] = address + " " + node
+    gdbm_lock.release()
+    #def f(s):
+    #  requests[authstring] = s
+    #gdbm_lock.lock(f, address + " " + node)
+
+    #url = web.ctx.homedomain + "/confirm-subscribe/" + authstring
+    url = URLbase + "/confirm-subscribe/" + authstring
+
+    import smtplib
+    from email.mime.text import MIMEText
+    msg= MIMEText(subscribe_text % (node, url))
+    s = smtplib.SMTP()
+    s.connect()
+    sender = weather_email
+    msg["Subject"] = "Tor weather subscription request"
+    msg["From"] = sender
+    msg["To"] = address
+    s.sendmail(sender, [address], msg.as_string())
+    s.close()
+
+    print "Thankyou for using Tor Weather.  A confirmation request has been sent to", address + "."
+    #print url
+
+  # Section 3.4.1 of RFC 2822 is much more liberal than this!
+  domain_okay = re.compile("[A-Za-z0-9\-\.']*")
+  local_okay = re.compile("[A-Za-z0-9\-_\.\+]*")
+  querinator=DNS.Request(qtype='mx')
+  email_error = None
+
+  def check_email(self, address):
+    "Just check that address is not something fruity"
+    # This is wrong (see http://www.ex-parrot.com/~pdw/Mail-RFC822-Address.html)
+    # but it should prevent crazy stuff from being accepted
+
+    if len(address) >= 80:
+      self.email_error = "We declare this address too long"
+      return False
+    atpos = address.find('@')
+    if atpos == -1:
+      self.email_error = "No @ symbol"
+      return False
+
+    if address[atpos:].find('.') == -1:
+      self.email_error = "No .s after @"
+      return False
+
+    local = address[:atpos]
+    domain = address[atpos + 1:]
+
+    if self.local_okay.match(local).end() != len(local):
+      self.email_error = "unauthorised chars in local part"
+      return False
+
+    for component in domain.split("."):
+      l = len(component)
+      if l == 0:
+        self.email_error = "empty domain segment"
+        return False
+      if self.domain_okay.match(component).end() != l:
+        self.email_error = "unauthorised chars in domain, " + component
+        return False
+
+    # XXX it's not clear yet what exception handling this should do:
+    try:
+      dnsquery = self.querinator.req(domain)
+    except DNS.DNSError, type:
+      if type == 'Timeout':
+        self.email_error = "Can't find a DNS server!"
+        return False
+      else:
+        raise 
+      
+
+    if not dnsquery.answers:
+      # No DNS MX records for this domain
+      self.email_error = "no MX records for domain"
+      return False
+
+    return True
+
+class confirm:
+  def GET(self,authstring):
+    
+    print "<html>"
+    if debug: print "checking confirmation..."
+    gdbm_lock.acquire()
+
+    if not requests.has_key(authstring):
+      print "Error in subscription request!"
+      gdbm_lock.release()
+      return 0
+
+    email, node = requests[authstring].split()
+
+    # We want a single link in every outgoing email that will unsubscribe that
+    # user.  But we don't want to generate a new database entry every time
+    # an email gets sent.  So do it now, and remember the token.
+    unsub_authstring = randstring()
+    subscription = email + " " + node + " " + unsub_authstring
+    unsubscriptions[unsub_authstring] = subscription
+    subscription2 = email + " " + unsub_authstring
+    if subscriptions.has_key(node):
+      subscriptions[node] += " " +subscription2
+    else:
+      subscriptions[node] = subscription2
+    url = web.ctx.homedomain + "/unsubscribe/" + unsub_authstring
+    print "Succesfully subscribed <tt>", email, 
+    print "</tt> to weather reports about Tor node", node
+    print "<p>You can unsubscribe at any time by clicking on the following link:"
+    print '<p><a href="' + url + '">' + url + '</a>'
+    print '<p>(you will be reminded of it in each weather report we send)'
+    
+    del(requests[authstring])
+    subscriptions.sync()
+    gdbm_lock.release()
+    
+class unsubscribe:
+  def GET(self,authstring):
+
+    gdbm_lock.acquire()
+    if not unsubscriptions.has_key(authstring):
+      print "Invalid unsubscription request!"
+      print unsubscriptions
+      return 0
+
+    email, node, _ = unsubscriptions[authstring].split()
+
+    delete_sub ((email, authstring), subscriptions, node)
+    #subscriptions[node].remove((email,authstring))
+    print "<html>Succesfully unsubscribed <tt>", email, 
+    print "</tt> from weather reports about Tor node", node, "</html>"
+    del (unsubscriptions[authstring])
+    gdbm_lock.release()
+
+def main():
+  from poll import WeatherPoller
+  weather_reports = WeatherPoller(subscriptions, gdbm_lock)
+  weather_reports.start()                 # this starts another thread
+
+  web.run(urls, globals())
+
+
+if __name__ == "__main__": 
+  main()
+  
+


Property changes on: weather/weather.py
___________________________________________________________________
Name: svn:executable
   + *