[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/main] Support rate limiting of edge connections reads.



commit 819b69244a7d15014c031683a0327e8561cd58e9
Author: David Goulet <dgoulet@xxxxxxxxxxxxxx>
Date:   Tue Sep 28 22:23:32 2021 +0000

    Support rate limiting of edge connections reads.
    
    We only need to rate limit reading on edges for flow control, as per the rate
    that comes in the XON from the other side. When we rate limit reading from the
    edge source to this rate, we will only deliver that fast to the other side,
    thus satisfying its rate request.
    
    Signed-off-by: David Goulet <dgoulet@xxxxxxxxxxxxxx>
---
 src/core/mainloop/connection.c | 50 +++++++++++++++++++++++++++++++++++++-----
 1 file changed, 44 insertions(+), 6 deletions(-)

diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c
index 79e034fb34..48bea792ae 100644
--- a/src/core/mainloop/connection.c
+++ b/src/core/mainloop/connection.c
@@ -117,6 +117,7 @@
 #include "lib/cc/ctassert.h"
 #include "lib/sandbox/sandbox.h"
 #include "lib/net/buffers_net.h"
+#include "lib/net/address.h"
 #include "lib/tls/tortls.h"
 #include "lib/evloop/compat_libevent.h"
 #include "lib/compress/compress.h"
@@ -612,6 +613,11 @@ entry_connection_new(int type, int socket_family)
     entry_conn->entry_cfg.ipv4_traffic = 1;
   else if (socket_family == AF_INET6)
     entry_conn->entry_cfg.ipv6_traffic = 1;
+
+  /* Initialize the read token bucket to the maximum value which is the same as
+   * no rate limiting. */
+  token_bucket_rw_init(&ENTRY_TO_EDGE_CONN(entry_conn)->bucket, INT32_MAX,
+                       INT32_MAX, monotime_coarse_get_stamp());
   return entry_conn;
 }
 
@@ -623,6 +629,10 @@ edge_connection_new(int type, int socket_family)
   edge_connection_t *edge_conn = tor_malloc_zero(sizeof(edge_connection_t));
   tor_assert(type == CONN_TYPE_EXIT);
   connection_init(time(NULL), TO_CONN(edge_conn), type, socket_family);
+  /* Initialize the read token bucket to the maximum value which is the same as
+   * no rate limiting. */
+  token_bucket_rw_init(&edge_conn->bucket, INT32_MAX, INT32_MAX,
+                       monotime_coarse_get_stamp());
   return edge_conn;
 }
 
@@ -3457,6 +3467,19 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
     base = get_cell_network_size(or_conn->wide_circ_ids);
   }
 
+  /* Edge connection have their own read bucket due to flow control being able
+   * to set a rate limit for them. However, for exit connections, we still need
+   * to honor the global bucket as well. */
+  if (CONN_IS_EDGE(conn)) {
+    const edge_connection_t *edge_conn = CONST_TO_EDGE_CONN(conn);
+    conn_bucket = token_bucket_rw_get_read(&edge_conn->bucket);
+    if (conn->type == CONN_TYPE_EXIT) {
+      /* Decide between our limit and the global one. */
+      goto end;
+    }
+    return conn_bucket;
+  }
+
   if (!connection_is_rate_limited(conn)) {
     /* be willing to read on local conns even if our buckets are empty */
     return conn_bucket>=0 ? conn_bucket : 1<<14;
@@ -3467,6 +3490,7 @@ connection_bucket_read_limit(connection_t *conn, time_t now)
     global_bucket_val = MIN(global_bucket_val, relayed);
   }
 
+ end:
   return connection_bucket_get_share(base, priority,
                                      global_bucket_val, conn_bucket);
 }
@@ -3644,6 +3668,13 @@ connection_buckets_decrement(connection_t *conn, time_t now,
 
   record_num_bytes_transferred_impl(conn, now, num_read, num_written);
 
+  /* Edge connection need to decrement the read side of the bucket used by our
+   * congestion control. */
+  if (CONN_IS_EDGE(conn) && num_read > 0) {
+    edge_connection_t *edge_conn = TO_EDGE_CONN(conn);
+    token_bucket_rw_dec(&edge_conn->bucket, num_read, 0);
+  }
+
   if (!connection_is_rate_limited(conn))
     return; /* local IPs are free */
 
@@ -3697,14 +3728,16 @@ connection_write_bw_exhausted(connection_t *conn, bool is_global_bw)
 void
 connection_consider_empty_read_buckets(connection_t *conn)
 {
+  int is_global = 1;
   const char *reason;
 
-  if (!connection_is_rate_limited(conn))
+  if (CONN_IS_EDGE(conn) &&
+             token_bucket_rw_get_read(&TO_EDGE_CONN(conn)->bucket) <= 0) {
+    reason = "edge connection read bucket exhausted. Pausing.";
+    is_global = false;
+  } else if (!connection_is_rate_limited(conn)) {
     return; /* Always okay. */
-
-  int is_global = 1;
-
-  if (token_bucket_rw_get_read(&global_bucket) <= 0) {
+  } else if (token_bucket_rw_get_read(&global_bucket) <= 0) {
     reason = "global read bucket exhausted. Pausing.";
   } else if (connection_counts_as_relayed_traffic(conn, approx_time()) &&
              token_bucket_rw_get_read(&global_relayed_bucket) <= 0) {
@@ -3714,8 +3747,9 @@ connection_consider_empty_read_buckets(connection_t *conn)
              token_bucket_rw_get_read(&TO_OR_CONN(conn)->bucket) <= 0) {
     reason = "connection read bucket exhausted. Pausing.";
     is_global = false;
-  } else
+  } else {
     return; /* all good, no need to stop it */
+  }
 
   LOG_FN_CONN(conn, (LOG_DEBUG, LD_NET, "%s", reason));
   connection_read_bw_exhausted(conn, is_global);
@@ -3819,6 +3853,10 @@ connection_bucket_refill_single(connection_t *conn, uint32_t now_ts)
     or_connection_t *or_conn = TO_OR_CONN(conn);
     token_bucket_rw_refill(&or_conn->bucket, now_ts);
   }
+
+  if (CONN_IS_EDGE(conn)) {
+    token_bucket_rw_refill(&TO_EDGE_CONN(conn)->bucket, now_ts);
+  }
 }
 
 /**



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits