[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor] 13/20: Prop#329 streams: Handle stream usage with conflux



This is an automated email from the git hooks/post-receive script.

ahf pushed a commit to branch main
in repository tor.

commit 2f865b4bba1bedc96f17b696916d74c392c83e1b
Author: Mike Perry <mikeperry-git@xxxxxxxxxxxxxx>
AuthorDate: Sun Apr 2 21:06:20 2023 +0000

    Prop#329 streams: Handle stream usage with conflux
    
    This adds utility functions to help stream block decisions, as well as cpath
    layer_hint checks for stream cell acceptance, and syncing stream lists
    for conflux circuits.
    
    These functions are then called throughout the codebase to properly manage
    conflux streams.
---
 src/core/or/circuitlist.c               |   1 +
 src/core/or/circuitpadding.c            |   7 +-
 src/core/or/circuituse.c                |  21 +-
 src/core/or/conflux_util.c              | 393 ++++++++++++++++++++++++++++++++
 src/core/or/conflux_util.h              |  59 +++++
 src/core/or/congestion_control_common.c |   3 +-
 src/core/or/congestion_control_flow.c   |  47 +---
 src/core/or/congestion_control_flow.h   |   2 -
 src/core/or/connection_edge.c           |   6 +
 src/core/or/edge_connection_st.h        |  10 +-
 src/core/or/or_circuit_st.h             |  12 +-
 src/core/or/origin_circuit_st.h         |  11 +-
 src/core/or/relay.c                     | 127 ++++++-----
 src/feature/relay/dns.c                 |   4 +
 14 files changed, 596 insertions(+), 107 deletions(-)

diff --git a/src/core/or/circuitlist.c b/src/core/or/circuitlist.c
index 01baf7c795..8c5beebbf3 100644
--- a/src/core/or/circuitlist.c
+++ b/src/core/or/circuitlist.c
@@ -2368,6 +2368,7 @@ circuit_about_to_free(circuit_t *circ)
   if (! CIRCUIT_IS_ORIGIN(circ)) {
     or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
     edge_connection_t *conn;
+
     for (conn=or_circ->n_streams; conn; conn=conn->next_stream)
       connection_edge_destroy(or_circ->p_circ_id, conn);
     or_circ->n_streams = NULL;
diff --git a/src/core/or/circuitpadding.c b/src/core/or/circuitpadding.c
index 99dc5f9d83..590a2d60c9 100644
--- a/src/core/or/circuitpadding.c
+++ b/src/core/or/circuitpadding.c
@@ -78,6 +78,8 @@
 #include "core/crypto/relay_crypto.h"
 #include "feature/nodelist/nodelist.h"
 
+#include "src/core/or/conflux_util.h"
+
 #include "app/config/config.h"
 
 static inline circpad_circuit_state_t circpad_circuit_state(
@@ -251,8 +253,11 @@ circpad_marked_circuit_for_padding(circuit_t *circ, int reason)
      * has shut down, but using the MaxCircuitDirtiness timer instead of
      * the idle circuit timer (again, we want this because we're not
      * supposed to look idle to Guard nodes that can see our lifespan). */
-    if (!circ->timestamp_dirty)
+    if (!circ->timestamp_dirty) {
       circ->timestamp_dirty = approx_time();
+      if (circ->conflux && CIRCUIT_IS_ORIGIN(circ))
+        conflux_sync_circ_fields(circ->conflux, TO_ORIGIN_CIRCUIT(circ));
+    }
 
     /* Take ownership of the circuit */
     circuit_change_purpose(circ, CIRCUIT_PURPOSE_C_CIRCUIT_PADDING);
diff --git a/src/core/or/circuituse.c b/src/core/or/circuituse.c
index 25401aea55..f5d5cb4397 100644
--- a/src/core/or/circuituse.c
+++ b/src/core/or/circuituse.c
@@ -64,6 +64,7 @@
 #include "lib/time/tvdiff.h"
 #include "lib/trace/events.h"
 #include "src/core/mainloop/mainloop.h"
+#include "core/or/conflux.h"
 
 #include "core/or/cpath_build_state_st.h"
 #include "feature/dircommon/dir_connection_st.h"
@@ -700,7 +701,6 @@ circuit_expire_building(void)
     } else { /* circuit not open, consider recording failure as timeout */
       int first_hop_succeeded = TO_ORIGIN_CIRCUIT(victim)->cpath &&
             TO_ORIGIN_CIRCUIT(victim)->cpath->state == CPATH_STATE_OPEN;
-
       if (TO_ORIGIN_CIRCUIT(victim)->p_streams != NULL) {
         log_warn(LD_BUG, "Circuit %d (purpose %d, %s) has timed out, "
                  "yet has attached streams!",
@@ -1351,6 +1351,7 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn)
     int removed = 0;
     if (conn == origin_circ->p_streams) {
       origin_circ->p_streams = conn->next_stream;
+      conflux_update_p_streams(origin_circ, conn->next_stream);
       removed = 1;
     } else {
       for (prevconn = origin_circ->p_streams;
@@ -1383,10 +1384,12 @@ circuit_detach_stream(circuit_t *circ, edge_connection_t *conn)
     or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
     if (conn == or_circ->n_streams) {
       or_circ->n_streams = conn->next_stream;
+      conflux_update_n_streams(or_circ, conn->next_stream);
       return;
     }
     if (conn == or_circ->resolving_streams) {
       or_circ->resolving_streams = conn->next_stream;
+      conflux_update_resolving_streams(or_circ, conn->next_stream);
       return;
     }
 
@@ -2556,7 +2559,13 @@ circuit_get_open_circ_or_launch(entry_connection_t *conn,
 }
 
 /** Return true iff <b>crypt_path</b> is one of the crypt_paths for
- * <b>circ</b>. */
+ * <b>circ</b>.
+ *
+ * WARNING: This function only validates that the cpath is on the *current*
+ * circuit, for internal consistency checking. For codepaths involving streams,
+ * or cpaths or layer_hints that could be from a different circuit due to
+ * conflux, use edge_uses_cpath() or conflux_validate_source_hop() instead.
+ */
 static int
 cpath_is_on_circuit(origin_circuit_t *circ, crypt_path_t *crypt_path)
 {
@@ -2594,6 +2603,7 @@ link_apconn_to_circ(entry_connection_t *apconn, origin_circuit_t *circ,
   ENTRY_TO_EDGE_CONN(apconn)->on_circuit = TO_CIRCUIT(circ);
   /* assert_connection_ok(conn, time(NULL)); */
   circ->p_streams = ENTRY_TO_EDGE_CONN(apconn);
+  conflux_update_p_streams(circ, ENTRY_TO_EDGE_CONN(apconn));
 
   if (connection_edge_is_rendezvous_stream(ENTRY_TO_EDGE_CONN(apconn))) {
     /* We are attaching a stream to a rendezvous circuit.  That means
@@ -2733,6 +2743,9 @@ connection_ap_handshake_attach_chosen_circuit(entry_connection_t *conn,
     /* When stream isolation is in use and controlled by an application
      * we are willing to keep using the stream. */
     circ->base_.timestamp_dirty = approx_time();
+    if (TO_CIRCUIT(circ)->conflux) {
+      conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ);
+    }
   }
 
   pathbias_count_use_attempt(circ);
@@ -3103,6 +3116,10 @@ mark_circuit_unusable_for_new_conns(origin_circuit_t *circ)
     circ->base_.timestamp_dirty -= options->MaxCircuitDirtiness;
 
   circ->unusable_for_new_conns = 1;
+
+  if (TO_CIRCUIT(circ)->conflux) {
+    conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ);
+  }
 }
 
 /**
diff --git a/src/core/or/conflux_util.c b/src/core/or/conflux_util.c
new file mode 100644
index 0000000000..855d477428
--- /dev/null
+++ b/src/core/or/conflux_util.c
@@ -0,0 +1,393 @@
+/* Copyright (c) 2021, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file conflux_util.c
+ * \brief Conflux utility functions for stream blocking and management.
+ */
+
+#define TOR_CONFLUX_PRIVATE
+
+#include "core/or/or.h"
+
+#include "core/or/circuit_st.h"
+#include "core/or/sendme.h"
+#include "core/or/congestion_control_common.h"
+#include "core/or/congestion_control_st.h"
+#include "core/or/circuitlist.h"
+#include "core/or/origin_circuit_st.h"
+#include "core/or/or_circuit_st.h"
+#include "core/or/conflux.h"
+#include "core/or/conflux_params.h"
+#include "core/or/conflux_util.h"
+#include "core/or/conflux_st.h"
+#include "lib/time/compat_time.h"
+#include "app/config/config.h"
+
+/**
+ * This is a utility function that returns the package window circuit,
+ * regardless of if it has a conflux pair or not.
+ */
+int
+circuit_get_package_window(circuit_t *circ,
+                           const crypt_path_t *cpath)
+{
+  if (circ->conflux) {
+    if (CIRCUIT_IS_ORIGIN(circ)) {
+      tor_assert_nonfatal(circ->purpose ==
+                          CIRCUIT_PURPOSE_CONFLUX_LINKED);
+    }
+    circ = conflux_decide_next_circ(circ->conflux);
+
+    /* If conflux has no circuit to send on, the package window is 0. */
+    if (!circ) {
+      return 0;
+    }
+
+    /* If we are the origin, we need to get the last hop's cpath for
+     * congestion control information. */
+    if (CIRCUIT_IS_ORIGIN(circ)) {
+      cpath = CONST_TO_ORIGIN_CIRCUIT(circ)->cpath->prev;
+    } else {
+      if (BUG(cpath != NULL)) {
+        log_warn(LD_BUG, "cpath is not NULL for non-origin circuit");
+      }
+    }
+  }
+
+  return congestion_control_get_package_window(circ, cpath);
+}
+
+/**
+ * Returns true if conflux can send a data cell.
+ *
+ * Used to decide if we should block streams or not, for
+ * proccess_sendme_cell(), circuit_resume_edge_reading(),
+ * circuit_consider_stop_edge_reading(), circuit_resume_edge_reading_helper(),
+ * channel_flush_from_first_active_circuit()
+*/
+bool
+conflux_can_send(conflux_t *cfx)
+{
+  const circuit_t *send_circ = conflux_decide_next_circ(cfx);
+
+  /* If we have a circuit, we can send */
+  if (send_circ) {
+    return true;
+  } else {
+    return false;
+  }
+}
+
+/**
+ * For a given conflux circuit, return the cpath of the destination.
+ *
+ * The cpath destination is the last hop of the circuit, or NULL if
+ * the circuit is a non-origin circuit.
+ */
+crypt_path_t *
+conflux_get_destination_hop(circuit_t *circ)
+{
+  if (BUG(!circ)) {
+    log_warn(LD_BUG, "No circuit to send on for conflux");
+    return NULL;
+  } else {
+    /* Conflux circuits always send multiplexed relay commands to
+     * to the last hop. (Non-multiplexed commands go on their
+     * original circuit and hop). */
+    if (CIRCUIT_IS_ORIGIN(circ)) {
+      return TO_ORIGIN_CIRCUIT(circ)->cpath->prev;
+    } else {
+      return NULL;
+    }
+  }
+}
+
+/**
+ * Validates that the source of a cell is from the last hop of the circuit
+ * for origin circuits, and that there are no further hops for non-origin
+ * circuits.
+ */
+bool
+conflux_validate_source_hop(circuit_t *in_circ,
+                            crypt_path_t *layer_hint)
+{
+  crypt_path_t *dest = conflux_get_destination_hop(in_circ);
+
+  if (dest != layer_hint) {
+    log_warn(LD_CIRC, "Got conflux command from incorrect hop");
+    return false;
+  }
+
+  if (layer_hint == NULL) {
+    /* We should not have further hops attached to this circuit */
+    if (in_circ->n_chan) {
+      log_warn(LD_BUG, "Got conflux command on circuit with further hops");
+      return false;
+    }
+  }
+  return true;
+}
+
+/**
+ * Returns true if the edge connection uses the given cpath.
+ *
+ * If there is a conflux object, we inspect all the last hops of the conflux
+ * circuits.
+ */
+bool
+edge_uses_cpath(const edge_connection_t *conn,
+                const crypt_path_t *cpath)
+{
+  if (!conn->on_circuit)
+    return false;
+
+  if (CIRCUIT_IS_ORIGIN(conn->on_circuit)) {
+    if (conn->on_circuit->conflux) {
+     tor_assert_nonfatal(conn->on_circuit->purpose ==
+                         CIRCUIT_PURPOSE_CONFLUX_LINKED);
+
+     /* If the circuit is an origin circuit with a conflux object, the cpath
+      * is valid if it came from any of the conflux circuit's last hops. */
+      CONFLUX_FOR_EACH_LEG_BEGIN(conn->on_circuit->conflux, leg) {
+        const origin_circuit_t *ocirc = CONST_TO_ORIGIN_CIRCUIT(leg->circ);
+        if (ocirc->cpath->prev == cpath) {
+          return true;
+        }
+      } CONFLUX_FOR_EACH_LEG_END(leg);
+    } else {
+      return cpath == conn->cpath_layer;
+    }
+  } else {
+    /* For non-origin circuits, cpath should be null */
+    return cpath == NULL;
+  }
+
+  return false;
+}
+
+/**
+ * Returns the max RTT for the circuit that carries this stream,
+ * as observed by congestion control. For conflux circuits,
+ * we return the max RTT across all circuits.
+ */
+uint64_t
+edge_get_max_rtt(const edge_connection_t *stream)
+{
+  if (!stream->on_circuit)
+    return 0;
+
+  if (stream->on_circuit->conflux) {
+    tor_assert_nonfatal(stream->on_circuit->purpose ==
+                        CIRCUIT_PURPOSE_CONFLUX_LINKED);
+
+    /* Find the max rtt from the ccontrol object of each circuit. */
+    uint64_t max_rtt = 0;
+    CONFLUX_FOR_EACH_LEG_BEGIN(stream->on_circuit->conflux, leg) {
+      const congestion_control_t *cc = circuit_ccontrol(leg->circ);
+      if (cc->max_rtt_usec > max_rtt) {
+        max_rtt = cc->max_rtt_usec;
+      }
+    } CONFLUX_FOR_EACH_LEG_END(leg);
+
+    return max_rtt;
+  } else {
+    if (stream->on_circuit && stream->on_circuit->ccontrol)
+      return stream->on_circuit->ccontrol->max_rtt_usec;
+    else if (stream->cpath_layer && stream->cpath_layer->ccontrol)
+      return stream->cpath_layer->ccontrol->max_rtt_usec;
+  }
+
+  return 0;
+}
+
+/**
+ * Return true iff our decryption layer_hint is from the last hop
+ * in a circuit.
+ */
+bool
+relay_crypt_from_last_hop(const origin_circuit_t *circ,
+                          const crypt_path_t *layer_hint)
+{
+  tor_assert(circ);
+  tor_assert(layer_hint);
+  tor_assert(circ->cpath);
+
+  if (TO_CIRCUIT(circ)->conflux) {
+    tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose ==
+                        CIRCUIT_PURPOSE_CONFLUX_LINKED);
+
+    /* If we are a conflux circuit, we need to check if the layer_hint
+     * is from the last hop of any of the conflux circuits. */
+    CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+      const origin_circuit_t *ocirc = CONST_TO_ORIGIN_CIRCUIT(leg->circ);
+      if (layer_hint == ocirc->cpath->prev) {
+        return true;
+      }
+    } CONFLUX_FOR_EACH_LEG_END(leg);
+
+    log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
+           "Got unexpected relay data from intermediate hop");
+    return false;
+  } else {
+    if (layer_hint != circ->cpath->prev) {
+      log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
+             "Got unexpected relay data from intermediate hop");
+      return false;
+    }
+    return true;
+  }
+}
+
+/**
+ * Update the head of the n_streams list on all circuits in the conflux
+ * set.
+ */
+void
+conflux_update_p_streams(origin_circuit_t *circ, edge_connection_t *stream)
+{
+  tor_assert(circ);
+
+  if (TO_CIRCUIT(circ)->conflux) {
+    tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose ==
+                        CIRCUIT_PURPOSE_CONFLUX_LINKED);
+    CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+      TO_ORIGIN_CIRCUIT(leg->circ)->p_streams = stream;
+    } CONFLUX_FOR_EACH_LEG_END(leg);
+  }
+}
+
+/**
+ * Sync the next_stream_id, timestamp_dirty, and circuit_idle_timeout
+ * fields of a conflux set to the values in a particular circuit.
+ *
+ * This is called upon link, and whenever one of these fields
+ * changes on ref_circ. The ref_circ values are copied to all
+ * other circuits in the conflux set.
+*/
+void
+conflux_sync_circ_fields(conflux_t *cfx, origin_circuit_t *ref_circ)
+{
+  tor_assert(cfx);
+  tor_assert(ref_circ);
+
+  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+    if (leg->circ == TO_CIRCUIT(ref_circ)) {
+      continue;
+    }
+    origin_circuit_t *ocirc = TO_ORIGIN_CIRCUIT(leg->circ);
+    ocirc->next_stream_id = ref_circ->next_stream_id;
+    leg->circ->timestamp_dirty = TO_CIRCUIT(ref_circ)->timestamp_dirty;
+    ocirc->circuit_idle_timeout = ref_circ->circuit_idle_timeout;
+    ocirc->unusable_for_new_conns = ref_circ->unusable_for_new_conns;
+  } CONFLUX_FOR_EACH_LEG_END(leg);
+}
+
+/**
+ * Update the head of the n_streams list on all circuits in the conflux
+ * set.
+ */
+void
+conflux_update_n_streams(or_circuit_t *circ, edge_connection_t *stream)
+{
+  tor_assert(circ);
+
+  if (TO_CIRCUIT(circ)->conflux) {
+    CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+      TO_OR_CIRCUIT(leg->circ)->n_streams = stream;
+    } CONFLUX_FOR_EACH_LEG_END(leg);
+  }
+}
+
+/**
+ * Update the head of the resolving_streams list on all circuits in the conflux
+ * set.
+ */
+void
+conflux_update_resolving_streams(or_circuit_t *circ, edge_connection_t *stream)
+{
+  tor_assert(circ);
+
+  if (TO_CIRCUIT(circ)->conflux) {
+    CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+      TO_OR_CIRCUIT(leg->circ)->resolving_streams = stream;
+    } CONFLUX_FOR_EACH_LEG_END(leg);
+  }
+}
+
+/**
+ * Update the half_streams list on all circuits in the conflux
+ */
+void
+conflux_update_half_streams(origin_circuit_t *circ, smartlist_t *half_streams)
+{
+  tor_assert(circ);
+
+  if (TO_CIRCUIT(circ)->conflux) {
+    tor_assert_nonfatal(TO_CIRCUIT(circ)->purpose ==
+                        CIRCUIT_PURPOSE_CONFLUX_LINKED);
+    CONFLUX_FOR_EACH_LEG_BEGIN(TO_CIRCUIT(circ)->conflux, leg) {
+      TO_ORIGIN_CIRCUIT(leg->circ)->half_streams = half_streams;
+    } CONFLUX_FOR_EACH_LEG_END(leg);
+  }
+}
+
+/**
+ * Helper function that emits non-fatal asserts if the stream lists
+ * or next_stream_id is out of sync between any of the conflux legs.
+*/
+void
+conflux_validate_stream_lists(const conflux_t *cfx)
+{
+  const conflux_leg_t *first_leg = smartlist_get(cfx->legs, 0);
+  tor_assert(first_leg);
+
+  /* Compare the stream lists of the first leg to all other legs. */
+  if (CIRCUIT_IS_ORIGIN(first_leg->circ)) {
+    const origin_circuit_t *f_circ =
+        CONST_TO_ORIGIN_CIRCUIT(first_leg->circ);
+
+    CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+      const origin_circuit_t *l_circ = CONST_TO_ORIGIN_CIRCUIT(leg->circ);
+      tor_assert_nonfatal(l_circ->p_streams == f_circ->p_streams);
+      tor_assert_nonfatal(l_circ->half_streams == f_circ->half_streams);
+      tor_assert_nonfatal(l_circ->next_stream_id == f_circ->next_stream_id);
+    } CONFLUX_FOR_EACH_LEG_END(leg);
+  } else {
+    const or_circuit_t *f_circ = CONST_TO_OR_CIRCUIT(first_leg->circ);
+    CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+      const or_circuit_t *l_circ = CONST_TO_OR_CIRCUIT(leg->circ);
+      tor_assert_nonfatal(l_circ->n_streams == f_circ->n_streams);
+      tor_assert_nonfatal(l_circ->resolving_streams ==
+                          f_circ->resolving_streams);
+    } CONFLUX_FOR_EACH_LEG_END(leg);
+  }
+}
+
+/**
+ * Validate the conflux set has two legs, and both circuits have
+ * no nonce, and for origin circuits, the purpose is CONFLUX_PURPOSE_LINKED.
+ */
+void
+conflux_validate_legs(const conflux_t *cfx)
+{
+  tor_assert(cfx);
+  // TODO-329-UDP: Eventually we want to allow three legs for the
+  // exit case, to allow reconnection of legs to hit an RTT target.
+  // For now, this validation helps find bugs.
+  if (BUG(smartlist_len(cfx->legs) > conflux_params_get_num_legs_set())) {
+    log_warn(LD_BUG, "Number of legs is above maximum of %d allowed: %d\n",
+             conflux_params_get_num_legs_set(), smartlist_len(cfx->legs));
+  }
+
+  CONFLUX_FOR_EACH_LEG_BEGIN(cfx, leg) {
+    /* Ensure we have no pending nonce on the circ */
+    tor_assert_nonfatal(leg->circ->conflux_pending_nonce == NULL);
+    tor_assert_nonfatal(leg->circ->conflux != NULL);
+
+    if (CIRCUIT_IS_ORIGIN(leg->circ)) {
+      tor_assert_nonfatal(leg->circ->purpose ==
+                          CIRCUIT_PURPOSE_CONFLUX_LINKED);
+    }
+  } CONFLUX_FOR_EACH_LEG_END(leg);
+}
diff --git a/src/core/or/conflux_util.h b/src/core/or/conflux_util.h
new file mode 100644
index 0000000000..c556ae1848
--- /dev/null
+++ b/src/core/or/conflux_util.h
@@ -0,0 +1,59 @@
+/* Copyright (c) 2023, The Tor Project, Inc. */
+/* See LICENSE for licensing information */
+
+/**
+ * \file conflux_util.h
+ * \brief Header file for conflux_util.c.
+ **/
+
+#ifndef TOR_CONFLUX_UTIL_H
+#define TOR_CONFLUX_UTIL_H
+
+/* Forward decls */
+typedef struct edge_connection_t edge_connection_t;
+typedef struct crypt_path_t crypt_path_t;
+typedef struct origin_circuit_t origin_circuit_t;
+typedef struct conflux_t conflux_t;
+
+/* True iff the given circuit_t circ is conflux related. */
+static inline bool
+CIRCUIT_IS_CONFLUX(const circuit_t *circ)
+{
+  if (circ->conflux_pending_nonce) {
+    if (CIRCUIT_IS_ORIGIN(circ))
+      tor_assert_nonfatal(circ->purpose == CIRCUIT_PURPOSE_CONFLUX_UNLINKED);
+    return true;
+  } else if (circ->conflux) {
+    if (CIRCUIT_IS_ORIGIN(circ))
+      tor_assert_nonfatal(circ->purpose == CIRCUIT_PURPOSE_CONFLUX_LINKED);
+    return true;
+  } else {
+    tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_LINKED);
+    tor_assert_nonfatal(circ->purpose != CIRCUIT_PURPOSE_CONFLUX_UNLINKED);
+    return false;
+  }
+}
+
+int circuit_get_package_window(circuit_t *circ,
+                               const crypt_path_t *cpath);
+bool conflux_can_send(conflux_t *cfx);
+
+bool edge_uses_cpath(const edge_connection_t *conn,
+                     const crypt_path_t *cpath);
+crypt_path_t *conflux_get_destination_hop(circuit_t *circ);
+bool conflux_validate_source_hop(circuit_t *in_circ,
+                            crypt_path_t *layer_hint);
+uint64_t edge_get_max_rtt(const edge_connection_t *stream);
+bool relay_crypt_from_last_hop(const origin_circuit_t *circ,
+                          const crypt_path_t *layer_hint);
+
+void conflux_update_p_streams(origin_circuit_t *, edge_connection_t *);
+void conflux_update_half_streams(origin_circuit_t *, smartlist_t *);
+void conflux_update_n_streams(or_circuit_t *, edge_connection_t *);
+void conflux_update_resolving_streams(or_circuit_t *, edge_connection_t *);
+void conflux_sync_circ_fields(conflux_t *cfx, origin_circuit_t *ref_circ);
+void conflux_validate_stream_lists(const conflux_t *cfx);
+void conflux_validate_legs(const conflux_t *cfx);
+
+#endif /* TOR_CONFLUX_UTIL_H */
+
diff --git a/src/core/or/congestion_control_common.c b/src/core/or/congestion_control_common.c
index c7c950d0c8..1e0f504df1 100644
--- a/src/core/or/congestion_control_common.c
+++ b/src/core/or/congestion_control_common.c
@@ -24,6 +24,7 @@
 #include "core/or/congestion_control_westwood.h"
 #include "core/or/congestion_control_st.h"
 #include "core/or/conflux.h"
+#include "core/or/conflux_util.h"
 #include "core/or/trace_probes_cc.h"
 #include "lib/time/compat_time.h"
 #include "feature/nodelist/networkstatus.h"
@@ -703,7 +704,7 @@ circuit_has_active_streams(const circuit_t *circ,
     if (conn->base_.marked_for_close)
       continue;
 
-    if (!layer_hint || conn->cpath_layer == layer_hint) {
+    if (edge_uses_cpath(conn, layer_hint)) {
       if (connection_get_inbuf_len(TO_CONN(conn)) > 0) {
         log_info(LD_CIRC, "CC: More in edge inbuf...");
         return 1;
diff --git a/src/core/or/congestion_control_flow.c b/src/core/or/congestion_control_flow.c
index 90b1927ef9..fa9455a8a1 100644
--- a/src/core/or/congestion_control_flow.c
+++ b/src/core/or/congestion_control_flow.c
@@ -28,6 +28,7 @@
 #include "core/or/connection_st.h"
 #include "core/or/cell_st.h"
 #include "app/config/config.h"
+#include "core/or/conflux_util.h"
 
 /** Cache consensus parameters */
 static uint32_t xoff_client;
@@ -60,27 +61,6 @@ double cc_stats_flow_xon_outbuf_ma = 0;
 #define ONE_MEGABYTE (UINT64_C(1) << 20)
 #define TOTAL_XMIT_SCALE_AT (10 * ONE_MEGABYTE)
 
-/**
- * Return the congestion control object of the given edge connection.
- *
- * Returns NULL if the edge connection doesn't have a cpath_layer or not
- * attached to a circuit. But also if the cpath_layer or circuit doesn't have a
- * congestion control object.
- */
-static inline const congestion_control_t *
-edge_get_ccontrol(const edge_connection_t *edge)
-{
-  congestion_control_t *ccontrol = NULL;
-
-  if (edge->on_circuit && edge->on_circuit->ccontrol) {
-    ccontrol = edge->on_circuit->ccontrol;
-  } else if (edge->cpath_layer && edge->cpath_layer->ccontrol) {
-    ccontrol = edge->cpath_layer->ccontrol;
-  }
-
-  return ccontrol;
-}
-
 /**
  * Update global congestion control related consensus parameter values, every
  * consensus update.
@@ -265,13 +245,13 @@ circuit_process_stream_xoff(edge_connection_t *conn,
   }
 
   /* Make sure this XOFF came from the right hop */
-  if (layer_hint && layer_hint != conn->cpath_layer) {
+  if (!edge_uses_cpath(conn, layer_hint)) {
     log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
             "Got XOFF from wrong hop.");
     return false;
   }
 
-  if (edge_get_ccontrol(conn) == NULL) {
+  if (!edge_uses_flow_control(conn)) {
     log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
            "Got XOFF for non-congestion control circuit");
     return false;
@@ -359,13 +339,13 @@ circuit_process_stream_xon(edge_connection_t *conn,
   }
 
   /* Make sure this XON came from the right hop */
-  if (layer_hint && layer_hint != conn->cpath_layer) {
+  if (!edge_uses_cpath(conn, layer_hint)) {
     log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
            "Got XON from wrong hop.");
     return false;
   }
 
-  if (edge_get_ccontrol(conn) == NULL) {
+  if (!edge_uses_flow_control(conn)) {
     log_fn(LOG_PROTOCOL_WARN, LD_EDGE,
            "Got XON for non-congestion control circuit");
     return false;
@@ -464,7 +444,7 @@ flow_control_decide_xoff(edge_connection_t *stream)
   size_t total_buffered = connection_get_outbuf_len(TO_CONN(stream));
   uint32_t buffer_limit_xoff = 0;
 
-  if (BUG(edge_get_ccontrol(stream) == NULL)) {
+  if (BUG(!edge_uses_flow_control(stream))) {
     log_err(LD_BUG, "Flow control called for non-congestion control circuit");
     return -1;
   }
@@ -717,21 +697,6 @@ edge_uses_flow_control(const edge_connection_t *stream)
   return ret;
 }
 
-/**
- * Returns the max RTT for the circuit that carries this stream,
- * as observed by congestion control.
- */
-uint64_t
-edge_get_max_rtt(const edge_connection_t *stream)
-{
-  if (stream->on_circuit && stream->on_circuit->ccontrol)
-    return stream->on_circuit->ccontrol->max_rtt_usec;
-  else if (stream->cpath_layer && stream->cpath_layer->ccontrol)
-    return stream->cpath_layer->ccontrol->max_rtt_usec;
-
-  return 0;
-}
-
 /** Returns true if a connection is an edge conn that uses flow control */
 bool
 conn_uses_flow_control(connection_t *conn)
diff --git a/src/core/or/congestion_control_flow.h b/src/core/or/congestion_control_flow.h
index 5c735cce23..05e25c44d0 100644
--- a/src/core/or/congestion_control_flow.h
+++ b/src/core/or/congestion_control_flow.h
@@ -31,8 +31,6 @@ bool edge_uses_flow_control(const edge_connection_t *stream);
 
 bool conn_uses_flow_control(connection_t *stream);
 
-uint64_t edge_get_max_rtt(const edge_connection_t *);
-
 /** Metricsport externs */
 extern uint64_t cc_stats_flow_num_xoff_sent;
 extern uint64_t cc_stats_flow_num_xon_sent;
diff --git a/src/core/or/connection_edge.c b/src/core/or/connection_edge.c
index b0ccedc27f..ee6ab8596c 100644
--- a/src/core/or/connection_edge.c
+++ b/src/core/or/connection_edge.c
@@ -70,6 +70,7 @@
 #include "core/or/circuitpadding.h"
 #include "core/or/connection_edge.h"
 #include "core/or/congestion_control_flow.h"
+#include "core/or/conflux_util.h"
 #include "core/or/circuitstats.h"
 #include "core/or/connection_or.h"
 #include "core/or/extendinfo.h"
@@ -628,6 +629,7 @@ connection_half_edge_add(const edge_connection_t *conn,
 
   if (!circ->half_streams) {
     circ->half_streams = smartlist_new();
+    conflux_update_half_streams(circ, circ->half_streams);
   }
 
   half_conn->stream_id = conn->stream_id;
@@ -3102,6 +3104,10 @@ get_unique_stream_id_by_circ(origin_circuit_t *circ)
                                            test_stream_id))
     goto again;
 
+  if (TO_CIRCUIT(circ)->conflux) {
+    conflux_sync_circ_fields(TO_CIRCUIT(circ)->conflux, circ);
+  }
+
   return test_stream_id;
 }
 
diff --git a/src/core/or/edge_connection_st.h b/src/core/or/edge_connection_st.h
index 22f9040d15..e8a3039b33 100644
--- a/src/core/or/edge_connection_st.h
+++ b/src/core/or/edge_connection_st.h
@@ -28,11 +28,15 @@ struct edge_connection_t {
                        * circuit? */
   int deliver_window; /**< How many more relay cells can end at me? */
 
-  struct circuit_t *on_circuit; /**< The circuit (if any) that this edge
-                                 * connection is using. */
+  /** The circuit (if any) that this edge connection is using.
+   * Note that edges that use conflux should use the helpers
+   * in conflux_util.c instead of accessing this directly. */
+  struct circuit_t *on_circuit;
 
   /** A pointer to which node in the circ this conn exits at.  Set for AP
-   * connections and for hidden service exit connections. */
+   * connections and for hidden service exit connections.
+   * Note that edges that use conflux should use the helpers
+   * in conflux_util.c instead of accessing this directly. */
   struct crypt_path_t *cpath_layer;
 
   /* Hidden service connection identifier for edge connections. Used by the HS
diff --git a/src/core/or/or_circuit_st.h b/src/core/or/or_circuit_st.h
index 11695ec301..d5a7007928 100644
--- a/src/core/or/or_circuit_st.h
+++ b/src/core/or/or_circuit_st.h
@@ -35,10 +35,18 @@ struct or_circuit_t {
   cell_queue_t p_chan_cells;
   /** The channel that is previous in this circuit. */
   channel_t *p_chan;
-  /** Linked list of Exit streams associated with this circuit. */
+  /** Linked list of Exit streams associated with this circuit.
+   *
+   * Note that any updates to this pointer must be followed with
+   * conflux_update_n_streams() to keep the other legs n_streams
+   * in sync. */
   edge_connection_t *n_streams;
   /** Linked list of Exit streams associated with this circuit that are
-   * still being resolved. */
+   * still being resolved.
+   *
+   * Just like with n_streams, any updates to this pointer must
+   * be followed with conflux_update_resolving_streams().
+   */
   edge_connection_t *resolving_streams;
 
   /** Cryptographic state used for encrypting and authenticating relay
diff --git a/src/core/or/origin_circuit_st.h b/src/core/or/origin_circuit_st.h
index 73b971f72d..c5c255bb49 100644
--- a/src/core/or/origin_circuit_st.h
+++ b/src/core/or/origin_circuit_st.h
@@ -80,11 +80,18 @@ struct origin_circuit_t {
   circuit_t base_;
 
   /** Linked list of AP streams (or EXIT streams if hidden service)
-   * associated with this circuit. */
+   * associated with this circuit.
+   *
+   * Any updates to this pointer must be followed with
+   * conflux_update_p_streams(). */
   edge_connection_t *p_streams;
 
   /** Smartlist of half-closed streams (half_edge_t*) that still
-   * have pending activity */
+   * have pending activity.
+   *
+   * Any updates to this pointer must be followed with
+   * conflux_update_half_streams().
+   */
   smartlist_t *half_streams;
 
   /** Bytes read on this circuit since last call to
diff --git a/src/core/or/relay.c b/src/core/or/relay.c
index 827f0c3e46..58e48df902 100644
--- a/src/core/or/relay.c
+++ b/src/core/or/relay.c
@@ -122,7 +122,8 @@ static int connection_edge_process_ordered_relay_cell(cell_t *cell,
                                            edge_connection_t *conn,
                                            crypt_path_t *layer_hint,
                                            relay_header_t *rh);
-static void set_block_state_for_streams(edge_connection_t *stream_list,
+static void set_block_state_for_streams(circuit_t *circ,
+                                        edge_connection_t *stream_list,
                                         int block, streamid_t stream_id);
 
 /** Stats: how many relay cells have originated at this hop, or have
@@ -455,7 +456,7 @@ relay_lookup_conn(circuit_t *circ, cell_t *cell,
          tmpconn=tmpconn->next_stream) {
       if (rh.stream_id == tmpconn->stream_id &&
           !tmpconn->base_.marked_for_close &&
-          tmpconn->cpath_layer == layer_hint) {
+          edge_uses_cpath(tmpconn, layer_hint)) {
         log_debug(LD_APP,"found conn for stream %d.", rh.stream_id);
         return tmpconn;
       }
@@ -1549,25 +1550,6 @@ connection_edge_process_relay_cell_not_open(
 //  return -1;
 }
 
-/**
- * Return true iff our decryption layer_hint is from the last hop
- * in a circuit.
- */
-static bool
-relay_crypt_from_last_hop(origin_circuit_t *circ, crypt_path_t *layer_hint)
-{
-  tor_assert(circ);
-  tor_assert(layer_hint);
-  tor_assert(circ->cpath);
-
-  if (layer_hint != circ->cpath->prev) {
-    log_fn(LOG_PROTOCOL_WARN, LD_CIRC,
-           "Got unexpected relay data from intermediate hop");
-    return false;
-  }
-  return true;
-}
-
 /** Process a SENDME cell that arrived on <b>circ</b>. If it is a stream level
  * cell, it is destined for the given <b>conn</b>. If it is a circuit level
  * cell, it is destined for the <b>layer_hint</b>. The <b>domain</b> is the
@@ -2454,6 +2436,15 @@ circuit_resume_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
     log_debug(layer_hint?LD_APP:LD_EXIT,"Too big queue, no resuming");
     return;
   }
+
+  /* If we have a conflux negotiated, and it still can't send on
+   * any circuit, then do not resume sending. */
+  if (circ->conflux && !conflux_can_send(circ->conflux)) {
+    log_debug(layer_hint?LD_APP:LD_EXIT,
+              "Conflux can't send, not resuming edges");
+    return;
+  }
+
   log_debug(layer_hint?LD_APP:LD_EXIT,"resuming");
 
   if (CIRCUIT_IS_ORIGIN(circ))
@@ -2487,20 +2478,6 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
     return 0;
   }
 
-  /* How many cells do we have space for?  It will be the minimum of
-   * the number needed to exhaust the package window, and the minimum
-   * needed to fill the cell queue. */
-
-  max_to_package = congestion_control_get_package_window(circ, layer_hint);
-  if (CIRCUIT_IS_ORIGIN(circ)) {
-    cells_on_queue = circ->n_chan_cells.n;
-  } else {
-    or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
-    cells_on_queue = or_circ->p_chan_cells.n;
-  }
-  if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
-    max_to_package = cell_queue_highwatermark() - cells_on_queue;
-
   /* Once we used to start listening on the streams in the order they
    * appeared in the linked list.  That leads to starvation on the
    * streams that appeared later on the list, since the first streams
@@ -2539,11 +2516,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
   /* Activate reading starting from the chosen stream */
   for (conn=chosen_stream; conn; conn = conn->next_stream) {
     /* Start reading for the streams starting from here */
-    if (conn->base_.marked_for_close || conn->package_window <= 0 ||
-        conn->xoff_received)
+    if (conn->base_.marked_for_close || conn->package_window <= 0)
       continue;
-    if (!layer_hint || conn->cpath_layer == layer_hint) {
-      connection_start_reading(TO_CONN(conn));
+
+    if (edge_uses_cpath(conn, layer_hint)) {
+      if (!conn->xoff_received) {
+        connection_start_reading(TO_CONN(conn));
+      }
 
       if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
         ++n_packaging_streams;
@@ -2551,11 +2530,13 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
   }
   /* Go back and do the ones we skipped, circular-style */
   for (conn = first_conn; conn != chosen_stream; conn = conn->next_stream) {
-    if (conn->base_.marked_for_close || conn->package_window <= 0 ||
-        conn->xoff_received)
+    if (conn->base_.marked_for_close || conn->package_window <= 0)
       continue;
-    if (!layer_hint || conn->cpath_layer == layer_hint) {
-      connection_start_reading(TO_CONN(conn));
+
+    if (edge_uses_cpath(conn, layer_hint)) {
+      if (!conn->xoff_received) {
+        connection_start_reading(TO_CONN(conn));
+      }
 
       if (connection_get_inbuf_len(TO_CONN(conn)) > 0)
         ++n_packaging_streams;
@@ -2567,6 +2548,32 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
 
  again:
 
+  /* If we're using conflux, the circuit we decide to send on may change
+   * after we're sending. Get it again, and re-check package windows
+   * for it */
+  if (circ->conflux) {
+    if (circuit_consider_stop_edge_reading(circ, layer_hint))
+      return -1;
+
+    circ = conflux_decide_next_circ(circ->conflux);
+
+    /* Get the destination layer hint for this circuit */
+    layer_hint = conflux_get_destination_hop(circ);
+  }
+
+  /* How many cells do we have space for?  It will be the minimum of
+   * the number needed to exhaust the package window, and the minimum
+   * needed to fill the cell queue. */
+  max_to_package = congestion_control_get_package_window(circ, layer_hint);
+  if (CIRCUIT_IS_ORIGIN(circ)) {
+    cells_on_queue = circ->n_chan_cells.n;
+  } else {
+    or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
+    cells_on_queue = or_circ->p_chan_cells.n;
+  }
+  if (cell_queue_highwatermark() - cells_on_queue < max_to_package)
+    max_to_package = cell_queue_highwatermark() - cells_on_queue;
+
   cells_per_conn = CEIL_DIV(max_to_package, n_packaging_streams);
 
   packaged_this_round = 0;
@@ -2580,7 +2587,7 @@ circuit_resume_edge_reading_helper(edge_connection_t *first_conn,
   for (conn=first_conn; conn; conn=conn->next_stream) {
     if (conn->base_.marked_for_close || conn->package_window <= 0)
       continue;
-    if (!layer_hint || conn->cpath_layer == layer_hint) {
+    if (edge_uses_cpath(conn, layer_hint)) {
       int n = cells_per_conn, r;
       /* handle whatever might still be on the inbuf */
       r = connection_edge_package_raw_inbuf(conn, 1, &n);
@@ -2638,7 +2645,7 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
     or_circuit_t *or_circ = TO_OR_CIRCUIT(circ);
     log_debug(domain,"considering circ->package_window %d",
               circ->package_window);
-    if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
+    if (circuit_get_package_window(circ, layer_hint) <= 0) {
       log_debug(domain,"yes, not-at-origin. stopped.");
       for (conn = or_circ->n_streams; conn; conn=conn->next_stream)
         connection_stop_reading(TO_CONN(conn));
@@ -2649,11 +2656,11 @@ circuit_consider_stop_edge_reading(circuit_t *circ, crypt_path_t *layer_hint)
   /* else, layer hint is defined, use it */
   log_debug(domain,"considering layer_hint->package_window %d",
             layer_hint->package_window);
-  if (congestion_control_get_package_window(circ, layer_hint) <= 0) {
+  if (circuit_get_package_window(circ, layer_hint) <= 0) {
     log_debug(domain,"yes, at-origin. stopped.");
     for (conn = TO_ORIGIN_CIRCUIT(circ)->p_streams; conn;
          conn=conn->next_stream) {
-      if (conn->cpath_layer == layer_hint)
+      if (edge_uses_cpath(conn, layer_hint))
         connection_stop_reading(TO_CONN(conn));
     }
     return 1;
@@ -3029,7 +3036,7 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
     edge = TO_OR_CIRCUIT(circ)->n_streams;
   }
 
-  set_block_state_for_streams(edge, block, 0);
+  set_block_state_for_streams(circ, edge, block, 0);
 }
 
 /**
@@ -3039,15 +3046,29 @@ set_circuit_blocked_on_chan(circuit_t *circ, channel_t *chan, int block)
  * in the stream list. If it is non-zero, only apply to that specific stream.
  */
 static void
-set_block_state_for_streams(edge_connection_t *stream_list, int block,
-                            streamid_t stream_id)
+set_block_state_for_streams(circuit_t *circ, edge_connection_t *stream_list,
+                            int block, streamid_t stream_id)
 {
+  /* If we have a conflux object, we need to examine its status before
+   * blocking and unblocking streams. */
+  if (circ->conflux) {
+    bool can_send = conflux_can_send(circ->conflux);
+
+    if (block && can_send) {
+      /* Don't actually block streams, since conflux can send*/
+      return;
+    } else if (!block && !can_send) {
+      /* Don't actually unblock streams, since conflux still can't send */
+      return;
+    }
+  }
+
   for (edge_connection_t *edge = stream_list; edge; edge = edge->next_stream) {
     connection_t *conn = TO_CONN(edge);
     if (stream_id && edge->stream_id != stream_id)
       continue;
 
-    if (!conn->read_event) {
+    if (!conn->read_event || edge->xoff_received) {
       /* This connection is a placeholder for something; probably a DNS
        * request.  It can't actually stop or start reading.*/
       continue;
@@ -3412,8 +3433,8 @@ append_cell_to_circuit_queue(circuit_t *circ, channel_t *chan,
   if (circ_blocked && fromstream) {
     /* This edge connection is apparently not blocked; this can happen for
      * new streams on a blocked circuit, for their CONNECTED response.
-     * block it now. */
-    set_block_state_for_streams(stream_list, 1, fromstream);
+     * block it now, unless we have conflux. */
+    set_block_state_for_streams(circ, stream_list, 1, fromstream);
   }
 
   update_circuit_on_cmux(circ, direction);
diff --git a/src/feature/relay/dns.c b/src/feature/relay/dns.c
index 7267ca06dd..f6a020d061 100644
--- a/src/feature/relay/dns.c
+++ b/src/feature/relay/dns.c
@@ -71,6 +71,7 @@
 
 #include "core/or/edge_connection_st.h"
 #include "core/or/or_circuit_st.h"
+#include "core/or/conflux_util.h"
 
 #include "ht.h"
 
@@ -650,6 +651,7 @@ dns_resolve(edge_connection_t *exitconn)
          * connected cell. */
         exitconn->next_stream = oncirc->n_streams;
         oncirc->n_streams = exitconn;
+        conflux_update_n_streams(oncirc, exitconn);
       }
       break;
     case 0:
@@ -658,6 +660,7 @@ dns_resolve(edge_connection_t *exitconn)
       exitconn->base_.state = EXIT_CONN_STATE_RESOLVING;
       exitconn->next_stream = oncirc->resolving_streams;
       oncirc->resolving_streams = exitconn;
+      conflux_update_resolving_streams(oncirc, exitconn);
       break;
     case -2:
     case -1:
@@ -1234,6 +1237,7 @@ inform_pending_connections(cached_resolve_t *resolve)
         pend->conn->next_stream = TO_OR_CIRCUIT(circ)->n_streams;
         pend->conn->on_circuit = circ;
         TO_OR_CIRCUIT(circ)->n_streams = pend->conn;
+        conflux_update_n_streams(TO_OR_CIRCUIT(circ), pend->conn);
 
         connection_exit_connect(pend->conn);
       } else {

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits