[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r17292: {updater} Add main-thread callbacks for download success/failure. (updater/trunk/lib/thandy)



Author: nickm
Date: 2008-11-16 16:09:48 -0500 (Sun, 16 Nov 2008)
New Revision: 17292

Modified:
   updater/trunk/lib/thandy/download.py
   updater/trunk/lib/thandy/util.py
Log:
Add main-thread callbacks for download success/failure.

Modified: updater/trunk/lib/thandy/download.py
===================================================================
--- updater/trunk/lib/thandy/download.py	2008-11-16 21:07:28 UTC (rev 17291)
+++ updater/trunk/lib/thandy/download.py	2008-11-16 21:09:48 UTC (rev 17292)
@@ -28,6 +28,9 @@
         # Work queue of DownloadJobs that we intend to process once a thread
         # is free.
         self.downloadQueue = Queue.Queue()
+        # Queue of per-job success/failure callbacks; wait() drains and calls them.
+        self.resultQueue = Queue.Queue()
+
         # List of worker threads.
         self.threads = [ threading.Thread(target=self._thread, args=[idx])
                          for idx in xrange(n_threads) ]
@@ -66,19 +69,23 @@
         """Return true iff we have no active or pending jobs."""
         self._lock.acquire()
         try:
-            return downloadQueue.empty() and len(self.downloads) == 0
+            return self.downloadQueue.empty() and len(self.downloads) == 0
         finally:
             self._lock.release()
 
     def wait(self):
         """Pause until we have no active or pending jobs."""
-        while True:
+        while not self.finished():
             self.done.acquire()
             self.done.wait()
             self.done.release()
 
-            if self.finished():
-                break
+            try:
+                while True:
+                    item = self.resultQueue.get(block=False)
+                    item()
+            except Queue.Empty:
+                pass
 
     def addDownloadJob(self, job):
         """Add another DownloadJob to the end of the work queue."""
@@ -97,6 +104,7 @@
         while True:
             job = self.downloadQueue.get() # Grab job from queue.
             rp = job.getRelativePath()
+            success = False
             try:
                 logging.info("start %s in Thread %s", rp, idx)
                 success = job.download() # Execute the download.
@@ -110,6 +118,11 @@
                 finally:
                     self._lock.release()
 
+                if success:
+                    self.resultQueue.put(job._success)
+                else:
+                    self.resultQueue.put(job._failure)
+
                 self.done.acquire()
                 self.done.notify()
                 self.done.release()
@@ -129,6 +142,14 @@
         self._wantHash = wantHash
         self._useTor = useTor
 
+        self._success = lambda : None
+        self._failure = lambda : None
+
+    def setCallbacks(self, success, failure):
+        """Set the no-argument callbacks queued when the job succeeds or fails."""
+        self._success = success
+        self._failure = failure
+
     def getURL(self):
         """Abstract implementation helper.  Returns the URL that the
            _download function downloads from."""
@@ -155,7 +176,8 @@
         try:
             self._download()
             return True
-        except (OSError, thandy.DownloadError), err:
+        except (OSError, httplib.error, urllib2.HTTPError,
+                thandy.DownloadError), err:
             # XXXXX retry on failure
             logging.warn("Download failed: %s", err)
             return False
@@ -165,7 +187,6 @@
                          traceback.format_exc())
             sys.exit(1)
 
-
     def _download(self):
         # Implementation function.  Unlike download(), can throw exceptions.
         f_in = f_out = None
@@ -177,7 +198,8 @@
 
             if self.haveStalledFile():
                 have_length = os.stat(self._tmpPath).st_size
-                print "Have stalled file with %s bytes"%have_length
+                logging.info("Have stalled file for %s with %s bytes", url,
+                             have_length)
             else:
                 have_length = None
 
@@ -218,6 +240,7 @@
             if gotHash != self._wantHash:
                 raise thandy.DownloadError("File hash was not as expected.")
 
+        thandy.util.ensureParentDir(self._destPath)
         thandy.util.moveFile(self._tmpPath, self._destPath)
 
 
@@ -245,7 +268,7 @@
 
         DownloadJob.__init__(self, destPath, None, wantHash=wantHash,
                              useTor=useTor)
-        self._mirrorList = mirrorList[:]
+        self._mirrorList = mirrorList
         self._relPath = relPath
 
         tmppath = thandy.util.userFilename("tmp")
@@ -279,7 +302,10 @@
 
         mirror = thandy.util.randChooseWeighted(usable)
 
-        return m['urlbase'] + self._relPath
+        if m['urlbase'][-1] == '/' and self._relPath[0] == '/':
+            return m['urlbase'] + self._relPath[1:]
+        else:
+            return m['urlbase'] + self._relPath
 
     def getRelativePath(self):
         return self._relPath

Modified: updater/trunk/lib/thandy/util.py
===================================================================
--- updater/trunk/lib/thandy/util.py	2008-11-16 21:07:28 UTC (rev 17291)
+++ updater/trunk/lib/thandy/util.py	2008-11-16 21:09:48 UTC (rev 17292)
@@ -24,7 +24,6 @@
 
     os.rename(fromLocation, toLocation)
 
-
 def replaceFile(fname, contents, textMode=False):
     """overwrite the file in 'fname' atomically with the content of 'contents'
     """
@@ -48,13 +47,19 @@
         os.makedirs(base, 0700)
     return os.path.join(base, name)
 
+def ensureParentDir(name):
+    """Create the parent directory of 'name' (mode 0700) if it does not exist."""
+    directory = os.path.split(name)[0]
+    if not os.path.exists(directory):
+        os.makedirs(directory, 0700)
+
 def getKeylist(keys_fname, checkKeys=True):
     import thandy.master_keys
 
     keydb = thandy.formats.Keylist()
 
     for key in thandy.master_keys.MASTER_KEYS:
-        keydb.addKey(key)
+        keydb.addKey(thandy.keys.RSAKey.fromJSon(key))
 
     user_keys = userFilename("preload_keys")
     if os.path.exists(user_keys):