
[or-cvs] r17288: {updater} have some more thandy. This update includes a working downloader (in updater/trunk/lib/thandy: . packagesys)



Author: nickm
Date: 2008-11-16 15:15:34 -0500 (Sun, 16 Nov 2008)
New Revision: 17288

Added:
   updater/trunk/lib/thandy/packagesys/
   updater/trunk/lib/thandy/packagesys/ExePackages.py
   updater/trunk/lib/thandy/packagesys/PackageDB.py
   updater/trunk/lib/thandy/packagesys/PackageSystem.py
   updater/trunk/lib/thandy/packagesys/RPMPackages.py
   updater/trunk/lib/thandy/packagesys/__init__.py
   updater/trunk/lib/thandy/socksurls.py
Modified:
   updater/trunk/lib/thandy/ClientCLI.py
   updater/trunk/lib/thandy/__init__.py
   updater/trunk/lib/thandy/download.py
   updater/trunk/lib/thandy/formats.py
   updater/trunk/lib/thandy/repository.py
   updater/trunk/lib/thandy/util.py
Log:
have some more thandy.  This update includes a working downloader with tor support, a package system framework, and more.  Most of what's left is glue code.

Modified: updater/trunk/lib/thandy/ClientCLI.py
===================================================================
--- updater/trunk/lib/thandy/ClientCLI.py	2008-11-16 18:25:20 UTC (rev 17287)
+++ updater/trunk/lib/thandy/ClientCLI.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -33,11 +33,11 @@
 
     for f in files:
         # XXXX Use hash.
-        dj = thandy.download.DownloadJob(f, repo.getFilename(f),
-                                         mirrorlist)
+        dj = thandy.download.ThandyDownloadJob(f, repo.getFilename(f),
+                                               mirrorlist)
         downloader.addDownloadJob(dj)
         # XXXX replace file in repository if ok; reload; see what changed.
-    
+
     # Wait for in-progress jobs
 
 # Check my repository
@@ -48,6 +48,7 @@
 
 # Tell me what to install.
 
+
 def usage():
     print "Known commands:"
     print "  update [--repo=repository] [--no-download]"
@@ -58,7 +59,7 @@
         usage()
     cmd = sys.argv[1]
     args = sys.argv[2:]
-    if cmd in [ "update" ]:
+    if cmd in [ "update", "geturls" ]:
         globals()[cmd](args)
     else:
         usage()

Modified: updater/trunk/lib/thandy/__init__.py
===================================================================
--- updater/trunk/lib/thandy/__init__.py	2008-11-16 18:25:20 UTC (rev 17287)
+++ updater/trunk/lib/thandy/__init__.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -34,3 +34,5 @@
 class UnknownMethod(CryptoError):
     pass
 
+class DownloadError(Exception):
+    pass

Modified: updater/trunk/lib/thandy/download.py
===================================================================
--- updater/trunk/lib/thandy/download.py	2008-11-16 18:25:20 UTC (rev 17287)
+++ updater/trunk/lib/thandy/download.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -1,28 +1,50 @@
 # Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information.
 
-import urllib2
 import httplib
+import logging
+import os
+import Queue
 import random
+import sys
+import threading
+import traceback
+import urllib2
 
-import threading, Queue
-
 import thandy.util
+import thandy.formats
+import thandy.socksurls
 
-class Downloads:
+
+class DownloadManager:
+    """Class to track a set of downloads and pass them out to worker threads.
+    """
     def __init__(self, n_threads=2):
+        # Prevents concurrent modification to downloads and haveDownloaded
         self._lock = threading.RLock()
+        # Map from resource relPath to job.
         self.downloads = {}
+        # Map from resource relPath to True for each object that we
+        # have managed to download.
         self.haveDownloaded = {}
+        # Work queue of DownloadJobs that we intend to process once a thread
+        # is free.
         self.downloadQueue = Queue.Queue()
-        self.threads = [ threading.Thread(target=self._thread) ]
+        # List of worker threads.
+        self.threads = [ threading.Thread(target=self._thread, args=[idx])
+                         for idx in xrange(n_threads) ]
+        # Condition that gets triggered whenever a thread is finished doing
+        # something.
+        self.done = threading.Condition()
         for t in self.threads:
             t.setDaemon(True)
 
     def start(self):
+        """Start all of this download manager's worker threads."""
         for t in self.threads:
             t.start()
 
     def isCurrentlyDownloading(self, relPath):
+        """Return true iff this download manager is currently downloading
+           some copy of the resource at relPath."""
         self._lock.acquire()
         try:
             return self.downloads.has_key(relPath)
@@ -30,6 +52,9 @@
             self._lock.release()
 
     def isRedundant(self, relPath):
+        """Return true iff we are currently downloading, or have
+           downloaded, the resource at relPath."""
+
         self._lock.acquire()
         try:
             return (self.downloads.has_key(relPath) or
@@ -37,91 +62,285 @@
         finally:
             self._lock.release()
 
+    def finished(self):
+        """Return true iff we have no active or pending jobs."""
+        self._lock.acquire()
+        try:
+            return self.downloadQueue.empty() and len(self.downloads) == 0
+        finally:
+            self._lock.release()
+
+    def wait(self):
+        """Pause until we have no active or pending jobs."""
+        self.done.acquire()
+        while not self.finished():
+            self.done.wait()
+        self.done.release()
+
     def addDownloadJob(self, job):
+        """Add another DownloadJob to the end of the work queue."""
         rp = job.getRelativePath()
         self._lock.acquire()
         self.downloads[rp] = job
         self._lock.release()
         self.downloadQueue.put(job)
 
-    def _thread(self):
+    def downloadFailed(self, mirror, relpath):
+        """DOCDOC"""
+        pass # Track failure; don't try the same mirror right away.
+
+    def _thread(self, idx):
+        # Run in the background per thread.  idx is the number of the thread.
         while True:
-            job = self.downloadQueue.get()
-            job.download()
+            job = self.downloadQueue.get() # Grab job from queue.
             rp = job.getRelativePath()
-            self._lock.acquire()
             try:
-                del self.downloads[rp]
-                self.haveDownloaded[rp] = True
+                logging.info("start %s in Thread %s", rp, idx)
+                success = job.download() # Execute the download.
+                logging.info("end %s in Thread %s", rp, idx)
             finally:
-                self._lock.release()
+                self._lock.acquire()
+                try:
+                    del self.downloads[rp]
+                    if success: # If we downloaded correctly, say so.
+                        self.haveDownloaded[rp] = True
+                finally:
+                    self._lock.release()
 
+                self.done.acquire()
+                self.done.notify()
+                self.done.release()
+
 class DownloadJob:
-    def __init__(self, relPath, destPath, mirrorlist=None,
-                 wantHash=None, canStall=False):
-        self._relPath = relPath
+    """Abstract base class.  Represents a thing to be downloaded, and the
+       knowledge of how to download it."""
+    def __init__(self, targetPath, tmpPath, wantHash=None, useTor=False):
+        """Create a new DownloadJob.  When it is finally downloaded,
+           store it in targetPath.  Store partial results in tmpPath;
+           if there is already a file in tmpPath, assume that it is an
+           incomplete download. If wantHash, reject the file unless
+           the hash is as given.  If useTor, use a socks connection."""
+
+        self._destPath = targetPath
+        self._tmpPath = tmpPath
         self._wantHash = wantHash
-        self._mirrorList = mirrorlist
-        self._destPath = destPath
+        self._useTor = useTor
 
+    def getURL(self):
+        """Abstract implementation helper.  Returns the URL that the
+           _download function downloads from."""
+        raise NotImplementedError()
+
+    def getRelativePath(self):
+        """Abstract. Returns a string representing this download, to
+           keep two downloads of the same object from running at once.
+           In Thandy, this is usually a relative path of a downloaded
+           object within the repository.
+        """
+        raise NotImplementedError()
+
+    def haveStalledFile(self):
+        """Return true iff we have an existing incomplete download stored in
+           the temporary file.
+        """
+        return os.path.exists(self._tmpPath)
+
+    def download(self):
+        """Main interface function: Start the download, and return
+           when complete.
+        """
+        try:
+            self._download()
+            return True
+        except (OSError, thandy.DownloadError), err:
+            # XXXXX retry on failure
+            logging.warn("Download failed: %s", err)
+            return False
+        except:
+            tp, val, tb = sys.exc_info()
+            logging.warn("Internal during download: %s, %s", val,
+                         traceback.format_exc())
+            sys.exit(1)
+
+
+    def _download(self):
+        # Implementation function.  Unlike download(), can throw exceptions.
+        f_in = f_out = None
+
+        try:
+            url = self.getURL()
+
+            logging.info("Downloading %s", url)
+
+            if self.haveStalledFile():
+                have_length = os.stat(self._tmpPath).st_size
+                print "Have stalled file with %s bytes"%have_length
+            else:
+                have_length = None
+
+            f_in = getConnection(url, self._useTor, have_length)
+
+            logging.info("Connected to %s", url)
+
+            gotRange = f_in.info().get("Content-Range")
+            expectLength = f_in.info().get("Content-Length", "???")
+            if gotRange:
+                if gotRange.startswith("bytes %s-"%have_length):
+                    logging.info("Resuming download from %s"%url)
+                    f_out = open(self._tmpPath, 'ab')
+                else:
+                    raise thandy.DownloadError("Got an unexpected range %s"
+                                               %gotRange)
+            else:
+                f_out = open(self._tmpPath, 'wb')
+
+            total = 0
+            while True:
+                c = f_in.read(1024)
+                if not c:
+                    break
+                f_out.write(c)
+                total += len(c)
+                logging.debug("Got %s/%s bytes from %s",
+                              total, expectLength, url)
+
+        finally:
+            if f_in is not None:
+                f_in.close()
+            if f_out is not None:
+                f_out.close()
+
+        if self._wantHash:
+            gotHash = thandy.formats.getFileDigest(self._tmpPath)
+            if gotHash != self._wantHash:
+                raise thandy.DownloadError("File hash was not as expected.")
+
+        thandy.util.moveFile(self._tmpPath, self._destPath)
+
+
+class SimpleDownloadJob(DownloadJob):
+    """Testing subtype of DownloadJob: just downloads a URL and writes it to
+       disk."""
+    def __init__(self, targetPath, url,
+                 wantHash=None, supportedURLTypes=None, useTor=False):
+        DownloadJob.__init__(self, targetPath, targetPath+".tmp",
+                                 wantHash=wantHash,
+                                 useTor=useTor)
+        self._url = url
+
+    def getURL(self):
+        return self._url
+
+    def getRelativePath(self):
+        return self._url
+
+class ThandyDownloadJob(DownloadJob):
+    """Thandy's subtype of DownloadJob: knows about mirrors, weighting,
+       and Thandy's directory structure."""
+    def __init__(self, relPath, destPath, mirrorList, wantHash=None,
+                 supportedURLTypes=None, useTor=None):
+
+        DownloadJob.__init__(self, destPath, None, wantHash=wantHash,
+                             useTor=useTor)
+        self._mirrorList = mirrorList[:]
+        self._relPath = relPath
+
         tmppath = thandy.util.userFilename("tmp")
         if relPath.startswith("/"):
             relPath = relPath[1:]
-        self._tmppath = os.path.join(tmppath, relPath)
+        self._tmpPath = os.path.join(tmppath, relPath)
 
-        d = os.path.dirname(self._tmppath)
+        d = os.path.dirname(self._tmpPath)
         if not os.path.exists(d):
             os.makedirs(d, 0700)
 
-    def getRelativePath(self):
-        return self._relPath
+        self._supportedURLTypes = supportedURLTypes
+        if self._supportedURLTypes is None and useTor:
+            self._supportedURLTypes = [ "http", "https" ]
 
-    def haveStalledFile(self):
-        return os.path.exists(self._tmppath)
 
-    def getURL(self, mirrorlist=None):
-        if mirrorlist is None:
-            mirrorlist = self._mirrorList
-        weightSoFar = 0
+    def getURL(self):
         usable = []
 
-        for m in mirrorlist['mirrors']:
+        for m in self._mirrorList['mirrors']:
             for c in m['contents']:
-                # CHECK FOR URL SUITABILITY XXXXX
 
+                if self._supportedURLTypes is not None:
+                    urltype = urllib2.splittype(m['urlbase'])[0]
+                    if urltype.lower() not in self._supportedURLTypes:
+                        continue
+
                 if thandy.formats.rolePathMatches(c, self._relPath):
-                    weightSoFar += m['weight']
-                    usable.append( (weightSoFar, m) )
+                    usable.append( (m['weight'], m) )
                     break
 
-        wTarget = random.randint(0, weightSoFar)
-        mirror = None
-        # Could use bisect here instead
-        for w, m in mirrorlist:
-            if w >= wTarget:
-                mirror = m
-                break
+        mirror = thandy.util.randChooseWeighted(usable)
 
-        return m['urlbase'] + self._relPath
+        return mirror['urlbase'] + self._relPath
 
-    def download(self):
-        # XXXX RESUME
+    def getRelativePath(self):
+        return self._relPath
 
-        f_in = urllib2.urlopen(self.getURL())
-        f_out = open(self._tmpPath, 'w')
-        while True:
-            c = f_in.read(1024)
-            if not c:
-                break
-            f_out.write(c)
-        f_in.close()
-        f_out.close()
-        # XXXXX retry on failure
 
-        if self._wantHash:
-            gotHash = thandy.formats.getFileDigest(self._tmpPath)
-            if gotHash != self._wantHash:
-                # XXXX Corrupt file.
-                pass
+_socks_opener = thandy.socksurls.build_socks_opener()
 
-        thandy.utils.moveFile(self._tmpPath, self._destPath)
+def getConnection(url, useTor, have_length=None):
+    """Open a connection to 'url'.  We already have received
+       have_length bytes of the file we're trying to fetch, so resume
+       if possible.
+
+    """
+    headers = {}
+    urltype = urllib2.splittype(url)[0]
+    is_http = urltype in ["http", "https"]
+
+    if have_length is not None and is_http:
+        headers['Range'] = "bytes=%s-"%have_length
+
+    req = urllib2.Request(url, headers=headers)
+
+    if useTor:
+        conn = _socks_opener.open(req)
+    else:
+        conn = urllib2.urlopen(req)
+
+    return conn
+
+
+if __name__ == '__main__':
+    # Trivial CLI to test out downloading.
+
+    import getopt
+    options, args = getopt.getopt(sys.argv[1:], "",
+                                  ["use-tor", "socksport=", "threads="])
+
+    useTor = False
+    socksPort = 9050
+    nThreads = 2
+    for o,v in options:
+        if o == "--use-tor":
+            useTor = True
+        elif o == "--socksport":
+            socksPort = int(v)
+        elif o == "--threads":
+            nThreads = int(v)
+
+    logging.basicConfig(level=logging.DEBUG)
+
+    if useTor:
+        thandy.socksurls.setSocksProxy("127.0.0.1", socksPort)
+
+    manager = DownloadManager(nThreads)
+
+    for url in args:
+        fn = urllib2.splithost(urllib2.splittype(url)[1])[1]
+        fn = os.path.split(fn)[1]
+
+        job = SimpleDownloadJob(fn, url, useTor=useTor)
+        manager.addDownloadJob(job)
+
+    manager.start()
+    manager.wait()
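
As a rough illustration of the resume logic in DownloadJob._download() and getConnection(), here is a minimal standalone sketch of the same Range-header pattern using plain urllib2; the URL and filenames are made up and this is not part of the commit itself:

    import os
    import urllib2

    def fetch_with_resume(url, dest):
        # Sketch: ask the server only for the bytes we do not already have.
        tmp = dest + ".tmp"
        headers = {}
        if os.path.exists(tmp):
            headers['Range'] = "bytes=%d-" % os.stat(tmp).st_size
        f_in = urllib2.urlopen(urllib2.Request(url, headers=headers))
        # If the server honored the range it sends Content-Range and we
        # append; otherwise we start the temporary file over.
        if f_in.info().get("Content-Range"):
            f_out = open(tmp, 'ab')
        else:
            f_out = open(tmp, 'wb')
        try:
            while True:
                c = f_in.read(4096)
                if not c:
                    break
                f_out.write(c)
        finally:
            f_in.close()
            f_out.close()
        os.rename(tmp, dest)

    fetch_with_resume("http://example.com/big.bin", "big.bin")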

Modified: updater/trunk/lib/thandy/formats.py
===================================================================
--- updater/trunk/lib/thandy/formats.py	2008-11-16 18:25:20 UTC (rev 17287)
+++ updater/trunk/lib/thandy/formats.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -258,9 +258,9 @@
         return digestObj.digest()
 
 def getFileDigest(f, digestObj=None):
-    """Update 'digestObj' (typically a SHA256 object) with the digest of
-       the file object in f.  If digestObj is none, compute the SHA256
-       hash and return it.
+    """Update 'digestObj' (typically a SHA256 object) with the digest
+       of the file object (or filename) in f.  If digestObj is none,
+       compute the SHA256 hash and return it.
 
        >>> s = "here is a long string"*1000
        >>> import cStringIO, Crypto.Hash.SHA256
@@ -271,15 +271,23 @@
        >>> h1.digest() == h2.digest()
        True
     """
+    f_to_close = None
+    if isinstance(f, basestring):
+        f_to_close = f = open(f, 'rb')
+
     useTempDigestObj = (digestObj == None)
     if useTempDigestObj:
         digestObj = Crypto.Hash.SHA256.new()
 
-    while 1:
-        s = f.read(4096)
-        if not s:
-            break
-        digestObj.update(s)
+    try:
+        while 1:
+            s = f.read(4096)
+            if not s:
+                break
+            digestObj.update(s)
+    finally:
+        if f_to_close != None:
+            f_to_close.close()
 
     if useTempDigestObj:
         return digestObj.digest()
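
With that change, getFileDigest() accepts either an open file object or a filename; a quick sketch of both call styles (the path is illustrative):

    import Crypto.Hash.SHA256

    import thandy.formats

    # Feed an already-open file into an existing digest object...
    f = open("/tmp/example.bin", "rb")
    try:
        d = Crypto.Hash.SHA256.new()
        thandy.formats.getFileDigest(f, d)
        digest1 = d.digest()
    finally:
        f.close()

    # ...or just hand it the filename and let it open and close the file.
    digest2 = thandy.formats.getFileDigest("/tmp/example.bin")

    assert digest1 == digest2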

Added: updater/trunk/lib/thandy/packagesys/ExePackages.py
===================================================================
--- updater/trunk/lib/thandy/packagesys/ExePackages.py	                        (rev 0)
+++ updater/trunk/lib/thandy/packagesys/ExePackages.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -0,0 +1,33 @@
+# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information.
+
+import logging
+import subprocess
+
+import thandy.packagesys.PackageSystem as ps
+import thandy.packagesys.PackageDB as pdb
+
+class ExePackageSystem(pdb.DBBackedPackageSystem):
+
+    def getName(self):
+        return "executable"
+
+    def packageHandleFromJSON(self, json):
+        raise NotImplementedError()  #XXXX????
+
+    def canBeAutomatic(self):
+        return True
+
+    def canHaveUI(self):
+        return True
+
+class ExePackageHandle(pdb.DBBackedPackageHandle):
+    def __init__(self, packageDB, name, version, filelist, filename,
+                 arguments):
+        pdb.DBBackedPackageHandle.__init__(self, packageDB, name, version,
+                                           filelist)
+        self._filename = filename
+        self._arguments = arguments
+
+    def _doInstall(self):
+        commandline = [ self._filename ] + self._arguments
+        logging.info("Installing %s.  Command line: %s", self._filename,
+                     commandline)
+        subprocess.call(commandline)
+
+

Added: updater/trunk/lib/thandy/packagesys/PackageDB.py
===================================================================
--- updater/trunk/lib/thandy/packagesys/PackageDB.py	                        (rev 0)
+++ updater/trunk/lib/thandy/packagesys/PackageDB.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -0,0 +1,81 @@
+# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information.
+
+import anydbm
+import os
+import shelve
+
+import thandy.util
+import thandy.formats
+import thandy.packagesys.PackageSystem
+
+class SimplePackageDB:
+
+    def __init__(self, filename):
+        self._db = anydbm.open(filename, 'c')
+
+    def setVersion(self, package, version, filelist):
+        pass
+
+    def setInstallParameters(self, package, params):
+        pass
+
+    def getCurVersion(self, package):
+        pass
+
+    def getInstallParameters(self, package):
+        pass
+
+
+class DBBackedPackageSystem(thandy.packagesys.PackageSystem.PackageSystem):
+    def __init__(self, packageDB):
+        self._packageDB = packageDB
+
+class DBBackedPackageHandle(thandy.packagesys.PackageSystem.PackageHandle):
+    def __init__(self, packageDB, name, version, filelist):
+        thandy.packagesys.PackageSystem.PackageHandle.__init__(self)
+        self._packageDB = packageDB
+        self._name = name
+        self._version = version
+        self._filelist = filelist
+
+        self._metaData = None
+
+    def _getInstallBase(self):
+        raise NotImplementedError()
+
+    def anyVersionInstalled(self, transaction=None):
+        return self._packageDB.getCurVersion(self._name) != None
+
+    def getInstalledVersion(self, transaction=None):
+        return self._packageDB.getCurVersion(self._name)
+
+    def install(self):
+        params = self._doInstall()
+        self._packageDB.setVersion(
+            self._name, self._version, self._filelist)
+        self._packageDB.setInstallParameters(self._name, params)
+
+    def _doInstall(self):
+        raise NotImplementedError()
+
+    def isInstalled(self, transaction=None):
+        return self.getInstalledVersion(transaction) == self._version
+
+    def checkInstall(self):
+        base = self._getInstallBase()
+
+        all_ok = True
+        for fn, hash in self._filelist:
+            fn = os.path.join(base, fn)
+            if not os.path.exists(fn):
+                all_ok = False
+            else:
+                f = open(fn, 'rb')
+                try:
+                    try:
+                        d = thandy.formats.getFileDigest(f)
+                        if d != hash:
+                            all_ok = False
+                    except OSError:
+                        all_ok = False
+                        break
+                finally:
+                    f.close()
+
+        return all_ok
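
The SimplePackageDB methods above are still stubs; as a sketch of what that interface is for, here is one hypothetical way the same four methods could be backed by a shelve file. This is not what the commit implements, and the key names are invented:

    import shelve

    class ShelvePackageDB:
        def __init__(self, filename):
            self._db = shelve.open(filename, 'c')

        def setVersion(self, package, version, filelist):
            self._db["version_%s" % package] = (version, filelist)
            self._db.sync()

        def setInstallParameters(self, package, params):
            self._db["params_%s" % package] = params
            self._db.sync()

        def getCurVersion(self, package):
            entry = self._db.get("version_%s" % package)
            if entry is not None:
                return entry[0]
            return None

        def getInstallParameters(self, package):
            return self._db.get("params_%s" % package)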

Added: updater/trunk/lib/thandy/packagesys/PackageSystem.py
===================================================================
--- updater/trunk/lib/thandy/packagesys/PackageSystem.py	                        (rev 0)
+++ updater/trunk/lib/thandy/packagesys/PackageSystem.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -0,0 +1,58 @@
+# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information.
+
+class PackageSystem:
+    def getName(self):
+        raise NotImplementedError()
+
+    def packageHandleFromJSON(self, json):
+        raise NotImplementedError()
+
+    def canBeAutomatic(self):
+        return True
+
+    def canHaveUI(self):
+        return False
+
+    def getTransaction(self):
+        return PackageTransaction()
+
+class PackageTransaction:
+    def __init__(self):
+        self._transactions = []
+
+    def _start(self):
+        pass
+
+    def _commit(self):
+        pass
+
+    def run(self):
+        self._start()
+        for cb in self._transactions:
+            cb(self)
+        self._commit()
+
+    def addInstall(self, packageHandle):
+        self._transactions.append(packageHandle.install)
+
+    def addRemove(self, packageHandle):
+        self._transactions.append(packageHandle.remove)
+
+class PackageHandle:
+    def isInstalled(self, transaction=None):
+        raise NotImplementedError()
+
+    def anyVersionInstalled(self, transaction=None):
+        raise NotImplementedError()
+
+    def getInstalledVersion(self, transaction=None):
+        raise NotImplementedError()
+
+    def install(self, transaction):
+        raise NotImplementedError()
+
+    def remove(self, transaction):
+        raise NotImplementedError()
+
+    def checkInstall(self, transaction=None):
+        raise NotImplementedError()
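
To make the control flow concrete, here is a minimal, hypothetical PackageHandle subclass and a transaction run; the "demo" package and the print statements are only for illustration:

    import thandy.packagesys.PackageSystem as ps

    class NullPackageHandle(ps.PackageHandle):
        # Hypothetical do-nothing handle: "installing" just flips a flag.
        def __init__(self, name):
            self._name = name
            self._installed = False

        def isInstalled(self, transaction=None):
            return self._installed

        def anyVersionInstalled(self, transaction=None):
            return self._installed

        def install(self, transaction):
            print "installing", self._name
            self._installed = True

        def remove(self, transaction):
            print "removing", self._name
            self._installed = False

    txn = ps.PackageTransaction()
    handle = NullPackageHandle("demo")
    txn.addInstall(handle)
    txn.run()   # _start(), then handle.install(txn), then _commit()
    assert handle.isInstalled()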

Added: updater/trunk/lib/thandy/packagesys/RPMPackages.py
===================================================================
--- updater/trunk/lib/thandy/packagesys/RPMPackages.py	                        (rev 0)
+++ updater/trunk/lib/thandy/packagesys/RPMPackages.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -0,0 +1,156 @@
+# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information.
+
+import thandy.packagesys.PackageSystem
+
+import logging
+import os
+import rpm
+import md5
+
+__all__ = [ 'RPMPackageSystem' ]
+
+class RPMPackageSystem(thandy.packagesys.PackageSystem.PackageSystem):
+    def getName(self):
+        return "RPM"
+
+    def packageHandleFromJSON(self, json):
+        raise NotImplementedError() # XXXX
+
+    def getTransaction(self):
+        return RPMPackageTransaction()
+
+_CALLBACK_CODES = {}
+
+for name in dir(rpm):
+    if name.startswith("RPMCALLBACK_"):
+        _CALLBACK_CODES[getattr(rpm, name)] = name[12:]
+del name
+
+class RPMPackageTransaction(thandy.packagesys.PackageSystem.PackageTransaction):
+
+    def _start(self):
+        self._tset = rpm.TransactionSet()
+
+    def _commit(self):
+        self._tset.run(self._callback, "")
+
+    def _callback(self, what, amount, total, mydata, _):
+        if what == rpm.RPMCALLBACK_INST_OPEN_FILE:
+            hdr, path = mydata
+            logging.info("Installing RPM for %s [%s]", hdr['name'], path)
+
+        elif what == rpm.RPMCALLBACK_INST_CLOSE_FILE:
+            hdr, path = mydata
+            logging.info("Done installing RPM for %s", path)
+
+        elif what == rpm.RPMCALLBACK_INST_PROGRESS:
+            hdr, path = mydata
+            logging.info("%s: %.5s%% done", name, float(amount)/total*100)
+
+        else:
+            hdr, path = mydata
+            logging.info("RPM event %s on %s [%s/%s]",
+                         _CALLBACK_CODES.get(what,str(what)),
+                         hdr['name'], amount, total)
+
+def addRPMInstall(ts, path):
+    fd = os.open(path, os.O_RDONLY)
+    try:
+        hdr = ts.hdrFromFdno(fd)
+    finally:
+        os.close(fd)
+    ts.addInstall(hdr, (hdr, path), "u")
+
+def addRPMErase(ts, name):
+    ts.addErase(name)
+
+def getInstalledRPMVersions(name, ts=None):
+    if ts is None:
+        ts = rpm.TransactionSet()
+        #XXXX need to close?
+
+    versions = set()
+    for match in ts.dbMatch(rpm.RPMTAG_NAME, name):
+        versions.add(match['version'])
+
+    return versions
+
+def fileMD5(fname):
+    d = md5.new()
+    try:
+        f = open(fname, 'r')
+        try:
+            while 1:
+                s = f.read(4096)
+                if not s:
+                    break
+                d.update(s)
+
+        finally:
+            f.close()
+    except OSError, e:
+        logging.warn("Couldn't get digest of %s: %s", fname, e)
+        return None
+
+    return d.hexdigest()
+
+def checkRPMInstall(name, version, ts=None):
+    if ts is None:
+        ts = rpm.TransactionSet()
+        #XXXX need to close?
+
+    found = False
+    all_ok = True
+
+    for h in ts.dbMatch(rpm.RPMTAG_NAME, name):
+        if h['version'] != version:
+            continue
+
+        found = True
+
+        for fname, flags, md5sum in zip(h['filenames'], h['fileflags'], h['filemd5s']):
+            haveMD5 = fileMD5(fname)
+            if not haveMD5:
+                if flags & rpm.RPMFILE_MISSINGOK:
+                    logging.info("%s is missing or unreadable from %s %s; "
+                                 "that's ok.", fname, name, h['version'])
+                else:
+                    logging.warn("%s is missing or unreadable from %s %s."
+                                 fname, name, h['version'])
+                    all_ok = False
+            elif haveMD5 == md5sum:
+                logging.info("%s is unchanged from %s %s",
+                             fname, name, h['version'])
+            else:
+                # file changed.  If it's not configuration, that's a problem.
+                if not flags & rpm.RPMFILE_CONFIG:
+                    logging.warn("%s changed from installed version of %s %s",
+                                 fname, name, h['version'])
+                    all_ok = False
+
+    return found and all_ok
+
+class RPMPackageHandle(thandy.packagesys.PackageSystem.PackageHandle):
+    def __init__(self, name, version, filename):
+        self._name = name
+        self._version = version
+        self._filename = filename
+
+    def anyVersionInstalled(self, transaction=None):
+        return len(getInstalledRPMVersions(self._name, transaction)) > 0
+
+    def getInstalledVersion(self, transaction=None):
+        versions = getInstalledRPMVersions(self._name, transaction)
+        return max(versions) if versions else None
+
+    def install(self, transaction):
+        addRPMInstall(transaction._tset, self._filename)
+
+    def remove(self, transaction):
+        addRPMErase(transaction._tset, self._name)
+
+    def isInstalled(self, transaction=None):
+        return self._version in getInstalledRPMVersions(self._name,transaction)
+
+    def checkInstall(self, transaction=None):
+        return checkRPMInstall(self._name, self._version)
+
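
For a sense of how the helpers above are meant to be used on a system with the rpm Python bindings, a short sketch; the package name and version are illustrative:

    import thandy.packagesys.RPMPackages as rpmpkg

    # All versions of "bash" that the local RPM database knows about.
    print rpmpkg.getInstalledRPMVersions("bash")

    # Check a hypothetical installed package against the RPM database:
    # every non-config file must still match its recorded MD5.
    if rpmpkg.checkRPMInstall("examplepkg", "0.1"):
        print "examplepkg 0.1 is installed and intact"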

Added: updater/trunk/lib/thandy/packagesys/__init__.py
===================================================================
--- updater/trunk/lib/thandy/packagesys/__init__.py	                        (rev 0)
+++ updater/trunk/lib/thandy/packagesys/__init__.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -0,0 +1,4 @@
+# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information.
+
+__all__ = [ ]
+

Modified: updater/trunk/lib/thandy/repository.py
===================================================================
--- updater/trunk/lib/thandy/repository.py	2008-11-16 18:25:20 UTC (rev 17287)
+++ updater/trunk/lib/thandy/repository.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -16,8 +16,18 @@
 MAX_TIMESTAMP_AGE = 24*60*60
 
 class RepositoryFile:
+    """Represents information about a file stored in our local repository
+       cache.  Used to validate and load files.
+    """
     def __init__(self, repository, relativePath, schema,
                  needRole=None, signedFormat=True, needSigs=1):
+        """Allocate a new RepositoryFile for a file to be stored under
+           the LocalRepository 'repository' in relativePath.  Make
+           sure the file validates with 'schema' (or its signed form,
+           if 'signedFormat').  When checking signatures, this file needs
+           at least 'needSigs' signatures with role 'needRole'.
+        """
+        # These fields are as in the arguments.
         self._repository = repository
         self._relativePath = relativePath
         self._schema = schema
@@ -25,17 +35,37 @@
         self._signedFormat = signedFormat
         self._needSigs = needSigs
 
-        self._signed_obj = self._main_obj = None
+        # The contents of the file, parsed.  None if we haven't loaded
+        # the file.
+        self._main_obj = None
+
+        # The contents of the file along with their signatures.  May
+        # be aliased by _main_obj.  None if we haven't loaded the
+        # file.
+        self._signed_obj = None
+
+        # A SignatureStatus object, if we have checked signatures.
         self._sigStatus = None
+        # The mtime of the file on disk, if we know it.
         self._mtime = None
 
+    def clear(self):
+        """DOCDOC"""
+        self._main_obj = self._signed_obj = None
+        self._sigStatus = None
+        self._mtime = None
+
     def getRelativePath(self):
+        """Return the filename for this item relative to the top of the
+           repository."""
         return self._relativePath
 
     def getPath(self):
+        """Return the actual filename for this item."""
         return self._repository.getFilename(self._relativePath)
 
     def _load(self):
+        """Helper: load and parse this item's contents."""
         fname = self.getPath()
 
         # Propagate OSError
@@ -59,6 +89,7 @@
         self._mtime = mtime
 
     def _save(self, content=None):
+        """Helper: Flush this object's contents to disk."""
         if content == None:
             content = sexpr.encode
 
@@ -69,9 +100,13 @@
 
         self._signed_obj = signed_obj
         self._main_obj = main_obj
-        self._mtime = mtime
+        self._mtime = time.time()
 
     def _checkContent(self, content):
+        """Helper.  Check whether 'content' matches SIGNED_SCHEMA, and
+           self._schema (as appropriate).  Return a tuple of the
+           signed_schema match, and the schema match, or raise
+           FormatException."""
 
         try:
             obj = json.loads(content)
@@ -94,20 +129,26 @@
         return signed_obj, main_obj
 
     def load(self):
+        """Load this object from disk if it hasn't already been loaded."""
         if self._main_obj == None:
             self._load()
 
     def get(self):
+        """Return the object, or None if it isn't loaded."""
         return self._main_obj
 
     def isLoaded(self):
+        """Return true iff this object is loaded."""
         return self._main_obj != None
 
     def getContent(self):
+        """Load this object as needed and return its content."""
         self.load()
         return self._main_obj
 
     def _checkSignatures(self):
+        """Helper: Try to verify all the signatures on this object, and
+           cache the SignatureStatus object."""
         self.load()
         sigStatus = thandy.formats.checkSignatures(self._signed_obj,
                                      self._repository._keyDB,
@@ -115,15 +156,47 @@
         self._sigStatus = sigStatus
 
     def checkSignatures(self):
+        """Try to verify all the signatures on this object if we
+           haven't already done so, and return a SignatureStatus
+           object."""
         if self._sigStatus is None:
             self._checkSignatures()
         return self._sigStatus
 
+class PkgFile:
+    def __init__(self, repository, relativePath, needHash):
+        self._repository = repository
+        self._relativePath = relativePath
+        self._needHash = needHash
+
+        self._mtime = None
+
+    def clear(self):
+        self._mtime = None
+
+    def getRelativePath(self):
+        return self._relativePath
+
+    def getPath(self):
+        return self._repository.getFilename(self._relativePath)
+
+    def getExpectedHash(self):
+        return self._needHash
+
+    def checkFile(self):
+        return self._needHash == thandy.formats.getFileDigest(self.getPath())
+
 class LocalRepository:
+    """Represents a client's partial copy of a remote mirrored repository."""
     def __init__(self, root):
+        """Create a new local repository that stores its files under 'root'"""
+        # Top of our mirror.
         self._root = root
+
+        # A base keylist of master keys; we'll add others later.
         self._keyDB = thandy.util.getKeylist(None)
 
+        # Entries for the three invariant metafiles.
         self._keylistFile = RepositoryFile(
             self, "/meta/keys.txt", thandy.formats.KEYLIST_SCHEMA,
             needRole="master")
@@ -133,28 +206,38 @@
         self._mirrorlistFile = RepositoryFile(
             self, "/meta/mirrors.txt", thandy.formats.MIRRORLIST_SCHEMA,
             needRole="mirrors")
+
         self._metaFiles = [ self._keylistFile,
                             self._timestampFile,
                             self._mirrorlistFile ]
 
+        # Map from relative path to a RepositoryFile for packages.
         self._packageFiles = {}
+
+        # Map from relative path to a RepositoryFile for bundles.
         self._bundleFiles = {}
 
     def getFilename(self, relativePath):
+        """Return the file on disk that caches 'relativePath'."""
         if relativePath.startswith("/"):
             relativePath = relativePath[1:]
         return os.path.join(self._root, relativePath)
 
     def getKeylistFile(self):
+        """Return a RepositoryFile for our keylist."""
         return self._keylistFile
 
     def getTimestampFile(self):
+        """Return a RepositoryFile for our timestamp file."""
         return self._timestampFile
 
     def getMirrorlistFile(self):
+        """Return a RepositoryFile for our mirrorlist."""
         return self._mirrorlistFile
 
     def getPackageFile(self, relPath):
+        """Return a RepositoryFile for a package stored at relative path
+           'relPath'."""
         try:
             return self._packageFiles[relPath]
         except KeyError:
@@ -164,6 +247,8 @@
             return pkg
 
     def getBundleFile(self, relPath):
+        """Return a RepositoryFile for a bundle stored at relative path
+           'relPath'."""
         try:
             return self._bundleFiles[relPath]
         except KeyError:
@@ -172,10 +257,38 @@
                 needRole='bundle')
             return pkg
 
-    def getFilesToUpdate(self, now=None, trackingBundles=()):
+    def getRequestedFile(self, relPath):
+        """ """
+        for f in self._metaFiles:
+            if f.getRelativePath() == relPath:
+                return f
+        for f in self._bundleFiles.itervalues():
+            if f.getRelativePath() == relPath:
+                return f
+        for f in self._packageFiles.itervalues():
+            if f.getRelativePath() == relPath:
+                return f
+            f.load()
+            for item in f.get()['files']:
+                rp, h = item[:2]
+                if rp == relPath:
+                    return PkgFile(self, rp, thandy.formats.parseHash(h))
+
+    def getFilesToUpdate(self, now=None, trackingBundles=(), hashDict=None):
+        """Return a set of relative paths for all files that we need
+           to fetch.  Assumes that we care about the bundles
+           'trackingBundles'.  If hashDict is provided, add mappings to it
+           from the relative paths we want to fetch to the hashes that we
+           want those items to have, when we know those hashes.
+        """
+
         if now == None:
             now = time.time()
 
+        if hashDict == None:
+            # Use a dummy hashdict.
+            hashDict = {}
+
         need = set()
 
         # Fetch missing metafiles.
@@ -196,6 +309,8 @@
             age = now - thandy.formats.parseTime(ts['at'])
             ts = thandy.formats.TimestampFile.fromJSon(ts)
             if age > MAX_TIMESTAMP_AGE:
+                logging.info("Timestamp file from %s is out of "
+                             "date; must fetch it.", ts['at'])
                 need.add(self._timestampFile.getRelativePath())
 
         # If the keylist isn't signed right, we can't check the
@@ -203,6 +318,8 @@
         if self._keylistFile.get():
             s = self._keylistFile.checkSignatures()
             if not s.isValid(): # For now only require one master key.
+                logging.info("Key list is not properly signed; must get a "
+                             "new one.")
                 need.add(self._keylistFile.getRelativePath())
 
         if need:
@@ -215,6 +332,8 @@
         # new keylist.
         s = self._timestampFile.checkSignatures()
         if not s.isValid():
+            logging.info("Timestamp file is not properly signed; fetching new "
+                         "timestamp file and keylist.")
             need.add(self._keylistFile.getRelativePath())
             need.add(self._timestampFile.getRelativePath())
             return need
@@ -222,9 +341,15 @@
         # FINALLY, we know we have an up-to-date, signed timestamp
         # file.  Check whether the keys and mirrors file are as
         # authenticated.
+        hashDict[self._keylistFile.getRelativePath()] = \
+            ts.getKeylistInfo().getHash()
+        hashDict[self._mirrorlistFile.getRelativePath()] = \
+            ts.getMirrorlistInfo().getHash()
+
         h_kf = thandy.formats.getDigest(self._keylistFile.get())
         h_expected = ts.getKeylistInfo().getHash()
         if h_kf != h_expected:
+            logging.info("Keylist file hash did not match.  Must fetch it.")
             need.add(self._keylistFile.getRelativePath())
 
         if need:
@@ -232,11 +357,13 @@
 
         s = self._mirrorlistFile.checkSignatures()
         if not s.isValid():
+            logging.info("Mirrorlist file signatures not valid. Must fetch.")
             need.add(self._mirrorlistFile.getRelativePath())
 
         h_mf = thandy.formats.getDigest(self._mirrorlistFile.get())
         h_expected = ts.getMirrorlistInfo().getHash()
         if h_mf != h_expected:
+            logging.info("Mirrorlist file hash did not match. Must fetch.")
             need.add(self._mirrorlistFile.getRelativePath())
 
         if need:
@@ -249,26 +376,30 @@
             try:
                 binfo = ts.getBundleInfo(b)
             except KeyError:
-                logging.warn("Unrecognized bundle %s"%b)
+                logging.warn("Bundle %s not listed in timestamp file."%b)
                 continue
 
             rp = binfo.getRelativePath()
+            hashDict[rp] = h_expected = binfo.getHash()
             bfile = self.getBundleFile(rp)
             try:
                 bfile.load()
             except OSError:
+                logging.info("Can't find bundle %s on disk; must fetch.", rp)
                 need.add(rp)
                 continue
 
             h_b = thandy.formats.getDigest(bfile.get())
-            h_expected = binfo.getHash()
             if h_b != h_expected:
+                logging.info("Bundle hash not as expected; must fetch.", rp)
                 need.add(rp)
                 continue
 
             s = bfile.checkSignatures()
             if not s.isValid():
                 # Can't actually use it.
+                logging.warn("Bundle hash was as expected, but signatures did "
+                             "not match.")
                 continue
 
             bundles[rp] = bfile
@@ -280,20 +411,26 @@
             for pkginfo in bundle['packages']:
                 rp = pkginfo['path']
                 pfile = self.getPackageFile(rp)
+                h_expected = thandy.formats.parseHash(pkginfo['hash'])
+                hashDict[rp] = h_expected
                 try:
                     pfile.load()
                 except OSError:
+                    logging.info("Can't find package %s on disk; must fetch.",
+                                 rp)
                     need.add(rp)
                     continue
 
                 h_p = thandy.formats.getDigest(pfile.get())
-                h_expected = thandy.formats.parseHash(pkginfo['hash'])
                 if h_p != h_expected:
+                    logging.info("Wrong hash for package %s; must fetch.", rp)
                     need.add(rp)
                     continue
 
                 s = pfile.checkSignatures()
                 if not s.isValid():
+                    logging.warn("Package hash was as expected, but signature "
+                                 "did not match.")
                     # Can't use it.
                     continue
                 packages[rp] = pfile
@@ -305,13 +442,17 @@
             for f in package['files']:
                 rp, h = f[:2]
                 h_expected = thandy.formats.parseHash(h)
+                hashDict[rp] = h_expected
                 fn = self.getFilename(rp)
                 try:
                     h_got = thandy.formats.getFileDigest(fn)
                 except OSError:
+                    logging.info("Installable file %s not found on disk; "
+                                 "must load", rp)
                     need.add(rp)
                     continue
                 if h_got != h_expected:
+                    logging.info("Hash for %s not as expected; must load.", rp)
                     need.add(rp)
 
         # Okay; these are the files we need.
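
The hashDict parameter is what ties getFilesToUpdate() to the downloader: the caller learns both which relative paths to fetch and which hashes to demand of them. A rough sketch of that glue, with a made-up cache directory and bundle name (the real wiring is part of the "glue code" the log message mentions):

    import thandy.repository
    import thandy.util

    repo = thandy.repository.LocalRepository(thandy.util.userFilename("cache"))

    hashes = {}
    need = repo.getFilesToUpdate(trackingBundles=["/bundleinfo/example/"],
                                 hashDict=hashes)

    for relPath in need:
        # hashes.get(relPath) is the digest the fetched file must have,
        # whenever getFilesToUpdate() knew it.
        print relPath, hashes.get(relPath)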

Added: updater/trunk/lib/thandy/socksurls.py
===================================================================
--- updater/trunk/lib/thandy/socksurls.py	                        (rev 0)
+++ updater/trunk/lib/thandy/socksurls.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -0,0 +1,93 @@
+# Copyright 2008 The Tor Project, Inc.  See LICENSE for licensing information.
+
+"""Implements URL types for socks-mediated connections."""
+
+import socket
+import httplib
+import logging
+import struct
+import urllib2
+
+# XXXX This isn't really threadsafe, but for now we don't change this after
+# startup.
+SOCKS_HOST = None
+SOCKS_PORT = None
+
+def setSocksProxy(host, port):
+    """Set the global SOCKS proxy to host:port."""
+    global SOCKS_HOST, SOCKS_PORT
+    SOCKS_HOST = host
+    SOCKS_PORT = port
+
+def _recvall(sock, n):
+    """Helper: fetch N bytes from the socket sock."""
+    result = ""
+    while 1:
+        s = sock.recv(n)
+        if not s:
+            return result
+        result += s
+        n -= len(s)
+        if n <= 0:
+            return result
+
+def socks_connect(host, port):
+    """Helper: use the SOCKS proxy to open a connection to host:port.
+       Uses the simple and Tor-friendly SOCKS4a protocol."""
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    try:
+        logging.debug("Connecting to SOCKS proxy")
+        sock.connect((SOCKS_HOST, SOCKS_PORT))
+
+        # Now, the handshake!  We just do socks4a, since that's the simplest.
+        version = 4 # socks 4
+        command = 1 # connect
+        addr    = 1   # 0.0.0.1, signals socks4a.
+        userid  = ""
+
+        messageheader = struct.pack("!BBHL", version, command, port, addr)
+        message = "%s%s\x00%s\x00" % (messageheader, userid, host)
+
+        sock.sendall(message)
+
+        logging.debug("Waiting for reply from SOCKS proxy")
+        reply = _recvall(sock, 8)
+        code = ord(reply[1])
+        if code == 0x5a:
+            logging.debug("SOCKS proxy is connected.")
+            return sock
+        else:
+            raise socket.error("Bad SOCKS response code from proxy: %d", code)
+    except:
+        sock.close()
+        raise
+
+# Copies of HTTPConnection and HTTPSConnection that use socks instead of
+# direct connections.
+class SocksHTTPConnection(httplib.HTTPConnection):
+    def connect(self):
+        self.sock = socks_connect(self.host, self.port)
+class SocksHTTPSConnection(httplib.HTTPSConnection):
+    def connect(self):
+        sock = socks_connect(self.host, self.port)
+        sslobj = socket.ssl(sock, None, None)
+        self.sock = httplib.FakeSocket(sock, sslobj)
+
+# URL handlers for HTTP and HTTPS urls that use socks instead of direct
+# connections.
+class SocksHTTPHandler(urllib2.AbstractHTTPHandler):
+    def http_open(self, req):
+        return self.do_open(SocksHTTPConnection, req)
+    http_request = urllib2.AbstractHTTPHandler.do_request_
+class SocksHTTPSHandler(urllib2.AbstractHTTPHandler):
+    def https_open(self, req):
+        return self.do_open(SocksHTTPSConnection, req)
+    https_request = urllib2.AbstractHTTPHandler.do_request_
+
+def build_socks_opener():
+    """Return an urllib2.OpenerDirector object to open HTTP and HTTPS
+       urls using SOCKS connections."""
+    opener = urllib2.OpenerDirector()
+    opener.add_handler(SocksHTTPSHandler())
+    opener.add_handler(SocksHTTPHandler())
+    return opener
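
A short sketch of how the module is meant to be driven, assuming a Tor client listening on its usual local SOCKS port; the URL is illustrative:

    import thandy.socksurls

    thandy.socksurls.setSocksProxy("127.0.0.1", 9050)
    opener = thandy.socksurls.build_socks_opener()

    f = opener.open("http://example.com/")
    try:
        data = f.read()
    finally:
        f.close()
    print "fetched %d bytes through the SOCKS proxy" % len(data)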

Modified: updater/trunk/lib/thandy/util.py
===================================================================
--- updater/trunk/lib/thandy/util.py	2008-11-16 18:25:20 UTC (rev 17287)
+++ updater/trunk/lib/thandy/util.py	2008-11-16 20:15:34 UTC (rev 17288)
@@ -3,6 +3,7 @@
 import os
 import sys
 import tempfile
+import random
 
 try:
     import json
@@ -20,6 +21,7 @@
             os.unlink(toLocation)
         except OSError:
             pass
+
     os.rename(fromLocation, toLocation)
 
 
@@ -75,3 +77,21 @@
         keydb.addFromKeylist(obj['signed'], allowMasterKeys=False)
 
     return keydb
+
+def randChooseWeighted(lst):
+    """Given a list of (weight,item) tuples, pick an item with
+       probability proportional to its weight.
+    """
+
+    totalweight = sum(w for w,i in lst)
+    position = random.uniform(0, totalweight)
+    soFar = 0
+
+    # We could use bisect here, but this is not going to be in the
+    # critical path.  If it is, oops.
+    for w,i in lst:
+        soFar += w
+        if position < soFar:
+            return i
+
+    return lst[-1][1]
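
A quick sketch of randChooseWeighted() in use; the mirror URLs and weights are made up, and with weights 10 and 1 the first entry should be picked roughly ten times out of eleven:

    import thandy.util

    mirrors = [ (10, "http://mirror-a.example/thandy/"),
                (1,  "http://mirror-b.example/thandy/") ]

    counts = {}
    for _ in xrange(1000):
        m = thandy.util.randChooseWeighted(mirrors)
        counts[m] = counts.get(m, 0) + 1
    print counts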