[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] [metrics-db/master] Add raw consensuses and raw votes to database.
Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date: Tue, 21 Sep 2010 12:55:19 +0200
Subject: Add raw consensuses and raw votes to database.
Commit: 1a299d83369b80cae50a9a9c2fc634c2b9ebb744
---
db/tordir.sql | 21 +++-
.../ernie/db/RelayDescriptorDatabaseImporter.java | 154 +++++++++++++++++--
.../torproject/ernie/db/RelayDescriptorParser.java | 18 ++-
3 files changed, 168 insertions(+), 25 deletions(-)
diff --git a/db/tordir.sql b/db/tordir.sql
index 3e0502c..40b527e 100644
--- a/db/tordir.sql
+++ b/db/tordir.sql
@@ -33,8 +33,8 @@ CREATE TABLE extrainfo (
);
-- TABLE statusentry
--- Contains all of the consensuses published by the directories. Each
--- statusentry references a valid descriptor.
+-- Contains all of the consensus entries published by the directories.
+-- Each statusentry references a valid descriptor.
CREATE TABLE statusentry (
validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
nickname CHARACTER VARYING(19) NOT NULL,
@@ -65,6 +65,23 @@ CREATE TABLE statusentry (
CONSTRAINT statusentry_pkey PRIMARY KEY (validafter, fingerprint)
);
+-- TABLE consensus
+-- Contains all of the consensuses published by the directories.
+CREATE TABLE consensus (
+ validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ rawdesc BYTEA NOT NULL,
+ CONSTRAINT consensus_pkey PRIMARY KEY (validafter)
+);
+
+-- TABLE vote
+-- Contains all of the votes published by the directories.
+CREATE TABLE vote (
+ validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ dirsource CHARACTER(40) NOT NULL,
+ rawdesc BYTEA NOT NULL,
+ CONSTRAINT vote_pkey PRIMARY KEY (validafter, dirsource)
+);
+
-- Create the various indexes we need for searching relays
CREATE INDEX statusentry_address ON statusentry (address);
CREATE INDEX statusentry_descriptor ON statusentry (descriptor);
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index fc16089..588fe27 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -26,6 +26,8 @@ public final class RelayDescriptorDatabaseImporter {
private int rdsCount = 0;
private int resCount = 0;
private int rrsCount = 0;
+ private int rcsCount = 0;
+ private int rvsCount = 0;
/**
* Relay descriptor database connection.
@@ -51,6 +53,18 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psDs;
/**
+ * Prepared statement to check whether a given network status consensus
+ * has been imported into the database before.
+ */
+ private PreparedStatement psCs;
+
+ /**
+ * Prepared statement to check whether a given network status vote has
+ * been imported into the database before.
+ */
+ private PreparedStatement psVs;
+
+ /**
* Prepared statement to insert a network status consensus entry into
* the database.
*/
@@ -68,6 +82,18 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psE;
/**
+ * Prepared statement to insert a network status consensus into the
+ * database.
+ */
+ private PreparedStatement psC;
+
+ /**
+ * Prepared statement to insert a network status vote into the
+ * database.
+ */
+ private PreparedStatement psV;
+
+ /**
* Logger for this class.
*/
private Logger logger;
@@ -75,6 +101,8 @@ public final class RelayDescriptorDatabaseImporter {
private BufferedWriter statusentryOut;
private BufferedWriter descriptorOut;
private BufferedWriter extrainfoOut;
+ private BufferedWriter consensusOut;
+ private BufferedWriter voteOut;
/**
* Initialize database importer by connecting to the database and
@@ -102,6 +130,10 @@ public final class RelayDescriptorDatabaseImporter {
+ "FROM descriptor WHERE descriptor = ?");
this.psEs = conn.prepareStatement("SELECT COUNT(*) "
+ "FROM extrainfo WHERE extrainfo = ?");
+ this.psCs = conn.prepareStatement("SELECT COUNT(*) "
+ + "FROM consensus WHERE validafter = ?");
+ this.psVs = conn.prepareStatement("SELECT COUNT(*) "
+ + "FROM vote WHERE validafter = ? AND dirsource = ?");
this.psR = conn.prepareStatement("INSERT INTO statusentry "
+ "(validafter, nickname, fingerprint, descriptor, "
+ "published, address, orport, dirport, isauthority, "
@@ -119,6 +151,10 @@ public final class RelayDescriptorDatabaseImporter {
this.psE = conn.prepareStatement("INSERT INTO extrainfo "
+ "(extrainfo, nickname, fingerprint, published, rawdesc) "
+ "VALUES (?, ?, ?, ?, ?)");
+ this.psC = conn.prepareStatement("INSERT INTO consensus "
+ + "(validafter, rawdesc) VALUES (?, ?)");
+ this.psV = conn.prepareStatement("INSERT INTO vote "
+ + "(validafter, dirsource, rawdesc) VALUES (?, ?, ?)");
} catch (SQLException e) {
this.logger.log(Level.WARNING, "Could not connect to database or "
+ "prepare statements.", e);
@@ -134,6 +170,10 @@ public final class RelayDescriptorDatabaseImporter {
rawFilesDirectory + "/descriptor.sql"));
this.extrainfoOut = new BufferedWriter(new FileWriter(
rawFilesDirectory + "/extrainfo.sql"));
+ this.consensusOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/consensus.sql"));
+ this.voteOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/vote.sql"));
} catch (IOException e) {
this.logger.log(Level.WARNING, "Could not open raw database "
+ "import files.", e);
@@ -346,6 +386,90 @@ public final class RelayDescriptorDatabaseImporter {
}
/**
+ * Insert network status consensus into database.
+ */
+ public void addConsensus(long validAfter, byte[] rawDescriptor) {
+ try {
+ if (this.psCs != null && this.psC != null) {
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ Timestamp validAfterTimestamp = new Timestamp(validAfter);
+ this.psCs.setTimestamp(1, validAfterTimestamp, cal);
+ ResultSet rs = psCs.executeQuery();
+ rs.next();
+ if (rs.getInt(1) == 0) {
+ this.psC.clearParameters();
+ this.psC.setTimestamp(1, validAfterTimestamp, cal);
+ this.psC.setBytes(2, rawDescriptor);
+ this.psC.executeUpdate();
+ rcsCount++;
+ if (rcsCount % autoCommitCount == 0) {
+ this.conn.commit();
+ rcsCount = 0;
+ }
+ }
+ }
+ if (this.consensusOut != null) {
+ SimpleDateFormat dateTimeFormat =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ this.consensusOut.write(dateTimeFormat.format(validAfter) + "\t");
+ this.consensusOut.write(PGbytea.toPGString(rawDescriptor).
+ replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+ }
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not add network status "
+ + "consensus.", e);
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not write network status "
+ + "consensus to raw database import file.", e);
+ }
+ }
+
+ /**
+ * Insert network status vote into database.
+ */
+ public void addVote(long validAfter, String dirSource,
+ byte[] rawDescriptor) {
+ try {
+ if (this.psVs != null && this.psV != null) {
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ Timestamp validAfterTimestamp = new Timestamp(validAfter);
+ this.psVs.setTimestamp(1, validAfterTimestamp, cal);
+ this.psVs.setString(2, dirSource);
+ ResultSet rs = psVs.executeQuery();
+ rs.next();
+ if (rs.getInt(1) == 0) {
+ this.psV.clearParameters();
+ this.psV.setTimestamp(1, validAfterTimestamp, cal);
+ this.psV.setString(2, dirSource);
+ this.psV.setBytes(3, rawDescriptor);
+ this.psV.executeUpdate();
+ rvsCount++;
+ if (rvsCount % autoCommitCount == 0) {
+ this.conn.commit();
+ rvsCount = 0;
+ }
+ }
+ }
+ if (this.voteOut != null) {
+ SimpleDateFormat dateTimeFormat =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ this.voteOut.write(dateTimeFormat.format(validAfter) + "\t"
+ + dirSource + "\t");
+ this.voteOut.write(PGbytea.toPGString(rawDescriptor).
+ replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+ }
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not add network status vote.",
+ e);
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not write network status "
+ + "vote to raw database import file.", e);
+ }
+ }
+
+ /**
* Close the relay descriptor database connection.
*/
public void closeConnection() {
@@ -365,29 +489,25 @@ public final class RelayDescriptorDatabaseImporter {
}
}
/* Close raw import files. */
- if (this.statusentryOut != null) {
- try {
+ try {
+ if (this.statusentryOut != null) {
this.statusentryOut.close();
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not close raw database "
- + "import file.", e);
}
- }
- if (this.descriptorOut != null) {
- try {
+ if (this.descriptorOut != null) {
this.descriptorOut.close();
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not close raw database "
- + "import file.", e);
}
- }
- if (this.extrainfoOut != null) {
- try {
+ if (this.extrainfoOut != null) {
this.extrainfoOut.close();
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not close raw database "
- + "import file.", e);
}
+ if (this.consensusOut != null) {
+ this.consensusOut.close();
+ }
+ if (this.voteOut != null) {
+ this.voteOut.close();
+ }
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not close one or more raw "
+ + "database import files.", e);
}
}
}
diff --git a/src/org/torproject/ernie/db/RelayDescriptorParser.java b/src/org/torproject/ernie/db/RelayDescriptorParser.java
index 07e6a90..2e0e17f 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorParser.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorParser.java
@@ -216,12 +216,15 @@ public class RelayDescriptorParser {
}
}
if (isConsensus) {
- if (relayIdentity != null && this.rddi != null) {
- byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
- this.rddi.addStatusEntry(validAfter, nickname,
- relayIdentity, serverDesc, published, address, orPort,
- dirPort, relayFlags, version, bandwidth, ports,
- rawDescriptor);
+ if (this.rddi != null) {
+ this.rddi.addConsensus(validAfter, data);
+ if (relayIdentity != null) {
+ byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
+ this.rddi.addStatusEntry(validAfter, nickname,
+ relayIdentity, serverDesc, published, address, orPort,
+ dirPort, relayFlags, version, bandwidth, ports,
+ rawDescriptor);
+ }
}
if (this.bsfh != null) {
for (String hashedRelayIdentity : hashedRelayIdentities) {
@@ -247,6 +250,9 @@ public class RelayDescriptorParser {
this.chc.processConsensus(validAfterTime, data);
}
} else {
+ if (this.rddi != null) {
+ this.rddi.addVote(validAfter, dirSource, data);
+ }
if (this.rdd != null) {
this.rdd.haveParsedVote(validAfterTime, fingerprint,
serverDescriptors);
--
1.7.1