[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r17339: {updater} Teach thandy downloader how to back off on failure, ignore f (updater/trunk/lib/thandy)



Author: nickm
Date: 2008-11-20 16:55:35 -0500 (Thu, 20 Nov 2008)
New Revision: 17339

Modified:
   updater/trunk/lib/thandy/ClientCLI.py
   updater/trunk/lib/thandy/download.py
Log:
Teach thandy downloader how to back off on failure, ignore failing mirrors for a while, etc.  still needs to be taught to persist this info, if that seems smart.

Modified: updater/trunk/lib/thandy/ClientCLI.py
===================================================================
--- updater/trunk/lib/thandy/ClientCLI.py	2008-11-20 21:27:01 UTC (rev 17338)
+++ updater/trunk/lib/thandy/ClientCLI.py	2008-11-20 21:55:35 UTC (rev 17339)
@@ -61,6 +61,9 @@
     if use_packagesys:
         packagesys = thandy.packagesys.PackageSystem.PackageMetasystem.create(repo)
 
+    downloader = thandy.download.DownloadManager()
+    downloader.start()
+
     # XXXX We could make this loop way smarter.  Right now, it doesn't
     # back off between failures, and it doesn't notice newly downloadable files
     # until all downloading files are finished.
@@ -110,9 +113,24 @@
         if not mirrorlist:
             mirrorlist = thandy.master_keys.DEFAULT_MIRRORLIST
 
-        downloader = thandy.download.DownloadManager()
+        if files:
+            waitTill = min(downloader.getRetryTime(mirrorlist, f)
+                           for f in files)
+            now = time.time()
+            if waitTill > now:
+                delay = int(waitTill - now) + 1
+                logging.info("Waiting another %s seconds before we are willing "
+                             "to retry any mirror.", delay)
+                time.sleep(delay)
+                continue
 
+        logging.debug("Launching downloads")
+        now = time.time()
         for f in files:
+            if downloader.getRetryTime(mirrorlist, f) > now:
+                logging.info("Waiting a while before we fetch %s", f)
+                continue
+
             dj = thandy.download.ThandyDownloadJob(
                 f, repo.getFilename(f),
                 mirrorlist,
@@ -130,9 +148,6 @@
 
             downloader.addDownloadJob(dj)
 
-        logging.debug("Launching downloads")
-        downloader.start()
-
         logging.debug("Waiting for downloads to finish.")
         downloader.wait()
         logging.info("All downloads finished.")

Modified: updater/trunk/lib/thandy/download.py
===================================================================
--- updater/trunk/lib/thandy/download.py	2008-11-20 21:27:01 UTC (rev 17338)
+++ updater/trunk/lib/thandy/download.py	2008-11-20 21:55:35 UTC (rev 17339)
@@ -7,12 +7,17 @@
 import random
 import sys
 import threading
+import time
 import traceback
 import urllib2
 
 import thandy.util
 import thandy.socksurls
+import thandy.checkJson
 
+class BadCompoundData(thandy.DownloadError):
+    """DOCDOC"""
+    pass
 
 class DownloadManager:
     """Class to track a set of downloads and pass them out to worker threads.
@@ -40,6 +45,9 @@
         for t in self.threads:
             t.setDaemon(True)
 
+        # DOCDOC
+        self.statusLog = DownloadStatusLog()
+
         #DOCDOC
         self._raiseMe = None
 
@@ -95,12 +103,25 @@
 
     def addDownloadJob(self, job):
         """Add another DownloadJob to the end of the work queue."""
+        job.setDownloadStatusLog(self.statusLog)
         rp = job.getRelativePath()
         self._lock.acquire()
         self.downloads[rp] = job
         self._lock.release()
         self.downloadQueue.put(job)
 
+    def getRetryTime(self, mirrorList, relPath):
+        """DOCDOC"""
+        readyAt = None
+        for m in mirrorsThatSupport(mirrorList, relPath):
+            r = self.statusLog.getDelayTime(m['urlbase'])
+            if readyAt == None or r < readyAt:
+                readyAt = r
+        if readyAt != None:
+            return readyAt
+        else:
+            return 0
+
     def downloadFailed(self, mirror, relpath):
         """DOCDOC"""
         pass # Track failure; don't try the same mirror right away.
@@ -113,7 +134,7 @@
             success = False
             try:
                 logging.info("start %s in Thread %s", rp, idx)
-                success = job.download() # Execute the download.
+                failure = job.download() # Execute the download.
                 logging.info("end %s in Thread %s", rp, idx)
             finally:
                 self._lock.acquire()
@@ -124,15 +145,159 @@
                 finally:
                     self._lock.release()
 
-                if success:
+                if failure == None:
+                    self.statusLog.succeeded(job.getMirror(),
+                                             job.getRelativePath())
                     self.resultQueue.put(job._success)
                 else:
+                    self.statusLog.failed(failure)
                     self.resultQueue.put(job._failure)
 
                 self.done.acquire()
                 self.done.notify()
                 self.done.release()
 
+class DownloadFailure:
+    # Broadly speaking, these errors are possible:
+    #
+    #  A - The whole internet is down for us, either because our network
+    #      connection sucks, our proxy is down, or whatever.
+    #    - A particular mirror is down or nonfunctional.
+    #
+    #      DownloadFailure.connectionFailed()
+    #
+    #  B - The mirror is giving us errors we don't understand.
+    #    - A particular mirror is missing a file we need.
+    #    - A particular mirror served us something that was allegedly a
+    #      file we need, but that file was no good.
+    #
+    #      DownloadFailure.mirrorFailed()
+    #
+    #  C - We finished a partial download and it was no good, but we
+    #      can't tell who was at fault, because we don't know which
+    #      part was corrupt.
+    #
+    #      DownloadFailure.badCompoundFile()
+    #
+    def __init__(self, urlbase, relPath, networkError=False):
+        self._urlbase = urlbase
+        self._relPath = relPath
+        self._network = networkError
+
+        self._when = time.time()
+
+    @staticmethod
+    def badCompoundFile(relpath):
+        return DownloadFailure(None, relpath)
+
+    @staticmethod
+    def mirrorFailed(urlbase, relpath):
+        return DownloadFailure(urlbase, relpath)
+
+    @staticmethod
+    def connectionFailed(urlbase):
+        return DownloadFailure(urlbase, None, True)
+
+S = thandy.checkJson
+_FAIL_SCHEMA = S.Struct([S.Int(), S.Int()], allowMore=True)
+_STATUS_LOG_SCHEMA = S.Obj(
+    v=S.Int(),
+    mirrorFailures=S.DictOf(S.AnyStr(), _FAIL_SCHEMA),
+    networkFailures=_FAIL_SCHEMA)
+del S
+
+class DownloadStatusLog:
+    """DOCDOC"""
+    # XXXX get smarter.
+    # XXXX make this persistent.
+    def __init__(self, mirrorFailures={}, networkFailures=[0,0]):
+        self._lock = threading.RLock()
+        # urlbase -> [ nFailures, dontTryUntil ]
+        self._mirrorFailures = dict(mirrorFailures)
+        # None, or [ nFailures, dontTryUntil ]
+        self._netFailure = list(networkFailures)
+
+    def _getDelay(self, isMirror, failureCount):
+        if isMirror:
+            DELAYS = [ 0, 300, 300, 600, 900, 8400, 8400, 9000 ]
+        else:
+            DELAYS = [ 0, 10, 30, 30, 60, 300, 600, 1800, 3600, 7200 ]
+
+        if failureCount < len(DELAYS):
+            return DELAYS[failureCount]
+        else:
+            return DELAYS[-1]
+
+    def toJSON(self):
+        return { 'v': 1,
+                 'networkFailures' : self._netFailure,
+                 'mirrorFailures' : self._mirrorFailures }
+
+    @staticmethod
+    def fromJSON(obj):
+        _STATUS_LOG_SCHEMA.checkMatch(obj)
+        return DownloadStatusLog( obj['mirrorFailures'],
+                                  obj['networkFailures'] )
+
+    def failed(self, failure):
+        self._lock.acquire()
+        try:
+            when = long(failure._when)
+
+            # If there's a mirror to blame, blame it.
+            if failure._urlbase != None:
+                s = self._mirrorFailures.setdefault(failure._urlbase, [0, 0])
+                if s[1] + 5 < when: # Two failures within 5s count as one.
+                    s[0] += 1
+                    s[1] = when
+
+            # If there is no mirror to blame, or we suspect a network error,
+            # blame the network too.
+            if failure._urlbase == None or failure._network:
+                s = self._netFailure
+                if s[1] + 5 < when: # Two failures within 5s count as one.
+                    s[0] += 1
+                    s[1] = when
+        finally:
+            self._lock.release()
+
+    def succeeded(self, urlbase, url):
+        self._lock.acquire()
+        try:
+            try:
+                del self._mirrorFailures[urlbase]
+            except KeyError:
+                pass
+            self._netFailure = [0, 0]
+        finally:
+            self._lock.release()
+
+    def canRetry(self, urlbase=None, now=None):
+        if now == None:
+            now = time.time()
+
+        d = self.getDelayTime(urlbase)
+        return d <= now
+
+    def getDelayTime(self, urlbase=None):
+        self._lock.acquire()
+        try:
+            readyAt = 0
+
+            if urlbase:
+                status = self._mirrorFailures.get(urlbase, (0,0))
+                if status[1] > readyAt:
+                    readyAt = status[1] + self._getDelay(True, status[0])
+
+            if self._netFailure[1] > readyAt:
+                readyAt = (self._netFailure[1] +
+                           self._getDelay(False, self._netFailure[0]))
+
+            return readyAt
+        finally:
+            self._lock.release()
+
+
 class DownloadJob:
     """Abstract base class.  Represents a thing to be downloaded, and the
        knowledge of how to download it."""
@@ -164,6 +329,10 @@
            _download function downloads from."""
         raise NotImplemented()
 
+    def getMirror(self):
+        """DOCDOC"""
+        return None
+
     def getRelativePath(self):
         """Abstract. Returns a string representing this download, to
            keep two downloads of the same object from running at once.
@@ -180,32 +349,49 @@
 
     def download(self):
         """Main interface function: Start the download, and return
-           when complete.
+           when complete. DOCDOC return value.
         """
         try:
             self._download()
-            return True
-        except (OSError, httplib.error, urllib2.HTTPError,
-                thandy.DownloadError), err:
-            # XXXXX retry on failure
+            return None
+        except BadCompoundData, err:
             logging.warn("Download failed: %s", err)
-            return False
+            # No way to apportion the blame.
+            return DownloadFailure.badCompoundFile(self.getRelativePath())
+        except (urllib2.HTTPError, thandy.DownloadError), err:
+            # looks like we may have irreconcilable differences with a
+            # particular mirror.
+            logging.warn("Download failed: %s", err)
+            return DownloadFailure.mirrorFailed(self.getMirror(),
+                                                self.getRelativePath())
+        except (OSError, httplib.error, IOError, urllib2.URLError), err:
+            logging.warn("Download failed: %s", err)
+            # Could be the mirror; could be the network.  Hard to say.
+            return DownloadFailure.connectionFailed(self.getMirror())
         except:
             tp, val, tb = sys.exc_info()
-            logging.warn("Internal during download: %s, %s", val,
-                         traceback.format_exc())
-            return False
+            logging.exception("Internal error during download: %s", val)
+            # We have an exception!  Treat it like a network error, I guess.
+            return DownloadFailure.connectionFailed(None)
 
+    def setDownloadStatusLog(self, log):
+        """DOCDOC"""
+        pass
+
     def _checkTmpFile(self):
         """Helper: check whether the downloaded temporary file matches
            the hash and/or format we need."""
         if self._wantHash and not self._repoFile:
             gotHash = thandy.formats.getFileDigest(self._tmpPath)
             if gotHash != self._wantHash:
-                raise thandy.DownloadError("File hash was not as expected.")
+                raise thandy.FormatException("File hash was not as expected.")
         elif self._repoFile:
             self._repoFile.checkFile(self._tmpPath, self._wantHash)
 
+    def _removeTmpFile(self):
+        """Helper: remove the temporary file so that we do not get stuck in
+           a downloading-it-forever loop."""
+        os.unlink(self._tmpPath)
 
     def _download(self):
         # Implementation function.  Unlike download(), can throw exceptions.
@@ -219,7 +405,7 @@
                 pass
             else:
                 # What luck!  This stalled file was what we wanted.
-                # (This happens mostly with )
+                # (This happens mostly when we have an internal error.)
                 thandy.util.ensureParentDir(self._destPath)
                 thandy.util.moveFile(self._tmpPath, self._destPath)
                 return
@@ -236,7 +422,15 @@
             else:
                 have_length = None
 
-            f_in = getConnection(url, self._useTor, have_length)
+            try:
+                f_in = getConnection(url, self._useTor, have_length)
+            except urllib2.HTTPError, err:
+                if err.code == 416:
+                    # We asked for a range that couldn't be satisfied.
+                    # Usually, this means that the server thinks the file
+                    # is shorter than we think it is.  We need to start over.
+                    self._removeTmpFile()
+                raise
 
             logging.info("Connected to %s", url)
 
@@ -268,7 +462,14 @@
             if f_out is not None:
                 f_out.close()
 
-        self._checkTmpFile()
+        try:
+            self._checkTmpFile()
+        except (thandy.FormatException, thandy.DownloadError), err:
+            self._removeTmpFile()
+            if haveStalled:
+                raise BadCompoundData(err)
+            else:
+                raise
 
         thandy.util.ensureParentDir(self._destPath)
         thandy.util.moveFile(self._tmpPath, self._destPath)
@@ -290,11 +491,28 @@
     def getRelativePath(self):
         return self._url
 
+def mirrorsThatSupport(mirrorList, relPath, urlTypes=None, statusLog=None):
+    now = time.time()
+    for m in mirrorList['mirrors']:
+        if urlTypes != None:
+            urltype = urllib2.splittype(m['urlbase'])[0]
+            if urltype.lower() not in urlTypes:
+                continue
+
+        if statusLog != None and not statusLog.canRetry(m['urlbase'], now):
+            continue
+
+        for c in m['contents']:
+            if thandy.formats.rolePathMatches(c, relPath):
+                yield m
+                break
+
 class ThandyDownloadJob(DownloadJob):
     """Thandy's subtype of DownloadJob: knows about mirrors, weighting,
        and Thandy's directory structure."""
     def __init__(self, relPath, destPath, mirrorList, wantHash=None,
-                 supportedURLTypes=None, useTor=None, repoFile=None):
+                 supportedURLTypes=None, useTor=None, repoFile=None,
+                 downloadStatusLog=None):
 
         DownloadJob.__init__(self, destPath, None, wantHash=wantHash,
                              useTor=useTor, repoFile=repoFile)
@@ -314,24 +532,27 @@
         if self._supportedURLTypes is None and useTor:
             self._supportedURLTypes = [ "http", "https" ]
 
+        self._usingMirror = None #DOCDOC
+        self._downloadStatusLog = downloadStatusLog
 
+    def setDownloadStatusLog(self, log):
+        self._downloadStatusLog = log
+
     def getURL(self):
         usable = []
 
-        for m in self._mirrorList['mirrors']:
-            for c in m['contents']:
+        for m in mirrorsThatSupport(self._mirrorList, self._relPath,
+                                    self._supportedURLTypes,
+                                    self._downloadStatusLog):
+            usable.append( (m['weight'], m) )
 
-                if self._supportedURLTypes is not None:
-                    urltype = urllib2.splittype(m['urlbase'])[0]
-                    if urltype.lower() not in self._supportedURLTypes:
-                        continue
+        try:
+            mirror = thandy.util.randChooseWeighted(usable)
+        except IndexError:
+            raise thandy.DownloadError("No mirror supports download.")
 
-                if thandy.formats.rolePathMatches(c, self._relPath):
-                    usable.append( (m['weight'], m) )
-                    break
+        self._usingMirror = mirror['urlbase']
 
-        mirror = thandy.util.randChooseWeighted(usable)
-
         if m['urlbase'][-1] == '/' and self._relPath[0] == '/':
             return m['urlbase'] + self._relPath[1:]
         else:
@@ -340,7 +561,10 @@
     def getRelativePath(self):
         return self._relPath
 
+    def getMirror(self):
+        return self._usingMirror
 
+
 _socks_opener = thandy.socksurls.build_socks_opener()
 
 def getConnection(url, useTor, have_length=None):