[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor] 12/20: Refactor stream blocking due to channel cell queues



This is an automated email from the git hooks/post-receive script.

ahf pushed a commit to branch main
in repository tor.

commit 21c861bfa3188444798a35e21f26579dd910a452
Author: Mike Perry <mikeperry-git@xxxxxxxxxxxxxx>
AuthorDate: Fri Jan 20 19:14:33 2023 +0000

    Refactor stream blocking due to channel cell queues
    
    Streams can get blocked on a circuit in two ways:
      1. When the circuit package window is full
      2. When the channel's cell queue is too high
    
    Conflux needs to decouple stream blocking from both of these conditions,
    because streams can continue on another circuit, even if the primary circuit
    is blocked for either of these cases.
    
    However, both conflux and congestion control need to know if the channel's
    cell queue hit the highwatermark and is still draining, because this condition
    is used by those components, independent of stream state.
    
    Therefore, this commit renames the 'streams_blocked_on_chan' variable to
    signify that it refers to the cell queue state, and also refactors the actual
    stream blocking bits out, so they can be handled separately if conflux is
    present.
---
 src/core/mainloop/mainloop.c            |  2 +-
 src/core/mainloop/mainloop.h            |  2 +-
 src/core/or/circuit_st.h                |  8 ++--
 src/core/or/circuituse.c                |  3 +-
 src/core/or/congestion_control_common.c |  4 +-
 src/core/or/edge_connection_st.h        |  3 --
 src/core/or/relay.c                     | 85 +++++++++++++++++++--------------
 src/test/fakecircs.c                    |  4 +-
 8 files changed, 60 insertions(+), 51 deletions(-)

diff --git a/src/core/mainloop/mainloop.c b/src/core/mainloop/mainloop.c
index a1ea32220a..e1c9786b2e 100644
--- a/src/core/mainloop/mainloop.c
+++ b/src/core/mainloop/mainloop.c
@@ -497,7 +497,7 @@ connection_watch_events(connection_t *conn, watchable_events_t events)
 
 /** Return true iff <b>conn</b> is listening for read events. */
 int
-connection_is_reading(connection_t *conn)
+connection_is_reading(const connection_t *conn)
 {
   tor_assert(conn);
 
diff --git a/src/core/mainloop/mainloop.h b/src/core/mainloop/mainloop.h
index 98d0b3a058..64782c1318 100644
--- a/src/core/mainloop/mainloop.h
+++ b/src/core/mainloop/mainloop.h
@@ -38,7 +38,7 @@ typedef enum watchable_events {
   WRITE_EVENT=0x04 /**< We want to know when a connection is writable */
 } watchable_events_t;
 void connection_watch_events(connection_t *conn, watchable_events_t events);
-int connection_is_reading(connection_t *conn);
+int connection_is_reading(const connection_t *conn);
 MOCK_DECL(void,connection_stop_reading,(connection_t *conn));
 MOCK_DECL(void,connection_start_reading,(connection_t *conn));
 
diff --git a/src/core/or/circuit_st.h b/src/core/or/circuit_st.h
index 7f39c9337e..1afb4d4426 100644
--- a/src/core/or/circuit_st.h
+++ b/src/core/or/circuit_st.h
@@ -88,11 +88,11 @@ struct circuit_t {
   extend_info_t *n_hop;
 
   /** True iff we are waiting for n_chan_cells to become less full before
-   * allowing p_streams to add any more cells. (Origin circuit only.) */
-  unsigned int streams_blocked_on_n_chan : 1;
+   * allowing any more cells on this circuit. (Origin circuit only.) */
+  unsigned int circuit_blocked_on_n_chan : 1;
   /** True iff we are waiting for p_chan_cells to become less full before
-   * allowing n_streams to add any more cells. (OR circuit only.) */
-  unsigned int streams_blocked_on_p_chan : 1;
+   * allowing any more cells on this circuit. (OR circuit only.) */
+  unsigned int circuit_blocked_on_p_chan : 1;
 
   /** True iff we have queued a delete backwards on this circuit, but not put
    * it on the output buffer. */
diff --git a/src/core/or/circuituse.c b/src/core/or/circuituse.c
index 9110252976..25401aea55 100644
--- a/src/core/or/circuituse.c
+++ b/src/core/or/circuituse.c
@@ -63,6 +63,7 @@
 #include "lib/math/fp.h"
 #include "lib/time/tvdiff.h"
 #include "lib/trace/events.h"
+#include "src/core/mainloop/mainloop.h"
 
 #include "core/or/cpath_build_state_st.h"
 #include "feature/dircommon/dir_connection_st.h"
@@ -938,7 +939,7 @@ circuit_log_ancient_one_hop_circuits(int age)
                  c->marked_for_close,
                  c->hold_open_until_flushed ? "" : "not ",
                  conn->edge_has_sent_end ? "" : "not ",
-                 conn->edge_blocked_on_circ ? "Blocked" : "Not blocked");
+                 connection_is_reading(c) ? "Not blocked" : "Blocked");
       if (! c->linked_conn)
         continue;
 
diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c
index 920b57cf00..c7c950d0c8 100644
--- a/src/core/or/congestion_control_common.c
+++ b/src/core/or/congestion_control_common.c
@@ -954,11 +954,11 @@ congestion_control_update_circuit_bdp(congestion_control_t *cc,
   if (CIRCUIT_IS_ORIGIN(circ)) {
     /* origin circs use n_chan */
     chan_q = circ->n_chan_cells.n;
-    blocked_on_chan = circ->streams_blocked_on_n_chan;
+    blocked_on_chan = circ->circuit_blocked_on_n_chan;
   } else {
     /* Both onion services and exits use or_circuit and p_chan */
     chan_q = CONST_TO_OR_CIRCUIT(circ)->p_chan_cells.n;
-    blocked_on_chan = circ->streams_blocked_on_p_chan;
+    blocked_on_chan = circ->circuit_blocked_on_p_chan;
   }
 
   /* If we have no EWMA RTT, it is because monotime has been stalled
diff --git a/src/core/or/edge_connection_st.h b/src/core/or/edge_connection_st.h
index 942991f139..22f9040d15 100644
--- a/src/core/or/edge_connection_st.h
+++ b/src/core/or/edge_connection_st.h
@@ -66,9 +66,6 @@ struct edge_connection_t {
                          * connections.  Set once we've set the stream end,
                          * and check in connection_about_to_close_connection().
                          */
-  /** True iff we've blocked reading until the circuit has fewer queued
-   * cells. */
-  unsigned int edge_blocked_on_circ:1;
 
   /** Unique ID for directory requests; this used to be in connection_t, but
    * that's going away and being used on channels instead.  We still tag
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index 38fb560e34..827f0c3e46 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -122,6 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell,
                                            edge_connection_t *conn,
                                            crypt_path_t *layer_hint,
                                            relay_header_t *rh);
+static void set_block_state_for_streams(edge_connection_t *stream_list,
+                                        int block, streamid_t stream_id);
 
 /** Stats: how many relay cells have originated at this hop, or have
  * been relayed onward (not recognized at this hop)?
@@ -3005,41 +3007,46 @@ channel_unlink_all_circuits(channel_t *chan, smartlist_t *circuits_out)
   chan->num_p_circuits = 0;
 }
 
-/** Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
+/**
+ * Called when a circuit becomes blocked or unblocked due to the channel
+ * cell queue.
+ *
+ * Block (if <b>block</b> is true) or unblock (if <b>block</b> is false)
  * every edge connection that is using <b>circ</b> to write to <b>chan</b>,
  * and start or stop reading as appropriate.
- *
- * If <b>stream_id</b> is nonzero, block only the edge connection whose
- * stream_id matches it.
- *
- * Returns the number of streams whose status we changed.
  */
-static int
-set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
-                            int block, streamid_t stream_id)
+static void
+set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
 {
   edge_connection_t *edge = NULL;
-  int n = 0;
   if (circ->n_chan == chan) {
-    circ->streams_blocked_on_n_chan = block;
+    circ->circuit_blocked_on_n_chan = block;
     if (CIRCUIT_IS_ORIGIN(circ))
       edge = TO_ORIGIN_CIRCUIT(circ)->p_streams;
   } else {
-    circ->streams_blocked_on_p_chan = block;
+    circ->circuit_blocked_on_p_chan = block;
     tor_assert(!CIRCUIT_IS_ORIGIN(circ));
     edge = TO_OR_CIRCUIT(circ)->n_streams;
   }
 
-  for (; edge; edge = edge->next_stream) {
+  set_block_state_for_streams(edge, block, 0);
+}
+
+/**
+ * Helper function to block or unblock streams in a stream list.
+ *
+ * If <b>stream_id</b> is 0, apply the <b>block</b> state to all streams
+ * in the stream list. If it is non-zero, only apply to that specific stream.
+ */
+static void
+set_block_state_for_streams(edge_connection_t *stream_list, int block,
+                            streamid_t stream_id)
+{
+  for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
     connection_t *conn = TO_CONN(edge);
     if (stream_id && edge->stream_id != stream_id)
       continue;
 
-    if (edge->edge_blocked_on_circ != block) {
-      ++n;
-      edge->edge_blocked_on_circ = block;
-    }
-
     if (!conn->read_event) {
       /* This connection is a placeholder for something; probably a DNS
        * request.  It can't actually stop or start reading.*/
@@ -3055,8 +3062,6 @@ set_streams_blocked_on_circ(circuit_t *circ, channel_t *chan,
         connection_start_reading(conn);
     }
   }
-
-  return n;
 }
 
 /** Extract the command from a packed cell. */
@@ -3094,7 +3099,7 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
   destroy_cell_queue_t *destroy_queue=NULL;
   circuit_t *circ;
   or_circuit_t *or_circ;
-  int streams_blocked;
+  int circ_blocked;
   packed_cell_t *cell;
 
   /* Get the cmux */
@@ -3134,12 +3139,12 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
 
     if (circ->n_chan == chan) {
       queue = &circ->n_chan_cells;
-      streams_blocked = circ->streams_blocked_on_n_chan;
+      circ_blocked = circ->circuit_blocked_on_n_chan;
     } else {
       or_circ = TO_OR_CIRCUIT(circ);
       tor_assert(or_circ->p_chan == chan);
       queue = &TO_OR_CIRCUIT(circ)->p_chan_cells;
-      streams_blocked = circ->streams_blocked_on_p_chan;
+      circ_blocked = circ->circuit_blocked_on_p_chan;
     }
 
     /* Circuitmux told us this was active, so it should have cells.
@@ -3240,8 +3245,8 @@ channel_flush_from_first_active_circuit, (channel_t *chan, int max))
 
     /* Is the cell queue low enough to unblock all the streams that are waiting
      * to write to this circuit? */
-    if (streams_blocked && queue->n <= cell_queue_lowwatermark())
-      set_streams_blocked_on_circ(circ, chan, 0, 0); /* unblock streams */
+    if (circ_blocked && queue->n <= cell_queue_lowwatermark())
+      set_circuit_blocked_on_chan(circ, chan, 0); /* unblock streams */
 
     /* If n_flushed < max still, loop around and pick another circuit */
   }
@@ -3346,9 +3351,10 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
                              streamid_t fromstream)
 {
   or_circuit_t *orcirc = NULL;
+  edge_connection_t *stream_list = NULL;
   cell_queue_t *queue;
   int32_t max_queue_size;
-  int streams_blocked;
+  int circ_blocked;
   int exitward;
   if (circ->marked_for_close)
     return;
@@ -3356,13 +3362,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
   exitward = (direction == CELL_DIRECTION_OUT);
   if (exitward) {
     queue = &circ->n_chan_cells;
-    streams_blocked = circ->streams_blocked_on_n_chan;
+    circ_blocked = circ->circuit_blocked_on_n_chan;
     max_queue_size = max_circuit_cell_queue_size_out;
+    if (CIRCUIT_IS_ORIGIN(circ))
+      stream_list = TO_ORIGIN_CIRCUIT(circ)->p_streams;
   } else {
     orcirc = TO_OR_CIRCUIT(circ);
     queue = &orcirc->p_chan_cells;
-    streams_blocked = circ->streams_blocked_on_p_chan;
+    circ_blocked = circ->circuit_blocked_on_p_chan;
     max_queue_size = max_circuit_cell_queue_size;
+    stream_list = TO_OR_CIRCUIT(circ)->n_streams;
   }
 
   if (PREDICT_UNLIKELY(queue->n >= max_queue_size)) {
@@ -3395,14 +3404,16 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
       return;
   }
 
-  /* If we have too many cells on the circuit, we should stop reading from
-   * the edge streams for a while. */
-  if (!streams_blocked && queue->n >= cell_queue_highwatermark())
-    set_streams_blocked_on_circ(circ, chan, 1, 0); /* block streams */
+  /* If we have too many cells on the circuit, note that it should
+   * be blocked from new cells. */
+  if (!circ_blocked && queue->n >= cell_queue_highwatermark())
+    set_circuit_blocked_on_chan(circ, chan, 1);
 
-  if (streams_blocked && fromstream) {
-    /* This edge connection is apparently not blocked; block it. */
-    set_streams_blocked_on_circ(circ, chan, 1, fromstream);
+  if (circ_blocked && fromstream) {
+    /* This edge connection is apparently not blocked; this can happen for
+     * new streams on a blocked circuit, for their CONNECTED response.
+     * Block it now. */
+    set_block_state_for_streams(stream_list, 1, fromstream);
   }
 
   update_circuit_on_cmux(circ, direction);
@@ -3508,8 +3519,8 @@ static int
 circuit_queue_streams_are_blocked(circuit_t *circ)
 {
   if (CIRCUIT_IS_ORIGIN(circ)) {
-    return circ->streams_blocked_on_n_chan;
+    return circ->circuit_blocked_on_n_chan;
   } else {
-    return circ->streams_blocked_on_p_chan;
+    return circ->circuit_blocked_on_p_chan;
   }
 }
diff --git a/src/test/fakecircs.c b/src/test/fakecircs.c
index cca3b43483..caeacd84ef 100644
--- a/src/test/fakecircs.c
+++ b/src/test/fakecircs.c
@@ -41,8 +41,8 @@ new_fake_orcirc(channel_t *nchan, channel_t *pchan)
   cell_queue_init(&(circ->n_chan_cells));
 
   circ->n_hop = NULL;
-  circ->streams_blocked_on_n_chan = 0;
-  circ->streams_blocked_on_p_chan = 0;
+  circ->circuit_blocked_on_n_chan = 0;
+  circ->circuit_blocked_on_p_chan = 0;
   circ->n_delete_pending = 0;
   circ->p_delete_pending = 0;
   circ->received_destroy = 0;

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits