[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/master] Use a non-stupid data structure in the scheduler



commit ed1927d6bf8b9d60d40f6fbc20f9e1575a35e59d
Author: Andrea Shepard <andrea@xxxxxxxxxxxxxx>
Date:   Thu Dec 12 04:22:53 2013 -0800

    Use a non-stupid data structure in the scheduler
---
 src/or/channel.h   |    3 +
 src/or/scheduler.c |  190 ++++++++++++++++++++++++++++++++++++----------------
 2 files changed, 137 insertions(+), 56 deletions(-)

diff --git a/src/or/channel.h b/src/or/channel.h
index ced717a..023c39d 100644
--- a/src/or/channel.h
+++ b/src/or/channel.h
@@ -80,6 +80,9 @@ struct channel_s {
     SCHED_CHAN_PENDING
   } scheduler_state;
 
+  /** Heap index for use by the scheduler */
+  int sched_heap_idx;
+
   /** Timestamps for both cell channels and listeners */
   time_t timestamp_created; /* Channel created */
   time_t timestamp_active; /* Any activity */
diff --git a/src/or/scheduler.c b/src/or/scheduler.c
index c1b64df..4f12696 100644
--- a/src/or/scheduler.c
+++ b/src/or/scheduler.c
@@ -24,6 +24,14 @@
 #define SCHED_Q_HIGH_WATER (2 * SCHED_Q_LOW_WATER)
 
 /*
+ * Maximum cells to flush in a single call to channel_flush_some_cells().
+ * A low value means more calls; a high value risks overshooting
+ * SCHED_Q_HIGH_WATER.
+ */
+
+#define SCHED_MAX_FLUSH_CELLS 16
+
+/*
  * Write scheduling works by keeping track of which channels can
  * accept cells, and have cells to write.  From the scheduler's perspective,
  * a channel can be in four possible states:
@@ -100,7 +108,7 @@
  * is reserved for our use.
  */
 
-/* List of channels that can write and have cells (pending work) */
+/* Priority queue of channels that can write and have cells (pending work) */
 static smartlist_t *channels_pending = NULL;
 
 /*
@@ -125,7 +133,7 @@ static time_t queue_heuristic_timestamp = 0;
 
 /* Scheduler static function declarations */
 
-static int scheduler_compare_channels(const void **c1_v, const void **c2_v);
+static int scheduler_compare_channels(const void *c1_v, const void *c2_v);
 static void scheduler_evt_callback(evutil_socket_t fd,
                                    short events, void *arg);
 static int scheduler_more_work(void);
@@ -162,7 +170,7 @@ scheduler_free_all(void)
  */
 
 static int
-scheduler_compare_channels(const void **c1_v, const void **c2_v)
+scheduler_compare_channels(const void *c1_v, const void *c2_v)
 {
   channel_t *c1 = NULL, *c2 = NULL;
   /* These are a workaround for -Wbad-function-cast throwing a fit */
@@ -172,8 +180,8 @@ scheduler_compare_channels(const void **c1_v, const void **c2_v)
   tor_assert(c1_v);
   tor_assert(c2_v);
 
-  c1 = (channel_t *)(*c1_v);
-  c2 = (channel_t *)(*c2_v);
+  c1 = (channel_t *)(c1_v);
+  c2 = (channel_t *)(c2_v);
 
   tor_assert(c1);
   tor_assert(c2);
@@ -241,7 +249,10 @@ scheduler_channel_doesnt_want_writes(channel_t *chan)
      * the other lists.  It can't write any more, so it goes to
      * channels_waiting_to_write.
      */
-    smartlist_remove(channels_pending, chan);
+    smartlist_pqueue_remove(channels_pending,
+                            scheduler_compare_channels,
+                            STRUCT_OFFSET(channel_t, sched_heap_idx),
+                            chan);
     chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
     log_debug(LD_SCHED,
               "Channel " U64_FORMAT " at %p went from pending "
@@ -280,7 +291,10 @@ scheduler_channel_has_waiting_cells(channel_t *chan)
      * channels_pending.
      */
     chan->scheduler_state = SCHED_CHAN_PENDING;
-    smartlist_add(channels_pending, chan);
+    smartlist_pqueue_add(channels_pending,
+                         scheduler_compare_channels,
+                         STRUCT_OFFSET(channel_t, sched_heap_idx),
+                         chan);
     log_debug(LD_SCHED,
               "Channel " U64_FORMAT " at %p went from waiting_for_cells "
               "to pending",
@@ -353,7 +367,10 @@ scheduler_release_channel(channel_t *chan)
   tor_assert(channels_pending);
 
   if (chan->scheduler_state == SCHED_CHAN_PENDING) {
-    smartlist_remove(channels_pending, chan);
+    smartlist_pqueue_remove(channels_pending,
+                            scheduler_compare_channels,
+                            STRUCT_OFFSET(channel_t, sched_heap_idx),
+                            chan);
   }
 
   chan->scheduler_state = SCHED_CHAN_IDLE;
@@ -364,10 +381,11 @@ scheduler_release_channel(channel_t *chan)
 void
 scheduler_run(void)
 {
-  smartlist_t *tmp = NULL;
   int n_cells, n_chans_before, n_chans_after;
   uint64_t q_len_before, q_heur_before, q_len_after, q_heur_after;
   ssize_t flushed, flushed_this_time;
+  smartlist_t *to_readd = NULL;
+  channel_t *chan = NULL;
 
   log_debug(LD_SCHED, "We have a chance to run the scheduler");
 
@@ -375,61 +393,118 @@ scheduler_run(void)
     n_chans_before = smartlist_len(channels_pending);
     q_len_before = channel_get_global_queue_estimate();
     q_heur_before = scheduler_get_queue_heuristic();
-    tmp = channels_pending;
-    channels_pending = smartlist_new();
-
-    /*
-     * UGLY HACK: sort the list on each invocation
-     *
-     * TODO smarter data structures
-     */
-    smartlist_sort(tmp, scheduler_compare_channels);
-
-    SMARTLIST_FOREACH_BEGIN(tmp, channel_t *, chan) {
-      if (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) {
-        n_cells = channel_num_cells_writeable(chan);
-        if (n_cells > 0) {
-          log_debug(LD_SCHED,
-                    "Scheduler saw pending channel " U64_FORMAT " at %p with "
-                    "%d cells writeable",
-                    U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
-
-          flushed = 0;
-          while (flushed < n_cells) {
-            flushed_this_time =
-              channel_flush_some_cells(chan, n_cells - flushed);
-            if (flushed_this_time <= 0) break;
-            flushed += flushed_this_time;
-          }
 
-          if (flushed < n_cells) {
-            /* We ran out of cells to flush */
-            chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
-          } else {
-            /* TODO get this right */
-          }
+    while (scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER &&
+           smartlist_len(channels_pending) > 0) {
+      /* Pop off a channel */
+      chan = smartlist_pqueue_pop(channels_pending,
+                                  scheduler_compare_channels,
+                                  STRUCT_OFFSET(channel_t, sched_heap_idx));
+      tor_assert(chan);
+
+      /* Figure out how many cells we can write */
+      n_cells = channel_num_cells_writeable(chan);
+      if (n_cells > 0) {
+        log_debug(LD_SCHED,
+                  "Scheduler saw pending channel " U64_FORMAT " at %p with "
+                  "%d cells writeable",
+                  U64_PRINTF_ARG(chan->global_identifier), chan, n_cells);
+
+        flushed = 0;
+        while (flushed < n_cells &&
+               scheduler_get_queue_heuristic() <= SCHED_Q_HIGH_WATER) {
+          flushed_this_time =
+            channel_flush_some_cells(chan,
+                                     MIN(SCHED_MAX_FLUSH_CELLS,
+                                         n_cells - flushed));
+          if (flushed_this_time <= 0) break;
+          flushed += flushed_this_time;
+        }
 
+        if (flushed < n_cells) {
+          /* We ran out of cells to flush */
+          chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
           log_debug(LD_SCHED,
-                    "Scheduler flushed %d cells onto pending channel "
-                    U64_FORMAT " at %p",
-                    (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
+                    "Channel " U64_FORMAT " at %p "
+                    "entered waiting_for_cells from pending",
+                    U64_PRINTF_ARG(chan->global_identifier),
                     chan);
         } else {
-          log_info(LD_SCHED,
-                   "Scheduler saw pending channel " U64_FORMAT " at %p with "
-                   "no cells writeable",
-                   U64_PRINTF_ARG(chan->global_identifier), chan);
-          /* Put it back to WAITING_TO_WRITE */
-          chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+          /* The channel may still have some cells */
+          if (channel_more_to_flush(chan)) {
+          /* The channel goes to either pending or waiting_to_write */
+            if (channel_num_cells_writeable(chan) > 0) {
+              /* Add it back to pending later */
+              if (!to_readd) to_readd = smartlist_new();
+              smartlist_add(to_readd, chan);
+              log_debug(LD_SCHED,
+                        "Channel " U64_FORMAT " at %p "
+                        "is still pending",
+                        U64_PRINTF_ARG(chan->global_identifier),
+                        chan);
+            } else {
+              /* It's waiting to be able to write more */
+              chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
+              log_debug(LD_SCHED,
+                        "Channel " U64_FORMAT " at %p "
+                        "entered waiting_to_write from pending",
+                        U64_PRINTF_ARG(chan->global_identifier),
+                        chan);
+            }
+          } else {
+            /* No cells left; it can go to idle or waiting_for_cells */
+            if (channel_num_cells_writeable(chan) > 0) {
+              /*
+               * It can still accept writes, so it goes to
+               * waiting_for_cells
+               */
+              chan->scheduler_state = SCHED_CHAN_WAITING_FOR_CELLS;
+              log_debug(LD_SCHED,
+                        "Channel " U64_FORMAT " at %p "
+                        "entered waiting_for_cells from pending",
+                        U64_PRINTF_ARG(chan->global_identifier),
+                        chan);
+            } else {
+              /*
+               * We exactly filled up the output queue with all available
+               * cells; go to idle.
+               */
+              chan->scheduler_state = SCHED_CHAN_IDLE;
+              log_debug(LD_SCHED,
+                        "Channel " U64_FORMAT " at %p "
+                        "become idle from pending",
+                        U64_PRINTF_ARG(chan->global_identifier),
+                        chan);
+            }
+          }
         }
+
+        log_debug(LD_SCHED,
+                  "Scheduler flushed %d cells onto pending channel "
+                  U64_FORMAT " at %p",
+                  (int)flushed, U64_PRINTF_ARG(chan->global_identifier),
+                  chan);
       } else {
-        /* Not getting it this round; put it back on the list */
-        smartlist_add(channels_pending, chan);
-        /* It states in SCHED_CHAN_PENDING */
+        log_info(LD_SCHED,
+                 "Scheduler saw pending channel " U64_FORMAT " at %p with "
+                 "no cells writeable",
+                 U64_PRINTF_ARG(chan->global_identifier), chan);
+        /* Put it back to WAITING_TO_WRITE */
+        chan->scheduler_state = SCHED_CHAN_WAITING_TO_WRITE;
       }
-    } SMARTLIST_FOREACH_END(chan);
+    }
 
-    smartlist_free(tmp);
+    /* Readd any channels we need to */
+    if (to_readd) {
+      SMARTLIST_FOREACH_BEGIN(to_readd, channel_t *, chan) {
+        chan->scheduler_state = SCHED_CHAN_PENDING;
+        smartlist_pqueue_add(channels_pending,
+                             scheduler_compare_channels,
+                             STRUCT_OFFSET(channel_t, sched_heap_idx),
+                             chan);
+      } SMARTLIST_FOREACH_END(chan);
+      smartlist_free(to_readd);
+    }
 
     n_chans_after = smartlist_len(channels_pending);
     q_len_after = channel_get_global_queue_estimate();
@@ -473,7 +548,10 @@ scheduler_channel_wants_writes(channel_t *chan)
     /*
      * It can write now, so it goes to channels_pending.
      */
-    smartlist_add(channels_pending, chan);
+    smartlist_pqueue_add(channels_pending,
+                         scheduler_compare_channels,
+                         STRUCT_OFFSET(channel_t, sched_heap_idx),
+                         chan);
     chan->scheduler_state = SCHED_CHAN_PENDING;
     log_debug(LD_SCHED,
               "Channel " U64_FORMAT " at %p went from waiting_to_write "



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits