[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [tor] 13/20: Prop#329 streams: Handle stream usage with conflux
This is an automated email from the git hooks/post-receive script.
ahf pushed a commit to branch main
in repository tor.
commit 2f865b4bba1bedc96f17b696916d74c392c83e1b
Author: Mike Perry <mikeperry-git@xxxxxxxxxxxxxx>
AuthorDate: Sun Apr 2 21:06:20 2023 +0000
Prop#329 streams: Handle stream usage with conflux
This adds utility functions to help stream block decisions, as well as cpath
layer_hint checks for stream cell acceptance, and syncing stream lists
for conflux circuits.
These functions are then called throughout the codebase to properly manage
conflux streams.
---
src/core/or/circuitlist.c | 1 +
src/core/or/circuitpadding.c | 7 +-
src/core/or/circuituse.c | 21 +-
src/core/or/conflux_util.c | 393 ++++++++++++++++++++++++++++++++
src/core/or/conflux_util.h | 59 +++++
src/core/or/congestion_control_common.c | 3 +-
src/core/or/congestion_control_flow.c | 47 +---
src/core/or/congestion_control_flow.h | 2 -
src/core/or/connection_edge.c | 6 +
src/core/or/edge_connection_st.h | 10 +-
src/core/or/or_circuit_st.h | 12 +-
src/core/or/origin_circuit_st.h | 11 +-
src/core/or/relay.c | 127 ++++++-----
src/feature/relay/dns.c | 4 +
14 files changed, 596 insertions(+), 107 deletions(-)
diff --git a/src/core/or/circuitlist.c b/src/core/or/circuitlist.c
index 01baf7c795..8c5beebbf3 100644
--- a/src/core/or/circuitlist.c
+++ b/src/core/or/circuitlist.c
@@ -2368,6 +2368,7 @@ circuit_about_to_free(circuit_t *circ)
if (! CIRCUIT_IS_ORIGIN(circ)) {
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
edge_connection_t *conn;
+
for (conn=or_circ->n_streams; conn; conn=conn->next_stream)
connection_edge_destroy(or_circ->p_circ_id, conn);
or_circ->n_streams = NULL;
diff --git a/src/core/or/circuitpadding.c b/src/core/or/circuitpadding.c
index 99dc5f9d83..590a2d60c9 100644
--- a/src/core/or/circuitpadding.c
+++ b/src/core/or/circuitpadding.c
@@ -78,6 +78,8 @@
#include "core/crypto/relay_crypto.h"
#include "feature/nodelist/nodelist.h"
+#include "src/core/or/conflux_util.h"
+
#include "app/config/config.h"
static inline circpad_circuit_state_t circpad_circuit_state(
@@ -251,8 +253,11 @@ circpad_marked_circuit_for_padding(circuit_t *circ, int reason)
* has shut down, but using the MaxCircuitDirtiness timer instead of
* the idle circuit timer (again, we want this because we're not
* supposed to look idle to Guard nodes that can see our lifespan). */
- if (!circ->timestamp_dirty)
+ if (!circ->timestamp_dirty) {
circ->timestamp_dirty = approx_time();
+ if (circ->conflux && CIRCUIT_IS_ORIGIN(circ))
+ conflux_sync_circ_fields(circ->conflux, TO_ORIGIN_CIRCUIT(circ));
+ }
/* Take ownership of the circuit */
circuit_change_purpose(circ, CIRCUIT_PURPOSE_C_CIRCUIT_PADDING);
diff --git a/src/core/or/circuituse.c b/src/core/or/circuituse.c
index 25401aea55..f5d5cb4397 100644
--- a/src/core/or/circuituse.c
+++ b/src/core/or/circuituse.c
@@ -64,6 +64,7 @@
#include "lib/time/tvdiff.h"
#include "lib/trace/events.h"
#include "src/core/mainloop/mainloop.h"
+#include "core/or/conflux.h"
#include "core/or/cpath_build_state_st.h"
#include "feature/dircommon/dir_connection_st.h"
@@ -700,7 +701,6 @@ circuit_expire_building(void)
} else { /* circuit not open, consider recording failure as timeout */
int first_hop_succeeded = TO_ORIGIN_CIRCUIT(victim)->cpath &&
TO_ORIGIN_CIRCUIT(victim)->cpath->state == CPATH_STATE_OPEN;
-
if (TO_ORIGIN_CIRCUIT(victim)->p_streams != NULL) {
log_warn(LD_BUG, "Circuit %d (purpose %d, %s) has timed out, "
"yet has attached streams!",
@@ -1351,6 +1351,7 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn)
int removed = 0;
if (conn == origin_circ->p_streams) {
origin_circ->p_streams = conn->next_stream;
+ conflux_update_p_streams(origin_circ, conn->next_stream);
removed = 1;
} else {
for (prevconn = origin_circ->p_streams;
@@ -1383,10 +1384,12 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn)
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
if (conn == or_circ->n_streams) {
or_circ->n_streams = conn->next_stream;
+ conflux_update_n_streams(or_circ, conn->next_stream);
return;
}
if (conn == or_circ->resolving_streams) {
or_circ->resolving_streams = conn->next_stream;
+ conflux_update_resolving_streams(or_circ, conn->next_stream);
return;
}
@@ -2556,7 +2559,13 @@ circuit_get_open_circ_or_launch(entry_connection_t *conn,
}
/** Return true iff <b>crypt_path</b> is one of the crypt_paths for
- * <b>circ</b>. */
+ * <b>circ</b>.
+ *
+ * WARNING: This function only validates that the cpath is on the *current*
+ * circuit, for internal consistency checking. For codepaths involving streams,
+ * or cpaths or layer_hints that could be from a different circuit due to
+ * conflux, use edge_uses_cpath() or conflux_validate_source_hop() instead.
+ */
static int
cpath_is_on_circuit(origin_circuit_t *circ, crypt_path_t *crypt_path)
{
@@ -2594,6 +2603,7 @@ link_apconn_to_circ(entry_connection_t *apconn, origin_circuit_t *circ,
ENTRY_TO_EDGE_CONN(apconn)->on_circuit = TO_CIRCUIT(circ);
/* assert_connection_ok(conn, time(NULL)); */
circ->p_streams = ENTRY_TO_EDGE_CONN(apconn);
+ conflux_update_p_streams(circ, ENTRY_TO_EDGE_CONN(apconn));
if (connection_edge_is_rendezvous_stream(ENTRY_TO_EDGE_CONN(apconn))) {
/* We are attaching a stream to a rendezvous circuit. That means
@@ -2733,6 +2743,9 @@ connection_ap_handshake_attach_chosen_circuit(entry_connection_t *conn,
/* When stream isolation is in use and controlled by an application
* we are willing to keep using the stream. */
circ->base_.timestamp_dirty = approx_time();
+ if (TO_CIRCUIT(circ)->conflux) {
+ conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ);
+ }
}
pathbias_count_use_attempt(circ);
@@ -3103,6 +3116,10 @@ mark_circuit_unusable_for_new_conns(origin_circuit_t *circ)
circ->base_.timestamp_dirty -= options->MaxCircuitDirtiness;
circ->unusable_for_new_conns = 1;
+
+ if (TO_CIRCUIT(circ)->conflux) {
+ conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ);
+ }
}
/**
diff --git a/src/core/or/conflux_util.c b/src/core/or/conflux_util.c
new file mode 100644
index 0000000000..855d477428
--- /dev/null
+++ b/src/core/or/conflux_util.c
@@ -0,0 +1,393 @@
+/* Copyright (c) 2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file conflux_util.c
+ * \brief Conflux utility functions for stream blocking and management.
+ */
+
+#define TOR_CONFLUX_PRIVATE
+
+#include "core/or/or.h"
+
+#include "core/or/circuit_st.h"
+#include "core/or/sendme.h"
+#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_st.h"
+#include "core/or/circuitlist.h"
+#include "core/or/origin_circuit_st.h"
+#include "core/or/or_circuit_st.h"
+#include "core/or/conflux.h"
+#include "core/or/conflux_params.h"
+#include "core/or/conflux_util.h"
+#include "core/or/conflux_st.h"
+#include "lib/time/compat_time.h"
+#include "app/config/config.h"
+
+/**
+ * This is a utility function that returns the package window circuit,
+ * regardless of if it has a conflux pair or not.
+ */
+int
+circuit_get_package_window(circuit_t *circ,
+ const crypt_path_t *cpath)
+{
+ if (circ->conflux) {
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ tor_assert_nonfatal(circ->purpose ==
+ CIRCUIT_PURPOSE_CONFLUX_LINKED);
+ }
+ circ = conflux_decide_next_circ(circ->conflux);
+
+ /* If conflux has no circuit to send on, the package window is 0. */
+ if (!circ) {
+ return 0;
+ }
+
+ /* If we are the origin, we need to get the last hop's cpath for
+ * congestion control information. */
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ cpath = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev;
+ } else {
+ if (BUG(cpath != NULL)) {
+ log_warn(LD_BUG, "cpath is not NULL for non-origin circuit");
+ }
+ }
+ }
+
+ return congestion_control_get_package_window(circ, cpath);
+}
+
+/**
+ * Returns true if conflux can send a data cell.
+ *
+ * Used to decide if we should block streams or not, for
+ * proccess_sendme_cell(), circuit_resume_edge_reading(),
+ * circuit_consider_stop_edge_reading(), circuit_resume_edge_reading_helper(),
+ * channel_flush_from_first_active_circuit()
+*/
+bool
+conflux_can_send(conflux_t *cfx)
+{
+ const circuit_t *send_circ = conflux_decide_next_circ(cfx);
+
+ /* If we have a circuit, we can send */
+ if (send_circ) {
+ return true;
+ } else {
+ return false;
+ }
+}
+
+/**
+ * For a given conflux circuit, return the cpath of the destination.
+ *
+ * The cpath destination is the last hop of the circuit, or NULL if
+ * the circuit is a non-origin circuit.
+ */
+crypt_path_t *
+conflux_get_destination_hop(circuit_t *circ)
+{
+ if (BUG(!circ)) {
+ log_warn(LD_BUG, "No circuit to send on for conflux");
+ return NULL;
+ } else {
+ /* Conflux circuits always send multiplexed relay commands to
+ * to the last hop. (Non-multiplexed commands go on their
+ * original circuit and hop). */
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ return TO_ORIGIN_CIRCUIT(circ)->cpath->prev;
+ } else {
+ return NULL;
+ }
+ }
+}
+
+/**
+ * Validates that the source of a cell is from the last hop of the circuit
+ * for origin circuits, and that there are no further hops for non-origin
+ * circuits.
+ */
+bool
+conflux_validate_source_hop(circuit_t *in_circ,
+ crypt_path_t *layer_hint)
+{
+ crypt_path_t *dest = conflux_get_destination_hop(in_circ);
+
+ if (dest != layer_hint) {
+ log_warn(LD_CIRC, "Got conflux command from incorrect hop");
+ return false;
+ }
+
+ if (layer_hint == NULL) {
+ /* We should not have further hops attached to this circuit */
+ if (in_circ->n_chan) {
+ log_warn(LD_BUG, "Got conflux command on circuit with further hops");
+ return false;
+ }
+ }
+ return true;
+}
+
+/**
+ * Returns true if the edge connection uses the given cpath.
+ *
+ * If there is a conflux object, we inspect all the last hops of the conflux
+ * circuits.
+ */
+bool
+edge_uses_cpath(const edge_connection_t *conn,
+ const crypt_path_t *cpath)
+{
+ if (!conn->on_circuit)
+ return false;
+
+ if (CIRCUIT_IS_ORIGIN(conn->on_circuit)) {
+ if (conn->on_circuit->conflux) {
+ tor_assert_nonfatal(conn->on_circuit->purpose ==
+ CIRCUIT_PURPOSE_CONFLUX_LINKED);
+
+ /* If the circuit is an origin circuit with a conflux object, the cpath
+ * is valid if it came from any of the conflux circuit's last hops. */
+ CONFLUX_FOR_EACH_LEG_BEGIN(conn->on_circuit->conflux, leg) {
+ const origin_circuit_t *ocirc = CONST_TO_ORIGIN_CIRCUIT(leg->circ);
+ if (ocirc->cpath->prev == cpath) {
+ return true;
+ }
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+ } else {
+ return cpath == conn->cpath_layer;
+ }
+ } else {
+ /* For non-origin circuits, cpath should be null */
+ return cpath == NULL;
+ }
+
+ return false;
+}
+
+/**
+ * Returns the max RTT for the circuit that carries this stream,
+ * as observed by congestion control. For conflux circuits,
+ * we return the max RTT across all circuits.
+ */
+uint64_t
+edge_get_max_rtt(const edge_connection_t *stream)
+{
+ if (!stream->on_circuit)
+ return 0;
+
+ if (stream->on_circuit->conflux) {
+ tor_assert_nonfatal(stream->on_circuit->purpose ==
+ CIRCUIT_PURPOSE_CONFLUX_LINKED);
+
+ /* Find the max rtt from the ccontrol object of each circuit. */
+ uint64_t max_rtt = 0;
+ CONFLUX_FOR_EACH_LEG_BEGIN(stream->on_circuit->conflux, leg) {
+ const congestion_control_t *cc = circuit_ccontrol(leg->circ);
+ if (cc->max_rtt_usec > max_rtt) {
+ max_rtt = cc->max_rtt_usec;
+ }
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+
+ return max_rtt;
+ } else {
+ if (stream->on_circuit && stream->on_circuit->ccontrol)
+ return stream->on_circuit->ccontrol->max_rtt_usec;
+ else if (stream->cpath_layer && stream->cpath_layer->ccontrol)
+ return stream->cpath_layer->ccontrol->max_rtt_usec;
+ }
+
+ return 0;
+}
+
+/**
+ * Return true iff our decryption layer_hint is from the last hop
+ * in a circuit.
+ */
+bool
+relay_crypt_from_last_hop(const origin_circuit_t *circ,
+ const crypt_path_t *layer_hint)
+{
+ tor_assert(circ);
+ tor_assert(layer_hint);
+ tor_assert(circ->cpath);
+
+ if (TO_CIRCUIT(circ)->conflux) {
+ tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose ==
+ CIRCUIT_PURPOSE_CONFLUX_LINKED);
+
+ /* If we are a conflux circuit, we need to check if the layer_hint
+ * is from the last hop of any of the conflux circuits. */
+ CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+ const origin_circuit_t *ocirc = CONST_TO_ORIGIN_CIRCUIT(leg->circ);
+ if (layer_hint == ocirc->cpath->prev) {
+ return true;
+ }
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+
+ log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
+ "Got unexpected relay data from intermediate hop");
+ return false;
+ } else {
+ if (layer_hint != circ->cpath->prev) {
+ log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
+ "Got unexpected relay data from intermediate hop");
+ return false;
+ }
+ return true;
+ }
+}
+
+/**
+ * Update the head of the n_streams list on all circuits in the conflux
+ * set.
+ */
+void
+conflux_update_p_streams(origin_circuit_t *circ, edge_connection_t *stream)
+{
+ tor_assert(circ);
+
+ if (TO_CIRCUIT(circ)->conflux) {
+ tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose ==
+ CIRCUIT_PURPOSE_CONFLUX_LINKED);
+ CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+ TO_ORIGIN_CIRCUIT(leg->circ)->p_streams = stream;
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+ }
+}
+
+/**
+ * Sync the next_stream_id, timestamp_dirty, and circuit_idle_timeout
+ * fields of a conflux set to the values in a particular circuit.
+ *
+ * This is called upon link, and whenever one of these fields
+ * changes on ref_circ. The ref_circ values are copied to all
+ * other circuits in the conflux set.
+*/
+void
+conflux_sync_circ_fields(conflux_t *cfx, origin_circuit_t *ref_circ)
+{
+ tor_assert(cfx);
+ tor_assert(ref_circ);
+
+ CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+ if (leg->circ == TO_CIRCUIT(ref_circ)) {
+ continue;
+ }
+ origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(leg->circ);
+ ocirc->next_stream_id = ref_circ->next_stream_id;
+ leg->circ->timestamp_dirty = TO_CIRCUIT(ref_circ)->timestamp_dirty;
+ ocirc->circuit_idle_timeout = ref_circ->circuit_idle_timeout;
+ ocirc->unusable_for_new_conns = ref_circ->unusable_for_new_conns;
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+}
+
+/**
+ * Update the head of the n_streams list on all circuits in the conflux
+ * set.
+ */
+void
+conflux_update_n_streams(or_circuit_t *circ, edge_connection_t *stream)
+{
+ tor_assert(circ);
+
+ if (TO_CIRCUIT(circ)->conflux) {
+ CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+ TO_OR_CIRCUIT(leg->circ)->n_streams = stream;
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+ }
+}
+
+/**
+ * Update the head of the resolving_streams list on all circuits in the conflux
+ * set.
+ */
+void
+conflux_update_resolving_streams(or_circuit_t *circ, edge_connection_t *stream)
+{
+ tor_assert(circ);
+
+ if (TO_CIRCUIT(circ)->conflux) {
+ CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+ TO_OR_CIRCUIT(leg->circ)->resolving_streams = stream;
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+ }
+}
+
+/**
+ * Update the half_streams list on all circuits in the conflux
+ */
+void
+conflux_update_half_streams(origin_circuit_t *circ, smartlist_t *half_streams)
+{
+ tor_assert(circ);
+
+ if (TO_CIRCUIT(circ)->conflux) {
+ tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose ==
+ CIRCUIT_PURPOSE_CONFLUX_LINKED);
+ CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+ TO_ORIGIN_CIRCUIT(leg->circ)->half_streams = half_streams;
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+ }
+}
+
+/**
+ * Helper function that emits non-fatal asserts if the stream lists
+ * or next_stream_id is out of sync between any of the conflux legs.
+*/
+void
+conflux_validate_stream_lists(const conflux_t *cfx)
+{
+ const conflux_leg_t *first_leg = smartlist_get(cfx->legs, 0);
+ tor_assert(first_leg);
+
+ /* Compare the stream lists of the first leg to all other legs. */
+ if (CIRCUIT_IS_ORIGIN(first_leg->circ)) {
+ const origin_circuit_t *f_circ =
+ CONST_TO_ORIGIN_CIRCUIT(first_leg->circ);
+
+ CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+ const origin_circuit_t *l_circ = CONST_TO_ORIGIN_CIRCUIT(leg->circ);
+ tor_assert_nonfatal(l_circ->p_streams == f_circ->p_streams);
+ tor_assert_nonfatal(l_circ->half_streams == f_circ->half_streams);
+ tor_assert_nonfatal(l_circ->next_stream_id == f_circ->next_stream_id);
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+ } else {
+ const or_circuit_t *f_circ = CONST_TO_OR_CIRCUIT(first_leg->circ);
+ CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+ const or_circuit_t *l_circ = CONST_TO_OR_CIRCUIT(leg->circ);
+ tor_assert_nonfatal(l_circ->n_streams == f_circ->n_streams);
+ tor_assert_nonfatal(l_circ->resolving_streams ==
+ f_circ->resolving_streams);
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+ }
+}
+
+/**
+ * Validate the conflux set has two legs, and both circuits have
+ * no nonce, and for origin circuits, the purpose is CONFLUX_PURPOSE_LINKED.
+ */
+void
+conflux_validate_legs(const conflux_t *cfx)
+{
+ tor_assert(cfx);
+ // TODO-329-UDP: Eventually we want to allow three legs for the
+ // exit case, to allow reconnection of legs to hit an RTT target.
+ // For now, this validation helps find bugs.
+ if (BUG(smartlist_len(cfx->legs) > conflux_params_get_num_legs_set())) {
+ log_warn(LD_BUG, "Number of legs is above maximum of %d allowed: %d\n",
+ conflux_params_get_num_legs_set(), smartlist_len(cfx->legs));
+ }
+
+ CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+ /* Ensure we have no pending nonce on the circ */
+ tor_assert_nonfatal(leg->circ->conflux_pending_nonce == NULL);
+ tor_assert_nonfatal(leg->circ->conflux != NULL);
+
+ if (CIRCUIT_IS_ORIGIN(leg->circ)) {
+ tor_assert_nonfatal(leg->circ->purpose ==
+ CIRCUIT_PURPOSE_CONFLUX_LINKED);
+ }
+ } CONFLUX_FOR_EACH_LEG_END(leg);
+}
diff --git a/src/core/or/conflux_util.h b/src/core/or/conflux_util.h
new file mode 100644
index 0000000000..c556ae1848
--- /dev/null
+++ b/src/core/or/conflux_util.h
@@ -0,0 +1,59 @@
+/* Copyright (c) 2023, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file conflux_util.h
+ * \brief Header file for conflux_util.c.
+ **/
+
+#ifndef TOR_CONFLUX_UTIL_H
+#define TOR_CONFLUX_UTIL_H
+
+/* Forward decls */
+typedef struct edge_connection_t edge_connection_t;
+typedef struct crypt_path_t crypt_path_t;
+typedef struct origin_circuit_t origin_circuit_t;
+typedef struct conflux_t conflux_t;
+
+/* True iff the given circuit_t circ is conflux related. */
+static inline bool
+CIRCUIT_IS_CONFLUX(const circuit_t *circ)
+{
+ if (circ->conflux_pending_nonce) {
+ if (CIRCUIT_IS_ORIGIN(circ))
+ tor_assert_nonfatal(circ->purpose == CIRCUIT_PURPOSE_CONFLUX_UNLINKED);
+ return true;
+ } else if (circ->conflux) {
+ if (CIRCUIT_IS_ORIGIN(circ))
+ tor_assert_nonfatal(circ->purpose == CIRCUIT_PURPOSE_CONFLUX_LINKED);
+ return true;
+ } else {
+ tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_LINKED);
+ tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_UNLINKED);
+ return false;
+ }
+}
+
+int circuit_get_package_window(circuit_t *circ,
+ const crypt_path_t *cpath);
+bool conflux_can_send(conflux_t *cfx);
+
+bool edge_uses_cpath(const edge_connection_t *conn,
+ const crypt_path_t *cpath);
+crypt_path_t *conflux_get_destination_hop(circuit_t *circ);
+bool conflux_validate_source_hop(circuit_t *in_circ,
+ crypt_path_t *layer_hint);
+uint64_t edge_get_max_rtt(const edge_connection_t *stream);
+bool relay_crypt_from_last_hop(const origin_circuit_t *circ,
+ const crypt_path_t *layer_hint);
+
+void conflux_update_p_streams(origin_circuit_t *, edge_connection_t *);
+void conflux_update_half_streams(origin_circuit_t *, smartlist_t *);
+void conflux_update_n_streams(or_circuit_t *, edge_connection_t *);
+void conflux_update_resolving_streams(or_circuit_t *, edge_connection_t *);
+void conflux_sync_circ_fields(conflux_t *cfx, origin_circuit_t *ref_circ);
+void conflux_validate_stream_lists(const conflux_t *cfx);
+void conflux_validate_legs(const conflux_t *cfx);
+
+#endif /* TOR_CONFLUX_UTIL_H */
+
diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c
index c7c950d0c8..1e0f504df1 100644
--- a/src/core/or/congestion_control_common.c
+++ b/src/core/or/congestion_control_common.c
@@ -24,6 +24,7 @@
#include "core/or/congestion_control_westwood.h"
#include "core/or/congestion_control_st.h"
#include "core/or/conflux.h"
+#include "core/or/conflux_util.h"
#include "core/or/trace_probes_cc.h"
#include "lib/time/compat_time.h"
#include "feature/nodelist/networkstatus.h"
@@ -703,7 +704,7 @@ circuit_has_active_streams(const circuit_t *circ,
if (conn->base_.marked_for_close)
continue;
- if (!layer_hint || conn->cpath_layer == layer_hint) {
+ if (edge_uses_cpath(conn, layer_hint)) {
if (connection_get_inbuf_len(TO_CONN(conn)) > 0) {
log_info(LD_CIRC, "CC: More in edge inbuf...");
return 1;
diff --git a/src/core/or/congestion_control_flow.c b/src/core/or/congestion_control_flow.c
index 90b1927ef9..fa9455a8a1 100644
--- a/src/core/or/congestion_control_flow.c
+++ b/src/core/or/congestion_control_flow.c
@@ -28,6 +28,7 @@
#include "core/or/connection_st.h"
#include "core/or/cell_st.h"
#include "app/config/config.h"
+#include "core/or/conflux_util.h"
/** Cache consensus parameters */
static uint32_t xoff_client;
@@ -60,27 +61,6 @@ double cc_stats_flow_xon_outbuf_ma = 0;
#define ONE_MEGABYTE (UINT64_C(1) << 20)
#define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
-/**
- * Return the congestion control object of the given edge connection.
- *
- * Returns NULL if the edge connection doesn't have a cpath_layer or not
- * attached to a circuit. But also if the cpath_layer or circuit doesn't have a
- * congestion control object.
- */
-static inline const congestion_control_t *
-edge_get_ccontrol(const edge_connection_t *edge)
-{
- congestion_control_t *ccontrol = NULL;
-
- if (edge->on_circuit && edge->on_circuit->ccontrol) {
- ccontrol = edge->on_circuit->ccontrol;
- } else if (edge->cpath_layer && edge->cpath_layer->ccontrol) {
- ccontrol = edge->cpath_layer->ccontrol;
- }
-
- return ccontrol;
-}
-
/**
* Update global congestion control related consensus parameter values, every
* consensus update.
@@ -265,13 +245,13 @@ circuit_process_stream_xoff(edge_connection_t *conn,
}
/* Make sure this XOFF came from the right hop */
- if (layer_hint && layer_hint != conn->cpath_layer) {
+ if (!edge_uses_cpath(conn, layer_hint)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XOFF from wrong hop.");
return false;
}
- if (edge_get_ccontrol(conn) == NULL) {
+ if (!edge_uses_flow_control(conn)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XOFF for non-congestion control circuit");
return false;
@@ -359,13 +339,13 @@ circuit_process_stream_xon(edge_connection_t *conn,
}
/* Make sure this XON came from the right hop */
- if (layer_hint && layer_hint != conn->cpath_layer) {
+ if (!edge_uses_cpath(conn, layer_hint)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XON from wrong hop.");
return false;
}
- if (edge_get_ccontrol(conn) == NULL) {
+ if (!edge_uses_flow_control(conn)) {
log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
"Got XON for non-congestion control circuit");
return false;
@@ -464,7 +444,7 @@ flow_control_decide_xoff(edge_connection_t *stream)
size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
uint32_t buffer_limit_xoff = 0;
- if (BUG(edge_get_ccontrol(stream) == NULL)) {
+ if (BUG(!edge_uses_flow_control(stream))) {
log_err(LD_BUG, "Flow control called for non-congestion control circuit");
return -1;
}
@@ -717,21 +697,6 @@ edge_uses_flow_control(const edge_connection_t *stream)
return ret;
}
-/**
- * Returns the max RTT for the circuit that carries this stream,
- * as observed by congestion control.
- */
-uint64_t
-edge_get_max_rtt(const edge_connection_t *stream)
-{
- if (stream->on_circuit && stream->on_circuit->ccontrol)
- return stream->on_circuit->ccontrol->max_rtt_usec;
- else if (stream->cpath_layer && stream->cpath_layer->ccontrol)
- return stream->cpath_layer->ccontrol->max_rtt_usec;
-
- return 0;
-}
-
/** Returns true if a connection is an edge conn that uses flow control */
bool
conn_uses_flow_control(connection_t *conn)
diff --git a/src/core/or/congestion_control_flow.h b/src/core/or/congestion_control_flow.h
index 5c735cce23..05e25c44d0 100644
--- a/src/core/or/congestion_control_flow.h
+++ b/src/core/or/congestion_control_flow.h
@@ -31,8 +31,6 @@ bool edge_uses_flow_control(const edge_connection_t *stream);
bool conn_uses_flow_control(connection_t *stream);
-uint64_t edge_get_max_rtt(const edge_connection_t *);
-
/** Metricsport externs */
extern uint64_t cc_stats_flow_num_xoff_sent;
extern uint64_t cc_stats_flow_num_xon_sent;
diff --git a/src/core/or/connection_edge.c b/src/core/or/connection_edge.c
index b0ccedc27f..ee6ab8596c 100644
--- a/src/core/or/connection_edge.c
+++ b/src/core/or/connection_edge.c
@@ -70,6 +70,7 @@
#include "core/or/circuitpadding.h"
#include "core/or/connection_edge.h"
#include "core/or/congestion_control_flow.h"
+#include "core/or/conflux_util.h"
#include "core/or/circuitstats.h"
#include "core/or/connection_or.h"
#include "core/or/extendinfo.h"
@@ -628,6 +629,7 @@ connection_half_edge_add(const edge_connection_t *conn,
if (!circ->half_streams) {
circ->half_streams = smartlist_new();
+ conflux_update_half_streams(circ, circ->half_streams);
}
half_conn->stream_id = conn->stream_id;
@@ -3102,6 +3104,10 @@ get_unique_stream_id_by_circ(origin_circuit_t *circ)
test_stream_id))
goto again;
+ if (TO_CIRCUIT(circ)->conflux) {
+ conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ);
+ }
+
return test_stream_id;
}
diff --git a/src/core/or/edge_connection_st.h b/src/core/or/edge_connection_st.h
index 22f9040d15..e8a3039b33 100644
--- a/src/core/or/edge_connection_st.h
+++ b/src/core/or/edge_connection_st.h
@@ -28,11 +28,15 @@ struct edge_connection_t {
* circuit? */
int deliver_window; /**< How many more relay cells can end at me? */
- struct circuit_t *on_circuit; /**< The circuit (if any) that this edge
- * connection is using. */
+ /** The circuit (if any) that this edge connection is using.
+ * Note that edges that use conflux should use the helpers
+ * in conflux_util.c instead of accessing this directly. */
+ struct circuit_t *on_circuit;
/** A pointer to which node in the circ this conn exits at. Set for AP
- * connections and for hidden service exit connections. */
+ * connections and for hidden service exit connections.
+ * Note that edges that use conflux should use the helpers
+ * in conflux_util.c instead of accessing this directly. */
struct crypt_path_t *cpath_layer;
/* Hidden service connection identifier for edge connections. Used by the HS
diff --git a/src/core/or/or_circuit_st.h b/src/core/or/or_circuit_st.h
index 11695ec301..d5a7007928 100644
--- a/src/core/or/or_circuit_st.h
+++ b/src/core/or/or_circuit_st.h
@@ -35,10 +35,18 @@ struct or_circuit_t {
cell_queue_t p_chan_cells;
/** The channel that is previous in this circuit. */
channel_t *p_chan;
- /** Linked list of Exit streams associated with this circuit. */
+ /** Linked list of Exit streams associated with this circuit.
+ *
+ * Note that any updates to this pointer must be followed with
+ * conflux_update_n_streams() to keep the other legs n_streams
+ * in sync. */
edge_connection_t *n_streams;
/** Linked list of Exit streams associated with this circuit that are
- * still being resolved. */
+ * still being resolved.
+ *
+ * Just like with n_streams, any updates to this pointer must
+ * be followed with conflux_update_resolving_streams().
+ */
edge_connection_t *resolving_streams;
/** Cryptographic state used for encrypting and authenticating relay
diff --git a/src/core/or/origin_circuit_st.h b/src/core/or/origin_circuit_st.h
index 73b971f72d..c5c255bb49 100644
--- a/src/core/or/origin_circuit_st.h
+++ b/src/core/or/origin_circuit_st.h
@@ -80,11 +80,18 @@ struct origin_circuit_t {
circuit_t base_;
/** Linked list of AP streams (or EXIT streams if hidden service)
- * associated with this circuit. */
+ * associated with this circuit.
+ *
+ * Any updates to this pointer must be followed with
+ * conflux_update_p_streams(). */
edge_connection_t *p_streams;
/** Smartlist of half-closed streams (half_edge_t*) that still
- * have pending activity */
+ * have pending activity.
+ *
+ * Any updates to this pointer must be followed with
+ * conflux_update_half_streams().
+ */
smartlist_t *half_streams;
/** Bytes read on this circuit since last call to
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index 827f0c3e46..58e48df902 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -122,7 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell,
edge_connection_t *conn,
crypt_path_t *layer_hint,
relay_header_t *rh);
-static void set_block_state_for_streams(edge_connection_t *stream_list,
+static void set_block_state_for_streams(circuit_t *circ,
+ edge_connection_t *stream_list,
int block, streamid_t stream_id);
/** Stats: how many relay cells have originated at this hop, or have
@@ -455,7 +456,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell,
tmpconn=tmpconn->next_stream) {
if (rh.stream_id == tmpconn->stream_id &&
!tmpconn->base_.marked_for_close &&
- tmpconn->cpath_layer == layer_hint) {
+ edge_uses_cpath(tmpconn, layer_hint)) {
log_debug(LD_APP,"found conn for stream %d.", rh.stream_id);
return tmpconn;
}
@@ -1549,25 +1550,6 @@ connection_edge_process_relay_cell_not_open(
// return -1;
}
-/**
- * Return true iff our decryption layer_hint is from the last hop
- * in a circuit.
- */
-static bool
-relay_crypt_from_last_hop(origin_circuit_t *circ, crypt_path_t *layer_hint)
-{
- tor_assert(circ);
- tor_assert(layer_hint);
- tor_assert(circ->cpath);
-
- if (layer_hint != circ->cpath->prev) {
- log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
- "Got unexpected relay data from intermediate hop");
- return false;
- }
- return true;
-}
-
/** Process a SENDME cell that arrived on <b>circ</b>. If it is a stream level
* cell, it is destined for the given <b>conn</b>. If it is a circuit level
* cell, it is destined for the <b>layer_hint</b>. The <b>domain</b> is the
@@ -2454,6 +2436,15 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
return;
}
+
+ /* If we have a conflux negotiated, and it still can't send on
+ * any circuit, then do not resume sending. */
+ if (circ->conflux && !conflux_can_send(circ->conflux)) {
+ log_debug(layer_hint?LD_APP:LD_EXIT,
+ "Conflux can't send, not resuming edges");
+ return;
+ }
+
log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
if (CIRCUIT_IS_ORIGIN(circ))
@@ -2487,20 +2478,6 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
return 0;
}
- /* How many cells do we have space for? It will be the minimum of
- * the number needed to exhaust the package window, and the minimum
- * needed to fill the cell queue. */
-
- max_to_package = congestion_control_get_package_window(circ, layer_hint);
- if (CIRCUIT_IS_ORIGIN(circ)) {
- cells_on_queue = circ->n_chan_cells.n;
- } else {
- or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
- cells_on_queue = or_circ->p_chan_cells.n;
- }
- if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
- max_to_package = cell_queue_highwatermark() - cells_on_queue;
-
/* Once we used to start listening on the streams in the order they
* appeared in the linked list. That leads to starvation on the
* streams that appeared later on the list, since the first streams
@@ -2539,11 +2516,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
/* Activate reading starting from the chosen stream */
for (conn=chosen_stream; conn; conn = conn->next_stream) {
/* Start reading for the streams starting from here */
- if (conn->base_.marked_for_close || conn->package_window <= 0 ||
- conn->xoff_received)
+ if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
- if (!layer_hint || conn->cpath_layer == layer_hint) {
- connection_start_reading(TO_CONN(conn));
+
+ if (edge_uses_cpath(conn, layer_hint)) {
+ if (!conn->xoff_received) {
+ connection_start_reading(TO_CONN(conn));
+ }
if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
++n_packaging_streams;
@@ -2551,11 +2530,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
}
/* Go back and do the ones we skipped, circular-style */
for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
- if (conn->base_.marked_for_close || conn->package_window <= 0 ||
- conn->xoff_received)
+ if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
- if (!layer_hint || conn->cpath_layer == layer_hint) {
- connection_start_reading(TO_CONN(conn));
+
+ if (edge_uses_cpath(conn, layer_hint)) {
+ if (!conn->xoff_received) {
+ connection_start_reading(TO_CONN(conn));
+ }
if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
++n_packaging_streams;
@@ -2567,6 +2548,32 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
again:
+ /* If we're using conflux, the circuit we decide to send on may change
+ * after we're sending. Get it again, and re-check package windows
+ * for it */
+ if (circ->conflux) {
+ if (circuit_consider_stop_edge_reading(circ, layer_hint))
+ return -1;
+
+ circ = conflux_decide_next_circ(circ->conflux);
+
+ /* Get the destination layer hint for this circuit */
+ layer_hint = conflux_get_destination_hop(circ);
+ }
+
+ /* How many cells do we have space for? It will be the minimum of
+ * the number needed to exhaust the package window, and the minimum
+ * needed to fill the cell queue. */
+ max_to_package = congestion_control_get_package_window(circ, layer_hint);
+ if (CIRCUIT_IS_ORIGIN(circ)) {
+ cells_on_queue = circ->n_chan_cells.n;
+ } else {
+ or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
+ cells_on_queue = or_circ->p_chan_cells.n;
+ }
+ if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
+ max_to_package = cell_queue_highwatermark() - cells_on_queue;
+
cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams);
packaged_this_round = 0;
@@ -2580,7 +2587,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
for (conn=first_conn; conn; conn=conn->next_stream) {
if (conn->base_.marked_for_close || conn->package_window <= 0)
continue;
- if (!layer_hint || conn->cpath_layer == layer_hint) {
+ if (edge_uses_cpath(conn, layer_hint)) {
int n = cells_per_conn, r;
/* handle whatever might still be on the inbuf */
r = connection_edge_package_raw_inbuf(conn, 1, &n);
@@ -2638,7 +2645,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
log_debug(domain,"considering circ->package_window %d",
circ->package_window);
- if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
+ if (circuit_get_package_window(circ, layer_hint) <= 0) {
log_debug(domain,"yes, not-at-origin. stopped.");
for (conn = or_circ->n_streams; conn; conn=conn->next_stream)
connection_stop_reading(TO_CONN(conn));
@@ -2649,11 +2656,11 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
/* else, layer hint is defined, use it */
log_debug(domain,"considering layer_hint->package_window %d",
layer_hint->package_window);
- if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
+ if (circuit_get_package_window(circ, layer_hint) <= 0) {
log_debug(domain,"yes, at-origin. stopped.");
for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn;
conn=conn->next_stream) {
- if (conn->cpath_layer == layer_hint)
+ if (edge_uses_cpath(conn, layer_hint))
connection_stop_reading(TO_CONN(conn));
}
return 1;
@@ -3029,7 +3036,7 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
edge = TO_OR_CIRCUIT(circ)->n_streams;
}
- set_block_state_for_streams(edge, block, 0);
+ set_block_state_for_streams(circ, edge, block, 0);
}
/**
@@ -3039,15 +3046,29 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
* in the stream list. If it is non-zero, only apply to that specific stream.
*/
static void
-set_block_state_for_streams(edge_connection_t *stream_list, int block,
- streamid_t stream_id)
+set_block_state_for_streams(circuit_t *circ, edge_connection_t *stream_list,
+ int block, streamid_t stream_id)
{
+ /* If we have a conflux object, we need to examine its status before
+ * blocking and unblocking streams. */
+ if (circ->conflux) {
+ bool can_send = conflux_can_send(circ->conflux);
+
+ if (block && can_send) {
+ /* Don't actually block streams, since conflux can send*/
+ return;
+ } else if (!block && !can_send) {
+ /* Don't actually unblock streams, since conflux still can't send */
+ return;
+ }
+ }
+
for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
connection_t *conn = TO_CONN(edge);
if (stream_id && edge->stream_id != stream_id)
continue;
- if (!conn->read_event) {
+ if (!conn->read_event || edge->xoff_received) {
/* This connection is a placeholder for something; probably a DNS
* request. It can't actually stop or start reading.*/
continue;
@@ -3412,8 +3433,8 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
if (circ_blocked && fromstream) {
/* This edge connection is apparently not blocked; this can happen for
* new streams on a blocked circuit, for their CONNECTED response.
- * block it now. */
- set_block_state_for_streams(stream_list, 1, fromstream);
+ * block it now, unless we have conflux. */
+ set_block_state_for_streams(circ, stream_list, 1, fromstream);
}
update_circuit_on_cmux(circ, direction);
diff --git a/src/feature/relay/dns.c b/src/feature/relay/dns.c
index 7267ca06dd..f6a020d061 100644
--- a/src/feature/relay/dns.c
+++ b/src/feature/relay/dns.c
@@ -71,6 +71,7 @@
#include "core/or/edge_connection_st.h"
#include "core/or/or_circuit_st.h"
+#include "core/or/conflux_util.h"
#include "ht.h"
@@ -650,6 +651,7 @@ dns_resolve(edge_connection_t *exitconn)
* connected cell. */
exitconn->next_stream = oncirc->n_streams;
oncirc->n_streams = exitconn;
+ conflux_update_n_streams(oncirc, exitconn);
}
break;
case 0:
@@ -658,6 +660,7 @@ dns_resolve(edge_connection_t *exitconn)
exitconn->base_.state = EXIT_CONN_STATE_RESOLVING;
exitconn->next_stream = oncirc->resolving_streams;
oncirc->resolving_streams = exitconn;
+ conflux_update_resolving_streams(oncirc, exitconn);
break;
case -2:
case -1:
@@ -1234,6 +1237,7 @@ inform_pending_connections(cached_resolve_t *resolve)
pend->conn->next_stream = TO_OR_CIRCUIT(circ)->n_streams;
pend->conn->on_circuit = circ;
TO_OR_CIRCUIT(circ)->n_streams = pend->conn;
+ conflux_update_n_streams(TO_OR_CIRCUIT(circ), pend->conn);
connection_exit_connect(pend->conn);
} else {
--
To stop receiving notification emails like this one, please contact
the administrator of this repository.
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits