[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [metrics-web/release] Remove advbw column from bandwidth.csv.



commit 09cfdfdff4efc1aa1cc60f53f7f1353a6193e6ad
Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date:   Mon Nov 12 19:50:46 2018 +0100

    Remove advbw column from bandwidth.csv.
    
    Instead use advbw data from ipv6servers module.
    
    As a result, we can stop aggregating advertised bandwidths in the
    legacy module.
    
    Required schema changes to live tordir databases:
    
    DROP VIEW stats_bandwidth;
    CREATE VIEW stats_bandwidth [...]
    CREATE OR REPLACE FUNCTION refresh_all() [...]
    DROP FUNCTION refresh_bandwidth_flags();
    DROP FUNCTION refresh_relay_statuses_per_day();
    DROP TABLE relay_statuses_per_day;
    DROP TABLE bandwidth_flags;
    DROP TABLE consensus;
    DROP FUNCTION delete_old_descriptor();
    DROP TABLE descriptor;
    
    Part of #28116.
---
 src/main/R/rserver/graphs.R                        |  58 +++---
 .../metrics/stats/ipv6servers/Database.java        |  22 ++
 .../torproject/metrics/stats/ipv6servers/Main.java |   2 +
 .../metrics/stats/servers/Configuration.java       |   1 -
 .../servers/RelayDescriptorDatabaseImporter.java   | 232 +--------------------
 src/main/sql/ipv6servers/init-ipv6servers.sql      |  11 +
 src/main/sql/legacy/tordir.sql                     | 135 +-----------
 7 files changed, 73 insertions(+), 388 deletions(-)

diff --git a/src/main/R/rserver/graphs.R b/src/main/R/rserver/graphs.R
index 9dc8c2d..df108e2 100644
--- a/src/main/R/rserver/graphs.R
+++ b/src/main/R/rserver/graphs.R
@@ -446,16 +446,19 @@ write_platforms <- function(start_p = NULL, end_p = NULL, path_p) {
 }
 
 prepare_bandwidth <- function(start_p, end_p) {
-  read.csv(paste(stats_dir, "bandwidth.csv", sep = ""),
+  advbw <- read.csv(paste(stats_dir, "advbw.csv", sep = ""),
+    colClasses = c("date" = "Date")) %>%
+    transmute(date, variable = "advbw", value = advbw * 8 / 1e9)
+  bwhist <- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""),
     colClasses = c("date" = "Date")) %>%
+    transmute(date, variable = "bwhist", value = (bwread + bwwrite) * 8 / 2e9)
+  rbind(advbw, bwhist) %>%
     filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>%
     filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>%
-    filter(isexit != "") %>%
-    filter(isguard != "") %>%
-    group_by(date) %>%
-    summarize(advbw = sum(advbw) * 8 / 1e9,
-      bwhist = sum(bwread + bwwrite) * 8 / 2e9) %>%
-    select(date, advbw, bwhist)
+    filter(!is.na(value)) %>%
+    group_by(date, variable) %>%
+    summarize(value = sum(value)) %>%
+    spread(variable, value)
 }
 
 plot_bandwidth <- function(start_p, end_p, path_p) {
@@ -810,33 +813,24 @@ write_connbidirect <- function(start_p = NULL, end_p = NULL, path_p) {
 }
 
 prepare_bandwidth_flags <- function(start_p, end_p) {
-  b <- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""),
-    colClasses = c("date" = "Date"))
-  b <- b %>%
+  advbw <- read.csv(paste(stats_dir, "advbw.csv", sep = ""),
+    colClasses = c("date" = "Date")) %>%
+    transmute(date, isguard, isexit, variable = "advbw",
+      value = advbw * 8 / 1e9)
+  bwhist <- read.csv(paste(stats_dir, "bandwidth.csv", sep = ""),
+    colClasses = c("date" = "Date")) %>%
+    transmute(date, isguard, isexit, variable = "bwhist",
+      value = (bwread + bwwrite) * 8 / 2e9)
+  rbind(advbw, bwhist) %>%
     filter(if (!is.null(start_p)) date >= as.Date(start_p) else TRUE) %>%
     filter(if (!is.null(end_p)) date <= as.Date(end_p) else TRUE) %>%
-    filter(isexit != "") %>%
-    filter(isguard != "")
-  b <- data.frame(date = b$date,
-                  isexit = b$isexit == "t", isguard = b$isguard == "t",
-                  advbw = b$advbw * 8 / 1e9,
-                  bwhist = (b$bwread + b$bwwrite) * 8 / 2e9)
-  b <- rbind(
-    data.frame(b[b$isguard == TRUE, ], flag = "guard"),
-    data.frame(b[b$isexit == TRUE, ], flag = "exit"))
-  b <- data.frame(date = b$date, advbw = b$advbw, bwhist = b$bwhist,
-                  flag = b$flag)
-  b <- aggregate(list(advbw = b$advbw, bwhist = b$bwhist),
-                 by = list(date = b$date, flag = b$flag), FUN = sum,
-                 na.rm = TRUE, na.action = NULL)
-  b <- gather(b, type, value, -c(date, flag))
-  bandwidth <- b[b$value > 0, ]
-  bandwidth <- data.frame(date = bandwidth$date,
-    variable = as.factor(paste(bandwidth$flag, "_", bandwidth$type,
-    sep = "")), value = bandwidth$value)
-  bandwidth$variable <- factor(bandwidth$variable,
-    levels = levels(bandwidth$variable)[c(3, 4, 1, 2)])
-  bandwidth
+    group_by(date, variable) %>%
+    summarize(exit = sum(value[isexit == "t"]),
+      guard = sum(value[isguard == "t"])) %>%
+    gather(flag, value, -date, -variable) %>%
+    unite(variable, flag, variable) %>%
+    mutate(variable = factor(variable,
+      levels = c("guard_advbw", "guard_bwhist", "exit_advbw", "exit_bwhist")))
 }
 
 plot_bandwidth_flags <- function(start_p, end_p, path_p) {
diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java
index c3a1fec..b5efe3e 100644
--- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java
+++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Database.java
@@ -435,6 +435,28 @@ class Database implements AutoCloseable {
     return statistics;
   }
 
+  /** Query the bandwidth_advbw view. */
+  List<String[]> queryAdvbw() throws SQLException {
+    List<String[]> statistics = new ArrayList<>();
+    String columns = "date, isexit, isguard, advbw";
+    statistics.add(columns.split(", "));
+    Statement st = this.connection.createStatement();
+    Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"),
+        Locale.US);
+    String queryString = "SELECT " + columns + " FROM bandwidth_advbw";
+    try (ResultSet rs = st.executeQuery(queryString)) {
+      while (rs.next()) {
+        String[] outputLine = new String[4];
+        outputLine[0] = rs.getDate("date", calendar).toLocalDate().toString();
+        outputLine[1] = rs.getString("isexit");
+        outputLine[2] = rs.getString("isguard");
+        outputLine[3] = getLongFromResultSet(rs, "advbw");
+        statistics.add(outputLine);
+      }
+    }
+    return statistics;
+  }
+
   /** Query the servers_networksize view. */
   List<String[]> queryNetworksize() throws SQLException {
     List<String[]> statistics = new ArrayList<>();
diff --git a/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java b/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java
index a91a74f..d322a2e 100644
--- a/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java
+++ b/src/main/java/org/torproject/metrics/stats/ipv6servers/Main.java
@@ -88,6 +88,8 @@ public class Main {
       log.info("Querying aggregated statistics from the database.");
       new Writer().write(Paths.get(Configuration.output, "ipv6servers.csv"),
           database.queryServersIpv6());
+      new Writer().write(Paths.get(Configuration.output, "advbw.csv"),
+          database.queryAdvbw());
       new Writer().write(Paths.get(Configuration.output, "networksize.csv"),
           database.queryNetworksize());
       new Writer().write(Paths.get(Configuration.output, "relayflags.csv"),
diff --git a/src/main/java/org/torproject/metrics/stats/servers/Configuration.java b/src/main/java/org/torproject/metrics/stats/servers/Configuration.java
index c4597bc..76788df 100644
--- a/src/main/java/org/torproject/metrics/stats/servers/Configuration.java
+++ b/src/main/java/org/torproject/metrics/stats/servers/Configuration.java
@@ -102,7 +102,6 @@ public class Configuration {
     if (this.directoryArchivesDirectories.isEmpty()) {
       String prefix = "../../shared/in/recent/relay-descriptors/";
       return Arrays.asList(new File(prefix + "consensuses/"),
-          new File(prefix + "server-descriptors/"),
           new File(prefix + "extra-infos/"));
     } else {
       return this.directoryArchivesDirectories;
diff --git a/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java b/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java
index c9a6fa7..2d1ae47 100644
--- a/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java
+++ b/src/main/java/org/torproject/metrics/stats/servers/RelayDescriptorDatabaseImporter.java
@@ -9,7 +9,6 @@ import org.torproject.descriptor.DescriptorSourceFactory;
 import org.torproject.descriptor.ExtraInfoDescriptor;
 import org.torproject.descriptor.NetworkStatusEntry;
 import org.torproject.descriptor.RelayNetworkStatusConsensus;
-import org.torproject.descriptor.ServerDescriptor;
 
 import org.postgresql.util.PGbytea;
 
@@ -20,7 +19,6 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
 import java.sql.CallableStatement;
 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -28,7 +26,6 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Timestamp;
-import java.sql.Types;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -44,10 +41,6 @@ import java.util.TreeSet;
 /**
  * Parse directory data.
  */
-
-/* TODO Split up this class and move its parts to cron.network,
- * cron.users, and status.relaysearch packages.  Requires extensive
- * changes to the database schema though. */
 public final class RelayDescriptorDatabaseImporter {
 
   /**
@@ -58,20 +51,10 @@ public final class RelayDescriptorDatabaseImporter {
   /* Counters to keep track of the number of records committed before
    * each transaction. */
 
-  private int rdsCount = 0;
-
-  private int resCount = 0;
-
   private int rhsCount = 0;
 
   private int rrsCount = 0;
 
-  private int rcsCount = 0;
-
-  private int rvsCount = 0;
-
-  private int rqsCount = 0;
-
   /**
    * Relay descriptor database connection.
    */
@@ -85,18 +68,6 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psSs;
 
   /**
-   * Prepared statement to check whether a given server descriptor has
-   * been imported into the database before.
-   */
-  private PreparedStatement psDs;
-
-  /**
-   * Prepared statement to check whether a given network status consensus
-   * has been imported into the database before.
-   */
-  private PreparedStatement psCs;
-
-  /**
    * Set of dates that have been inserted into the database for being
    * included in the next refresh run.
    */
@@ -115,22 +86,11 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psR;
 
   /**
-   * Prepared statement to insert a server descriptor into the database.
-   */
-  private PreparedStatement psD;
-
-  /**
    * Callable statement to insert the bandwidth history of an extra-info
    * descriptor into the database.
    */
   private CallableStatement csH;
 
-  /**
-   * Prepared statement to insert a network status consensus into the
-   * database.
-   */
-  private PreparedStatement psC;
-
   private static Logger log
       = LoggerFactory.getLogger(RelayDescriptorDatabaseImporter.class);
 
@@ -145,21 +105,11 @@ public final class RelayDescriptorDatabaseImporter {
   private BufferedWriter statusentryOut;
 
   /**
-   * Raw import file containing server descriptors.
-   */
-  private BufferedWriter descriptorOut;
-
-  /**
    * Raw import file containing bandwidth histories.
    */
   private BufferedWriter bwhistOut;
 
   /**
-   * Raw import file containing consensuses.
-   */
-  private BufferedWriter consensusOut;
-
-  /**
    * Date format to parse timestamps.
    */
   private SimpleDateFormat dateTimeFormat;
@@ -212,10 +162,6 @@ public final class RelayDescriptorDatabaseImporter {
         /* Prepare statements. */
         this.psSs = conn.prepareStatement("SELECT fingerprint "
             + "FROM statusentry WHERE validafter = ?");
-        this.psDs = conn.prepareStatement("SELECT COUNT(*) "
-            + "FROM descriptor WHERE descriptor = ?");
-        this.psCs = conn.prepareStatement("SELECT COUNT(*) "
-            + "FROM consensus WHERE validafter = ?");
         this.psR = conn.prepareStatement("INSERT INTO statusentry "
             + "(validafter, nickname, fingerprint, descriptor, "
             + "published, address, orport, dirport, isauthority, "
@@ -224,16 +170,8 @@ public final class RelayDescriptorDatabaseImporter {
             + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, "
             + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
             + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
-        this.psD = conn.prepareStatement("INSERT INTO descriptor "
-            + "(descriptor, nickname, address, orport, dirport, "
-            + "fingerprint, bandwidthavg, bandwidthburst, "
-            + "bandwidthobserved, platform, published, uptime, "
-            + "extrainfo) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
-            + "?)");
         this.csH = conn.prepareCall("{call insert_bwhist(?, ?, ?, ?, ?, "
             + "?)}");
-        this.psC = conn.prepareStatement("INSERT INTO consensus "
-            + "(validafter) VALUES (?)");
         this.psU = conn.prepareStatement("INSERT INTO scheduled_updates "
             + "(date) VALUES (?)");
         this.scheduledUpdates = new HashSet<>();
@@ -390,95 +328,9 @@ public final class RelayDescriptorDatabaseImporter {
   }
 
   /**
-   * Insert server descriptor into database.
-   */
-  public void addServerDescriptorContents(String descriptor,
-      String nickname, String address, int orPort, int dirPort,
-      String relayIdentifier, long bandwidthAvg, long bandwidthBurst,
-      long bandwidthObserved, String platform, long published,
-      Long uptime, String extraInfoDigest) {
-    if (this.importIntoDatabase) {
-      try {
-        this.addDateToScheduledUpdates(published);
-        this.addDateToScheduledUpdates(
-            published + 24L * 60L * 60L * 1000L);
-        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-        this.psDs.setString(1, descriptor);
-        ResultSet rs = psDs.executeQuery();
-        rs.next();
-        if (rs.getInt(1) == 0) {
-          this.psD.clearParameters();
-          this.psD.setString(1, descriptor);
-          this.psD.setString(2, nickname);
-          this.psD.setString(3, address);
-          this.psD.setInt(4, orPort);
-          this.psD.setInt(5, dirPort);
-          this.psD.setString(6, relayIdentifier);
-          this.psD.setLong(7, bandwidthAvg);
-          this.psD.setLong(8, bandwidthBurst);
-          this.psD.setLong(9, bandwidthObserved);
-          /* Remove all non-ASCII characters from the platform string, or
-           * we'll make Postgres unhappy.  Sun's JDK and OpenJDK behave
-           * differently when creating a new String with a given encoding.
-           * That's what the regexp below is for. */
-          this.psD.setString(10, new String(platform.getBytes(),
-              StandardCharsets.US_ASCII).replaceAll("[^\\p{ASCII}]",""));
-          this.psD.setTimestamp(11, new Timestamp(published), cal);
-          if (null != uptime) {
-            this.psD.setLong(12, uptime);
-          } else {
-            this.psD.setNull(12, Types.BIGINT);
-          }
-          this.psD.setString(13, extraInfoDigest);
-          this.psD.executeUpdate();
-          rdsCount++;
-          if (rdsCount % autoCommitCount == 0)  {
-            this.conn.commit();
-          }
-        }
-      } catch (SQLException e) {
-        log.warn("Could not add server "
-            + "descriptor.  We won't make any further SQL requests in "
-            + "this execution.", e);
-        this.importIntoDatabase = false;
-      }
-    }
-    if (this.writeRawImportFiles) {
-      try {
-        if (this.descriptorOut == null) {
-          new File(rawFilesDirectory).mkdirs();
-          this.descriptorOut = new BufferedWriter(new FileWriter(
-              rawFilesDirectory + "/descriptor.sql"));
-          this.descriptorOut.write(" COPY descriptor (descriptor, "
-              + "nickname, address, orport, dirport, fingerprint, "
-              + "bandwidthavg, bandwidthburst, bandwidthobserved, "
-              + "platform, published, uptime, extrainfo) FROM stdin;\n");
-        }
-        this.descriptorOut.write(descriptor.toLowerCase() + "\t"
-            + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
-            + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
-            + bandwidthBurst + "\t" + bandwidthObserved + "\t"
-            + (platform != null && platform.length() > 0
-            ? new String(platform.getBytes(), StandardCharsets.US_ASCII)
-            : "\\N") + "\t" + this.dateTimeFormat.format(published) + "\t"
-            + (uptime >= 0 ? uptime : "\\N") + "\t"
-            + (extraInfoDigest != null ? extraInfoDigest : "\\N")
-            + "\n");
-      } catch (IOException e) {
-        log.warn("Could not write server "
-            + "descriptor to raw database import file.  We won't make "
-            + "any further attempts to write raw import files in this "
-            + "execution.", e);
-        this.writeRawImportFiles = false;
-      }
-    }
-  }
-
-  /**
    * Insert extra-info descriptor into database.
    */
-  public void addExtraInfoDescriptorContents(String extraInfoDigest,
-      String nickname, String fingerprint, long published,
+  public void addExtraInfoDescriptorContents(String fingerprint, long published,
       List<String> bandwidthHistoryLines) {
     if (!bandwidthHistoryLines.isEmpty()) {
       this.addBandwidthHistory(fingerprint.toLowerCase(), published,
@@ -766,55 +618,6 @@ public final class RelayDescriptorDatabaseImporter {
     }
   }
 
-  /**
-   * Insert network status consensus into database.
-   */
-  public void addConsensus(long validAfter) {
-    if (this.importIntoDatabase) {
-      try {
-        this.addDateToScheduledUpdates(validAfter);
-        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-        Timestamp validAfterTimestamp = new Timestamp(validAfter);
-        this.psCs.setTimestamp(1, validAfterTimestamp, cal);
-        ResultSet rs = psCs.executeQuery();
-        rs.next();
-        if (rs.getInt(1) == 0) {
-          this.psC.clearParameters();
-          this.psC.setTimestamp(1, validAfterTimestamp, cal);
-          this.psC.executeUpdate();
-          rcsCount++;
-          if (rcsCount % autoCommitCount == 0)  {
-            this.conn.commit();
-          }
-        }
-      } catch (SQLException e) {
-        log.warn("Could not add network status "
-            + "consensus.  We won't make any further SQL requests in "
-            + "this execution.", e);
-        this.importIntoDatabase = false;
-      }
-    }
-    if (this.writeRawImportFiles) {
-      try {
-        if (this.consensusOut == null) {
-          new File(rawFilesDirectory).mkdirs();
-          this.consensusOut = new BufferedWriter(new FileWriter(
-              rawFilesDirectory + "/consensus.sql"));
-          this.consensusOut.write(" COPY consensus (validafter) "
-              + "FROM stdin;\n");
-        }
-        String validAfterString = this.dateTimeFormat.format(validAfter);
-        this.consensusOut.write(validAfterString + "\n");
-      } catch (IOException e) {
-        log.warn("Could not write network status "
-            + "consensus to raw database import file.  We won't make "
-            + "any further attempts to write raw import files in this "
-            + "execution.", e);
-        this.writeRawImportFiles = false;
-      }
-    }
-  }
-
   /** Imports relay descriptors into the database. */
   public void importRelayDescriptors() {
     log.info("Importing files in directories " + archivesDirectories
@@ -834,8 +637,6 @@ public final class RelayDescriptorDatabaseImporter {
         if (descriptor instanceof RelayNetworkStatusConsensus) {
           this.addRelayNetworkStatusConsensus(
               (RelayNetworkStatusConsensus) descriptor);
-        } else if (descriptor instanceof ServerDescriptor) {
-          this.addServerDescriptor((ServerDescriptor) descriptor);
         } else if (descriptor instanceof ExtraInfoDescriptor) {
           this.addExtraInfoDescriptor((ExtraInfoDescriptor) descriptor);
         }
@@ -862,18 +663,6 @@ public final class RelayDescriptorDatabaseImporter {
           statusEntry.getBandwidth(), statusEntry.getPortList(),
           statusEntry.getStatusEntryBytes());
     }
-    this.addConsensus(consensus.getValidAfterMillis());
-  }
-
-  private void addServerDescriptor(ServerDescriptor descriptor) {
-    this.addServerDescriptorContents(
-        descriptor.getDigestSha1Hex(), descriptor.getNickname(),
-        descriptor.getAddress(), descriptor.getOrPort(),
-        descriptor.getDirPort(), descriptor.getFingerprint(),
-        descriptor.getBandwidthRate(), descriptor.getBandwidthBurst(),
-        descriptor.getBandwidthObserved(), descriptor.getPlatform(),
-        descriptor.getPublishedMillis(), descriptor.getUptime(),
-        descriptor.getExtraInfoDigestSha1Hex());
   }
 
   private void addExtraInfoDescriptor(ExtraInfoDescriptor descriptor) {
@@ -892,8 +681,7 @@ public final class RelayDescriptorDatabaseImporter {
       bandwidthHistoryLines.add(
           descriptor.getDirreqReadHistory().getLine());
     }
-    this.addExtraInfoDescriptorContents(descriptor.getDigestSha1Hex(),
-        descriptor.getNickname(),
+    this.addExtraInfoDescriptorContents(
         descriptor.getFingerprint().toLowerCase(),
         descriptor.getPublishedMillis(), bandwidthHistoryLines);
   }
@@ -904,12 +692,8 @@ public final class RelayDescriptorDatabaseImporter {
   public void closeConnection() {
 
     /* Log stats about imported descriptors. */
-    log.info("Finished importing relay "
-        + "descriptors: {} consensuses, {} network status entries, {} "
-        + "votes, {} server descriptors, {} extra-info descriptors, {} "
-        + "bandwidth history elements, and {} dirreq stats elements",
-        rcsCount, rrsCount, rvsCount, rdsCount, resCount, rhsCount,
-        rqsCount);
+    log.info("Finished importing relay descriptors: {} network status entries "
+        + "and {} bandwidth history elements", rrsCount, rhsCount);
 
     /* Insert scheduled updates a second time, just in case the refresh
      * run has started since inserting them the first time in which case
@@ -951,18 +735,10 @@ public final class RelayDescriptorDatabaseImporter {
         this.statusentryOut.write("\\.\n");
         this.statusentryOut.close();
       }
-      if (this.descriptorOut != null) {
-        this.descriptorOut.write("\\.\n");
-        this.descriptorOut.close();
-      }
       if (this.bwhistOut != null) {
         this.bwhistOut.write("\\.\n");
         this.bwhistOut.close();
       }
-      if (this.consensusOut != null) {
-        this.consensusOut.write("\\.\n");
-        this.consensusOut.close();
-      }
     } catch (IOException e) {
       log.warn("Could not close one or more raw database import files.", e);
     }
diff --git a/src/main/sql/ipv6servers/init-ipv6servers.sql b/src/main/sql/ipv6servers/init-ipv6servers.sql
index b478a49..c94a19d 100644
--- a/src/main/sql/ipv6servers/init-ipv6servers.sql
+++ b/src/main/sql/ipv6servers/init-ipv6servers.sql
@@ -312,6 +312,17 @@ GROUP BY DATE(valid_after), server, guard_relay, exit_relay, announced_ipv6,
 ORDER BY valid_after_date, server, guard_relay, exit_relay, announced_ipv6,
   exiting_ipv6_relay, reachable_ipv6_relay;
 
+-- View on advertised bandwidth by Exit/Guard flag combination.
+CREATE OR REPLACE VIEW bandwidth_advbw AS
+SELECT valid_after_date AS date,
+  exit_relay AS isexit,
+  guard_relay AS isguard,
+  FLOOR(SUM(advertised_bandwidth_bytes_sum_avg)) AS advbw
+FROM ipv6servers
+WHERE server = 'relay'
+GROUP BY date, isexit, isguard
+ORDER BY date, isexit, isguard;
+
 -- View on the number of running servers by relay flag.
 CREATE OR REPLACE VIEW servers_flags_complete AS
 WITH included_statuses AS (
diff --git a/src/main/sql/legacy/tordir.sql b/src/main/sql/legacy/tordir.sql
index f1d6767..dfe7b5d 100644
--- a/src/main/sql/legacy/tordir.sql
+++ b/src/main/sql/legacy/tordir.sql
@@ -3,33 +3,6 @@
 
 CREATE LANGUAGE plpgsql;
 
--- TABLE descriptor
--- Contains all of the descriptors published by routers.
-CREATE TABLE descriptor (
-    descriptor CHARACTER(40) NOT NULL,
-    nickname CHARACTER VARYING(19) NOT NULL,
-    address CHARACTER VARYING(15) NOT NULL,
-    orport INTEGER NOT NULL,
-    dirport INTEGER NOT NULL,
-    fingerprint CHARACTER(40) NOT NULL,
-    bandwidthavg BIGINT NOT NULL,
-    bandwidthburst BIGINT NOT NULL,
-    bandwidthobserved BIGINT NOT NULL,
-    platform CHARACTER VARYING(256),
-    published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
-    uptime BIGINT,
-    extrainfo CHARACTER(40),
-    CONSTRAINT descriptor_pkey PRIMARY KEY (descriptor)
-);
-
-CREATE OR REPLACE FUNCTION delete_old_descriptor()
-RETURNS INTEGER AS $$
-    BEGIN
-    DELETE FROM descriptor WHERE DATE(published) < current_date - 14;
-    RETURN 1;
-    END;
-$$ LANGUAGE plpgsql;
-
 -- Contains bandwidth histories reported by relays in extra-info
 -- descriptors. Each row contains the reported bandwidth in 15-minute
 -- intervals for each relay and date.
@@ -97,22 +70,6 @@ RETURNS INTEGER AS $$
     END;
 $$ LANGUAGE plpgsql;
 
--- TABLE consensus
--- Contains all of the consensuses published by the directories.
-CREATE TABLE consensus (
-    validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
-    CONSTRAINT consensus_pkey PRIMARY KEY (validafter)
-);
-
--- TABLE bandwidth_flags
-CREATE TABLE bandwidth_flags (
-    date DATE NOT NULL,
-    isexit BOOLEAN NOT NULL,
-    isguard BOOLEAN NOT NULL,
-    bwadvertised BIGINT NOT NULL,
-    CONSTRAINT bandwidth_flags_pkey PRIMARY KEY(date, isexit, isguard)
-);
-
 -- TABLE bwhist_flags
 CREATE TABLE bwhist_flags (
     date DATE NOT NULL,
@@ -149,15 +106,6 @@ CREATE TABLE user_stats (
     CONSTRAINT user_stats_pkey PRIMARY KEY(date, country)
 );
 
--- TABLE relay_statuses_per_day
--- A helper table which is commonly used to update the tables above in the
--- refresh_* functions.
-CREATE TABLE relay_statuses_per_day (
-    date DATE NOT NULL,
-    count INTEGER NOT NULL,
-    CONSTRAINT relay_statuses_per_day_pkey PRIMARY KEY(date)
-);
-
 -- Dates to be included in the next refresh run.
 CREATE TABLE scheduled_updates (
     id SERIAL,
@@ -174,24 +122,6 @@ CREATE TABLE updates (
     date DATE
 );
 
--- FUNCTION refresh_relay_statuses_per_day()
--- Updates helper table which is used to refresh the aggregate tables.
-CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day()
-RETURNS INTEGER AS $$
-    BEGIN
-    DELETE FROM relay_statuses_per_day
-    WHERE date IN (SELECT date FROM updates);
-    INSERT INTO relay_statuses_per_day (date, count)
-    SELECT DATE(validafter) AS date, COUNT(*) AS count
-    FROM consensus
-    WHERE DATE(validafter) >= (SELECT MIN(date) FROM updates)
-    AND DATE(validafter) <= (SELECT MAX(date) FROM updates)
-    AND DATE(validafter) IN (SELECT date FROM updates)
-    GROUP BY DATE(validafter);
-    RETURN 1;
-    END;
-$$ LANGUAGE plpgsql;
-
 CREATE OR REPLACE FUNCTION array_sum (BIGINT[]) RETURNS BIGINT AS $$
   SELECT SUM($1[i])::bigint
   FROM generate_series(array_lower($1, 1), array_upper($1, 1)) index(i);
@@ -247,45 +177,11 @@ $$ LANGUAGE plpgsql;
 
 -- refresh_* functions
 -- The following functions keep their corresponding aggregate tables
--- up-to-date. They should be called every time ERNIE is run, or when new
--- data is finished being added to the descriptor or statusentry tables.
+-- up-to-date. They should be called every time this module is run, or when new
+-- data is finished being added to the statusentry tables.
 -- They find what new data has been entered or updated based on the
 -- updates table.
 
-CREATE OR REPLACE FUNCTION refresh_bandwidth_flags() RETURNS INTEGER AS $$
-    DECLARE
-        min_date TIMESTAMP WITHOUT TIME ZONE;
-        max_date TIMESTAMP WITHOUT TIME ZONE;
-    BEGIN
-
-    min_date := (SELECT MIN(date) FROM updates);
-    max_date := (SELECT MAX(date) + 1 FROM updates);
-
-  DELETE FROM bandwidth_flags WHERE date IN (SELECT date FROM updates);
-  EXECUTE '
-  INSERT INTO bandwidth_flags (date, isexit, isguard, bwadvertised)
-  SELECT DATE(validafter) AS date,
-      BOOL_OR(isexit) AS isexit,
-      BOOL_OR(isguard) AS isguard,
-      (SUM(LEAST(bandwidthavg, bandwidthobserved))
-      / relay_statuses_per_day.count)::BIGINT AS bwadvertised
-    FROM descriptor RIGHT JOIN statusentry
-    ON descriptor.descriptor = statusentry.descriptor
-    JOIN relay_statuses_per_day
-    ON DATE(validafter) = relay_statuses_per_day.date
-    WHERE isrunning = TRUE
-          AND validafter >= ''' || min_date || '''
-          AND validafter < ''' || max_date || '''
-          AND DATE(validafter) IN (SELECT date FROM updates)
-          AND relay_statuses_per_day.date >= ''' || min_date || '''
-          AND relay_statuses_per_day.date < ''' || max_date || '''
-          AND DATE(relay_statuses_per_day.date) IN
-              (SELECT date FROM updates)
-    GROUP BY DATE(validafter), isexit, isguard, relay_statuses_per_day.count';
-  RETURN 1;
-  END;
-$$ LANGUAGE plpgsql;
-
 CREATE OR REPLACE FUNCTION refresh_bwhist_flags() RETURNS INTEGER AS $$
     DECLARE
         min_date TIMESTAMP WITHOUT TIME ZONE;
@@ -391,18 +287,12 @@ CREATE OR REPLACE FUNCTION refresh_all() RETURNS INTEGER AS $$
     DELETE FROM updates;
     RAISE NOTICE '% Copying scheduled dates.', timeofday();
     INSERT INTO updates SELECT * FROM scheduled_updates;
-    RAISE NOTICE '% Refreshing relay statuses per day.', timeofday();
-    PERFORM refresh_relay_statuses_per_day();
-    RAISE NOTICE '% Refreshing total relay bandwidth.', timeofday();
-    PERFORM refresh_bandwidth_flags();
     RAISE NOTICE '% Refreshing bandwidth history.', timeofday();
     PERFORM refresh_bwhist_flags();
     RAISE NOTICE '% Refreshing user statistics.', timeofday();
     PERFORM refresh_user_stats();
     RAISE NOTICE '% Deleting processed dates.', timeofday();
     DELETE FROM scheduled_updates WHERE id IN (SELECT id FROM updates);
-    RAISE NOTICE '% Deleting old descriptors.', timeofday();
-    PERFORM delete_old_descriptor();
     RAISE NOTICE '% Deleting old bandwidth histories.', timeofday();
     PERFORM delete_old_bwhist();
     RAISE NOTICE '% Deleting old status entries.', timeofday();
@@ -414,23 +304,14 @@ $$ LANGUAGE plpgsql;
 
 -- View for exporting bandwidth statistics.
 CREATE VIEW stats_bandwidth AS
-  (SELECT COALESCE(bandwidth_flags.date, bwhist_flags.date) AS date,
-  COALESCE(bandwidth_flags.isexit, bwhist_flags.isexit) AS isexit,
-  COALESCE(bandwidth_flags.isguard, bwhist_flags.isguard) AS isguard,
-  bandwidth_flags.bwadvertised AS advbw,
-  CASE WHEN bwhist_flags.read IS NOT NULL
-  THEN bwhist_flags.read / 86400 END AS bwread,
-  CASE WHEN bwhist_flags.written IS NOT NULL
-  THEN bwhist_flags.written / 86400 END AS bwwrite,
+  (SELECT date, isexit, isguard,
+  read / 86400 AS bwread,
+  written / 86400 AS bwwrite,
   NULL AS dirread, NULL AS dirwrite
-  FROM bandwidth_flags FULL OUTER JOIN bwhist_flags
-  ON bandwidth_flags.date = bwhist_flags.date
-  AND bandwidth_flags.isexit = bwhist_flags.isexit
-  AND bandwidth_flags.isguard = bwhist_flags.isguard
-  WHERE COALESCE(bandwidth_flags.date, bwhist_flags.date) <
-  current_date - 2)
+  FROM bwhist_flags
+  WHERE date < current_date - 2)
 UNION ALL
-  (SELECT date, NULL AS isexit, NULL AS isguard, NULL AS advbw,
+  (SELECT date, NULL AS isexit, NULL AS isguard,
   NULL AS bwread, NULL AS bwwrite,
   FLOOR(CAST(dr AS NUMERIC) / CAST(86400 AS NUMERIC)) AS dirread,
   FLOOR(CAST(dw AS NUMERIC) / CAST(86400 AS NUMERIC)) AS dirwrite



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits