[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [sbws/m12] fix: scanner: Increase time getting measurements
commit 3653396c9daa9170033fd2b706cd9d290eac6f6b
Author: juga0 <juga@xxxxxxxxxx>
Date: Thu May 13 12:10:35 2021 +0000
fix: scanner: Increase time getting measurements
- Increase the time waiting for the last measurements queued, to avoid
canceling unfinished measurements and gc maybe not releasing thread
variables
- Use the already declared global pool instead of passing it by args
- Log more information when the last measuremetns timeout
Closes: #40087
---
sbws/core/scanner.py | 30 +++++++++++++++++++++++++-----
setup.cfg | 1 +
2 files changed, 26 insertions(+), 5 deletions(-)
diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index dbcceb9..7637028 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -15,7 +15,12 @@ from multiprocessing.dummy import Pool
import sbws.util.requests as requests_utils
import sbws.util.stem as stem_utils
-from sbws.globals import HTTP_GET_HEADERS, TIMEOUT_MEASUREMENTS, fail_hard
+from sbws.globals import (
+ HTTP_GET_HEADERS,
+ SOCKET_TIMEOUT,
+ TIMEOUT_MEASUREMENTS,
+ fail_hard,
+)
from .. import settings
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
@@ -77,7 +82,7 @@ def dumpstacks():
log.critical(
"Thread: %s(%d)", thread_id2name.get(thread_id, ""), thread_id
)
- log.critical(traceback.format_stack("".join(stack)))
+ log.critical("Traceback: %s", "".join(traceback.format_stack(stack)))
# If logging level is less than DEBUG (more verbose), start pdb so that
# developers can debug the issue.
if log.getEffectiveLevel() < logging.DEBUG:
@@ -660,7 +665,6 @@ def main_loop(
result_dump,
relay_prioritizer,
destinations,
- pool,
):
"""Starts and reuse the threads that measure the relays forever.
@@ -696,6 +700,7 @@ def main_loop(
measured.
"""
+ global pool
log.info("Started the main loop to measure the relays.")
hbeat = Heartbeat(conf.getpath("paths", "state_fname"))
@@ -743,6 +748,7 @@ def main_loop(
# Register this measurement to the heartbeat module
hbeat.register_measured_fpr(target.fingerprint)
+ log.debug("Measurements queued.")
# After the for has finished, the pool has queued all the relays
# and pending_results has the list of all the AsyncResults.
# It could also be obtained with pool._cache, which contains
@@ -815,6 +821,7 @@ def wait_for_results(num_relays_to_measure, pending_results):
len(pending_results),
num_relays_to_measure,
)
+ log.info("Last measured: %s", num_last_measured)
time.sleep(TIMEOUT_MEASUREMENTS)
old_pending_results = pending_results
pending_results = [r for r in pending_results if not r.ready()]
@@ -836,15 +843,28 @@ def force_get_results(pending_results):
``get`` is not call before, because it blocks and the callbacks
are not call.
"""
+ global pool
log.debug("Forcing get")
+ # In case there are no finished AsyncResults, print the cache here
+ # at level info so that is visible even if debug is not enabled.
+ log.info("Pool cache %s", pool._cache)
for r in pending_results:
try:
- result = r.get(timeout=0.1)
+ # HTTP timeout is 10
+ result = r.get(timeout=SOCKET_TIMEOUT + 10)
log.warning("Result %s was not stored, it took too long.", result)
# TimeoutError is raised when the result is not ready, ie. has not
# been processed yet
except TimeoutError:
log.warning("A result was not stored, it was not ready.")
+ # This is the only place where using psutil so far.
+ import psutil
+
+ log.warning(psutil.Process(os.getpid()).memory_full_info())
+ virtualMemoryInfo = psutil.virtual_memory()
+ availableMemory = virtualMemoryInfo.available
+ log.warning("Memory available %s MB.", availableMemory / 1024 ** 2)
+ dumpstacks()
# If the result raised an exception, `get` returns it,
# then log any exception so that it can be fixed.
# This should not happen, since `callback_err` would have been call
@@ -910,7 +930,7 @@ def run_speedtest(args, conf):
max_pending_results = conf.getint("scanner", "measurement_threads")
pool = Pool(max_pending_results)
try:
- main_loop(args, conf, controller, rl, cb, rd, rp, destinations, pool)
+ main_loop(args, conf, controller, rl, cb, rd, rp, destinations)
except KeyboardInterrupt:
log.info("Interrupted by the user.")
stop_threads(signal.SIGINT, None)
diff --git a/setup.cfg b/setup.cfg
index 0074a1d..88d4613 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -32,6 +32,7 @@ include_package_data = True
# See stable releases at https://www.python.org/downloads/
python_requires = >= 3.6
install_requires =
+ psutil >= 5.5
stem >= 1.7.0
; # Now versioneer is also needed as dependency
versioneer
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits