[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] [tor/master 5/7] Fix behavior of adding a cell to a blocked queue.



Author: Nick Mathewson <nickm@xxxxxxxxxxxxxx>
Date: Thu, 2 Sep 2010 15:26:17 -0400
Subject: Fix behavior of adding a cell to a blocked queue.
Commit: f89323afdadadb8db7eb48f7cbe75c5f4384dae4

We frequently add cells to stream-blocked queues for valid reasons
that don't mean we need to block streams.  The most obvious case is
a cell that arrives over a circuit rather than from an edge
connection: we don't block circuits, no matter how full queues get.
The next most obvious case is a CONNECTED cell from a newly created
stream, which we allow to be delivered even when the queue is blocked.

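This patch changes the behavior so that, when a cell is added to a
queue whose streams are already blocked, we only iterate over the
streams on the circuit if the cell in question came from a stream,
and we only block the stream that generated the cell, so that other
streams can still get their CONNECTED cells in.  (Reaching the
high-water mark on an unblocked queue still blocks every stream.)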
---
 src/or/circuitbuild.c |    4 ++--
 src/or/relay.c        |   38 +++++++++++++++++++-------------------
 src/or/relay.h        |    3 ++-
 3 files changed, 23 insertions(+), 22 deletions(-)

diff --git a/src/or/circuitbuild.c b/src/or/circuitbuild.c
index e5e7d22..5567b24 100644
--- a/src/or/circuitbuild.c
+++ b/src/or/circuitbuild.c
@@ -1752,7 +1752,7 @@ circuit_deliver_create_cell(circuit_t *circ, uint8_t cell_type,
   cell.circ_id = circ->n_circ_id;
 
   memcpy(cell.payload, payload, ONIONSKIN_CHALLENGE_LEN);
-  append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT);
+  append_cell_to_circuit_queue(circ, circ->n_conn, &cell, CELL_DIRECTION_OUT, 0);
 
   if (CIRCUIT_IS_ORIGIN(circ)) {
     /* mark it so it gets better rate limiting treatment. */
@@ -2329,7 +2329,7 @@ onionskin_answer(or_circuit_t *circ, uint8_t cell_type, const char *payload,
   circ->is_first_hop = (cell_type == CELL_CREATED_FAST);
 
   append_cell_to_circuit_queue(TO_CIRCUIT(circ),
-                               circ->p_conn, &cell, CELL_DIRECTION_IN);
+                               circ->p_conn, &cell, CELL_DIRECTION_IN, 0);
   log_debug(LD_CIRC,"Finished sending 'created' cell.");
 
   if (!is_local_addr(&circ->p_conn->_base.addr) &&
diff --git a/src/or/relay.c b/src/or/relay.c
index 88106e5..794f448 100644
--- a/src/or/relay.c
+++ b/src/or/relay.c
@@ -269,7 +269,7 @@ circuit_receive_relay_cell(cell_t *cell, circuit_t *circ,
                                   * we might kill the circ before we relay
                                   * the cells. */
 
-  append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction);
+  append_cell_to_circuit_queue(circ, or_conn, cell, cell_direction, 0);
   return 0;
 }
 
@@ -366,7 +366,7 @@ relay_crypt(circuit_t *circ, cell_t *cell, cell_direction_t cell_direction,
 static int
 circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
                            cell_direction_t cell_direction,
-                           crypt_path_t *layer_hint)
+                           crypt_path_t *layer_hint, uint16_t on_stream)
 {
   or_connection_t *conn; /* where to send the cell */
 
@@ -410,7 +410,7 @@ circuit_package_relay_cell(cell_t *cell, circuit_t *circ,
   }
   ++stats_n_relay_cells_relayed;
 
-  append_cell_to_circuit_queue(circ, conn, cell, cell_direction);
+  append_cell_to_circuit_queue(circ, conn, cell, cell_direction, on_stream);
   return 0;
 }
 
@@ -625,7 +625,7 @@ relay_send_command_from_edge(uint16_t stream_id, circuit_t *circ,
     }
   }
 
-  if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer)
+  if (circuit_package_relay_cell(&cell, circ, cell_direction, cpath_layer, 0)
       < 0) {
     log_warn(LD_BUG,"circuit_package_relay_cell failed. Closing.");
     circuit_mark_for_close(circ, END_CIRC_REASON_INTERNAL);
@@ -2102,11 +2102,14 @@ connection_or_unlink_all_active_circs(or_connection_t *orconn)
  * every edge connection that is using <b>circ</b> to write to <b>orconn</b>,
  * and start or stop reading as appropriate.
  *
+ * If <b>stream_id</b> is nonzero, block only the edge connection whose
+ * stream_id matches it.
+ *
  * Returns the number of streams whose status we changed.
  */
 static int
 set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
-                            int block)
+                            int block, uint16_t stream_id)
 {
   edge_connection_t *edge = NULL;
   int n = 0;
@@ -2122,6 +2125,9 @@ set_streams_blocked_on_circ(circuit_t *circ, or_connection_t *orconn,
 
   for (; edge; edge = edge->next_stream) {
     connection_t *conn = TO_CONN(edge);
+    if (stream_id && edge->stream_id != stream_id)
+      continue;
+
     if (edge->edge_blocked_on_circ != block) {
       ++n;
       edge->edge_blocked_on_circ = block;
@@ -2269,7 +2275,7 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
   /* Is the cell queue low enough to unblock all the streams that are waiting
    * to write to this circuit? */
   if (streams_blocked && queue->n <= CELL_QUEUE_LOWWATER_SIZE)
-    set_streams_blocked_on_circ(circ, conn, 0); /* unblock streams */
+    set_streams_blocked_on_circ(circ, conn, 0, 0); /* unblock streams */
 
   /* Did we just run out of cells on this circuit's queue? */
   if (queue->n == 0) {
@@ -2286,7 +2292,8 @@ connection_or_flush_from_first_active_circuit(or_connection_t *conn, int max,
  * transmitting in <b>direction</b>. */
 void
 append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
-                             cell_t *cell, cell_direction_t direction)
+                             cell_t *cell, cell_direction_t direction,
+                             uint16_t fromstream)
 {
   cell_queue_t *queue;
   int streams_blocked;
@@ -2308,18 +2315,11 @@ append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
   /* If we have too many cells on the circuit, we should stop reading from
    * the edge streams for a while. */
   if (!streams_blocked && queue->n >= CELL_QUEUE_HIGHWATER_SIZE)
-    set_streams_blocked_on_circ(circ, orconn, 1); /* block streams */
-
-  if (streams_blocked) {
-    /* We must have missed a connection! */
-    int n = set_streams_blocked_on_circ(circ, orconn, 1);
-    if (n) {
-      log_info(LD_BUG, "Got a cell added to a cell queue when streams were "
-               "supposed to be blocked; found that %d streams weren't.", n);
-    } else {
-      log_info(LD_BUG, "Got a cell added to a cell queue when streams were "
-               "all blocked. We should figure out why.");
-    }
+    set_streams_blocked_on_circ(circ, orconn, 1, 0); /* block streams */
+
+  if (streams_blocked && fromstream) {
+    /* This edge connection is apparently not blocked; block it. */
+    set_streams_blocked_on_circ(circ, orconn, 1, fromstream);
   }
 
   if (queue->n == 1) {
diff --git a/src/or/relay.h b/src/or/relay.h
index 73855a5..088ef32 100644
--- a/src/or/relay.h
+++ b/src/or/relay.h
@@ -45,7 +45,8 @@ void cell_queue_append(cell_queue_t *queue, packed_cell_t *cell);
 void cell_queue_append_packed_copy(cell_queue_t *queue, const cell_t *cell);
 
 void append_cell_to_circuit_queue(circuit_t *circ, or_connection_t *orconn,
-                                  cell_t *cell, cell_direction_t direction);
+                                  cell_t *cell, cell_direction_t direction,
+                                  uint16_t fromstream);
 void connection_or_unlink_all_active_circs(or_connection_t *conn);
 int connection_or_flush_from_first_active_circuit(or_connection_t *conn,
                                                   int max, time_t now);
-- 
1.7.1