[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [arm/master] Better thread safety for tracker operations
commit fbaa098f4731a6d3bd2713033efc4bfbf62b913a
Author: Damian Johnson <atagar@xxxxxxxxxxxxxx>
Date: Mon Oct 28 11:45:32 2013 -0700
Better thread safety for tracker operations
Resolving tor's pid and process name under a daemon lock. The same lock is
held while the tracker tasks run, which is where those values are used.
---
arm/util/tracker.py | 61 +++++++++++++++++++++++++++------------------------
1 file changed, 32 insertions(+), 29 deletions(-)
diff --git a/arm/util/tracker.py b/arm/util/tracker.py
index 8e93e58..647acee 100644
--- a/arm/util/tracker.py
+++ b/arm/util/tracker.py
@@ -3,7 +3,7 @@ Background tasks for gathering informatino about the tor process.
::
- get_connection_tracker - provides a ConnectionResolver for our tor process
+ get_connection_tracker - provides a ConnectionTracker for our tor process
get_resource_tracker - provides a ResourceTracker for our tor process
Daemon - common parent for resolvers
@@ -13,7 +13,7 @@ Background tasks for gathering informatino about the tor process.
|- set_paused - pauses or continues work
+- stop - stops further work by the daemon
- ConnectionResolver - periodically checks the connections established by tor
+ ConnectionTracker - periodically checks the connections established by tor
|- get_custom_resolver - provide the custom conntion resolver we're using
|- set_custom_resolver - overwrites automatic resolver selecion with a custom resolver
+- get_connections - provides our latest connection results
@@ -32,8 +32,8 @@ import arm.util.torTools
from stem.control import State
from stem.util import conf, connection, log, proc, str_tools, system
-CONFIG = conf.config_dict("arm", {
- "queries.resourceUsage.rate": 5,
+CONFIG = conf.config_dict('arm', {
+ 'queries.resourceUsage.rate': 5,
'queries.connections.minRate': 5,
'msg.unable_to_use_resolver': '',
'msg.unable_to_use_all_resolvers': '',
@@ -64,7 +64,7 @@ def get_connection_tracker():
global CONNECTION_TRACKER
if CONNECTION_TRACKER is None:
- CONNECTION_TRACKER = ConnectionResolver()
+ CONNECTION_TRACKER = ConnectionTracker()
return CONNECTION_TRACKER
@@ -91,6 +91,10 @@ class Daemon(threading.Thread):
threading.Thread.__init__(self)
self.daemon = True
+ # PID and process name of the tor process we're tracking. These should only
+ # be used under the daemon's lock.
+
+ self._daemon_lock = threading.RLock()
self._process_name = None
self._process_pid = None
@@ -99,8 +103,8 @@ class Daemon(threading.Thread):
self._run_counter = 0 # counter for the number of successful runs
self._is_paused = False
+ self._pause_condition = threading.Condition()
self._halt = False # terminates thread if true
- self._cond = threading.Condition() # used for pausing the thread
controller = arm.util.torTools.getConn().controller
controller.add_status_listener(self._tor_status_listener)
@@ -113,24 +117,24 @@ class Daemon(threading.Thread):
if self._is_paused or time_since_last_ran < self._rate:
sleep_duration = max(0.2, self._rate - time_since_last_ran)
- self._cond.acquire()
- if not self._halt:
- self._cond.wait(sleep_duration)
- self._cond.release()
+ with self._pause_condition:
+ if not self._halt:
+ self._pause_condition.wait(sleep_duration)
continue # done waiting, try again
- is_successful = self.task()
+ with self._daemon_lock:
+ is_successful = self._task()
if is_successful:
self._run_counter += 1
self._last_ran = time.time()
- def task(self):
+ def _task(self):
"""
Task the resolver is meant to perform. This should be implemented by
- subclasses.
+ subclasses. This is executed under the daemon's lock.
"""
pass
@@ -175,24 +179,24 @@ class Daemon(threading.Thread):
Halts further work and terminates the thread.
"""
- self._cond.acquire()
- self._halt = True
- self._cond.notifyAll()
- self._cond.release()
+ with self._pause_condition:
+ self._halt = True
+ self._pause_condition.notifyAll()
def _tor_status_listener(self, controller, event_type, _):
- if event_type in (State.INIT, State.RESET):
- tor_pid = controller.get_pid(None)
- tor_cmd = system.get_name_by_pid(tor_pid) if tor_pid else None
+ with self._daemon_lock:
+ if not self._halt and event_type in (State.INIT, State.RESET):
+ tor_pid = controller.get_pid(None)
+ tor_cmd = system.get_name_by_pid(tor_pid) if tor_pid else None
- if tor_cmd is None:
- tor_cmd = 'tor'
+ if tor_cmd is None:
+ tor_cmd = 'tor'
- self._process_name = tor_cmd
- self._process_pid = tor_pid
+ self._process_name = tor_cmd
+ self._process_pid = tor_pid
-class ConnectionResolver(Daemon):
+class ConnectionTracker(Daemon):
"""
Periodically retrieves the connections established by tor.
"""
@@ -210,7 +214,7 @@ class ConnectionResolver(Daemon):
self._failure_count = 0
self._rate_too_low_count = 0
- def task(self):
+ def _task(self):
if self._custom_resolver:
resolver = self._custom_resolver
is_default_resolver = False
@@ -325,7 +329,6 @@ class ResourceTracker(Daemon):
self._last_cpu_total = 0
self.last_lookup = -1
- self._val_lock = threading.RLock()
# sequential times we've failed with this method of resolution
self._failure_count = 0
@@ -336,7 +339,7 @@ class ResourceTracker(Daemon):
(cpuUsage_sampling, cpuUsage_avg, memUsage_bytes, memUsage_percent)
"""
- with self._val_lock:
+ with self._daemon_lock:
return self._last_sample if self._last_sample else Resources(0.0, 0.0, 0, 0.0)
def last_query_failed(self):
@@ -347,7 +350,7 @@ class ResourceTracker(Daemon):
return self._failure_count != 0
- def task(self):
+ def _task(self):
if self._procss_pid is None:
return
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits