[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] [metrics-db/master] Format raw import files so that psql can process them directly.



Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date: Fri, 10 Dec 2010 17:45:28 +0100
Subject: Format raw import files so that psql can process them directly.
Commit: f5043ad901639fc2096ce0e7444f858eac763aec

---
 .../ernie/db/RelayDescriptorDatabaseImporter.java  |  179 ++++++++++++++------
 1 files changed, 126 insertions(+), 53 deletions(-)

diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index fb46d94..3c8a124 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -111,11 +111,39 @@ public final class RelayDescriptorDatabaseImporter {
    */
   private Logger logger;
 
+  /**
+   * Directory for writing raw import files.
+   */
+  private String rawFilesDirectory;
+
+  /**
+   * Raw import file containing status entries.
+   */
   private BufferedWriter statusentryOut;
+
+  /**
+   * Raw import file containing server descriptors.
+   */
   private BufferedWriter descriptorOut;
+
+  /**
+   * Raw import file containing extra-info descriptors.
+   */
   private BufferedWriter extrainfoOut;
+
+  /**
+   * Raw import file containing bandwidth histories.
+   */
   private BufferedWriter bwhistOut;
+
+  /**
+   * Raw import file containing consensuses.
+   */
   private BufferedWriter consensusOut;
+
+  /**
+   * Raw import file containing votes.
+   */
   private BufferedWriter voteOut;
 
   private SimpleDateFormat dateTimeFormat;
@@ -182,29 +210,12 @@ public final class RelayDescriptorDatabaseImporter {
       }
     }
 
-    if (rawFilesDirectory != null) {
-      try {
-        new File(rawFilesDirectory).mkdirs();
-        this.statusentryOut = new BufferedWriter(new FileWriter(
-            rawFilesDirectory + "/statusentry.sql"));
-        this.descriptorOut = new BufferedWriter(new FileWriter(
-            rawFilesDirectory + "/descriptor.sql"));
-        this.extrainfoOut = new BufferedWriter(new FileWriter(
-            rawFilesDirectory + "/extrainfo.sql"));
-        this.bwhistOut = new BufferedWriter(new FileWriter(
-            rawFilesDirectory + "/bwhist.sql"));
-        this.consensusOut = new BufferedWriter(new FileWriter(
-            rawFilesDirectory + "/consensus.sql"));
-        this.voteOut = new BufferedWriter(new FileWriter(
-            rawFilesDirectory + "/vote.sql"));
-      } catch (IOException e) {
-        this.logger.log(Level.WARNING, "Could not open raw database "
-            + "import files.", e);
-      }
+    /* Remember where we want to write raw import files. */
+    this.rawFilesDirectory = rawFilesDirectory;
 
-      this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-      this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-    }
+    /* Initialize date format so that we can format timestamps. */
+    this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+    this.dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
   }
 
   /**
@@ -259,7 +270,19 @@ public final class RelayDescriptorDatabaseImporter {
           }
         }
       }
-      if (this.statusentryOut != null) {
+      if (this.rawFilesDirectory != null) {
+        if (this.statusentryOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.statusentryOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/statusentry.sql"));
+          this.statusentryOut.write(" COPY statusentry (validafter, "
+              + "nickname, fingerprint, descriptor, published, address, "
+              + "orport, dirport, isauthority, isbadExit, "
+              + "isbaddirectory, isexit, isfast, isguard, ishsdir, "
+              + "isnamed, isstable, isrunning, isunnamed, isvalid, "
+              + "isv2dir, isv3dir, version, bandwidth, ports, rawdesc) "
+              + "FROM stdin;\n");
+        }
         this.statusentryOut.write(
             this.dateTimeFormat.format(validAfter) + "\t" + nickname
             + "\t" + fingerprint.toLowerCase() + "\t"
@@ -284,7 +307,7 @@ public final class RelayDescriptorDatabaseImporter {
             + (bandwidth >= 0 ? bandwidth : "\\N") + "\t"
             + (ports != null ? ports : "\\N") + "\t");
         this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
-            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
       }
     } catch (SQLException e) {
       this.logger.log(Level.WARNING, "Could not add network status "
@@ -334,18 +357,29 @@ public final class RelayDescriptorDatabaseImporter {
           }
         }
       }
-      if (this.descriptorOut != null) {
+      if (this.rawFilesDirectory != null) {
+        if (this.descriptorOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.descriptorOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/descriptor.sql"));
+          this.descriptorOut.write(" COPY descriptor (descriptor, "
+              + "nickname, address, orport, dirport, fingerprint, "
+              + "bandwidthavg, bandwidthburst, bandwidthobserved, "
+              + "platform, published, uptime, extrainfo, rawdesc) FROM "
+              + "stdin;\n");
+        }
         this.descriptorOut.write(descriptor.toLowerCase() + "\t"
             + nickname + "\t" + address + "\t" + orPort + "\t" + dirPort
             + "\t" + relayIdentifier + "\t" + bandwidthAvg + "\t"
             + bandwidthBurst + "\t" + bandwidthObserved + "\t"
             + (platform != null && platform.length() > 0
-            ? new String(platform.getBytes(), "US-ASCII") : "\\N") + "\t"
-            + this.dateTimeFormat.format(published) + "\t"
+            ? new String(platform.getBytes(), "US-ASCII") : "\\N")
+            + "\t" + this.dateTimeFormat.format(published) + "\t"
             + (uptime >= 0 ? uptime : "\\N") + "\t"
-            + (extraInfoDigest != null ? extraInfoDigest : "\\N") + "\t");
+            + (extraInfoDigest != null ? extraInfoDigest : "\\N")
+            + "\t");
         this.descriptorOut.write(PGbytea.toPGString(rawDescriptor).
-            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
       }
     } catch (UnsupportedEncodingException e) {
       this.logger.log(Level.WARNING, "Could not add server descriptor.",
@@ -387,7 +421,7 @@ public final class RelayDescriptorDatabaseImporter {
         }
       }
       if ((this.psHs != null && this.psH != null) ||
-          this.bwhistOut != null) {
+          this.rawFilesDirectory != null) {
         boolean addToDatabase = false;
         if (psHs != null && this.psH != null) {
           this.psHs.setString(1, extraInfoDigest);
@@ -397,7 +431,7 @@ public final class RelayDescriptorDatabaseImporter {
             addToDatabase = true;
           }
         }
-        if (addToDatabase || this.bwhistOut != null) {
+        if (addToDatabase || this.rawFilesDirectory != null) {
           String lastIntervalEnd = null;
           List<String> bandwidthHistoryValues = new ArrayList<String>();
           bandwidthHistoryValues.addAll(bandwidthHistory.values());
@@ -442,18 +476,28 @@ public final class RelayDescriptorDatabaseImporter {
                 }
                 this.psH.executeUpdate();
               }
-              if (this.bwhistOut != null) {
-               this.bwhistOut.write(fingerprint.toLowerCase() + "\t"
-                   + extraInfoDigest.toLowerCase() + "\t"
-                   + this.dateTimeFormat.format(Long.parseLong(
-                   lastIntervalEnd)) + "\t"
-                   + (readBytes != null ? readBytes : "\\N") + "\t"
-                   + (writtenBytes != null ? writtenBytes : "\\N") + "\t"
-                   + (dirReadBytes != null ? dirReadBytes : "\\N") + "\t"
-                   + (dirWrittenBytes != null ? dirWrittenBytes : "\\N")
-                   + "\n");
+              if (this.rawFilesDirectory != null) {
+                if (this.bwhistOut == null) {
+                  new File(rawFilesDirectory).mkdirs();
+                  this.bwhistOut = new BufferedWriter(new FileWriter(
+                      rawFilesDirectory + "/bwhist.sql"));
+                  this.bwhistOut.write(" COPY bwhist (fingerprint, "
+                      + "extrainfo, intervalend, read, written, dirread, "
+                      + "dirwritten) FROM stdin;\n");
+                }
+                String extraInfo = extraInfoDigest.toLowerCase();
+                String intervalEndString = this.dateTimeFormat.format(
+                    Long.parseLong(lastIntervalEnd));
+                this.bwhistOut.write(fingerprint.toLowerCase() + "\t"
+                    + extraInfo + "\t" + intervalEndString + "\t"
+                    + (readBytes != null ? readBytes : "\\N") + "\t"
+                    + (writtenBytes != null ? writtenBytes : "\\N")
+                    + "\t" + (dirReadBytes != null ? dirReadBytes
+                    : "\\N") + "\t" + (dirWrittenBytes != null
+                    ? dirWrittenBytes : "\\N") + "\n");
               }
-              readBytes = writtenBytes = dirReadBytes = dirWrittenBytes = null;
+              readBytes = writtenBytes = dirReadBytes = dirWrittenBytes =
+                  null;
             }
             if (intervalEnd.equals("EOL")) {
               break;
@@ -480,12 +524,19 @@ public final class RelayDescriptorDatabaseImporter {
           }
         }
       }
-      if (this.extrainfoOut != null) {
+      if (this.rawFilesDirectory != null) {
+        if (this.extrainfoOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.extrainfoOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/extrainfo.sql"));
+          this.extrainfoOut.write(" COPY extrainfo (extrainfo, nickname, "
+              + "fingerprint, published, rawdesc) FROM stdin;\n");
+        }
         this.extrainfoOut.write(extraInfoDigest.toLowerCase() + "\t"
             + nickname + "\t" + fingerprint.toLowerCase() + "\t"
             + this.dateTimeFormat.format(published) + "\t");
         this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
-            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
       }
     } catch (SQLException e) {
       this.logger.log(Level.WARNING, "Could not add extra-info "
@@ -519,11 +570,18 @@ public final class RelayDescriptorDatabaseImporter {
           }
         }
       }
-      if (this.consensusOut != null) {
-        this.consensusOut.write(this.dateTimeFormat.format(validAfter)
-            + "\t");
+      if (this.rawFilesDirectory != null) {
+        if (this.consensusOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.consensusOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/consensus.sql"));
+          this.consensusOut.write(" COPY consensus (validafter, rawdesc) "
+              + "FROM stdin;\n");
+        }
+        String validAfterString = this.dateTimeFormat.format(validAfter);
+        this.consensusOut.write(validAfterString + "\t");
         this.consensusOut.write(PGbytea.toPGString(rawDescriptor).
-            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
       }
     } catch (SQLException e) {
       this.logger.log(Level.WARNING, "Could not add network status "
@@ -560,11 +618,18 @@ public final class RelayDescriptorDatabaseImporter {
           }
         }
       }
-      if (this.voteOut != null) {
-        this.voteOut.write(this.dateTimeFormat.format(validAfter) + "\t"
-            + dirSource + "\t");
+      if (this.rawFilesDirectory != null) {
+        if (this.voteOut == null) {
+          new File(rawFilesDirectory).mkdirs();
+          this.voteOut = new BufferedWriter(new FileWriter(
+              rawFilesDirectory + "/vote.sql"));
+          this.voteOut.write(" COPY vote (validafter, dirsource, "
+              + "rawdesc) FROM stdin;\n");
+        }
+        String validAfterString = this.dateTimeFormat.format(validAfter);
+        this.voteOut.write(validAfterString + "\t" + dirSource + "\t");
         this.voteOut.write(PGbytea.toPGString(rawDescriptor).
-            replaceAll("\\\\\\\\", "\\\\\\\\\\\\\\\\") + "\n");
+            replaceAll("\\\\", "\\\\\\\\") + "\n");
       }
     } catch (SQLException e) {
       this.logger.log(Level.WARNING, "Could not add network status vote.",
@@ -579,7 +644,8 @@ public final class RelayDescriptorDatabaseImporter {
    * Close the relay descriptor database connection.
    */
   public void closeConnection() {
-    /* commit any stragglers before closing */
+
+    /* Commit any stragglers before closing. */
     if (this.conn != null) {
       try {
         this.conn.commit();
@@ -594,24 +660,31 @@ public final class RelayDescriptorDatabaseImporter {
             + "connection.", e);
       }
     }
+
     /* Close raw import files. */
     try {
       if (this.statusentryOut != null) {
+        this.statusentryOut.write("\\.\n");
         this.statusentryOut.close();
       }
       if (this.descriptorOut != null) {
+        this.descriptorOut.write("\\.\n");
         this.descriptorOut.close();
       }
       if (this.extrainfoOut != null) {
+        this.extrainfoOut.write("\\.\n");
         this.extrainfoOut.close();
       }
       if (this.bwhistOut != null) {
+        this.bwhistOut.write("\\.\n");
         this.bwhistOut.close();
       }
       if (this.consensusOut != null) {
+        this.consensusOut.write("\\.\n");
         this.consensusOut.close();
       }
       if (this.voteOut != null) {
+        this.voteOut.write("\\.\n");
         this.voteOut.close();
       }
     } catch (IOException e) {
-- 
1.7.1