[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] [metrics-db/master] Add raw consensuses and raw votes to database.



Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date: Tue, 21 Sep 2010 12:55:19 +0200
Subject: Add raw consensuses and raw votes to database.
Commit: 1a299d83369b80cae50a9a9c2fc634c2b9ebb744

---
 db/tordir.sql                                      |   21 +++-
 .../ernie/db/RelayDescriptorDatabaseImporter.java  |  154 +++++++++++++++++--
 .../torproject/ernie/db/RelayDescriptorParser.java |   18 ++-
 3 files changed, 168 insertions(+), 25 deletions(-)

diff --git a/db/tordir.sql b/db/tordir.sql
index 3e0502c..40b527e 100644
--- a/db/tordir.sql
+++ b/db/tordir.sql
@@ -33,8 +33,8 @@ CREATE TABLE extrainfo (
 );
 
 -- TABLE statusentry
--- Contains all of the consensuses published by the directories. Each
--- statusentry references a valid descriptor.
+-- Contains all of the consensus entries published by the directories.
+-- Each statusentry references a valid descriptor.
 CREATE TABLE statusentry (
     validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
     nickname CHARACTER VARYING(19) NOT NULL,
@@ -65,6 +65,23 @@ CREATE TABLE statusentry (
     CONSTRAINT statusentry_pkey PRIMARY KEY (validafter, fingerprint)
 );
 
+-- TABLE consensus
+-- Contains all of the consensuses published by the directories.
+CREATE TABLE consensus (
+    validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,  -- valid-after time; no zone is stored with the value
+    rawdesc BYTEA NOT NULL,  -- raw bytes of the consensus document
+    CONSTRAINT consensus_pkey PRIMARY KEY (validafter)  -- at most one row per validafter value
+);
+
+-- TABLE vote
+-- Contains all of the votes published by the directories.
+CREATE TABLE vote (
+    validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,  -- valid-after time; no zone is stored with the value
+    dirsource CHARACTER(40) NOT NULL,  -- identifies the voting directory; presumably a 40-char hex fingerprint (confirm)
+    rawdesc BYTEA NOT NULL,  -- raw bytes of the vote document
+    CONSTRAINT vote_pkey PRIMARY KEY (validafter, dirsource)  -- at most one row per (validafter, dirsource) pair
+);
+
 -- Create the various indexes we need for searching relays
 CREATE INDEX statusentry_address ON statusentry (address);
 CREATE INDEX statusentry_descriptor ON statusentry (descriptor);
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index fc16089..588fe27 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -26,6 +26,8 @@ public final class RelayDescriptorDatabaseImporter {
   private int rdsCount = 0;
   private int resCount = 0;
   private int rrsCount = 0;
+  private int rcsCount = 0;
+  private int rvsCount = 0;
 
   /**
    * Relay descriptor database connection.
@@ -51,6 +53,18 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psDs;
 
   /**
+   * Prepared statement to check whether a given network status consensus
+   * has been imported into the database before.
+   */
+  private PreparedStatement psCs;
+
+  /**
+   * Prepared statement to check whether a given network status vote has
+   * been imported into the database before.
+   */
+  private PreparedStatement psVs;
+
+  /**
    * Prepared statement to insert a network status consensus entry into
    * the database.
    */
@@ -68,6 +82,18 @@ public final class RelayDescriptorDatabaseImporter {
   private PreparedStatement psE;
 
   /**
+   * Prepared statement to insert a network status consensus into the
+   * database.
+   */
+  private PreparedStatement psC;
+
+  /**
+   * Prepared statement to insert a network status vote into the
+   * database.
+   */
+  private PreparedStatement psV;
+
+  /**
    * Logger for this class.
    */
   private Logger logger;
@@ -75,6 +101,8 @@ public final class RelayDescriptorDatabaseImporter {
   private BufferedWriter statusentryOut;
   private BufferedWriter descriptorOut;
   private BufferedWriter extrainfoOut;
+  private BufferedWriter consensusOut;
+  private BufferedWriter voteOut;
 
   /**
    * Initialize database importer by connecting to the database and
@@ -102,6 +130,10 @@ public final class RelayDescriptorDatabaseImporter {
             + "FROM descriptor WHERE descriptor = ?");
         this.psEs = conn.prepareStatement("SELECT COUNT(*) "
             + "FROM extrainfo WHERE extrainfo = ?");
+        this.psCs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM consensus WHERE validafter = ?");
+        this.psVs = conn.prepareStatement("SELECT COUNT(*) "
+            + "FROM vote WHERE validafter = ? AND dirsource = ?");
         this.psR = conn.prepareStatement("INSERT INTO statusentry "
             + "(validafter, nickname, fingerprint, descriptor, "
             + "published, address, orport, dirport, isauthority, "
@@ -119,6 +151,10 @@ public final class RelayDescriptorDatabaseImporter {
         this.psE = conn.prepareStatement("INSERT INTO extrainfo "
             + "(extrainfo, nickname, fingerprint, published, rawdesc) "
             + "VALUES (?, ?, ?, ?, ?)");
+        this.psC = conn.prepareStatement("INSERT INTO consensus "
+            + "(validafter, rawdesc) VALUES (?, ?)");
+        this.psV = conn.prepareStatement("INSERT INTO vote "
+            + "(validafter, dirsource, rawdesc) VALUES (?, ?, ?)");
       } catch (SQLException e) {
         this.logger.log(Level.WARNING, "Could not connect to database or "
             + "prepare statements.", e);
@@ -134,6 +170,10 @@ public final class RelayDescriptorDatabaseImporter {
             rawFilesDirectory + "/descriptor.sql"));
         this.extrainfoOut = new BufferedWriter(new FileWriter(
             rawFilesDirectory + "/extrainfo.sql"));
+        this.consensusOut = new BufferedWriter(new FileWriter(
+            rawFilesDirectory + "/consensus.sql"));
+        this.voteOut = new BufferedWriter(new FileWriter(
+            rawFilesDirectory + "/vote.sql"));
       } catch (IOException e) {
         this.logger.log(Level.WARNING, "Could not open raw database "
             + "import files.", e);
@@ -346,6 +386,90 @@ public final class RelayDescriptorDatabaseImporter {
   }
 
   /**
+   * Insert network status consensus into database.
+   */
+  public void addConsensus(long validAfter, byte[] rawDescriptor) { // validAfter: milliseconds since the epoch
+    try {
+      if (this.psCs != null && this.psC != null) { // database branch runs only when both statements exist
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); // timestamps are bound as UTC
+        Timestamp validAfterTimestamp = new Timestamp(validAfter);
+        this.psCs.setTimestamp(1, validAfterTimestamp, cal);
+        ResultSet rs = psCs.executeQuery(); // NOTE(review): rs is never closed explicitly
+        rs.next(); // advance to the COUNT(*) row; NOTE(review): return value is ignored
+        if (rs.getInt(1) == 0) { // insert only when no row with this validafter exists yet
+          this.psC.clearParameters();
+          this.psC.setTimestamp(1, validAfterTimestamp, cal);
+          this.psC.setBytes(2, rawDescriptor);
+          this.psC.executeUpdate();
+          rcsCount++; // inserts since the last commit triggered below
+          if (rcsCount % autoCommitCount == 0)  { // commit once autoCommitCount inserts have accumulated
+            this.conn.commit();
+            rcsCount = 0;
+          }
+        }
+      }
+      if (this.consensusOut != null) { // file branch is independent of the database branch above
+        SimpleDateFormat dateTimeFormat =
+             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        this.consensusOut.write(dateTimeFormat.format(validAfter) + "\t"); // column 1: validafter, tab-terminated
+        this.consensusOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n"); // each pair of backslashes becomes four; presumably for COPY text escaping (confirm)
+      }
+    } catch (SQLException e) { // failures are logged at WARNING and not rethrown
+      this.logger.log(Level.WARNING, "Could not add network status "
+          + "consensus.", e);
+    } catch (IOException e) { // same policy for raw import file write failures
+      this.logger.log(Level.WARNING, "Could not write network status "
+          + "consensus to raw database import file.", e);
+    }
+  }
+
+  /**
+   * Insert network status vote into database.
+   */
+  public void addVote(long validAfter, String dirSource, // validAfter: milliseconds since the epoch
+      byte[] rawDescriptor) {
+    try {
+      if (this.psVs != null && this.psV != null) { // database branch runs only when both statements exist
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); // timestamps are bound as UTC
+        Timestamp validAfterTimestamp = new Timestamp(validAfter);
+        this.psVs.setTimestamp(1, validAfterTimestamp, cal);
+        this.psVs.setString(2, dirSource);
+        ResultSet rs = psVs.executeQuery(); // NOTE(review): rs is never closed explicitly
+        rs.next(); // advance to the COUNT(*) row; NOTE(review): return value is ignored
+        if (rs.getInt(1) == 0) { // insert only when no row for (validafter, dirsource) exists yet
+          this.psV.clearParameters();
+          this.psV.setTimestamp(1, validAfterTimestamp, cal);
+          this.psV.setString(2, dirSource);
+          this.psV.setBytes(3, rawDescriptor);
+          this.psV.executeUpdate();
+          rvsCount++; // inserts since the last commit triggered below
+          if (rvsCount % autoCommitCount == 0)  { // commit once autoCommitCount inserts have accumulated
+            this.conn.commit();
+            rvsCount = 0;
+          }
+        }
+      }
+      if (this.voteOut != null) { // file branch is independent of the database branch above
+        SimpleDateFormat dateTimeFormat =
+             new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+        dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+        this.voteOut.write(dateTimeFormat.format(validAfter) + "\t" // columns 1-2: validafter and dirsource, tab-separated
+            + dirSource + "\t");
+        this.voteOut.write(PGbytea.toPGString(rawDescriptor).
+            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n"); // each pair of backslashes becomes four; presumably for COPY text escaping (confirm)
+      }
+    } catch (SQLException e) { // failures are logged at WARNING and not rethrown
+      this.logger.log(Level.WARNING, "Could not add network status vote.",
+          e);
+    } catch (IOException e) { // same policy for raw import file write failures
+      this.logger.log(Level.WARNING, "Could not write network status "
+          + "vote to raw database import file.", e);
+    }
+  }
+
+  /**
    * Close the relay descriptor database connection.
    */
   public void closeConnection() {
@@ -365,29 +489,25 @@ public final class RelayDescriptorDatabaseImporter {
       }
     }
     /* Close raw import files. */
-    if (this.statusentryOut != null) {
-      try {
+    try {
+      if (this.statusentryOut != null) {
         this.statusentryOut.close();
-      } catch (IOException e) {
-        this.logger.log(Level.WARNING, "Could not close raw database "
-            + "import file.", e);
       }
-    }
-    if (this.descriptorOut != null) {
-      try {
+      if (this.descriptorOut != null) {
         this.descriptorOut.close();
-      } catch (IOException e) {
-        this.logger.log(Level.WARNING, "Could not close raw database "
-            + "import file.", e);
       }
-    }
-    if (this.extrainfoOut != null) {
-      try {
+      if (this.extrainfoOut != null) {
         this.extrainfoOut.close();
-      } catch (IOException e) {
-        this.logger.log(Level.WARNING, "Could not close raw database "
-            + "import file.", e);
       }
+      if (this.consensusOut != null) {
+        this.consensusOut.close();
+      }
+      if (this.voteOut != null) {
+        this.voteOut.close();
+      }
+    } catch (IOException e) {
+      this.logger.log(Level.WARNING, "Could not close one or more raw "
+          + "database import files.", e);
     }
   }
 }
diff --git a/src/org/torproject/ernie/db/RelayDescriptorParser.java b/src/org/torproject/ernie/db/RelayDescriptorParser.java
index 07e6a90..2e0e17f 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorParser.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorParser.java
@@ -216,12 +216,15 @@ public class RelayDescriptorParser {
           }
         }
         if (isConsensus) {
-          if (relayIdentity != null && this.rddi != null) {
-            byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
-            this.rddi.addStatusEntry(validAfter, nickname,
-                relayIdentity, serverDesc, published, address, orPort,
-                dirPort, relayFlags, version, bandwidth, ports,
-                rawDescriptor);
+          if (this.rddi != null) {
+            this.rddi.addConsensus(validAfter, data);
+            if (relayIdentity != null) {
+              byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
+              this.rddi.addStatusEntry(validAfter, nickname,
+                  relayIdentity, serverDesc, published, address, orPort,
+                  dirPort, relayFlags, version, bandwidth, ports,
+                  rawDescriptor);
+            }
           }
           if (this.bsfh != null) {
             for (String hashedRelayIdentity : hashedRelayIdentities) {
@@ -247,6 +250,9 @@ public class RelayDescriptorParser {
             this.chc.processConsensus(validAfterTime, data);
           }
         } else {
+          if (this.rddi != null) {
+            this.rddi.addVote(validAfter, dirSource, data);
+          }
           if (this.rdd != null) {
             this.rdd.haveParsedVote(validAfterTime, fingerprint,
                 serverDescriptors);
-- 
1.7.1