[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] [metrics-db/master] Abort database imports after the first SQLException.



commit 90ca76fd7ad30d0507e3dcea8399ae682092863a
Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date:   Fri Jan 28 15:32:13 2011 +0100

    Abort database imports after the first SQLException.
---
 .../ernie/db/RelayDescriptorDatabaseImporter.java  |  237 +++++++++++++------
 1 files changed, 162 insertions(+), 75 deletions(-)

diff --git a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
index 6fce2db..3a06a58 100644
--- a/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
+++ b/src/org/torproject/ernie/db/RelayDescriptorDatabaseImporter.java
@@ -218,6 +218,9 @@ public final class RelayDescriptorDatabaseImporter {
    */
   private boolean separateStatusEntryCheckNecessary;
 
+  private boolean importIntoDatabase;
+  private boolean writeRawImportFiles;
+
   /**
    * Initialize database importer by connecting to the database and
    * preparing statements.
@@ -286,6 +289,7 @@ public final class RelayDescriptorDatabaseImporter {
         this.psU = conn.prepareStatement("INSERT INTO scheduled_updates "
             + "(date) VALUES (?)");
         this.scheduledUpdates = new HashSet<Long>();
+        this.importIntoDatabase = true;
       } catch (SQLException e) {
         this.logger.log(Level.WARNING, "Could not connect to database or "
             + "prepare statements.", e);
@@ -297,7 +301,10 @@ public final class RelayDescriptorDatabaseImporter {
     }
 
     /* Remember where we want to write raw import files. */
-    this.rawFilesDirectory = rawFilesDirectory;
+    if (rawFilesDirectory != null) {
+      this.rawFilesDirectory = rawFilesDirectory;
+      this.writeRawImportFiles = true;
+    }
 
     /* Initialize date format, so that we can format timestamps. */
     this.dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@@ -306,7 +313,7 @@ public final class RelayDescriptorDatabaseImporter {
 
   private void addDateToScheduledUpdates(long timestamp)
       throws SQLException {
-    if (this.psU == null) {
+    if (!this.importIntoDatabase) {
       return;
     }
     long dateMillis = 0L;
@@ -333,8 +340,8 @@ public final class RelayDescriptorDatabaseImporter {
       String address, long orPort, long dirPort,
       SortedSet<String> flags, String version, long bandwidth,
       String ports, byte[] rawDescriptor) {
-    try {
-      if (this.psSs != null && this.psRs != null && this.psR != null) {
+    if (this.importIntoDatabase) {
+      try {
         this.addDateToScheduledUpdates(validAfter);
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         Timestamp validAfterTimestamp = new Timestamp(validAfter);
@@ -399,8 +406,15 @@ public final class RelayDescriptorDatabaseImporter {
             this.conn.commit();
           }
         }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add network status "
+            + "consensus entry.  We won't make any further SQL requests "
+            + "in this execution.", e);
+        this.importIntoDatabase = false;
       }
-      if (this.rawFilesDirectory != null) {
+    }
+    if (this.writeRawImportFiles) {
+      try {
         if (this.statusentryOut == null) {
           new File(rawFilesDirectory).mkdirs();
           this.statusentryOut = new BufferedWriter(new FileWriter(
@@ -438,13 +452,19 @@ public final class RelayDescriptorDatabaseImporter {
             + (ports != null ? ports : "\\N") + "\t");
         this.statusentryOut.write(PGbytea.toPGString(rawDescriptor).
             replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus entry to raw database import file.  We won't "
+            + "make any further attempts to write raw import files in "
+            + "this execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus entry to raw database import file.  We won't "
+            + "make any further attempts to write raw import files in "
+            + "this execution.", e);
+        this.writeRawImportFiles = false;
       }
-    } catch (SQLException e) {
-      this.logger.log(Level.WARNING, "Could not add network status "
-          + "consensus entry.", e);
-    } catch (IOException e) {
-      this.logger.log(Level.WARNING, "Could not write network status "
-          + "consensus entry to raw database import file.", e);
     }
   }
 
@@ -456,8 +476,8 @@ public final class RelayDescriptorDatabaseImporter {
       long bandwidthAvg, long bandwidthBurst, long bandwidthObserved,
       String platform, long published, long uptime,
       String extraInfoDigest, byte[] rawDescriptor) {
-    try {
-      if (this.psDs != null && this.psD != null) {
+    if (this.importIntoDatabase) {
+      try {
         this.addDateToScheduledUpdates(published);
         this.addDateToScheduledUpdates(
             published + 24L * 60L * 60L * 1000L);
@@ -488,8 +508,17 @@ public final class RelayDescriptorDatabaseImporter {
             this.conn.commit();
           }
         }
+      } catch (UnsupportedEncodingException e) {
+        // US-ASCII is supported for sure
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add server "
+            + "descriptor.  We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
       }
-      if (this.rawFilesDirectory != null) {
+    }
+    if (this.writeRawImportFiles) {
+      try {
         if (this.descriptorOut == null) {
           new File(rawFilesDirectory).mkdirs();
           this.descriptorOut = new BufferedWriter(new FileWriter(
@@ -512,16 +541,21 @@ public final class RelayDescriptorDatabaseImporter {
             + "\t");
         this.descriptorOut.write(PGbytea.toPGString(rawDescriptor).
             replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (UnsupportedEncodingException e) {
+        // US-ASCII is supported for sure
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write server "
+            + "descriptor to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write server "
+            + "descriptor to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
       }
-    } catch (UnsupportedEncodingException e) {
-      this.logger.log(Level.WARNING, "Could not add server descriptor.",
-          e);
-    } catch (SQLException e) {
-      this.logger.log(Level.WARNING, "Could not add server descriptor.",
-          e);
-    } catch (IOException e) {
-      this.logger.log(Level.WARNING, "Could not write server descriptor "
-          + "to raw database import file.", e);
     }
   }
 
@@ -531,9 +565,9 @@ public final class RelayDescriptorDatabaseImporter {
   public void addExtraInfoDescriptor(String extraInfoDigest,
       String nickname, String fingerprint, long published,
       byte[] rawDescriptor, List<String> bandwidthHistoryLines) {
-    try {
-      Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
-      if (this.psEs != null && this.psE != null) {
+    if (this.importIntoDatabase) {
+      try {
+        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         this.psEs.setString(1, extraInfoDigest);
         ResultSet rs = psEs.executeQuery();
         rs.next();
@@ -550,13 +584,15 @@ public final class RelayDescriptorDatabaseImporter {
             this.conn.commit();
           }
         }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add extra-info "
+            + "descriptor.  We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
       }
-    } catch (SQLException e) {
-      this.logger.log(Level.WARNING, "Could not add extra-info "
-          + "descriptor.", e);
     }
-    try {
-      if (this.rawFilesDirectory != null) {
+    if (this.writeRawImportFiles) {
+      try {
         if (this.extrainfoOut == null) {
           new File(rawFilesDirectory).mkdirs();
           this.extrainfoOut = new BufferedWriter(new FileWriter(
@@ -569,13 +605,19 @@ public final class RelayDescriptorDatabaseImporter {
             + this.dateTimeFormat.format(published) + "\t");
         this.extrainfoOut.write(PGbytea.toPGString(rawDescriptor).
             replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write extra-info "
+            + "descriptor to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write extra-info "
+            + "descriptor to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
       }
-    } catch (IOException e) {
-      this.logger.log(Level.WARNING, "Could not write extra-info "
-          + "descriptor to raw database import file.", e);
-    } catch (SQLException e) {
-      this.logger.log(Level.WARNING, "Could not write extra-info "
-          + "descriptor to raw database import file.", e);
     }
     if (!bandwidthHistoryLines.isEmpty()) {
       this.addBandwidthHistory(fingerprint.toLowerCase(), published,
@@ -742,7 +784,7 @@ public final class RelayDescriptorDatabaseImporter {
             dirreadOffset);
         BigIntArray dirwrittenIntArray = new BigIntArray(dirwrittenArray,
             dirwrittenOffset);
-        if (this.csH != null) {
+        if (this.importIntoDatabase) {
           try {
             long dateMillis = dateTimeFormat.parse(lastDate
                 + " 00:00:00").getTime();
@@ -760,13 +802,17 @@ public final class RelayDescriptorDatabaseImporter {
             }
           } catch (SQLException e) {
             this.logger.log(Level.WARNING, "Could not insert bandwidth "
-                + "history line into database.", e);
+                + "history line into database.  We won't make any "
+                + "further SQL requests in this execution.", e);
+            this.importIntoDatabase = false;
           } catch (ParseException e) {
             this.logger.log(Level.WARNING, "Could not insert bandwidth "
-                + "history line into database.", e);
+                + "history line into database.  We won't make any "
+                + "further SQL requests in this execution.", e);
+            this.importIntoDatabase = false;
           }
         }
-        if (this.rawFilesDirectory != null) {
+        if (this.writeRawImportFiles) {
           try {
             if (this.bwhistOut == null) {
               new File(rawFilesDirectory).mkdirs();
@@ -780,7 +826,10 @@ public final class RelayDescriptorDatabaseImporter {
                 + dirwrittenIntArray.toString() + "');\n");
           } catch (IOException e) {
             this.logger.log(Level.WARNING, "Could not write bandwidth "
-                + "history to raw database import file.", e);
+                + "history to raw database import file.  We won't make "
+                + "any further attempts to write raw import files in "
+                + "this execution.", e);
+            this.writeRawImportFiles = false;
           }
         }
         readArray = writtenArray = dirreadArray = dirwrittenArray = null;
@@ -826,8 +875,8 @@ public final class RelayDescriptorDatabaseImporter {
    * Insert network status consensus into database.
    */
   public void addConsensus(long validAfter, byte[] rawDescriptor) {
-    try {
-      if (this.psCs != null && this.psC != null) {
+    if (this.importIntoDatabase) {
+      try {
         this.addDateToScheduledUpdates(validAfter);
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         Timestamp validAfterTimestamp = new Timestamp(validAfter);
@@ -844,8 +893,15 @@ public final class RelayDescriptorDatabaseImporter {
             this.conn.commit();
           }
         }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add network status "
+            + "consensus.  We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
       }
-      if (this.rawFilesDirectory != null) {
+    }
+    if (this.writeRawImportFiles) {
+      try {
         if (this.consensusOut == null) {
           new File(rawFilesDirectory).mkdirs();
           this.consensusOut = new BufferedWriter(new FileWriter(
@@ -857,13 +913,19 @@ public final class RelayDescriptorDatabaseImporter {
         this.consensusOut.write(validAfterString + "\t");
         this.consensusOut.write(PGbytea.toPGString(rawDescriptor).
             replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "consensus to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
       }
-    } catch (SQLException e) {
-      this.logger.log(Level.WARNING, "Could not add network status "
-          + "consensus.", e);
-    } catch (IOException e) {
-      this.logger.log(Level.WARNING, "Could not write network status "
-          + "consensus to raw database import file.", e);
     }
   }
 
@@ -872,8 +934,8 @@ public final class RelayDescriptorDatabaseImporter {
    */
   public void addVote(long validAfter, String dirSource,
       byte[] rawDescriptor) {
-    try {
-      if (this.psVs != null && this.psV != null) {
+    if (this.importIntoDatabase) {
+      try {
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
         Timestamp validAfterTimestamp = new Timestamp(validAfter);
         this.psVs.setTimestamp(1, validAfterTimestamp, cal);
@@ -891,8 +953,15 @@ public final class RelayDescriptorDatabaseImporter {
             this.conn.commit();
           }
         }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add network status "
+            + "vote.  We won't make any further SQL requests in this "
+            + "execution.", e);
+        this.importIntoDatabase = false;
       }
-      if (this.rawFilesDirectory != null) {
+    }
+    if (this.writeRawImportFiles) {
+      try {
         if (this.voteOut == null) {
           new File(rawFilesDirectory).mkdirs();
           this.voteOut = new BufferedWriter(new FileWriter(
@@ -904,13 +973,19 @@ public final class RelayDescriptorDatabaseImporter {
         this.voteOut.write(validAfterString + "\t" + dirSource + "\t");
         this.voteOut.write(PGbytea.toPGString(rawDescriptor).
             replaceAll("\\\\", "\\\\\\\\") + "\n");
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "vote to raw database import file.  We won't make any "
+            + "further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
+      } catch (IOException e) {
+        this.logger.log(Level.WARNING, "Could not write network status "
+            + "vote to raw database import file.  We won't make any "
+            + "further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
       }
-    } catch (SQLException e) {
-      this.logger.log(Level.WARNING, "Could not add network status vote.",
-          e);
-    } catch (IOException e) {
-      this.logger.log(Level.WARNING, "Could not write network status "
-          + "vote to raw database import file.", e);
     }
   }
 
@@ -927,7 +1002,7 @@ public final class RelayDescriptorDatabaseImporter {
           + "stats string with interval ending '" + statsEnd + "'.", e);
       return;
     }
-    if (this.psBs != null && this.psB != null) {
+    if (this.importIntoDatabase) {
       try {
         this.addDateToScheduledUpdates(statsEndTime);
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
@@ -953,10 +1028,12 @@ public final class RelayDescriptorDatabaseImporter {
         }
       } catch (SQLException e) {
         this.logger.log(Level.WARNING, "Could not add conn-bi-direct "
-            + "stats string.", e);
+            + "stats string. We won't make any further SQL requests in "
+            + "this execution.", e);
+        this.importIntoDatabase = false;
       }
     }
-    if (this.rawFilesDirectory != null) {
+    if (this.writeRawImportFiles) {
       try {
         if (this.connBiDirectOut == null) {
           new File(rawFilesDirectory).mkdirs();
@@ -971,7 +1048,10 @@ public final class RelayDescriptorDatabaseImporter {
             + both + "\n");
       } catch (IOException e) {
         this.logger.log(Level.WARNING, "Could not write conn-bi-direct "
-            + "stats string to raw database import file.", e);
+            + "stats string to raw database import file.  We won't make "
+            + "any further attempts to write raw import files in this "
+            + "execution.", e);
+        this.writeRawImportFiles = false;
       }
     }
   }
@@ -990,7 +1070,7 @@ public final class RelayDescriptorDatabaseImporter {
           + "interval ending '" + statsEnd + "'.", e);
       return;
     }
-    if (this.psQs != null && this.psQ != null) {
+    if (this.importIntoDatabase) {
       try {
         this.addDateToScheduledUpdates(statsEndTime);
         Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
@@ -1016,10 +1096,13 @@ public final class RelayDescriptorDatabaseImporter {
           }
         }
       } catch (SQLException e) {
-        this.logger.log(Level.WARNING, "Could not add dirreq stats.", e);
+        this.logger.log(Level.WARNING, "Could not add dirreq stats.  We "
+            + "won't make any further SQL requests in this execution.",
+            e);
+        this.importIntoDatabase = false;
       }
     }
-    if (this.rawFilesDirectory != null) {
+    if (this.writeRawImportFiles) {
       try {
         if (this.dirReqOut == null) {
           new File(rawFilesDirectory).mkdirs();
@@ -1035,7 +1118,9 @@ public final class RelayDescriptorDatabaseImporter {
         }
       } catch (IOException e) {
         this.logger.log(Level.WARNING, "Could not write dirreq stats to "
-            + "raw database import file.", e);
+            + "raw database import file.  We won't make any further "
+            + "attempts to write raw import files in this execution.", e);
+        this.writeRawImportFiles = false;
       }
     }
   }
@@ -1059,14 +1144,16 @@ public final class RelayDescriptorDatabaseImporter {
      * insert them only now, because if a Java execution fails at a random
      * point, we might have added data, but not the corresponding dates to
      * update statistics. */
-    try {
-      for (long dateMillis : this.scheduledUpdates) {
-        this.psU.setDate(1, new java.sql.Date(dateMillis));
-        this.psU.execute();
+    if (this.importIntoDatabase) {
+      try {
+        for (long dateMillis : this.scheduledUpdates) {
+          this.psU.setDate(1, new java.sql.Date(dateMillis));
+          this.psU.execute();
+        }
+      } catch (SQLException e) {
+        this.logger.log(Level.WARNING, "Could not add scheduled dates "
+            + "for the next refresh run.", e);
       }
-    } catch (SQLException e) {
-      this.logger.log(Level.WARNING, "Could not add scheduled dates for "
-          + "the next refresh run.", e);
     }
 
     /* Commit any stragglers before closing. */