[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/master] Use SIMPLEQ, not smartlist_t, for channel cell queues.



commit 7c9954a02abd16e5c74c2a5dea9ed0f65af230be
Author: Nick Mathewson <nickm@xxxxxxxxxxxxxx>
Date:   Fri Oct 12 17:58:01 2012 -0400

    Use SIMPLEQ, not smartlist_t, for channel cell queues.
    
    This lets us use fewer memory allocations, and avoid O(n^2) iterations.
---
 src/or/channel.c |  158 ++++++++++++++++++++----------------------------------
 src/or/channel.h |    9 +++-
 2 files changed, 66 insertions(+), 101 deletions(-)

diff --git a/src/or/channel.c b/src/or/channel.c
index 6527288..b594d05 100644
--- a/src/or/channel.c
+++ b/src/or/channel.c
@@ -31,6 +31,7 @@
 
 typedef struct cell_queue_entry_s cell_queue_entry_t;
 struct cell_queue_entry_s {
+  SIMPLEQ_ENTRY(cell_queue_entry_s) next;
   enum {
     CELL_QUEUE_FIXED,
     CELL_QUEUE_VAR,
@@ -768,6 +769,10 @@ channel_init(channel_t *chan)
   /* Init next_circ_id */
   chan->next_circ_id = crypto_rand_int(1 << 15);
 
+  /* Initialize queues. */
+  SIMPLEQ_INIT(&chan->incoming_queue);
+  SIMPLEQ_INIT(&chan->outgoing_queue);
+
   /* Timestamp it */
   channel_timestamp_created(chan);
 }
@@ -861,6 +866,7 @@ channel_listener_free(channel_listener_t *chan_l)
 static void
 channel_force_free(channel_t *chan)
 {
+  cell_queue_entry_t *cell, *cell_tmp;
   tor_assert(chan);
 
   /* Call a free method if there is one */
@@ -869,26 +875,16 @@ channel_force_free(channel_t *chan)
   channel_clear_remote_end(chan);
 
   /* We might still have a cell queue; kill it */
-  if (chan->incoming_queue) {
-    SMARTLIST_FOREACH_BEGIN(chan->incoming_queue,
-                            cell_queue_entry_t *, q) {
-      cell_queue_entry_free(q, 0);
-    } SMARTLIST_FOREACH_END(q);
-
-    smartlist_free(chan->incoming_queue);
-    chan->incoming_queue = NULL;
+  SIMPLEQ_FOREACH_SAFE(cell, &chan->incoming_queue, next, cell_tmp) {
+      cell_queue_entry_free(cell, 0);
   }
+  SIMPLEQ_INIT(&chan->incoming_queue);
 
   /* Outgoing cell queue is similar, but we can have to free packed cells */
-  if (chan->outgoing_queue) {
-    SMARTLIST_FOREACH_BEGIN(chan->outgoing_queue,
-                            cell_queue_entry_t *, q) {
-      cell_queue_entry_free(q, 0);
-    } SMARTLIST_FOREACH_END(q);
-
-    smartlist_free(chan->outgoing_queue);
-    chan->outgoing_queue = NULL;
+  SIMPLEQ_FOREACH_SAFE(cell, &chan->outgoing_queue, next, cell_tmp) {
+    cell_queue_entry_free(cell, 0);
   }
+  SIMPLEQ_INIT(&chan->outgoing_queue);
 
   tor_free(chan);
 }
@@ -1046,8 +1042,7 @@ channel_set_cell_handlers(channel_t *chan,
   chan->var_cell_handler = var_cell_handler;
 
   /* Re-run the queue if we have one and there's any reason to */
-  if (chan->incoming_queue &&
-      (smartlist_len(chan->incoming_queue) > 0) &&
+  if (! SIMPLEQ_EMPTY(&chan->incoming_queue) &&
       try_again &&
       (chan->cell_handler ||
        chan->var_cell_handler)) channel_process_cells(chan);
@@ -1669,9 +1664,8 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
   }
 
   /* Can we send it right out?  If so, try */
-  if (!(chan->outgoing_queue &&
-        (smartlist_len(chan->outgoing_queue) > 0)) &&
-       chan->state == CHANNEL_STATE_OPEN) {
+  if (SIMPLEQ_EMPTY(&chan->outgoing_queue) &&
+      chan->state == CHANNEL_STATE_OPEN) {
     /* Pick the right write function for this cell type and save the result */
     switch (q->type) {
       case CELL_QUEUE_FIXED:
@@ -1707,14 +1701,12 @@ channel_write_cell_queue_entry(channel_t *chan, cell_queue_entry_t *q)
 
   if (!sent) {
     /* Not sent, queue it */
-    if (!(chan->outgoing_queue))
-      chan->outgoing_queue = smartlist_new();
     /*
      * We have to copy the queue entry passed in, since the caller probably
      * used the stack.
      */
     tmp = cell_queue_entry_dup(q);
-    smartlist_add(chan->outgoing_queue, tmp);
+    SIMPLEQ_INSERT_TAIL(&chan->outgoing_queue, tmp, next);
     /* Try to process the queue? */
     if (chan->state == CHANNEL_STATE_OPEN) channel_flush_cells(chan);
   }
@@ -1900,19 +1892,15 @@ channel_change_state(channel_t *chan, channel_state_t to_state)
     channel_do_open_actions(chan);
 
     /* Check for queued cells to process */
-    if (chan->incoming_queue &&
-        smartlist_len(chan->incoming_queue) > 0)
+    if (! SIMPLEQ_EMPTY(&chan->incoming_queue))
       channel_process_cells(chan);
-    if (chan->outgoing_queue &&
-        smartlist_len(chan->outgoing_queue) > 0)
+    if (! SIMPLEQ_EMPTY(&chan->outgoing_queue))
       channel_flush_cells(chan);
   } else if (to_state == CHANNEL_STATE_CLOSED ||
              to_state == CHANNEL_STATE_ERROR) {
     /* Assert that all queues are empty */
-    tor_assert(!(chan->incoming_queue) ||
-                smartlist_len(chan->incoming_queue) == 0);
-    tor_assert(!(chan->outgoing_queue) ||
-                smartlist_len(chan->outgoing_queue) == 0);
+    tor_assert(SIMPLEQ_EMPTY(&chan->incoming_queue));
+    tor_assert(SIMPLEQ_EMPTY(&chan->outgoing_queue));
   }
 }
 
@@ -2086,16 +2074,9 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
   /* If we aren't in CHANNEL_STATE_OPEN, nothing goes through */
   if (chan->state == CHANNEL_STATE_OPEN) {
     while ((unlimited || num_cells > flushed) &&
-           (chan->outgoing_queue &&
-            (smartlist_len(chan->outgoing_queue) > 0))) {
-      /*
-       * Ewww, smartlist_del_keeporder() is O(n) in list length; maybe a
-       * a linked list would make more sense for the queue.
-       */
-
-      /* Get the head of the queue */
-      q = smartlist_get(chan->outgoing_queue, 0);
-      if (q) {
+           NULL != (q = SIMPLEQ_FIRST(&chan->outgoing_queue))) {
+
+      if (1) {
         /*
          * Okay, we have a good queue entry, try to give it to the lower
          * layer.
@@ -2180,26 +2161,17 @@ channel_flush_some_cells_from_outgoing_queue(channel_t *chan,
             cell_queue_entry_free(q, 0);
             q = NULL;
         }
-      } else {
-        /* This shouldn't happen; log and throw it away */
-        log_info(LD_CHANNEL,
-                 "Saw a NULL entry in the outgoing cell queue on channel %p "
-                 "(global ID " U64_FORMAT "); this is definitely a bug.",
-                 chan, U64_PRINTF_ARG(chan->global_identifier));
-        /* q is already NULL, so we know to delete that queue entry */
-      }
 
-      /* if q got NULLed out, we used it and should remove the queue entry */
-      if (!q) smartlist_del_keeporder(chan->outgoing_queue, 0);
-      /* No cell removed from list, so we can't go on any further */
-      else break;
+        /* if q got NULLed out, we used it and should remove the queue entry */
+        if (!q) SIMPLEQ_REMOVE_HEAD(&chan->outgoing_queue, next);
+        /* No cell removed from list, so we can't go on any further */
+        else break;
+      }
     }
   }
 
   /* Did we drain the queue? */
-  if (!(chan->outgoing_queue) ||
-      smartlist_len(chan->outgoing_queue) == 0) {
-    /* Timestamp it */
+  if (SIMPLEQ_EMPTY(&chan->outgoing_queue)) {
     channel_timestamp_drained(chan);
   }
 
@@ -2233,8 +2205,8 @@ channel_more_to_flush(channel_t *chan)
   tor_assert(chan);
 
   /* Check if we have any queued */
-  if (chan->incoming_queue &&
-      smartlist_len(chan->incoming_queue) > 0) return 1;
+  if (! SIMPLEQ_EMPTY(&chan->incoming_queue))
+      return 1;
 
   /* Check if any circuits would like to queue some */
   if (circuitmux_num_cells(chan->cmux) > 0) return 1;
@@ -2427,8 +2399,7 @@ channel_listener_queue_incoming(channel_listener_t *listener,
 void
 channel_process_cells(channel_t *chan)
 {
-  smartlist_t *queue;
-  int putback = 0;
+  cell_queue_entry_t *q;
   tor_assert(chan);
   tor_assert(chan->state == CHANNEL_STATE_CLOSING ||
              chan->state == CHANNEL_STATE_MAINT ||
@@ -2442,24 +2413,21 @@ channel_process_cells(channel_t *chan)
   if (!(chan->cell_handler ||
         chan->var_cell_handler)) return;
   /* Nothing we can do if we have no cells */
-  if (!(chan->incoming_queue)) return;
+  if (SIMPLEQ_EMPTY(&chan->incoming_queue)) return;
 
-  queue = chan->incoming_queue;
-  chan->incoming_queue = NULL;
   /*
    * Process cells until we're done or find one we have no current handler
    * for.
    */
-  SMARTLIST_FOREACH_BEGIN(queue, cell_queue_entry_t *, q) {
+  while (NULL != (q = SIMPLEQ_FIRST(&chan->incoming_queue))) {
     tor_assert(q);
     tor_assert(q->type == CELL_QUEUE_FIXED ||
                q->type == CELL_QUEUE_VAR);
 
-    if (putback) {
-      smartlist_add(chan->incoming_queue, q);
-    } else if (q->type == CELL_QUEUE_FIXED &&
+    if (q->type == CELL_QUEUE_FIXED &&
         chan->cell_handler) {
       /* Handle a fixed-length cell */
+      SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next);
       tor_assert(q->u.fixed.cell);
       log_debug(LD_CHANNEL,
                 "Processing incoming cell_t %p for channel %p (global ID "
@@ -2471,6 +2439,7 @@ channel_process_cells(channel_t *chan)
     } else if (q->type == CELL_QUEUE_VAR &&
                chan->var_cell_handler) {
       /* Handle a variable-length cell */
+      SIMPLEQ_REMOVE_HEAD(&chan->incoming_queue, next);
       tor_assert(q->u.var.var_cell);
       log_debug(LD_CHANNEL,
                 "Processing incoming var_cell_t %p for channel %p (global ID "
@@ -2481,15 +2450,10 @@ channel_process_cells(channel_t *chan)
       tor_free(q);
     } else {
       /* Can't handle this one */
-      if (!chan->incoming_queue)
-        chan->incoming_queue = smartlist_new();
-      smartlist_add(chan->incoming_queue, q);
-      putback = 1;
+      break;
     }
-  } SMARTLIST_FOREACH_END(q);
+  }
 
-  /* If the list is empty, free it */
-  smartlist_free(queue);
 }
 
 /**
@@ -2511,15 +2475,9 @@ channel_queue_cell(channel_t *chan, cell_t *cell)
 
   /* Do we need to queue it, or can we just call the handler right away? */
   if (!(chan->cell_handler)) need_to_queue = 1;
-  if (chan->incoming_queue &&
-      (smartlist_len(chan->incoming_queue) > 0))
+  if (! SIMPLEQ_EMPTY(&chan->incoming_queue))
     need_to_queue = 1;
 
-  /* If we need to queue and have no queue, create one */
-  if (need_to_queue && !(chan->incoming_queue)) {
-    chan->incoming_queue = smartlist_new();
-  }
-
   /* Timestamp for receiving */
   channel_timestamp_recv(chan);
 
@@ -2537,14 +2495,13 @@ channel_queue_cell(channel_t *chan, cell_t *cell)
     chan->cell_handler(chan, cell);
   } else {
     /* Otherwise queue it and then process the queue if possible. */
-    tor_assert(chan->incoming_queue);
     q = cell_queue_entry_new_fixed(cell);
     log_debug(LD_CHANNEL,
               "Queueing incoming cell_t %p for channel %p "
               "(global ID " U64_FORMAT ")",
               cell, chan,
               U64_PRINTF_ARG(chan->global_identifier));
-    smartlist_add(chan->incoming_queue, q);
+    SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next);
     if (chan->cell_handler ||
         chan->var_cell_handler) {
       channel_process_cells(chan);
@@ -2571,15 +2528,9 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
 
   /* Do we need to queue it, or can we just call the handler right away? */
   if (!(chan->var_cell_handler)) need_to_queue = 1;
-  if (chan->incoming_queue &&
-      (smartlist_len(chan->incoming_queue) > 0))
+  if (! SIMPLEQ_EMPTY(&chan->incoming_queue))
     need_to_queue = 1;
 
-  /* If we need to queue and have no queue, create one */
-  if (need_to_queue && !(chan->incoming_queue)) {
-    chan->incoming_queue = smartlist_new();
-  }
-
   /* Timestamp for receiving */
   channel_timestamp_recv(chan);
 
@@ -2597,14 +2548,13 @@ channel_queue_var_cell(channel_t *chan, var_cell_t *var_cell)
     chan->var_cell_handler(chan, var_cell);
   } else {
     /* Otherwise queue it and then process the queue if possible. */
-    tor_assert(chan->incoming_queue);
     q = cell_queue_entry_new_var(var_cell);
     log_debug(LD_CHANNEL,
               "Queueing incoming var_cell_t %p for channel %p "
               "(global ID " U64_FORMAT ")",
               var_cell, chan,
               U64_PRINTF_ARG(chan->global_identifier));
-    smartlist_add(chan->incoming_queue, q);
+    SIMPLEQ_INSERT_TAIL(&chan->incoming_queue, q, next);
     if (chan->cell_handler ||
         chan->var_cell_handler) {
       channel_process_cells(chan);
@@ -3134,6 +3084,19 @@ channel_listener_describe_transport(channel_listener_t *chan_l)
 }
 
 /**
+ * Count the entries in <b>queue</b> by walking it from head to tail.
+ */
+static int
+chan_cell_queue_len(const chan_cell_queue_t *queue)
+{
+  int n_entries = 0;
+  const cell_queue_entry_t *entry;
+  for (entry = SIMPLEQ_FIRST(queue); entry; entry = SIMPLEQ_NEXT(entry, next))
+    ++n_entries;
+  return n_entries;
+}
+
+/**
  * Dump channel statistics
  *
  * Dump statistics for one channel to the log
@@ -3246,10 +3209,8 @@ channel_dump_statistics(channel_t *chan, int severity)
       " * Channel " U64_FORMAT " has %d queued incoming cells"
       " and %d queued outgoing cells",
       U64_PRINTF_ARG(chan->global_identifier),
-      (chan->incoming_queue != NULL) ?
-        smartlist_len(chan->incoming_queue) : 0,
-      (chan->outgoing_queue != NULL) ?
-        smartlist_len(chan->outgoing_queue) : 0);
+      chan_cell_queue_len(&chan->incoming_queue),
+      chan_cell_queue_len(&chan->outgoing_queue));
 
   /* Describe circuits */
   log(severity, LD_GENERAL,
@@ -3498,8 +3459,7 @@ channel_has_queued_writes(channel_t *chan)
   tor_assert(chan);
   tor_assert(chan->has_queued_writes);
 
-  if (chan->outgoing_queue &&
-      smartlist_len(chan->outgoing_queue) > 0) {
+  if (! SIMPLEQ_EMPTY(&chan->outgoing_queue)) {
     has_writes = 1;
   } else {
     /* Check with the lower layer */
diff --git a/src/or/channel.h b/src/or/channel.h
index cb9835a..ccb8fe8 100644
--- a/src/or/channel.h
+++ b/src/or/channel.h
@@ -10,6 +10,7 @@
 #define _TOR_CHANNEL_H
 
 #include "or.h"
+#include "tor_queue.h"
 #include "circuitmux.h"
 
 /* Channel handler function pointer typedefs */
@@ -17,6 +18,10 @@ typedef void (*channel_listener_fn_ptr)(channel_listener_t *, channel_t *);
 typedef void (*channel_cell_handler_fn_ptr)(channel_t *, cell_t *);
 typedef void (*channel_var_cell_handler_fn_ptr)(channel_t *, var_cell_t *);
 
+struct cell_queue_entry_s; /* Opaque here; the full struct is in channel.c */
+SIMPLEQ_HEAD(chan_cell_queue, cell_queue_entry_s); /* type only, no object */
+typedef struct chan_cell_queue chan_cell_queue_t;
+
 /*
  * Channel struct; see the channel_t typedef in or.h.  A channel is an
  * abstract interface for the OR-to-OR connection, similar to connection_or_t,
@@ -120,10 +125,10 @@ struct channel_s {
   channel_t *next_with_same_id, *prev_with_same_id;
 
   /* List of incoming cells to handle */
-  smartlist_t *incoming_queue;
+  chan_cell_queue_t incoming_queue;
 
   /* List of queued outgoing cells */
-  smartlist_t *outgoing_queue;
+  chan_cell_queue_t outgoing_queue;
 
   /* Circuit mux for circuits sending on this channel */
   circuitmux_t *cmux;



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits