[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [sbws/master] Merge branch 'bug28933_01'



commit 65e7e29de0746b69265bd24451d9f59bbec1921c
Merge: 09ea037 adcaf04
Author: juga0 <juga@xxxxxxxxxx>
Date:   Sat Feb 23 11:32:38 2019 +0000

    Merge branch 'bug28933_01'
    
    Solved conflicts:
            sbws/core/scanner.py
            tox.ini

 sbws/core/scanner.py               |  6 +++++-
 tests/integration/sbws_testnet.ini | 26 ++++++++++++++++++++++++++
 tox.ini                            |  8 +++++++-
 3 files changed, 38 insertions(+), 2 deletions(-)

diff --cc sbws/core/scanner.py
index 72df4c0,791d719..b486dd9
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@@ -391,111 -330,7 +391,115 @@@ def result_putter_error(target)
      return closure
  
  
 +def main_loop(args, conf, controller, relay_list, circuit_builder, result_dump,
 +              relay_prioritizer, destinations, max_pending_results, pool):
 +    """Starts and reuses the threads that measure the relays forever.
 +
 +    It starts a loop that will be run while there is not an event signaling
 +    that sbws is stopping (because of SIGTERM or SIGINT).
 +
 +    Then, it starts a second loop with an ordered list (generator) of relays
 +    to measure that might be a subset of all the current relays in the Network.
 +
 +    For every relay, it starts a new thread which runs ``measure_relay`` to
 +    measure the relay until there are ``max_pending_results`` threads.
 +    After that, it will reuse a thread that has finished for every relay to
 +    measure.
 +    It is the pool method ``apply_async`` which starts or reuses a thread.
 +    This method returns an ``ApplyResult`` immediately, which has a ``ready``
 +    method that tells whether the thread has finished or not.
 +
 +    When the thread finishes, i.e. ``ApplyResult`` is ``ready``, it triggers
 +    ``result_putter`` callback, which puts the ``Result`` in ``ResultDump``
 +    queue and completes immediately.
 +
 +    ``ResultDump`` thread (started before and out of this function) will get
 +    the ``Result`` from the queue and write it to disk, so this doesn't block
 +    the measurement threads.
 +
 +    If there was an exception not caught by ``measure_relay``, it will call
 +    instead ``result_putter_error``, which logs the error and completes
 +    immediately.
 +
 +    Before iterating over the next relay, it waits (non blocking, since it
 +    happens in the main thread) until one of the ``max_pending_results``
 +    threads has finished.
 +
 +    This is not needed, since otherwise async_result will queue the relays to
 +    measure in order and won't start reusing a thread to measure a relay until
 +    other thread has finished. But it makes the logic a bit more sequential.
 +
 +    Before the outer loop iterates, it also waits (again non blocking) that all
 +    the ``Results`` are ready.
 +    This avoids starting to measure the same relay which might still be
 +    being measured.
 +
 +    """
 +    pending_results = []
 +    # Set the time to wait for a thread to finish as the half of an HTTP
 +    # request timeout.
 +    time_to_sleep = conf.getfloat('general', 'http_timeout') / 2
 +    # Do not start a new loop if sbws is stopping.
 +    while not settings.end_event.is_set():
 +        log.debug("Starting a new measurement loop.")
 +        num_relays = 0
 +        loop_tstart = time.time()
 +        for target in relay_prioritizer.best_priority():
 +            # Don't start measuring a relay if sbws is stopping.
 +            if settings.end_event.is_set():
 +                break
 +            num_relays += 1
 +            # callback and callback_err must be non-blocking
 +            callback = result_putter(result_dump)
 +            callback_err = result_putter_error(target)
 +            async_result = pool.apply_async(
 +                dispatch_worker_thread,
 +                [args, conf, destinations, circuit_builder, relay_list,
 +                 target], {}, callback, callback_err)
 +            pending_results.append(async_result)
 +            # Instead of letting apply_async to queue the relays in order until
 +            # a thread has finished, wait here until a thread has finished.
 +            while len(pending_results) >= max_pending_results:
 +                # sleep is non-blocking since happens in the main process.
 +                time.sleep(time_to_sleep)
 +                pending_results = [r for r in pending_results if not r.ready()]
 +        time_waiting = 0
 +        while (len(pending_results) > 0
 +               and time_waiting <= TIMEOUT_MEASUREMENTS):
 +            log.debug("Number of pending measurement threads %s after "
 +                      "a prioritization loop.", len(pending_results))
 +            time.sleep(time_to_sleep)
 +            time_waiting += time_to_sleep
 +            pending_results = [r for r in pending_results if not r.ready()]
 +        if time_waiting > TIMEOUT_MEASUREMENTS:
 +            dumpstacks()
 +        loop_tstop = time.time()
 +        loop_tdelta = (loop_tstop - loop_tstart) / 60
 +        log.debug("Measured %s relays in %s minutes", num_relays, loop_tdelta)
- 
++        # In a testing network, exit after first loop
++        if controller.get_conf('TestingTorNetwork') == '1':
++            log.info("In a testing network, exiting after the first loop.")
++            # Threads should be closed nicely in some refactor
++            exit(0)
 +
  def run_speedtest(args, conf):
 +    """Initializes all the data and threads needed to measure the relays.
 +
 +    It launches or connects to Tor in a thread.
 +    It initializes the list of relays seen in the Tor network.
 +    It starts a thread to read the previous measurements and wait for new
 +    measurements to write them to the disk.
 +    It initializes a class that will be used to order the relays depending
 +    on their measurements age.
 +    It initializes the list of destinations that will be used for the
 +    measurements.
 +    It initializes the thread pool that will launch the measurement threads.
 +    The pool starts 3 other threads that are not the measurement (worker)
 +    threads.
 +    Finally, it calls the function that will manage the measurement threads.
 +
 +    """
 +    global rd, pool, controller
      controller, _ = stem_utils.init_controller(
          path=conf.getpath('tor', 'control_socket'))
      if not controller:
diff --cc tox.ini
index 4aca0ca,d8fd1c4..5094b78
--- a/tox.ini
+++ b/tox.ini
@@@ -43,16 -43,21 +43,22 @@@ whitelist_externals 
      bash
      sleep
      wget
+     mkdir
  commands =
 -    tar -C {envtmpdir} -vxf {toxinidir}/tests/integration/net.tar
 +    cp -af {toxinidir}/tests/integration/net {envtmpdir}
      bash {envtmpdir}/net/start.sh
      bash -c "time python3 {envtmpdir}/net/wait.py {envtmpdir}/net/{auth,relay,exit}*"
      bash -c "python3 {toxinidir}/scripts/tools/sbws-http-server.py --port 28888 &>/dev/null &"
-     sleep 15
+     sleep 1
      wget -O/dev/null http://127.0.0.1:28888/sbws.bin
+     ; Actually run the scanner
+     mkdir -p /tmp/.sbws
+     ; This adds around 3min more to the tests
+     sbws -c {toxinidir}/tests/integration/sbws_testnet.ini scanner
      coverage run -a --rcfile={toxinidir}/.coveragerc --source=sbws -m pytest -s {toxinidir}/tests/integration -vv
      bash {envtmpdir}/net/stop.sh
 +    # no need to remove .tox/net directory.
+     rm -rf /tmp/.sbws
  
  [testenv:lint]
  skip_install = True



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits