[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r17292: {updater} Add main-thread callbacks for download success/failure. (updater/trunk/lib/thandy)
Author: nickm
Date: 2008-11-16 16:09:48 -0500 (Sun, 16 Nov 2008)
New Revision: 17292
Modified:
updater/trunk/lib/thandy/download.py
updater/trunk/lib/thandy/util.py
Log:
Add main-thread callbacks for download success/failure.
Modified: updater/trunk/lib/thandy/download.py
===================================================================
--- updater/trunk/lib/thandy/download.py 2008-11-16 21:07:28 UTC (rev 17291)
+++ updater/trunk/lib/thandy/download.py 2008-11-16 21:09:48 UTC (rev 17292)
@@ -28,6 +28,9 @@
# Work queue of DownloadJobs that we intend to process once a thread
# is free.
self.downloadQueue = Queue.Queue()
+ # DOCDOC
+ self.resultQueue = Queue.Queue()
+
# List of worker threads.
self.threads = [ threading.Thread(target=self._thread, args=[idx])
for idx in xrange(n_threads) ]
@@ -66,19 +69,23 @@
"""Return true iff we have no active or pending jobs."""
self._lock.acquire()
try:
- return downloadQueue.empty() and len(self.downloads) == 0
+ return self.downloadQueue.empty() and len(self.downloads) == 0
finally:
self._lock.release()
def wait(self):
"""Pause until we have no active or pending jobs."""
- while True:
+ while not self.finished():
self.done.acquire()
self.done.wait()
self.done.release()
- if self.finished():
- break
+ try:
+ while True:
+ item = self.resultQueue.get(block=False)
+ item()
+ except Queue.Empty:
+ pass
def addDownloadJob(self, job):
"""Add another DownloadJob to the end of the work queue."""
@@ -97,6 +104,7 @@
while True:
job = self.downloadQueue.get() # Grab job from queue.
rp = job.getRelativePath()
+ success = False
try:
logging.info("start %s in Thread %s", rp, idx)
success = job.download() # Execute the download.
@@ -110,6 +118,11 @@
finally:
self._lock.release()
+ if success:
+ self.resultQueue.put(job._success)
+ else:
+ self.resultQueue.put(job._failure)
+
self.done.acquire()
self.done.notify()
self.done.release()
@@ -129,6 +142,14 @@
self._wantHash = wantHash
self._useTor = useTor
+ self._success = lambda : None
+ self._failure = lambda : None
+
+ def setCallbacks(self, success, failure):
+ """DOCDOC"""
+ self._success = success
+ self._failure = failure
+
def getURL(self):
"""Abstract implementation helper. Returns the URL that the
_download function downloads from."""
@@ -155,7 +176,8 @@
try:
self._download()
return True
- except (OSError, thandy.DownloadError), err:
+ except (OSError, httplib.error, urllib2.HTTPError,
+ thandy.DownloadError), err:
# XXXXX retry on failure
logging.warn("Download failed: %s", err)
return False
@@ -165,7 +187,6 @@
traceback.format_exc())
sys.exit(1)
-
def _download(self):
# Implementation function. Unlike download(), can throw exceptions.
f_in = f_out = None
@@ -177,7 +198,8 @@
if self.haveStalledFile():
have_length = os.stat(self._tmpPath).st_size
- print "Have stalled file with %s bytes"%have_length
+ logging.info("Have stalled file for %s with %s bytes", url,
+ have_length)
else:
have_length = None
@@ -218,6 +240,7 @@
if gotHash != self._wantHash:
raise thandy.DownloadError("File hash was not as expected.")
+ thandy.util.ensureParentDir(self._destPath)
thandy.util.moveFile(self._tmpPath, self._destPath)
@@ -245,7 +268,7 @@
DownloadJob.__init__(self, destPath, None, wantHash=wantHash,
useTor=useTor)
- self._mirrorList = mirrorList[:]
+ self._mirrorList = mirrorList
self._relPath = relPath
tmppath = thandy.util.userFilename("tmp")
@@ -279,7 +302,10 @@
mirror = thandy.util.randChooseWeighted(usable)
- return m['urlbase'] + self._relPath
+ if m['urlbase'][-1] == '/' and self._relPath[0] == '/':
+ return m['urlbase'] + self._relPath[1:]
+ else:
+ return m['urlbase'] + self._relPath
def getRelativePath(self):
return self._relPath
Modified: updater/trunk/lib/thandy/util.py
===================================================================
--- updater/trunk/lib/thandy/util.py 2008-11-16 21:07:28 UTC (rev 17291)
+++ updater/trunk/lib/thandy/util.py 2008-11-16 21:09:48 UTC (rev 17292)
@@ -24,7 +24,6 @@
os.rename(fromLocation, toLocation)
-
def replaceFile(fname, contents, textMode=False):
"""overwrite the file in 'fname' atomically with the content of 'contents'
"""
@@ -48,13 +47,19 @@
os.makedirs(base, 0700)
return os.path.join(base, name)
+def ensureParentDir(name):
+ """DOCDOC"""
+ directory = os.path.split(name)[0]
+ if not os.path.exists(directory):
+ os.makedirs(directory, 0700)
+
def getKeylist(keys_fname, checkKeys=True):
import thandy.master_keys
keydb = thandy.formats.Keylist()
for key in thandy.master_keys.MASTER_KEYS:
- keydb.addKey(key)
+ keydb.addKey(thandy.keys.RSAKey.fromJSon(key))
user_keys = userFilename("preload_keys")
if os.path.exists(user_keys):