[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] [metrics-db/master] Format raw import files for being processed by psql directly.
Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date: Fri, 10 Dec 2010 17:45:28 +0100
Subject: Format raw import files for being processed by psql directly.
Commit: f5043ad901639fc2096ce0e7444f858eac763aec
---
.../ernie/db/RelayDescriptorDatabaseImporter.java | 179 ++++++++++++++------
1 files changed, 126 insertions(+), 53 deletions(-)
diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index fb46d94..3c8a124 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -111,11 +111,39 @@ public final class RelayDescriptorDatabaseImporter {
*/
private Logger logger;
+ /**
+ * Directory for writing raw import files.
+ */
+ private String rawFilesDirectory;
+
+ /**
+ * Raw import file containing status entries.
+ */
private BufferedWriter statusentryOut;
+
+ /**
+ * Raw import file containing server descriptors.
+ */
private BufferedWriter descriptorOut;
+
+ /**
+ * Raw import file containing extra-info descriptors.
+ */
private BufferedWriter extrainfoOut;
+
+ /**
+ * Raw import file containing bandwidth histories.
+ */
private BufferedWriter bwhistOut;
+
+ /**
+ * Raw import file containing consensuses.
+ */
private BufferedWriter consensusOut;
+
+ /**
+ * Raw import file containing votes.
+ */
private BufferedWriter voteOut;
private SimpleDateFormat dateTimeFormat;
@@ -182,29 +210,12 @@ public final class RelayDescriptorDatabaseImporter {
}
}
- if (rawFilesDirectory != null) {
- try {
- new File(rawFilesDirectory).mkdirs();
- this.statusentryOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/statusentry.sql"));
- this.descriptorOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/descriptor.sql"));
- this.extrainfoOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/extrainfo.sql"));
- this.bwhistOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/bwhist.sql"));
- this.consensusOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/consensus.sql"));
- this.voteOut = new BufferedWriter(new FileWriter(
- rawFilesDirectory + "/vote.sql"));
- } catch (IOException e) {
- this.logger.log(Level.WARNING, "Could not open raw database "
- + "import files.", e);
- }
+ /* Remember where we want to write raw import files. */
+ this.rawFilesDirectory = rawFilesDirectory;
- this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
- }
+ /* Initialize date format, so that we can format timestamps. */
+ this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
}
/**
@@ -259,7 +270,19 @@ public final class RelayDescriptorDatabaseImporter {
}
}
}
- if (this.statusentryOut != null) {
+ if (this.rawFilesDirectory != null) {
+ if (this.statusentryOut == null) {
+ new File(rawFilesDirectory).mkdirs();
+ this.statusentryOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/statusentry.sql"));
+ this.statusentryOut.write(" COPY statusentry (validafter, "
+ + "nickname, fingerprint, descriptor, published, address, "
+ + "orport, dirport, isauthority, isbadExit, "
+ + "isbaddirectory, isexit, isfast, isguard, ishsdir, "
+ + "isnamed, isstable, isrunning, isunnamed, isvalid, "
+ + "isv2dir, isv3dir, version, bandwidth, ports, rawdesc) "
+ + "FROM stdin;\n");
+ }
this.statusentryOut.write(
this.dateTimeFormat.format(validAfter) + "\t" + nickname
+ "\t" + fingerprint.toLowerCase() + "\t"
@@ -284,7 +307,7 @@ public final class RelayDescriptorDatabaseImporter {
+ (bandwidth >= 0 ? bandwidth : "\\N") + "\t"
+ (ports != null ? ports : "\\N") + "\t");
this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+ replaceAll("\\\\", "\\\\\\\\") + "\n");
}
} catch (SQLException e) {
this.logger.log(Level.WARNING, "Could not add network status "
@@ -334,18 +357,29 @@ public final class RelayDescriptorDatabaseImporter {
}
}
}
- if (this.descriptorOut != null) {
+ if (this.rawFilesDirectory != null) {
+ if (this.descriptorOut == null) {
+ new File(rawFilesDirectory).mkdirs();
+ this.descriptorOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/descriptor.sql"));
+ this.descriptorOut.write(" COPY descriptor (descriptor, "
+ + "nickname, address, orport, dirport, fingerprint, "
+ + "bandwidthavg, bandwidthburst, bandwidthobserved, "
+ + "platform, published, uptime, extrainfo, rawdesc) FROM "
+ + "stdin;\n");
+ }
this.descriptorOut.write(descriptor.toLowerCase() + "\t"
+ nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
+ "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
+ bandwidthBurst + "\t" + bandwidthObserved + "\t"
+ (platform != null && platform.length() > 0
- ? new String(platform.getBytes(), "US-ASCII") : "\\N") + "\t"
- + this.dateTimeFormat.format(published) + "\t"
+ ? new String(platform.getBytes(), "US-ASCII") : "\\N")
+ + "\t" + this.dateTimeFormat.format(published) + "\t"
+ (uptime >= 0 ? uptime : "\\N") + "\t"
- + (extraInfoDigest != null ? extraInfoDigest : "\\N") + "\t");
+ + (extraInfoDigest != null ? extraInfoDigest : "\\N")
+ + "\t");
this.descriptorOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+ replaceAll("\\\\", "\\\\\\\\") + "\n");
}
} catch (UnsupportedEncodingException e) {
this.logger.log(Level.WARNING, "Could not add server descriptor.",
@@ -387,7 +421,7 @@ public final class RelayDescriptorDatabaseImporter {
}
}
if ((this.psHs != null && this.psH != null) ||
- this.bwhistOut != null) {
+ this.rawFilesDirectory != null) {
boolean addToDatabase = false;
if (psHs != null && this.psH != null) {
this.psHs.setString(1, extraInfoDigest);
@@ -397,7 +431,7 @@ public final class RelayDescriptorDatabaseImporter {
addToDatabase = true;
}
}
- if (addToDatabase || this.bwhistOut != null) {
+ if (addToDatabase || this.rawFilesDirectory != null) {
String lastIntervalEnd = null;
List<String> bandwidthHistoryValues = new ArrayList<String>();
bandwidthHistoryValues.addAll(bandwidthHistory.values());
@@ -442,18 +476,28 @@ public final class RelayDescriptorDatabaseImporter {
}
this.psH.executeUpdate();
}
- if (this.bwhistOut != null) {
- this.bwhistOut.write(fingerprint.toLowerCase() + "\t"
- + extraInfoDigest.toLowerCase() + "\t"
- + this.dateTimeFormat.format(Long.parseLong(
- lastIntervalEnd)) + "\t"
- + (readBytes != null ? readBytes : "\\N") + "\t"
- + (writtenBytes != null ? writtenBytes : "\\N") + "\t"
- + (dirReadBytes != null ? dirReadBytes : "\\N") + "\t"
- + (dirWrittenBytes != null ? dirWrittenBytes : "\\N")
- + "\n");
+ if (this.rawFilesDirectory != null) {
+ if (this.bwhistOut == null) {
+ new File(rawFilesDirectory).mkdirs();
+ this.bwhistOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/bwhist.sql"));
+ this.bwhistOut.write(" COPY bwhist (fingerprint, "
+ + "extrainfo, intervalend, read, written, dirread, "
+ + "dirwritten) FROM stdin;\n");
+ }
+ String extraInfo = extraInfoDigest.toLowerCase();
+ String intervalEndString = this.dateTimeFormat.format(
+ Long.parseLong(lastIntervalEnd));
+ this.bwhistOut.write(fingerprint.toLowerCase() + "\t"
+ + extraInfo + "\t" + intervalEndString + "\t"
+ + (readBytes != null ? readBytes : "\\N") + "\t"
+ + (writtenBytes != null ? writtenBytes : "\\N")
+ + "\t" + (dirReadBytes != null ? dirReadBytes
+ : "\\N") + "\t" + (dirWrittenBytes != null
+ ? dirWrittenBytes : "\\N") + "\n");
}
- readBytes = writtenBytes = dirReadBytes = dirWrittenBytes = null;
+ readBytes = writtenBytes = dirReadBytes = dirWrittenBytes =
+ null;
}
if (intervalEnd.equals("EOL")) {
break;
@@ -480,12 +524,19 @@ public final class RelayDescriptorDatabaseImporter {
}
}
}
- if (this.extrainfoOut != null) {
+ if (this.rawFilesDirectory != null) {
+ if (this.extrainfoOut == null) {
+ new File(rawFilesDirectory).mkdirs();
+ this.extrainfoOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/extrainfo.sql"));
+ this.extrainfoOut.write(" COPY extrainfo (extrainfo, nickname, "
+ + "fingerprint, published, rawdesc) FROM stdin;\n");
+ }
this.extrainfoOut.write(extraInfoDigest.toLowerCase() + "\t"
+ nickname + "\t" + fingerprint.toLowerCase() + "\t"
+ this.dateTimeFormat.format(published) + "\t");
this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+ replaceAll("\\\\", "\\\\\\\\") + "\n");
}
} catch (SQLException e) {
this.logger.log(Level.WARNING, "Could not add extra-info "
@@ -519,11 +570,18 @@ public final class RelayDescriptorDatabaseImporter {
}
}
}
- if (this.consensusOut != null) {
- this.consensusOut.write(this.dateTimeFormat.format(validAfter)
- + "\t");
+ if (this.rawFilesDirectory != null) {
+ if (this.consensusOut == null) {
+ new File(rawFilesDirectory).mkdirs();
+ this.consensusOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/consensus.sql"));
+ this.consensusOut.write(" COPY consensus (validafter, rawdesc) "
+ + "FROM stdin;\n");
+ }
+ String validAfterString = this.dateTimeFormat.format(validAfter);
+ this.consensusOut.write(validAfterString + "\t");
this.consensusOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+ replaceAll("\\\\", "\\\\\\\\") + "\n");
}
} catch (SQLException e) {
this.logger.log(Level.WARNING, "Could not add network status "
@@ -560,11 +618,18 @@ public final class RelayDescriptorDatabaseImporter {
}
}
}
- if (this.voteOut != null) {
- this.voteOut.write(this.dateTimeFormat.format(validAfter) + "\t"
- + dirSource + "\t");
+ if (this.rawFilesDirectory != null) {
+ if (this.voteOut == null) {
+ new File(rawFilesDirectory).mkdirs();
+ this.voteOut = new BufferedWriter(new FileWriter(
+ rawFilesDirectory + "/vote.sql"));
+ this.voteOut.write(" COPY vote (validafter, dirsource, "
+ + "rawdesc) FROM stdin;\n");
+ }
+ String validAfterString = this.dateTimeFormat.format(validAfter);
+ this.voteOut.write(validAfterString + "\t" + dirSource + "\t");
this.voteOut.write(PGbytea.toPGString(rawDescriptor).
- replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+ replaceAll("\\\\", "\\\\\\\\") + "\n");
}
} catch (SQLException e) {
this.logger.log(Level.WARNING, "Could not add network status vote.",
@@ -579,7 +644,8 @@ public final class RelayDescriptorDatabaseImporter {
* Close the relay descriptor database connection.
*/
public void closeConnection() {
- /* commit any stragglers before closing */
+
+ /* Commit any stragglers before closing. */
if (this.conn != null) {
try {
this.conn.commit();
@@ -594,24 +660,31 @@ public final class RelayDescriptorDatabaseImporter {
+ "connection.", e);
}
}
+
/* Close raw import files. */
try {
if (this.statusentryOut != null) {
+ this.statusentryOut.write("\\.\n");
this.statusentryOut.close();
}
if (this.descriptorOut != null) {
+ this.descriptorOut.write("\\.\n");
this.descriptorOut.close();
}
if (this.extrainfoOut != null) {
+ this.extrainfoOut.write("\\.\n");
this.extrainfoOut.close();
}
if (this.bwhistOut != null) {
+ this.bwhistOut.write("\\.\n");
this.bwhistOut.close();
}
if (this.consensusOut != null) {
+ this.consensusOut.write("\\.\n");
this.consensusOut.close();
}
if (this.voteOut != null) {
+ this.voteOut.write("\\.\n");
this.voteOut.close();
}
} catch (IOException e) {
--
1.7.1