[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/master] Make pending work cancellable.



commit c7eebe237ddf0555a99b2ef10fd95def2a4bbbd4
Author: Nick Mathewson <nickm@xxxxxxxxxxxxxx>
Date:   Tue Sep 24 16:57:40 2013 -0400

    Make pending work cancellable.
---
 src/common/workqueue.c     |   72 +++++++++++++++++++++++++++-----------------
 src/common/workqueue.h     |   18 +++++------
 src/test/bench_workqueue.c |   24 +++++----------
 3 files changed, 61 insertions(+), 53 deletions(-)

diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index ea8dcb0..80e061d 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -31,16 +31,18 @@ keep array of threads; round-robin between them.
 
  */
 
-typedef struct workqueue_entry_s {
-  TOR_SIMPLEQ_ENTRY(workqueue_entry_s) next_work;
-  int (*fn)(int status, void *state, void *arg);
+struct workqueue_entry_s {
+  TOR_TAILQ_ENTRY(workqueue_entry_s) next_work;
+  struct workerthread_s *on_thread;
+  uint8_t pending;
+  int (*fn)(void *state, void *arg);
   void (*reply_fn)(void *arg);
   void *arg;
-} workqueue_entry_t;
+};
 
 struct replyqueue_s {
   tor_mutex_t lock;
-  TOR_SIMPLEQ_HEAD(, workqueue_entry_s) answers;
+  TOR_TAILQ_HEAD(, workqueue_entry_s) answers;
 
   void (*alert_fn)(struct replyqueue_s *); // lock not held on this, next 2. 
   tor_socket_t write_sock;
@@ -50,7 +52,7 @@ struct replyqueue_s {
 typedef struct workerthread_s {
   tor_mutex_t lock;
   tor_cond_t condition;
-  TOR_SIMPLEQ_HEAD(, workqueue_entry_s) work;
+  TOR_TAILQ_HEAD(, workqueue_entry_s) work;
   unsigned is_running;
   unsigned is_shut_down;
   unsigned waiting;
@@ -76,7 +78,7 @@ struct threadpool_s {
 static void queue_reply(replyqueue_t *queue, workqueue_entry_t *work);
 
 static workqueue_entry_t *
-workqueue_entry_new(int (*fn)(int, void*, void*),
+workqueue_entry_new(int (*fn)(void*, void*),
                     void (*reply_fn)(void*),
                     void *arg)
 {
@@ -95,6 +97,23 @@ workqueue_entry_free(workqueue_entry_t *ent)
   tor_free(ent);
 }
 
+int
+workqueue_entry_cancel(workqueue_entry_t *ent)
+{
+  int cancelled = 0;
+  tor_mutex_acquire(&ent->on_thread->lock);
+  if (ent->pending) {
+    TOR_TAILQ_REMOVE(&ent->on_thread->work, ent, next_work);
+    cancelled = 1;
+  }
+  tor_mutex_release(&ent->on_thread->lock);
+
+  if (cancelled) {
+    tor_free(ent);
+  }
+  return cancelled;
+}
+
 static void
 worker_thread_main(void *thread_)
 {
@@ -107,20 +126,17 @@ worker_thread_main(void *thread_)
   thread->is_running = 1;
   while (1) {
     /* lock held. */
-    while (!TOR_SIMPLEQ_EMPTY(&thread->work)) {
+    while (!TOR_TAILQ_EMPTY(&thread->work)) {
       /* lock held. */
 
-      work = TOR_SIMPLEQ_FIRST(&thread->work);
-      TOR_SIMPLEQ_REMOVE_HEAD(&thread->work, next_work);
+      work = TOR_TAILQ_FIRST(&thread->work);
+      TOR_TAILQ_REMOVE(&thread->work, work, next_work);
+      work->pending = 0;
       tor_mutex_release(&thread->lock);
 
-      result = work->fn(WQ_CMD_RUN, thread->state, work->arg);
+      result = work->fn(thread->state, work->arg);
 
-      if (result == WQ_RPL_QUEUE) {
-        queue_reply(thread->reply_queue, work);
-      } else {
-        workqueue_entry_free(work);
-      }
+      queue_reply(thread->reply_queue, work);
 
       tor_mutex_acquire(&thread->lock);
       if (result >= WQ_RPL_ERROR) {
@@ -148,8 +164,8 @@ queue_reply(replyqueue_t *queue, workqueue_entry_t *work)
 {
   int was_empty;
   tor_mutex_acquire(&queue->lock);
-  was_empty = TOR_SIMPLEQ_EMPTY(&queue->answers);
-  TOR_SIMPLEQ_INSERT_TAIL(&queue->answers, work, next_work);
+  was_empty = TOR_TAILQ_EMPTY(&queue->answers);
+  TOR_TAILQ_INSERT_TAIL(&queue->answers, work, next_work);
   tor_mutex_release(&queue->lock);
 
   if (was_empty) {
@@ -175,7 +191,7 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
   workerthread_t *thr = tor_malloc_zero(sizeof(workerthread_t));
   tor_mutex_init_for_cond(&thr->lock);
   tor_cond_init(&thr->condition);
-  TOR_SIMPLEQ_INIT(&thr->work);
+  TOR_TAILQ_INIT(&thr->work);
   thr->state = state;
   thr->reply_queue = replyqueue;
 
@@ -187,9 +203,9 @@ workerthread_new(void *state, replyqueue_t *replyqueue)
   return thr;
 }
 
-void *
+workqueue_entry_t *
 threadpool_queue_work(threadpool_t *pool,
-                      int (*fn)(int, void *, void *),
+                      int (*fn)(void *, void *),
                       void (*reply_fn)(void *),
                       void *arg)
 {
@@ -206,11 +222,12 @@ threadpool_queue_work(threadpool_t *pool,
     pool->next_for_work = 0;
   tor_mutex_release(&pool->lock);
 
-
   ent = workqueue_entry_new(fn, reply_fn, arg);
 
   tor_mutex_acquire(&worker->lock);
-  TOR_SIMPLEQ_INSERT_TAIL(&worker->work, ent, next_work);
+  ent->on_thread = worker;
+  ent->pending = 1;
+  TOR_TAILQ_INSERT_TAIL(&worker->work, ent, next_work);
 
   if (worker->waiting) /* XXXX inside or outside of lock?? */
     tor_cond_signal_one(&worker->condition);
@@ -298,7 +315,7 @@ replyqueue_new(void)
   rq = tor_malloc_zero(sizeof(replyqueue_t));
 
   tor_mutex_init(&rq->lock);
-  TOR_SIMPLEQ_INIT(&rq->answers);
+  TOR_TAILQ_INIT(&rq->answers);
 
   rq->read_sock = pair[0];
   rq->write_sock = pair[1];
@@ -331,10 +348,10 @@ replyqueue_process(replyqueue_t *queue)
   /* XXXX freak out on r == 0, or r == "error, not retryable". */
 
   tor_mutex_acquire(&queue->lock);
-  while (!TOR_SIMPLEQ_EMPTY(&queue->answers)) {
+  while (!TOR_TAILQ_EMPTY(&queue->answers)) {
     /* lock held. */
-    workqueue_entry_t *work = TOR_SIMPLEQ_FIRST(&queue->answers);
-    TOR_SIMPLEQ_REMOVE_HEAD(&queue->answers, next_work);
+    workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
+    TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
     tor_mutex_release(&queue->lock);
 
     work->reply_fn(work->arg);
@@ -345,3 +362,4 @@ replyqueue_process(replyqueue_t *queue)
 
   tor_mutex_release(&queue->lock);
 }
+
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
index e502734..47753cf 100644
--- a/src/common/workqueue.h
+++ b/src/common/workqueue.h
@@ -8,20 +8,20 @@
 
 typedef struct replyqueue_s replyqueue_t;
 typedef struct threadpool_s threadpool_t;
-
+typedef struct workqueue_entry_s workqueue_entry_t;
 
 #define WQ_CMD_RUN    0
 #define WQ_CMD_CANCEL 1
 
-#define WQ_RPL_QUEUE    0
-#define WQ_RPL_NOQUEUE  1
-#define WQ_RPL_ERROR    2
-#define WQ_RPL_SHUTDOWN 3
+#define WQ_RPL_REPLY    0
+#define WQ_RPL_ERROR    1
+#define WQ_RPL_SHUTDOWN 2
 
-void *threadpool_queue_work(threadpool_t *pool,
-                            int (*fn)(int, void *, void *),
-                            void (*reply_fn)(void *),
-                            void *arg);
+workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
+                                         int (*fn)(void *, void *),
+                                         void (*reply_fn)(void *),
+                                         void *arg);
+int workqueue_entry_cancel(workqueue_entry_t *pending_work);
 int threadpool_start_threads(threadpool_t *pool, int n);
 threadpool_t *threadpool_new(int n_threads,
                              replyqueue_t *replyqueue,
diff --git a/src/test/bench_workqueue.c b/src/test/bench_workqueue.c
index 1bdfbef..f190c61 100644
--- a/src/test/bench_workqueue.c
+++ b/src/test/bench_workqueue.c
@@ -64,7 +64,7 @@ mark_handled(int serial)
 }
 
 static int
-workqueue_do_rsa(int cmd, void *state, void *work)
+workqueue_do_rsa(void *state, void *work)
 {
   rsa_work_t *rw = work;
   state_t *st = state;
@@ -74,16 +74,11 @@ workqueue_do_rsa(int cmd, void *state, void *work)
 
   tor_assert(st->magic == 13371337);
 
-  if (cmd == WQ_CMD_CANCEL) {
-    tor_free(work);
-    return WQ_RPL_NOQUEUE;
-  }
-
   len = crypto_pk_private_sign(rsa, (char*)sig, 256,
                                (char*)rw->msg, rw->msglen);
   if (len < 0) {
-    tor_free(work);
-    return WQ_RPL_NOQUEUE;
+    rw->msglen = 0;
+    return WQ_RPL_ERROR;
   }
 
   memset(rw->msg, 0, sizeof(rw->msg));
@@ -93,12 +88,12 @@ workqueue_do_rsa(int cmd, void *state, void *work)
 
   mark_handled(rw->serial);
 
-  return WQ_RPL_QUEUE;
+  return WQ_RPL_REPLY;
 }
 
 #if 0
 static int
-workqueue_do_shutdown(int cmd, void *state, void *work)
+workqueue_do_shutdown(void *state, void *work)
 {
   (void)state;
   (void)work;
@@ -110,7 +105,7 @@ workqueue_do_shutdown(int cmd, void *state, void *work)
 #endif
 
 static int
-workqueue_do_ecdh(int cmd, void *state, void *work)
+workqueue_do_ecdh(void *state, void *work)
 {
   ecdh_work_t *ew = work;
   uint8_t output[CURVE25519_OUTPUT_LEN];
@@ -118,16 +113,11 @@ workqueue_do_ecdh(int cmd, void *state, void *work)
 
   tor_assert(st->magic == 13371337);
 
-  if (cmd == WQ_CMD_CANCEL) {
-    tor_free(work);
-    return WQ_RPL_NOQUEUE;
-  }
-
   curve25519_handshake(output, &st->ecdh, &ew->u.pk);
   memcpy(ew->u.msg, output, CURVE25519_OUTPUT_LEN);
   ++st->n_handled;
   mark_handled(ew->serial);
-  return WQ_RPL_QUEUE;
+  return WQ_RPL_REPLY;
 }
 
 static void *



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits