[tor-commits] [metrics-tasks/master] Add new user counting code (#8462).
commit 138b6c492bb4eddb7793a0740c2199bed7ac55fc
Author: Karsten Loesing <karsten.loesing@xxxxxxx>
Date: Wed Apr 24 08:54:35 2013 +0200
Add new user counting code (#8462).
---
task-8462/.gitignore | 7 +
task-8462/README | 50 ++++
task-8462/init-userstats.sql | 573 ++++++++++++++++++++++++++++++++++++++++++
task-8462/run-userstats.sh | 17 ++
task-8462/src/Parse.java | 449 +++++++++++++++++++++++++++++++++
5 files changed, 1096 insertions(+), 0 deletions(-)
diff --git a/task-8462/.gitignore b/task-8462/.gitignore
new file mode 100644
index 0000000..4c1e6ed
--- /dev/null
+++ b/task-8462/.gitignore
@@ -0,0 +1,7 @@
+in/
+bin/
+lib/
+out/
+status/
+*.csv
+
diff --git a/task-8462/README b/task-8462/README
new file mode 100644
index 0000000..9547efa
--- /dev/null
+++ b/task-8462/README
@@ -0,0 +1,50 @@
++------------------------------------------------------------------------+
+| An implementation of the user counting algorithm suggested in |
+| Tor Tech Report 2012-10-001 for later integration with metrics-web |
++------------------------------------------------------------------------+
+
+Instructions (for Debian Squeeze):
+
+Install Java 6 for descriptor parsing and PostgreSQL 8.4 for descriptor
+data storage and aggregation:
+
+ $ sudo apt-get install openjdk-6-jdk postgresql-8.4
+
+Create a database user and database:
+
+ $ sudo -u postgres createuser -P karsten
+ $ sudo -u postgres createdb -O karsten userstats
+ $ echo "*:*:userstats:karsten:password" > ~/.pgpass
+ $ chmod 0600 ~/.pgpass
+ $ psql -f init-userstats.sql userstats
+
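+Optionally verify that the imported, merged, and aggregated tables and
+the estimated view have been created:
+
+ $ psql -c '\d' userstats
+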
+Create empty bin/, lib/, in/, status/, and out/ directories.
+
+Put required .jar files into the lib/ directory. See metrics-lib.git for
+instructions:
+
+ - lib/commons-codec-1.6.jar
+ - lib/commons-compress-1.4.1.jar
+ - lib/descriptor.jar
+
+Run the run-userstats.sh script:
+
+ $ ./run-userstats.sh
+
+Be patient.
+
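+When the script is done, aggregated estimates are exported to
+userstats.csv. The estimated view can also be queried directly, for
+example (output will vary with the imported data):
+
+ $ psql -c 'SELECT date, node, country, users FROM estimated LIMIT 10;' userstats
+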
+Advanced stuff: the database can also be initialized using descriptor
+archives available at https://metrics.torproject.org/data.html. Only
+relay consensuses, relay extra-info descriptors, and bridge descriptors
+are required. Put them into the following directories, ideally after
+decompressing them with bunzip2 (but without extracting the tarballs):
+
+ - in/relay-descriptors/ (consensuses-*.tar and extra-infos-*.tar)
+ - in/bridge-descriptors/ (bridge-descriptors-*.tar)
+
+Also comment out the rsync command in run-userstats.sh. Then run
+run-userstats.sh. After initializing the database, clean up the in/ and
+out/ directories and don't forget to put the rsync command back in
+run-userstats.sh. It may be easier to set up separate instances of this
+tool for initializing the database and for running it on a regular basis.
+
diff --git a/task-8462/init-userstats.sql b/task-8462/init-userstats.sql
new file mode 100644
index 0000000..c586285
--- /dev/null
+++ b/task-8462/init-userstats.sql
@@ -0,0 +1,573 @@
+-- Copyright 2013 The Tor Project
+-- See LICENSE for licensing information
+
+-- Use enum types for dimensions that may only change if we write new code
+-- to support them. For example, if there's a new node type beyond relay
+-- and bridge, we'll have to write code to support it. This is in
+-- contrast to dimensions like country, transport, or version which don't
+-- have their possible values hard-coded anywhere.
+CREATE TYPE node AS ENUM ('relay', 'bridge');
+CREATE TYPE metric AS ENUM ('responses', 'bytes', 'status');
+
+-- All new data first goes into the imported table. The import tool
+-- should do some trivial checks for invalid or duplicate data, but
+-- ultimately, we're going to do these checks in the database. For
+-- example, the import tool could avoid importing data from the same
+-- descriptor more than once, but it's fine to import the same history
+-- string from distinct descriptors multiple times. The import tool must,
+-- however, make sure that stats_end is not greater than 00:00:00 of the
+-- day following stats_start. There are no constraints set on this table,
+-- because importing data should be really, really fast. Once the newly
+-- imported data is successfully processed, the imported table is emptied.
+CREATE TABLE imported (
+
+ -- The 40-character upper-case hex string identifies a descriptor
+ -- uniquely and is used to join metrics (responses, bytes, status)
+ -- published by the same node (relay or bridge).
+ fingerprint CHARACTER(40) NOT NULL,
+
+ -- The node type is used to decide which statistics this entry will be
+ -- part of.
+ node node NOT NULL,
+
+ -- The metric of this entry describes the stored observation type.
+ -- We'll want to store different metrics published by a node:
+ -- - 'responses' are the number of v3 network status consensus requests
+ -- that the node responded to;
+ -- - 'bytes' are the number of bytes that the node wrote when answering
+ -- directory requests;
+ -- - 'status' are the intervals when the node was listed as running in
+ -- the network status published by either the directory authorities or
+ -- bridge authority.
+ metric metric NOT NULL,
+
+ -- The two-letter lower-case country code that the observation in this
+ -- entry can be attributed to; can be '??' if no country information is
+ -- known for this entry, or '' (empty string) if this entry summarizes
+ -- observations for all countries.
+ country CHARACTER VARYING(2) NOT NULL,
+
+ -- The pluggable transport name that the observation in this entry can
+ -- be attributed to; can be '<OR>' if no pluggable transport was used,
+ -- '<??>' if an unknown pluggable transport was used, or '' (empty
+ -- string) if this entry summarizes observations for all transports.
+ transport CHARACTER VARYING(20) NOT NULL,
+
+ -- The IP address version that the observation in this entry can be
+ -- attributed to; can be 'v4' or 'v6' or '' (empty string) if this entry
+ -- summarizes observations for all IP address versions.
+ version CHARACTER VARYING(2) NOT NULL,
+
+ -- The interval start of this observation.
+ stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+
+ -- The interval end of this observation. This timestamp must be greater
+ -- than stats_start and must not be greater than 00:00:00 of the day
+ -- following stats_start, which the import tool must ensure.
+ stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+
+ -- Finally, the observed value.
+ val DOUBLE PRECISION NOT NULL
+);
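+
+-- As an illustration (not part of the schema), a single hypothetical
+-- 'responses' observation attributed to Germany could be imported as:
+--
+--   INSERT INTO imported VALUES (REPEAT('A', 40), 'relay', 'responses',
+--     'de', '', '', '2013-04-23 12:00:00', '2013-04-24 00:00:00', 42.0);
+--
+-- In practice, the import tool writes such rows using COPY ... FROM
+-- stdin, which is much faster than individual INSERTs.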
+
+-- After importing new data into the imported table, they are merged into
+-- the merged table using the merge() function. The merged table contains
+-- the same data as the imported table, except:
+-- (1) there are no duplicate or overlapping entries in the merged table
+-- with respect to stats_start and stats_end and the same fingerprint,
+-- node, metric, country, transport, and version columns;
+-- (2) all subsequent intervals with the same node, metric, country,
+-- transport, version, and stats_start date are compressed into a
+-- single entry.
+CREATE TABLE merged (
+
+ -- The unique key that is only used when merging newly imported data
+ -- into this table.
+ id SERIAL PRIMARY KEY,
+
+ -- All other columns have the same meaning as in the imported table.
+ fingerprint CHARACTER(40) NOT NULL,
+ node node NOT NULL,
+ metric metric NOT NULL,
+ country CHARACTER VARYING(2) NOT NULL,
+ transport CHARACTER VARYING(20) NOT NULL,
+ version CHARACTER VARYING(2) NOT NULL,
+ stats_start TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ stats_end TIMESTAMP WITHOUT TIME ZONE NOT NULL,
+ val DOUBLE PRECISION NOT NULL
+);
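+
+-- As an illustration (hypothetical values): two imported 'bytes' entries
+-- for the same fingerprint covering 2013-04-23 00:00:00 to 12:00:00 with
+-- val 100 and 12:00:00 to 2013-04-24 00:00:00 with val 50 would be
+-- compressed into a single merged entry covering the whole day with
+-- val 150.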
+
+-- After merging new data into the merged table, they are aggregated to
+-- daily user number estimates using the aggregate() function. Only dates
+-- with new data in the imported table will be recomputed in the
+-- aggregated table. The aggregated components follow the algorithm
+-- proposed in Tor Tech Report 2012-10-001.
+CREATE TABLE aggregated (
+
+ -- The date of these aggregated observations.
+ date DATE NOT NULL,
+
+ -- The node, country, transport, and version columns all have the same
+ -- meaning as in the imported table.
+ node node NOT NULL,
+ country CHARACTER VARYING(2) NOT NULL DEFAULT '',
+ transport CHARACTER VARYING(20) NOT NULL DEFAULT '',
+ version CHARACTER VARYING(2) NOT NULL DEFAULT '',
+
+ -- Total number of reported responses, possibly broken down by country,
+ -- transport, or version if any of them is not ''. See r(R) in the
+ -- tech report.
+ rrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Total number of seconds of nodes reporting responses, possibly broken
+ -- down by country, transport, or version if any of them is not ''.
+ -- This would be referred to as n(R) in the tech report, though it's not
+ -- used there.
+ nrx DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Total number of reported bytes. See h(H) in the tech report.
+ hh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Total number of seconds of nodes in the status. See n(N) in the tech
+ -- report.
+ nn DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Number of reported bytes of nodes that reported both responses and
+ -- bytes. See h(R intersect H) in the tech report.
+ hrh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Number of seconds of nodes reporting bytes. See n(H) in the tech
+ -- report.
+ nh DOUBLE PRECISION NOT NULL DEFAULT 0,
+
+ -- Number of seconds of nodes reporting responses but no bytes. See
+ -- n(R \ H) in the tech report.
+ nrh DOUBLE PRECISION NOT NULL DEFAULT 0
+);
+
+CREATE LANGUAGE plpgsql;
+
+-- Merge new entries from the imported table into the merged table, and
+-- compress them while doing so. This function first executes a query to
+-- match all entries in the imported table with adjacent or even
+-- overlapping entries in the merged table. It then loops over query
+-- results and either inserts or updates entries in the merged table. The
+-- idea is to leave query optimization to the database and only touch
+-- as few entries as possible while running this function.
+CREATE OR REPLACE FUNCTION merge() RETURNS VOID AS $$
+DECLARE
+
+ -- The current record that we're handling in the loop body.
+ cur RECORD;
+
+ -- Various information about the last record we processed, so that we
+ -- can merge the current record with the last one if possible.
+ last_fingerprint CHARACTER(40) := NULL;
+ last_node node;
+ last_metric metric;
+ last_country CHARACTER VARYING(2);
+ last_transport CHARACTER VARYING(20);
+ last_version CHARACTER VARYING(2);
+ last_start TIMESTAMP WITHOUT TIME ZONE;
+ last_end TIMESTAMP WITHOUT TIME ZONE;
+ last_id INTEGER;
+ last_val DOUBLE PRECISION;
+
+ -- Interval end and value of the last record before updating them in the
+ -- last loop step. In a few edge cases, we may update an entry and
+ -- learn in the next loop step that the updated entry overlaps with the
+ -- subsequent entry. In these cases we'll have to undo the update,
+ -- which is why we're storing the updated values.
+ undo_end TIMESTAMP WITHOUT TIME ZONE;
+ undo_val DOUBLE PRECISION;
+
+BEGIN
+ RAISE NOTICE '% Starting to merge.', timeofday();
+
+ -- TODO Maybe we'll have to materialize a merged_part table that only
+ -- contains dates IN (SELECT DISTINCT DATE(stats_start) FROM imported)
+ -- and use that in the query below.
+
+ -- Loop over results from a query that joins new entries in the imported
+ -- table with existing entries in the merged table.
+ FOR cur IN SELECT DISTINCT
+
+ -- Select id, interval start and end, and value of the existing entry
+ -- in merged; all these fields may be null if the imported entry is
+ -- not adjacent to an existing one.
+ merged.id AS merged_id,
+ merged.stats_start AS merged_start,
+ merged.stats_end AS merged_end,
+ merged.val AS merged_val,
+
+ -- Select interval start and end and value of the newly imported
+ -- entry.
+ imported.stats_start AS imported_start,
+ imported.stats_end AS imported_end,
+ imported.val AS imported_val,
+
+ -- Select columns that define the group of entries that can be merged
+ -- in the merged table.
+ imported.fingerprint AS fingerprint,
+ imported.node AS node,
+ imported.metric AS metric,
+ imported.country AS country,
+ imported.transport AS transport,
+ imported.version AS version
+
+ -- Select these columns from all entries in the imported table, plus
+ -- do an outer join on the merged table to find adjacent entries that
+ -- we might want to merge the new entries with. It's possible that we
+ -- handle the same imported entry twice, if it starts directly after
+ -- one existing entry and ends directly before another existing entry.
+ FROM imported LEFT JOIN merged
+
+ -- First two join conditions are to find adjacent intervals. In fact,
+ -- we also include overlapping intervals here, so that we can skip the
+ -- overlapping entry in the imported table.
+ ON imported.stats_end >= merged.stats_start AND
+ imported.stats_start <= merged.stats_end AND
+
+ -- Further join conditions are same date, fingerprint, node, etc.,
+ -- so that we don't merge entries that don't belong together.
+ DATE(imported.stats_start) = DATE(merged.stats_start) AND
+ imported.fingerprint = merged.fingerprint AND
+ imported.node = merged.node AND
+ imported.metric = merged.metric AND
+ imported.country = merged.country AND
+ imported.transport = merged.transport AND
+ imported.version = merged.version
+
+ -- Ordering is key, or our approach to merge subsequent entries is
+ -- going to break.
+ ORDER BY imported.fingerprint, imported.node, imported.metric,
+ imported.country, imported.transport, imported.version,
+ imported.stats_start, merged.stats_start, imported.stats_end
+
+ -- Now go through the results one by one.
+ LOOP
+
+ -- Log that we're done with the query and about to start merging.
+ IF last_fingerprint IS NULL THEN
+ RAISE NOTICE '% Query returned, now merging entries.', timeofday();
+ END IF;
+
+ -- If we're processing the very first entry or if we have reached a
+ -- new group of entries that belong together, (re-)set last_*
+ -- variables.
+ IF last_fingerprint IS NULL OR
+ DATE(cur.imported_start) <> DATE(last_start) OR
+ cur.fingerprint <> last_fingerprint OR
+ cur.node <> last_node OR
+ cur.metric <> last_metric OR
+ cur.country <> last_country OR
+ cur.transport <> last_transport OR
+ cur.version <> last_version THEN
+ last_id := -1;
+ last_start := '1970-01-01 00:00:00';
+ last_end := '1970-01-01 00:00:00';
+ last_val := -1;
+ END IF;
+
+ -- Remember all fields that determine which entries belong to the same
+ -- group.
+ last_fingerprint := cur.fingerprint;
+ last_node := cur.node;
+ last_metric := cur.metric;
+ last_country := cur.country;
+ last_transport := cur.transport;
+ last_version := cur.version;
+
+ -- If the existing entry that we're currently looking at starts before
+ -- the previous entry ends, we have created two overlapping entries in
+ -- the last iteration, and that is not allowed. Undo the previous
+ -- change.
+ IF cur.merged_start IS NOT NULL AND
+ cur.merged_start < last_end AND
+ undo_end IS NOT NULL AND undo_val IS NOT NULL THEN
+ UPDATE merged SET stats_end = undo_end, val = undo_val
+ WHERE id = last_id;
+ undo_end := NULL;
+ undo_val := NULL;
+
+ -- If there is no adjacent entry to the one we're about to merge,
+ -- insert it as new entry.
+ ELSIF cur.merged_end IS NULL THEN
+ IF cur.imported_start > last_end THEN
+ last_start := cur.imported_start;
+ last_end := cur.imported_end;
+ last_val := cur.imported_val;
+ INSERT INTO merged (fingerprint, node, metric, country, transport,
+ version, stats_start, stats_end, val)
+ VALUES (last_fingerprint, last_node, last_metric, last_country,
+ last_transport, last_version, last_start, last_end,
+ last_val)
+ RETURNING id INTO last_id;
+
+ -- If there was no adjacent entry before starting to merge, but
+ -- there is now one ending right before the new entry starts, merge
+ -- the new entry into the existing one.
+ ELSIF cur.imported_start = last_end THEN
+ last_val := last_val + cur.imported_val;
+ last_end := cur.imported_end;
+ UPDATE merged SET stats_end = last_end, val = last_val
+ WHERE id = last_id;
+ END IF;
+
+ -- There's no risk of this entry overlapping with the next.
+ undo_end := NULL;
+ undo_val := NULL;
+
+ -- If the new entry ends right when an existing entry starts, but
+ -- there's a gap between when the previously processed entry ends and
+ -- when the new entry starts, merge the new entry with the existing
+ -- entry we're currently looking at.
+ ELSIF cur.imported_end = cur.merged_start THEN
+ IF cur.imported_start > last_end THEN
+ last_id := cur.merged_id;
+ last_start := cur.imported_start;
+ last_end := cur.merged_end;
+ last_val := cur.imported_val + cur.merged_val;
+ UPDATE merged SET stats_start = last_start, val = last_val
+ WHERE id = last_id;
+
+ -- If the new entry ends right when an existing entry starts and
+ -- there's no gap between when the previously processed entry ends
+ -- and when the new entry starts, merge the new entry with the other
+ -- two entries. This happens by deleting the previous entry and
+ -- expanding the subsequent entry to cover all three entries.
+ ELSIF cur.imported_start = last_end THEN
+ DELETE FROM merged WHERE id = last_id;
+ last_id := cur.merged_id;
+ last_end := cur.merged_end;
+ last_val := last_val + cur.merged_val;
+ UPDATE merged SET stats_start = last_start, val = last_val
+ WHERE id = last_id;
+ END IF;
+
+ -- There's no risk of this entry overlapping with the next.
+ undo_end := NULL;
+ undo_val := NULL;
+
+ -- If the new entry starts right when an existing entry ends, but
+ -- there's a gap between the previously processed entry and the
+ -- existing one, extend the existing entry. There's a special case
+ -- when this operation is wrong and must be undone, namely when the
+ -- newly added entry overlaps with the subsequent entry. That's why
+ -- we have to store the old interval end and value, so that this
+ -- operation can be undone in the next loop iteration.
+ ELSIF cur.imported_start = cur.merged_end THEN
+ IF last_end < cur.imported_start THEN
+ undo_end := cur.merged_end;
+ undo_val := cur.merged_val;
+ last_id := cur.merged_id;
+ last_start := cur.merged_start;
+ last_end := cur.imported_end;
+ last_val := cur.merged_val + cur.imported_val;
+ UPDATE merged SET stats_end = last_end, val = last_val
+ WHERE id = last_id;
+
+ -- If the new entry starts right when an existing entry ends and
+ -- there's no gap between the previously processed entry and the
+ -- existing entry, extend the existing entry. This is very similar
+ -- to the previous case. The same reasoning about possibly having
+ -- to undo this operation applies.
+ ELSE
+ undo_end := cur.merged_end;
+ undo_val := last_val;
+ last_end := cur.imported_end;
+ last_val := last_val + cur.imported_val;
+ UPDATE merged SET stats_end = last_end, val = last_val
+ WHERE id = last_id;
+ END IF;
+
+ -- If none of the cases above applies, there must have been an overlap
+ -- between the new entry and an existing one. Skip the new entry.
+ ELSE
+ last_id := cur.merged_id;
+ last_start := cur.merged_start;
+ last_end := cur.merged_end;
+ last_val := cur.merged_val;
+ END IF;
+ END LOOP;
+
+ -- That's it, we're done merging.
+ RAISE NOTICE '% Finishing merge.', timeofday();
+ RETURN;
+END;
+$$ LANGUAGE plpgsql;
+
+-- Aggregate user estimates for all dates that have updated entries in the
+-- merged table. This function first creates a temporary table with
+-- new or updated observations, then removes all existing estimates for
+-- the dates to be updated, and finally inserts newly computed aggregates
+-- for these dates.
+CREATE OR REPLACE FUNCTION aggregate() RETURNS VOID AS $$
+BEGIN
+ RAISE NOTICE '% Starting aggregate step.', timeofday();
+
+ -- Create a new temporary table containing all relevant information
+ -- needed to update the aggregated table. In this table, we sum up all
+ -- observations of a given type by reporting node. This query is
+ -- (temporarily) materialized, because we need to combine its entries
+ -- multiple times in various ways. A (non-materialized) view would have
+ -- meant re-computing this query multiple times.
+ CREATE TEMPORARY TABLE update AS
+ SELECT fingerprint, node, metric, country, transport, version,
+ DATE(stats_start), SUM(val) AS val,
+ SUM(CAST(EXTRACT(EPOCH FROM stats_end - stats_start)
+ AS DOUBLE PRECISION)) AS seconds
+ FROM merged
+ WHERE DATE(stats_start) IN (
+ SELECT DISTINCT DATE(stats_start) FROM imported)
+ GROUP BY fingerprint, node, metric, country, transport, version,
+ DATE(stats_start);
+
+ -- Delete all entries from the aggregated table that we're about to
+ -- re-compute.
+ DELETE FROM aggregated WHERE date IN (SELECT DISTINCT date FROM update);
+
+ -- Insert partly empty results for all existing combinations of date,
+ -- node ('relay' or 'bridge'), country, transport, and version. Only
+ -- the rrx and nrx fields will contain number and seconds of reported
+ -- responses for the given combination of date, node, etc., while the
+ -- other fields will be updated below.
+ INSERT INTO aggregated (date, node, country, transport, version, rrx,
+ nrx)
+ SELECT date, node, country, transport, version, SUM(val) AS rrx,
+ SUM(seconds) AS nrx
+ FROM update WHERE metric = 'responses'
+ GROUP BY date, node, country, transport, version;
+
+ -- Create another temporary table with only those entries that aren't
+ -- broken down by any dimension. This table is much smaller, so the
+ -- following operations are much faster.
+ CREATE TEMPORARY TABLE update_no_dimensions AS
+ SELECT fingerprint, node, metric, date, val, seconds FROM update
+ WHERE country = ''
+ AND transport = ''
+ AND version = '';
+
+ -- Update results in the aggregated table by setting aggregates based
+ -- on reported directory bytes. These aggregates are only based on
+ -- date and node, so that the same values are set for all combinations
+ -- of country, transport, and version.
+ UPDATE aggregated
+ SET hh = aggregated_bytes.hh, nh = aggregated_bytes.nh
+ FROM (
+ SELECT date, node, SUM(val) AS hh, SUM(seconds) AS nh
+ FROM update_no_dimensions
+ WHERE metric = 'bytes'
+ GROUP BY date, node
+ ) aggregated_bytes
+ WHERE aggregated.date = aggregated_bytes.date
+ AND aggregated.node = aggregated_bytes.node;
+
+ -- Update results based on nodes being contained in the network status.
+ UPDATE aggregated
+ SET nn = aggregated_status.nn
+ FROM (
+ SELECT date, node, SUM(seconds) AS nn
+ FROM update_no_dimensions
+ WHERE metric = 'status'
+ GROUP BY date, node
+ ) aggregated_status
+ WHERE aggregated.date = aggregated_status.date
+ AND aggregated.node = aggregated_status.node;
+
+ -- Update results based on nodes reporting both bytes and responses.
+ UPDATE aggregated
+ SET hrh = aggregated_bytes_responses.hrh
+ FROM (
+ SELECT bytes.date, bytes.node,
+ SUM((LEAST(bytes.seconds, responses.seconds)
+ * bytes.val) / bytes.seconds) AS hrh
+ FROM update_no_dimensions bytes
+ LEFT JOIN update_no_dimensions responses
+ ON bytes.date = responses.date
+ AND bytes.fingerprint = responses.fingerprint
+ AND bytes.node = responses.node
+ WHERE bytes.metric = 'bytes'
+ AND responses.metric = 'responses'
+ GROUP BY bytes.date, bytes.node
+ ) aggregated_bytes_responses
+ WHERE aggregated.date = aggregated_bytes_responses.date
+ AND aggregated.node = aggregated_bytes_responses.node;
+
+ -- Update results based on nodes reporting responses but no bytes.
+ UPDATE aggregated
+ SET nrh = aggregated_responses_bytes.nrh
+ FROM (
+ SELECT responses.date, responses.node,
+ SUM(GREATEST(0, responses.seconds
+ - COALESCE(bytes.seconds, 0))) AS nrh
+ FROM update_no_dimensions responses
+ LEFT JOIN update_no_dimensions bytes
+ ON responses.date = bytes.date
+ AND responses.fingerprint = bytes.fingerprint
+ AND responses.node = bytes.node
+ WHERE responses.metric = 'responses'
+ AND bytes.metric = 'bytes'
+ GROUP BY responses.date, responses.node
+ ) aggregated_responses_bytes
+ WHERE aggregated.date = aggregated_responses_bytes.date
+ AND aggregated.node = aggregated_responses_bytes.node;
+
+ -- We're done aggregating new data.
+ RAISE NOTICE '% Finishing aggregate step.', timeofday();
+ RETURN;
+END;
+$$ LANGUAGE plpgsql;
+
+-- User-friendly view on the aggregated table that implements the
+-- algorithm proposed in Tor Tech Report 2012-10-001. This view returns
+ -- user number estimates for both relay and bridge statistics, possibly
+-- broken down by country or transport or version.
+CREATE OR REPLACE VIEW estimated AS SELECT
+
+ -- The date of this user number estimate.
+ a.date,
+
+ -- The node type, which is either 'relay' or 'bridge'.
+ a.node,
+
+ -- The two-letter lower-case country code of this estimate; can be '??'
+ -- for an estimate of users that could not be resolved to any country,
+ -- or '' (empty string) for an estimate of all users, regardless of
+ -- country.
+ a.country,
+
+ -- The pluggable transport name of this estimate; can be '<OR>' for an
+ -- estimate of users that did not use any pluggable transport, '<??>'
+ -- for unknown pluggable transports, or '' (empty string) for an
+ -- estimate of all users, regardless of transport.
+ a.transport,
+
+ -- The IP address version of this estimate; can be 'v4' or 'v6', or ''
+ -- (empty string) for an estimate of all users, regardless of IP address
+ -- version.
+ a.version,
+
+ -- Estimated fraction of nodes reporting directory requests, which is
+ -- used to extrapolate observed requests to estimated total requests in
+ -- the network. The closer this fraction is to 1.0, the more precise
+ -- the estimation. It is reported here as an integer percentage.
+ CAST(a.frac * 100 AS INTEGER) AS frac,
+
+ -- Finally, the estimated number of users.
+ CAST(a.rrx / (a.frac * 10) AS INTEGER) AS users
+
+ -- Implement the estimation method in a subquery, so that the ugly
+ -- formula only has to be written once.
+ FROM (
+ SELECT date, node, country, transport, version, rrx, nrx,
+ (hrh * nh + hh * nrh) / (hh * nn) AS frac
+ FROM aggregated WHERE hh * nn > 0.0) a
+
+ -- Only include estimates with at least 10% of nodes reporting directory
+ -- request statistics.
+ WHERE a.frac BETWEEN 0.1 AND 1.0
+
+ -- Order results.
+ ORDER BY date DESC, node, version, transport, country;
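+
+-- As a worked example with purely hypothetical numbers: for hrh = 40,
+-- nh = 86400, hh = 50, nrh = 0, and nn = 172800, the subquery computes
+-- frac = (40 * 86400 + 50 * 0) / (50 * 172800) = 0.4, and with
+-- rrx = 12000 reported responses the view returns
+-- 12000 / (0.4 * 10) = 3000 estimated users, where the divisor 10
+-- presumably reflects an assumed ten directory requests per user and
+-- day.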
+
diff --git a/task-8462/run-userstats.sh b/task-8462/run-userstats.sh
new file mode 100644
index 0000000..9a759ee
--- /dev/null
+++ b/task-8462/run-userstats.sh
@@ -0,0 +1,17 @@
+#!/bin/sh
+set -e
+echo `date` "Starting."
+echo `date` "Downloading descriptors."
+rsync -arz --delete --exclude 'relay-descriptors/votes' metrics.torproject.org::metrics-recent in
+echo `date` "Parsing descriptors."
+javac -d bin/ -cp lib/commons-codec-1.6.jar:lib/commons-compress-1.4.1.jar:lib/descriptor.jar src/Parse.java
+java -cp bin/:lib/commons-codec-1.6.jar:lib/commons-compress-1.4.1.jar:lib/descriptor.jar Parse
+for i in $(ls out/*.sql)
+do
+ echo `date` "Importing $i."
+ psql -f $i userstats
+done
+echo `date` "Exporting results."
+psql -c 'COPY (SELECT * FROM estimated) TO STDOUT WITH CSV HEADER;' userstats > userstats.csv
+echo `date` "Terminating."
+
diff --git a/task-8462/src/Parse.java b/task-8462/src/Parse.java
new file mode 100644
index 0000000..fdf9bf2
--- /dev/null
+++ b/task-8462/src/Parse.java
@@ -0,0 +1,449 @@
+/* Copyright 2013 The Tor Project
+ * See LICENSE for licensing information */
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.Stack;
+import java.util.TimeZone;
+import java.util.TreeMap;
+
+import org.torproject.descriptor.BandwidthHistory;
+import org.torproject.descriptor.BridgeNetworkStatus;
+import org.torproject.descriptor.Descriptor;
+import org.torproject.descriptor.DescriptorFile;
+import org.torproject.descriptor.DescriptorReader;
+import org.torproject.descriptor.DescriptorSourceFactory;
+import org.torproject.descriptor.ExtraInfoDescriptor;
+import org.torproject.descriptor.NetworkStatusEntry;
+import org.torproject.descriptor.RelayNetworkStatusConsensus;
+
+public class Parse {
+
+ public static void main(String[] args) throws Exception {
+ detectBulkOrRegular();
+ parseRelayDescriptors();
+ parseBridgeDescriptors();
+ closeOutputFiles();
+ }
+
+ private static boolean isBulkImport = false;
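+
+ /* Decide whether this is a bulk import of descriptor archives
+ * (tarballs in in/) or a regular run on recently synced descriptors;
+ * bulk imports are split into one output file per day in
+ * getOutputFile() below. */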
+ private static void detectBulkOrRegular() {
+ Stack<File> inFiles = new Stack<File>();
+ inFiles.add(new File("in"));
+ while (!inFiles.isEmpty()) {
+ File file = inFiles.pop();
+ if (file.isDirectory()) {
+ inFiles.addAll(Arrays.asList(file.listFiles()));
+ } else if (file.getName().endsWith(".tar") ||
+ file.getName().endsWith(".tar.bz2")) {
+ isBulkImport = true;
+ break;
+ } else {
+ isBulkImport = false;
+ break;
+ }
+ }
+ }
+
+ private static final long ONE_HOUR_MILLIS = 60L * 60L * 1000L,
+ ONE_DAY_MILLIS = 24L * ONE_HOUR_MILLIS,
+ ONE_WEEK_MILLIS = 7L * ONE_DAY_MILLIS;
+
+ private static void parseRelayDescriptors() throws Exception {
+ DescriptorReader descriptorReader =
+ DescriptorSourceFactory.createDescriptorReader();
+ descriptorReader.setExcludeFiles(new File(
+ "status/relay-descriptors"));
+ descriptorReader.addDirectory(new File("in/relay-descriptors/"));
+ Iterator<DescriptorFile> descriptorFiles =
+ descriptorReader.readDescriptors();
+ while (descriptorFiles.hasNext()) {
+ DescriptorFile descriptorFile = descriptorFiles.next();
+ for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+ if (descriptor instanceof ExtraInfoDescriptor) {
+ parseRelayExtraInfoDescriptor((ExtraInfoDescriptor) descriptor);
+ } else if (descriptor instanceof RelayNetworkStatusConsensus) {
+ parseRelayNetworkStatusConsensus(
+ (RelayNetworkStatusConsensus) descriptor);
+ }
+ }
+ }
+ }
+
+ private static void parseRelayExtraInfoDescriptor(
+ ExtraInfoDescriptor descriptor) throws IOException {
+ long publishedMillis = descriptor.getPublishedMillis();
+ String fingerprint = descriptor.getFingerprint().
+ toUpperCase();
+ long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
+ long dirreqStatsIntervalLengthMillis =
+ descriptor.getDirreqStatsIntervalLength() * 1000L;
+ SortedMap<String, Integer> requests = descriptor.getDirreqV3Reqs();
+ BandwidthHistory dirreqWriteHistory =
+ descriptor.getDirreqWriteHistory();
+ parseRelayDirreqV3Reqs(fingerprint, publishedMillis,
+ dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis, requests);
+ parseRelayDirreqWriteHistory(fingerprint, publishedMillis,
+ dirreqWriteHistory);
+ }
+
+ private static void parseRelayDirreqV3Reqs(String fingerprint,
+ long publishedMillis, long dirreqStatsEndMillis,
+ long dirreqStatsIntervalLengthMillis,
+ SortedMap<String, Integer> requests) throws IOException {
+ if (requests == null ||
+ publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS ||
+ dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
+ /* Cut off all observations that are one week older than
+ * the descriptor publication time, or we'll have to update
+ * weeks of aggregate values every hour. */
+ return;
+ }
+ long statsStartMillis = dirreqStatsEndMillis
+ - dirreqStatsIntervalLengthMillis;
+ long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
+ * ONE_DAY_MILLIS;
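+ /* The dirreq stats interval may span a UTC date boundary. Split it
+ * at the break so that each output line falls within a single UTC
+ * day, scaling the reported values by the interval fraction below. */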
+ for (int i = 0; i < 2; i++) {
+ long fromMillis = i == 0 ? statsStartMillis
+ : utcBreakMillis;
+ long toMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
+ if (fromMillis >= toMillis) {
+ continue;
+ }
+ double intervalFraction = ((double) (toMillis - fromMillis))
+ / ((double) dirreqStatsIntervalLengthMillis);
+ double sum = 0L;
+ for (Map.Entry<String, Integer> e : requests.entrySet()) {
+ String country = e.getKey();
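+ /* Reported per-country request counts appear to be rounded up to
+ * the next multiple of 8; subtracting 4 removes the expected bias
+ * of that binning. */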
+ double reqs = ((double) e.getValue()) - 4.0;
+ sum += reqs;
+ writeOutputLine(fingerprint, "relay", "responses", country,
+ "", "", fromMillis, toMillis, reqs * intervalFraction);
+ }
+ writeOutputLine(fingerprint, "relay", "responses", "", "",
+ "", fromMillis, toMillis, sum * intervalFraction);
+ }
+ }
+
+ private static void parseRelayDirreqWriteHistory(String fingerprint,
+ long publishedMillis, BandwidthHistory dirreqWriteHistory)
+ throws IOException {
+ if (dirreqWriteHistory == null ||
+ publishedMillis - dirreqWriteHistory.getHistoryEndMillis()
+ > ONE_WEEK_MILLIS) {
+ return;
+ /* Cut off all observations that are one week older than
+ * the descriptor publication time, or we'll have to update
+ * weeks of aggregate values every hour. */
+ }
+ long intervalLengthMillis =
+ dirreqWriteHistory.getIntervalLength() * 1000L;
+ for (Map.Entry<Long, Long> e :
+ dirreqWriteHistory.getBandwidthValues().entrySet()) {
+ long intervalEndMillis = e.getKey();
+ long intervalStartMillis =
+ intervalEndMillis - intervalLengthMillis;
+ for (int i = 0; i < 2; i++) {
+ long fromMillis = intervalStartMillis;
+ long toMillis = intervalEndMillis;
+ double writtenBytes = (double) e.getValue();
+ if (intervalStartMillis / ONE_DAY_MILLIS <
+ intervalEndMillis / ONE_DAY_MILLIS) {
+ long utcBreakMillis = (intervalEndMillis
+ / ONE_DAY_MILLIS) * ONE_DAY_MILLIS;
+ if (i == 0) {
+ toMillis = utcBreakMillis;
+ } else if (i == 1) {
+ fromMillis = utcBreakMillis;
+ }
+ double intervalFraction = ((double) (toMillis - fromMillis))
+ / ((double) intervalLengthMillis);
+ writtenBytes *= intervalFraction;
+ } else if (i == 1) {
+ break;
+ }
+ writeOutputLine(fingerprint, "relay", "bytes", "", "", "",
+ fromMillis, toMillis, writtenBytes);
+ }
+ }
+ }
+
+ private static void parseRelayNetworkStatusConsensus(
+ RelayNetworkStatusConsensus consensus) throws IOException {
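+ /* Record one 'status' interval from the consensus valid-after time
+ * to its fresh-until time for every relay with the Running flag.
+ * The value is unused for 'status' entries; only the interval
+ * length matters for the aggregation. */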
+ long fromMillis = consensus.getValidAfterMillis();
+ long toMillis = consensus.getFreshUntilMillis();
+ for (NetworkStatusEntry statusEntry :
+ consensus.getStatusEntries().values()) {
+ String fingerprint = statusEntry.getFingerprint().
+ toUpperCase();
+ if (statusEntry.getFlags().contains("Running")) {
+ writeOutputLine(fingerprint, "relay", "status", "", "", "",
+ fromMillis, toMillis, 0.0);
+ }
+ }
+ }
+
+ private static void parseBridgeDescriptors() throws Exception {
+ DescriptorReader descriptorReader =
+ DescriptorSourceFactory.createDescriptorReader();
+ descriptorReader.setExcludeFiles(new File(
+ "status/bridge-descriptors"));
+ descriptorReader.addDirectory(new File(
+ "in/bridge-descriptors/"));
+ Iterator<DescriptorFile> descriptorFiles =
+ descriptorReader.readDescriptors();
+ while (descriptorFiles.hasNext()) {
+ DescriptorFile descriptorFile = descriptorFiles.next();
+ for (Descriptor descriptor : descriptorFile.getDescriptors()) {
+ if (descriptor instanceof ExtraInfoDescriptor) {
+ parseBridgeExtraInfoDescriptor(
+ (ExtraInfoDescriptor) descriptor);
+ } else if (descriptor instanceof BridgeNetworkStatus) {
+ parseBridgeNetworkStatus((BridgeNetworkStatus) descriptor);
+ }
+ }
+ }
+ }
+
+ private static void parseBridgeExtraInfoDescriptor(
+ ExtraInfoDescriptor descriptor) throws IOException {
+ String fingerprint = descriptor.getFingerprint().toUpperCase();
+ long publishedMillis = descriptor.getPublishedMillis();
+ long dirreqStatsEndMillis = descriptor.getDirreqStatsEndMillis();
+ long dirreqStatsIntervalLengthMillis =
+ descriptor.getDirreqStatsIntervalLength() * 1000L;
+ parseBridgeDirreqV3Resp(fingerprint, publishedMillis,
+ dirreqStatsEndMillis, dirreqStatsIntervalLengthMillis,
+ descriptor.getDirreqV3Resp(),
+ descriptor.getBridgeIps(),
+ descriptor.getBridgeIpTransports(),
+ descriptor.getBridgeIpVersions());
+
+ parseBridgeDirreqWriteHistory(fingerprint, publishedMillis,
+ descriptor.getDirreqWriteHistory());
+ }
+
+ private static void parseBridgeDirreqV3Resp(String fingerprint,
+ long publishedMillis, long dirreqStatsEndMillis,
+ long dirreqStatsIntervalLengthMillis,
+ SortedMap<String, Integer> responses,
+ SortedMap<String, Integer> bridgeIps,
+ SortedMap<String, Integer> bridgeIpTransports,
+ SortedMap<String, Integer> bridgeIpVersions) throws IOException {
+ if (responses == null ||
+ publishedMillis - dirreqStatsEndMillis > ONE_WEEK_MILLIS ||
+ dirreqStatsIntervalLengthMillis != ONE_DAY_MILLIS) {
+ /* Cut off all observations that are one week older than
+ * the descriptor publication time, or we'll have to update
+ * weeks of aggregate values every hour. */
+ return;
+ }
+ long statsStartMillis = dirreqStatsEndMillis
+ - dirreqStatsIntervalLengthMillis;
+ long utcBreakMillis = (dirreqStatsEndMillis / ONE_DAY_MILLIS)
+ * ONE_DAY_MILLIS;
+ double resp = ((double) responses.get("ok")) - 4.0;
+ if (resp > 0.0) {
+ for (int i = 0; i < 2; i++) {
+ long fromMillis = i == 0 ? statsStartMillis
+ : utcBreakMillis;
+ long toMillis = i == 0 ? utcBreakMillis : dirreqStatsEndMillis;
+ if (fromMillis >= toMillis) {
+ continue;
+ }
+ double intervalFraction = ((double) (toMillis - fromMillis))
+ / ((double) dirreqStatsIntervalLengthMillis);
+ writeOutputLine(fingerprint, "bridge", "responses", "", "",
+ "", fromMillis, toMillis, resp * intervalFraction);
+ parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+ dirreqStatsIntervalLengthMillis, "country", bridgeIps);
+ parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+ dirreqStatsIntervalLengthMillis, "transport",
+ bridgeIpTransports);
+ parseBridgeRespByCategory(fingerprint, fromMillis, toMillis, resp,
+ dirreqStatsIntervalLengthMillis, "version", bridgeIpVersions);
+ }
+ }
+ }
+
+ private static void parseBridgeRespByCategory(String fingerprint,
+ long fromMillis, long toMillis, double resp,
+ long dirreqStatsIntervalLengthMillis, String category,
+ SortedMap<String, Integer> frequencies) throws IOException {
+ double total = 0.0;
+ SortedMap<String, Double> frequenciesCopy =
+ new TreeMap<String, Double>();
+ if (frequencies != null) {
+ for (Map.Entry<String, Integer> e : frequencies.entrySet()) {
+ if (e.getValue() < 4.0) {
+ continue;
+ }
+ double r = ((double) e.getValue()) - 4.0;
+ frequenciesCopy.put(e.getKey(), r);
+ total += r;
+ }
+ }
+ /* If we're not told any frequencies, or at least none of them are
+ * greater than 4, put in a default that we'll attribute all responses
+ * to. */
+ if (total == 0) {
+ if (category.equals("country")) {
+ frequenciesCopy.put("??", 4.0);
+ } else if (category.equals("transport")) {
+ frequenciesCopy.put("<OR>", 4.0);
+ } else if (category.equals("version")) {
+ frequenciesCopy.put("v4", 4.0);
+ }
+ total = 4.0;
+ }
+ for (Map.Entry<String, Double> e : frequenciesCopy.entrySet()) {
+ double intervalFraction = ((double) (toMillis - fromMillis))
+ / ((double) dirreqStatsIntervalLengthMillis);
+ double val = resp * intervalFraction * e.getValue() / total;
+ if (category.equals("country")) {
+ writeOutputLine(fingerprint, "bridge", "responses", e.getKey(),
+ "", "", fromMillis, toMillis, val);
+ } else if (category.equals("transport")) {
+ writeOutputLine(fingerprint, "bridge", "responses", "",
+ e.getKey(), "", fromMillis, toMillis, val);
+ } else if (category.equals("version")) {
+ writeOutputLine(fingerprint, "bridge", "responses", "", "",
+ e.getKey(), fromMillis, toMillis, val);
+ }
+ }
+ }
+
+ private static void parseBridgeDirreqWriteHistory(String fingerprint,
+ long publishedMillis, BandwidthHistory dirreqWriteHistory)
+ throws IOException {
+ if (dirreqWriteHistory == null ||
+ publishedMillis - dirreqWriteHistory.getHistoryEndMillis()
+ > ONE_WEEK_MILLIS) {
+ /* Cut off all observations that are one week older than
+ * the descriptor publication time, or we'll have to update
+ * weeks of aggregate values every hour. */
+ return;
+ }
+ long intervalLengthMillis =
+ dirreqWriteHistory.getIntervalLength() * 1000L;
+ for (Map.Entry<Long, Long> e :
+ dirreqWriteHistory.getBandwidthValues().entrySet()) {
+ long intervalEndMillis = e.getKey();
+ long intervalStartMillis =
+ intervalEndMillis - intervalLengthMillis;
+ for (int i = 0; i < 2; i++) {
+ long fromMillis = intervalStartMillis;
+ long toMillis = intervalEndMillis;
+ double writtenBytes = (double) e.getValue();
+ if (intervalStartMillis / ONE_DAY_MILLIS <
+ intervalEndMillis / ONE_DAY_MILLIS) {
+ long utcBreakMillis = (intervalEndMillis
+ / ONE_DAY_MILLIS) * ONE_DAY_MILLIS;
+ if (i == 0) {
+ toMillis = utcBreakMillis;
+ } else if (i == 1) {
+ fromMillis = utcBreakMillis;
+ }
+ double intervalFraction = ((double) (toMillis - fromMillis))
+ / ((double) intervalLengthMillis);
+ writtenBytes *= intervalFraction;
+ } else if (i == 1) {
+ break;
+ }
+ writeOutputLine(fingerprint, "bridge", "bytes", "",
+ "", "", fromMillis, toMillis, writtenBytes);
+ }
+ }
+ }
+
+ private static void parseBridgeNetworkStatus(BridgeNetworkStatus status)
+ throws IOException {
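+ /* Only consider statuses published within the first half hour of an
+ * hour, and attribute them to the one-hour interval starting at
+ * that full hour. */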
+ if (status.getPublishedMillis() % ONE_HOUR_MILLIS
+ > ONE_HOUR_MILLIS / 2) {
+ return;
+ }
+ long fromMillis = (status.getPublishedMillis()
+ / ONE_HOUR_MILLIS) * ONE_HOUR_MILLIS;
+ long toMillis = fromMillis + ONE_HOUR_MILLIS;
+ for (NetworkStatusEntry statusEntry :
+ status.getStatusEntries().values()) {
+ String fingerprint = statusEntry.getFingerprint().
+ toUpperCase();
+ if (statusEntry.getFlags().contains("Running")) {
+ writeOutputLine(fingerprint, "bridge", "status", "", "", "",
+ fromMillis, toMillis, 0.0);
+ }
+ }
+ }
+
+ private static Map<String, BufferedWriter> openOutputFiles =
+ new HashMap<String, BufferedWriter>();
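+
+ /* Write a single tab-separated observation line in the format
+ * expected by the COPY ... FROM stdin statement at the top of each
+ * output file. */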
+ private static void writeOutputLine(String fingerprint, String node,
+ String metric, String country, String transport, String version,
+ long fromMillis, long toMillis, double val) throws IOException {
+ if (fromMillis > toMillis) {
+ return;
+ }
+ String fromDateTime = formatDateTimeMillis(fromMillis);
+ String toDateTime = formatDateTimeMillis(toMillis);
+ BufferedWriter bw = getOutputFile(fromDateTime);
+ bw.write(String.format("%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\t%.1f\n",
+ fingerprint, node, metric, country, transport, version,
+ fromDateTime, toDateTime, val));
+ }
+
+ private static SimpleDateFormat dateTimeFormat = null;
+ private static String formatDateTimeMillis(long millis) {
+ if (dateTimeFormat == null) {
+ dateTimeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ dateTimeFormat.setLenient(false);
+ dateTimeFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
+ return dateTimeFormat.format(millis);
+ }
+
+ private static BufferedWriter getOutputFile(String fromDateTime)
+ throws IOException {
+ String outputFileName = isBulkImport
+ ? "out/userstats-" + fromDateTime.substring(0, 10) + ".sql"
+ : "out/userstats.sql";
+ BufferedWriter bw = openOutputFiles.get(outputFileName);
+ if (bw == null) {
+ bw = openOutputFile(outputFileName);
+ openOutputFiles.put(outputFileName, bw);
+ }
+ return bw;
+ }
+
+ private static BufferedWriter openOutputFile(String outputFileName)
+ throws IOException {
+ BufferedWriter bw = new BufferedWriter(new FileWriter(
+ outputFileName));
+ bw.write("BEGIN;\n");
+ bw.write("LOCK TABLE imported NOWAIT;\n");
+ bw.write("COPY imported (fingerprint, node, metric, country, "
+ + "transport, version, stats_start, stats_end, val) FROM "
+ + "stdin;\n");
+ return bw;
+ }
+
+ private static void closeOutputFiles() throws IOException {
+ for (BufferedWriter bw : openOutputFiles.values()) {
+ bw.write("\\.\n");
+ bw.write("SELECT merge();\n");
+ bw.write("SELECT aggregate();\n");
+ bw.write("TRUNCATE imported;\n");
+ bw.write("COMMIT;\n");
+ bw.close();
+ }
+ }
+}
+