[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [sbws/m12] chg: scanner: Move to concurrent.futures
commit e11faefd010ca0447f10d79ca9da1ef71f37a1d9
Author: juga0 <juga@xxxxxxxxxx>
Date: Sat Jun 19 16:07:20 2021 +0000
chg: scanner: Move to concurrent.futures
away from multiprocessing, because it looks like we hit python bug
22393, in which the pool hangs forever when a worker process dies.
We don't know why a worker process might die; maybe it runs out of memory (OOM).
See https://stackoverflow.com/questions/65115092/occasional-deadlock-in-multiprocessing-pool.
We also ran into several other issues with multiprocessing in the past.
Concurrent.futures has a simpler API and is more modern.
Closes #40092.
---
sbws/core/scanner.py | 370 +++++++++++++++++-----------------------
sbws/globals.py | 1 -
setup.cfg | 1 +
tests/unit/conftest.py | 7 +
tests/unit/core/test_scanner.py | 83 ++++++++-
5 files changed, 241 insertions(+), 221 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index 7637028..bb723bf 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -1,4 +1,5 @@
""" Measure the relays. """
+import concurrent.futures
import logging
import os
import queue
@@ -10,17 +11,10 @@ import time
import traceback
import uuid
from argparse import ArgumentDefaultsHelpFormatter
-from multiprocessing.context import TimeoutError
-from multiprocessing.dummy import Pool
import sbws.util.requests as requests_utils
import sbws.util.stem as stem_utils
-from sbws.globals import (
- HTTP_GET_HEADERS,
- SOCKET_TIMEOUT,
- TIMEOUT_MEASUREMENTS,
- fail_hard,
-)
+from sbws.globals import HTTP_GET_HEADERS, SOCKET_TIMEOUT, fail_hard
from .. import settings
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
@@ -47,7 +41,6 @@ rng = random.SystemRandom()
log = logging.getLogger(__name__)
# Declare the objects that manage the threads global so that sbws can exit
# gracefully at any time.
-pool = None
rd = None
controller = None
@@ -58,13 +51,10 @@ traceback."""
def stop_threads(signal, frame, exit_code=0):
- global rd, pool
+ global rd
log.debug("Stopping sbws.")
# Avoid new threads to start.
settings.set_end_event()
- # Stop Pool threads
- pool.close()
- pool.join()
# Stop ResultDump thread
rd.thread.join()
# Stop Tor thread
@@ -609,51 +599,48 @@ def _next_expected_amount(
return expected_amount
-def result_putter(result_dump):
- """Create a function that takes a single argument -- the measurement
- result -- and return that function so it can be used by someone else"""
-
- def closure(measurement_result):
- # Since result_dump thread is calling queue.get() every second,
- # the queue should be full for only 1 second.
- # This call blocks at maximum timeout seconds.
- try:
- result_dump.queue.put(measurement_result, timeout=3)
- except queue.Full:
- # The result would be lost, the scanner will continue working.
- log.warning(
- "The queue with measurements is full, when adding %s.\n"
- "It is possible that the thread that get them to "
- "write them to the disk (ResultDump.enter) is stalled.",
- measurement_result,
- )
-
- return closure
-
+def result_putter(result_dump, measurement):
+ # Since result_dump thread is calling queue.get() every second,
+ # the queue should be full for only 1 second.
+ # This call blocks at maximum timeout seconds.
+ try:
+ result_dump.queue.put(measurement, timeout=3)
+ except queue.Full:
+ # The result would be lost, the scanner will continue working.
+ log.warning(
+ "The queue with measurements is full, when adding %s.\n"
+ "It is possible that the thread that get them to "
+ "write them to the disk (ResultDump.enter) is stalled.",
+ measurement,
+ )
-def result_putter_error(target):
- """Create a function that takes a single argument -- an error from a
- measurement -- and return that function so it can be used by someone else
- """
- def closure(object):
- if settings.end_event.is_set():
- return
- # The only object that can be here if there is not any uncatched
- # exception is stem.SocketClosed when stopping sbws
- # An exception here means that the worker thread finished.
- log.warning(FILLUP_TICKET_MSG)
- # To print the traceback that happened in the thread, not here in
- # the main process.
- log.warning(
- "".join(
- traceback.format_exception(
- type(object), object, object.__traceback__
- )
+def result_putter_error(target, exception):
+ print("in result putter error")
+ if settings.end_event.is_set():
+ return
+ # The only object that can be here if there is not any uncatched
+ # exception is stem.SocketClosed when stopping sbws
+ # An exception here means that the worker thread finished.
+ log.warning(FILLUP_TICKET_MSG)
+ # To print the traceback that happened in the thread, not here in
+ # the main process.
+ log.warning(
+ "".join(
+ traceback.format_exception(
+ type(exception), exception, exception.__traceback__
)
)
-
- return closure
+ )
+ log.debug(
+ "".join(
+ target.fingerprint,
+ target.nickname,
+ traceback.format_exception(
+ type(exception), exception, exception.__traceback__
+ ),
+ )
+ )
def main_loop(
@@ -666,41 +653,23 @@ def main_loop(
relay_prioritizer,
destinations,
):
- """Starts and reuse the threads that measure the relays forever.
+ r"""Create the queue of future measurements for every relay to measure.
It starts a loop that will be run while there is not and event signaling
that sbws is stopping (because of SIGTERM or SIGINT).
- Then, it starts a second loop with an ordered list (generator) of relays
- to measure that might a subset of all the current relays in the Network.
+ Then the ``ThreadPoolExecutor`` (executor) queues all the relays to
+ measure in ``Future`` objects. These objects have an ``state``.
- For every relay, it starts a new thread which runs ``measure_relay`` to
- measure the relay until there are ``max_pending_results`` threads.
+ The executor starts a new thread for every relay to measure, which runs
+ ``measure_relay`` until there are ``max_pending_results`` threads.
After that, it will reuse a thread that has finished for every relay to
measure.
- It is the the pool method ``apply_async`` which starts or reuse a thread.
- This method returns an ``ApplyResult`` immediately, which has a ``ready``
- methods that tells whether the thread has finished or not.
- When the thread finish, ie. ``ApplyResult`` is ``ready``, it triggers
- ``result_putter`` callback, which put the ``Result`` in ``ResultDump``
- queue and complete immediately.
-
- ``ResultDump`` thread (started before and out of this function) will get
- the ``Result`` from the queue and write it to disk, so this doesn't block
- the measurement threads.
-
- If there was an exception not caught by ``measure_relay``, it will call
- instead ``result_putter_error``, which logs the error and complete
- immediately.
-
- Before the outer loop iterates, it waits (non blocking) that all
- the ``Results`` are ready calling ``wait_for_results``.
- This avoid to start measuring the same relay which might still being
- measured.
+ Then ``wait_for_results`` is call, to obtain the results in the completed
+ ``future``\s.
"""
- global pool
log.info("Started the main loop to measure the relays.")
hbeat = Heartbeat(conf.getpath("paths", "state_fname"))
@@ -710,59 +679,57 @@ def main_loop(
while not settings.end_event.is_set():
log.debug("Starting a new measurement loop.")
num_relays = 0
- # Since loop might finish before pending_results is 0 due waiting too
- # long, set it here and not outside the loop.
- pending_results = []
loop_tstart = time.time()
# Register relay fingerprints to the heartbeat module
hbeat.register_consensus_fprs(relay_list.relays_fingerprints)
-
- for target in relay_prioritizer.best_priority():
- # Don't start measuring a relay if sbws is stopping.
- if settings.end_event.is_set():
- break
- # 40023, disable to decrease state.dat json lines
- # relay_list.increment_recent_measurement_attempt()
- target.increment_relay_recent_measurement_attempt()
- num_relays += 1
- # callback and callback_err must be non-blocking
- callback = result_putter(result_dump)
- callback_err = result_putter_error(target)
- async_result = pool.apply_async(
- dispatch_worker_thread,
- [
+ # num_threads
+ max_pending_results = conf.getint("scanner", "measurement_threads")
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=max_pending_results, thread_name_prefix="measurer"
+ ) as executor:
+ log.info("In the executor, queue all future measurements.")
+ # With futures, there's no need for callback, what it was the
+ # callback with multiprocessing library can be just a function
+ # that gets executed when the future result is obtained.
+ pending_results = {
+ executor.submit(
+ dispatch_worker_thread,
args,
conf,
destinations,
circuit_builder,
relay_list,
target,
- ],
- {},
- callback,
- callback_err,
+ ): target
+ for target in relay_prioritizer.best_priority()
+ }
+ log.debug("Measurements queued.")
+ # After the submitting all the targets to the executor, the pool
+ # has queued all the relays and pending_results has the list of all
+ # `Future`s.
+
+ # Each target relay_recent_measurement_attempt is incremented in
+ # `wait_for_results` as well as hbeat measured fingerprints.
+ num_relays = len(pending_results)
+ # Without a callback, it's needed to pass `result_dump` here to
+ # call the function that writes the measurement when it's
+ # finished.
+ wait_for_results(
+ executor,
+ hbeat,
+ result_dump,
+ pending_results,
)
- pending_results.append(async_result)
-
- # Register this measurement to the heartbeat module
- hbeat.register_measured_fpr(target.fingerprint)
-
- log.debug("Measurements queued.")
- # After the for has finished, the pool has queued all the relays
- # and pending_results has the list of all the AsyncResults.
- # It could also be obtained with pool._cache, which contains
- # a dictionary with AsyncResults as items.
- num_relays_to_measure = len(pending_results)
- wait_for_results(num_relays_to_measure, pending_results)
+ force_get_results(pending_results)
# Print the heartbeat message
hbeat.print_heartbeat_message()
loop_tstop = time.time()
loop_tdelta = (loop_tstop - loop_tstart) / 60
- # At this point, we know the relays that were queued to be measured.
- # That does not mean they were actually measured.
+ # At this point, we know the relays that were queued to be
+ # measured.
log.debug(
"Attempted to measure %s relays in %s minutes",
num_relays,
@@ -775,113 +742,88 @@ def main_loop(
stop_threads(signal.SIGTERM, None)
-def wait_for_results(num_relays_to_measure, pending_results):
- """Wait for the pool to finish and log progress.
-
- While there are relays being measured, just log the progress
- and sleep :const:`~sbws.globals.TIMEOUT_MEASUREMENTS` (3mins),
- which is approximately the time it can take to measure a relay in
- the worst case.
-
- When there has not been any relay measured in ``TIMEOUT_MEASUREMENTS``
- and there are still relays pending to be measured, it means there is no
- progress and call :func:`~sbws.core.scanner.force_get_results`.
-
- This can happen in the case of a bug that makes either
- :func:`~sbws.core.scanner.measure_relay`,
- :func:`~sbws.core.scanner.result_putter` (callback) and/or
- :func:`~sbws.core.scanner.result_putter_error` (callback error) stall.
+def wait_for_results(executor, hbeat, result_dump, pending_results):
+ """Obtain the relays' measurements as they finish.
- .. note:: in a future refactor, this could be simpler by:
+ For every ``Future`` measurements that gets completed, obtain the
+ ``result`` and call ``result_putter``, which put the ``Result`` in
+ ``ResultDump`` queue and complete immediately.
- 1. Initializing the pool at the begingging of each loop
- 2. Callling :meth:`~Pool.close`; :meth:`~Pool.join` after
- :meth:`~Pool.apply_async`,
- to ensure no new jobs are added until the pool has finished with all
- the ones in the queue.
-
- As currently, there would be still two cases when the pool could stall:
-
- 1. There's an exception in ``measure_relay`` and another in
- ``callback_err``
- 2. There's an exception ``callback``.
+ ``ResultDump`` thread (started before and out of this function) will get
+ the ``Result`` from the queue and write it to disk, so this doesn't block
+ the measurement threads.
- This could also be simpler by not having callback and callback error in
- ``apply_async`` and instead just calling callback with the
- ``pending_results``.
+ If there was an exception not caught by ``measure_relay``, it will call
+ instead ``result_putter_error``, which logs the error and complete
+ immediately.
- (callback could be also simpler by not having a thread and queue and
- just storing to disk, since the time to write to disk is way smaller
- than the time to request over the network.)
"""
- num_last_measured = 1
- while num_last_measured > 0 and not settings.end_event.is_set():
- log.info(
- "Pending measurements: %s out of %s: ",
- len(pending_results),
- num_relays_to_measure,
- )
- log.info("Last measured: %s", num_last_measured)
- time.sleep(TIMEOUT_MEASUREMENTS)
- old_pending_results = pending_results
- pending_results = [r for r in pending_results if not r.ready()]
- num_last_measured = len(old_pending_results) - len(pending_results)
- if len(pending_results) > 0:
- force_get_results(pending_results)
-
-
-def force_get_results(pending_results):
- """Try to get either the result or an exception, which gets logged.
-
- It is call by :func:`~sbws.core.scanner.wait_for_results` when
- the time waiting for the results was long.
-
- To get either the :class:`~sbws.lib.resultdump.Result` or an exception,
- call :meth:`~AsyncResult.get` with timeout.
- Timeout is low since we already waited.
+ num_relays_to_measure = num_pending_results = len(pending_results)
+ with executor:
+ for future_measurement in concurrent.futures.as_completed(
+ pending_results
+ ):
+ target = pending_results[future_measurement]
+ # 40023, disable to decrease state.dat json lines
+ # relay_list.increment_recent_measurement_attempt()
+ target.increment_relay_recent_measurement_attempt()
- ``get`` is not call before, because it blocks and the callbacks
- are not call.
- """
- global pool
- log.debug("Forcing get")
- # In case there are no finished AsyncResults, print the cache here
- # at level info so that is visible even if debug is not enabled.
- log.info("Pool cache %s", pool._cache)
- for r in pending_results:
- try:
- # HTTP timeout is 10
- result = r.get(timeout=SOCKET_TIMEOUT + 10)
- log.warning("Result %s was not stored, it took too long.", result)
- # TimeoutError is raised when the result is not ready, ie. has not
- # been processed yet
- except TimeoutError:
- log.warning("A result was not stored, it was not ready.")
- # This is the only place where using psutil so far.
- import psutil
-
- log.warning(psutil.Process(os.getpid()).memory_full_info())
- virtualMemoryInfo = psutil.virtual_memory()
- availableMemory = virtualMemoryInfo.available
- log.warning("Memory available %s MB.", availableMemory / 1024 ** 2)
- dumpstacks()
- # If the result raised an exception, `get` returns it,
- # then log any exception so that it can be fixed.
- # This should not happen, since `callback_err` would have been call
- # first.
- except Exception as e:
- log.critical(FILLUP_TICKET_MSG)
- # If the exception happened in the threads, `log.exception` does
- # not have the traceback.
- # Using `format_exception` instead of of `print_exception` to show
- # the traceback in all the log handlers.
- log.warning(
- "".join(
- traceback.format_exception(type(e), e, e.__traceback__)
+ # Register this measurement to the heartbeat module
+ hbeat.register_measured_fpr(target.fingerprint)
+ log.debug(
+ "Future measurement for target %s (%s) is done: %s",
+ target.fingerprint,
+ target.nickname,
+ future_measurement.done(),
+ )
+ try:
+ measurement = future_measurement.result()
+ except Exception as e:
+ result_putter_error(target, e)
+ import psutil
+
+ log.warning(psutil.Process(os.getpid()).memory_full_info())
+ virtualMemoryInfo = psutil.virtual_memory()
+ availableMemory = virtualMemoryInfo.available
+ log.warning(
+ "Memory available %s MB.", availableMemory / 1024 ** 2
)
+ dumpstacks()
+ else:
+ log.info("Measurement ready: %s" % (measurement))
+ result_putter(result_dump, measurement)
+ # `pending_results` has all the initial queued `Future`s,
+ # they don't decrease as they get completed, but we know 1 has be
+ # completed in each loop,
+ num_pending_results -= 1
+ log.info(
+ "Pending measurements: %s out of %s: ",
+ num_pending_results,
+ num_relays_to_measure,
)
+def force_get_results(pending_results):
+ """Wait for last futures to finish, before starting new loop."""
+ log.info("Wait for any remaining measurements.")
+ done, not_done = concurrent.futures.wait(
+ pending_results,
+ timeout=SOCKET_TIMEOUT + 10, # HTTP timeout is 10
+ return_when=concurrent.futures.ALL_COMPLETED,
+ )
+ log.info("Completed futures: %s", len(done))
+ # log.debug([f.__dict__ for f in done])
+ cancelled = [f for f in done if f.cancelled()]
+ if cancelled:
+ log.warning("Cancelled futures: %s", len(cancelled))
+ for f, t in cancelled:
+ log.debug(t.fingerprint)
+ if not_done:
+ log.warning("Not completed futures: %s", len(not_done))
+ for f, t in not_done:
+ log.debug(t.fingerprint)
+
+
def run_speedtest(args, conf):
"""Initializes all the data and threads needed to measure the relays.
@@ -899,7 +841,7 @@ def run_speedtest(args, conf):
Finally, it calls the function that will manage the measurement threads.
"""
- global rd, pool, controller
+ global rd, controller
controller = stem_utils.launch_or_connect_to_tor(conf)
@@ -927,8 +869,6 @@ def run_speedtest(args, conf):
)
if not destinations:
fail_hard(error_msg)
- max_pending_results = conf.getint("scanner", "measurement_threads")
- pool = Pool(max_pending_results)
try:
main_loop(args, conf, controller, rl, cb, rd, rp, destinations)
except KeyboardInterrupt:
diff --git a/sbws/globals.py b/sbws/globals.py
index 6954fd1..d628bf0 100644
--- a/sbws/globals.py
+++ b/sbws/globals.py
@@ -73,7 +73,6 @@ SUPERVISED_USER_CONFIG_PATH = "/etc/sbws/sbws.ini"
SUPERVISED_RUN_DPATH = "/run/sbws/tor"
SOCKET_TIMEOUT = 60 # seconds
-TIMEOUT_MEASUREMENTS = 60 * 3 # 3 minutes
SBWS_SCALE_CONSTANT = 7500
TORFLOW_SCALING = 1
diff --git a/setup.cfg b/setup.cfg
index 88d4613..7f9d7fb 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -49,6 +49,7 @@ test =
isort
; pylint ; when we ever fix all the errors it throughs
pytest
+ pytest-mock
tox
sphinx
doc =
diff --git a/tests/unit/conftest.py b/tests/unit/conftest.py
index 79d8f37..dafbb43 100644
--- a/tests/unit/conftest.py
+++ b/tests/unit/conftest.py
@@ -229,6 +229,13 @@ def conf(sbwshome_empty, tmpdir):
conf = _get_default_config()
conf["paths"]["sbws_home"] = sbwshome_empty
conf["paths"]["state_fpath"] = str(tmpdir.join(".sbws", "state.dat"))
+ conf["destinations"]["local"] = "on"
+ conf["destinations.local"] = {
+ "url": "http://127.0.0.1:28888/sbws.bin",
+ "verify": False,
+ "country": "ZZ",
+ }
+
return conf
diff --git a/tests/unit/core/test_scanner.py b/tests/unit/core/test_scanner.py
index 3f84472..f7ec69d 100644
--- a/tests/unit/core/test_scanner.py
+++ b/tests/unit/core/test_scanner.py
@@ -1,28 +1,101 @@
"""Unit tests for scanner.py."""
+import concurrent.futures
+import logging
+
import pytest
+from freezegun import freeze_time
+
+from sbws.core import scanner
+from sbws.lib.circuitbuilder import CircuitBuilder
+from sbws.lib.destination import DestinationList
+from sbws.lib.heartbeat import Heartbeat
+from sbws.lib.relayprioritizer import RelayPrioritizer
-from sbws.core.scanner import result_putter
+log = logging.getLogger(__name__)
def test_result_putter(sbwshome_only_datadir, result_success, rd, end_event):
if rd is None:
pytest.skip("ResultDump is None")
# Put one item in the queue
- callback = result_putter(rd)
- callback(result_success)
+ scanner.result_putter(rd, result_success)
assert rd.queue.qsize() == 1
# Make queue maxsize 1, so that it'll be full after the first callback.
# The second callback will wait 1 second, then the queue will be empty
# again.
rd.queue.maxsize = 1
- callback(result_success)
+ scanner.result_putter(rd, result_success)
# after putting 1 result, the queue will be full
assert rd.queue.qsize() == 1
assert rd.queue.full()
# it's still possible to put another results, because the callback will
# wait 1 second and the queue will be empty again.
- callback(result_success)
+ scanner.result_putter(rd, result_success)
assert rd.queue.qsize() == 1
assert rd.queue.full()
end_event.set()
+
+
+def test_complete_measurements(
+ args,
+ conf,
+ sbwshome_only_datadir,
+ controller,
+ relay_list,
+ result_dump,
+ rd,
+ mocker,
+):
+ """
+ Test that the ``ThreadPoolExecutor``` creates the epexted number of
+ futures, ``wait_for_results``process all of them and ``force_get_results``
+ completes them if they were not already completed by the time
+ ``wait_for_results`` has already processed them.
+ There are not real measurements done and the ``results`` are None objects.
+ Running the scanner with the test network, test the real measurements.
+
+ """
+ with freeze_time("2020-02-29 10:00:00"):
+ hbeat = Heartbeat(conf.getpath("paths", "state_fname"))
+ # rl = RelayList(args, conf, controller, measurements_period, state)
+ circuit_builder = CircuitBuilder(args, conf, controller, relay_list)
+ # rd = ResultDump(args, conf)
+ relay_prioritizer = RelayPrioritizer(args, conf, relay_list, rd)
+ destinations, error_msg = DestinationList.from_config(
+ conf, circuit_builder, relay_list, controller
+ )
+ num_threads = conf.getint("scanner", "measurement_threads")
+
+ mocker.patch(
+ "sbws.lib.destination.DestinationList.functional_destinations",
+ side_effect=[d for d in destinations._all_dests],
+ )
+ print("start threads")
+ with concurrent.futures.ThreadPoolExecutor(
+ max_workers=num_threads, thread_name_prefix="measurer"
+ ) as executor:
+ pending_results = {
+ executor.submit(
+ scanner.dispatch_worker_thread,
+ args,
+ conf,
+ destinations,
+ circuit_builder,
+ relay_list,
+ target,
+ ): target
+ for target in relay_prioritizer.best_priority()
+ }
+
+ assert len(pending_results) == 321
+ assert len(hbeat.measured_fp_set) == 0
+ log.debug("Before wait_for_results.")
+ scanner.wait_for_results(executor, hbeat, rd, pending_results)
+ log.debug("After wait_for_results")
+ for pending_result in pending_results:
+ assert pending_result.done() is True
+ assert len(hbeat.measured_fp_set) == 321
+ scanner.force_get_results(pending_results)
+ log.debug("After force_get_results.")
+ assert concurrent.futures.ALL_COMPLETED
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits