[or-cvs] r21924: {weather} Initial weather rewrite commit (weather/trunk)

Author: kaner
Date: 2010-03-12 13:13:44 +0000 (Fri, 12 Mar 2010)
New Revision: 21924

Initial weather rewrite commit

Modified: weather/trunk/weather.py
--- weather/trunk/weather.py	2010-03-11 22:13:02 UTC (rev 21923)
+++ weather/trunk/weather.py	2010-03-12 13:13:44 UTC (rev 21924)
@@ -1,392 +1,376 @@
-#!/usr/bin/env python2.5
 import os
-import web
-import DNS
 import re
-import random
 import sys
-import gdbm
-import time
-import threading
-import signal # does this help with keyboard interrupts?
+import DNS
 import base64
+import smtplib
+import socket
+from twisted.web import resource, static, server, http
+from twisted.enterprise import adbapi
+from twisted.internet import reactor
+from twisted.internet.task import LoopingCall 
+from email.mime.multipart import MIMEMultipart
+from email.mime.base import MIMEBase
+from email.mime.text import MIMEText
-from config import URLbase, weather_email, weather_storage, apache_fcgi
+import traceback
-debug = 0
-dummy_testing = 0
+import TorCtl.TorCtl as TorCtl
+# Globals
+# Base of the confirm-subscribe and unsubscribe links that are sent to users.
+URLbase = "https://weather.torproject.org";
+# Sender address used for the confirmation mails.
+mailFrom = "tor-ops@xxxxxxxxxxxxxx"
+# Seconds between polling rounds (the LoopingCall interval used in main()).
+pollPeriod = 10
-urls = (
-'/', 'subscribe',
-'/subscribe', 'subscribe', 
-'/confirm-subscribe/(.*)', 'confirm',
-'/unsubscribe/(.*)', 'unsubscribe'
+# TorPing passes this file to TorCtl's debug().  It is opened at import time,
+# relative to the current directory, and mode "w" truncates any old content.
+debugfile = open("debug", "w")
+# Text strings
+    Thanks for using tor weather. A confirmation request has been sent to '%s'.
-if apache_fcgi:
-  web.wsgi.runwsgi = lambda func, addr=None: web.wsgi.runfcgi(func, addr)
+    Dear human, this is the Tor Weather Report system.  
-# Should do something more elegant with this!
-if __name__ == "__main__": 
+    Somebody (possibly you) has requested that status monitoring information 
+    about a tor node (id: %s) be sent to this email address.
-# This is a single lock for all the gdbm write rights, to ensure that
-# different web.py threads aren't trying to write at the same time.
-  try: 
+    If you wish to confirm this request, please visit the following link:
-      gdbm_lock = threading.RLock()
+    %s 
-      requests = gdbm.open(weather_storage + "/requests.gdbm","cs")
-      print "requests:"
-      for s in requests.keys():
-        print s, requests[s]
-      subscriptions = gdbm.open(weather_storage + "/subscriptions.gdbm","cs")
-      print "subscriptions:"
-      for s in subscriptions.keys():
-        print s, '"'+subscriptions[s]+'"'
-      unsubscriptions = gdbm.open(weather_storage + "/unsubscriptions.gdbm","cs")
-      print "unsubscriptions:"
-      for s in unsubscriptions.keys():
-        print s, unsubscriptions[s]
+    If you do *not* wish to receive Tor Weather Reports, you do not need to do 
+    anything.
-      failures = gdbm.open(weather_storage + "/failures.gdbm", "cs")
-      print "failures:"
-      for s in failures.keys():
-        print s, failures[s]
+# Query strings
+# NOTE(review): values are spliced in with Python %-formatting inside SQL
+# quotes rather than passed as bound parameters, so every caller must make
+# sure no value can contain a single quote (SQL injection otherwise).
+CHECK_SUBS_Q = "SELECT id FROM subscriptions WHERE email='%s' and node='%s'"
+    INSERT INTO subscriptions (email, node, subs_auth, unsubs_auth, subscribed) VALUES ('%s', '%s', '%s', '%s', 0)
+CHECK_SUBS_AUTH_Q = "SELECT unsubs_auth FROM subscriptions WHERE subs_auth='%s'"
+ACK_SUB_Q = "UPDATE subscriptions SET subscribed=1 WHERE subs_auth='%s'"
+UNSUBSCRIBE_Q = "DELETE from subscriptions where unsubs_auth='%s'"
+GETALL_SUBS_Q = "SELECT * from subscriptions WHERE subscribed=1"
-      antispam_lock = threading.RLock()
-      antispam = {}      # a dict mapping IP to the number of recent unanswered requests allowed
-                         # from that IP
-      antispam_min = 2
-      antispam_max = 10
-  except:
-    print "Unable to get lock on database"
-    exit()
+class WeatherIndex(resource.Resource):
+    """Serve the subscription form stored in subscribe.template."""
+    def render(self, request):
+        # Close the template explicitly instead of leaving the open file
+        # object for the garbage collector.
+        template = open("subscribe.template")
+        try:
+            return template.read()
+        finally:
+            template.close()
-# these may or may not be better than storing pickles with gdbm
-class DatabaseError(Exception):
-  pass
-def parse_subscriptions(node, subs):
-  "Turn a string in the db back into a list of pairs"
-  words = subs[node].split()
-  try:
-    return [ (words[i], words[i+1]) for i in xrange(0, len(words), 2) ]
-  except IndexError:
-    raise DatabaseError, words
-def delete_sub(pair, sub, node):
-  "Craziness to delete pair from a string in the subscriptions db"
-  # regexps probably aren't easily made safe here
-  words = sub[node].split()
-  if (len(words) % 2 != 0):
-    raise DatabaseError, words
-  for n in xrange(len(words) / 2):
-    if pair[0] == words[n*2] and pair[1] == words[n*2 + 1]:
-      sub[node] = " ".join(words[:n*2] + words[n*2 + 2:])
-      break
-  else:
-    raise DatabaseError, pair
-  sub.sync()
-def randstring():
-  # This is where we sometimes return '-' and we shouldn't
-  "Produce a random alphanumeric string for authentication"
-  theory = base64.urlsafe_b64encode(os.urandom(18))[:-1]
-  if theory[-1] == "-":    # some email clients don't like URLs ending in -
-    theory[-1] = 'x'
-  return theory
-subscribe_text = \
-"""Dear human, this is the Tor Weather Report system.  
-Somebody (possibly you) has requested that status monitoring information about 
-a tor node (id: %s) 
-be sent to this email address.
-If you wish to confirm this request, please visit the following link:
-If you do *not* wish to receive Tor Weather Reports, you do not need to do 
-class subscribe:
-  def GET(self):
-    web.header('content-type', 'text/html')
-    print open("subscribe.template").read()
-  whitespace = re.compile("\s*")
-  # XXX TODO: I think this is where we want to add new form values
-  # Specifically, we want to allow 'nickname' and 'nodenote'
-  # These strings are 50 chars max and should only allow [a-zA-Z0-9.,]
-  # Once we add this, we want to add some method of storing this user input
-  def POST(self):
-    web.header('content-type', 'text/html')
-    i = web.input(node="none",email="none")
-    if not self.check_email(i.email):
-      print 'That email address looks fishy to our refined sensibilities!'
-      if debug: print "(" + self.email_error + ")"
-      return True # XXX temp
-    node_cleaned = self.whitespace.sub("", i.node) 
-    if not self.check_node_id(node_cleaned):
-      print "That doesn't look like a proper Tor node ID."
-      return True
-    if not self.allowed_to_subscribe(web.ctx.ip):
-      print "Sorry, too many recent unconfirmed subscriptions from your IP address."
-      return True
-    if not self.already_subscribed(i.email, node_cleaned):
-      self.send_confirmation_email(i.email, node_cleaned)
-    elif debug:
-      print "Sorry, I'm not subscribing you twice."
-    else:
-      # Leak no information about who is subscribed
-      print "Thank you for using Tor Weather.  A confirmation request has been sent to", i.email + "."
-  # node ids are 40 digit hexidecimal numbers
-  node_okay = re.compile("(0x)?[a-fA-F0-9]{40}\Z")
-  def check_node_id(self, node):
-    if self.node_okay.match(node):
-      return True
-    else:
-      return False
-  random.seed()
-  def allowed_to_subscribe(self,ip):
-    "An antispam measure!"
-    antispam_lock.acquire()
-    try:
-      if antispam.has_key(ip):
-        if antispam[ip] == 0:
-          return False
-          antispam[ip] -= 1
-          return True
-      else:
-        # okay this is silly but leaks very slightly less information
-        antispam[ip] = random.randrange(antispam_min,antispam_max)
-        return True
-    finally:
-      antispam_lock.release()
-  def already_subscribed(self, address, node):
-    gdbm_lock.acquire()
-    try:
-          words = subscriptions[node].split()
-          if address in words:
-            already = True
-          else:
-            already = False
-        except KeyError:
-          already = False
-    finally:
-        gdbm_lock.release()
-    return already
-  def send_confirmation_email(self, address, node):
-    authstring = randstring()
-    gdbm_lock.acquire()
-    try:
-        requests[authstring] = address + " " + node
-    finally:
-        gdbm_lock.release()
+class SubscribeRequest(resource.Resource):
+    """Handle /subscribe: validate the submitted email address and node id,
+    store an unconfirmed subscription and mail a confirmation link.
+
+    A single instance of this resource serves every request, so per-request
+    data lives in a `sub` dict that is passed along the Deferred callbacks.
+    Keeping it on self would let concurrent requests overwrite each other's
+    address, node, tokens and request object.
+    """
+    def __init__(self, dbConn):
+        self.dbConn = dbConn
+        resource.Resource.__init__(self)
+    def render(self, request):
+        """Validate the form and start the asynchronous subscribe chain."""
+        email = request.args.get('email', [''])[0]
+        # Ignore any whitespace inside the pasted node id.
+        node = re.sub(r"\s+", "", request.args.get('node', [''])[0])
+        # Both values are pasted into SQL text by the %-style query strings,
+        # so they must pass validation before any query runs.
+        if not self._checkMail(email):
+            return "Error: Bad email address '%s'" % self._escape(email)
+        if not self._checkNode(node):
+            return "Error: Bad node id '%s'" % self._escape(node)
+        sub = {'request': request, 'email': email, 'node': node,
+               'subs_auth': self._getRandString(),
+               'unsubs_auth': self._getRandString()}
+        self._isSubscribedAlready(sub)
+        return server.NOT_DONE_YET
+    def _escape(self, text):
+        """HTML-escape text that is echoed back to the browser."""
+        from xml.sax.saxutils import escape
+        return escape(text)
+    def _isSubscribedAlready(self, sub):
+        """Look up an existing subscription for this email/node pair."""
+        dbQuery = CHECK_SUBS_Q % (sub['email'], sub['node'])
+        q = self.dbConn.runQuery(dbQuery)
+        q.addCallback(self._checkHasSubRet, sub)
+        q.addErrback(self._errback, sub)
+    def _checkHasSubRet(self, result, sub):
+        # Do we already have a subscription for this address for this node?
+        if result:
+            request = sub['request']
+            request.setResponseCode(http.OK)
+            request.write("Error: Already subscribed.")
+            request.finish()
+        else:
+            # Alright, subscribe it
+            return self._runSaveQuery(sub)
+    def _runSaveQuery(self, sub):
+        """Insert the new subscription with subscribed=0 (unconfirmed)."""
+        dbQuery = INSERT_SUBS_Q % (sub['email'], sub['node'],
+                                   sub['subs_auth'], sub['unsubs_auth'])
+        q = self.dbConn.runOperation(dbQuery)
+        q.addCallback(self._saved, sub)
+        q.addErrback(self._errback, sub)
+        return q
+    def _saved(self, result, sub):
+        """Mail the confirmation link; undo the insert if mailing fails."""
+        url = URLbase + "/confirm-subscribe?auth=" + sub['subs_auth']
+        try:
+            # NOTE(review): smtplib is synchronous and blocks the reactor
+            # while it talks to the mail server.
+            self._sendConfirmationMail(sub, url)
+        except Exception:
+            sub['error'] = "Unknown error while sending confirmation mail. " + \
+                           "Please try again later. " + \
+                           "[Exception %s]" % sys.exc_info()[0]
+            self._rollBack(sub)
+            return
+        request = sub['request']
+        request.setResponseCode(http.OK)
+        request.write(THANKS_OUT % self._escape(sub['email']))
+        request.finish()
+    def _rollBack(self, sub):
+        """Delete the row _runSaveQuery inserted, then report sub['error'].
+
+        The row is found by its freshly generated unsubscribe token, so a
+        failed mail does not leave behind an unconfirmable subscription.
+        """
+        dbQuery = UNSUBSCRIBE_Q % sub['unsubs_auth']
+        q = self.dbConn.runOperation(dbQuery)
+        q.addCallback(self._errOut, sub)
+        q.addErrback(self._errback, sub)
+    def _errOut(self, result, sub):
+        """Finish the request with a 500 and the (escaped) error text."""
+        request = sub['request']
+        request.setResponseCode(http.INTERNAL_SERVER_ERROR)
+        request.write(self._escape(sub.get('error', "Error")))
+        request.finish()
+    def _errback(self, failure, sub):
+        """Deferred errback: report the failure to the client."""
+        sub['error'] = "Error: %s" % (failure.getErrorMessage())
+        self._errOut(None, sub)
+    def _sendConfirmationMail(self, sub, url):
+        """Mail the confirmation link for sub['node'] to sub['email'].
+
+        Raises whatever smtplib raises; the connection is closed either way.
+        """
+        message = MIMEMultipart()
+        message['Subject'] = "Tor Weather Subscription Request"
+        message['To'] = sub['email']
+        message['From'] = mailFrom
+        messageText = CONFIRMATION_MAIL % (sub['node'], url)
+        text = MIMEText(messageText, _subtype="plain", _charset="utf-8")
+        # Add text part
+        message.attach(text)
+        # Try to send
+        smtp = smtplib.SMTP("localhost:25")
+        try:
+            smtp.sendmail(mailFrom, sub['email'], message.as_string())
+            smtp.quit()
+        finally:
+            # quit() already closes on success; this covers the error path.
+            smtp.close()
+    def _getRandString(self):
+        """Produce a random URL-safe string for authentication tokens."""
+        r = base64.urlsafe_b64encode(os.urandom(18))[:-1]
+        # Some email clients don't like URLs ending in '-'.  Strings are
+        # immutable, so rebuild r rather than calling replace() and
+        # discarding its result.
+        if r.endswith("-"):
+            r = r[:-1] + "x"
+        return r
+    def _checkMail(self, email):
+        """Return True if email looks valid and its domain has MX records."""
+        # Unsure if this is enough.  '-' is last in the first character class
+        # so it is a literal: written as '%-+' it formed the range '%'..'+',
+        # which admits quote characters and hence SQL injection.
+        mailValidator = "^[a-zA-Z0-9._%+-]+@([a-zA-Z0-9._%-]+\\.[a-zA-Z]{2,6}$)"
+        match = re.match(mailValidator, email)
+        if match:
+            return self._doDNSLookup(match.group(1))
+        return False
+    def _doDNSLookup(self, mailDomain):
+        """Return True if mailDomain has at least one MX record.
+
+        A DNS timeout counts as False; other DNS errors propagate.
+        NOTE(review): the lookup is synchronous and blocks the reactor.
+        """
+        DNS.DiscoverNameServers()
+        querinator = DNS.Request(qtype='mx')
+        try:
+            dnsquery = querinator.req(mailDomain)
+        except DNS.DNSError, e:
+            # Compare the error text: an exception instance never equals a
+            # plain str, so the old `type == 'Timeout'` test could not match.
+            if str(e) == 'Timeout':
+                return False
+            raise
+        if not dnsquery.answers:
+            # No DNS MX records for this domain
+            return False
+        return True
+    def _checkNode(self, node):
+        """Return True if node is a 40-hex-digit node id (optional 0x)."""
+        return re.match(r"(0x)?[a-fA-F0-9]{40}\Z", node) is not None
-    if dummy_testing:
-      print "gotcha"
-      return True
-    #def f(s):
-    #  requests[authstring] = s
-    #gdbm_lock.lock(f, address + " " + node)
-    #url = web.ctx.homedomain + "/confirm-subscribe/" + authstring
-    url = URLbase + "/confirm-subscribe/" + authstring
-    import smtplib
-    from email.mime.text import MIMEText
-    msg= MIMEText(subscribe_text % (node, url))
-    s = smtplib.SMTP()
-    s.connect()
-    sender = weather_email
-    msg["Subject"] = "Tor weather subscription request"
-    msg["From"] = sender
-    msg["To"] = address
-    s.sendmail(sender, [address], msg.as_string())
-    s.close()
-    print "Thank you for using Tor Weather.  A confirmation request has been sent to", address + "."
-    #print url
-  # Section 3.4.1 of RFC 2822 is much more liberal than this!
-  domain_okay = re.compile("[A-Za-z0-9\-\.']*")
-  local_okay = re.compile("[A-Za-z0-9\-_\.\+]*")
-  querinator=DNS.Request(qtype='mx')
-  email_error = None
-  def check_email(self, address):
-    "Just check that address is not something fruity"
-    # This is wrong (see http://www.ex-parrot.com/~pdw/Mail-RFC822-Address.html)
-    # but it should prevent crazy stuff from being accepted
-    if len(address) >= 80:
-      self.email_error = "We declare this address too long"
-      return False
-    atpos = address.find('@')
-    if atpos == -1:
-      self.email_error = "No @ symbol"
-      return False
+class ConfirmSubscribeRequest(resource.Resource):
+    """Handle /confirm-subscribe?auth=TOKEN: mark the subscription whose
+    subs_auth is TOKEN as confirmed and show its unsubscribe link.
+
+    The tokens and the request travel as callback arguments instead of
+    living on self: one instance of this resource serves every request, and
+    shared attributes would let concurrent confirmations overwrite each
+    other (showing one user another user's unsubscribe link).
+    """
+    def __init__(self, dbConn):
+        self.dbConn = dbConn
+        resource.Resource.__init__(self)
+    def render(self, request):
+        subs_auth = request.args.get('auth', [''])[0]
+        # CHECK_SUBS_AUTH_Q / ACK_SUB_Q paste the token into SQL text, so
+        # only accept the urlsafe-base64 alphabet tokens are generated from.
+        if not re.match(r"[A-Za-z0-9_-]+\Z", subs_auth):
+            return "Error: No subscription with your auth code."
+        self._lookupSubsAuth(request, subs_auth)
+        return server.NOT_DONE_YET
+    def _lookupSubsAuth(self, request, subs_auth):
+        """Fetch the unsubscribe token stored next to subs_auth."""
+        dbQuery = CHECK_SUBS_AUTH_Q % subs_auth
+        q = self.dbConn.runQuery(dbQuery)
+        q.addCallback(self._checkRet, request, subs_auth)
+        q.addErrback(self._errback, request)
+    def _checkRet(self, result, request, subs_auth):
+        if not result:
+            request.setResponseCode(http.OK)
+            request.write("Error: No subscription with your auth code.")
+            request.finish()
+        else:
+            unsubs_auth = str(result[0][0])
+            return self._ackSubscription(request, subs_auth, unsubs_auth)
+    def _ackSubscription(self, request, subs_auth, unsubs_auth):
+        """Set subscribed=1 for subs_auth, then report success."""
+        dbQuery = ACK_SUB_Q % subs_auth
+        # An UPDATE returns no rows, hence runOperation instead of runQuery.
+        q = self.dbConn.runOperation(dbQuery)
+        q.addCallback(self._subDone, request, unsubs_auth)
+        q.addErrback(self._errback, request)
+        return q
+    def _subDone(self, result, request, unsubs_auth):
+        url = URLbase + "/unsubscribe?auth=" + unsubs_auth
+        link = "<a href=\"" + url + "\">" + url + "</a>"
+        request.write("<p>Subscription finished. Thank you very much.")
+        request.write(" You can unsubscribe anytime with the following link: ")
+        request.write(link)
+        request.write("</p>")
+        request.finish()
+    def _errback(self, failure, request):
+        request.setResponseCode(http.INTERNAL_SERVER_ERROR)
+        request.write("Error: %s" % (failure.getErrorMessage()))
+        request.finish()
-    if address[atpos:].find('.') == -1:
-      self.email_error = "No .s after @"
-      return False
-    local = address[:atpos]
-    domain = address[atpos + 1:]
-    if self.local_okay.match(local).end() != len(local):
-      self.email_error = "unauthorised chars in local part"
-      return False
-    for component in domain.split("."):
-      l = len(component)
-      if l == 0:
-        self.email_error = "empty domain segment"
-        return False
-      if self.domain_okay.match(component).end() != l:
-        self.email_error = "unauthorised chars in domain, " + component
-        return False
-    # XXX it's not clear yet what exception handling this should do:
-    try:
-      dnsquery = self.querinator.req(domain)
-    except DNS.DNSError, type:
-      if type == 'Timeout':
-        self.email_error = "Can't find a DNS server!"
-        return False
-      else:
-        raise 
-    if not dnsquery.answers:
-      # No DNS MX records for this domain
-      self.email_error = "no MX records for domain"
-      return False
+class UnsubscribeRequest(resource.Resource):
+    """Handle /unsubscribe?auth=TOKEN: delete the subscription whose
+    unsubscribe token (unsubs_auth) is TOKEN.
+
+    The token travels as a callback argument instead of living on self,
+    because one instance of this resource serves every request.
+    """
+    def __init__(self, dbConn):
+        self.dbConn = dbConn
+        resource.Resource.__init__(self)
+    def render(self, request):
+        unsubs_auth = request.args.get('auth', [''])[0]
+        # UNSUBSCRIBE_Q pastes the token into SQL text, so only accept the
+        # urlsafe-base64 alphabet that tokens are generated from.
+        if not re.match(r"[A-Za-z0-9_-]+\Z", unsubs_auth):
+            request.setResponseCode(http.BAD_REQUEST)
+            return "Error: Invalid unsubscribe auth code."
+        self._deleteSub(request, unsubs_auth)
+        return server.NOT_DONE_YET
+    def _deleteSub(self, request, unsubs_auth):
+        """Delete the matching row (a DELETE returns no rows: runOperation)."""
+        dbQuery = UNSUBSCRIBE_Q % unsubs_auth
+        q = self.dbConn.runOperation(dbQuery)
+        q.addCallback(self._deleteDone, request)
+        q.addErrback(self._errback, request)
+    def _deleteDone(self, result, request):
+        # NOTE(review): reported even when no row matched the token.
+        request.setResponseCode(http.OK)
+        request.write("Subscription deleted. Goodbye.")
+        request.finish()
+    def _errback(self, failure, request):
+        request.setResponseCode(http.INTERNAL_SERVER_ERROR)
+        request.write("Error: %s" % (failure.getErrorMessage()))
+        request.finish()
-    return True
+class RootResource(resource.Resource):
+    """Root of the site: static images and CSS plus the weather pages.
+
+    dbConn is handed to each page that reads or writes subscriptions.
+    """
+    def __init__(self, dbConn):
+        resource.Resource.__init__(self)
+        # (URL path, file on disk) pairs, registered in this order.
+        staticFiles = (('top-left.png', "./top-left.png"),
+                       ('top-middle.png', "./top-middle.png"),
+                       ('top-right.png', "./top-right.png"),
+                       ('stylesheet.css', "./stylesheet.css"))
+        for path, fileName in staticFiles:
+            self.putChild(path, static.File(fileName))
+        self.putChild('', WeatherIndex())
+        dbPages = (('subscribe', SubscribeRequest),
+                   ('confirm-subscribe', ConfirmSubscribeRequest),
+                   ('unsubscribe', UnsubscribeRequest))
+        for path, pageClass in dbPages:
+            self.putChild(path, pageClass(dbConn))
-class confirm:
-  def GET(self,authstring):
-    web.header('content-type', 'text/html')
-    print "<html>"
-    if debug: print "checking confirmation..."
-    gdbm_lock.acquire()
-    try:
-        if not requests.has_key(authstring):
-          print "Error in subscription request!"
-          return 0
-        email, node = requests[authstring].split()
-        # We want a single link in every outgoing email that will unsubscribe that
-        # user.  But we don't want to generate a new database entry every time
-        # an email gets sent.  So do it now, and remember the token.
-        unsub_authstring = randstring()
-        subscription = email + " " + node + " " + unsub_authstring
-        unsubscriptions[unsub_authstring] = subscription
-        subscription2 = email + " " + unsub_authstring
-        if subscriptions.has_key(node):
-          subscriptions[node] += " " +subscription2
-        else:
-          subscriptions[node] = subscription2
-        url = web.ctx.homedomain + "/unsubscribe/" + unsub_authstring
-        print "Succesfully subscribed <tt>", email, 
-        print "</tt> to weather reports about Tor node", node
-        print "<p>You can unsubscribe at any time by clicking on the following link:"
-        print '<p><a href="' + url + '">' + url + '</a>'
-        print '<p>(you will be reminded of it in each weather report we send)'
-        del(requests[authstring])
-        subscriptions.sync()
+class WeatherPoller():
+    """Periodically check whether the node of every confirmed subscription
+    is known to the local Tor (via TorPing)."""
+    def __init__(self, dbConn):
+        self.dbConn = dbConn
+    def poller(self):
+        """Run one polling round; meant to be called by a LoopingCall.
+
+        Returns the round's Deferred so LoopingCall waits for it rather than
+        starting overlapping rounds (server.NOT_DONE_YET is a twisted.web
+        render() marker and means nothing to LoopingCall).
+        """
+        print "Polling.."
+        return self._checkAll()
+    def _checkAll(self):
+        """Fetch all confirmed subscriptions and check their nodes."""
+        dbQuery = GETALL_SUBS_Q
+        q = self.dbConn.runQuery(dbQuery)
+        q.addCallback(self._checkRet)
+        # Also catches errors raised inside _checkRet, so the Deferred handed
+        # back to LoopingCall always ends successfully and polling goes on.
+        q.addErrback(self._errBack)
+        return q
+    def _checkRet(self, resultList):
+        print "Checkret"
+        if not resultList:
+            return
+        # One control-port connection per round, not one per subscription.
+        torPing = TorPing()
+        for result in resultList:
+            print "Result: ", result
+            # NOTE(review): assumes column 2 of a subscriptions row is the
+            # node id -- confirm against the table schema.
+            checkHost = result[2]
+            if not self._checkHost(checkHost, torPing):
+                print "Server %s seems to be offline" % checkHost
+    def _errBack(self, failure):
+        print "Error: ", failure.getErrorMessage()
+    def _checkHost(self, hostID, torPing=None):
+        """Return TorPing.ping(hostID), opening a TorPing if none is given."""
+        print "Checking host %s" % hostID
+        if torPing is None:
+            torPing = TorPing()
+        return torPing.ping(hostID)
-    finally:
-        gdbm_lock.release()
-    # okay now slacken antispam watch
-    antispam_lock.acquire()
-    try:
-      if antispam.has_key(web.ctx.ip):
-        antispam[web.ctx.ip] += 1
-        if antispam[web.ctx.ip] >= antispam_max:
-          del antispam[web.ctx.ip]
-    finally:
-      antispam_lock.release()
-class unsubscribe:
-  def GET(self,authstring):
-    web.header('content-type', 'text/html')
-    gdbm_lock.acquire()
-        if not unsubscriptions.has_key(authstring):
-          print "Invalid unsubscription request!"
-          print unsubscriptions
-          return 0
-        email, node, _ = unsubscriptions[authstring].split()
-        delete_sub ((email, authstring), subscriptions, node)
-        #subscriptions[node].remove((email,authstring))
-        print "<html>Succesfully unsubscribed <tt>", email, 
-        print "</tt> from weather reports about Tor node", node, "</html>"
-        del (unsubscriptions[authstring])
-    finally:
-        gdbm_lock.release()
-class AntispamRelaxer(threading.Thread):
-  "Prevent long term accretion of antispam counts."
-  timescale = 24 * 3600          # sleep for up to a day
-  def run(self):
-    while True:
-      time.sleep(random.randrange(0,self.timescale))
-      antispam_lock.acquire()
-      try:
-        for ip in antispam.keys():
-          antispam[ip] += 1
-          if antispam[ip] == antispam_max:
-            del antispam[ip]
-      finally:
-        antispam_lock.release()
+class TorPing:
+    """Ask the local Tor, over its control port, whether it knows a node.
+
+    No connection to the node itself is made: ping() only requests the
+    node's "ns/id/<node_id>" info through TorCtl.
+    """
+    def __init__(self, control_host = "", control_port = 9051):
+        "Open, authenticate and keep a connection to the control port."
+        self.control_host = control_host
+        self.control_port = control_port
+        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.sock.connect((control_host,control_port))
+        self.control = TorCtl.Connection(self.sock)
+        self.control.authenticate("")
+        self.control.debug(debugfile)
+    def __del__(self):
+        """Best-effort cleanup of the socket and the TorCtl connection."""
+        # getattr(): __init__ may have failed before these were assigned.
+        sock = getattr(self, 'sock', None)
+        if sock is not None:
+            sock.close()
+        self.sock = None                # prevents double deletion exceptions
+        control = getattr(self, 'control', None)
+        # it would be better to fix TorCtl!
+        try:
+            if control is not None:
+                control.close()
+        except Exception:
+            pass
+        self.control = None
+    def ping(self, node_id):
+        """Return True if Tor returns info for "ns/id/" + node_id.
+
+        Any error while asking prints a traceback and returns False.
+        """
+        string = "ns/id/" + node_id
+        try:
+            self.control.get_info(string)
+        except TorCtl.ErrorReply:
+            # If we're getting here, we're likely seeing:
+            # ErrorReply: 552 Unrecognized key "ns/id/46D9..."
+            # This means that the node isn't recognized by the Tor we are
+            # connected to (presumably it is not in its network status).
+            print traceback.format_exc()
+            return False
+        except Exception:
+            # Remove this, it's a hack to debug this specific bug
+            print traceback.format_exc()
+            return False
+        # If we're here, we were able to fetch information about the router
+        return True
 def main():
-  from poll import WeatherPoller
-  weather_reports = WeatherPoller(subscriptions, failures, gdbm_lock)
-  weather_reports.start()                 # this starts another thread
-  relaxant = AntispamRelaxer()
-  relaxant.start()
-  web.run(urls, globals())
-if __name__ == "__main__": 
-  main()
+    """Run Tor Weather: open the subscription database, start the periodic
+    node poller and serve the web front end on TCP port 8000.
+
+    Does not return until reactor.run() does.
+    """
+    pool = adbapi.ConnectionPool("sqlite3", "subscriptions.db")
+    # Poll all confirmed subscriptions every pollPeriod seconds.
+    LoopingCall(WeatherPoller(pool).poller).start(pollPeriod)
+    # Web front end: index, subscribe, confirm and unsubscribe pages.
+    reactor.listenTCP(8000, server.Site(RootResource(pool)))
+    reactor.run()
+if __name__ == "__main__":
+    main()