[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/master] Multithreading support for event-queue code.



commit e2a6a7ec6178834c3de7a3be614679120e2c00c8
Author: Nick Mathewson <nickm@xxxxxxxxxxxxxx>
Date:   Wed Aug 12 10:10:11 2015 -0400

    Multithreading support for event-queue code.
---
 src/or/control.c |   61 ++++++++++++++++++++++++++++++++++++++++--------------
 1 file changed, 46 insertions(+), 15 deletions(-)

diff --git a/src/or/control.c b/src/or/control.c
index 4b8e9c1..2be99d5 100644
--- a/src/or/control.c
+++ b/src/or/control.c
@@ -594,13 +594,20 @@ typedef struct queued_event_s {
   char *msg;
 } queued_event_t;
 
-/** If this is greater than 0, we don't allow new events to be queued. */
+/** If this is greater than 0, we don't allow new events to be queued.
+ * XXXX This should be thread-local. */
 static int block_event_queue = 0;
 
 /** Holds a smartlist of queued_event_t objects that may need to be sent
  * to one or more controllers */
 static smartlist_t *queued_control_events = NULL;
 
+/** True if the flush_queued_events_event is pending. */
+static int flush_queued_event_pending = 0;
+
+/** Lock to protect block_event_queue and the two queue fields above. */
+static tor_mutex_t *queued_control_events_lock = NULL;
+
 /** An event that should fire in order to flush the contents of
  * queued_control_events. */
 static struct event *flush_queued_events_event = NULL;
@@ -621,6 +628,10 @@ control_initialize_event_queue(void)
       tor_assert(flush_queued_events_event);
     }
   }
+
+  if (queued_control_events_lock == NULL) {
+    queued_control_events_lock = tor_mutex_new();
+  }
 }
 
 /** Helper: inserts an event on the list of events queued to be sent to
@@ -643,30 +654,43 @@ queue_control_event_string,(uint16_t event, char *msg))
     tor_free(msg);
     return;
   }
-  if (block_event_queue) {
-    tor_free(msg);
-    return;
-  }
 
   queued_event_t *ev = tor_malloc(sizeof(*ev));
   ev->event = event;
   ev->msg = msg;
 
+  tor_mutex_acquire(queued_control_events_lock);
+  if (block_event_queue) { /* XXXX This should be thread-specific. */
+    tor_mutex_release(queued_control_events_lock);
+    tor_free(msg);
+    tor_free(ev);
+    return;
+  }
+
   /* No queueing an event while queueing an event */
   ++block_event_queue;
 
   tor_assert(queued_control_events);
   smartlist_add(queued_control_events, ev);
 
-  /* We just put the first event on the queue; mark the queue to be
-   * flushed.
+  int activate_event = 0;
+  if (! flush_queued_event_pending && in_main_thread()) {
+    activate_event = 1;
+    flush_queued_event_pending = 1;
+  }
+
+  --block_event_queue;
+
+  tor_mutex_release(queued_control_events_lock);
+
+  /* We just put an event on the queue; mark the queue to be
+   * flushed.  We only do this from the main thread for now; otherwise,
+   * we'd need to incur locking overhead in Libevent or use a socket.
    */
-  if (smartlist_len(queued_control_events) == 1) {
+  if (activate_event) {
     tor_assert(flush_queued_events_event);
     event_active(flush_queued_events_event, EV_READ, 1);
   }
-
-  --block_event_queue;
 }
 
 /** Release all storage held by <b>ev</b>. */
@@ -687,15 +711,20 @@ queued_event_free(queued_event_t *ev)
 static void
 queued_events_flush_all(int force)
 {
-  smartlist_t *all_conns = get_connection_array();
-  smartlist_t *controllers = smartlist_new();
-
   if (PREDICT_UNLIKELY(queued_control_events == NULL)) {
     return;
   }
+  smartlist_t *all_conns = get_connection_array();
+  smartlist_t *controllers = smartlist_new();
+  smartlist_t *queued_events;
 
+  tor_mutex_acquire(queued_control_events_lock);
   /* No queueing an event while flushing events. */
   ++block_event_queue;
+  flush_queued_event_pending = 0;
+  queued_events = queued_control_events;
+  queued_control_events = smartlist_new();
+  tor_mutex_release(queued_control_events_lock);
 
   /* Gather all the controllers that will care... */
   SMARTLIST_FOREACH_BEGIN(all_conns, connection_t *, conn) {
@@ -708,7 +737,7 @@ queued_events_flush_all(int force)
     }
   } SMARTLIST_FOREACH_END(conn);
 
-  SMARTLIST_FOREACH_BEGIN(queued_control_events, queued_event_t *, ev) {
+  SMARTLIST_FOREACH_BEGIN(queued_events, queued_event_t *, ev) {
     const event_mask_t bit = ((event_mask_t)1) << ev->event;
     const size_t msg_len = strlen(ev->msg);
     SMARTLIST_FOREACH_BEGIN(controllers, control_connection_t *,
@@ -728,10 +757,12 @@ queued_events_flush_all(int force)
     } SMARTLIST_FOREACH_END(control_conn);
   }
 
-  smartlist_clear(queued_control_events);
+  smartlist_free(queued_events);
   smartlist_free(controllers);
 
+  tor_mutex_acquire(queued_control_events_lock);
   --block_event_queue;
+  tor_mutex_release(queued_control_events_lock);
 }
 
 /** Libevent callback: Flushes pending events to controllers that are



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits