[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r9469: Node monitor. Also made a new directory tordiffs for diffs t (in torflow/trunk: . tordiffs)
- To: or-cvs@xxxxxxxxxxxxx
- Subject: [or-cvs] r9469: Node monitor. Also made a new directory tordiffs for diffs t (in torflow/trunk: . tordiffs)
- From: mikeperry@xxxxxxxx
- Date: Wed, 31 Jan 2007 03:29:32 -0500 (EST)
- Delivered-to: archiver@seul.org
- Delivered-to: or-cvs-outgoing@seul.org
- Delivered-to: or-cvs@seul.org
- Delivery-date: Wed, 31 Jan 2007 03:29:40 -0500
- Reply-to: or-talk@xxxxxxxxxxxxx
- Sender: owner-or-cvs@xxxxxxxxxxxxx
Author: mikeperry
Date: 2007-01-31 03:29:23 -0500 (Wed, 31 Jan 2007)
New Revision: 9469
Added:
torflow/trunk/nodemon.py
torflow/trunk/tordiffs/
torflow/trunk/tordiffs/orconn-bw.diff
Modified:
torflow/trunk/TorCtl.py
Log:
Node monitor.
Also made a new directory tordiffs for diffs that
don't/won't/can't/shouldn't/didn't (yet?) make it into the proper Tor
distribution.
orconn-bw.diff is a diff to enable bandwidth monitoring for OR
connections.
nodemon spits out stats about how much bandwidth is going to each OR
connection, and also ORCONN failure statistics and circuit death info. On my
slow node the relative bandwidth rates do not match up with the rest of the
directory, at least not after measurements covering durations on the order of
days. Maybe this will change when I get a fast node.
Modified: torflow/trunk/TorCtl.py
===================================================================
--- torflow/trunk/TorCtl.py 2007-01-31 00:58:06 UTC (rev 9468)
+++ torflow/trunk/TorCtl.py 2007-01-31 08:29:23 UTC (rev 9469)
@@ -122,7 +122,6 @@
"""Create a Connection to communicate with the Tor process over the
socket 'sock'.
"""
- self._s = None
self._handler = None
self._handleFn = None
self._sendLock = threading.RLock()
@@ -626,17 +625,34 @@
if remote: remote = remote[15:]
args = ident, status, circ, target_host, int(target_port), reason, remote
elif evtype == "ORCONN":
- m = re.match(r"(\S+)\s+(\S+)(\s\S+)?(\s\S+)?", body)
+ m = re.match(r"(\S+)\s+(\S+)(\s\S+)?(\s\S+)?(\s\S+)?(\s\S+)?(\s\S+)?", body)
if not m:
raise ProtocolError("ORCONN event misformatted.")
- target, status, reason, ncircs = m.groups()
+ target, status, age, read, wrote, reason, ncircs = m.groups()
+
+ # XXX: Special hacks for bandwidth stat research
+ if status == "READ":
+ read = " READ=" + str(age)
+ age = 0
+ if status == "WRITE":
+ wrote = " WRITTEN=" + str(age)
+ age = 0
+
+
if reason and not ncircs:
if "NCIRCS=" in reason:
ncircs = reason
reason = None
- if ncircs: ncircs = int(ncircs[7:])
+ if ncircs: ncircs = int(ncircs[8:])
+ else: ncircs = 0
if reason: reason = reason[8:]
- args = status, target, reason, ncircs
+ if age: age = int(age[5:])
+ else: age = 0
+ if read: read = int(read[6:])
+ else: read = 0
+ if wrote: wrote = int(wrote[9:])
+ else: wrote = 0
+ args = status, target, age, read, wrote, reason, ncircs
elif evtype == "BW":
m = re.match(r"(\d+)\s+(\d+)", body)
if not m:
@@ -686,7 +702,8 @@
"""
raise NotImplemented
- def or_conn_status(self, eventtype, status, target, reason, ncircs):
+ def or_conn_status(self, eventtype, status, target, age, read, wrote,
+ reason, ncircs):
"""Called when an OR connection's status changes if listening to
ORCONNSTATUS events. 'status' is a member of OR_CONN_STATUS; target
is the OR in question.
@@ -745,12 +762,19 @@
def new_desc(self, eventtype, identities):
print " ".join((eventtype, " ".join(identities)))
- def or_conn_status(self, eventtype, status, target, reason, ncircs):
+ def or_conn_status(self, eventtype, status, target, age, read, wrote,
+ reason, ncircs):
+ if age: age = "AGE="+str(age)
+ else: age = ""
+ if read: read = "READ="+str(read)
+ else: read = ""
+ if wrote: wrote = "WRITTEN="+str(wrote)
+ else: wrote = ""
if reason: reason = "REASON="+reason
else: reason = ""
if ncircs: ncircs = "NCIRCS="+str(ncircs)
else: ncircs = ""
- print " ".join((eventtype, target, status, reason, ncircs))
+ print " ".join((eventtype, target, status, age, read, wrote, reason, ncircs))
def parseHostAndPort(h):
"""Given a string of the form 'address:port' or 'address' or
Added: torflow/trunk/nodemon.py
===================================================================
--- torflow/trunk/nodemon.py 2007-01-31 00:58:06 UTC (rev 9468)
+++ torflow/trunk/nodemon.py 2007-01-31 08:29:23 UTC (rev 9469)
@@ -0,0 +1,192 @@
+#!/usr/bin/python
+# Nodemon - Tor node monitor
+
+"""
+Nodemon - Tor node monitor
+"""
+
+import TorCtl
+import atexit
+import sys
+import socket
+import struct
+import traceback
+import re
+import random
+from TorUtil import *
+import sched, time
+import thread
+
+class Reason:
+  """Tally for one "STATUS:REASON" key of closed/failed OR connections."""
+  # Integer class attributes serve as zero defaults; the first '+=' on
+  # an instance creates a per-instance attribute.
+  ncircs = 0 # sum of the NCIRCS values reported with this reason
+  count = 0  # number of ORCONN events seen with this reason
+
+  def __init__(self, reason):
+    self.reason = reason
+
+class RouterReasons:
+  """Per-router ORCONN statistics, stored in the global 'errors' dict."""
+  # Integer class attributes are zero defaults; '+=' on an instance
+  # creates an instance attribute, so sharing the defaults is harmless.
+  tot_ncircs = 0    # NCIRCS summed over CLOSED/FAILED events
+  tot_count = 0     # number of CLOSED/FAILED events
+  tot_read = 0      # READ totals reported at close time
+  tot_wrote = 0     # WRITTEN totals reported at close time
+  running_read = 0  # bytes summed from incremental READ events
+  running_wrote = 0 # bytes summed from incremental WRITE events
+  tot_age = 0       # AGE values summed over CLOSED/FAILED events
+
+  def __init__(self, target):
+    self.name = target
+    # Must be created per instance: a dict defined at class level would
+    # be one shared mutable object for every router.
+    self.reasons = {} # "STATUS:REASON" string -> Reason
+
+# Router name -> RouterReasons; every access is guarded by errors_lock.
+errors = {}
+errors_lock = thread.allocate_lock()
+# Identity <-> nickname maps, filled from NS/NEWDESC network statuses.
+key_to_name = {}
+name_to_key = {}
+
+# XXX: Move these to config file
+control_host = "127.0.0.1"
+control_port = 9051
+max_detach = 3
+
+
+# Make eventhandler
+class NodeHandler(TorCtl.EventHandler):
+  """Accumulates per-router ORCONN statistics into the global 'errors'
+  table and keeps the identity<->nickname maps current from NS and
+  NEWDESC events."""
+  def __init__(self, c):
+    TorCtl.EventHandler.__init__(self)
+    self.c = c # control connection; new_desc uses it to fetch statuses
+
+  def _note_router_names(self, nslist):
+    """Record the identity<->nickname mapping for every entry in nslist."""
+    for ns in nslist:
+      key_to_name[ns.idhex] = ns.name
+      name_to_key[ns.name] = ns.idhex
+
+  def or_conn_status(self, eventtype, status, target, age, read, wrote,
+                     reason, ncircs):
+    """Handle one ORCONN event.
+
+    READ/WRITE events add to the router's running byte counters.
+    CLOSED/FAILED events record the close reason together with the
+    connection's age, byte totals and circuit count. Other statuses
+    are ignored.
+    """
+    # XXX: Count all routers as one?
+    if target.startswith("$"):
+      if target not in key_to_name:
+        target = "AllClients:HASH"
+      else: target = key_to_name[target]
+    elif target not in name_to_key:
+      target = "AllClients:IP"
+
+    if status == "READ" or status == "WRITE":
+      #plog("DEBUG", "Read: " + str(read) + " wrote: " + str(wrote))
+      errors_lock.acquire()
+      try:
+        if target not in errors: errors[target] = RouterReasons(target)
+        errors[target].running_read += read
+        errors[target].running_wrote += wrote
+      finally:
+        # Always release: a lock left held would wedge save_stats and
+        # every later event.
+        errors_lock.release()
+      return
+
+    if status != "CLOSED" and status != "FAILED":
+      return
+
+    # TorCtl passes reason=None when the event has no REASON field;
+    # concatenating None raised TypeError while errors_lock was held.
+    reason = status+":"+(reason or "NONE")
+    errors_lock.acquire()
+    try:
+      if target not in errors: errors[target] = RouterReasons(target)
+      router = errors[target]
+      if reason not in router.reasons:
+        router.reasons[reason] = Reason(reason)
+      router.reasons[reason].ncircs += ncircs
+      router.reasons[reason].count += 1
+      router.tot_ncircs += ncircs
+      router.tot_count += 1
+      if age: router.tot_age += age
+      if read: router.tot_read += read
+      if wrote: router.tot_wrote += wrote
+    finally:
+      errors_lock.release()
+
+    if age: age = "AGE="+str(age)
+    else: age = ""
+    if read: read = "READ="+str(read)
+    else: read = ""
+    if wrote: wrote = "WRITTEN="+str(wrote)
+    else: wrote = ""
+    reason = "REASON="+reason
+    if ncircs: ncircs = "NCIRCS="+str(ncircs)
+    else: ncircs = ""
+    plog("DEBUG",
+         " ".join((eventtype, target, status, age, read, wrote,
+                   reason, ncircs)))
+
+  def ns(self, eventtype, nslist):
+    """Learn nicknames from an NS event."""
+    self._note_router_names(nslist)
+
+  def new_desc(self, eventtype, identities):
+    """Fetch the network status for each new descriptor's identity and
+    learn its nickname."""
+    for i in identities:
+      # should be only 1 entry per identity, but handle any number
+      self._note_router_names(self.c.get_network_status("id/"+i))
+
+def bw_stats(key, f):
+  """Write one "name=value" line per router in 'errors' to file f,
+  ordered by key(router) descending, then close f.
+
+  Callers are expected to hold errors_lock (save_stats does). f is
+  closed even if sorting or writing raises.
+  """
+  try:
+    routers = errors.values()
+    # cmp-based sort keeps compatibility with Python < 2.4 (no key=)
+    routers.sort(lambda x,y: cmp(key(y), key(x)))
+    for r in routers:
+      f.write(r.name+"="+str(key(r))+"\n")
+  finally:
+    f.close()
+
+
+def save_stats(s):
+  """Dump the 'errors' table into files under ./data/, then reschedule
+  this function on scheduler s to run again in 60 seconds.
+
+  Each ./data/r_by_* file lists routers sorted (descending) by a single
+  metric. ./data/reasons lists routers by circuits lost, with a
+  per-reason breakdown. errors_lock is held for the whole dump and is
+  released, and the reasons file closed, even if writing fails.
+  """
+  # Adding 0.005 to tot_age avoids dividing by zero for routers with no
+  # closed connections yet, and forces float division.
+  metrics = (
+    ("r_by_rbytes", lambda x: x.tot_read),                 # bytes read
+    ("r_by_wbytes", lambda x: x.tot_wrote),                # bytes written
+    ("r_by_tbytes", lambda x: x.tot_read+x.tot_wrote),     # total bytes
+    ("r_by_rbw", lambda x: x.tot_read/(x.tot_age+0.005)),  # downstream bw
+    ("r_by_wbw", lambda x: x.tot_wrote/(x.tot_age+0.005)), # upstream bw
+    ("r_by_tbw",                                           # total bw
+     lambda x: (x.tot_read+x.tot_wrote)/(x.tot_age+0.005)),
+    ("r_by_rrunbytes", lambda x: x.running_read),
+    ("r_by_wrunbytes", lambda x: x.running_wrote),
+    ("r_by_trunbytes", lambda x: x.running_read+x.running_wrote),
+  )
+  errors_lock.acquire()
+  try:
+    for fname, key in metrics:
+      bw_stats(key, file("./data/"+fname, "w"))
+
+    f = file("./data/reasons", "w")
+    try:
+      routers = errors.values()
+      routers.sort(lambda x, y: cmp(y.tot_ncircs, x.tot_ncircs))
+      for r in routers:
+        f.write(r.name+" " +str(r.tot_ncircs)+"/"+str(r.tot_count)+"\n")
+        for reason in r.reasons.itervalues():
+          f.write("\t"+reason.reason+" "+str(reason.ncircs)+
+                  "/"+str(reason.count)+"\n")
+    finally:
+      f.close()
+  finally:
+    # Never leave the lock held: the event thread would block forever.
+    errors_lock.release()
+  s.enter(60, 1, save_stats, (s,))
+
+
+def startmon(c):
+  """Seed the nickname maps from Tor's current network status, then run
+  a scheduler on which save_stats fires every 60 seconds. save_stats
+  reschedules itself, so this call does not normally return."""
+  for entry in c.get_network_status():
+    name_to_key[entry.name] = entry.idhex
+    key_to_name[entry.idhex] = entry.name
+
+  scheduler = sched.scheduler(time.time, time.sleep)
+  scheduler.enter(60, 1, save_stats, (scheduler,))
+  scheduler.run()
+
+
+def main(argv):
+  """Connect to Tor's control port, install NodeHandler for ORCONN, NS
+  and NEWDESC events, and run the monitor via startmon (which does not
+  normally return). argv is currently unused."""
+  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+  s.connect((control_host,control_port))
+  c = TorCtl.get_connection(s)
+  c.set_event_handler(NodeHandler(c))
+  c.launch_thread()
+  c.authenticate()
+  c.set_events([TorCtl.EVENT_TYPE.ORCONN,
+                TorCtl.EVENT_TYPE.NS,
+                TorCtl.EVENT_TYPE.NEWDESC], True)
+  startmon(c)
+
+# Only start monitoring when run as a script, not on import.
+if __name__ == "__main__":
+  main(sys.argv)
Property changes on: torflow/trunk/nodemon.py
___________________________________________________________________
Name: svn:executable
+ *
Added: torflow/trunk/tordiffs/orconn-bw.diff
===================================================================
--- torflow/trunk/tordiffs/orconn-bw.diff 2007-01-31 00:58:06 UTC (rev 9468)
+++ torflow/trunk/tordiffs/orconn-bw.diff 2007-01-31 08:29:23 UTC (rev 9469)
@@ -0,0 +1,166 @@
+Index: src/or/control.c
+===================================================================
+--- src/or/control.c (revision 9468)
++++ src/or/control.c (working copy)
+@@ -3279,21 +3279,21 @@
+ {
+ switch (r) {
+ case END_OR_CONN_REASON_DONE:
+- return "REASON=DONE";
++ return " REASON=DONE";
+ case END_OR_CONN_REASON_TCP_REFUSED:
+- return "REASON=CONNECTREFUSED";
++ return " REASON=CONNECTREFUSED";
+ case END_OR_CONN_REASON_OR_IDENTITY:
+- return "REASON=IDENTITY";
++ return " REASON=IDENTITY";
+ case END_OR_CONN_REASON_TLS_CONNRESET:
+- return "REASON=CONNECTRESET";
++ return " REASON=CONNECTRESET";
+ case END_OR_CONN_REASON_TLS_TIMEOUT:
+- return "REASON=TIMEOUT";
++ return " REASON=TIMEOUT";
+ case END_OR_CONN_REASON_TLS_NO_ROUTE:
+- return "REASON=NOROUTE";
++ return " REASON=NOROUTE";
+ case END_OR_CONN_REASON_TLS_IO_ERROR:
+- return "REASON=IOERROR";
++ return " REASON=IOERROR";
+ case END_OR_CONN_REASON_TLS_MISC:
+- return "REASON=MISC";
++ return " REASON=MISC";
+ case 0:
+ return "";
+ default:
+@@ -3323,6 +3323,9 @@
+ const char *status;
+ char name[128];
+ char ncircs_buf[32] = {0}; /* > 8 + log10(2^32)=10 + 2 */
++ char age_buf[32] = {0}; /* > 8 + log10(2^32)=10 + 2 */
++ char read_buf[42] = {0}; /* > 8 + log10(2^64)=20 + 2 */
++ char wrote_buf[42] = {0}; /* > 8 + log10(2^64)=20 + 2 */
+ switch (tp)
+ {
+ case OR_CONN_EVENT_LAUNCHED: status = "LAUNCHED"; break;
+@@ -3330,30 +3333,46 @@
+ case OR_CONN_EVENT_FAILED: status = "FAILED"; break;
+ case OR_CONN_EVENT_CLOSED: status = "CLOSED"; break;
+ case OR_CONN_EVENT_NEW: status = "NEW"; break;
++ case OR_CONN_EVENT_READ: status = "READ"; break;
++ case OR_CONN_EVENT_WRITE: status = "WRITE"; break;
+ default:
+ log_warn(LD_BUG, "Unrecognized status code %d", (int)tp);
+ return 0;
+ }
+ ncircs = connection_or_count_pending_circs(conn);
+ ncircs += conn->n_circuits;
+- if (ncircs && (tp == OR_CONN_EVENT_FAILED || tp == OR_CONN_EVENT_CLOSED)) {
+- tor_snprintf(ncircs_buf, sizeof(ncircs_buf), "%sNCIRCS=%d",
+- reason ? " " : "", ncircs);
++ if(tp == OR_CONN_EVENT_READ || tp == OR_CONN_EVENT_WRITE) {
++ // XXX: Bloody fucking hack, but the odds of this being
++ // approved for the official dist are slim to none
++ tor_snprintf(read_buf, sizeof(read_buf), "%d", reason);
++ reason = 0;
+ }
++ if (tp == OR_CONN_EVENT_FAILED || tp == OR_CONN_EVENT_CLOSED) {
++ tor_snprintf(age_buf, sizeof(age_buf), "AGE=%d",
++ (int)(time(NULL) - conn->_base.timestamp_created));
++ tor_snprintf(read_buf, sizeof(read_buf), " READ=%Zu",
++ conn->_base.bytes_read);
++ tor_snprintf(wrote_buf, sizeof(wrote_buf), " WRITTEN=%Zu",
++ conn->_base.bytes_written);
++ if (ncircs)
++ tor_snprintf(ncircs_buf, sizeof(ncircs_buf), " NCIRCS=%d", ncircs);
++ }
+
+ if (EVENT_IS_INTERESTING1S(EVENT_OR_CONN_STATUS)) {
+ orconn_target_get_name(0, name, sizeof(name), conn);
+ send_control1_event_extended(EVENT_OR_CONN_STATUS, SHORT_NAMES,
+- "650 ORCONN %s %s@%s%s\r\n",
+- name, status,
+- or_conn_end_reason_to_string(reason), ncircs_buf);
++ "650 ORCONN %s %s@%s%s%s%s%s\r\n",
++ name, status,
++ age_buf, read_buf, wrote_buf,
++ or_conn_end_reason_to_string(reason), ncircs_buf);
+ }
+ if (EVENT_IS_INTERESTING1L(EVENT_OR_CONN_STATUS)) {
+- orconn_target_get_name(1, name, sizeof(name), conn);
+- send_control1_event_extended(EVENT_OR_CONN_STATUS, LONG_NAMES,
+- "650 ORCONN %s %s@%s%s\r\n",
+- name, status,
+- or_conn_end_reason_to_string(reason), ncircs_buf);
++ orconn_target_get_name(1, name, sizeof(name), conn);
++ send_control1_event_extended(EVENT_OR_CONN_STATUS, LONG_NAMES,
++ "650 ORCONN %s %s@%s%s%s%s%s\r\n",
++ name, status,
++ age_buf, read_buf, wrote_buf,
++ or_conn_end_reason_to_string(reason), ncircs_buf);
+ }
+ }
+ return 0;
+Index: src/or/or.h
+===================================================================
+--- src/or/or.h (revision 9468)
++++ src/or/or.h (working copy)
+@@ -710,6 +710,8 @@
+ * could write? */
+ time_t timestamp_created; /**< When was this connection_t created? */
+
++ size_t bytes_read; /**< Total number of bytes read off this connection */
++ size_t bytes_written; /**< Total number of bytes written to this connection*/
+ uint32_t addr; /**< IP of the other side of the connection; used to identify
+ * routers, along with port. */
+ uint16_t port; /**< If non-zero, port on the other end
+@@ -2246,6 +2248,8 @@
+ OR_CONN_EVENT_FAILED = 2,
+ OR_CONN_EVENT_CLOSED = 3,
+ OR_CONN_EVENT_NEW = 4,
++ OR_CONN_EVENT_READ = 5,
++ OR_CONN_EVENT_WRITE = 6
+ } or_conn_status_event_t;
+
+ void control_update_global_event_mask(void);
+Index: src/or/connection.c
+===================================================================
+--- src/or/connection.c (revision 9468)
++++ src/or/connection.c (working copy)
+@@ -1578,10 +1578,18 @@
+ if (n_read > 0) {
+ rep_hist_note_bytes_read(n_read, now);
+ connection_read_bucket_decrement(conn, n_read);
++ conn->bytes_read += n_read;
++ if(connection_speaks_cells(conn))
++ control_event_or_conn_status(TO_OR_CONN(conn), OR_CONN_EVENT_READ,
++ n_read);
+ }
+ if (n_written > 0) {
+ rep_hist_note_bytes_written(n_written, now);
+ global_write_bucket -= n_written;
++ conn->bytes_written += n_written;
++ if(connection_speaks_cells(conn))
++ control_event_or_conn_status(TO_OR_CONN(conn), OR_CONN_EVENT_WRITE,
++ n_written);
+ }
+ }
+
+@@ -1774,10 +1782,18 @@
+ if (n_written > 0) {
+ rep_hist_note_bytes_written(n_written, now);
+ global_write_bucket -= n_written;
++ conn->bytes_written += n_written;
++ if(connection_speaks_cells(conn))
++ control_event_or_conn_status(TO_OR_CONN(conn), OR_CONN_EVENT_WRITE,
++ n_written);
+ }
+ if (n_read > 0) {
+ rep_hist_note_bytes_read(n_read, now);
+ connection_read_bucket_decrement(conn, n_read);
++ conn->bytes_read += n_read;
++ if(connection_speaks_cells(conn))
++ control_event_or_conn_status(TO_OR_CONN(conn), OR_CONN_EVENT_READ,
++ n_read);
+ }
+ }
+