[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r9881: Add a separate set of token buckets for relayed traffic. Rig (in tor/trunk: doc src/or)



Author: arma
Date: 2007-03-19 22:55:31 -0400 (Mon, 19 Mar 2007)
New Revision: 9881

Modified:
   tor/trunk/doc/tor.1.in
   tor/trunk/src/or/config.c
   tor/trunk/src/or/connection.c
   tor/trunk/src/or/main.c
   tor/trunk/src/or/or.h
Log:
Add a separate set of token buckets for relayed traffic. Right
now that's just defined as answers to directory requests.


Modified: tor/trunk/doc/tor.1.in
===================================================================
--- tor/trunk/doc/tor.1.in	2007-03-20 02:10:18 UTC (rev 9880)
+++ tor/trunk/doc/tor.1.in	2007-03-20 02:55:31 UTC (rev 9881)
@@ -65,8 +65,7 @@
 .TP
 \fBBandwidthBurst \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP
 Limit the maximum token bucket size (also known as the burst) to the
-given number of bytes in each direction. This value should be at least
-twice your BandwidthRate. (Default: 6 MB)
+given number of bytes in each direction. (Default: 6 MB)
 .LP
 .TP
 \fBMaxAdvertisedBandwidth \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP
@@ -77,6 +76,20 @@
 server without impacting network performance.
 .LP
 .TP
+\fBRelayBandwidthRate \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP
+If defined, a separate token bucket limits the average incoming bandwidth
+usage for _relayed traffic_ on this node to the specified number of
+bytes per second, and the average outgoing bandwidth usage to that same
+value. Relayed traffic is currently defined as answers to directory
+requests, but that may change. (Default: 0)
+.LP
+.TP
+\fBRelayBandwidthBurst \fR\fIN\fR \fBbytes\fR|\fBKB\fR|\fBMB\fR|\fBGB\fR|\fBTB\fP
+Limit the maximum token bucket size (also known as the burst) for
+_relayed traffic_ to the
+given number of bytes in each direction. (Default: 0)
+.LP
+.TP
 \fBConnLimit \fR\fINUM\fP
 The minimum number of file descriptors that must be available to
 the Tor process before it will start. Tor will ask the OS for as

Modified: tor/trunk/src/or/config.c
===================================================================
--- tor/trunk/src/or/config.c	2007-03-20 02:10:18 UTC (rev 9880)
+++ tor/trunk/src/or/config.c	2007-03-20 02:55:31 UTC (rev 9881)
@@ -216,6 +216,8 @@
   VAR("RecommendedClientVersions", LINELIST, RecommendedClientVersions,  NULL),
   VAR("RecommendedServerVersions", LINELIST, RecommendedServerVersions,  NULL),
   VAR("RedirectExit",        LINELIST, RedirectExit,         NULL),
+  VAR("RelayBandwidthBurst", MEMUNIT,  RelayBandwidthBurst,  "0"),
+  VAR("RelayBandwidthRate",  MEMUNIT,  RelayBandwidthRate,   "0"),
   VAR("RendExcludeNodes",    STRING,   RendExcludeNodes,     NULL),
   VAR("RendNodes",           STRING,   RendNodes,            NULL),
   VAR("RendPostPeriod",      INTERVAL, RendPostPeriod,       "1 hour"),
@@ -2666,6 +2668,19 @@
       *msg = tor_strdup(r >= 0 ? buf : "internal error");
       return -1;
     }
+    if (options->RelayBandwidthRate > options->RelayBandwidthBurst)
+      REJECT("RelayBandwidthBurst must be at least equal "
+             "to RelayBandwidthRate.");
+    if (options->RelayBandwidthRate &&
+      options->RelayBandwidthRate < ROUTER_REQUIRED_MIN_BANDWIDTH) {
+      r = tor_snprintf(buf, sizeof(buf),
+                       "RelayBandwidthRate is set to %d bytes/second. "
+                       "For servers, it must be at least %d.",
+                       (int)options->RelayBandwidthRate,
+                       ROUTER_REQUIRED_MIN_BANDWIDTH);
+      *msg = tor_strdup(r >= 0 ? buf : "internal error");
+      return -1;
+    }
   }
 
   if (options->BandwidthRate > options->BandwidthBurst)

Modified: tor/trunk/src/or/connection.c
===================================================================
--- tor/trunk/src/or/connection.c	2007-03-20 02:10:18 UTC (rev 9880)
+++ tor/trunk/src/or/connection.c	2007-03-20 02:55:31 UTC (rev 9881)
@@ -1105,12 +1105,29 @@
 }
 
 extern int global_read_bucket, global_write_bucket;
+extern int global_relayed_read_bucket, global_relayed_write_bucket;
 
-/** Did our global write bucket run dry last second? If so, we are
- * likely to run dry again this second, so be stingy with the tokens
- * we just put in. */
-static int global_write_bucket_empty_last_second = 0;
+/** Did our either global write bucket run dry last second? If so,
+ * we are likely to run dry again this second, so be stingy with the
+ * tokens we just put in. */
+static int write_buckets_empty_last_second = 0;
 
+/** Return 1 if <b>conn</b> should use tokens from the "relayed"
+ * bandwidth rates, else 0. Currently, only OR conns with bandwidth
+ * class 1, and directory conns that are serving data out, count.
+ */
+static int
+connection_counts_as_relayed_traffic(connection_t *conn)
+{
+#if 0
+  if (conn->type == CONN_TYPE_OR && TO_OR_CONN(conn)->bandwidth_class)
+    return 1;
+#endif
+  if (conn->type == CONN_TYPE_DIR && DIR_CONN_IS_SERVER(conn))
+    return 1;
+  return 0;
+}
+
 /** Helper function to decide how many bytes out of <b>global_bucket</b>
  * we're willing to use for this transaction. <b>base</b> is the size
  * of a cell on the network; <b>priority</b> says whether we should
@@ -1153,16 +1170,25 @@
                CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE;
   int priority = conn->type != CONN_TYPE_DIR;
   int conn_bucket = -1;
-  if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
+  int global_bucket = global_read_bucket;
+
+  if (connection_speaks_cells(conn)) {
     or_connection_t *or_conn = TO_OR_CONN(conn);
-    conn_bucket = or_conn->read_bucket;
+    if (conn->state == OR_CONN_STATE_OPEN)
+      conn_bucket = or_conn->read_bucket;
   }
+
   if (!connection_is_rate_limited(conn)) {
     /* be willing to read on local conns even if our buckets are empty */
     return conn_bucket>=0 ? conn_bucket : 1<<14;
   }
+
+  if (connection_counts_as_relayed_traffic(conn) &&
+      global_relayed_read_bucket <= global_read_bucket)
+    global_bucket = global_relayed_read_bucket;
+
   return connection_bucket_round_robin(base, priority,
-                                       global_read_bucket, conn_bucket);
+                                       global_bucket, conn_bucket);
 }
 
 /** How many bytes at most can we write onto this connection? */
@@ -1172,24 +1198,31 @@
   int base = connection_speaks_cells(conn) ?
                CELL_NETWORK_SIZE : RELAY_PAYLOAD_SIZE;
   int priority = conn->type != CONN_TYPE_DIR;
+  int global_bucket = global_write_bucket;
 
   if (!connection_is_rate_limited(conn)) {
     /* be willing to write to local conns even if our buckets are empty */
     return conn->outbuf_flushlen;
   }
-  return connection_bucket_round_robin(base, priority, global_write_bucket,
+
+  if (connection_counts_as_relayed_traffic(conn) &&
+      global_relayed_write_bucket <= global_write_bucket)
+    global_bucket = global_relayed_write_bucket;
+
+  return connection_bucket_round_robin(base, priority, global_bucket,
                                        conn->outbuf_flushlen);
 }
 
-/** Return 1 if the global write bucket is low enough that we shouldn't
- * send <b>attempt</b> bytes of low-priority directory stuff out to
- * <b>conn</b>. Else return 0.
+/** Return 1 if the global write buckets are low enough that we
+ * shouldn't send <b>attempt</b> bytes of low-priority directory stuff
+ * out to <b>conn</b>. Else return 0.
 
  * Priority is 1 for v1 requests (directories and running-routers),
  * and 2 for v2 requests (statuses and descriptors). But see FFFF in
  * directory_handle_command_get() for why we don't use priority 2 yet.
  *
  * There are a lot of parameters we could use here:
+ * - global_relayed_write_bucket. Low is bad.
  * - global_write_bucket. Low is bad.
  * - bandwidthrate. Low is bad.
  * - bandwidthburst. Not a big factor?
@@ -1203,22 +1236,26 @@
 int
 global_write_bucket_low(connection_t *conn, size_t attempt, int priority)
 {
+  int smaller_bucket = global_write_bucket < global_relayed_write_bucket ?
+                       global_write_bucket : global_relayed_write_bucket;
   if (authdir_mode(get_options()) && priority>1)
     return 0; /* there's always room to answer v2 if we're an auth dir */
 
   if (!connection_is_rate_limited(conn))
     return 0; /* local conns don't get limited */
 
-  if (global_write_bucket < (int)attempt)
+  if (smaller_bucket < (int)attempt)
     return 1; /* not enough space no matter the priority */
 
-  if (global_write_bucket_empty_last_second)
+  if (write_buckets_empty_last_second)
     return 1; /* we're already hitting our limits, no more please */
 
   if (priority == 1) { /* old-style v1 query */
     /* Could we handle *two* of these requests within the next two seconds? */
-    int64_t can_write = (int64_t)global_write_bucket
-      + 2*get_options()->BandwidthRate;
+    or_options_t *options = get_options();
+    int64_t can_write = (int64_t)smaller_bucket
+      + 2*(options->RelayBandwidthRate ? options->RelayBandwidthRate :
+                                         options->BandwidthRate);
     if (can_write < 2*(int64_t)attempt)
       return 1;
   } else { /* v2 query */
@@ -1227,14 +1264,28 @@
   return 0;
 }
 
-/** We just read num_read onto conn. Decrement buckets appropriately. */
+/** We just read num_read and wrote num_written onto conn.
+ * Decrement buckets appropriately. */
 static void
-connection_read_bucket_decrement(connection_t *conn, int num_read)
+connection_buckets_decrement(connection_t *conn, time_t now,
+                             int num_read, int num_written)
 {
+  if (!connection_is_rate_limited(conn))
+    return; /* local IPs are free */
+
+  if (num_read > 0)
+    rep_hist_note_bytes_read(num_read, now);
+  if (num_written > 0)
+    rep_hist_note_bytes_written(num_written, now);
+
+  if (connection_counts_as_relayed_traffic(conn)) {
+    global_relayed_read_bucket -= num_read;
+    global_relayed_write_bucket -= num_written;
+  }
   global_read_bucket -= num_read;
-  if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN) {
+  global_write_bucket -= num_written;
+  if (connection_speaks_cells(conn) && conn->state == OR_CONN_STATE_OPEN)
     TO_OR_CONN(conn)->read_bucket -= num_read;
-  }
 }
 
 /** If we have exhausted our global buckets, or the buckets for conn,
@@ -1242,21 +1293,23 @@
 static void
 connection_consider_empty_read_buckets(connection_t *conn)
 {
+  const char *reason;
+
   if (global_read_bucket <= 0) {
-    LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
-                       "global read bucket exhausted. Pausing."));
-    conn->wants_to_read = 1;
-    connection_stop_reading(conn);
-    return;
-  }
-  if (connection_speaks_cells(conn) &&
-      conn->state == OR_CONN_STATE_OPEN &&
-      TO_OR_CONN(conn)->read_bucket <= 0) {
-    LOG_FN_CONN(conn,
-                (LOG_DEBUG,LD_NET,"read bucket exhausted. Pausing."));
-    conn->wants_to_read = 1;
-    connection_stop_reading(conn);
-  }
+    reason = "global read bucket exhausted. Pausing.";
+  } else if (connection_counts_as_relayed_traffic(conn) &&
+             global_relayed_read_bucket <= 0) {
+    reason = "global relayed read bucket exhausted. Pausing.";
+  } else if (connection_speaks_cells(conn) &&
+             conn->state == OR_CONN_STATE_OPEN &&
+             TO_OR_CONN(conn)->read_bucket <= 0) {
+    reason = "connection read bucket exhausted. Pausing.";
+  } else
+    return; /* all good, no need to stop it */
+
+  LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
+  conn->wants_to_read = 1;
+  connection_stop_reading(conn);
 }
 
 /** If we have exhausted our global buckets, or the buckets for conn,
@@ -1264,26 +1317,28 @@
 static void
 connection_consider_empty_write_buckets(connection_t *conn)
 {
+  const char *reason;
+
   if (global_write_bucket <= 0) {
-    LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
-                       "global write bucket exhausted. Pausing."));
-    conn->wants_to_write = 1;
-    connection_stop_writing(conn);
-    return;
-  }
+    reason = "global write bucket exhausted. Pausing.";
+  } else if (connection_counts_as_relayed_traffic(conn) &&
+             global_relayed_write_bucket <= 0) {
+    reason = "global relayed write bucket exhausted. Pausing.";
 #if 0
-  if (connection_speaks_cells(conn) &&
-      conn->state == OR_CONN_STATE_OPEN &&
-      TO_OR_CONN(conn)->write_bucket <= 0) {
-    LOG_FN_CONN(conn,
-                (LOG_DEBUG,LD_NET,"write bucket exhausted. Pausing."));
-    conn->wants_to_write = 1;
-    connection_stop_writing(conn);
-  }
+  } else if (connection_speaks_cells(conn) &&
+             conn->state == OR_CONN_STATE_OPEN &&
+             TO_OR_CONN(conn)->write_bucket <= 0) {
+    reason = "connection write bucket exhausted. Pausing.";
 #endif
+  } else
+    return; /* all good, no need to stop it */
+
+  LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
+  conn->wants_to_write = 1;
+  connection_stop_writing(conn);
 }
 
-/** Initialize the global read bucket to options->BandwidthBurst. */
+/** Initialize the global read bucket to options-\>BandwidthBurst. */
 void
 connection_bucket_init(void)
 {
@@ -1291,8 +1346,28 @@
   /* start it at max traffic */
   global_read_bucket = (int)options->BandwidthBurst;
   global_write_bucket = (int)options->BandwidthBurst;
+  if (options->RelayBandwidthRate) {
+    global_relayed_read_bucket = (int)options->RelayBandwidthBurst;
+    global_relayed_write_bucket = (int)options->RelayBandwidthBurst;
+  } else {
+    global_relayed_read_bucket = (int)options->BandwidthBurst;
+    global_relayed_write_bucket = (int)options->BandwidthBurst;
+  }
 }
 
+static void
+connection_bucket_refill_helper(int *bucket, int rate, int burst,
+                                int seconds_elapsed, const char *name)
+{
+  if (*bucket < burst) {
+    *bucket += rate*seconds_elapsed;
+    if (*bucket > burst)
+      *bucket = burst;
+    log(LOG_DEBUG, LD_NET,"%s now %d.", name, *bucket);
+  }
+}
+
+
 /** A second has rolled over; increment buckets appropriately. */
 void
 connection_bucket_refill(int seconds_elapsed)
@@ -1301,23 +1376,36 @@
   connection_t *conn;
   connection_t **carray;
   or_options_t *options = get_options();
+  int relayrate, relayburst;
 
+  if (options->RelayBandwidthRate) {
+    relayrate = (int)options->RelayBandwidthRate;
+    relayburst = (int)options->RelayBandwidthBurst;
+  } else {
+    relayrate = (int)options->BandwidthRate;
+    relayburst = (int)options->BandwidthBurst;
+  }
+
   tor_assert(seconds_elapsed >= 0);
 
+  write_buckets_empty_last_second =
+    global_relayed_write_bucket == 0 || global_write_bucket == 0;
+
   /* refill the global buckets */
-  if (global_read_bucket < (int)options->BandwidthBurst) {
-    global_read_bucket += (int)options->BandwidthRate*seconds_elapsed;
-    if (global_read_bucket > (int)options->BandwidthBurst)
-      global_read_bucket = (int)options->BandwidthBurst;
-    log(LOG_DEBUG, LD_NET,"global_read_bucket now %d.", global_read_bucket);
-  }
-  if (global_write_bucket < (int)options->BandwidthBurst) {
-    global_write_bucket_empty_last_second = global_write_bucket == 0;
-    global_write_bucket += (int)options->BandwidthRate*seconds_elapsed;
-    if (global_write_bucket > (int)options->BandwidthBurst)
-      global_write_bucket = (int)options->BandwidthBurst;
-    log(LOG_DEBUG, LD_NET,"global_write_bucket now %d.", global_write_bucket);
-  }
+  connection_bucket_refill_helper(&global_read_bucket,
+                                  (int)options->BandwidthRate,
+                                  (int)options->BandwidthBurst,
+                                  seconds_elapsed, "global_read_bucket");
+  connection_bucket_refill_helper(&global_write_bucket,
+                                  (int)options->BandwidthRate,
+                                  (int)options->BandwidthBurst,
+                                  seconds_elapsed, "global_write_bucket");
+  connection_bucket_refill_helper(&global_relayed_read_bucket,
+                                  relayrate, relayburst, seconds_elapsed,
+                                  "global_relayed_read_bucket");
+  connection_bucket_refill_helper(&global_relayed_write_bucket,
+                                  relayrate, relayburst, seconds_elapsed,
+                                  "global_relayed_write_bucket");
 
   /* refill the per-connection buckets */
   get_connection_array(&carray,&n);
@@ -1337,19 +1425,25 @@
 
     if (conn->wants_to_read == 1 /* it's marked to turn reading back on now */
         && global_read_bucket > 0 /* and we're allowed to read */
+        && (!connection_counts_as_relayed_traffic(conn) ||
+            global_relayed_read_bucket > 0) /* even if we're relayed traffic */
         && (!connection_speaks_cells(conn) ||
             conn->state != OR_CONN_STATE_OPEN ||
             TO_OR_CONN(conn)->read_bucket > 0)) {
         /* and either a non-cell conn or a cell conn with non-empty bucket */
       LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
-                         "waking up conn (fd %d) for read",conn->s));
+                         "waking up conn (fd %d) for read", conn->s));
       conn->wants_to_read = 0;
       connection_start_reading(conn);
     }
-    if (conn->wants_to_write == 1 &&
-        global_write_bucket > 0) { /* and we're allowed to write */
+
+    if (conn->wants_to_write == 1
+        && global_write_bucket > 0 /* and we're allowed to write */
+        && (!connection_counts_as_relayed_traffic(conn) ||
+            global_relayed_write_bucket > 0)) {
+            /* even if we're relayed traffic */
       LOG_FN_CONN(conn, (LOG_DEBUG,LD_NET,
-                         "waking up conn (fd %d) for write",conn->s));
+                         "waking up conn (fd %d) for write", conn->s));
       conn->wants_to_write = 0;
       connection_start_writing(conn);
     }
@@ -1561,18 +1655,7 @@
     edge_conn->n_read += n_read;
   }
 
-  if (connection_is_rate_limited(conn)) {
-    /* For non-local IPs, remember if we flushed any bytes over the wire. */
-    time_t now = time(NULL);
-    if (n_read > 0) {
-      rep_hist_note_bytes_read(n_read, now);
-      connection_read_bucket_decrement(conn, n_read);
-    }
-    if (n_written > 0) {
-      rep_hist_note_bytes_written(n_written, now);
-      global_write_bucket -= n_written;
-    }
-  }
+  connection_buckets_decrement(conn, time(NULL), n_read, n_written);
 
   if (more_to_read && result == at_most) {
     bytes_in_buf = buf_capacity(conn->inbuf) - buf_datalen(conn->inbuf);
@@ -1762,18 +1845,7 @@
     edge_conn->n_written += n_written;
   }
 
-  if (connection_is_rate_limited(conn)) {
-    /* For non-local IPs, remember if we flushed any bytes over the wire. */
-    time_t now = time(NULL);
-    if (n_written > 0) {
-      rep_hist_note_bytes_written(n_written, now);
-      global_write_bucket -= n_written;
-    }
-    if (n_read > 0) {
-      rep_hist_note_bytes_read(n_read, now);
-      connection_read_bucket_decrement(conn, n_read);
-    }
-  }
+  connection_buckets_decrement(conn, time(NULL), n_read, n_written);
 
   if (result > 0) {
     /* If we wrote any bytes from our buffer, then call the appropriate

Modified: tor/trunk/src/or/main.c
===================================================================
--- tor/trunk/src/or/main.c	2007-03-20 02:10:18 UTC (rev 9880)
+++ tor/trunk/src/or/main.c	2007-03-20 02:55:31 UTC (rev 9881)
@@ -34,12 +34,18 @@
 int global_read_bucket; /**< Max number of bytes I can read this second. */
 int global_write_bucket; /**< Max number of bytes I can write this second. */
 
+/** Max number of relayed (bandwidth class 1) bytes I can read this second. */
+int global_relayed_read_bucket;
+/** Max number of relayed (bandwidth class 1) bytes I can write this second. */
+int global_relayed_write_bucket;
+
 /** What was the read bucket before the last call to prepare_for_pool?
  * (used to determine how many bytes we've read). */
 static int stats_prev_global_read_bucket;
 /** What was the write bucket before the last call to prepare_for_pool?
  * (used to determine how many bytes we've written). */
 static int stats_prev_global_write_bucket;
+/* XXX we might want to keep stats about global_relayed_*_bucket too. Or not.*/
 /** How many bytes have we read/written since we started the process? */
 static uint64_t stats_n_bytes_read = 0;
 static uint64_t stats_n_bytes_written = 0;

Modified: tor/trunk/src/or/or.h
===================================================================
--- tor/trunk/src/or/or.h	2007-03-20 02:10:18 UTC (rev 9880)
+++ tor/trunk/src/or/or.h	2007-03-20 02:55:31 UTC (rev 9881)
@@ -804,7 +804,7 @@
   int n_circuits; /**< How many circuits use this connection as p_conn or
                    * n_conn ? */
   struct or_connection_t *next_with_same_id; /**< Next connection with same
-                                           * identity digest as this one. */
+                                              * identity digest as this one. */
   /** Linked list of bridged dirserver connections that can't write until
    * this connection's outbuf is less full. */
   struct dir_connection_t *blocked_dir_connections;
@@ -1697,6 +1697,10 @@
                             * to use in a second? */
   uint64_t MaxAdvertisedBandwidth; /**< How much bandwidth are we willing to
                                     * tell people we have? */
+  uint64_t RelayBandwidthRate; /**< How much bandwidth, on average, are we
+                                 * willing to use for all relayed conns? */
+  uint64_t RelayBandwidthBurst; /**< How much bandwidth, at maximum, will we
+                                 * use in a second for all relayed conns? */
   int NumCpus; /**< How many CPUs should we try to use? */
   int RunTesting; /**< If true, create testing circuits to measure how well the
                    * other ORs are running. */