[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [torflow/master] Move more parameters to consensus.
commit 13aa3b4268a98a07e76e8ba9cb8cfccac39ff2c0
Author: Mike Perry <mikeperry-git@xxxxxxxxxx>
Date: Sun Nov 13 17:45:01 2011 -0800
Move more parameters to consensus.
---
NetworkScanners/BwAuthority/aggregate.py | 103 +++++++++++++++++++++++-------
1 files changed, 79 insertions(+), 24 deletions(-)
diff --git a/NetworkScanners/BwAuthority/aggregate.py b/NetworkScanners/BwAuthority/aggregate.py
index 1746256..9bed1d0 100755
--- a/NetworkScanners/BwAuthority/aggregate.py
+++ b/NetworkScanners/BwAuthority/aggregate.py
@@ -23,22 +23,25 @@ IGNORE_GUARDS = 0
# rate for guard nodes
GUARD_SAMPLE_RATE = 2*7*24*60*60 # 2wks
-# PID constants
-# See https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form
+# PID constant defaults. May be overridden by consensus
+# https://en.wikipedia.org/wiki/PID_controller#Ideal_versus_standard_PID_form
K_p = 1.0
# We expect to correct steady state error in 5 samples (guess)
T_i = 5.0
+# T_i_decay is a weight factor to govern how fast integral sums
+# decay. For the values of T_i that we care about, T_i_decay represents
+# the fraction of integral sum that is eliminated after T_i sample rounds.
+# This decay is non-standard, but we do it to avoid overflow
+T_i_decay = 0.5
+
# We can only expect to predict less than one sample into the future, as
# after 1 sample, clients will have migrated
-# FIXME: Our prediction ability is a function of the consensus time
+# FIXME: Our prediction ability is a function of the consensus uptake time
# vs measurement rate
T_d = 0.5
-K_i = K_p/T_i
-K_d = K_p*T_d
-
NODE_CAP = 0.05
MIN_REPORT = 60 # Percent of the network we must measure before reporting
@@ -94,6 +97,7 @@ class Node:
self.circ_fail_rate = 0
self.strm_fail_rate = 0
+ # FIXME: Need to set pid_error_sum.. It is getting lost when we don't vote
def revert_to_vote(self, vote):
self.new_bw = vote.bw*1000
self.pid_bw = vote.pid_bw
@@ -101,17 +105,17 @@ class Node:
self.measured_at = vote.measured_at
# Derivative of error for pid control
- def get_pid_bw(self, prev_vote, kp):
+ def get_pid_bw(self, prev_vote, kp, ki, kd, kidecay):
self.prev_error = prev_vote.pid_error
self.prev_measured_at = prev_vote.measured_at
- # We decay the interval by 1/T_i each round to keep it bounded.
- # This is non-standard
- self.pid_error_sum = prev_vote.pid_error_sum*(1 - 1.0/T_i) + self.pid_error
+ # We decay the integral each round to keep it bounded.
+ # This decay is non-standard. We do it to avoid overflow
+ self.pid_error_sum = prev_vote.pid_error_sum*kidecay + self.pid_error
self.pid_bw = self.ns_bw \
- + kp*(self.ns_bw*self.pid_error \
- + self.ns_bw*self.integral_error()/T_i \
- + self.ns_bw*self.d_error_dt()*T_d)
+ + kp*self.ns_bw*self.pid_error \
+ + ki*self.ns_bw*self.integral_error() \
+ + kd*self.ns_bw*self.d_error_dt()
return self.pid_bw
# Time-weighted sum of error per unit of time (measurement sample)
@@ -139,8 +143,7 @@ class Node:
self.filt_bw = line.filt_bw
self.ns_bw = line.ns_bw
self.desc_bw = line.desc_bw
- # XXX: Temporary test
- self.circ_fail_rate = 0.0 #line.circ_fail_rate
+ self.circ_fail_rate = line.circ_fail_rate
self.strm_fail_rate = line.strm_fail_rate
class Line:
@@ -197,16 +200,54 @@ class ConsensusJunk:
def __init__(self, c):
cs_bytes = c.sendAndRecv("GETINFO dir/status-vote/current/consensus\r\n")[0][2]
self.bwauth_pid_control = False
+ self.use_circ_fails = False
+ self.use_best_ratio = False
+
+ self.K_p = K_p
+ self.T_i = T_i
+ self.T_d = T_d
+ self.T_i_decay = T_i_decay
+
try:
cs_params = re.search("^params ((?:[\S]+=[\d]+[\s]?)+)",
cs_bytes, re.M).group(1).split()
for p in cs_params:
if p == "bwauthpid=1":
self.bwauth_pid_control = True
+ elif p == "bwauthcircs=1":
+ self.use_circ_fails = True
+ plog("INFO", "Counting circuit failures")
+ elif p == "bwauthbestratio=1":
+ self.use_best_ratio = True
+ plog("INFO", "Choosing larger of sbw vs fbw")
+ elif p.startswith("bwauthkp="):
+ self.K_p = int(p.split("=")[1])/10000.0
+ plog("INFO", "Got K_p=%f from consensus." % self.K_p)
+ elif p.startswith("bwauthti="):
+ self.T_i = (int(p.split("=")[1])/10000.0)
+ plog("INFO", "Got T_i=%f from consensus." % self.T_i)
+ elif p.startswith("bwauthtd="):
+ self.T_d = (int(p.split("=")[1])/10000.0)
+ plog("INFO", "Got T_d=%f from consensus." % self.T_d)
+ elif p.startswith("bwauthtidecay="):
+ self.T_i_decay = (int(p.split("=")[1])/10000.0)
+ plog("INFO", "Got T_i_decay=%f from consensus." % self.T_i_decay)
except:
plog("NOTICE", "Bw auth PID control disabled due to parse error.")
traceback.print_exc()
+ if self.T_i == 0:
+ self.K_i = 0
+ self.K_i_decay = 0
+ else:
+ self.K_i = self.K_p/self.T_i
+ self.K_i_decay = (1.0-self.T_i_decay/self.T_i)
+
+ self.K_d = self.K_p*self.T_d
+
+ plog("INFO", "Got K_p=%f K_i=%f K_d=%f K_i_decay=%f" %
+ (self.K_p, self.K_i, self.K_d, self.K_i_decay))
+
self.bw_weights = {}
try:
bw_weights = re.search("^bandwidth-weights ((?:[\S]+=[\d]+[\s]?)+)",
@@ -355,6 +396,11 @@ def main(argv):
plog("NOTICE", "No scan results yet.")
sys.exit(1)
+ if not cs_junk.use_circ_fails:
+ plog("INFO", "Ignoring circuit failures")
+ for n in nodes.itervalues():
+ n.circ_fail_rate = 0.0
+
if cs_junk.bwauth_pid_control:
# Penalize nodes for circuit failure: it indicates CPU pressure
# TODO: Potentially penalize for stream failure, if we run into
@@ -406,10 +452,7 @@ def main(argv):
if cs_junk.bwauth_pid_control:
# Penalize nodes for circ failure rate
- # n.pid_error = (n.filt_bw*(1.0-n.circ_fail_rate) - true_filt_avg)/true_filt_avg
-
- # FIXME: For now, let's try this larger-ratio thing..
- if n.sbw_ratio > n.fbw_ratio:
+ if cs_junk.use_best_ratio and n.sbw_ratio > n.fbw_ratio:
n.pid_error = (n.strm_bw*(1.0-n.circ_fail_rate) - true_strm_avg)/true_strm_avg
else:
n.pid_error = (n.filt_bw*(1.0-n.circ_fail_rate) - true_filt_avg)/true_filt_avg
@@ -426,9 +469,14 @@ def main(argv):
# Do full feedback if our previous vote > 2.5 weeks old
if n.idhex not in prev_votes.vote_map or \
n.measured_at - prev_votes.vote_map[n.idhex].measured_at > GUARD_SAMPLE_RATE:
- n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], K_p)
+ n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex],
+ cs_junk.K_p,
+ cs_junk.K_i,
+ cs_junk.K_d,
+ cs_junk.K_i_decay)
else:
pid_error = n.pid_error
+ # FIXME: We possibly lose the pid_error_sum here
n.revert_to_vote(prev_votes.vote_map[n.idhex])
# Don't use feedback here, but we might as well use our
# new measurement against the previous vote.
@@ -436,11 +484,11 @@ def main(argv):
# This should no longer happen
plog("NOTICE", "Zero bw for Guard node "+n.nick+"="+n.idhex)
n.new_bw = prev_votes.vote_map[n.idhex].bw + \
- K_p*prev_votes.vote_map[n.idhex].bw*pid_error
+ cs_junk.K_p*prev_votes.vote_map[n.idhex].bw*pid_error
n.pid_bw = n.new_bw
else:
n.new_bw = prev_votes.vote_map[n.idhex].pid_bw + \
- K_p*prev_votes.vote_map[n.idhex].pid_bw*pid_error
+ cs_junk.K_p*prev_votes.vote_map[n.idhex].pid_bw*pid_error
else:
# Everyone else should be pretty instantenous to respond.
# Full feedback should be fine for them (we hope),
@@ -453,9 +501,16 @@ def main(argv):
("Guard" in prev_consensus[n.idhex].flags \
and "Exit" in prev_consensus[n.idhex].flags):
n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex],
- K_p*(1.0-cs_junk.bw_weights["Wgd"]))
+ cs_junk.K_p*(1.0-cs_junk.bw_weights["Wgd"]),
+ cs_junk.K_i,
+ cs_junk.K_d,
+ cs_junk.K_i_decay)
else:
- n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex], K_p)
+ n.new_bw = n.get_pid_bw(prev_votes.vote_map[n.idhex],
+ cs_junk.K_p,
+ cs_junk.K_i,
+ cs_junk.K_d,
+ cs_junk.K_i_decay)
else:
# Reset values. Don't vote/sample this measurement round.
n.revert_to_vote(prev_votes.vote_map[n.idhex])
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits