
[or-cvs] [metrics-db/master] Add bandwidth histories to database.



Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date: Thu, 23 Sep 2010 12:16:01 +0200
Subject: Add bandwidth histories to database.
Commit: 2b5617e6f5ed80b3624740ef2a60feedaf1652e1

---
 db/refresh.sql                                     |    1 +
 db/tordir.sql                                      |   99 +++++++++++++++
 .../ernie/db/RelayDescriptorDatabaseImporter.java  |  128 ++++++++++++++++----
 .../torproject/ernie/db/RelayDescriptorParser.java |   37 +++++-
 4 files changed, 235 insertions(+), 30 deletions(-)

diff --git a/db/refresh.sql b/db/refresh.sql
index 9e55016..9b98b1a 100644
--- a/db/refresh.sql
+++ b/db/refresh.sql
@@ -13,6 +13,7 @@ SELECT * FROM refresh_network_size();
 SELECT * FROM refresh_relay_platforms();
 SELECT * FROM refresh_relay_versions();
 SELECT * FROM refresh_total_bandwidth();
+SELECT * FROM refresh_total_bwhist();
 
 -- Clear the updates table, since we have just updated everything.
 DELETE FROM updates;
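
Once the patch below is applied, the new aggregate can also be refreshed and
inspected by hand, e.g. from psql; a minimal sketch, assuming descriptors
have already been imported and the updates table still lists the dates to
refresh (output values are whatever happens to be in the database):

    SELECT * FROM refresh_total_bwhist();
    SELECT date, read, written, dirread, dirwritten
    FROM total_bwhist
    ORDER BY date DESC
    LIMIT 7;

Like the other refresh functions, refresh_total_bwhist() only touches dates
listed in the updates table, which refresh.sql clears again afterwards.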
diff --git a/db/tordir.sql b/db/tordir.sql
index bb74a62..37654de 100644
--- a/db/tordir.sql
+++ b/db/tordir.sql
@@ -32,6 +32,25 @@ CREATE TABLE extrainfo (
     CONSTRAINT extrainfo_pkey PRIMARY KEY (extrainfo)
 );
 
+-- TABLE bwhist
+-- Contains the bandwidth histories reported in extra-info descriptors.
+-- Every row covers a 15-minute interval and can have read, written,
+-- dirread, and dirwritten set or unset. The primary key ensures that
+-- there is at most one row per interval for a given extra-info
+-- descriptor. However, the same interval may also be reported in
+-- another extra-info descriptor of the same relay. These duplicates
+-- need to be filtered out when aggregating bandwidth histories.
+CREATE TABLE bwhist (
+    fingerprint CHARACTER(40) NOT NULL,
+    extrainfo CHARACTER(40) NOT NULL,
+    intervalend TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+    read BIGINT,
+    written BIGINT,
+    dirread BIGINT,
+    dirwritten BIGINT,
+    CONSTRAINT bwhist_pkey PRIMARY KEY (extrainfo, intervalend)
+);
+
 -- TABLE statusentry
 -- Contains all of the consensus entries published by the directories.
 -- Each statusentry references a valid descriptor.
@@ -134,6 +153,19 @@ CREATE TABLE total_bandwidth (
     CONSTRAINT total_bandwidth_pkey PRIMARY KEY(date)
 );
 
+-- TABLE total_bwhist
+-- Contains the total number of bytes read/written and the number of dir
+-- bytes read/written by all relays in the network on a given day. The dir
+-- bytes are an estimate based on the subset of relays that count dir bytes.
+CREATE TABLE total_bwhist (
+    date DATE NOT NULL,
+    read BIGINT,
+    written BIGINT,
+    dirread BIGINT,
+    dirwritten BIGINT,
+    CONSTRAINT total_bwhist_pkey PRIMARY KEY(date)
+);
+
 -- TABLE relay_statuses_per_day
 -- A helper table which is commonly used to update the tables above in the
 -- refresh_* functions.
@@ -227,6 +259,36 @@ AFTER INSERT OR UPDATE OR DELETE
 ON descriptor
     FOR EACH ROW EXECUTE PROCEDURE update_desc();
 
+-- FUNCTION update_bwhist
+-- This keeps the updates table up to date for the time graphs.
+CREATE OR REPLACE FUNCTION update_bwhist() RETURNS TRIGGER AS $$
+    BEGIN
+    IF (TG_OP='INSERT' OR TG_OP='UPDATE') THEN
+      IF (SELECT COUNT(*) FROM updates
+          WHERE DATE = DATE(NEW.intervalend)) = 0 THEN
+          INSERT INTO updates
+          VALUES (DATE(NEW.intervalend));
+      END IF;
+    END IF;
+    IF (TG_OP='DELETE' OR TG_OP='UPDATE') THEN
+      IF (SELECT COUNT(*) FROM updates
+          WHERE DATE = DATE(OLD.intervalend)) = 0 THEN
+          INSERT INTO updates
+          VALUES (DATE(OLD.intervalend));
+      END IF;
+    END IF;
+    RETURN NULL; -- result is ignored since this is an AFTER trigger
+END;
+$$ LANGUAGE plpgsql;
+
+-- TRIGGER update_bwhist
+-- This calls the function update_bwhist() each time a row is inserted,
+-- updated, or deleted from the bwhist table.
+CREATE TRIGGER update_bwhist
+AFTER INSERT OR UPDATE OR DELETE
+ON bwhist
+    FOR EACH ROW EXECUTE PROCEDURE update_bwhist();
+
 -- FUNCTION refresh_relay_statuses_per_day()
 -- Updates helper table which is used to refresh the aggregate tables.
 CREATE OR REPLACE FUNCTION refresh_relay_statuses_per_day()
@@ -401,3 +463,40 @@ CREATE OR REPLACE FUNCTION refresh_total_bandwidth() RETURNS INTEGER AS $$
     END;
 $$ LANGUAGE plpgsql;
 
+-- FUNCTION refresh_total_bwhist()
+CREATE OR REPLACE FUNCTION refresh_total_bwhist() RETURNS INTEGER AS $$
+  BEGIN
+  DELETE FROM total_bwhist WHERE date IN (SELECT * FROM updates);
+  INSERT INTO total_bwhist (date, read, written, dirread, dirwritten)
+  SELECT date,
+         SUM(read) AS read,
+         SUM(written) AS written,
+         SUM(CASE WHEN dirread IS NULL THEN NULL ELSE read END) AS dirread,
+         SUM(CASE WHEN dirwritten IS NULL THEN NULL ELSE written END)
+             AS dirwritten
+  FROM (
+    SELECT fingerprint,
+           DATE(intervalend) as date,
+           SUM(read) AS read,
+           SUM(written) AS written,
+           SUM(dirread) AS dirread,
+           SUM(dirwritten) AS dirwritten
+    FROM (
+      SELECT DISTINCT fingerprint,
+                      intervalend,
+                      read,
+                      written,
+                      dirread,
+                      dirwritten
+      FROM bwhist
+      WHERE DATE(intervalend) >= (SELECT MIN(date) FROM updates)
+      AND DATE(intervalend) <= (SELECT MAX(date) FROM updates)
+      AND DATE(intervalend) IN (SELECT date FROM updates)
+    ) byinterval
+    GROUP BY fingerprint, DATE(intervalend)
+  ) byrelay
+  GROUP BY date;
+  RETURN 1;
+  END;
+$$ LANGUAGE plpgsql;
+
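
To illustrate the duplicate filtering in refresh_total_bwhist(): if a relay
publishes two extra-info descriptors that both cover the same 15-minute
interval, bwhist contains two rows that differ only in the extrainfo column.
A sketch with hypothetical fingerprints, digests, and byte counts:

    -- two rows for the same relay and interval, from different descriptors:
    --   fingerprint  extrainfo  intervalend           read    written
    --   AAAA...      1111...    2010-09-22 14:00:00   123456  654321
    --   AAAA...      2222...    2010-09-22 14:00:00   123456  654321

    -- the innermost subquery of refresh_total_bwhist() drops the extrainfo
    -- column and keeps one copy per interval before summing per relay and
    -- per day:
    SELECT DISTINCT fingerprint, intervalend, read, written, dirread,
                    dirwritten
    FROM bwhist
    WHERE DATE(intervalend) = '2010-09-22';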
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index 588fe27..7eeb9d7 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -25,6 +25,7 @@ public final class RelayDescriptorDatabaseImporter {
    */
   private int rdsCount = 0;
   private int resCount = 0;
+  private int rhsCount = 0;
   private int rrsCount = 0;
   private int rcsCount = 0;
   private int rvsCount = 0;
@@ -47,6 +48,12 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psEs;
 
   /**
+   * Prepared statement to check whether the bandwidth history of an
+   * extra-info descriptor has been imported into the database before.
+   */
+  private PreparedStatement psHs;
+
+  /**
    * Prepared statement to check whether a given server descriptor has
    * been imported into the database before.
    */
@@ -82,6 +89,12 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psE;
 
   /**
+   * Prepared statement to insert the bandwidth history of an extra-info
+   * descriptor into the database.
+   */
+  private PreparedStatement psH;
+
+  /**
    * Prepared statement to insert a network status consensus into the
    * database.
    */
@@ -104,6 +117,8 @@ public final class RelayDescriptorDatabaseImporter {
   private BufferedWriter consensusOut;
   private BufferedWriter voteOut;
 
+  private SimpleDateFormat dateTimeFormat;
+
   /**
    * Initialize database importer by connecting to the database and
    * preparing statements.
@@ -130,6 +145,8 @@ public final class RelayDescriptorDatabaseImporter {
             + "FROM descriptor WHERE descriptor = ?");
         this.psEs = conn.prepareStatement("SELECT COUNT(*) "
             + "FROM extrainfo WHERE extrainfo = ?");
+        this.psHs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM bwhist WHERE extrainfo = ?");
         this.psCs = conn.prepareStatement("SELECT COUNT(*) "
             + "FROM consensus WHERE validafter = ?");
         this.psVs = conn.prepareStatement("SELECT COUNT(*) "
@@ -151,6 +168,9 @@ public final class RelayDescriptorDatabaseImporter {
         this.psE = conn.prepareStatement("INSERT INTO extrainfo "
             + "(extrainfo, nickname, fingerprint, published, rawdesc) "
             + "VALUES (?, ?, ?, ?, ?)");
+        this.psH = conn.prepareStatement("INSERT INTO bwhist "
+            + "(fingerprint, extrainfo, intervalend, read, written, "
+            + "dirread, dirwritten) VALUES (?, ?, ?, ?, ?, ?, ?)");
         this.psC = conn.prepareStatement("INSERT INTO consensus "
             + "(validafter, rawdesc) VALUES (?, ?)");
         this.psV = conn.prepareStatement("INSERT INTO vote "
@@ -178,6 +198,9 @@ public final class RelayDescriptorDatabaseImporter {
         this.logger.log(Level.WARNING, "Could not open raw database "
             + "import files.", e);
       }
+
+      this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+      this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
     }
   }
 
@@ -234,15 +257,12 @@ public final class RelayDescriptorDatabaseImporter {
         }
       }
       if (this.statusentryOut != null) {
-        SimpleDateFormat dateTimeFormat =
-             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
         this.statusentryOut.write(
-            dateTimeFormat.format(validAfter) + "\t" + nickname
+            this.dateTimeFormat.format(validAfter) + "\t" + nickname
             + "\t" + fingerprint.toLowerCase() + "\t"
             + descriptor.toLowerCase() + "\t"
-            + dateTimeFormat.format(published) + "\t" + address + "\t"
-            + orPort + "\t" + dirPort + "\t"
+            + this.dateTimeFormat.format(published) + "\t" + address
+            + "\t" + orPort + "\t" + dirPort + "\t"
             + (flags.contains("Authority") ? "t" : "f") + "\t"
             + (flags.contains("BadExit") ? "t" : "f") + "\t"
             + (flags.contains("BadDirectory") ? "t" : "f") + "\t"
@@ -312,16 +332,13 @@ public final class RelayDescriptorDatabaseImporter {
         }
       }
       if (this.descriptorOut != null) {
-        SimpleDateFormat dateTimeFormat =
-             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
         this.descriptorOut.write(descriptor.toLowerCase() + "\t"
             + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
             + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
             + bandwidthBurst + "\t" + bandwidthObserved + "\t"
             + (platform != null && platform.length() > 0
             ? new String(platform.getBytes(), "US-ASCII") : "\\N") + "\t"
-            + dateTimeFormat.format(published) + "\t"
+            + this.dateTimeFormat.format(published) + "\t"
             + (uptime >= 0 ? uptime : "\\N") + "\t"
             + (extraInfoDigest != null ? extraInfoDigest : "\\N") + "\t");
         this.descriptorOut.write(PGbytea.toPGString(rawDescriptor).
@@ -344,10 +361,10 @@ public final class RelayDescriptorDatabaseImporter {
    */
   public void addExtraInfoDescriptor(String extraInfoDigest,
       String nickname, String fingerprint, long published,
-      byte[] rawDescriptor) {
+      byte[] rawDescriptor, SortedMap<String, String> bandwidthHistory) {
     try {
+      Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
       if (this.psEs != null && this.psE != null) {
-        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         this.psEs.setString(1, extraInfoDigest);
         ResultSet rs = psEs.executeQuery();
         rs.next();
@@ -366,13 +383,81 @@ public final class RelayDescriptorDatabaseImporter {
           }
         }
       }
+      if (this.psHs != null && this.psH != null) {
+        this.psHs.setString(1, extraInfoDigest);
+        ResultSet rs = this.psHs.executeQuery();
+        rs.next();
+        if (rs.getInt(1) == 0) {
+          this.psH.clearParameters();
+          String lastIntervalEnd = null;
+          List<String> bandwidthHistoryValues = new ArrayList<String>();
+          bandwidthHistoryValues.addAll(bandwidthHistory.values());
+          bandwidthHistoryValues.add("EOL");
+          String readBytes = null, writtenBytes = null,
+              dirReadBytes = null, dirWrittenBytes = null;
+          for (String bandwidthHistoryValue : bandwidthHistoryValues) {
+            String[] entryParts = bandwidthHistoryValue.split(",");
+            String intervalEnd = entryParts[0];
+            if ((intervalEnd.equals("EOL") ||
+                !intervalEnd.equals(lastIntervalEnd)) &&
+                lastIntervalEnd != null) {
+              this.psH.setString(1, fingerprint);
+              this.psH.setString(2, extraInfoDigest);
+              try {
+                this.psH.setTimestamp(3, new Timestamp(Long.parseLong(
+                    lastIntervalEnd)), cal);
+                if (readBytes != null) {
+                  this.psH.setLong(4, Long.parseLong(readBytes));
+                } else {
+                  this.psH.setNull(4, Types.BIGINT);
+                }
+                if (writtenBytes != null) {
+                  this.psH.setLong(5, Long.parseLong(writtenBytes));
+                } else {
+                  this.psH.setNull(5, Types.BIGINT);
+                }
+                if (dirReadBytes != null) {
+                  this.psH.setLong(6, Long.parseLong(dirReadBytes));
+                } else {
+                  this.psH.setNull(6, Types.BIGINT);
+                }
+                if (dirWrittenBytes != null) {
+                  this.psH.setLong(7, Long.parseLong(dirWrittenBytes));
+                } else {
+                  this.psH.setNull(7, Types.BIGINT);
+                }
+              } catch (NumberFormatException e) {
+                break;
+              }
+              this.psH.executeUpdate();
+              /* Reset values for the next interval. */
+              readBytes = writtenBytes = dirReadBytes =
+                  dirWrittenBytes = null;
+            }
+            if (intervalEnd.equals("EOL")) {
+              break;
+            }
+            lastIntervalEnd = intervalEnd;
+            String type = entryParts[1];
+            String bytes = entryParts[2];
+            if (type.equals("read-history")) {
+              readBytes = bytes;
+            } else if (type.equals("write-history")) {
+              writtenBytes = bytes;
+            } else if (type.equals("dirreq-read-history")) {
+              dirReadBytes = bytes;
+            } else if (type.equals("dirreq-write-history")) {
+              dirWrittenBytes = bytes;
+            }
+          }
+          rhsCount++;
+          if (rhsCount % autoCommitCount == 0) {
+            this.conn.commit();
+            rhsCount = 0;
+          }
+        }
+      }
       if (this.extrainfoOut != null) {
-        SimpleDateFormat dateTimeFormat =
-             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
         this.extrainfoOut.write(extraInfoDigest.toLowerCase() + "\t"
             + nickname + "\t" + fingerprint.toLowerCase() + "\t"
-            + dateTimeFormat.format(published) + "\t");
+            + this.dateTimeFormat.format(published) + "\t");
         this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
             replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
       }
@@ -409,10 +494,8 @@ public final class RelayDescriptorDatabaseImporter {
         }
       }
       if (this.consensusOut != null) {
-        SimpleDateFormat dateTimeFormat =
-             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-        this.consensusOut.write(dateTimeFormat.format(validAfter) + "\t");
+        this.consensusOut.write(this.dateTimeFormat.format(validAfter)
+            + "\t");
         this.consensusOut.write(PGbytea.toPGString(rawDescriptor).
             replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
       }
@@ -452,10 +535,7 @@ public final class RelayDescriptorDatabaseImporter {
         }
       }
       if (this.voteOut != null) {
-        SimpleDateFormat dateTimeFormat =
-             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-        this.voteOut.write(dateTimeFormat.format(validAfter) + "\t"
+        this.voteOut.write(this.dateTimeFormat.format(validAfter) + "\t"
             + dirSource + "\t");
         this.voteOut.write(PGbytea.toPGString(rawDescriptor).
             replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
diff --git a/src/org/torproject/ernie/db/RelayDescriptorParser.java b/src/org/torproject/ernie/db/RelayDescriptorParser.java
index 2e0e17f..d82b041 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorParser.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorParser.java
@@ -76,6 +76,8 @@ public class RelayDescriptorParser {
    */
   private Logger logger;
 
+  private SimpleDateFormat dateTimeFormat;
+
   /**
    * Initializes this class.
    */
@@ -96,6 +98,9 @@ public class RelayDescriptorParser {
 
     /* Initialize logger. */
     this.logger = Logger.getLogger(RelayDescriptorParser.class.getName());
+
+    this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
   }
 
   public void setRelayDescriptorDownloader(
@@ -351,18 +356,38 @@ public class RelayDescriptorParser {
         long published = -1L;
         String dir = line.split(" ")[2];
         String date = null, v3Reqs = null;
+        SortedMap<String, String> bandwidthHistory =
+            new TreeMap<String, String>();
         boolean skip = false;
         while ((line = br.readLine()) != null) {
           if (line.startsWith("published ")) {
             publishedTime = line.substring("published ".length());
             published = parseFormat.parse(publishedTime).getTime();
+          } else if (line.startsWith("read-history ") ||
+              line.startsWith("write-history ") ||
+              line.startsWith("dirreq-read-history ") ||
+              line.startsWith("dirreq-write-history ")) {
+            String[] parts = line.split(" ");
+            if (parts.length == 6) {
+              String type = parts[0];
+              long intervalEnd = dateTimeFormat.parse(parts[1] + " "
+                  + parts[2]).getTime();
+              try {
+                long intervalLength = Long.parseLong(parts[3].
+                    substring(1));
+                String[] values = parts[5].split(",");
+                for (int i = values.length - 1; i >= 0; i--) {
+                  Long.parseLong(values[i]);
+                  bandwidthHistory.put(intervalEnd + "," + type,
+                      intervalEnd + "," + type + "," + values[i]);
+                  intervalEnd -= intervalLength * 1000L;
+                }
+              } catch (NumberFormatException e) {
+                break;
+              }
+            }
           } else if (line.startsWith("dirreq-stats-end ")) {
             date = line.split(" ")[1];
-            // trusted had very strange dirreq-v3-shares here...
-            // TODO don't check that here, but in DirreqStatsFileHandler
-            skip = dir.equals("8522EB98C91496E80EC238E732594D1509158E77")
-                && (date.equals("2009-09-10") ||
-                    date.equals("2009-09-11"));
           } else if (line.startsWith("dirreq-v3-reqs ")
               && line.length() > "dirreq-v3-reqs ".length()) {
             v3Reqs = line.split(" ")[1];
@@ -410,7 +435,7 @@ public class RelayDescriptorParser {
         }
         if (this.rddi != null && digest != null) {
           this.rddi.addExtraInfoDescriptor(digest, nickname,
-              dir.toLowerCase(), published, data);
+              dir.toLowerCase(), published, data, bandwidthHistory);
         }
       }
     } catch (IOException e) {
-- 
1.7.1