[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [sbws/master] Merge branch 'bug28869_squashed'
commit 149e236e64bf469221a3874bcfc82e89fe408785
Merge: c523619 c24b235
Author: juga0 <juga@xxxxxxxxxx>
Date: Mon Feb 4 14:56:10 2019 +0000
Merge branch 'bug28869_squashed'
Solved merge conflicts in sbws/__init__.py and sbws/core/scanner.py
after merging bug28741
sbws/__init__.py | 18 +++
sbws/core/scanner.py | 201 ++++++++++++++++++-------
sbws/lib/circuitbuilder.py | 16 +-
sbws/lib/resultdump.py | 10 +-
sbws/util/stem.py | 9 +-
tests/integration/lib/test_relayprioritizer.py | 8 +-
6 files changed, 185 insertions(+), 77 deletions(-)
diff --cc sbws/__init__.py
index eea7006,3871edb..84b2787
--- a/sbws/__init__.py
+++ b/sbws/__init__.py
@@@ -1,18 -1,25 +1,36 @@@
__version__ = '1.0.3-dev0'
-
+ import threading # noqa
+
+from . import globals # noqa
+
class Settings:
+ """Singleton settings for all the packages.
+ This way, changes to settings are seen by all the packages that import it.
+
+ It lives in ``__init__.py`` to leave open the possibility of having a
+ ``settings.py`` module for user settings.
+
+ .. note:: After refactoring, globals should only have constants.
+ Any other variable that needs to be modified when initializing
+ should be initialized here.
+
+ """
def __init__(self):
+ # update this dict from globals (but only for ALL_CAPS settings)
+ for setting in dir(globals):
+ if setting.isupper():
+ setattr(self, setting, getattr(globals, setting))
+ self.end_event = threading.Event()
+ def init_http_headers(self, nickname, uuid, tor_version):
+ self.HTTP_HEADERS['Tor-Bandwidth-Scanner-Nickname'] = nickname
+ self.HTTP_HEADERS['Tor-Bandwidth-Scanner-UUID'] = uuid
+ self.HTTP_HEADERS['User-Agent'] += tor_version
+
+ def set_end_event(self):
+ self.end_event.set()
+
++
settings = Settings() # noqa
diff --cc sbws/core/scanner.py
index 8fe8a5b,bb7a2ca..013d4ee
--- a/sbws/core/scanner.py
+++ b/sbws/core/scanner.py
@@@ -1,8 -1,7 +1,9 @@@
''' Measure the relays. '''
+ import signal
import sys
+import threading
+import uuid
from ..lib.circuitbuilder import GapsCircuitBuilder as CB
from ..lib.resultdump import ResultDump
@@@ -13,7 -12,7 +14,7 @@@ from ..lib.relayprioritizer import Rela
from ..lib.destination import DestinationList
from ..util.timestamp import now_isodt_str
from ..util.state import State
- from sbws.globals import fail_hard, TIMEOUT_MEASUREMENTS, HTTP_GET_HEADERS
-from sbws.globals import fail_hard
++from sbws.globals import fail_hard, HTTP_GET_HEADERS
import sbws.util.stem as stem_utils
import sbws.util.requests as requests_utils
from argparse import ArgumentDefaultsHelpFormatter
@@@ -25,54 -23,52 +25,78 @@@ import loggin
import requests
import random
- from sbws import settings
-
+ from .. import settings
rng = random.SystemRandom()
- end_event = Event()
log = logging.getLogger(__name__)
+ # Declare the objects that manage the threads global so that sbws can exit
+ # gracefully at any time.
+ pool = None
+ rd = None
+ controller = None
+
+
+ def stop_threads(signal, frame):
+ global rd, pool
+ log.debug('Stopping sbws.')
+ # Prevent new threads from starting.
+ settings.set_end_event()
+ # Stop Pool threads
+ pool.close()
+ pool.join()
+ # Stop ResultDump thread
+ rd.thread.join()
+ # Stop Tor thread
+ controller.close()
+ sys.exit(0)
+
+
+ signal.signal(signal.SIGTERM, stop_threads)
+def dumpstacks():
+ import traceback
+ log.critical("sbws stop measuring relays, probably because of a bug."
+ "Please, open a ticket in trac.torproject.org with this"
+ "backtrace.")
+ thread_id2name = dict([(t.ident, t.name) for t in threading.enumerate()])
+ for thread_id, stack in sys._current_frames().items():
+ log.critical("Thread: %s(%d)",
+ thread_id2name.get(thread_id, ""), thread_id)
+ log.critical(traceback.print_stack(stack))
+ # If logging level is less than DEBUG (more verbose), start pdb so that
+ # developers can debug the issue.
+ if log.getEffectiveLevel() < logging.DEBUG:
+ import pdb
+ pdb.set_trace()
+ # Otherwise exit.
+ else:
+ # Change to stop threads when #28869 is merged
+ sys.exit(1)
+
+
def timed_recv_from_server(session, dest, byte_range):
''' Request the **byte_range** from the URL at **dest**. If successful,
return True and the time it took to download. Otherwise return False and an
exception. '''
- headers = {'Range': byte_range, 'Accept-Encoding': 'identity'}
+
start_time = time.time()
+ HTTP_GET_HEADERS['Range'] = byte_range
# TODO:
# - What other exceptions can this throw?
- # - Do we have to read the content, or did requests already do so?
+ # - response.elapsed "measures the time taken between sending the first
+ # byte of the request and finishing parsing the headers.
+ # It is therefore unaffected by consuming the response content"
+ # If this means that the content has arrived, elapsed could be used to
+ # know the time it took.
try:
- requests_utils.get(
- session, dest.url, headers=headers, verify=dest.verify)
+ # headers are merged with the session ones, not overwritten.
+ session.get(dest.url, headers=HTTP_GET_HEADERS, verify=dest.verify)
+ # NewConnectionError will be raised when shutting down.
except (requests.exceptions.ConnectionError,
- requests.exceptions.ReadTimeout) as e:
+ requests.exceptions.ReadTimeout,
+ requests.exceptions.NewConnectionError) as e:
+ log.debug(e)
return False, e
end_time = time.time()
return True, end_time - start_time
@@@ -379,17 -472,9 +501,17 @@@ def run_speedtest(args, conf)
'even lead to messed up results.',
conf.getpath('tor', 'control_socket'))
time.sleep(15)
+
+ # Once a refactor makes conf global, this can be removed
+ # from here.
+ state = State(conf.getpath('paths', 'state_fname'))
+ # Call only once to initialize http_headers
+ settings.init_http_headers(conf.get('scanner', 'nickname'), state['uuid'],
+ str(controller.get_version()))
+
rl = RelayList(args, conf, controller)
cb = CB(args, conf, controller, rl)
- rd = ResultDump(args, conf, end_event)
+ rd = ResultDump(args, conf)
rp = RelayPrioritizer(args, conf, rl, rd)
destinations, error_msg = DestinationList.from_config(
conf, cb, rl, controller)
@@@ -454,13 -515,5 +552,8 @@@ def main(args, conf)
state = State(conf.getpath('paths', 'state_fname'))
state['scanner_started'] = now_isodt_str()
+ # Generate a unique identifier for each scanner
+ if 'uuid' not in state:
+ state['uuid'] = str(uuid.uuid4())
- try:
- run_speedtest(args, conf)
- except KeyboardInterrupt as e:
- raise e
- finally:
- end_event.set()
+ run_speedtest(args, conf)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits