[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] [metrics-db/master] Improve database schema and speed up importing.
Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date: Fri, 10 Sep 2010 11:16:11 +0200
Subject: Improve database schema and speed up importing.
Commit: fe9d66badeee6a1ccc4b267d80ec5eb9da12fd3b
Improve database schema by adding more relay descriptor parts to the
database.
Speed up importing by exporting to files that can be read by psql's
\copy command.
---
build.xml | 2 +-
config | 8 +
db/tordir.sql | 24 ++
src/org/torproject/ernie/db/Configuration.java | 16 +-
src/org/torproject/ernie/db/Main.java | 8 +-
.../ernie/db/RelayDescriptorDatabaseImporter.java | 371 +++++++++++++++-----
.../torproject/ernie/db/RelayDescriptorParser.java | 88 ++++-
7 files changed, 399 insertions(+), 118 deletions(-)
diff --git a/build.xml b/build.xml
index 49376e0..bddac1d 100644
--- a/build.xml
+++ b/build.xml
@@ -36,7 +36,7 @@
destdir="${classes}"
excludes="org/torproject/ernie/web/"
debug="true" debuglevel="lines,source"
- classpath="lib/commons-codec-1.4.jar;lib/commons-compress-1.0.jar"/>
+ classpath="lib/commons-codec-1.4.jar;lib/commons-compress-1.0.jar;lib/postgresql-8.4-701.jdbc4.jar"/>
</target>
<target name="run" depends="compile">
<java classpath="${classes};lib/commons-codec-1.4.jar:lib/commons-compress-1.0.jar;lib/postgresql-8.4-701.jdbc4.jar"
diff --git a/config b/config
index 695e268..64ce8ff 100644
--- a/config
+++ b/config
@@ -86,6 +86,14 @@
## JDBC string for relay descriptor database
#RelayDescriptorDatabaseJDBC jdbc:postgresql://localhost/tordir?user=ernie&password=password
#
+## Write relay descriptors to raw text files for importing them into a
+## database using PostgreSQL's \copy command
+#WriteRelayDescriptorsRawFiles 0
+#
+## Relative path to directory to write raw text files; note that existing
+## files will be overwritten!
+#RelayDescriptorRawFilesDirectory pg-import/
+#
## Write statistics about the current consensus and votes to the
## website
#WriteConsensusHealth 0
diff --git a/db/tordir.sql b/db/tordir.sql
index a421730..af4f64e 100644
--- a/db/tordir.sql
+++ b/db/tordir.sql
@@ -5,6 +5,7 @@
-- Contains all of the descriptors published by routers.
CREATE TABLE descriptor (
descriptor CHARACTER(40) NOT NULL,
+ nickname CHARACTER VARYING(19) NOT NULL,
address CHARACTER VARYING(15) NOT NULL,
orport INTEGER NOT NULL,
dirport INTEGER NOT NULL,
@@ -14,15 +15,34 @@ CREATE TABLE descriptor (
platform CHARACTER VARYING(256),
published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
uptime BIGINT,
+ extrainfo CHARACTER(40),
+ rawdesc BYTEA NOT NULL,
CONSTRAINT descriptor_pkey PRIMARY KEY (descriptor)
);
+-- TABLE extrainfo
+-- Contains all of the extra-info descriptors published by the routers.
+CREATE TABLE extrainfo (
+ extrainfo CHARACTER(40) NOT NULL,
+ nickname CHARACTER VARYING(19) NOT NULL,
+ fingerprint CHARACTER(40) NOT NULL,
+ published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ rawdesc BYTEA NOT NULL,
+ CONSTRAINT extrainfo_pkey PRIMARY KEY (extrainfo)
+);
+
-- TABLE statusentry
-- Contains all of the consensuses published by the directories. Each
-- statusentry references a valid descriptor.
CREATE TABLE statusentry (
validafter TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ nickname CHARACTER VARYING(19) NOT NULL,
+ fingerprint CHARACTER(40) NOT NULL,
descriptor CHARACTER(40) NOT NULL,
+ published TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ address CHARACTER VARYING(15) NOT NULL,
+ orport INTEGER NOT NULL,
+ dirport INTEGER NOT NULL,
isauthority BOOLEAN DEFAULT FALSE NOT NULL,
isbadexit BOOLEAN DEFAULT FALSE NOT NULL,
isbaddirectory BOOLEAN DEFAULT FALSE NOT NULL,
@@ -37,6 +57,10 @@ CREATE TABLE statusentry (
isvalid BOOLEAN DEFAULT FALSE NOT NULL,
isv2dir BOOLEAN DEFAULT FALSE NOT NULL,
isv3dir BOOLEAN DEFAULT FALSE NOT NULL,
+ version CHARACTER VARYING(50),
+ bandwidth BIGINT,
+ ports TEXT,
+ rawdesc BYTEA NOT NULL,
CONSTRAINT statusentry_pkey PRIMARY KEY (validafter, descriptor)
);
diff --git a/src/org/torproject/ernie/db/Configuration.java b/src/org/torproject/ernie/db/Configuration.java
index f7910e7..5fb5e0e 100644
--- a/src/org/torproject/ernie/db/Configuration.java
+++ b/src/org/torproject/ernie/db/Configuration.java
@@ -39,6 +39,8 @@ public class Configuration {
private boolean writeRelayDescriptorDatabase = false;
private String relayDescriptorDatabaseJdbc =
"jdbc:postgresql://localhost/tordir?user=ernie&password=password";
+ private boolean writeRelayDescriptorsRawFiles = false;
+ private String relayDescriptorRawFilesDirectory = "pg-import/";
private boolean writeSanitizedBridges = false;
private String sanitizedBridgesWriteDirectory = "sanitized-bridges/";
private boolean importSanitizedBridges = false;
@@ -141,6 +143,11 @@ public class Configuration {
line.split(" ")[1]) != 0;
} else if (line.startsWith("RelayDescriptorDatabaseJDBC")) {
this.relayDescriptorDatabaseJdbc = line.split(" ")[1];
+ } else if (line.startsWith("WriteRelayDescriptorsRawFiles")) {
+ this.writeRelayDescriptorsRawFiles = Integer.parseInt(
+ line.split(" ")[1]) != 0;
+ } else if (line.startsWith("RelayDescriptorRawFilesDirectory")) {
+ this.relayDescriptorRawFilesDirectory = line.split(" ")[1];
} else if (line.startsWith("WriteSanitizedBridges")) {
this.writeSanitizedBridges = Integer.parseInt(
line.split(" ")[1]) != 0;
@@ -249,7 +256,8 @@ public class Configuration {
if ((this.importCachedRelayDescriptors ||
this.importDirectoryArchives || this.downloadRelayDescriptors) &&
!(this.writeDirectoryArchives ||
- this.writeRelayDescriptorDatabase || this.writeConsensusStats ||
+ this.writeRelayDescriptorDatabase ||
+ this.writeRelayDescriptorsRawFiles || this.writeConsensusStats ||
this.writeDirreqStats || this.writeBridgeStats ||
this.writeServerDescriptorStats || this.writeConsensusHealth)) {
logger.warning("We are configured to import/download relay "
@@ -338,6 +346,12 @@ public class Configuration {
public String getRelayDescriptorDatabaseJDBC() {
return this.relayDescriptorDatabaseJdbc;
}
+ public boolean getWriteRelayDescriptorsRawFiles() {
+ return this.writeRelayDescriptorsRawFiles;
+ }
+ public String getRelayDescriptorRawFilesDirectory() {
+ return this.relayDescriptorRawFilesDirectory;
+ }
public boolean getWriteSanitizedBridges() {
return this.writeSanitizedBridges;
}
diff --git a/src/org/torproject/ernie/db/Main.java b/src/org/torproject/ernie/db/Main.java
index 5cf8d91..f463684 100644
--- a/src/org/torproject/ernie/db/Main.java
+++ b/src/org/torproject/ernie/db/Main.java
@@ -57,9 +57,13 @@ public class Main {
// Prepare writing relay descriptors to database
RelayDescriptorDatabaseImporter rddi =
- config.getWriteRelayDescriptorDatabase() ?
+ config.getWriteRelayDescriptorDatabase() ||
+ config.getWriteRelayDescriptorsRawFiles() ?
new RelayDescriptorDatabaseImporter(
- config.getRelayDescriptorDatabaseJDBC()) : null;
+ config.getWriteRelayDescriptorDatabase() ?
+ config.getRelayDescriptorDatabaseJDBC() : null,
+ config.getWriteRelayDescriptorsRawFiles() ?
+ config.getRelayDescriptorRawFilesDirectory() : null) : null;
// Prepare relay descriptor parser (only if we are writing stats or
// directory archives to disk)
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index 71d25b8..af65743 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -2,9 +2,12 @@
* See LICENSE for licensing information */
package org.torproject.ernie.db;
+import java.io.*;
import java.sql.*;
+import java.text.*;
import java.util.*;
import java.util.logging.*;
+import org.postgresql.util.*;
/**
* Parse directory data.
@@ -21,6 +24,7 @@ public final class RelayDescriptorDatabaseImporter {
* Keep track of the number of records committed before each transaction
*/
private int rdsCount = 0;
+ private int resCount = 0;
private int rrsCount = 0;
/**
@@ -35,6 +39,12 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psRs;
/**
+ * Prepared statement to check whether a given extra-info descriptor has
+ * been imported into the database before.
+ */
+ private PreparedStatement psEs;
+
+ /**
* Prepared statement to check whether a given server descriptor has
* been imported into the database before.
*/
@@ -52,135 +62,282 @@ public final class RelayDescriptorDatabaseImporter {
private PreparedStatement psD;
/**
+ * Prepared statement to insert an extra-info descriptor into the
+ * database.
+ */
+ private PreparedStatement psE;
+
+ /**
* Logger for this class.
*/
private Logger logger;
+ private BufferedWriter statusentryOut;
+ private BufferedWriter descriptorOut;
+ private BufferedWriter extrainfoOut;
+
/**
* Initialize database importer by connecting to the database and
* preparing statements.
*/
- public RelayDescriptorDatabaseImporter(String connectionURL) {
+ public RelayDescriptorDatabaseImporter(String connectionURL,
+ String rawFilesDirectory) {
/* Initialize logger. */
this.logger = Logger.getLogger(
RelayDescriptorDatabaseImporter.class.getName());
- try {
- /* Connect to database. */
- this.conn = DriverManager.getConnection(connectionURL);
-
- /* Turn autocommit off */
- this.conn.setAutoCommit(false);
-
- /* Prepare statements. */
- this.psRs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM statusentry WHERE validafter = ? AND descriptor = ?");
- this.psDs = conn.prepareStatement("SELECT COUNT(*) "
- + "FROM descriptor WHERE descriptor = ?");
- this.psR = conn.prepareStatement("INSERT INTO statusentry "
- + "(validafter, descriptor, isauthority, isbadexit, "
- + "isbaddirectory, isexit, isfast, isguard, ishsdir, isnamed, "
- + "isstable, isrunning, isunnamed, isvalid, isv2dir, isv3dir) "
- + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
- this.psD = conn.prepareStatement("INSERT INTO descriptor "
- + "(descriptor, address, orport, dirport, bandwidthavg, "
- + "bandwidthburst, bandwidthobserved, platform, published, "
- + "uptime) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+ if (connectionURL != null) {
+ try {
+ /* Connect to database. */
+ this.conn = DriverManager.getConnection(connectionURL);
- } catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not connect to database or "
- + "prepare statements.", e);
+ /* Turn autocommit off */
+ this.conn.setAutoCommit(false);
+
+ /* Prepare statements. */
+ this.psRs = conn.prepareStatement("SELECT COUNT(*) "
+ + "FROM statusentry WHERE validafter = ? AND descriptor = ?");
+ this.psDs = conn.prepareStatement("SELECT COUNT(*) "
+ + "FROM descriptor WHERE descriptor = ?");
+ this.psEs = conn.prepareStatement("SELECT COUNT(*) "
+ + "FROM extrainfo WHERE extrainfo = ?");
+ this.psR = conn.prepareStatement("INSERT INTO statusentry "
+ + "(validafter, nickname, fingerprint, descriptor, "
+ + "published, address, orport, dirport, isauthority, "
+ + "isbadexit, isbaddirectory, isexit, isfast, isguard, "
+ + "ishsdir, isnamed, isstable, isrunning, isunnamed, "
+ + "isvalid, isv2dir, isv3dir, version, bandwidth, ports, "
+ + "rawdesc) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, "
+ + "?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+ this.psD = conn.prepareStatement("INSERT INTO descriptor "
+ + "(descriptor, nickname, address, orport, dirport, "
+ + "bandwidthavg, bandwidthburst, bandwidthobserved, "
+ + "platform, published, uptime, extrainfo, rawdesc) "
+ + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)");
+ this.psE = conn.prepareStatement("INSERT INTO extrainfo "
+ + "(extrainfo, nickname, fingerprint, published, rawdesc) "
+ + "VALUES (?, ?, ?, ?, ?)");
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not connect to database or "
+ + "prepare statements.", e);
+ }
+ }
+
+ if (rawFilesDirectory != null) {
+ try {
+ new File(rawFilesDirectory).mkdirs();
+ this.statusentryOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/statusentry.sql"));
+ this.descriptorOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/descriptor.sql"));
+ this.extrainfoOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/extrainfo.sql"));
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not open raw database "
+ + "import files.", e);
+ }
}
}
/**
* Insert network status consensus entry into database.
*/
- public void addStatusEntry(long validAfter, String descriptor,
- SortedSet<String> flags) {
- if (this.psRs == null || this.psR == null) {
- return;
- }
+ public void addStatusEntry(long validAfter, String nickname,
+ String fingerprint, String descriptor, long published,
+ String address, long orPort, long dirPort,
+ SortedSet<String> flags, String version, long bandwidth,
+ String ports, byte[] rawDescriptor) {
try {
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- Timestamp validAfterTimestamp = new Timestamp(validAfter);
- this.psRs.setTimestamp(1, validAfterTimestamp, cal);
- this.psRs.setString(2, descriptor);
- ResultSet rs = psRs.executeQuery();
- rs.next();
- if (rs.getInt(1) > 0) {
- return;
+ if (this.psRs != null && this.psR != null) {
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ Timestamp validAfterTimestamp = new Timestamp(validAfter);
+ this.psRs.setTimestamp(1, validAfterTimestamp, cal);
+ this.psRs.setString(2, descriptor);
+ ResultSet rs = psRs.executeQuery();
+ rs.next();
+ if (rs.getInt(1) > 0) {
+ return;
+ }
+ this.psR.clearParameters();
+ this.psR.setTimestamp(1, validAfterTimestamp, cal);
+ this.psR.setString(2, nickname);
+ this.psR.setString(3, fingerprint);
+ this.psR.setString(4, descriptor);
+ this.psR.setTimestamp(5, new Timestamp(published), cal);
+ this.psR.setString(6, address);
+ this.psR.setLong(7, orPort);
+ this.psR.setLong(8, dirPort);
+ this.psR.setBoolean(9, flags.contains("Authority"));
+ this.psR.setBoolean(10, flags.contains("BadExit"));
+ this.psR.setBoolean(11, flags.contains("BadDirectory"));
+ this.psR.setBoolean(12, flags.contains("Exit"));
+ this.psR.setBoolean(13, flags.contains("Fast"));
+ this.psR.setBoolean(14, flags.contains("Guard"));
+ this.psR.setBoolean(15, flags.contains("HSDir"));
+ this.psR.setBoolean(16, flags.contains("Named"));
+ this.psR.setBoolean(17, flags.contains("Stable"));
+ this.psR.setBoolean(18, flags.contains("Running"));
+ this.psR.setBoolean(19, flags.contains("Unnamed"));
+ this.psR.setBoolean(20, flags.contains("Valid"));
+ this.psR.setBoolean(21, flags.contains("V2Dir"));
+ this.psR.setBoolean(22, flags.contains("V3Dir"));
+ this.psR.setString(23, version);
+ this.psR.setLong(24, bandwidth);
+ this.psR.setString(25, ports);
+ this.psR.setBytes(26, rawDescriptor);
+ this.psR.executeUpdate();
+ rrsCount++;
+ if (rrsCount % autoCommitCount == 0) {
+ this.conn.commit();
+ rrsCount = 0;
+ }
}
- this.psR.clearParameters();
- this.psR.setTimestamp(1, validAfterTimestamp, cal);
- this.psR.setString(2, descriptor);
- this.psR.setBoolean(3, flags.contains("Authority"));
- this.psR.setBoolean(4, flags.contains("BadExit"));
- this.psR.setBoolean(5, flags.contains("BadDirectory"));
- this.psR.setBoolean(6, flags.contains("Exit"));
- this.psR.setBoolean(7, flags.contains("Fast"));
- this.psR.setBoolean(8, flags.contains("Guard"));
- this.psR.setBoolean(9, flags.contains("HSDir"));
- this.psR.setBoolean(10, flags.contains("Named"));
- this.psR.setBoolean(11, flags.contains("Stable"));
- this.psR.setBoolean(12, flags.contains("Running"));
- this.psR.setBoolean(13, flags.contains("Unnamed"));
- this.psR.setBoolean(14, flags.contains("Valid"));
- this.psR.setBoolean(15, flags.contains("V2Dir"));
- this.psR.setBoolean(16, flags.contains("V3Dir"));
- this.psR.executeUpdate();
- rrsCount++;
- if (rrsCount % autoCommitCount == 0) {
- this.conn.commit();
- rrsCount = 0;
+ if (this.statusentryOut != null) {
+ SimpleDateFormat dateTimeFormat =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ this.statusentryOut.write(
+ dateTimeFormat.format(validAfter) + "\t" + nickname
+ + "\t" + fingerprint.toLowerCase() + "\t"
+ + descriptor.toLowerCase() + "\t"
+ + dateTimeFormat.format(published) + "\t" + address + "\t"
+ + orPort + "\t" + dirPort + "\t"
+ + (flags.contains("Authority") ? "t" : "f") + "\t"
+ + (flags.contains("BadExit") ? "t" : "f") + "\t"
+ + (flags.contains("BadDirectory") ? "t" : "f") + "\t"
+ + (flags.contains("Exit") ? "t" : "f") + "\t"
+ + (flags.contains("Fast") ? "t" : "f") + "\t"
+ + (flags.contains("Guard") ? "t" : "f") + "\t"
+ + (flags.contains("HSDir") ? "t" : "f") + "\t"
+ + (flags.contains("Named") ? "t" : "f") + "\t"
+ + (flags.contains("Stable") ? "t" : "f") + "\t"
+ + (flags.contains("Running") ? "t" : "f") + "\t"
+ + (flags.contains("Unnamed") ? "t" : "f") + "\t"
+ + (flags.contains("Valid") ? "t" : "f") + "\t"
+ + (flags.contains("V2Dir") ? "t" : "f") + "\t"
+ + (flags.contains("V3Dir") ? "t" : "f") + "\t"
+ + version + "\t" + bandwidth + "\t" + ports + "\t");
+ this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
+ replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
}
-
} catch (SQLException e) {
this.logger.log(Level.WARNING, "Could not add network status "
+ "consensus entry.", e);
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not write network status "
+ + "consensus entry to raw database import file.", e);
}
}
/**
* Insert server descriptor into database.
*/
- public void addServerDescriptor(String descriptor, String address,
- int orPort, int dirPort, long bandwidthAvg, long bandwidthBurst,
- long bandwidthObserved, String platform, long published,
- long uptime) {
- if (this.psDs == null || this.psD == null) {
- return;
- }
+ public void addServerDescriptor(String descriptor, String nickname,
+ String address, int orPort, int dirPort, long bandwidthAvg,
+ long bandwidthBurst, long bandwidthObserved, String platform,
+ long published, long uptime, String extraInfoDigest,
+ byte[] rawDescriptor) {
try {
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
- this.psDs.setString(1, descriptor);
- ResultSet rs = psDs.executeQuery();
- rs.next();
- if (rs.getInt(1) > 0) {
- return;
+ if (this.psDs != null && this.psD != null) {
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ this.psDs.setString(1, descriptor);
+ ResultSet rs = psDs.executeQuery();
+ rs.next();
+ if (rs.getInt(1) > 0) {
+ return;
+ }
+ this.psD.clearParameters();
+ this.psD.setString(1, descriptor);
+ this.psD.setString(2, nickname);
+ this.psD.setString(3, address);
+ this.psD.setInt(4, orPort);
+ this.psD.setInt(5, dirPort);
+ this.psD.setLong(6, bandwidthAvg);
+ this.psD.setLong(7, bandwidthBurst);
+ this.psD.setLong(8, bandwidthObserved);
+ this.psD.setString(9, new String(platform.getBytes(),
+ "US-ASCII"));
+ this.psD.setTimestamp(10, new Timestamp(published), cal);
+ this.psD.setLong(11, uptime);
+ this.psD.setString(12, extraInfoDigest);
+ this.psD.setBytes(13, rawDescriptor);
+ this.psD.executeUpdate();
+ rdsCount++;
+ if (rdsCount % autoCommitCount == 0) {
+ this.conn.commit();
+ rdsCount = 0;
+ }
}
- this.psD.clearParameters();
- this.psD.setString(1, descriptor);
- this.psD.setString(2, address);
- this.psD.setInt(3, orPort);
- this.psD.setInt(4, dirPort);
- this.psD.setLong(5, bandwidthAvg);
- this.psD.setLong(6, bandwidthBurst);
- this.psD.setLong(7, bandwidthObserved);
- this.psD.setString(8, platform);
- this.psD.setTimestamp(9, new Timestamp(published), cal);
- this.psD.setLong(10, uptime);
- this.psD.executeUpdate();
- rdsCount++;
- if (rdsCount % autoCommitCount == 0) {
- this.conn.commit();
- rdsCount = 0;
+ if (this.descriptorOut != null) {
+ SimpleDateFormat dateTimeFormat =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ this.descriptorOut.write(descriptor.toLowerCase() + "\t"
+ + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
+ + "\t" + bandwidthAvg + "\t" + bandwidthBurst + "\t"
+ + bandwidthObserved + "\t" + new String(platform.getBytes(),
+ "US-ASCII") + "\t" + dateTimeFormat.format(published) + "\t"
+ + uptime + "\t" + extraInfoDigest + "\t");
+ this.descriptorOut.write(PGbytea.toPGString(rawDescriptor).
+ replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
}
-
+ } catch (UnsupportedEncodingException e) {
+ this.logger.log(Level.WARNING, "Could not add server descriptor.",
+ e);
} catch (SQLException e) {
this.logger.log(Level.WARNING, "Could not add server descriptor.",
e);
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not write server descriptor "
+ + "to raw database import file.", e);
+ }
+ }
+
+ /**
+ * Insert extra-info descriptor into database.
+ */
+ public void addExtraInfoDescriptor(String extraInfoDigest,
+ String nickname, String fingerprint, long published,
+ byte[] rawDescriptor) {
+ try {
+ if (this.psEs != null && this.psE != null) {
+ Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
+ this.psEs.setString(1, extraInfoDigest);
+ ResultSet rs = psEs.executeQuery();
+ rs.next();
+ if (rs.getInt(1) > 0) {
+ return;
+ }
+ this.psE.clearParameters();
+ this.psE.setString(1, extraInfoDigest);
+ this.psE.setString(2, nickname);
+ this.psE.setString(3, fingerprint);
+ this.psE.setTimestamp(4, new Timestamp(published), cal);
+ this.psE.setBytes(5, rawDescriptor);
+ this.psE.executeUpdate();
+ resCount++;
+ if (resCount % autoCommitCount == 0) {
+ this.conn.commit();
+ resCount = 0;
+ }
+ }
+ if (this.extrainfoOut != null) {
+ SimpleDateFormat dateTimeFormat =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ this.extrainfoOut.write(extraInfoDigest.toLowerCase() + "\t"
+ + nickname + "\t" + fingerprint.toLowerCase() + "\t"
+ + dateTimeFormat.format(published) + "\t");
+ this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
+ replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+ }
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not add extra-info "
+ + "descriptor.", e);
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not write extra-info "
+ + "descriptor to raw database import file.", e);
}
}
@@ -191,9 +348,9 @@ public final class RelayDescriptorDatabaseImporter {
/* commit any stragglers before closing */
try {
this.conn.commit();
- }
- catch (SQLException e) {
- this.logger.log(Level.WARNING, "Could not commit final records to database", e);
+ } catch (SQLException e) {
+ this.logger.log(Level.WARNING, "Could not commit final records to "
+ + "database", e);
}
try {
this.conn.close();
@@ -201,5 +358,31 @@ public final class RelayDescriptorDatabaseImporter {
this.logger.log(Level.WARNING, "Could not close database "
+ "connection.", e);
}
+ /* Close raw import files. */
+ if (this.statusentryOut != null) {
+ try {
+ this.statusentryOut.close();
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not close raw database "
+ + "import file.", e);
+ }
+ }
+ if (this.descriptorOut != null) {
+ try {
+ this.descriptorOut.close();
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not close raw database "
+ + "import file.", e);
+ }
+ }
+ if (this.extrainfoOut != null) {
+ try {
+ this.extrainfoOut.close();
+ } catch (IOException e) {
+ this.logger.log(Level.WARNING, "Could not close raw database "
+ + "import file.", e);
+ }
+ }
}
}
+
diff --git a/src/org/torproject/ernie/db/RelayDescriptorParser.java b/src/org/torproject/ernie/db/RelayDescriptorParser.java
index 5582cfb..9dfac56 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorParser.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorParser.java
@@ -126,13 +126,17 @@ public class RelayDescriptorParser {
boolean isConsensus = true;
int exit = 0, fast = 0, guard = 0, running = 0, stable = 0;
String validAfterTime = null, descriptorIdentity = null,
- serverDesc = null;
+ nickname = null, relayIdentity = null, serverDesc = null,
+ version = null, ports = null;
StringBuilder descriptorIdentities = new StringBuilder();
- String fingerprint = null, dirSource = null;
- long validAfter = -1L;
+ String fingerprint = null, dirSource = null, address = null;
+ long validAfter = -1L, published = -1L, bandwidth = -1L,
+ orPort = 0L, dirPort = 0L;
SortedSet<String> dirSources = new TreeSet<String>();
SortedSet<String> serverDescriptors = new TreeSet<String>();
SortedSet<String> hashedRelayIdentities = new TreeSet<String>();
+ SortedSet<String> relayFlags = null;
+ StringBuilder rawStatusEntry = null;
while ((line = br.readLine()) != null) {
if (line.equals("vote-status vote")) {
isConsensus = false;
@@ -146,20 +150,39 @@ public class RelayDescriptorParser {
} else if (line.startsWith("fingerprint ")) {
fingerprint = line.split(" ")[1];
} else if (line.startsWith("r ")) {
- String publishedTime = line.split(" ")[4] + " "
- + line.split(" ")[5];
- String relayIdentity = Hex.encodeHexString(
- Base64.decodeBase64(line.split(" ")[2] + "=")).
+ if (relayIdentity != null && this.rddi != null) {
+ byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
+ this.rddi.addStatusEntry(validAfter, nickname,
+ relayIdentity, serverDesc, published, address, orPort,
+ dirPort, relayFlags, version, bandwidth, ports,
+ rawDescriptor);
+ relayFlags = null;
+ version = null;
+ bandwidth = -1L;
+ ports = null;
+ }
+ rawStatusEntry = new StringBuilder(line + "\n");
+ String[] parts = line.split(" ");
+ String publishedTime = parts[4] + " " + parts[5];
+ nickname = parts[1];
+ relayIdentity = Hex.encodeHexString(
+ Base64.decodeBase64(parts[2] + "=")).
toLowerCase();
serverDesc = Hex.encodeHexString(Base64.decodeBase64(
- line.split(" ")[3] + "=")).toLowerCase();
+ parts[3] + "=")).toLowerCase();
serverDescriptors.add(publishedTime + "," + relayIdentity
+ "," + serverDesc);
hashedRelayIdentities.add(DigestUtils.shaHex(
- Base64.decodeBase64(line.split(" ")[2] + "=")).
+ Base64.decodeBase64(parts[2] + "=")).
toUpperCase());
- descriptorIdentity = line.split(" ")[3];
+ descriptorIdentity = parts[3];
+ published = parseFormat.parse(parts[4] + " " + parts[5]).
+ getTime();
+ address = parts[6];
+ orPort = Long.parseLong(parts[7]);
+ dirPort = Long.parseLong(parts[8]);
} else if (line.startsWith("s ")) {
+ rawStatusEntry.append(line + "\n");
if (line.contains(" Running")) {
exit += line.contains(" Exit") ? 1 : 0;
fast += line.contains(" Fast") ? 1 : 0;
@@ -168,17 +191,36 @@ public class RelayDescriptorParser {
running++;
descriptorIdentities.append("," + descriptorIdentity);
}
- if (this.rddi != null) {
- SortedSet<String> flags = new TreeSet<String>();
- if (line.length() > 2) {
- for (String flag : line.substring(2).split(" ")) {
- flags.add(flag);
- }
+ relayFlags = new TreeSet<String>();
+ if (line.length() > 2) {
+ for (String flag : line.substring(2).split(" ")) {
+ relayFlags.add(flag);
+ }
+ }
+ } else if (line.startsWith("v ")) {
+ rawStatusEntry.append(line + "\n");
+ version = line.substring(2);
+ } else if (line.startsWith("w ")) {
+ rawStatusEntry.append(line + "\n");
+ String[] parts = line.split(" ");
+ for (String part : parts) {
+ if (part.startsWith("Bandwidth=")) {
+ bandwidth = Long.parseLong(part.substring(
+ "Bandwidth=".length()));
}
- this.rddi.addStatusEntry(validAfter, serverDesc, flags);
}
+ } else if (line.startsWith("p ")) {
+ rawStatusEntry.append(line + "\n");
+ ports = line.substring(2);
}
}
+ if (relayIdentity != null && this.rddi != null) {
+ byte[] rawDescriptor = rawStatusEntry.toString().getBytes();
+ this.rddi.addStatusEntry(validAfter, nickname,
+ relayIdentity, serverDesc, published, address, orPort,
+ dirPort, relayFlags, version, bandwidth, ports,
+ rawDescriptor);
+ }
if (isConsensus) {
if (this.bsfh != null) {
for (String hashedRelayIdentity : hashedRelayIdentities) {
@@ -233,6 +275,7 @@ public class RelayDescriptorParser {
publishedTime = null, bandwidthLine = null,
extraInfoDigest = null, relayIdentifier = null;
String[] parts = line.split(" ");
+ String nickname = parts[1];
String address = parts[2];
int orPort = Integer.parseInt(parts[3]);
int dirPort = Integer.parseInt(parts[4]);
@@ -290,11 +333,12 @@ public class RelayDescriptorParser {
long bandwidthBurst = Long.parseLong(bwParts[2]);
long bandwidthObserved = Long.parseLong(bwParts[3]);
String platform = platformLine.substring("platform ".length());
- this.rddi.addServerDescriptor(digest, address, orPort, dirPort,
- bandwidthAvg, bandwidthBurst, bandwidthObserved, platform,
- published, uptime);
+ this.rddi.addServerDescriptor(digest, nickname, address, orPort,
+ dirPort, bandwidthAvg, bandwidthBurst, bandwidthObserved,
+ platform, published, uptime, extraInfoDigest, data);
}
} else if (line.startsWith("extra-info ")) {
+ String nickname = line.split(" ")[1];
String publishedTime = null, relayIdentifier = line.split(" ")[2];
long published = -1L;
String dir = line.split(" ")[2];
@@ -356,6 +400,10 @@ public class RelayDescriptorParser {
this.rdd.haveParsedExtraInfoDescriptor(publishedTime,
relayIdentifier.toLowerCase(), digest);
}
+ if (this.rddi != null && digest != null) {
+ this.rddi.addExtraInfoDescriptor(digest, nickname,
+ dir.toLowerCase(), published, data);
+ }
}
} catch (IOException e) {
this.logger.log(Level.WARNING, "Could not parse descriptor. "
--
1.7.1