[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/main] Prop#324: Hook up flow control



commit 0422eb26a70fc1450cc6b57902f189edc4eed10a
Author: Mike Perry <mikeperry-git@xxxxxxxxxxxxxx>
Date:   Tue Aug 10 21:35:46 2021 +0000

    Prop#324: Hook up flow control
---
 src/app/main/main.c                  |  2 ++
 src/core/mainloop/connection.c       | 16 ++++++++++--
 src/core/mainloop/mainloop.c         |  7 ++++++
 src/core/or/or.h                     |  3 +++
 src/core/or/relay.c                  | 47 +++++++++++++++++++++++++++++++++---
 src/core/or/sendme.c                 | 28 +++++++++++++++++++--
 src/core/or/sendme.h                 |  2 +-
 src/feature/nodelist/networkstatus.c |  2 ++
 8 files changed, 99 insertions(+), 8 deletions(-)

diff --git a/src/app/main/main.c b/src/app/main/main.c
index 89564490e6..0742abe70a 100644
--- a/src/app/main/main.c
+++ b/src/app/main/main.c
@@ -27,6 +27,7 @@
 #include "core/or/channel.h"
 #include "core/or/channelpadding.h"
 #include "core/or/circuitpadding.h"
+#include "core/or/congestion_control_flow.h"
 #include "core/or/circuitlist.h"
 #include "core/or/command.h"
 #include "core/or/connection_or.h"
@@ -630,6 +631,7 @@ tor_init(int argc, char *argv[])
    * until we get a consensus */
   channelpadding_new_consensus_params(NULL);
   circpad_new_consensus_params(NULL);
+  flow_control_new_consensus_params(NULL);
 
   /* Initialize circuit padding to defaults+torrc until we get a consensus */
   circpad_machines_init();
diff --git a/src/core/mainloop/connection.c b/src/core/mainloop/connection.c
index 48bea792ae..9271a70914 100644
--- a/src/core/mainloop/connection.c
+++ b/src/core/mainloop/connection.c
@@ -147,6 +147,8 @@
 #include "feature/nodelist/routerinfo_st.h"
 #include "core/or/socks_request_st.h"
 
+#include "core/or/congestion_control_flow.h"
+
 /**
  * On Windows and Linux we cannot reliably bind() a socket to an
  * address and port if: 1) There's already a socket bound to wildcard
@@ -4594,9 +4596,9 @@ connection_handle_write_impl(connection_t *conn, int force)
       !dont_stop_writing) { /* it's done flushing */
     if (connection_finished_flushing(conn) < 0) {
       /* already marked */
-      return -1;
+      goto err;
     }
-    return 0;
+    goto done;
   }
 
   /* Call even if result is 0, since the global write bucket may
@@ -4606,7 +4608,17 @@ connection_handle_write_impl(connection_t *conn, int force)
   if (n_read > 0 && connection_is_reading(conn))
     connection_consider_empty_read_buckets(conn);
 
+ done:
+  /* If this is an edge connection with congestion control, check to see
+   * if it is time to send an xon */
+  if (conn_uses_flow_control(conn)) {
+    flow_control_decide_xon(TO_EDGE_CONN(conn), n_written);
+  }
+
   return 0;
+
+ err:
+  return -1;
 }
 
 /* DOCDOC connection_handle_write */
diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c
index 37b53db92a..cd57dea3d4 100644
--- a/src/core/mainloop/mainloop.c
+++ b/src/core/mainloop/mainloop.c
@@ -641,6 +641,13 @@ connection_start_reading,(connection_t *conn))
     if (connection_should_read_from_linked_conn(conn))
       connection_start_reading_from_linked_conn(conn);
   } else {
+    if (CONN_IS_EDGE(conn) && TO_EDGE_CONN(conn)->xoff_received) {
+      /* We should not get called here if we're waiting for an XON, but
+       * belt-and-suspenders */
+      log_notice(LD_NET,
+                 "Request to start reading on an edgeconn blocked with XOFF");
+      return;
+    }
     if (event_add(conn->read_event, NULL))
       log_warn(LD_NET, "Error from libevent setting read event state for %d "
                "to watched: %s",
diff --git a/src/core/or/or.h b/src/core/or/or.h
index 99948f26e2..ad82130301 100644
--- a/src/core/or/or.h
+++ b/src/core/or/or.h
@@ -210,6 +210,9 @@ struct curve25519_public_key_t;
 #define RELAY_COMMAND_PADDING_NEGOTIATE 41
 #define RELAY_COMMAND_PADDING_NEGOTIATED 42
 
+#define RELAY_COMMAND_XOFF 43
+#define RELAY_COMMAND_XON 44
+
 /* Reasons why an OR connection is closed. */
 #define END_OR_CONN_REASON_DONE           1
 #define END_OR_CONN_REASON_REFUSED        2 /* connection refused */
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index e3d41d7bf0..0e889eb348 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -98,6 +98,7 @@
 #include "core/or/socks_request_st.h"
 #include "core/or/sendme.h"
 #include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
 
 static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
                                             cell_direction_t cell_direction,
@@ -1739,6 +1740,44 @@ handle_relay_cell_command(cell_t *cell, circuit_t *circ,
         sendme_connection_edge_consider_sending(conn);
       }
 
+      return 0;
+    case RELAY_COMMAND_XOFF:
+      if (!conn) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+          if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+              connection_half_edge_is_valid_data(ocirc->half_streams,
+                                                rh->stream_id)) {
+            circuit_read_valid_data(ocirc, rh->length);
+          }
+        }
+        return 0;
+      }
+
+      if (circuit_process_stream_xoff(conn, layer_hint, cell)) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+        }
+      }
+      return 0;
+    case RELAY_COMMAND_XON:
+      if (!conn) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(circ);
+          if (relay_crypt_from_last_hop(ocirc, layer_hint) &&
+              connection_half_edge_is_valid_data(ocirc->half_streams,
+                                                rh->stream_id)) {
+            circuit_read_valid_data(ocirc, rh->length);
+          }
+        }
+        return 0;
+      }
+
+      if (circuit_process_stream_xon(conn, layer_hint, cell)) {
+        if (CIRCUIT_IS_ORIGIN(circ)) {
+          circuit_read_valid_data(TO_ORIGIN_CIRCUIT(circ), rh->length);
+        }
+      }
       return 0;
     case RELAY_COMMAND_END:
       reason = rh->length > 0 ?
@@ -2287,7 +2326,7 @@ connection_edge_package_raw_inbuf(edge_connection_t *conn, int package_partial,
   }
 
   /* Handle the stream-level SENDME package window. */
-  if (sendme_note_stream_data_packaged(conn) < 0) {
+  if (sendme_note_stream_data_packaged(conn, length) < 0) {
     connection_stop_reading(TO_CONN(conn));
     log_debug(domain,"conn->package_window reached 0.");
     circuit_consider_stop_edge_reading(circ, cpath_layer);
@@ -2402,7 +2441,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
   /* Activate reading starting from the chosen stream */
   for (conn=chosen_stream; conn; conn = conn->next_stream) {
     /* Start reading for the streams starting from here */
-    if (conn->base_.marked_for_close || conn->package_window <= 0)
+    if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+        conn->xoff_received)
       continue;
     if (!layer_hint || conn->cpath_layer == layer_hint) {
       connection_start_reading(TO_CONN(conn));
@@ -2413,7 +2453,8 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
   }
   /* Go back and do the ones we skipped, circular-style */
   for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
-    if (conn->base_.marked_for_close || conn->package_window <= 0)
+    if (conn->base_.marked_for_close || conn->package_window <= 0 ||
+        conn->xoff_received)
       continue;
     if (!layer_hint || conn->cpath_layer == layer_hint) {
       connection_start_reading(TO_CONN(conn));
diff --git a/src/core/or/sendme.c b/src/core/or/sendme.c
index 900490a892..ee670f9d51 100644
--- a/src/core/or/sendme.c
+++ b/src/core/or/sendme.c
@@ -22,6 +22,7 @@
 #include "core/or/relay.h"
 #include "core/or/sendme.h"
 #include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_flow.h"
 #include "feature/nodelist/networkstatus.h"
 #include "lib/ctime/di_ops.h"
 #include "trunnel/sendme_cell.h"
@@ -370,6 +371,10 @@ sendme_connection_edge_consider_sending(edge_connection_t *conn)
 
   int log_domain = TO_CONN(conn)->type == CONN_TYPE_AP ? LD_APP : LD_EXIT;
 
+  /* If we use flow control, we do not send stream sendmes */
+  if (edge_uses_flow_control(conn))
+    goto end;
+
   /* Don't send it if we still have data to deliver. */
   if (connection_outbuf_too_full(TO_CONN(conn))) {
     goto end;
@@ -546,6 +551,12 @@ sendme_process_stream_level(edge_connection_t *conn, circuit_t *circ,
   tor_assert(conn);
   tor_assert(circ);
 
+  if (edge_uses_flow_control(conn)) {
+    log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
+           "Congestion control got stream sendme");
+    return -END_CIRC_REASON_TORPROTOCOL;
+  }
+
   /* Don't allow the other endpoint to request more than our maximum (i.e.
    * initial) stream SENDME window worth of data. Well-behaved stock clients
    * will not request more than this max (as per the check in the while loop
@@ -603,7 +614,12 @@ int
 sendme_stream_data_received(edge_connection_t *conn)
 {
   tor_assert(conn);
-  return --conn->deliver_window;
+
+  if (edge_uses_flow_control(conn)) {
+    return flow_control_decide_xoff(conn);
+  } else {
+    return --conn->deliver_window;
+  }
 }
 
 /* Called when a relay DATA cell is packaged on the given circuit. If
@@ -651,10 +667,18 @@ sendme_note_circuit_data_packaged(circuit_t *circ, crypt_path_t *layer_hint)
 /* Called when a relay DATA cell is packaged for the given edge connection
  * conn. Update the package window and return its new value. */
 int
-sendme_note_stream_data_packaged(edge_connection_t *conn)
+sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len)
 {
   tor_assert(conn);
 
+  if (edge_uses_flow_control(conn)) {
+    flow_control_note_sent_data(conn, len);
+    if (conn->xoff_received)
+      return -1;
+    else
+      return 1;
+  }
+
   --conn->package_window;
   log_debug(LD_APP, "Stream package_window now %d.", conn->package_window);
   return conn->package_window;
diff --git a/src/core/or/sendme.h b/src/core/or/sendme.h
index c224d0a921..2abec91a91 100644
--- a/src/core/or/sendme.h
+++ b/src/core/or/sendme.h
@@ -33,7 +33,7 @@ int sendme_circuit_data_received(circuit_t *circ, crypt_path_t *layer_hint);
 /* Update package window functions. */
 int sendme_note_circuit_data_packaged(circuit_t *circ,
                                       crypt_path_t *layer_hint);
-int sendme_note_stream_data_packaged(edge_connection_t *conn);
+int sendme_note_stream_data_packaged(edge_connection_t *conn, size_t len);
 
 /* Record cell digest on circuit. */
 void sendme_record_cell_digest_on_circ(circuit_t *circ, crypt_path_t *cpath);
diff --git a/src/feature/nodelist/networkstatus.c b/src/feature/nodelist/networkstatus.c
index 7a1e73ef60..0138dff033 100644
--- a/src/feature/nodelist/networkstatus.c
+++ b/src/feature/nodelist/networkstatus.c
@@ -45,6 +45,7 @@
 #include "core/or/channel.h"
 #include "core/or/channelpadding.h"
 #include "core/or/circuitpadding.h"
+#include "core/or/congestion_control_flow.h"
 #include "core/or/circuitmux.h"
 #include "core/or/circuitmux_ewma.h"
 #include "core/or/circuitstats.h"
@@ -1699,6 +1700,7 @@ notify_after_networkstatus_changes(void)
   channelpadding_new_consensus_params(c);
   circpad_new_consensus_params(c);
   router_new_consensus_params(c);
+  flow_control_new_consensus_params(c);
 
   /* Maintenance of our L2 guard list */
   maintain_layer2_guards();



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits