[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [collector/master] Optimize parallel processing and use static imports for readability.



commit bd948070e03ff71503fdba84cff6bc61c9fbe452
Author: iwakeh <iwakeh@xxxxxxxxxxxxxx>
Date:   Wed Jan 31 13:31:26 2018 +0000

    Optimize parallel processing and use static imports for readability.
---
 .../torproject/collector/webstats/SanitizeWeblogs.java   | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
index 4496861..635457c 100644
--- a/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
+++ b/src/main/java/org/torproject/collector/webstats/SanitizeWeblogs.java
@@ -3,6 +3,9 @@
 
 package org.torproject.collector.webstats;
 
+import static java.util.stream.Collectors.groupingByConcurrent;
+import static java.util.stream.Collectors.toList;
+
 import org.torproject.collector.conf.Configuration;
 import org.torproject.collector.conf.ConfigurationException;
 import org.torproject.collector.conf.Key;
@@ -37,7 +40,6 @@ import java.util.SortedSet;
 import java.util.StringJoiner;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 /**
@@ -106,10 +108,9 @@ public class SanitizeWeblogs extends CollecTorMain {
         log.info("Processing logs for {} on {}.", virtualHost, physicalHost);
         Map<LocalDate, List<WebServerAccessLogLine>> linesByDate
             = physicalEntry.getValue().values().stream().parallel()
-            .flatMap((LogMetadata metadata)
-                -> lineStream(metadata).filter((line) -> line.isValid()))
-            .collect(Collectors.groupingBy(WebServerAccessLogLine::getDate,
-                Collectors.toList()));
+            .flatMap((LogMetadata metadata) -> lineStream(metadata)
+               .filter((line) -> line.isValid())).parallel()
+            .collect(groupingByConcurrent(WebServerAccessLogLine::getDate));
         LocalDate[] interval = determineInterval(linesByDate.keySet());
         linesByDate.entrySet().stream()
             .filter((entry) -> entry.getKey().isAfter(interval[0])
@@ -130,7 +131,7 @@ public class SanitizeWeblogs extends CollecTorMain {
     List<String> retainedLines = lines
         .stream().parallel().map((line) -> sanitize(line, date))
         .filter((line) -> line.isPresent()).map((line) -> line.get())
-        .collect(Collectors.toList());
+        .collect(toList());
     retainedLines.sort(null);
     try {
       WebServerAccessLogPersistence walp
@@ -142,6 +143,7 @@ public class SanitizeWeblogs extends CollecTorMain {
     } catch (DescriptorParseException dpe) {
       log.error("Cannot store log desriptor {}.", name, dpe);
     }
+    lines.clear();
   }
 
   static Optional<String> sanitize(WebServerAccessLogLine logLine,
@@ -188,7 +190,7 @@ public class SanitizeWeblogs extends CollecTorMain {
          metadata.fileType.decompress(Files.readAllBytes(metadata.path)))))) {
       return br.lines()
           .map((String line) -> WebServerAccessLogLine.makeLine(line))
-          .collect(Collectors.toList()).stream();
+          .collect(toList()).stream();
     } catch (Exception ex) {
       log.debug("Skipping log-file {}.", metadata.path, ex);
     }



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits