[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/master] Track total queue size per channel, with overhead estimates, and global queue total



commit 8852a1794cfa9eb5dae494f5d85242d8fd6955fc
Author: Andrea Shepard <andrea@xxxxxxxxxxxxxx>
Date:   Tue Nov 5 00:30:02 2013 -0800

    Track total queue size per channel, with overhead estimates, and global queue total
---
 src/or/channel.c       |  126 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/or/channel.h       |   16 ++++++
 src/or/channeltls.c    |   54 +++++++++++++++++++++
 src/or/connection.c    |    2 +
 src/or/connection_or.c |    3 ++
 src/or/or.h            |    6 +++
 6 files changed, 207 insertions(+)

diff --git a/src/or/channel.c b/src/or/channel.c
index 0caebfb..f729a17 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -124,6 +124,13 @@ static uint64_t n_channel_bytes_passed_to_lower_layer = 0;
 
 static uint64_t n_channel_bytes_in_queues = 0;
 
+/*
+ * Current total estimated queue size *including lower layer queues and
+ * transmit overhead*
+ */
+
+static uint64_t estimated_total_queue_size = 0;
+
 /* Digest->channel map
  *
  * Similar to the one used in connection_or.c, this maps from the identity
@@ -1840,6 +1847,8 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
     n_channel_bytes_queued += cell_bytes;
     n_channel_bytes_in_queues += cell_bytes;
     channel_assert_counter_consistency();
+    /* Update channel queue size */
+    chan->bytes_in_queue += cell_bytes;
     /* Try to process the queue? */
     if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan);
   }
@@ -1878,6 +1887,9 @@ channel_write_cell(channel_t *chan, cell_t *cell)
   q.type = CELL_QUEUE_FIXED;
   q.u.fixed.cell = cell;
   channel_write_cell_queue_entry(chan, &q);
+
+  /* Update the queue size estimate */
+  channel_update_xmit_queue_size(chan);
 }
 
 /**
@@ -1913,6 +1925,9 @@ channel_write_packed_cell(channel_t *chan, packed_cell_t *packed_cell)
   q.type = CELL_QUEUE_PACKED;
   q.u.packed.packed_cell = packed_cell;
   channel_write_cell_queue_entry(chan, &q);
+
+  /* Update the queue size estimate */
+  channel_update_xmit_queue_size(chan);
 }
 
 /**
@@ -1949,6 +1964,9 @@ channel_write_var_cell(channel_t *chan, var_cell_t *var_cell)
   q.type = CELL_QUEUE_VAR;
   q.u.var.var_cell = var_cell;
   channel_write_cell_queue_entry(chan, &q);
+
+  /* Update the queue size estimate */
+  channel_update_xmit_queue_size(chan);
 }
 
 /**
@@ -2056,6 +2074,29 @@ channel_change_state(channel_t *chan, channel_state_t to_state)
     scheduler_channel_doesnt_want_writes(chan);
   }
 
+  /*
+   * Leaving OPEN/MAINT for CLOSING, CLOSED or ERROR: stop counting this
+   * channel.  NOTE(review): uses bytes_in_queue, not bytes_queued_for_xmit.
+   */
+  if ((to_state == CHANNEL_STATE_CLOSING ||
+       to_state == CHANNEL_STATE_CLOSED ||
+       to_state == CHANNEL_STATE_ERROR) &&
+      (from_state == CHANNEL_STATE_OPEN ||
+       from_state == CHANNEL_STATE_MAINT)) {
+    estimated_total_queue_size -= chan->bytes_in_queue;
+  }
+
+  /*
+   * If we're entering OPEN or MAINT from any other state, this channel now
+   * counts toward the global estimated queue size.
+   */
+  if ((to_state == CHANNEL_STATE_OPEN ||
+       to_state == CHANNEL_STATE_MAINT) &&
+      !(from_state == CHANNEL_STATE_OPEN ||
+        from_state == CHANNEL_STATE_MAINT)) {
+    estimated_total_queue_size += chan->bytes_in_queue;
+  }
+
   /* Tell circuits if we opened and stuff */
   if (to_state == CHANNEL_STATE_OPEN) {
     channel_do_open_actions(chan);
@@ -2350,6 +2391,8 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
           n_channel_bytes_passed_to_lower_layer += cell_size;
           n_channel_bytes_in_queues -= cell_size;
           channel_assert_counter_consistency();
+          /* Update the channel's queue size too */
+          chan->bytes_in_queue -= cell_size;
         }
         /* No cell removed from list, so we can't go on any further */
         else break;
@@ -2362,6 +2405,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
     channel_timestamp_drained(chan);
   }
 
+  /* Update the estimated queue size */
+  channel_update_xmit_queue_size(chan);
+
   return flushed;
 }
 
@@ -4421,3 +4467,83 @@ channel_set_circid_type(channel_t *chan,
   }
 }
 
+/**
+ * Recompute chan->bytes_queued_for_xmit: the channel queue plus the queue
+ * size reported by the lower layer, scaled by the lower layer's overhead
+ * factor when one >= 1.0 is supplied.  For channels in the OPEN or MAINT
+ * state, the change is also applied to estimated_total_queue_size.
+ */
+
+void
+channel_update_xmit_queue_size(channel_t *chan)
+{
+  uint64_t queued, adj;
+  double overhead;
+
+  tor_assert(chan);
+  tor_assert(chan->num_bytes_queued);
+
+  /*
+   * First, get the number of bytes we have queued without factoring in
+   * lower-layer overhead.
+   */
+  queued = chan->num_bytes_queued(chan) + chan->bytes_in_queue;
+  /* Next, scale by the optional overhead factor (truncates to uint64_t) */
+  if (chan->get_overhead_estimate) {
+    overhead = chan->get_overhead_estimate(chan);
+    if (overhead >= 1.0f) {
+      queued *= overhead;
+    } else {
+      /* Anything not >= 1.0 is logged and left unapplied */
+      log_notice(LD_CHANNEL, "Ignoring silly overhead factor %f", overhead);
+    }
+  }
+
+  /* Now compare to the previous estimate; equal values change nothing */
+  if (queued > chan->bytes_queued_for_xmit) {
+    adj = queued - chan->bytes_queued_for_xmit;
+    log_debug(LD_CHANNEL,
+              "Increasing queue size for channel " U64_FORMAT " by " U64_FORMAT
+              " from " U64_FORMAT " to " U64_FORMAT,
+              U64_PRINTF_ARG(chan->global_identifier),
+              U64_PRINTF_ARG(adj),
+              U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
+              U64_PRINTF_ARG(queued));
+    /* Update the channel's estimate */
+    chan->bytes_queued_for_xmit = queued;
+
+    /* Only OPEN or MAINT channels count toward the global estimate */
+    if (chan->state == CHANNEL_STATE_OPEN ||
+        chan->state == CHANNEL_STATE_MAINT) {
+      estimated_total_queue_size += adj;
+      log_debug(LD_CHANNEL,
+                "Increasing global queue size by " U64_FORMAT " for channel "
+                U64_FORMAT ", new size is " U64_FORMAT,
+                U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
+                U64_PRINTF_ARG(estimated_total_queue_size));
+    }
+  } else if (queued < chan->bytes_queued_for_xmit) {
+    adj = chan->bytes_queued_for_xmit - queued;
+    log_debug(LD_CHANNEL,
+              "Decreasing queue size for channel " U64_FORMAT " by " U64_FORMAT
+              " from " U64_FORMAT " to " U64_FORMAT,
+              U64_PRINTF_ARG(chan->global_identifier),
+              U64_PRINTF_ARG(adj),
+              U64_PRINTF_ARG(chan->bytes_queued_for_xmit),
+              U64_PRINTF_ARG(queued));
+    /* Update the channel's estimate */
+    chan->bytes_queued_for_xmit = queued;
+
+    /* Only OPEN or MAINT channels count toward the global estimate */
+    if (chan->state == CHANNEL_STATE_OPEN ||
+        chan->state == CHANNEL_STATE_MAINT) {
+      estimated_total_queue_size -= adj;
+      log_debug(LD_CHANNEL,
+                "Decreasing global queue size by " U64_FORMAT " for channel "
+                U64_FORMAT ", new size is " U64_FORMAT,
+                U64_PRINTF_ARG(adj), U64_PRINTF_ARG(chan->global_identifier),
+                U64_PRINTF_ARG(estimated_total_queue_size));
+    }
+  }
+}
+
diff --git a/src/or/channel.h b/src/or/channel.h
index f35cb27..388c729 100644
--- a/src/or/channel.h
+++ b/src/or/channel.h
@@ -79,6 +79,11 @@ struct channel_s {
   /* Methods implemented by the lower layer */
 
   /**
+   * Ask the lower layer for an estimate of the average overhead for
+   * transmissions on this channel.
+   */
+  double (*get_overhead_estimate)(channel_t *);
+  /**
    * Ask the underlying transport what the remote endpoint address is, in
    * a tor_addr_t.  This is optional and subclasses may leave this NULL.
    * If they implement it, they should write the address out to the
@@ -110,6 +115,8 @@ struct channel_s {
   int (*matches_extend_info)(channel_t *, extend_info_t *);
   /** Check if this channel matches a target address when extending */
   int (*matches_target)(channel_t *, const tor_addr_t *);
+  /* Ask the lower layer how many bytes it has queued but not yet sent */
+  size_t (*num_bytes_queued)(channel_t *);
   /* Ask the lower layer how many cells can be written */
   int (*num_cells_writeable)(channel_t *);
   /* Write a cell to an open channel */
@@ -202,6 +209,14 @@ struct channel_s {
   /** Channel counters for cell channels */
   uint64_t n_cells_recved, n_bytes_recved;
   uint64_t n_cells_xmitted, n_bytes_xmitted;
+
+  /** Our current contribution to the scheduler's total xmit queue */
+  uint64_t bytes_queued_for_xmit;
+
+  /** Number of bytes in this channel's cell queue; does not include
+   * lower-layer queueing.
+   */
+  uint64_t bytes_in_queue;
 };
 
 struct channel_listener_s {
@@ -460,6 +475,7 @@ unsigned int channel_num_circuits(channel_t *chan);
 void channel_set_circid_type(channel_t *chan, crypto_pk_t *identity_rcvd,
                              int consider_identity);
 void channel_timestamp_client(channel_t *chan);
+void channel_update_xmit_queue_size(channel_t *chan);
 
 const char * channel_listener_describe_transport(channel_listener_t *chan_l);
 void channel_listener_dump_statistics(channel_listener_t *chan_l,
diff --git a/src/or/channeltls.c b/src/or/channeltls.c
index 7df2d35..a8222de 100644
--- a/src/or/channeltls.c
+++ b/src/or/channeltls.c
@@ -55,6 +55,7 @@ static void channel_tls_common_init(channel_tls_t *tlschan);
 static void channel_tls_close_method(channel_t *chan);
 static const char * channel_tls_describe_transport_method(channel_t *chan);
 static void channel_tls_free_method(channel_t *chan);
+static double channel_tls_get_overhead_estimate_method(channel_t *chan);
 static int
 channel_tls_get_remote_addr_method(channel_t *chan, tor_addr_t *addr_out);
 static int
@@ -69,6 +70,7 @@ channel_tls_matches_extend_info_method(channel_t *chan,
 static int channel_tls_matches_target_method(channel_t *chan,
                                              const tor_addr_t *target);
 static int channel_tls_num_cells_writeable_method(channel_t *chan);
+static size_t channel_tls_num_bytes_queued_method(channel_t *chan);
 static int channel_tls_write_cell_method(channel_t *chan,
                                          cell_t *cell);
 static int channel_tls_write_packed_cell_method(channel_t *chan,
@@ -118,6 +120,7 @@ channel_tls_common_init(channel_tls_t *tlschan)
   chan->close = channel_tls_close_method;
   chan->describe_transport = channel_tls_describe_transport_method;
   chan->free = channel_tls_free_method;
+  chan->get_overhead_estimate = channel_tls_get_overhead_estimate_method;
   chan->get_remote_addr = channel_tls_get_remote_addr_method;
   chan->get_remote_descr = channel_tls_get_remote_descr_method;
   chan->get_transport_name = channel_tls_get_transport_name_method;
@@ -125,6 +128,7 @@ channel_tls_common_init(channel_tls_t *tlschan)
   chan->is_canonical = channel_tls_is_canonical_method;
   chan->matches_extend_info = channel_tls_matches_extend_info_method;
   chan->matches_target = channel_tls_matches_target_method;
+  chan->num_bytes_queued = channel_tls_num_bytes_queued_method;
   chan->num_cells_writeable = channel_tls_num_cells_writeable_method;
   chan->write_cell = channel_tls_write_cell_method;
   chan->write_packed_cell = channel_tls_write_packed_cell_method;
@@ -438,6 +442,40 @@ channel_tls_free_method(channel_t *chan)
 }
 
 /**
+ * Estimate TLS overhead as bytes_xmitted_by_tls / bytes_xmitted in [1.0, 2.0]
+ */
+
+static double
+channel_tls_get_overhead_estimate_method(channel_t *chan)
+{
+  double overhead = 1.0f;
+  channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan);
+
+  tor_assert(tlschan);
+  tor_assert(tlschan->conn);
+
+  /* Stay at 1.0 unless bytes_xmitted > 0 and the TLS byte count is >= it */
+  if (tlschan->conn->bytes_xmitted > 0 &&
+      tlschan->conn->bytes_xmitted_by_tls >=
+      tlschan->conn->bytes_xmitted) {
+    overhead = ((double)(tlschan->conn->bytes_xmitted_by_tls)) /
+      ((double)(tlschan->conn->bytes_xmitted));
+
+    /*
+     * Never estimate more than 2.0; otherwise we get silly large estimates
+     * at the very start of a new TLS connection.
+     */
+    if (overhead > 2.0f) overhead = 2.0f;
+  }
+
+  log_debug(LD_CHANNEL,
+            "Estimated overhead ratio for TLS chan " U64_FORMAT " is %f",
+            U64_PRINTF_ARG(chan->global_identifier), overhead);
+
+  return overhead;
+}
+
+/**
  * Get the remote address of a channel_tls_t
  *
  * This implements the get_remote_addr method for channel_tls_t; copy the
@@ -676,6 +714,22 @@ channel_tls_matches_target_method(channel_t *chan,
 }
 
 /**
+ * Implements num_bytes_queued for channel_tls_t: report the outbuf length
+ * of the underlying connection (bytes queued but not yet sent).
+ */
+
+static size_t
+channel_tls_num_bytes_queued_method(channel_t *chan)
+{
+  channel_tls_t *tlschan = BASE_CHAN_TO_TLS(chan);
+
+  tor_assert(tlschan);
+  tor_assert(tlschan->conn);
+
+  return connection_get_outbuf_len(TO_CONN(tlschan->conn));
+}
+
+/**
  * Tell the upper layer how many cells we can accept to write
  *
  * This implements the num_cells_writeable method for channel_tls_t; it
diff --git a/src/or/connection.c b/src/or/connection.c
index 4a3bd2c..525f4b5 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -3839,6 +3839,8 @@ connection_handle_write_impl(connection_t *conn, int force)
     tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written);
     log_debug(LD_GENERAL, "After TLS write of %d: %ld read, %ld written",
               result, (long)n_read, (long)n_written);
+    or_conn->bytes_xmitted += result;
+    or_conn->bytes_xmitted_by_tls += n_written;
     /* So we notice bytes were written even on error */
     /* XXXX024 This cast is safe since we can never write INT_MAX bytes in a
      * single set of TLS operations. But it looks kinda ugly. If we refactor
diff --git a/src/or/connection_or.c b/src/or/connection_or.c
index 6b276cc..445335b 100644
--- a/src/or/connection_or.c
+++ b/src/or/connection_or.c
@@ -583,6 +583,9 @@ connection_or_flushed_some(or_connection_t *conn)
 {
   size_t datalen;
 
+  /* The channel will want to update its estimated queue size */
+  channel_update_xmit_queue_size(TLS_CHAN_TO_BASE(conn->chan));
+
   /* If we're under the low water mark, add cells until we're just over the
    * high water mark. */
   datalen = connection_get_outbuf_len(TO_CONN(conn));
diff --git a/src/or/or.h b/src/or/or.h
index bdcb29e..320931d 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -1529,6 +1529,12 @@ typedef struct or_connection_t {
   /** Last emptied write token bucket in msec since midnight; only used if
    * TB_EMPTY events are enabled. */
   uint32_t write_emptied_time;
+
+  /*
+   * Count the number of bytes flushed out on this orconn, and the number of
+   * bytes TLS actually sent - used for overhead estimation for scheduling.
+   */
+  uint64_t bytes_xmitted, bytes_xmitted_by_tls;
 } or_connection_t;
 
 /** Subtype of connection_t for an "edge connection" -- that is, an entry (ap)



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits