[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [tor] 07/20: Prop#329 Algs: Conflux multiplexed cell receive handling
This is an automated email from the git hooks/post-receive script.
ahf pushed a commit to branch main
in repository tor.
commit 2bd1eca78c7771b9d16bb2a670420987c4a36c91
Author: Mike Perry <mikeperry-git@xxxxxxxxxxxxxx>
AuthorDate: Wed Dec 14 21:03:52 2022 +0000
Prop#329 Algs: Conflux multiplexed cell receive handling
---
src/core/or/conflux.c | 260 ++++++++++++++++++++++++++++++++++++++++++++++++++
src/core/or/relay.c | 80 ++++++++++++++--
2 files changed, 331 insertions(+), 9 deletions(-)
diff --git a/src/core/or/conflux.c b/src/core/or/conflux.c
new file mode 100644
index 0000000000..6179fea279
--- /dev/null
+++ b/src/core/or/conflux.c
@@ -0,0 +1,260 @@
+/* Copyright (c) 2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file conflux.c
+ * \brief Conflux multipath core algorithms
+ */
+
+#define TOR_CONFLUX_PRIVATE
+
+#include "core/or/or.h"
+
+#include "core/or/circuit_st.h"
+#include "core/or/sendme.h"
+#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_st.h"
+#include "core/or/origin_circuit_st.h"
+#include "core/or/circuitlist.h"
+#include "core/or/circuituse.h"
+#include "core/or/conflux.h"
+#include "core/or/conflux_params.h"
+#include "core/or/conflux_util.h"
+#include "core/or/conflux_st.h"
+#include "lib/time/compat_time.h"
+#include "app/config/config.h"
+
+#include "trunnel/extension.h"
+
+/** One million microseconds in a second */
+#define USEC_PER_SEC 1000000
+
+/**
+ * Determine if we should multiplex a specific relay command or not.
+ *
+ * TODO: Version of this that is the set of forbidden commands
+ * on linked circuits
+ */
+bool
+conflux_should_multiplex(int relay_command)
+{
+ switch (relay_command) {
+ /* These are all fine to multiplex, and must be
+ * so that ordering is preserved */
+ case RELAY_COMMAND_BEGIN:
+ case RELAY_COMMAND_DATA:
+ case RELAY_COMMAND_END:
+ case RELAY_COMMAND_CONNECTED:
+ return true;
+
+ /* We can't multiplex these because they are
+ * circuit-specific */
+ case RELAY_COMMAND_SENDME:
+ case RELAY_COMMAND_EXTEND:
+ case RELAY_COMMAND_EXTENDED:
+ case RELAY_COMMAND_TRUNCATE:
+ case RELAY_COMMAND_TRUNCATED:
+ case RELAY_COMMAND_DROP:
+ return false;
+
+ /* We must multiplex RESOLVEs because their ordering
+ * impacts begin/end. */
+ case RELAY_COMMAND_RESOLVE:
+ case RELAY_COMMAND_RESOLVED:
+ return true;
+
+ /* These are all circuit-specific */
+ case RELAY_COMMAND_BEGIN_DIR:
+ case RELAY_COMMAND_EXTEND2:
+ case RELAY_COMMAND_EXTENDED2:
+ case RELAY_COMMAND_ESTABLISH_INTRO:
+ case RELAY_COMMAND_ESTABLISH_RENDEZVOUS:
+ case RELAY_COMMAND_INTRODUCE1:
+ case RELAY_COMMAND_INTRODUCE2:
+ case RELAY_COMMAND_RENDEZVOUS1:
+ case RELAY_COMMAND_RENDEZVOUS2:
+ case RELAY_COMMAND_INTRO_ESTABLISHED:
+ case RELAY_COMMAND_RENDEZVOUS_ESTABLISHED:
+ case RELAY_COMMAND_INTRODUCE_ACK:
+ case RELAY_COMMAND_PADDING_NEGOTIATE:
+ case RELAY_COMMAND_PADDING_NEGOTIATED:
+ return false;
+
+ /* These must be multiplexed because their ordering
+ * relative to BEGIN/END must be preserved */
+ case RELAY_COMMAND_XOFF:
+ case RELAY_COMMAND_XON:
+ return true;
+
+ /* These two are not multiplexed, because they must
+ * be processed immediately to update sequence numbers
+ * before any other cells are processed on the circuit */
+ case RELAY_COMMAND_CONFLUX_SWITCH:
+ case RELAY_COMMAND_CONFLUX_LINK:
+ case RELAY_COMMAND_CONFLUX_LINKED:
+ case RELAY_COMMAND_CONFLUX_LINKED_ACK:
+ return false;
+
+ default:
+ log_warn(LD_BUG, "Conflux asked to multiplex unknown relay command %d",
+ relay_command);
+ return false;
+ }
+}
+
+/** Return the leg for a circuit in a conflux set. Return NULL if not found. */
+conflux_leg_t *
+conflux_get_leg(conflux_t *cfx, const circuit_t *circ)
+{
+ conflux_leg_t *leg_found = NULL;
+
+ // Find the leg that the cell is written on
+ CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+ if (leg->circ == circ) {
+ leg_found = leg;
+ break;
+ }
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+
+ return leg_found;
+}
+
+/**
+ * Comparison function for ooo_q pqueue.
+ *
+ * Ensures that lower sequence numbers are at the head of the pqueue.
+ */
+static int
+conflux_queue_cmp(const void *a, const void *b)
+{
+ // Compare a and b as conflux_cell_t using the seq field, and return a
+ // comparison result such that the lowest seq is at the head of the pqueue.
+ const conflux_cell_t *cell_a = a;
+ const conflux_cell_t *cell_b = b;
+
+ tor_assert(cell_a);
+ tor_assert(cell_b);
+
+ if (cell_a->seq < cell_b->seq) {
+ return -1;
+ } else if (cell_a->seq > cell_b->seq) {
+ return 1;
+ } else {
+ return 0;
+ }
+}
+
+/**
+ * Get the congestion control object for a conflux circuit.
+ *
+ * Because conflux can only be negotiated with the last hop, we
+ * can use the last hop of the cpath to obtain the congestion
+ * control object for origin circuits. For non-origin circuits,
+ * we can use the circuit itself.
+ */
+const congestion_control_t *
+circuit_ccontrol(const circuit_t *circ)
+{
+ const congestion_control_t *ccontrol = NULL;
+ tor_assert(circ);
+
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath);
+ tor_assert(CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev);
+ ccontrol = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev->ccontrol;
+ } else {
+ ccontrol = circ->ccontrol;
+ }
+
+ /* Conflux circuits always have congestion control*/
+ tor_assert(ccontrol);
+ return ccontrol;
+}
+
+/**
+ * Process an incoming relay cell for conflux. Called from
+ * connection_edge_process_relay_cell().
+ *
+ * Returns true if the conflux system now has well-ordered cells to deliver
+ * to streams, false otherwise.
+ */
+bool
+conflux_process_cell(conflux_t *cfx, circuit_t *in_circ,
+ crypt_path_t *layer_hint, cell_t *cell)
+{
+ // TODO-329-TUNING: Temporarily validate legs here. We can remove
+ // this after tuning is complete.
+ conflux_validate_legs(cfx);
+
+ conflux_leg_t *leg = conflux_get_leg(cfx, in_circ);
+ if (!leg) {
+ log_warn(LD_BUG, "Got a conflux cell on a circuit without "
+ "conflux leg. Closing circuit.");
+ circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
+ return false;
+ }
+
+ /* We need to make sure this cell came from the expected hop, or
+ * else it could be a data corruption attack from a middle node. */
+ if (!conflux_validate_source_hop(in_circ, layer_hint)) {
+ circuit_mark_for_close(in_circ, END_CIRC_REASON_TORPROTOCOL);
+ return false;
+ }
+
+ /* Update the running absolute sequence number */
+ leg->last_seq_recv++;
+
+ /* If this cell is next, fast-path it by processing the cell in-place */
+ if (leg->last_seq_recv == cfx->last_seq_delivered + 1) {
+ /* The cell is now ready to be processed, and rest of the queue should
+ * now be checked for remaining elements */
+ cfx->last_seq_delivered++;
+ return true;
+ } else if (BUG(leg->last_seq_recv <= cfx->last_seq_delivered)) {
+ log_warn(LD_BUG, "Got a conflux cell with a sequence number "
+ "less than the last delivered. Closing circuit.");
+ circuit_mark_for_close(in_circ, END_CIRC_REASON_INTERNAL);
+ return false;
+ } else {
+ conflux_cell_t *c_cell = tor_malloc_zero(sizeof(conflux_cell_t));
+ c_cell->seq = leg->last_seq_recv;
+
+ memcpy(&c_cell->cell, cell, sizeof(cell_t));
+
+ smartlist_pqueue_add(cfx->ooo_q, conflux_queue_cmp,
+ offsetof(conflux_cell_t, heap_idx), c_cell);
+ total_ooo_q_bytes += sizeof(cell_t);
+
+ /* This cell should not be processed yet, and the queue is not ready
+ * to process because the next absolute seqnum has not yet arrived */
+ return false;
+ }
+}
+
+/**
+ * Dequeue the top cell from our queue.
+ *
+ * Returns the cell as a conflux_cell_t, or NULL if the queue is empty
+ * or has a hole.
+ */
+conflux_cell_t *
+conflux_dequeue_cell(conflux_t *cfx)
+{
+ conflux_cell_t *top = NULL;
+ if (smartlist_len(cfx->ooo_q) == 0)
+ return NULL;
+
+ top = smartlist_get(cfx->ooo_q, 0);
+
+ /* If the top cell is the next sequence number we need, then
+ * pop and return it. */
+ if (top->seq == cfx->last_seq_delivered+1) {
+ smartlist_pqueue_pop(cfx->ooo_q, conflux_queue_cmp,
+ offsetof(conflux_cell_t, heap_idx));
+ total_ooo_q_bytes -= sizeof(cell_t);
+ cfx->last_seq_delivered++;
+ return top;
+ } else {
+ return NULL;
+ }
+}
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index 7929f57ee6..26e52b0d95 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -99,6 +99,7 @@
#include "core/or/sendme.h"
#include "core/or/congestion_control_common.h"
#include "core/or/congestion_control_flow.h"
+#include "core/or/conflux.h"
static edge_connection_t *relay_lookup_conn(circuit_t *circ, cell_t *cell,
cell_direction_t cell_direction,
@@ -116,6 +117,11 @@ static void adjust_exit_policy_from_exitpolicy_failure(origin_circuit_t *circ,
entry_connection_t *conn,
node_t *node,
const tor_addr_t *addr);
+static int connection_edge_process_ordered_relay_cell(cell_t *cell,
+ circuit_t *circ,
+ edge_connection_t *conn,
+ crypt_path_t *layer_hint,
+ relay_header_t *rh);
/** Stats: how many relay cells have originated at this hop, or have
* been relayed onward (not recognized at this hop)?
@@ -610,7 +616,7 @@ pad_cell_payload(uint8_t *cell_payload, size_t data_len)
* return 0.
*/
MOCK_IMPL(int,
-relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *circ,
+relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *orig_circ,
uint8_t relay_command, const char *payload,
size_t payload_len, crypt_path_t *cpath_layer,
const char *filename, int lineno))
@@ -640,6 +646,7 @@ relay_send_command_from_edge_,(streamid_t stream_id, circuit_t *circ,
rh.stream_id = stream_id;
rh.length = payload_len;
relay_header_pack(cell.payload, &rh);
+
if (payload_len)
memcpy(cell.payload+RELAY_HEADER_SIZE, payload, payload_len);
@@ -2051,9 +2058,6 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
static int num_seen=0;
relay_header_t rh;
unsigned domain = layer_hint?LD_APP:LD_EXIT;
- int optimistic_data = 0; /* Set to 1 if we receive data on a stream
- * that's in the EXIT_CONN_STATE_RESOLVING
- * or EXIT_CONN_STATE_CONNECTING states. */
tor_assert(cell);
tor_assert(circ);
@@ -2086,8 +2090,66 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
}
}
+ /* Conflux handling: If conflux is disabled, or the relay command is not
+ * multiplexed across circuits, then process it immediately.
+ *
+ * Otherwise, we need to process the relay cell against our conflux
+ * queues, and if doing so results in ordered cells to deliver, we
+ * dequeue and process those in-order until there are no more.
+ */
+ if (!circ->conflux || !conflux_should_multiplex(rh.command)) {
+ return connection_edge_process_ordered_relay_cell(cell, circ, conn,
+ layer_hint, &rh);
+ } else {
+ // If conflux says this cell is in-order, then begin processing
+ // cells from queue until there are none. Otherwise, we do nothing
+ // until further cells arrive.
+ if (conflux_process_cell(circ->conflux, circ, layer_hint, cell)) {
+ conflux_cell_t *c_cell = NULL;
+ int ret = 0;
+
+ /* First, process this cell */
+ if ((ret = connection_edge_process_ordered_relay_cell(cell, circ, conn,
+ layer_hint, &rh)) < 0) {
+ return ret;
+ }
+
+ /* Now, check queue for more */
+ while ((c_cell = conflux_dequeue_cell(circ->conflux))) {
+ relay_header_unpack(&rh, c_cell->cell.payload);
+ conn = relay_lookup_conn(circ, &c_cell->cell, CELL_DIRECTION_OUT,
+ layer_hint);
+ if ((ret = connection_edge_process_ordered_relay_cell(&c_cell->cell,
+ circ, conn, layer_hint,
+ &rh)) < 0) {
+ /* Negative return value is a fatal error. Return early and tear down
+ * circuit */
+ tor_free(c_cell);
+ return ret;
+ }
+ tor_free(c_cell);
+ }
+ }
+ }
+
+ return 0;
+}
+
+/**
+ * Helper function to process a relay cell that is in the proper order
+ * for processing right now. */
+static int
+connection_edge_process_ordered_relay_cell(cell_t *cell, circuit_t *circ,
+ edge_connection_t *conn,
+ crypt_path_t *layer_hint,
+ relay_header_t *rh)
+{
+ int optimistic_data = 0; /* Set to 1 if we receive data on a stream
+ * that's in the EXIT_CONN_STATE_RESOLVING
+ * or EXIT_CONN_STATE_CONNECTING states. */
+
/* Tell circpad that we've received a recognized cell */
- circpad_deliver_recognized_relay_cell_events(circ, rh.command, layer_hint);
+ circpad_deliver_recognized_relay_cell_events(circ, rh->command, layer_hint);
/* either conn is NULL, in which case we've got a control cell, or else
* conn points to the recognized stream. */
@@ -2095,22 +2157,22 @@ connection_edge_process_relay_cell(cell_t *cell, circuit_t *circ,
if (conn->base_.type == CONN_TYPE_EXIT &&
(conn->base_.state == EXIT_CONN_STATE_CONNECTING ||
conn->base_.state == EXIT_CONN_STATE_RESOLVING) &&
- rh.command == RELAY_COMMAND_DATA) {
+ rh->command == RELAY_COMMAND_DATA) {
/* Allow DATA cells to be delivered to an exit node in state
* EXIT_CONN_STATE_CONNECTING or EXIT_CONN_STATE_RESOLVING.
* This speeds up HTTP, for example. */
optimistic_data = 1;
- } else if (rh.stream_id == 0 && rh.command == RELAY_COMMAND_DATA) {
+ } else if (rh->stream_id == 0 && rh->command == RELAY_COMMAND_DATA) {
log_warn(LD_BUG, "Somehow I had a connection that matched a "
"data cell with stream ID 0.");
} else {
return connection_edge_process_relay_cell_not_open(
- &rh, cell, circ, conn, layer_hint);
+ rh, cell, circ, conn, layer_hint);
}
}
return handle_relay_cell_command(cell, circ, conn, layer_hint,
- &rh, optimistic_data);
+ rh, optimistic_data);
}
/** How many relay_data cells have we built, ever? */
--
To stop receiving notification emails like this one, please contact
the administrator of this repository.
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits