[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [sbws/master] new: relaylist: Store relays' consensus timestamps



commit 417ebfa90429d3531e8a5310cf58dff4fc2ff158
Author: juga0 <juga@xxxxxxxxxx>
Date:   Fri Feb 15 08:39:05 2019 +0000

    new: relaylist: Store relays' consensus timestamps
    
    for each relay and for the list of relays when fetching new
    network statuses.
    To count the number of times a relay was in a consensus and how
    many consensuses have been seen.
    In the test network router status entries don't have document
    attribute.
    Part of #28566
---
 sbws/core/scanner.py  |   6 +-
 sbws/globals.py       |  10 +++
 sbws/lib/relaylist.py | 178 ++++++++++++++++++++++++++++++++++++++++++++++----
 3 files changed, 179 insertions(+), 15 deletions(-)

diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index 6842372..503f9cc 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -624,8 +624,10 @@ def run_speedtest(args, conf):
     # Call only once to initialize http_headers
     settings.init_http_headers(conf.get('scanner', 'nickname'), state['uuid'],
                                str(controller.get_version()))
-
-    rl = RelayList(args, conf, controller)
+    # So that RelayList does not need args and conf to obtain it, pass an
+    # extra argument with the data_period
+    measurements_period = conf.getint('general', 'data_period')
+    rl = RelayList(args, conf, controller, measurements_period)
     cb = CB(args, conf, controller, rl)
     rd = ResultDump(args, conf)
     rp = RelayPrioritizer(args, conf, rl, rd)
diff --git a/sbws/globals.py b/sbws/globals.py
index abd4791..015e857 100644
--- a/sbws/globals.py
+++ b/sbws/globals.py
@@ -84,6 +84,16 @@ MAX_BW_DIFF_PERC = 50
 # Tor already accept lines of any size, but leaving the limit anyway.
 BW_LINE_SIZE = 1022
 
+# RelayList, ResultDump, v3bwfile
+# For how many seconds in the past the relays and measurements data is kept/
+# considered valid.
+# This is currently set by default in config.default.ini as ``data_period``,
+# and used in ResultDump and v3bwfile.
+# In a future refactor, constants in config.default.ini should be moved here,
+# or calculated in settings, so that there's no need to pass the configuration
+# to all the functions.
+MEASUREMENTS_PERIOD = 5 * 24 * 60 * 60
+
 # Metadata to send in every requests, so that data servers can know which
 # scanners are using them.
 # In Requests these keys are case insensitive.
diff --git a/sbws/lib/relaylist.py b/sbws/lib/relaylist.py
index b296c06..bee0a14 100644
--- a/sbws/lib/relaylist.py
+++ b/sbws/lib/relaylist.py
@@ -1,16 +1,52 @@
+import copy
+from datetime import datetime, timedelta
+
 from stem.descriptor.router_status_entry import RouterStatusEntryV3
 from stem.descriptor.server_descriptor import ServerDescriptor
 from stem import Flag, DescriptorUnavailable, ControllerError
 import random
-import time
 import logging
 from threading import Lock
 
+from ..globals import MEASUREMENTS_PERIOD
+
 log = logging.getLogger(__name__)
 
 
+def remove_old_consensus_timestamps(
+        consensus_timestamps, measurements_period=MEASUREMENTS_PERIOD):
+    """
+    Remove the consensus timestamps that are older than the period for
+    which the measurements are kept from a list of consensus_timestamps.
+
+    :param list consensus_timestamps: datetimes, as ``datetime.utcnow()``
+    :param int measurements_period: how far back to keep, in seconds
+    :returns list: a new list of ``consensus_timestamps``
+    """
+    # timedelta's first positional argument is days; the period is seconds.
+    oldest_date = datetime.utcnow() - timedelta(seconds=measurements_period)
+    # The comprehension builds a new list, the argument is left untouched.
+    return [t for t in consensus_timestamps if t >= oldest_date]
+
+
+def valid_after_from_network_statuses(network_statuses):
+    """Obtain the consensus Valid-After datetime from the ``document``
+    attribute of a ``stem.descriptor.RouterStatusEntryV3``.
+
+    :param list network_statuses: entries to search for a ``document``.
+    :returns datetime: the first Valid-After found, else the current UTC.
+    """
+    for ns in network_statuses:
+        document = getattr(ns, 'document', None)
+        if document:
+            valid_after = getattr(document, 'valid_after', None)
+            if valid_after:
+                return valid_after
+    return datetime.utcnow().replace(microsecond=0)
+
+
 class Relay:
-    def __init__(self, fp, cont, ns=None, desc=None):
+    def __init__(self, fp, cont, ns=None, desc=None, timestamp=None):
         '''
         Given a relay fingerprint, fetch all the information about a relay that
         sbws currently needs and store it in this class. Acts as an abstraction
@@ -18,6 +54,9 @@ class Relay:
 
         :param str fp: fingerprint of the relay.
         :param cont: active and valid stem Tor controller connection
+
+        :param datetime timestamp: the timestamp of a consensus
+            (RouterStatusEntryV3) from which this relay has been obtained.
         '''
         assert isinstance(fp, str)
         assert len(fp) == 40
@@ -38,6 +77,8 @@ class Relay:
                 self._desc = cont.get_server_descriptor(fp, default=None)
             except (DescriptorUnavailable, ControllerError) as e:
                 log.exception("Exception trying to get desc %s", e)
+        self._consensus_timestamps = []
+        self._add_consensus_timestamp(timestamp)
 
     def _from_desc(self, attr):
         if not self._desc:
@@ -107,6 +148,63 @@ class Relay:
             return None
         return key.rstrip('=')
 
+    @property
+    def consensus_valid_after(self):
+        """Obtain the consensus Valid-After from the document of this relay
+        network status.
+        """
+        network_status_document = self._from_ns('document')
+        if network_status_document:
+            return getattr(network_status_document, 'valid_after', None)
+        return None
+
+    @property
+    def last_consensus_timestamp(self):
+        if len(self._consensus_timestamps) >= 1:
+            return self._consensus_timestamps[-1]
+        return None
+
+    def _add_consensus_timestamp(self, timestamp=None):
+        """Add the consensus timestamp in which this relay is present.
+
+        The Valid-After of this relay's network status document is used
+        when available; otherwise ``timestamp`` is used if it is newer than
+        the last stored one; otherwise the current UTC datetime is stored.
+
+        :param datetime timestamp: fallback consensus timestamp, may be None.
+        """
+        valid_after = self.consensus_valid_after
+        last = self.last_consensus_timestamp
+        # It is possible to access the relay's consensus Valid-After.
+        if valid_after is not None:
+            # Only store it when the list is empty or it is more recent than
+            # the most recent stored consensus timestamp.
+            if last is None or valid_after > last:
+                self._consensus_timestamps.append(valid_after)
+        # No Valid-After: use the timestamp arg, but only when there was
+        # already a list and the arg is more recent than its last entry.
+        # ``timestamp`` may be None, which can not be compared to a datetime.
+        elif (timestamp is not None
+              and last is not None
+              and timestamp > last):
+            self._consensus_timestamps.append(timestamp)
+        # In any other case
+        else:
+            # Add the current datetime
+            self._consensus_timestamps.append(
+                datetime.utcnow().replace(microsecond=0))
+
+    def _remove_old_consensus_timestamps(
+            self, measurements_period=MEASUREMENTS_PERIOD):
+        """Drop stored timestamps older than ``measurements_period``."""
+        # The helper returns a new list, so no copy of the old one is needed.
+        self._consensus_timestamps = remove_old_consensus_timestamps(
+            self._consensus_timestamps, measurements_period)
+
+    def update_consensus_timestamps(self, timestamp=None):
+        self._add_consensus_timestamp(timestamp)
+        self._remove_old_consensus_timestamps()
+
     def can_exit_to_port(self, port):
         """
         Returns True if the relay has an exit policy and the policy accepts
@@ -129,16 +227,39 @@ class RelayList:
     transparently in the background. Provides useful interfaces for getting
     only relays of a certain type.
     '''
-    REFRESH_INTERVAL = 300  # seconds
 
-    def __init__(self, args, conf, controller):
+    def __init__(self, args, conf, controller,
+                 measurements_period=MEASUREMENTS_PERIOD):
         self._controller = controller
         self.rng = random.SystemRandom()
         self._refresh_lock = Lock()
+        # To track all the consensus seen.
+        self._consensus_timestamps = []
+        # Initialize so that there's no error trying to access to it.
+        # In future refactor, change to a dictionary, where the keys are
+        # the relays' fingerprint.
+        self._relays = []
+        # The period of time for which the measurements are kept.
+        self._measurements_period = measurements_period
         self._refresh()
 
     def _need_refresh(self):
-        return time.time() >= self._last_refresh + self.REFRESH_INTERVAL
+        # New consensuses happen every hour.
+        return datetime.utcnow() >= \
+            self.last_consensus_timestamp + timedelta(seconds=60*60)
+
+    @property
+    def last_consensus_timestamp(self):
+        """Returns the datetime when the last consensus was obtained."""
+        # The ``None`` default avoids AttributeError on a missing attribute.
+        if getattr(self, "_consensus_timestamps", None):
+            return self._consensus_timestamps[-1]
+        # If the object was not created from __init__, it won't have
+        # consensus_timestamps attribute or it might be empty.
+        # In this case force new update.
+        # Anytime more than 1h in the past will be old.
+        self._consensus_timestamps = []
+        return datetime.utcnow() - timedelta(seconds=60*61)
 
     @property
     def relays(self):
@@ -197,19 +318,50 @@ class RelayList:
     def _relays_without_flag(self, flag):
         return [r for r in self.relays if flag not in r.flags]
 
+    def _remove_old_consensus_timestamps(self):
+        self._consensus_timestamps = remove_old_consensus_timestamps(
+            copy.deepcopy(self._consensus_timestamps),
+            self._measurements_period
+            )
+
     def _init_relays(self):
+        """Returns a new list of relays that are in the current consensus.
+        And update the consensus timestamp list with the current one.
+
+        """
         c = self._controller
-        try:
-            relays = [Relay(ns.fingerprint, c, ns=ns)
-                      for ns in c.get_network_statuses()]
-        except ControllerError as e:
-            log.exception("Exception trying to init relays %s", e)
-            return []
-        return relays
+        # This will get router statuses from this Tor cache, might not be
+        # updated with the network. Materialize the iterator: it is read twice.
+        # Change to stem.descriptor.remote in future refactor.
+        network_statuses = list(c.get_network_statuses())
+        new_relays_dict = dict([(r.fingerprint, r) for r in network_statuses])
+
+        # Find the timestamp of the last consensus.
+        timestamp = valid_after_from_network_statuses(network_statuses)
+        self._consensus_timestamps.append(timestamp)
+        self._remove_old_consensus_timestamps()
+        # Update the relays that were in the previous consensus with the
+        # new timestamp
+        new_relays = []
+        relays = copy.deepcopy(self._relays)
+        for r in relays:
+            if r.fingerprint in new_relays_dict:
+                r.update_consensus_timestamps(timestamp)
+                new_relays_dict.pop(r.fingerprint)
+                new_relays.append(r)
+
+        # Add the relays that were not in the previous consensus
+        # If there was a relay in some older previous consensus,
+        # it won't get stored, so its previous consensuses are lost,
+        # but probably this is fine for now to not make it more complicated.
+        for fp, ns in new_relays_dict.items():
+            r = Relay(ns.fingerprint, c, ns=ns, timestamp=timestamp)
+            new_relays.append(r)
+        return new_relays
 
     def _refresh(self):
+        # Set a new list of relays.
         self._relays = self._init_relays()
-        self._last_refresh = time.time()
 
     def exits_not_bad_allowing_port(self, port):
         return [r for r in self.exits



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits