[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [bridgedb/develop] Parse bridge blocking info from SQL database.



commit 5e0ed0af43e876c3305ab75c782ac67b51ea1eb9
Author: Philipp Winter <phw@xxxxxxxxxxxxxx>
Date:   Wed Jul 8 20:23:53 2020 +0000

    Parse bridge blocking info from SQL database.
    
    The plan currently is that wolpertinger will populate our SQL database
    with bridge blocking info provided by OONI.  This patch adds code that
    parses this data and adds it to our existing bridge objects.
    
    This fixes tpo/anti-censorship/bridgedb#34260.
---
 CHANGELOG                     |   3 +
 bridgedb/Storage.py           | 138 ++++++++++++++++++++++++++++++++++++-
 bridgedb/main.py              |  26 +++++++
 bridgedb/test/test_Storage.py | 157 ++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 321 insertions(+), 3 deletions(-)

diff --git a/CHANGELOG b/CHANGELOG
index 0351de7..10012ce 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -1,3 +1,6 @@
+    * FIXES https://bugs.torproject.org/34260
+    Parse bridge blocking information from SQL database.
+
     * FIXES https://gitlab.torproject.org/tpo/anti-censorship/bridgedb/-/issues/40001
     Remove the --reload command line switch.  It doesn't actually do anything.
 
diff --git a/bridgedb/Storage.py b/bridgedb/Storage.py
index 2859cf1..4ae3c9e 100644
--- a/bridgedb/Storage.py
+++ b/bridgedb/Storage.py
@@ -12,6 +12,7 @@ from functools import wraps
 from ipaddr import IPAddress
 from contextlib import contextmanager
 import sys
+import datetime
 
 from bridgedb.Stability import BridgeHistory
 import threading
@@ -19,6 +20,7 @@ import threading
 toHex = binascii.b2a_hex
 fromHex = binascii.a2b_hex
 HEX_ID_LEN = 40
+BRIDGE_REACHABLE, BRIDGE_BLOCKED = 0, 1
 
 def _escapeValue(v):
     return "'%s'" % v.replace("'", "''")
@@ -68,7 +70,7 @@ SCHEMA2_SCRIPT = """
 
  CREATE INDEX EmailedBridgesWhenMailed on EmailedBridges ( email );
 
- CREATE TABLE BlockedBridges (
+ CREATE TABLE BridgeMeasurements (
      id INTEGER PRIMARY KEY NOT NULL,
      hex_key,
      bridge_type,
@@ -77,10 +79,11 @@ SCHEMA2_SCRIPT = """
      blocking_country,
      blocking_asn,
      measured_by,
-     last_measured
+     last_measured,
+     verdict INTEGER
  );
 
- CREATE INDEX BlockedBridgesBlockingCountry on BlockedBridges(hex_key);
+ CREATE INDEX BlockedBridgesBlockingCountry on BridgeMeasurements(hex_key);
 
  CREATE TABLE WarnedEmails (
      email PRIMARY KEY NOT NULL,
@@ -242,6 +245,34 @@ class Database(object):
 
         return retBridges
 
+    def getBlockedBridges(self):
+        """Return a dictionary of bridges that are blocked.
+
+        :rtype: dict
+        :returns: A dictionary that maps bridge fingerprints (as strings) to a
+            three-tuple that captures its blocking state: (country,  address,
+            port).
+        """
+        ms = self.__fetchBridgeMeasurements()
+        return getBlockedBridgesFromSql(ms)
+
+    def __fetchBridgeMeasurements(self):
+        """Return all bridge measurement rows from the last three years.
+
+        We limit our search to three years for performance reasons because the
+        bridge measurement table keeps growing and therefore slowing down
+        queries.
+
+        :rtype: list
+        :returns: A list of tuples.
+        """
+        cur = self._cur
+        old_year = datetime.datetime.utcnow() - datetime.timedelta(days=365*3)
+        cur.execute("SELECT * FROM BridgeMeasurements WHERE last_measured > "
+                    "'%s' ORDER BY blocking_country DESC" %
+                    old_year.strftime("%Y-%m-%d"))
+        return cur.fetchall()
+
     def getBridgesForDistributor(self, distributor):
         """Return a list of BridgeData value classes of all bridges in the
            database that are allocated to distributor 'distributor'
@@ -352,6 +383,107 @@ _LOCKED = 0
 _OPENED_DB = None
 _REFCOUNT = 0
 
+class BridgeMeasurement(object):
+    def __init__(self, id, fingerprint, bridge_type, address, port,
+            country, asn, measured_by, last_measured, verdict):
+        self.fingerprint = fingerprint
+        self.country = country
+        self.address = address
+        self.port = port
+        try:
+            self.date = datetime.datetime.strptime(last_measured, "%Y-%m-%d")
+        except ValueError:
+            logging.error("Could not convert SQL date string '%s' to "
+                            "datetime object." % last_measured)
+            self.date = datetime.datetime(1970, 1, 1, 0, 0)
+        self.verdict = verdict
+
+    def compact(self):
+        return (self.country, self.address, self.port)
+
+    def __contains__(self, item):
+        return (self.country == item.country and
+                self.address == item.address and
+                self.port == item.port)
+
+    def newerThan(self, other):
+        return self.date > other.date
+
+    def conflicts(self, other):
+        return (self.verdict != other.verdict and
+                self.country == other.country and
+                self.address == other.address and
+                self.port == other.port)
+
+def getBlockedBridgesFromSql(sql_rows):
+    """Return a dictionary that maps bridge fingerprints to a list of
+    bridges that are known to be blocked somewhere.
+
+    :param list sql_rows: A list of tuples.  Each tuple represents an SQL row.
+    :rtype: dict
+    :returns: A dictionary that maps bridge fingerprints (as strings) to a
+        three-tuple that captures its blocking state: (country,  address,
+        port).
+    """
+    # Separately keep track of measurements that conclude that a bridge is
+    # blocked or reachable.
+    blocked = {}
+    reachable = {}
+
+    def _shouldSkip(m1):
+        """Return `True` if we can skip this measurement."""
+        # Use our 'reachable' dictionary if our original measurement says that
+        # a bridge is blocked, and vice versa.  The purpose is to process
+        # measurements that are possibly conflicting with the one at hand.
+        d = reachable if m1.verdict == BRIDGE_BLOCKED else blocked
+        maybe_conflicting = d.get(m1.fingerprint, None)
+        if maybe_conflicting is None:
+            # There is no potentially conflicting measurement.
+            return False
+
+        for m2 in maybe_conflicting:
+            if m1.compact() != m2.compact():
+                continue
+            # Conflicting measurement.  If m2 is newer than m1, we believe m2.
+            if m2.newerThan(m1):
+                return True
+            # Conflicting measurement.  If m1 is newer than m2, we believe m1,
+            # and remove m1.
+            if m1.newerThan(m2):
+                d[m1.fingerprint].remove(m2)
+                # If we're left with an empty list, get rid of the dictionary
+                # key altogether.
+                if len(d[m1.fingerprint]) == 0:
+                    del d[m1.fingerprint]
+                return False
+        return False
+
+    for fields in sql_rows:
+        m = BridgeMeasurement(*fields)
+        if _shouldSkip(m):
+            continue
+
+        d = blocked if m.verdict == BRIDGE_BLOCKED else reachable
+        other_measurements = d.get(m.fingerprint, None)
+        if other_measurements is None:
+            # We're dealing with the first "blocked" or "reachable" measurement
+            # for the given bridge fingerprint.
+            d[m.fingerprint] = [m]
+        else:
+            # Do we have an existing measurement that agrees with the given
+            # measurement?
+            if m in other_measurements:
+                d[m.fingerprint] = [m if m.compact() == x.compact() and
+                                    m.newerThan(x) else x for x in other_measurements]
+            # We're dealing with a new measurement.  Add it to the list.
+            else:
+                d[m.fingerprint] = other_measurements + [m]
+
+    # Compact-ify the measurements in our dictionary.
+    for k, v in blocked.items():
+        blocked[k] = [i.compact() for i in v]
+    return blocked
+
 def clearGlobalDB():
     """Start from scratch.
 
diff --git a/bridgedb/main.py b/bridgedb/main.py
index 44d0668..8b851a3 100644
--- a/bridgedb/main.py
+++ b/bridgedb/main.py
@@ -308,6 +308,31 @@ def createBridgeRings(cfg, proxyList, key):
 
     return hashring, emailDistributor, ipDistributor, moatDistributor
 
+def loadBlockedBridges(hashring):
+    """Load bridge blocking info from our SQL database and add it to bridge
+    objects."""
+
+    blockedBridges = {}
+    with bridgedb.Storage.getDB() as db:
+        blockedBridges = db.getBlockedBridges()
+
+    num_blocked = 0
+    for name, ring in hashring.ringsByName.items():
+        if name == "unallocated":
+            continue
+        for _, bridge in ring.bridges.items():
+            l = []
+            try:
+                l = blockedBridges[bridge.fingerprint]
+            except KeyError:
+                continue
+            for blocking_country, address, port in l:
+                bridge.setBlockedIn(blocking_country, address, port)
+            num_blocked += 1
+
+    logging.info("Loaded blocking info for %d bridges.".format(num_blocked))
+
+
 def run(options, reactor=reactor):
     """This is BridgeDB's main entry point and main runtime loop.
 
@@ -441,6 +466,7 @@ def run(options, reactor=reactor):
         logging.info("Reparsing bridge descriptors...")
         load(state, hashring, clear=False)
         logging.info("Bridges loaded: %d" % len(hashring))
+        loadBlockedBridges(hashring)
 
         if emailDistributorTmp is not None:
             emailDistributorTmp.prepopulateRings() # create default rings
diff --git a/bridgedb/test/test_Storage.py b/bridgedb/test/test_Storage.py
index 720afdd..a1eb7bd 100644
--- a/bridgedb/test/test_Storage.py
+++ b/bridgedb/test/test_Storage.py
@@ -4,6 +4,7 @@
 import os
 import threading
 import time
+import datetime
 
 from twisted.python import log
 from twisted.trial import unittest
@@ -11,6 +12,8 @@ from twisted.internet import reactor
 from twisted.internet.threads import deferToThread
 
 import bridgedb.Storage as Storage
+import bridgedb.main as main
+from bridgedb.bridges import Bridge
 
 from bridgedb.test.util import generateFakeBridges
 
@@ -113,3 +116,157 @@ class DatabaseTest(unittest.TestCase):
         with Storage.getDB() as db:
             ringname = db.getBridgeDistributor(bridge, self.validRings)
             self.assertEqual(ringname, "unallocated")
+
+    def test_BridgeMeasurementComparison(self):
+        m1 = Storage.BridgeMeasurement(0, "", "", "", "", "", "", "",
+                                       "2020-06-17", 0)
+        m2 = Storage.BridgeMeasurement(0, "", "", "", "", "", "", "",
+                                       "2020-06-18", 0)
+        self.assertTrue(m2.newerThan(m1))
+        self.assertFalse(m1.newerThan(m2))
+        self.assertFalse(m1.newerThan(m1))
+
+    def test_BridgeMeasurementCompact(self):
+        m = Storage.BridgeMeasurement(0, "FINGERPRINT", "obfs4", "1.2.3.4",
+                                      "1234", "ru", "1234", "ooni",
+                                      "2020-06-17", 0)
+        self.assertEquals(m.compact(), ("ru", "1.2.3.4", "1234"))
+
+    def test_fetchBridgeMeasurements(self):
+
+        query = "INSERT INTO BridgeMeasurements (hex_key, bridge_type, " \
+                "address, port, blocking_country, blocking_asn, " \
+                "measured_by, last_measured, verdict) VALUES ('key', " \
+                "'obfs4', '1.2.3.4', '1234', 'RU', '1234', 'OONI', '%s', 1)"
+        oldMsmt = query % "2017-01-01"
+        newMsmt = query % datetime.datetime.utcnow().strftime("%Y-%m-%d")
+
+        Storage.initializeDBLock()
+        with Storage.getDB() as db:
+            db._cur.execute(oldMsmt)
+            # We're calling _Database__fetchBridgeMeasurements instead of
+            # __fetchBridgeMeasurements to account for Python's name meddling.
+            rows = db._Database__fetchBridgeMeasurements()
+            # Outdated measurements should not be returned.
+            self.assertEquals(len(rows), 0)
+
+            db._cur.execute(newMsmt)
+            rows = db._Database__fetchBridgeMeasurements()
+            # Measurements that are "young enough" should be returned.
+            self.assertEquals(len(rows), 1)
+
+    def test_main_loadBlockedBridges(self):
+        Storage.initializeDBLock()
+
+        # Mock configuration object that we use to initialize our bridge rings.
+        class Cfg(object):
+            def __init__(self):
+                self.FORCE_PORTS = [(443, 1)]
+                self.FORCE_FLAGS = [("Stable", 1)]
+                self.MOAT_DIST = False
+                self.HTTPS_DIST = True
+                self.HTTPS_SHARE = 10
+                self.N_IP_CLUSTERS = 1
+                self.EMAIL_DIST = False
+                self.RESERVED_SHARE = 0
+
+        bridge = self.fakeBridges[0]
+        addr, port, _ = bridge.orAddresses[0]
+        cc= "de"
+
+        # Mock object that we use to simulate a database connection.
+        class DummyDB(object):
+            def __init__(self):
+                pass
+            def __enter__(self):
+                return self
+            def __exit__(self, type, value, traceback):
+                pass
+            def getBlockedBridges(self):
+                return {bridge.fingerprint: [(cc, addr, port)]}
+            def getBridgeDistributor(self, bridge, validRings):
+                return "https"
+            def insertBridgeAndGetRing(self, bridge, setRing, seenAt, validRings, defaultPool="unallocated"):
+                return "https"
+            def commit(self):
+                pass
+
+        oldObj = Storage.getDB
+        Storage.getDB = DummyDB
+
+        hashring, _, _, _ = main.createBridgeRings(Cfg(), None, b'key')
+        hashring.insert(bridge)
+
+        self.assertEqual(len(hashring), 1)
+        self.assertFalse(bridge.isBlockedIn(cc))
+        self.assertFalse(bridge.isBlockedIn("ab"))
+        self.assertFalse(bridge.addressIsBlockedIn(cc, addr, port))
+
+        main.loadBlockedBridges(hashring)
+
+        self.assertTrue(bridge.isBlockedIn(cc))
+        self.assertFalse(bridge.isBlockedIn("ab"))
+        self.assertTrue(bridge.addressIsBlockedIn(cc, addr, port))
+
+        Storage.getDB = oldObj
+
+    def test_getBlockedBridgesFromSql(self):
+
+        elems = [(0, "0000000000000000000000000000000000000000", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-17",
+                  Storage.BRIDGE_BLOCKED),
+                 (1, "1111111111111111111111111111111111111111", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-01",
+                  Storage.BRIDGE_BLOCKED),
+                 (2, "1111111111111111111111111111111111111111", "obfs4",
+                  "1.2.3.4", "4321", "ru", "4321", "ooni", "2020-06-01",
+                  Storage.BRIDGE_BLOCKED),
+                 (3, "1111111111111111111111111111111111111111", "obfs4",
+                  "1.2.3.4", "4321", "ru", "4321", "ooni", "2020-05-01",
+                  Storage.BRIDGE_REACHABLE)]
+        b = Storage.getBlockedBridgesFromSql(elems)
+        self.assertEqual(b, {"0000000000000000000000000000000000000000":
+                             [("ru", "1.2.3.4", "1234")],
+                             "1111111111111111111111111111111111111111":
+                             [("ru", "1.2.3.4", "1234"),
+                              ("ru", "1.2.3.4", "4321")]})
+
+        # If multiple measurements disagree, we believe the newest one.
+        elems = [(0, "0000000000000000000000000000000000000000", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-17",
+                  Storage.BRIDGE_BLOCKED),
+                 (1, "0000000000000000000000000000000000000000", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-01",
+                  Storage.BRIDGE_REACHABLE)]
+        b = Storage.getBlockedBridgesFromSql(elems)
+        self.assertEqual(b, {"0000000000000000000000000000000000000000":
+                             [("ru", "1.2.3.4", "1234")]})
+
+        elems = [(0, "0000000000000000000000000000000000000000", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-01",
+                  Storage.BRIDGE_BLOCKED),
+                 (1, "0000000000000000000000000000000000000000", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-17",
+                  Storage.BRIDGE_REACHABLE)]
+        b = Storage.getBlockedBridgesFromSql(elems)
+        self.assertTrue(len(b) == 0)
+
+        # Element ordering must not affect the outcome.
+        elems = [(1, "0000000000000000000000000000000000000000", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-17",
+                  Storage.BRIDGE_REACHABLE),
+                 (0, "0000000000000000000000000000000000000000", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-01",
+                  Storage.BRIDGE_BLOCKED)]
+        b = Storage.getBlockedBridgesFromSql(elems)
+        self.assertTrue(len(b) == 0)
+
+        # Redundant measurements should be discarded.
+        elems = [(1, "0000000000000000000000000000000000000000", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-17",
+                  Storage.BRIDGE_BLOCKED),
+                 (0, "0000000000000000000000000000000000000000", "obfs4",
+                  "1.2.3.4", "1234", "ru", "4321", "ooni", "2020-06-01",
+                  Storage.BRIDGE_BLOCKED)]
+        b = Storage.getBlockedBridgesFromSql(elems)
+        self.assertTrue(len(b) == 1)



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits