[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [sbws/master] scanner: catch SIGINT in the main loop



commit fe16d6d8a82e53cd62b9bdda577e79b2f6e666d9
Author: juga0 <juga@xxxxxxxxxx>
Date:   Tue Jan 8 15:59:00 2019 +0000

    scanner: catch SIGINT in the main loop
    
    also split main function into an extra main_loop function to be
    able to stop the threads after they have started.
    Also check end event in the main loop and before starting to
    measure a new relay.
    
    Fixes bug #28869. Bugfix v0.1.0.
---
 sbws/core/scanner.py | 115 ++++++++++++++++++++++++++++++++++++++++-----------
 1 file changed, 90 insertions(+), 25 deletions(-)

diff --git a/sbws/core/scanner.py b/sbws/core/scanner.py
index 7c2a937..166ce5d 100644
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@ -354,6 +354,88 @@ def result_putter_error(target):
     return closure
 
 
+def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
+              relay_prioritizer, destinations, max_pending_results, pool):
+    """Starts and reuses the threads that measure the relays forever.
+
+    It starts a loop that runs while there is no event signaling that sbws
+    is stopping (because of SIGTERM or SIGINT).
+
+    Then, it starts a second loop over an ordered list (generator) of relays
+    to measure, which might be a subset of the current relays in the Network.
+
+    For every relay, it starts a new thread which runs ``measure_relay`` to
+    measure the relay until there are ``max_pending_results`` threads.
+    After that, it reuses a thread that has finished for every relay to
+    measure.
+    It is the pool method ``apply_async`` which starts or reuses a thread.
+    This method returns an ``ApplyResult`` immediately, which has a ``ready``
+    method that tells whether the thread has finished or not.
+
+    When the thread finishes, i.e. ``ApplyResult`` is ``ready``, it triggers
+    the ``result_putter`` callback, which puts the ``Result`` in the
+    ``ResultDump`` queue and completes immediately.
+
+    The ``ResultDump`` thread (started before and outside this function) gets
+    the ``Result`` from the queue and writes it to disk, so this doesn't block
+    the measurement threads.
+
+    If there was an exception not caught by ``measure_relay``, it calls
+    ``result_putter_error`` instead, which logs the error and completes
+    immediately.
+
+    Before iterating over the next relay, it waits (without blocking the
+    workers, since it happens in the main thread) until one of the
+    ``max_pending_results`` threads has finished.
+
+    This wait is not strictly needed, since otherwise ``apply_async`` would
+    queue the relays in order and would not reuse a thread to measure a relay
+    until another thread has finished. But it makes the logic more sequential.
+
+    Before the outer loop iterates, it also waits (again without blocking the
+    workers) until all the ``Results`` are ready.
+    This avoids re-measuring a relay that might still be being measured.
+
+    Note: the ``controller`` argument is not used in this function's body.
+    """
+    pending_results = []
+    # Do not start a new loop if sbws is stopping.
+    while not settings.end_event.is_set():
+        log.debug("Starting a new measurement loop.")
+        num_relays = 0
+        loop_tstart = time.time()
+        for target in relay_prioritizer.best_priority():
+            # Don't start measuring a relay if sbws is stopping.
+            if settings.end_event.is_set():
+                break
+            num_relays += 1
+            log.debug('Measuring %s %s', target.nickname,
+                      target.fingerprint[0:8])
+            # callback and callback_err must be non-blocking
+            callback = result_putter(result_dump)
+            callback_err = result_putter_error(target)
+            async_result = pool.apply_async(
+                dispatch_worker_thread,
+                [args, conf, destinations, circuit_builder, relay_list,
+                 target], {}, callback, callback_err)
+            pending_results.append(async_result)
+            # Instead of letting apply_async queue the relays in order until
+            # a thread has finished, wait here until a thread has finished.
+            while len(pending_results) >= max_pending_results:
+                # sleeping here only pauses the main thread, not the workers
+                time.sleep(5)
+                pending_results = [r for r in pending_results if not r.ready()]
+        while len(pending_results) > 0:
+            log.debug("There are %s pending measurements.",
+                      len(pending_results))
+            # sleeping here only pauses the main thread, not the workers
+            time.sleep(5)
+            pending_results = [r for r in pending_results if not r.ready()]
+        loop_tstop = time.time()
+        loop_tdelta = (loop_tstop - loop_tstart) / 60
+        log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
+
+
 def run_speedtest(args, conf):
     global rd, pool, controller
     controller, _ = stem_utils.init_controller(
@@ -382,31 +464,14 @@ def run_speedtest(args, conf):
         fail_hard(error_msg)
     max_pending_results = conf.getint('scanner', 'measurement_threads')
     pool = Pool(max_pending_results)
-    pending_results = []
-    while True:
-        num_relays = 0
-        loop_tstart = time.time()
-        log.info("Starting a new loop to measure relays.")
-        for target in rp.best_priority():
-            num_relays += 1
-            log.debug('Measuring %s %s', target.nickname,
-                      target.fingerprint[0:8])
-            callback = result_putter(rd)
-            callback_err = result_putter_error(target)
-            async_result = pool.apply_async(
-                dispatch_worker_thread,
-                [args, conf, destinations, cb, rl, target],
-                {}, callback, callback_err)
-            pending_results.append(async_result)
-            while len(pending_results) >= max_pending_results:
-                time.sleep(5)
-                pending_results = [r for r in pending_results if not r.ready()]
-        while len(pending_results) > 0:
-            time.sleep(5)
-            pending_results = [r for r in pending_results if not r.ready()]
-        loop_tstop = time.time()
-        loop_tdelta = (loop_tstop - loop_tstart) / 60
-        log.info("Measured %s relays in %s minutes", num_relays, loop_tdelta)
+
+    try:
+        main_loop(args, conf, controller, rl, cb, rd, rp, destinations,
+                  max_pending_results, pool)
+    except KeyboardInterrupt:
+        log.info("Interrupted by the user.")
+    finally:
+        stop_threads(signal.SIGINT, None)
 
 
 def gen_parser(sub):



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits