[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor] 07/20: Prop#329 Algs: Conflux multiplexed cell receive handling



This is an automated email from the git hooks/post-receive script.

ahf pushed a commit to branch main
in repository tor.

commit 2bd1eca78c7771b9d16bb2a670420987c4a36c91
Author: Mike Perry <mikeperry-git@xxxxxxxxxxxxxx>
AuthorDate: Wed Dec 14 21:03:52 2022 +0000

    Prop#329 Algs: Conflux multiplexed cell receive handling
---
 src/core/or/conflux.c | 260 ++++++++++++++++++++++++++++++++++++++++++++++++++
 src/core/or/relay.c   |  80 ++++++++++++++--
 2 files changed, 331 insertions(+), 9 deletions(-)

diff --git a/src/core/or/conflux.c b/src/core/or/conflux.c
new file mode 100644
index 0000000000..6179fea279
--- /dev/null
+++ b/src/core/or/conflux.c
@@ -0,0 +1,260 @@
+/* Copyright (c) 2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file conflux.c
+ * \brief Conflux multipath core algorithms
+ */
+
+#define TOR_CONFLUX_PRIVATE
+
+#include "core/or/or.h"
+
+#include "core/or/circuit_st.h"
+#include "core/or/sendme.h"
+#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_st.h"
+#include "core/or/origin_circuit_st.h"
+#include "core/or/circuitlist.h"
+#include "core/or/circuituse.h"
+#include "core/or/conflux.h"
+#include "core/or/conflux_params.h"
+#include "core/or/conflux_util.h"
+#include "core/or/conflux_st.h"
+#include "lib/time/compat_time.h"
+#include "app/config/config.h"
+
+#include "trunnel/extension.h"
+
+/** One million microseconds in a second */
+#define USEC_PER_SEC 1000000
+
+/**
+ * Determine if we should multiplex a specific relay command or not.
+ *
+ * TODO: Version of this that is the set of forbidden commands
+ * on linked circuits
+ */
+bool
+conflux_should_multiplex(int relay_command)
+{
+  switch (relay_command) {
+    /* These are all fine to multiplex, and must be
+     * so that ordering is preserved */
+    case RELAY_COMMAND_BEGIN:
+    case RELAY_COMMAND_DATA:
+    case RELAY_COMMAND_END:
+    case RELAY_COMMAND_CONNECTED:
+      return true;
+
+    /* We can't multiplex these because they are
+     * circuit-specific */
+    case RELAY_COMMAND_SENDME:
+    case RELAY_COMMAND_EXTEND:
+    case RELAY_COMMAND_EXTENDED:
+    case RELAY_COMMAND_TRUNCATE:
+    case RELAY_COMMAND_TRUNCATED:
+    case RELAY_COMMAND_DROP:
+      return false;
+
+    /* We must multiplex RESOLVEs because their ordering
+     * impacts begin/end. */
+    case RELAY_COMMAND_RESOLVE:
+    case RELAY_COMMAND_RESOLVED:
+      return true;
+
+    /* These are all circuit-specific */
+    case RELAY_COMMAND_BEGIN_DIR:
+    case RELAY_COMMAND_EXTEND2:
+    case RELAY_COMMAND_EXTENDED2:
+    case RELAY_COMMAND_ESTABLISH_INTRO:
+    case RELAY_COMMAND_ESTABLISH_RENDEZVOUS:
+    case RELAY_COMMAND_INTRODUCE1:
+    case RELAY_COMMAND_INTRODUCE2:
+    case RELAY_COMMAND_RENDEZVOUS1:
+    case RELAY_COMMAND_RENDEZVOUS2:
+    case RELAY_COMMAND_INTRO_ESTABLISHED:
+    case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
+    case RELAY_COMMAND_INTRODUCE_ACK:
+    case RELAY_COMMAND_PADDING_NEGOTIATE:
+    case RELAY_COMMAND_PADDING_NEGOTIATED:
+      return false;
+
+    /* These must be multiplexed because their ordering
+     * relative to BEGIN/END must be preserved */
+    case RELAY_COMMAND_XOFF:
+    case RELAY_COMMAND_XON:
+      return true;
+
+    /* These two are not multiplexed, because they must
+     * be processed immediately to update sequence numbers
+     * before any other cells are processed on the circuit */
+    case RELAY_COMMAND_CONFLUX_SWITCH:
+    case RELAY_COMMAND_CONFLUX_LINK:
+    case RELAY_COMMAND_CONFLUX_LINKED:
+    case RELAY_COMMAND_CONFLUX_LINKED_ACK:
+      return false;
+
+    default:
+      log_warn(LD_BUG, "Conflux asked to multiplex unknown relay command %d",
+               relay_command);
+      return false;
+  }
+}
+
+/** Return the leg for a circuit in a conflux set. Return NULL if not found. */
+conflux_leg_t *
+conflux_get_leg(conflux_t *cfx, const circuit_t *circ)
+{
+  conflux_leg_t *leg_found = NULL;
+
+  // Find the leg that the cell is written on
+  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+    if (leg->circ == circ) {
+      leg_found = leg;
+      break;
+    }
+  } CONFLUX_FOR_EACH_LEG_END(leg);
+
+  return leg_found;
+}
+
+/**
+ * Comparison function for ooo_q pqueue.
+ *
+ * Ensures that lower sequence numbers are at the head of the pqueue.
+ */
+static int
+conflux_queue_cmp(const void *a, const void *b)
+{
+  // Compare a and b as conflux_cell_t using the seq field, and return a
+  // comparison result such that the lowest seq is at the head of the pqueue.
+  const conflux_cell_t *cell_a = a;
+  const conflux_cell_t *cell_b = b;
+
+  tor_assert(cell_a);
+  tor_assert(cell_b);
+
+  if (cell_a->seq < cell_b->seq) {
+    return -1;
+  } else if (cell_a->seq > cell_b->seq) {
+    return 1;
+  } else {
+    return 0;
+  }
+}
+
+/**
+ * Get the congestion control object for a conflux circuit.
+ *
+ * Because conflux can only be negotiated with the last hop, we
+ * can use the last hop of the cpath to obtain the congestion
+ * control object for origin circuits. For non-origin circuits,
+ * we can use the circuit itself.
+ */
+const congestion_control_t *
+circuit_ccontrol(const circuit_t *circ)
+{
+  const congestion_control_t *ccontrol = NULL;
+  tor_assert(circ);
+
+  if (CIRCUIT_IS_ORIGIN(circ)) {
+    tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath);
+    tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev);
+    ccontrol = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev->ccontrol;
+  } else {
+    ccontrol = circ->ccontrol;
+  }
+
+  /* Conflux circuits always have congestion control*/
+  tor_assert(ccontrol);
+  return ccontrol;
+}
+
+/**
+ * Process an incoming relay cell for conflux. Called from
+ * connection_edge_process_relay_cell().
+ *
+ * Returns true if the conflux system now has well-ordered cells to deliver
+ * to streams, false otherwise.
+ */
+bool
+conflux_process_cell(conflux_t *cfx, circuit_t *in_circ,
+                     crypt_path_t *layer_hint, cell_t *cell)
+{
+  // TODO-329-TUNING: Temporarily validate legs here. We can remove
+  // this after tuning is complete.
+  conflux_validate_legs(cfx);
+
+  conflux_leg_t *leg = conflux_get_leg(cfx, in_circ);
+  if (!leg) {
+    log_warn(LD_BUG, "Got a conflux cell on a circuit without "
+             "conflux leg. Closing circuit.");
+    circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
+    return false;
+  }
+
+  /* We need to make sure this cell came from the expected hop, or
+   * else it could be a data corruption attack from a middle node. */
+  if (!conflux_validate_source_hop(in_circ, layer_hint)) {
+    circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
+    return false;
+  }
+
+  /* Update the running absolute sequence number */
+  leg->last_seq_recv++;
+
+  /* If this cell is next, fast-path it by processing the cell in-place */
+  if (leg->last_seq_recv == cfx->last_seq_delivered + 1) {
+    /* The cell is now ready to be processed, and rest of the queue should
+     * now be checked for remaining elements */
+    cfx->last_seq_delivered++;
+    return true;
+  } else if (BUG(leg->last_seq_recv <= cfx->last_seq_delivered)) {
+    log_warn(LD_BUG, "Got a conflux cell with a sequence number "
+             "less than the last delivered. Closing circuit.");
+    circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
+    return false;
+  } else {
+    conflux_cell_t *c_cell = tor_malloc_zero(sizeof(conflux_cell_t));
+    c_cell->seq = leg->last_seq_recv;
+
+    memcpy(&c_cell->cell, cell, sizeof(cell_t));
+
+    smartlist_pqueue_add(cfx->ooo_q, conflux_queue_cmp,
+            offsetof(conflux_cell_t, heap_idx), c_cell);
+    total_ooo_q_bytes += sizeof(cell_t);
+
+    /* This cell should not be processed yet, and the queue is not ready
+     * to process because the next absolute seqnum has not yet arrived */
+    return false;
+  }
+}
+
+/**
+ * Dequeue the top cell from our queue.
+ *
+ * Returns the cell as a conflux_cell_t, or NULL if the queue is empty
+ * or has a hole.
+ */
+conflux_cell_t *
+conflux_dequeue_cell(conflux_t *cfx)
+{
+  conflux_cell_t *top = NULL;
+  if (smartlist_len(cfx->ooo_q) == 0)
+    return NULL;
+
+  top = smartlist_get(cfx->ooo_q, 0);
+
+  /* If the top cell is the next sequence number we need, then
+   * pop and return it. */
+  if (top->seq == cfx->last_seq_delivered+1) {
+    smartlist_pqueue_pop(cfx->ooo_q, conflux_queue_cmp,
+                         offsetof(conflux_cell_t, heap_idx));
+    total_ooo_q_bytes -= sizeof(cell_t);
+    cfx->last_seq_delivered++;
+    return top;
+  } else {
+    return NULL;
+  }
+}
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index 7929f57ee6..26e52b0d95 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -99,6 +99,7 @@
 #include "core/or/sendme.h"
 #include "core/or/congestion_control_common.h"
 #include "core/or/congestion_control_flow.h"
+#include "core/or/conflux.h"
 
 static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
                                             cell_direction_t cell_direction,
@@ -116,6 +117,11 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ,
                                                   entry_connection_t *conn,
                                                   node_t *node,
                                                   const tor_addr_t *addr);
+static int connection_edge_process_ordered_relay_cell(cell_t *cell,
+                                           circuit_t *circ,
+                                           edge_connection_t *conn,
+                                           crypt_path_t *layer_hint,
+                                           relay_header_t *rh);
 
 /** Stats: how many relay cells have originated at this hop, or have
  * been relayed onward (not recognized at this hop)?
@@ -610,7 +616,7 @@ pad_cell_payload(uint8_t *cell_payload, size_t data_len)
  * return 0.
  */
 MOCK_IMPL(int,
-relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *circ,
+relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *orig_circ,
                                uint8_t relay_command, const char *payload,
                                size_t payload_len, crypt_path_t *cpath_layer,
                                const char *filename, int lineno))
@@ -640,6 +646,7 @@ relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *circ,
   rh.stream_id = stream_id;
   rh.length = payload_len;
   relay_header_pack(cell.payload, &rh);
+
   if (payload_len)
     memcpy(cell.payload+RELAY_HEADER_SIZE, payload, payload_len);
 
@@ -2051,9 +2058,6 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
   static int num_seen=0;
   relay_header_t rh;
   unsigned domain = layer_hint?LD_APP:LD_EXIT;
-  int optimistic_data = 0; /* Set to 1 if we receive data on a stream
-                            * that's in the EXIT_CONN_STATE_RESOLVING
-                            * or EXIT_CONN_STATE_CONNECTING states. */
 
   tor_assert(cell);
   tor_assert(circ);
@@ -2086,8 +2090,66 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
     }
   }
 
+  /* Conflux handling: If conflux is disabled, or the relay command is not
+   * multiplexed across circuits, then process it immediately.
+   *
+   * Otherwise, we need to process the relay cell against our conflux
+   * queues, and if doing so results in ordered cells to deliver, we
+   * dequeue and process those in-order until there are no more.
+   */
+  if (!circ->conflux || !conflux_should_multiplex(rh.command)) {
+    return connection_edge_process_ordered_relay_cell(cell, circ, conn,
+                                                      layer_hint, &rh);
+  } else {
+    // If conflux says this cell is in-order, then begin processing
+    // cells from queue until there are none. Otherwise, we do nothing
+    // until further cells arrive.
+    if (conflux_process_cell(circ->conflux, circ, layer_hint, cell)) {
+      conflux_cell_t *c_cell = NULL;
+      int ret = 0;
+
+      /* First, process this cell */
+      if ((ret = connection_edge_process_ordered_relay_cell(cell, circ, conn,
+                                                 layer_hint, &rh)) < 0) {
+        return ret;
+      }
+
+      /* Now, check queue for more */
+      while ((c_cell = conflux_dequeue_cell(circ->conflux))) {
+        relay_header_unpack(&rh, c_cell->cell.payload);
+        conn = relay_lookup_conn(circ, &c_cell->cell, CELL_DIRECTION_OUT,
+                                 layer_hint);
+        if ((ret = connection_edge_process_ordered_relay_cell(&c_cell->cell,
+                                                   circ, conn, layer_hint,
+                                                   &rh)) < 0) {
+          /* Negative return value is a fatal error. Return early and tear down
+           * circuit */
+          tor_free(c_cell);
+          return ret;
+        }
+        tor_free(c_cell);
+      }
+    }
+  }
+
+  return 0;
+}
+
+/**
+ * Helper function to process a relay cell that is in the proper order
+ * for processing right now. */
+static int
+connection_edge_process_ordered_relay_cell(cell_t *cell, circuit_t *circ,
+                                           edge_connection_t *conn,
+                                           crypt_path_t *layer_hint,
+                                           relay_header_t *rh)
+{
+  int optimistic_data = 0; /* Set to 1 if we receive data on a stream
+                            * that's in the EXIT_CONN_STATE_RESOLVING
+                            * or EXIT_CONN_STATE_CONNECTING states. */
+
   /* Tell circpad that we've received a recognized cell */
-  circpad_deliver_recognized_relay_cell_events(circ, rh.command, layer_hint);
+  circpad_deliver_recognized_relay_cell_events(circ, rh->command, layer_hint);
 
   /* either conn is NULL, in which case we've got a control cell, or else
    * conn points to the recognized stream. */
@@ -2095,22 +2157,22 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
     if (conn->base_.type == CONN_TYPE_EXIT &&
         (conn->base_.state == EXIT_CONN_STATE_CONNECTING ||
          conn->base_.state == EXIT_CONN_STATE_RESOLVING) &&
-        rh.command == RELAY_COMMAND_DATA) {
+        rh->command == RELAY_COMMAND_DATA) {
       /* Allow DATA cells to be delivered to an exit node in state
        * EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING.
        * This speeds up HTTP, for example. */
       optimistic_data = 1;
-    } else if (rh.stream_id == 0 && rh.command == RELAY_COMMAND_DATA) {
+    } else if (rh->stream_id == 0 && rh->command == RELAY_COMMAND_DATA) {
       log_warn(LD_BUG, "Somehow I had a connection that matched a "
                "data cell with stream ID 0.");
     } else {
       return connection_edge_process_relay_cell_not_open(
-               &rh, cell, circ, conn, layer_hint);
+               rh, cell, circ, conn, layer_hint);
     }
   }
 
   return handle_relay_cell_command(cell, circ, conn, layer_hint,
-                              &rh, optimistic_data);
+                              rh, optimistic_data);
 }
 
 /** How many relay_data cells have we built, ever? */

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits