[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [metrics-lib/master] Make DownloadCoordinator an interface to facilitate testing.



commit bc9925b34030c4bba62fd0bda248a720e7da2989
Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date:   Thu Jan 19 15:21:50 2012 +0100

    Make DownloadCoordinator an interface to facilitate testing.
---
 .../descriptor/impl/DownloadCoordinator.java       |  263 +-------------------
 .../descriptor/impl/DownloadCoordinatorImpl.java   |  267 ++++++++++++++++++++
 .../impl/RelayDescriptorDownloaderImpl.java        |   11 +-
 3 files changed, 276 insertions(+), 265 deletions(-)

diff --git a/src/org/torproject/descriptor/impl/DownloadCoordinator.java b/src/org/torproject/descriptor/impl/DownloadCoordinator.java
index 362006d..6d837a5 100644
--- a/src/org/torproject/descriptor/impl/DownloadCoordinator.java
+++ b/src/org/torproject/descriptor/impl/DownloadCoordinator.java
@@ -2,266 +2,9 @@
  * See LICENSE for licensing information */
 package org.torproject.descriptor.impl;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
+public interface DownloadCoordinator {
 
-import org.torproject.descriptor.Descriptor;
-import org.torproject.descriptor.DescriptorRequest;
-import org.torproject.descriptor.DirSourceEntry;
-import org.torproject.descriptor.RelayNetworkStatusConsensus;
-import org.torproject.descriptor.RelayNetworkStatusVote;
+  public DescriptorRequestImpl createRequest(String nickname);
 
-/* TODO This whole download logic is a mess and needs a cleanup. */
-public class DownloadCoordinator {
-
-  private BlockingIteratorImpl<DescriptorRequest> descriptorQueue =
-      new BlockingIteratorImpl<DescriptorRequest>();
-  protected Iterator<DescriptorRequest> getDescriptorQueue() {
-    return this.descriptorQueue;
-  }
-
-  private SortedMap<String, DirectoryDownloader> directoryAuthorities;
-  private SortedMap<String, DirectoryDownloader> directoryMirrors;
-  private boolean downloadConsensusFromAllAuthorities;
-  private boolean includeCurrentReferencedVotes;
-  private long requestTimeoutMillis;
-  private long globalTimeoutMillis;
-
-  protected DownloadCoordinator(
-      SortedMap<String, DirectoryDownloader> directoryAuthorities,
-      SortedMap<String, DirectoryDownloader> directoryMirrors,
-      boolean downloadConsensus,
-      boolean downloadConsensusFromAllAuthorities,
-      Set<String> downloadVotes, boolean includeCurrentReferencedVotes,
-      long requestTimeoutMillis, long globalTimeoutMillis) {
-    this.directoryAuthorities = directoryAuthorities;
-    this.directoryMirrors = directoryMirrors;
-    this.missingConsensus = downloadConsensus;
-    this.downloadConsensusFromAllAuthorities =
-        downloadConsensusFromAllAuthorities;
-    this.missingVotes = downloadVotes;
-    this.includeCurrentReferencedVotes = includeCurrentReferencedVotes;
-    this.requestTimeoutMillis = requestTimeoutMillis;
-    this.globalTimeoutMillis = globalTimeoutMillis;
-    if (this.directoryMirrors.isEmpty() &&
-        this.directoryAuthorities.isEmpty()) {
-      this.descriptorQueue.setOutOfDescriptors();
-      /* TODO Should we say anything if we don't have any directories
-       * configured? */
-    } else {
-      GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis,
-          this);
-      this.globalTimerThread = new Thread(globalTimer);
-      this.globalTimerThread.start();
-      for (DirectoryDownloader directoryMirror :
-          this.directoryMirrors.values()) {
-        directoryMirror.setDownloadCoordinator(this);
-        directoryMirror.setRequestTimeout(this.requestTimeoutMillis);
-        new Thread(directoryMirror).start();
-      }
-      for (DirectoryDownloader directoryAuthority :
-          this.directoryAuthorities.values()) {
-        directoryAuthority.setDownloadCoordinator(this);
-        directoryAuthority.setRequestTimeout(this.requestTimeoutMillis);
-        new Thread(directoryAuthority).start();
-      }
-    }
-  }
-
-  /* Interrupt all downloads if the total download time exceeds a given
-   * time. */
-  private Thread globalTimerThread;
-  private static class GlobalTimer implements Runnable {
-    private long timeoutMillis;
-    private DownloadCoordinator downloadCoordinator;
-    private GlobalTimer(long timeoutMillis,
-        DownloadCoordinator downloadCoordinator) {
-      this.timeoutMillis = timeoutMillis;
-      this.downloadCoordinator = downloadCoordinator;
-    }
-    public void run() {
-      long started = System.currentTimeMillis(), sleep;
-      while ((sleep = started + this.timeoutMillis
-          - System.currentTimeMillis()) > 0L) {
-        try {
-          Thread.sleep(sleep);
-        } catch (InterruptedException e) {
-          return;
-        }
-      }
-      this.downloadCoordinator.interruptAllDownloads();
-    }
-  }
-
-  /* Are we missing the consensus, and should the next directory that
-   * hasn't tried downloading it before attempt to download it? */
-  private boolean missingConsensus = false;
-
-  /* Which directories are currently attempting to download the
-   * consensus? */
-  private Set<String> requestingConsensuses = new HashSet<String>();
-
-  /* Which directories have attempted to download the consensus so far,
-   * including those directories that are currently attempting it? */
-  private Set<String> requestedConsensuses = new HashSet<String>();
-
-  /* Which votes are we currently missing? */
-  private Set<String> missingVotes = new HashSet<String>();
-
-  /* Which vote (map value) is a given directory (map key) currently
-   * attempting to download? */
-  private Map<String, String> requestingVotes =
-      new HashMap<String, String>();
-
-  /* Which votes (map value) has a given directory (map key) attempted or
-   * is currently attempting to download? */
-  private Map<String, Set<String>> requestedVotes =
-      new HashMap<String, Set<String>>();
-
-  private boolean hasFinishedDownloading = false;
-
-  /* Look up what request a directory should make next.  If there is
-   * nothing to do right now, but maybe later, block the caller.  If
-   * we're done downloading, return null to notify the caller. */
-  protected synchronized DescriptorRequestImpl createRequest(
-      String nickname) {
-    while (!this.hasFinishedDownloading) {
-      DescriptorRequestImpl request = new DescriptorRequestImpl();
-      request.setDirectoryNickname(nickname);
-      if ((this.missingConsensus ||
-          (this.downloadConsensusFromAllAuthorities &&
-          this.directoryAuthorities.containsKey(nickname))) &&
-          !this.requestedConsensuses.contains(nickname)) {
-        if (!this.downloadConsensusFromAllAuthorities) {
-          this.missingConsensus = false;
-        }
-        this.requestingConsensuses.add(nickname);
-        this.requestedConsensuses.add(nickname);
-        request.setRequestedResource(
-            "/tor/status-vote/current/consensus.z");
-        request.setDescriptorType("consensus");
-        return request;
-      }
-      if (!this.missingVotes.isEmpty() &&
-          this.directoryAuthorities.containsKey(nickname)) {
-        String requestingVote = null;
-        for (String missingVote : this.missingVotes) {
-          if (!this.requestedVotes.containsKey(nickname) ||
-              !this.requestedVotes.get(nickname).contains(missingVote)) {
-            requestingVote = missingVote;
-          }
-        }
-        if (requestingVote != null) {
-          this.requestingVotes.put(nickname, requestingVote);
-          if (!this.requestedVotes.containsKey(nickname)) {
-            this.requestedVotes.put(nickname, new HashSet<String>());
-          }
-          this.requestedVotes.get(nickname).add(requestingVote);
-          this.missingVotes.remove(requestingVote);
-          request.setRequestedResource("/tor/status-vote/current/"
-              + requestingVote + ".z");
-          request.setDescriptorType("vote");
-          return request;
-        }
-      }
-      /* TODO Add server descriptors and extra-info descriptors later. */
-      try {
-        this.wait();
-      } catch (InterruptedException e) {
-        /* TODO What shall we do? */
-      }
-    }
-    return null;
-  }
-
-  /* Deliver a response which may either contain one or more descriptors
-   * or a failure response code.  Update the lists of missing descriptors,
-   * decide if there are more descriptors to download, and wake up any
-   * waiting downloader threads. */
-  protected synchronized void deliverResponse(
-      DescriptorRequestImpl response) {
-    String nickname = response.getDirectoryNickname();
-    if (response.getDescriptorType().equals("consensus")) {
-      this.requestingConsensuses.remove(nickname);
-      if (response.getResponseCode() == 200) {
-        List<RelayNetworkStatusConsensus> parsedConsensuses =
-            RelayNetworkStatusConsensusImpl.parseConsensuses(
-            response.getResponseBytes());
-        List<Descriptor> parsedDescriptors =
-            new ArrayList<Descriptor>(parsedConsensuses);
-        response.setDescriptors(parsedDescriptors);
-        if (this.includeCurrentReferencedVotes) {
-          /* TODO Only add votes if the consensus is not older than one
-           * hour.  Or does that make no sense? */
-          for (RelayNetworkStatusConsensus parsedConsensus :
-              parsedConsensuses) {
-            for (DirSourceEntry dirSource :
-                parsedConsensus.getDirSourceEntries().values()) {
-              String identity = dirSource.getIdentity();
-              if (!this.missingVotes.contains(identity)) {
-                boolean alreadyRequested = false;
-                for (Set<String> requestedBefore :
-                    this.requestedVotes.values()) {
-                  if (requestedBefore.contains(identity)) {
-                    alreadyRequested = true;
-                    break;
-                  }
-                }
-                if (!alreadyRequested) {
-                  this.missingVotes.add(identity);
-                }
-              }
-            }
-          }
-          /* TODO Later, add referenced server descriptors. */
-        }
-      } else {
-        this.missingConsensus = true;
-      }
-    } else if (response.getDescriptorType().equals("vote")) {
-      String requestedVote = requestingVotes.remove(nickname);
-      if (response.getResponseCode() == 200) {
-        List<RelayNetworkStatusVote> parsedVotes =
-            RelayNetworkStatusVoteImpl.parseVotes(
-            response.getResponseBytes());
-        List<Descriptor> parsedDescriptors =
-            new ArrayList<Descriptor>(parsedVotes);
-        response.setDescriptors(parsedDescriptors);
-      } else {
-        this.missingVotes.add(requestedVote);
-      }
-    }
-    if (response.getRequestEnd() != 0L) {
-      this.descriptorQueue.add(response);
-    }
-    if ((!this.missingConsensus ||
-        (this.downloadConsensusFromAllAuthorities &&
-        this.requestedConsensuses.containsAll(
-        this.directoryAuthorities.keySet()) &&
-        this.requestingConsensuses.isEmpty())) &&
-        this.missingVotes.isEmpty() &&
-        this.requestingVotes.isEmpty()) {
-      /* TODO This logic may be somewhat broken.  We don't wait for all
-       * consensus requests to complete or fail, which results in adding
-       * (failed) requests to the queue when we think we're done. */
-      this.hasFinishedDownloading = true;
-      this.globalTimerThread.interrupt();
-      this.descriptorQueue.setOutOfDescriptors();
-    }
-    /* Wake up all waiting downloader threads.  Maybe they can now
-     * download something, or they'll realize we're done downloading. */
-    this.notifyAll();
-  }
-
-  private synchronized void interruptAllDownloads() {
-    this.hasFinishedDownloading = true;
-    this.notifyAll();
-  }
+  public void deliverResponse(DescriptorRequestImpl request);
 }
-
diff --git a/src/org/torproject/descriptor/impl/DownloadCoordinatorImpl.java b/src/org/torproject/descriptor/impl/DownloadCoordinatorImpl.java
new file mode 100644
index 0000000..5d4ca21
--- /dev/null
+++ b/src/org/torproject/descriptor/impl/DownloadCoordinatorImpl.java
@@ -0,0 +1,267 @@
+/* Copyright 2011, 2012 The Tor Project
+ * See LICENSE for licensing information */
+package org.torproject.descriptor.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorRequest;
+import org.torproject.descriptor.DirSourceEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+import org.torproject.descriptor.RelayNetworkStatusVote;
+
+/* TODO This whole download logic is a mess and needs a cleanup. */
+public class DownloadCoordinatorImpl implements DownloadCoordinator {
+
+  private BlockingIteratorImpl<DescriptorRequest> descriptorQueue =
+      new BlockingIteratorImpl<DescriptorRequest>();
+  protected Iterator<DescriptorRequest> getDescriptorQueue() {
+    return this.descriptorQueue;
+  }
+
+  private SortedMap<String, DirectoryDownloader> directoryAuthorities;
+  private SortedMap<String, DirectoryDownloader> directoryMirrors;
+  private boolean downloadConsensusFromAllAuthorities;
+  private boolean includeCurrentReferencedVotes;
+  private long requestTimeoutMillis;
+  private long globalTimeoutMillis;
+
+  protected DownloadCoordinatorImpl(
+      SortedMap<String, DirectoryDownloader> directoryAuthorities,
+      SortedMap<String, DirectoryDownloader> directoryMirrors,
+      boolean downloadConsensus,
+      boolean downloadConsensusFromAllAuthorities,
+      Set<String> downloadVotes, boolean includeCurrentReferencedVotes,
+      long requestTimeoutMillis, long globalTimeoutMillis) {
+    this.directoryAuthorities = directoryAuthorities;
+    this.directoryMirrors = directoryMirrors;
+    this.missingConsensus = downloadConsensus;
+    this.downloadConsensusFromAllAuthorities =
+        downloadConsensusFromAllAuthorities;
+    this.missingVotes = downloadVotes;
+    this.includeCurrentReferencedVotes = includeCurrentReferencedVotes;
+    this.requestTimeoutMillis = requestTimeoutMillis;
+    this.globalTimeoutMillis = globalTimeoutMillis;
+    if (this.directoryMirrors.isEmpty() &&
+        this.directoryAuthorities.isEmpty()) {
+      this.descriptorQueue.setOutOfDescriptors();
+      /* TODO Should we say anything if we don't have any directories
+       * configured? */
+    } else {
+      GlobalTimer globalTimer = new GlobalTimer(this.globalTimeoutMillis,
+          this);
+      this.globalTimerThread = new Thread(globalTimer);
+      this.globalTimerThread.start();
+      for (DirectoryDownloader directoryMirror :
+          this.directoryMirrors.values()) {
+        directoryMirror.setDownloadCoordinator(this);
+        directoryMirror.setRequestTimeout(this.requestTimeoutMillis);
+        new Thread(directoryMirror).start();
+      }
+      for (DirectoryDownloader directoryAuthority :
+          this.directoryAuthorities.values()) {
+        directoryAuthority.setDownloadCoordinator(this);
+        directoryAuthority.setRequestTimeout(this.requestTimeoutMillis);
+        new Thread(directoryAuthority).start();
+      }
+    }
+  }
+
+  /* Interrupt all downloads if the total download time exceeds a given
+   * time. */
+  private Thread globalTimerThread;
+  private static class GlobalTimer implements Runnable {
+    private long timeoutMillis;
+    private DownloadCoordinatorImpl downloadCoordinator;
+    private GlobalTimer(long timeoutMillis,
+        DownloadCoordinatorImpl downloadCoordinator) {
+      this.timeoutMillis = timeoutMillis;
+      this.downloadCoordinator = downloadCoordinator;
+    }
+    public void run() {
+      long started = System.currentTimeMillis(), sleep;
+      while ((sleep = started + this.timeoutMillis
+          - System.currentTimeMillis()) > 0L) {
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          return;
+        }
+      }
+      this.downloadCoordinator.interruptAllDownloads();
+    }
+  }
+
+  /* Are we missing the consensus, and should the next directory that
+   * hasn't tried downloading it before attempt to download it? */
+  private boolean missingConsensus = false;
+
+  /* Which directories are currently attempting to download the
+   * consensus? */
+  private Set<String> requestingConsensuses = new HashSet<String>();
+
+  /* Which directories have attempted to download the consensus so far,
+   * including those directories that are currently attempting it? */
+  private Set<String> requestedConsensuses = new HashSet<String>();
+
+  /* Which votes are we currently missing? */
+  private Set<String> missingVotes = new HashSet<String>();
+
+  /* Which vote (map value) is a given directory (map key) currently
+   * attempting to download? */
+  private Map<String, String> requestingVotes =
+      new HashMap<String, String>();
+
+  /* Which votes (map value) has a given directory (map key) attempted or
+   * is currently attempting to download? */
+  private Map<String, Set<String>> requestedVotes =
+      new HashMap<String, Set<String>>();
+
+  private boolean hasFinishedDownloading = false;
+
+  /* Look up what request a directory should make next.  If there is
+   * nothing to do right now, but maybe later, block the caller.  If
+   * we're done downloading, return null to notify the caller. */
+  public synchronized DescriptorRequestImpl createRequest(
+      String nickname) {
+    while (!this.hasFinishedDownloading) {
+      DescriptorRequestImpl request = new DescriptorRequestImpl();
+      request.setDirectoryNickname(nickname);
+      if ((this.missingConsensus ||
+          (this.downloadConsensusFromAllAuthorities &&
+          this.directoryAuthorities.containsKey(nickname))) &&
+          !this.requestedConsensuses.contains(nickname)) {
+        if (!this.downloadConsensusFromAllAuthorities) {
+          this.missingConsensus = false;
+        }
+        this.requestingConsensuses.add(nickname);
+        this.requestedConsensuses.add(nickname);
+        request.setRequestedResource(
+            "/tor/status-vote/current/consensus.z");
+        request.setDescriptorType("consensus");
+        return request;
+      }
+      if (!this.missingVotes.isEmpty() &&
+          this.directoryAuthorities.containsKey(nickname)) {
+        String requestingVote = null;
+        for (String missingVote : this.missingVotes) {
+          if (!this.requestedVotes.containsKey(nickname) ||
+              !this.requestedVotes.get(nickname).contains(missingVote)) {
+            requestingVote = missingVote;
+          }
+        }
+        if (requestingVote != null) {
+          this.requestingVotes.put(nickname, requestingVote);
+          if (!this.requestedVotes.containsKey(nickname)) {
+            this.requestedVotes.put(nickname, new HashSet<String>());
+          }
+          this.requestedVotes.get(nickname).add(requestingVote);
+          this.missingVotes.remove(requestingVote);
+          request.setRequestedResource("/tor/status-vote/current/"
+              + requestingVote + ".z");
+          request.setDescriptorType("vote");
+          return request;
+        }
+      }
+      /* TODO Add server descriptors and extra-info descriptors later. */
+      try {
+        this.wait();
+      } catch (InterruptedException e) {
+        /* TODO What shall we do? */
+      }
+    }
+    return null;
+  }
+
+  /* Deliver a response which may either contain one or more descriptors
+   * or a failure response code.  Update the lists of missing descriptors,
+   * decide if there are more descriptors to download, and wake up any
+   * waiting downloader threads. */
+  public synchronized void deliverResponse(
+      DescriptorRequestImpl response) {
+    String nickname = response.getDirectoryNickname();
+    if (response.getDescriptorType().equals("consensus")) {
+      this.requestingConsensuses.remove(nickname);
+      if (response.getResponseCode() == 200) {
+        List<RelayNetworkStatusConsensus> parsedConsensuses =
+            RelayNetworkStatusConsensusImpl.parseConsensuses(
+            response.getResponseBytes());
+        List<Descriptor> parsedDescriptors =
+            new ArrayList<Descriptor>(parsedConsensuses);
+        response.setDescriptors(parsedDescriptors);
+        if (this.includeCurrentReferencedVotes) {
+          /* TODO Only add votes if the consensus is not older than one
+           * hour.  Or does that make no sense? */
+          for (RelayNetworkStatusConsensus parsedConsensus :
+              parsedConsensuses) {
+            for (DirSourceEntry dirSource :
+                parsedConsensus.getDirSourceEntries().values()) {
+              String identity = dirSource.getIdentity();
+              if (!this.missingVotes.contains(identity)) {
+                boolean alreadyRequested = false;
+                for (Set<String> requestedBefore :
+                    this.requestedVotes.values()) {
+                  if (requestedBefore.contains(identity)) {
+                    alreadyRequested = true;
+                    break;
+                  }
+                }
+                if (!alreadyRequested) {
+                  this.missingVotes.add(identity);
+                }
+              }
+            }
+          }
+          /* TODO Later, add referenced server descriptors. */
+        }
+      } else {
+        this.missingConsensus = true;
+      }
+    } else if (response.getDescriptorType().equals("vote")) {
+      String requestedVote = requestingVotes.remove(nickname);
+      if (response.getResponseCode() == 200) {
+        List<RelayNetworkStatusVote> parsedVotes =
+            RelayNetworkStatusVoteImpl.parseVotes(
+            response.getResponseBytes());
+        List<Descriptor> parsedDescriptors =
+            new ArrayList<Descriptor>(parsedVotes);
+        response.setDescriptors(parsedDescriptors);
+      } else {
+        this.missingVotes.add(requestedVote);
+      }
+    }
+    if (response.getRequestEnd() != 0L) {
+      this.descriptorQueue.add(response);
+    }
+    if ((!this.missingConsensus ||
+        (this.downloadConsensusFromAllAuthorities &&
+        this.requestedConsensuses.containsAll(
+        this.directoryAuthorities.keySet()) &&
+        this.requestingConsensuses.isEmpty())) &&
+        this.missingVotes.isEmpty() &&
+        this.requestingVotes.isEmpty()) {
+      /* TODO This logic may be somewhat broken.  We don't wait for all
+       * consensus requests to complete or fail, which results in adding
+       * (failed) requests to the queue when we think we're done. */
+      this.hasFinishedDownloading = true;
+      this.globalTimerThread.interrupt();
+      this.descriptorQueue.setOutOfDescriptors();
+    }
+    /* Wake up all waiting downloader threads.  Maybe they can now
+     * download something, or they'll realize we're done downloading. */
+    this.notifyAll();
+  }
+
+  private synchronized void interruptAllDownloads() {
+    this.hasFinishedDownloading = true;
+    this.notifyAll();
+  }
+}
+
diff --git a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java
index 6ea81e1..7cdcc0c 100644
--- a/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java
+++ b/src/org/torproject/descriptor/impl/RelayDescriptorDownloaderImpl.java
@@ -227,11 +227,12 @@ public class RelayDescriptorDownloaderImpl
           + "permitted once.");
     }
     this.hasStartedDownloading = true;
-    DownloadCoordinator downloadCoordinator = new DownloadCoordinator(
-        this.directoryAuthorities, this.directoryMirrors,
-        this.downloadConsensus, this.downloadConsensusFromAllAuthorities,
-        this.downloadVotes, this.includeCurrentReferencedVotes,
-        this.requestTimeoutMillis, this.globalTimeoutMillis);
+    DownloadCoordinatorImpl downloadCoordinator =
+        new DownloadCoordinatorImpl(this.directoryAuthorities,
+        this.directoryMirrors, this.downloadConsensus,
+        this.downloadConsensusFromAllAuthorities, this.downloadVotes,
+        this.includeCurrentReferencedVotes, this.requestTimeoutMillis,
+        this.globalTimeoutMillis);
     Iterator<DescriptorRequest> descriptorQueue = downloadCoordinator.
         getDescriptorQueue();
     return descriptorQueue;



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits