
Re: [tor-dev] Metrics: Estimating fraction of reported directory-request statistics



On Sat, Apr 16, 2022 at 06:16:23PM -0600, David Fifield wrote:
> I am trying to reproduce the "frac" computation from the Reproducible
> Metrics instructions:
> https://metrics.torproject.org/reproducible-metrics.html#relay-users
> Which is also Section 3 in the tech report on counting bridge users:
> https://research.torproject.org/techreports/counting-daily-bridge-users-2012-10-24.pdf#page=4
> 
>        h(R^H) * n(H) + h(H) * n(R\H)
> frac = -----------------------------
>                 h(H) * n(N)
> 
> My minor goal is to reproduce the "frac" column from the Metrics web
> site (which I assume is the same as the frac above, expressed as a
> percentage):
> 
> https://metrics.torproject.org/userstats-relay-country.csv?start=2022-04-01&end=2022-04-08&country=all&events=off
> date,country,users,lower,upper,frac
> 2022-04-01,,2262557,,,92
> 2022-04-02,,2181639,,,92
> 2022-04-03,,2179544,,,93
> 2022-04-04,,2350360,,,93
> 2022-04-05,,2388772,,,93
> 2022-04-06,,2356170,,,93
> 2022-04-07,,2323184,,,93
> 2022-04-08,,2310170,,,91
> 
> I'm having trouble with the computation of n(R\H) and h(R∧H). I
> understand that R is the subset of relays that report directory request
> counts (i.e. that have dirreq-stats-end in their extra-info descriptors)
> and H is the subset of relays that report directory request byte counts
> (i.e. that have dirreq-write-history in their extra-info descriptors).
> R and H partially overlap: there are relays that are in R but not H,
> others that are in H but not R, and others that are in both.
>
> The computations depend on some values that are directly from
> descriptors:
> n(R) = sum of hours, for relays with directory request counts
> n(H) = sum of hours, for relays with directory write histories
> h(H) = sum of written bytes, for relays with directory write histories
>
> ...
> 
> Using the formulas and assumptions above, here's my attempt at computing
> recent "frac" values:
> 
> date       `n(N)`  `n(H)`   `h(H)`  `n(R)` `n(R\H)` `h(R∧H)` frac
> 2022-04-01 166584 177638.  2.24e13 125491.       0   1.59e13 0.753
> 2022-04-02 166951 177466.  2.18e13 125686.       0   1.54e13 0.753
> 2022-04-03 167100 177718.  2.27e13 127008.       0   1.62e13 0.760
> 2022-04-04 166970 177559.  2.43e13 126412.       0   1.73e13 0.757
> 2022-04-05 166729 177585.  2.44e13 125389.       0   1.72e13 0.752
> 2022-04-06 166832 177470.  2.39e13 127077.       0   1.71e13 0.762
> 2022-04-07 166532 177210.  2.48e13 127815.       0   1.79e13 0.768
> 2022-04-08 167695 176879.  2.52e13 127697.       0   1.82e13 0.761

I tried computing n(R\H) and h(R∧H) from the definitions, rather than
with the formulas in the Reproducible Metrics guide. This yields a
"frac" column that almost matches, though it is still about 1% too high.

date       `n(N)`  `n(H)`   `h(H)`  `n(R)` `n(R\H)` `h(R∧H)` frac
2022-04-01 166584 177638.  2.24e13 125491.     90.9  1.96e13 0.930
2022-04-02 166951 177466.  2.18e13 125686.    181.   1.92e13 0.937
2022-04-03 167100 177718.  2.27e13 127008.    154.   2.00e13 0.942
2022-04-04 166970 177559.  2.43e13 126412.    134.   2.14e13 0.936
2022-04-05 166729 177585.  2.44e13 125389.     94.6  2.15e13 0.938
2022-04-06 166832 177470.  2.39e13 127077.    162.   2.11e13 0.940
2022-04-07 166532 177210.  2.48e13 127815.    102.   2.18e13 0.938
2022-04-08 167695 176879.  2.52e13 127697.    158.   2.21e13 0.926

I got this by taking an explicit set intersection between the R and H
time intervals. So, for example, if the intervals making up n(R) and
n(H) are (with their lengths):

n(R)    [---10---]  [----12----]          [---9---]
n(H)         [----12----]    [------16------]      [--7--]

Then the intersection n(R∧H) is:

n(R∧H)       [-5-]  [-5-]    [3]          [3]

h(R∧H) comes from pro-rating the n(H) intervals, each of which is
associated with an h(H) byte count. Suppose the [----12----] interval
represents 1000 bytes. Then each of the [-5-] intervals that result from
it in the intersection is worth 5/12 × 1000 ≈ 417 bytes.

We get n(R\H) from n(R) − n(R∧H):

n(R\H)  [-5-]            [4-]                [-6--]
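
Here is a minimal sketch of those two steps, using made-up positions and
byte counts that match the toy diagrams above (the real computation in
relay_dir.py below works the same way, per relay, on datetime intervals):

import itertools

# Toy intervals as (begin, end) pairs in arbitrary units.
R = [(0, 10), (12, 24), (34, 43)]    # lengths 10, 12, 9
H = [(5, 17), (21, 37), (43, 50)]    # lengths 12, 16, 7
# Invented byte counts for the n(H) intervals; the [----12----] interval
# from the example above is (5, 17) with 1000 bytes.
H_bytes = {(5, 17): 1000, (21, 37): 1600, (43, 50): 700}

n_RH = 0.0   # n(R∧H): total length covered by both R and H
h_RH = 0.0   # h(R∧H): H byte counts pro-rated onto the intersection
for (r_begin, r_end), (h_begin, h_end) in itertools.product(R, H):
    begin, end = max(r_begin, h_begin), min(r_end, h_end)
    if begin < end:
        n_RH += end - begin
        # Pro-rate this H interval's bytes by the overlapping fraction.
        h_RH += H_bytes[(h_begin, h_end)] * (end - begin) / (h_end - h_begin)

n_R = sum(end - begin for (begin, end) in R)
print(n_RH, n_R - n_RH, h_RH)
# 16.0 15.0 1433.33… : n(R∧H), n(R\H), and h(R∧H); each [-5-] piece
# contributes 5/12 × 1000 ≈ 417 bytes.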

This seems more correct overall, though it requires a more elaborate
computation than the Reproducible Metrics guide prescribes. I'm still
not sure why it does not match exactly, and I would still appreciate a
pointer to where Tor Metrics does the "frac" computation.

I was initially interested in this for the purpose of better estimating
the number of Snowflake users. But now I've decided "frac" is not useful
for that purpose: since there is only one bridge we care about, it does
not make sense to adjust the numbers to account for other bridges that
may not report the same set of statistics. I don't plan to take this
investigation any further for the time being, but here is source code to
reproduce the above tables. You will need:
https://collector.torproject.org/archive/relay-descriptors/consensuses/consensuses-2022-04.tar.xz
https://collector.torproject.org/archive/relay-descriptors/extra-infos/extra-infos-2022-04.tar.xz

./relay_uptime.py consensuses-2022-04.tar.xz > relay_uptime.csv
./relay_dir.py extra-infos-2022-04.tar.xz > relay_dir.csv
./frac.py relay_uptime.csv relay_dir.csv
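
The scripts also import a small "common" module that is not included in
this message. The sketch below is my reconstruction of the helpers they
use, inferred from the call sites; the names come from the scripts, but
the bodies and threshold values are guesses.

# common.py -- hypothetical reconstruction, not the module actually used.
import datetime

NUM_PROCESSES = 4   # worker pool size; pick to suit your machine

# Placeholder thresholds for skipping stale statistics (whose end time is
# long before the descriptor's publication) and implausibly long
# statistics intervals. The values actually used are not shown here.
END_THRESHOLD = datetime.timedelta(days = 7)
INTERVAL_THRESHOLD = datetime.timedelta(days = 7)

def segment_datetime_interval(begin, end):
    # Split [begin, end) at UTC midnight boundaries, yielding
    # (date, fraction of the whole interval, hours in the segment).
    # The scripts below use only the first two elements.
    total = end - begin
    cur = begin
    while cur < end:
        next_midnight = datetime.datetime.combine(
            cur.date() + datetime.timedelta(days = 1), datetime.time.min)
        seg_end = min(end, next_midnight)
        seg = seg_end - cur
        yield cur.date(), seg / total, seg / datetime.timedelta(hours = 1)
        cur = seg_end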
#!/usr/bin/env python3

import getopt
import multiprocessing
import sys

import stem
import stem.descriptor
import stem.descriptor.reader
import stem.descriptor.networkstatus

import numpy as np
import pandas as pd

import common

def process_network_status(network_status):
    assert type(network_status) == stem.descriptor.networkstatus.NetworkStatusDocumentV3, type(network_status)

    data = {
        "date": [],
        "relay_uptime_hours": [],
    }
    # We assume the intervals seen by this function are non-overlapping.
    num_running = sum(stem.Flag.RUNNING in router.flags for router in network_status.routers.values())
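    # Each consensus is typically valid for one hour (valid_after to
    # fresh_until), so num_running * frac_int attributes those relay-hours
    # to whichever calendar dates the interval touches.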
    for (date, frac_int, _) in common.segment_datetime_interval(network_status.valid_after, network_status.fresh_until):
        data["date"].append(date)
        data["relay_uptime_hours"].append(num_running * frac_int)
    return pd.DataFrame(data)

def process_file(f):
    with stem.descriptor.reader.DescriptorReader([f], document_handler = stem.descriptor.DocumentHandler.DOCUMENT) as reader:
        return (
            pd.concat(process_network_status(desc) for desc in reader)
                .groupby("date").sum().reset_index()
        )

if __name__ == "__main__":
    _, inputs = getopt.gnu_getopt(sys.argv[1:], "")
    with multiprocessing.Pool(common.NUM_PROCESSES) as pool:
        (
            pd.concat(pool.imap_unordered(process_file, inputs))
                .groupby("date").sum().reset_index()
        ).to_csv(sys.stdout, index = False, float_format = "%.2f", columns = [
            "date",
            "relay_uptime_hours",
        ])
date,relay_uptime_hours
2022-04-01,166584.00
2022-04-02,166951.00
2022-04-03,167100.00
2022-04-04,166970.00
2022-04-05,166729.00
2022-04-06,166832.00
2022-04-07,166532.00
2022-04-08,167695.00
2022-04-09,167592.00
2022-04-10,167801.00
2022-04-11,167098.00
2022-04-12,166777.00
2022-04-13,166411.00
2022-04-14,27594.00
#!/usr/bin/env python3

import datetime
import getopt
import multiprocessing
import sys

import stem
import stem.descriptor
import stem.descriptor.reader
import stem.descriptor.networkstatus
import stem.descriptor.extrainfo_descriptor

import numpy as np
import pandas as pd

import common

def intersect_intervals(a, b):
    a = list(sorted(a))
    b = list(sorted(b))
    result = []
    i = 0
    j = 0
    while i < len(a) and j < len(b):
        if a[i][0] < b[j][1] and a[i][1] > b[j][0]:
            result.append((max(a[i][0], b[j][0]), min(a[i][1], b[j][1]), i, j))
        # Advance whichever sequence of intervals currently has the leftmost
        # right edge.
        if a[i][1] < b[j][1]:
            i += 1
        else:
            j += 1
    return result

def process_relay_extra_infos(reader):
    dir_write_history = {
        "published": [],
        "fingerprint": [],
        "nickname": [],
        "begin": [],
        "end": [],
        "bytes": [],
    }
    dir_stats = {
        "published": [],
        "fingerprint": [],
        "nickname": [],
        "begin": [],
        "end": [],
        "resp_ok": [],
    }
    for desc in reader:
        assert type(desc) == stem.descriptor.extrainfo_descriptor.RelayExtraInfoDescriptor, type(desc)

        if desc.dir_write_history_end is not None \
            and desc.published - desc.dir_write_history_end < common.END_THRESHOLD \
            and datetime.timedelta(seconds = desc.dir_write_history_interval) < common.INTERVAL_THRESHOLD:
            # Break the write history into separate rows, one for each interval.
            end = desc.dir_write_history_end
            for value in reversed(desc.dir_write_history_values):
                begin = end - datetime.timedelta(seconds = desc.dir_write_history_interval)
                dir_write_history["published"].append(desc.published)
                dir_write_history["fingerprint"].append(desc.fingerprint)
                dir_write_history["nickname"].append(desc.nickname)
                dir_write_history["begin"].append(begin)
                dir_write_history["end"].append(end)
                dir_write_history["bytes"].append(value)
                end = begin

        if desc.dir_stats_end is not None \
            and desc.published - desc.dir_stats_end < common.END_THRESHOLD \
            and datetime.timedelta(seconds = desc.dir_stats_interval) < common.INTERVAL_THRESHOLD:
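            # Subtract 4 to undo the binning the relay applies to its
            # response counts (as described in the Reproducible Metrics guide).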
            resp_ok = desc.dir_v3_responses[stem.descriptor.extrainfo_descriptor.DirResponse.OK] - 4
            if resp_ok > 0:
                dir_stats["published"].append(desc.published)
                dir_stats["fingerprint"].append(desc.fingerprint)
                dir_stats["nickname"].append(desc.nickname)
                dir_stats["begin"].append(desc.dir_stats_end - datetime.timedelta(seconds = desc.dir_stats_interval))
                dir_stats["end"].append(desc.dir_stats_end)
                dir_stats["resp_ok"].append(resp_ok)

    # Different descriptors for the same relay contain overlapping write
    # histories. Keep only the most recent "published" for each "end".
    dir_write_history = (
        pd.DataFrame(dir_write_history)
            .sort_values("published")
            .groupby(["fingerprint", "nickname", "end"])
            .last()
            .reset_index()
    )
    # Do the same for directory responses, though we don't expect these to
    # overlap.
    dir_stats = (
        pd.DataFrame(dir_stats)
            .sort_values("published")
            .groupby(["fingerprint", "nickname", "end"])
            .last()
            .reset_index()
    )

    # Now compute the intervals, for each relay, which are covered by *both*
    # dir_write_history and dir_stats.
    both = []
    dir_write_history_grouped = dir_write_history.groupby(["fingerprint", "nickname"])
    dir_stats_grouped = dir_stats.groupby(["fingerprint", "nickname"])
    for (fingerprint, nickname), dir_write_history_group in dir_write_history_grouped:
        try:
            dir_stats_group = dir_stats_grouped.get_group((fingerprint, nickname))
        except KeyError:
            continue
        # Find the intersection, H∧R, of write history intervals and dir stats
        # intervals.
        dir_write_history_intervals = [(row.begin, row.end) for row in dir_write_history_group.itertuples()]
        dir_stats_intervals = [(row.begin, row.end) for row in dir_stats_group.itertuples()]
        intersection = intersect_intervals(dir_write_history_intervals, dir_stats_intervals)
        if not intersection:
            continue
        # Each tuple returned by intersect_intervals contains:
        #   [0]: beginning of interval in intersection
        #   [1]: end of interval in intersection
        #   [2]: index in dir_write_history_intervals that contributes to this interval
        #   [3]: index in dir_stats_intervals that contributes to this interval
        # We make a joint dataframe that maps the intersection intervals
        # (ibegin = [0], iend = [1]) to their [2] corresponding intervals in
        # dir_write_history_intervals, along with their byte counts. We use this
        # to scale the byte counts for the intersection intervals.
        joint = pd.concat([
            pd.DataFrame({
                "ibegin": [x[0] for x in intersection],
                "iend": [x[1] for x in intersection],
            }),
            dir_write_history_group.iloc[[x[2] for x in intersection]][["begin", "end", "bytes"]].reset_index(drop = True),
        ], axis = 1)
        both.append(pd.DataFrame({
            "fingerprint": fingerprint,
            "nickname": nickname,
            "begin": joint["ibegin"],
            "end": joint["iend"],
            "bytes": joint["bytes"] * (pd.TimedeltaIndex(joint["iend"] - joint["ibegin"]).to_pytimedelta() / pd.TimedeltaIndex(joint["end"] - joint["begin"]).to_pytimedelta()),
        }))
    both = pd.concat(both)

    # Sum by date over all relays.
    dir_write_history_bydate = {
        "date": [],
        "relay_dir_write_hours": [],
        "relay_dir_write_bytes": [],
    }
    dir_stats_bydate = {
        "date": [],
        "relay_dir_stats_hours": [],
        "relay_dir_stats_resp_ok": [],
    }
    both_bydate = {
        "date": [],
        "both_hours": [],
        "both_bytes": [],
    }
    for row in dir_write_history.itertuples():
        for (date, frac_int, _) in common.segment_datetime_interval(row.begin, row.end):
            dir_write_history_bydate["date"].append(date)
            dir_write_history_bydate["relay_dir_write_hours"].append((row.end - row.begin) / datetime.timedelta(hours = 1) * frac_int)
            dir_write_history_bydate["relay_dir_write_bytes"].append(row.bytes * frac_int)
    for row in dir_stats.itertuples():
        for (date, frac_int, _) in common.segment_datetime_interval(row.begin, row.end):
            dir_stats_bydate["date"].append(date)
            dir_stats_bydate["relay_dir_stats_hours"].append((row.end - row.begin) / datetime.timedelta(hours = 1) * frac_int)
            dir_stats_bydate["relay_dir_stats_resp_ok"].append(row.resp_ok * frac_int)
    for row in both.itertuples():
        for (date, frac_int, _) in common.segment_datetime_interval(row.begin, row.end):
            both_bydate["date"].append(date)
            both_bydate["both_hours"].append((row.end - row.begin) / datetime.timedelta(hours = 1) * frac_int)
            both_bydate["both_bytes"].append(row.bytes * frac_int)
    dir_write_history_bydate = (
        pd.DataFrame(dir_write_history_bydate)
            .groupby("date").sum().reset_index()
    )
    dir_stats_bydate = (
        pd.DataFrame(dir_stats_bydate)
            .groupby("date").sum().reset_index()
    )
    both_bydate = (
        pd.DataFrame(both_bydate)
            .groupby("date").sum().reset_index()
    )
    return pd.merge(
        pd.merge(dir_write_history_bydate, dir_stats_bydate, on = ["date"], how = "outer"),
        both_bydate, on = ["date"], how = "outer",
    )

def process_file(f):
    with stem.descriptor.reader.DescriptorReader([f]) as reader:
        return process_relay_extra_infos(reader)

if __name__ == "__main__":
    _, inputs = getopt.gnu_getopt(sys.argv[1:], "")
    with multiprocessing.Pool(common.NUM_PROCESSES) as pool:
        (
            pd.concat(pool.imap_unordered(process_file, inputs))
                .groupby("date").sum().reset_index()
        ).to_csv(sys.stdout, index = False, float_format = "%.2f", columns = [
            "date",
            "relay_dir_write_hours",
            "relay_dir_write_bytes",
            "relay_dir_stats_hours",
            "relay_dir_stats_resp_ok",
            "both_hours",
            "both_bytes",
        ])
date,relay_dir_write_hours,relay_dir_write_bytes,relay_dir_stats_hours,relay_dir_stats_resp_ok,both_hours,both_bytes
2022-03-20,82.91,1736713782.47,0.00,0.00,0.00,0.00
2022-03-21,421.90,6437321840.55,0.00,0.00,0.00,0.00
2022-03-22,970.53,8279352647.53,0.00,0.00,0.00,0.00
2022-03-23,1579.50,13942016383.53,0.00,0.00,0.00,0.00
2022-03-24,2558.18,16947613780.65,0.00,0.00,0.00,0.00
2022-03-25,5265.61,31445067926.07,0.00,0.00,0.00,0.00
2022-03-26,49073.66,5239496653355.36,0.00,0.00,0.00,0.00
2022-03-27,158442.15,20979790952547.85,0.00,0.00,0.00,0.00
2022-03-28,172902.40,23790345564006.61,0.00,0.00,0.00,0.00
2022-03-29,175311.71,23872774104777.91,0.00,0.00,0.00,0.00
2022-03-30,176491.41,24071621689387.25,27273.67,5126398.60,27235.07,5104977982724.75
2022-03-31,177534.60,23529414875973.20,109920.94,19643266.92,109835.57,18734199905639.44
2022-04-01,177638.09,22439702932089.59,125491.45,21001365.60,125400.58,19561513440430.52
2022-04-02,177466.08,21760791688019.58,125685.84,20363465.06,125504.89,19162123059384.01
2022-04-03,177717.51,22650212443851.81,127008.39,20455819.55,126854.33,20044713521642.67
2022-04-04,177559.24,24329589093181.60,126412.11,21929666.19,126277.69,21407202944760.75
2022-04-05,177585.25,24395314853928.89,125388.58,22312434.77,125294.95,21462939747109.41
2022-04-06,177470.40,23918457487092.34,127077.40,22067464.65,126915.31,21102579463539.30
2022-04-07,177210.36,24768051969233.11,127814.89,21760745.43,127713.06,21811098715725.13
2022-04-08,176879.16,25187290225432.38,127697.28,21309827.80,127539.29,22087431800503.34
2022-04-09,176262.96,23593325365455.48,126498.62,20655939.77,126337.55,20627973646625.13
2022-04-10,175357.06,22622047200557.93,125273.01,20220477.13,125078.36,20043119503663.02
2022-04-11,173470.74,25317282391208.27,125630.57,21293321.28,125232.90,22026316726563.52
2022-04-12,162104.32,24604704321104.66,121130.54,20856377.21,116487.69,20913938468593.55
2022-04-13,64924.25,9802425975580.58,50547.50,8701435.60,33860.77,6183056411127.21
2022-04-14,192.04,30127318662.96,266.81,39914.44,49.49,3116201356.98
#!/usr/bin/env python3

import getopt
import sys

import numpy as np
import pandas as pd

if __name__ == "__main__":
    _, (relay_uptime_csv_filename, relay_dir_csv_filename) = getopt.gnu_getopt(sys.argv[1:], "")
    relay_uptime = pd.read_csv(relay_uptime_csv_filename)
    relay_dir = pd.read_csv(relay_dir_csv_filename)
    j = (
        pd.merge(relay_uptime, relay_dir, on = "date", how = "inner")
            .rename(columns = {
                "relay_uptime_hours": "n(N)",
                "relay_dir_write_hours": "n(H)",
                "relay_dir_write_bytes": "h(H)",
                "relay_dir_stats_hours": "n(R)",
                "both_hours": "n(R∧H)",
                "both_bytes": "h(R∧H)",
            })
    )
    j["n(R\\H)"] = j["n(R)"] - j["n(R∧H)"]
    # Uncomment these to use the formulas for n(R\H) and h(R∧H) from
    # https://metrics.torproject.org/reproducible-metrics.html#relay-users
    # j["n(R\\H)"] = np.maximum(0, j["n(R)"] - j["n(H)"])
    # j["h(R∧H)"] = np.minimum(j["n(R)"], j["n(H)"]) / np.maximum(j["n(R)"], j["n(H)"]) * j["h(H)"]
    j["frac"] = (j["h(R∧H)"] * j["n(H)"] + j["h(H)"] * j["n(R\\H)"]) / (j["h(H)"] * j["n(N)"])
    print(j[[
        "date",
        "n(N)",
        "n(H)",
        "h(H)",
        "n(R)",
        "n(R∧H)",
        "h(R∧H)",
        "n(R\H)",
        "frac",
    ]])