[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [tor/master] Refactor cpuworker to use workqueue/threadpool code.



commit 1e896214e7eb5ede65663486291252b171e9daea
Author: Nick Mathewson <nickm@xxxxxxxxxxxxxx>
Date:   Wed Oct 2 12:32:09 2013 -0400

    Refactor cpuworker to use workqueue/threadpool code.
---
 changes/better_workqueues |   10 +
 src/common/workqueue.c    |    4 +-
 src/common/workqueue.h    |    2 +-
 src/or/command.c          |    2 +-
 src/or/config.c           |    2 +-
 src/or/connection.c       |   24 +-
 src/or/cpuworker.c        |  532 +++++++++++++++++----------------------------
 src/or/cpuworker.h        |   10 +-
 src/or/main.c             |    6 +-
 src/or/onion.c            |   19 +-
 src/or/onion.h            |    4 +-
 src/or/or.h               |   17 +-
 12 files changed, 238 insertions(+), 394 deletions(-)

diff --git a/changes/better_workqueues b/changes/better_workqueues
new file mode 100644
index 0000000..32c984c
--- /dev/null
+++ b/changes/better_workqueues
@@ -0,0 +1,10 @@
+  o Major features:
+    - Refactor the CPU worker implementation for better performance by
+      avoiding the kernel and lengthening pipelines. The original
+      implementation used sockets to transfer data from the main thread
+      to the worker threads, and didn't allow any thread to be assigned
+      more than a single piece of work at once. The new implementation
+      avoids communications overhead by making requests in shared
+      memory, avoiding kernel IO where possible, and keeping more
+      request in flight at once. Resolves issue #9682.
+
diff --git a/src/common/workqueue.c b/src/common/workqueue.c
index 44cf98d..f3ef678 100644
--- a/src/common/workqueue.c
+++ b/src/common/workqueue.c
@@ -108,6 +108,7 @@ workqueue_entry_free(workqueue_entry_t *ent)
 {
   if (!ent)
     return;
+  memset(ent, 0xf0, sizeof(*ent));
   tor_free(ent);
 }
 
@@ -310,7 +311,7 @@ threadpool_queue_work(threadpool_t *pool,
  */
 int
 threadpool_queue_for_all(threadpool_t *pool,
-                         void *(*dup_fn)(const void *),
+                         void *(*dup_fn)(void *),
                          int (*fn)(void *, void *),
                          void (*reply_fn)(void *),
                          void *arg)
@@ -444,6 +445,7 @@ replyqueue_process(replyqueue_t *queue)
     workqueue_entry_t *work = TOR_TAILQ_FIRST(&queue->answers);
     TOR_TAILQ_REMOVE(&queue->answers, work, next_work);
     tor_mutex_release(&queue->lock);
+    work->on_thread = NULL;
 
     work->reply_fn(work->arg);
     workqueue_entry_free(work);
diff --git a/src/common/workqueue.h b/src/common/workqueue.h
index aa8620d..aa1bcc5 100644
--- a/src/common/workqueue.h
+++ b/src/common/workqueue.h
@@ -28,7 +28,7 @@ workqueue_entry_t *threadpool_queue_work(threadpool_t *pool,
                                          void (*reply_fn)(void *),
                                          void *arg);
 int threadpool_queue_for_all(threadpool_t *pool,
-                             void *(*dup_fn)(const void *),
+                             void *(*dup_fn)(void *),
                              int (*fn)(void *, void *),
                              void (*reply_fn)(void *),
                              void *arg);
diff --git a/src/or/command.c b/src/or/command.c
index 6dde2a9..c4a0f9b 100644
--- a/src/or/command.c
+++ b/src/or/command.c
@@ -310,7 +310,7 @@ command_process_create_cell(cell_t *cell, channel_t *chan)
     /* hand it off to the cpuworkers, and then return. */
     if (connection_or_digest_is_known_relay(chan->identity_digest))
       rep_hist_note_circuit_handshake_requested(create_cell->handshake_type);
-    if (assign_onionskin_to_cpuworker(NULL, circ, create_cell) < 0) {
+    if (assign_onionskin_to_cpuworker(circ, create_cell) < 0) {
       log_debug(LD_GENERAL,"Failed to hand off onionskin. Closing.");
       circuit_mark_for_close(TO_CIRCUIT(circ), END_CIRC_REASON_RESOURCELIMIT);
       return;
diff --git a/src/or/config.c b/src/or/config.c
index 5db065f..5b8560b 100644
--- a/src/or/config.c
+++ b/src/or/config.c
@@ -1729,7 +1729,7 @@ options_act(const or_options_t *old_options)
         if (have_completed_a_circuit() || !any_predicted_circuits(time(NULL)))
           inform_testing_reachability();
       }
-      cpuworkers_rotate();
+      cpuworkers_rotate_keyinfo();
       if (dns_reset())
         return -1;
     } else {
diff --git a/src/or/connection.c b/src/or/connection.c
index 11ff224..1b7426b 100644
--- a/src/or/connection.c
+++ b/src/or/connection.c
@@ -29,7 +29,6 @@
 #include "connection_edge.h"
 #include "connection_or.h"
 #include "control.h"
-#include "cpuworker.h"
 #include "directory.h"
 #include "dirserv.h"
 #include "dns.h"
@@ -130,7 +129,6 @@ conn_type_to_string(int type)
     case CONN_TYPE_AP: return "Socks";
     case CONN_TYPE_DIR_LISTENER: return "Directory listener";
     case CONN_TYPE_DIR: return "Directory";
-    case CONN_TYPE_CPUWORKER: return "CPU worker";
     case CONN_TYPE_CONTROL_LISTENER: return "Control listener";
     case CONN_TYPE_CONTROL: return "Control";
     case CONN_TYPE_EXT_OR: return "Extended OR";
@@ -213,12 +211,6 @@ conn_state_to_string(int type, int state)
         case DIR_CONN_STATE_SERVER_WRITING: return "writing";
       }
       break;
-    case CONN_TYPE_CPUWORKER:
-      switch (state) {
-        case CPUWORKER_STATE_IDLE: return "idle";
-        case CPUWORKER_STATE_BUSY_ONION: return "busy with onion";
-      }
-      break;
     case CONN_TYPE_CONTROL:
       switch (state) {
         case CONTROL_CONN_STATE_OPEN: return "open (protocol v1)";
@@ -248,7 +240,6 @@ connection_type_uses_bufferevent(connection_t *conn)
     case CONN_TYPE_CONTROL:
     case CONN_TYPE_OR:
     case CONN_TYPE_EXT_OR:
-    case CONN_TYPE_CPUWORKER:
       return 1;
     default:
       return 0;
@@ -2436,7 +2427,6 @@ connection_mark_all_noncontrol_connections(void)
     if (conn->marked_for_close)
       continue;
     switch (conn->type) {
-      case CONN_TYPE_CPUWORKER:
       case CONN_TYPE_CONTROL_LISTENER:
       case CONN_TYPE_CONTROL:
         break;
@@ -4530,8 +4520,6 @@ connection_process_inbuf(connection_t *conn, int package_partial)
                                            package_partial);
     case CONN_TYPE_DIR:
       return connection_dir_process_inbuf(TO_DIR_CONN(conn));
-    case CONN_TYPE_CPUWORKER:
-      return connection_cpu_process_inbuf(conn);
     case CONN_TYPE_CONTROL:
       return connection_control_process_inbuf(TO_CONTROL_CONN(conn));
     default:
@@ -4591,8 +4579,6 @@ connection_finished_flushing(connection_t *conn)
       return connection_edge_finished_flushing(TO_EDGE_CONN(conn));
     case CONN_TYPE_DIR:
       return connection_dir_finished_flushing(TO_DIR_CONN(conn));
-    case CONN_TYPE_CPUWORKER:
-      return connection_cpu_finished_flushing(conn);
     case CONN_TYPE_CONTROL:
       return connection_control_finished_flushing(TO_CONTROL_CONN(conn));
     default:
@@ -4648,8 +4634,6 @@ connection_reached_eof(connection_t *conn)
       return connection_edge_reached_eof(TO_EDGE_CONN(conn));
     case CONN_TYPE_DIR:
       return connection_dir_reached_eof(TO_DIR_CONN(conn));
-    case CONN_TYPE_CPUWORKER:
-      return connection_cpu_reached_eof(conn);
     case CONN_TYPE_CONTROL:
       return connection_control_reached_eof(TO_CONTROL_CONN(conn));
     default:
@@ -4855,10 +4839,6 @@ assert_connection_ok(connection_t *conn, time_t now)
       tor_assert(conn->purpose >= DIR_PURPOSE_MIN_);
       tor_assert(conn->purpose <= DIR_PURPOSE_MAX_);
       break;
-    case CONN_TYPE_CPUWORKER:
-      tor_assert(conn->state >= CPUWORKER_STATE_MIN_);
-      tor_assert(conn->state <= CPUWORKER_STATE_MAX_);
-      break;
     case CONN_TYPE_CONTROL:
       tor_assert(conn->state >= CONTROL_CONN_STATE_MIN_);
       tor_assert(conn->state <= CONTROL_CONN_STATE_MAX_);
@@ -4959,9 +4939,7 @@ proxy_type_to_string(int proxy_type)
 }
 
 /** Call connection_free_() on every connection in our array, and release all
- * storage held by connection.c. This is used by cpuworkers and dnsworkers
- * when they fork, so they don't keep resources held open (especially
- * sockets).
+ * storage held by connection.c.
  *
  * Don't do the checks in connection_free(), because they will
  * fail.
diff --git a/src/or/cpuworker.c b/src/or/cpuworker.c
index 340fbec..f3f275d 100644
--- a/src/or/cpuworker.c
+++ b/src/or/cpuworker.c
@@ -5,84 +5,98 @@
 
 /**
  * \file cpuworker.c
- * \brief Implements a farm of 'CPU worker' processes to perform
- * CPU-intensive tasks in another thread or process, to not
- * interrupt the main thread.
+ * \brief Uses the workqueue/threadpool code to farm CPU-intensive activities
+ * out to subprocesses.
  *
  * Right now, we only use this for processing onionskins.
  **/
 #include "or.h"
-#include "buffers.h"
 #include "channel.h"
-#include "channeltls.h"
 #include "circuitbuild.h"
 #include "circuitlist.h"
-#include "config.h"
-#include "connection.h"
 #include "connection_or.h"
+#include "config.h"
 #include "cpuworker.h"
 #include "main.h"
 #include "onion.h"
 #include "rephist.h"
 #include "router.h"
+#include "workqueue.h"
 
-/** The maximum number of cpuworker processes we will keep around. */
-#define MAX_CPUWORKERS 16
-/** The minimum number of cpuworker processes we will keep around. */
-#define MIN_CPUWORKERS 1
-
-/** The tag specifies which circuit this onionskin was from. */
-#define TAG_LEN 12
+#ifdef HAVE_EVENT2_EVENT_H
+#include <event2/event.h>
+#else
+#include <event.h>
+#endif
 
-/** How many cpuworkers we have running right now. */
-static int num_cpuworkers=0;
-/** How many of the running cpuworkers have an assigned task right now. */
-static int num_cpuworkers_busy=0;
-/** We need to spawn new cpuworkers whenever we rotate the onion keys
- * on platforms where execution contexts==processes.  This variable stores
- * the last time we got a key rotation event. */
-static time_t last_rotation_time=0;
+static void queue_pending_tasks(void);
 
-static void cpuworker_main(void *data) ATTR_NORETURN;
-static int spawn_cpuworker(void);
-static void spawn_enough_cpuworkers(void);
-static void process_pending_task(connection_t *cpuworker);
+typedef struct worker_state_s {
+  int generation;
+  server_onion_keys_t *onion_keys;
+} worker_state_t;
 
-/** Initialize the cpuworker subsystem.
- */
-void
-cpu_init(void)
+static void *
+worker_state_new(void *arg)
 {
-  cpuworkers_rotate();
+  worker_state_t *ws;
+  (void)arg;
+  ws = tor_malloc_zero(sizeof(worker_state_t));
+  ws->onion_keys = server_onion_keys_new();
+  return ws;
 }
-
-/** Called when we're done sending a request to a cpuworker. */
-int
-connection_cpu_finished_flushing(connection_t *conn)
+static void
+worker_state_free(void *arg)
 {
-  tor_assert(conn);
-  tor_assert(conn->type == CONN_TYPE_CPUWORKER);
-  return 0;
+  worker_state_t *ws = arg;
+  server_onion_keys_free(ws->onion_keys);
+  tor_free(ws);
 }
 
-/** Pack global_id and circ_id; set *tag to the result. (See note on
- * cpuworker_main for wire format.) */
+static replyqueue_t *replyqueue = NULL;
+static threadpool_t *threadpool = NULL;
+static struct event *reply_event = NULL;
+
+static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
+
+static int total_pending_tasks = 0;
+static int max_pending_tasks = 128;
+
 static void
-tag_pack(uint8_t *tag, uint64_t chan_id, circid_t circ_id)
+replyqueue_process_cb(evutil_socket_t sock, short events, void *arg)
 {
-  /*XXXX RETHINK THIS WHOLE MESS !!!! !NM NM NM NM*/
-  /*XXXX DOUBLEPLUSTHIS!!!! AS AS AS AS*/
-  set_uint64(tag, chan_id);
-  set_uint32(tag+8, circ_id);
+  replyqueue_t *rq = arg;
+  (void) sock;
+  (void) events;
+  replyqueue_process(rq);
 }
 
-/** Unpack <b>tag</b> into addr, port, and circ_id.
+/** Initialize the cpuworker subsystem.
  */
-static void
-tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
+void
+cpu_init(void)
 {
-  *chan_id = get_uint64(tag);
-  *circ_id = get_uint32(tag+8);
+  if (!replyqueue) {
+    replyqueue = replyqueue_new(0);
+  }
+  if (!reply_event) {
+    reply_event = tor_event_new(tor_libevent_get_base(),
+                                replyqueue_get_socket(replyqueue),
+                                EV_READ|EV_PERSIST,
+                                replyqueue_process_cb,
+                                replyqueue);
+    event_add(reply_event, NULL);
+  }
+  if (!threadpool) {
+    threadpool = threadpool_new(get_num_cpus(get_options()),
+                                replyqueue,
+                                worker_state_new,
+                                worker_state_free,
+                                NULL);
+  }
+  /* Total voodoo. Can we make this more sensible? */
+  max_pending_tasks = get_num_cpus(get_options()) * 64;
+  crypto_seed_weak_rng(&request_sample_rng);
 }
 
 /** Magic numbers to make sure our cpuworker_requests don't grow any
@@ -94,10 +108,6 @@ tag_unpack(const uint8_t *tag, uint64_t *chan_id, circid_t *circ_id)
 typedef struct cpuworker_request_t {
   /** Magic number; must be CPUWORKER_REQUEST_MAGIC. */
   uint32_t magic;
-  /** Opaque tag to identify the job */
-  uint8_t tag[TAG_LEN];
-  /** Task code. Must be one of CPUWORKER_TASK_* */
-  uint8_t task;
 
   /** Flag: Are we timing this request? */
   unsigned timed : 1;
@@ -114,8 +124,7 @@ typedef struct cpuworker_request_t {
 typedef struct cpuworker_reply_t {
   /** Magic number; must be CPUWORKER_REPLY_MAGIC. */
   uint32_t magic;
-  /** Opaque tag to identify the job; matches the request's tag.*/
-  uint8_t tag[TAG_LEN];
+
   /** True iff we got a successful request. */
   uint8_t success;
 
@@ -142,42 +151,46 @@ typedef struct cpuworker_reply_t {
   uint8_t rend_auth_material[DIGEST_LEN];
 } cpuworker_reply_t;
 
+typedef struct cpuworker_job_u {
+  uint64_t chan_id;
+  uint32_t circ_id;
+  union {
+    cpuworker_request_t request;
+    cpuworker_reply_t reply;
+  } u;
+} cpuworker_job_t;
+
+static int
+update_state_threadfn(void *state_, void *work_)
+{
+  worker_state_t *state = state_;
+  worker_state_t *update = work_;
+  server_onion_keys_free(state->onion_keys);
+  state->onion_keys = update->onion_keys;
+  update->onion_keys = NULL;
+  ++state->generation;
+  return WQ_RPL_REPLY;
+}
+static void
+update_state_replyfn(void *work_)
+{
+  tor_free(work_);
+}
+
 /** Called when the onion key has changed and we need to spawn new
  * cpuworkers.  Close all currently idle cpuworkers, and mark the last
  * rotation time as now.
  */
 void
-cpuworkers_rotate(void)
+cpuworkers_rotate_keyinfo(void)
 {
-  connection_t *cpuworker;
-  while ((cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
-                                                   CPUWORKER_STATE_IDLE))) {
-    connection_mark_for_close(cpuworker);
-    --num_cpuworkers;
+  if (threadpool_queue_for_all(threadpool,
+                               worker_state_new,
+                               update_state_threadfn,
+                               update_state_replyfn,
+                               NULL)) {
+    log_warn(LD_OR, "Failed to queue key update for worker threads.");
   }
-  last_rotation_time = time(NULL);
-  if (server_mode(get_options()))
-    spawn_enough_cpuworkers();
-}
-
-/** If the cpuworker closes the connection,
- * mark it as closed and spawn a new one as needed. */
-int
-connection_cpu_reached_eof(connection_t *conn)
-{
-  log_warn(LD_GENERAL,"Read eof. CPU worker died unexpectedly.");
-  if (conn->state != CPUWORKER_STATE_IDLE) {
-    /* the circ associated with this cpuworker will have to wait until
-     * it gets culled in run_connection_housekeeping(), since we have
-     * no way to find out which circ it was. */
-    log_warn(LD_GENERAL,"...and it left a circuit queued; abandoning circ.");
-    num_cpuworkers_busy--;
-  }
-  num_cpuworkers--;
-  spawn_enough_cpuworkers(); /* try to regrow. hope we don't end up
-                                spinning. */
-  connection_mark_for_close(conn);
-  return 0;
 }
 
 /** Indexed by handshake type: how many onionskins have we processed and
@@ -197,8 +210,6 @@ static uint64_t onionskins_usec_roundtrip[MAX_ONION_HANDSHAKE_TYPE+1];
  * time. (microseconds) */
 #define MAX_BELIEVABLE_ONIONSKIN_DELAY (2*1000*1000)
 
-static tor_weak_rng_t request_sample_rng = TOR_WEAK_RNG_INIT;
-
 /** Return true iff we'd like to measure a handshake of type
  * <b>onionskin_type</b>. Call only from the main thread. */
 static int
@@ -286,31 +297,22 @@ cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
          onionskin_type_name, (unsigned)overhead, relative_overhead*100);
 }
 
-/** Called when we get data from a cpuworker.  If the answer is not complete,
- * wait for a complete answer. If the answer is complete,
- * process it as appropriate.
- */
-int
-connection_cpu_process_inbuf(connection_t *conn)
+/** */
+static void
+cpuworker_onion_handshake_replyfn(void *work_)
 {
+  cpuworker_job_t *job = work_;
+  cpuworker_reply_t rpl;
   uint64_t chan_id;
   circid_t circ_id;
   channel_t *p_chan = NULL;
-  circuit_t *circ;
-
-  tor_assert(conn);
-  tor_assert(conn->type == CONN_TYPE_CPUWORKER);
+  circuit_t *circ = NULL;
 
-  if (!connection_get_inbuf_len(conn))
-    return 0;
-
-  if (conn->state == CPUWORKER_STATE_BUSY_ONION) {
-    cpuworker_reply_t rpl;
-    if (connection_get_inbuf_len(conn) < sizeof(cpuworker_reply_t))
-      return 0; /* not yet */
-    tor_assert(connection_get_inbuf_len(conn) == sizeof(cpuworker_reply_t));
+  --total_pending_tasks;
 
-    connection_fetch_from_buf((void*)&rpl,sizeof(cpuworker_reply_t),conn);
+  if (1) {
+    /* Could avoid this, but doesn't matter. */
+    memcpy(&rpl, &job->u.reply, sizeof(rpl));
 
     tor_assert(rpl.magic == CPUWORKER_REPLY_MAGIC);
 
@@ -337,18 +339,21 @@ connection_cpu_process_inbuf(connection_t *conn)
         }
       }
     }
-    /* parse out the circ it was talking about */
-    tag_unpack(rpl.tag, &chan_id, &circ_id);
-    circ = NULL;
-    log_debug(LD_OR,
-              "Unpacking cpuworker reply, chan_id is " U64_FORMAT
-              ", circ_id is %u",
-              U64_PRINTF_ARG(chan_id), (unsigned)circ_id);
+    /* Find the circ it was talking about */
+    chan_id = job->chan_id;
+    circ_id = job->circ_id;
+
     p_chan = channel_find_by_global_id(chan_id);
 
     if (p_chan)
       circ = circuit_get_by_circid_channel(circ_id, p_chan);
 
+    log_debug(LD_OR,
+              "Unpacking cpuworker reply %p, chan_id is " U64_FORMAT
+              ", circ_id is %u, p_chan=%p, circ=%p, success=%d",
+              job, U64_PRINTF_ARG(chan_id), (unsigned)circ_id,
+              p_chan, circ, rpl.success);
+
     if (rpl.success == 0) {
       log_debug(LD_OR,
                 "decoding onionskin failed. "
@@ -367,6 +372,7 @@ connection_cpu_process_inbuf(connection_t *conn)
       goto done_processing;
     }
     tor_assert(! CIRCUIT_IS_ORIGIN(circ));
+    TO_OR_CIRCUIT(circ)->workqueue_entry = NULL;
     if (onionskin_answer(TO_OR_CIRCUIT(circ),
                          &rpl.created_cell,
                          (const char*)rpl.keys,
@@ -376,58 +382,33 @@ connection_cpu_process_inbuf(connection_t *conn)
       goto done_processing;
     }
     log_debug(LD_OR,"onionskin_answer succeeded. Yay.");
-  } else {
-    tor_assert(0); /* don't ask me to do handshakes yet */
   }
 
  done_processing:
-  conn->state = CPUWORKER_STATE_IDLE;
-  num_cpuworkers_busy--;
-  if (conn->timestamp_created < last_rotation_time) {
-    connection_mark_for_close(conn);
-    num_cpuworkers--;
-    spawn_enough_cpuworkers();
-  } else {
-    process_pending_task(conn);
-  }
-  return 0;
+  memwipe(&rpl, 0, sizeof(rpl));
+  memwipe(job, 0, sizeof(*job));
+  tor_free(job);
+  queue_pending_tasks();
 }
 
-/** Implement a cpuworker.  'data' is an fdarray as returned by socketpair.
- * Read and writes from fdarray[1].  Reads requests, writes answers.
- *
- *   Request format:
- *          cpuworker_request_t.
- *   Response format:
- *          cpuworker_reply_t
- */
-static void
-cpuworker_main(void *data)
+/** Implementation function for onion handshake requests. */
+static int
+cpuworker_onion_handshake_threadfn(void *state_, void *work_)
 {
-  /* For talking to the parent thread/process */
-  tor_socket_t *fdarray = data;
-  tor_socket_t fd;
+  worker_state_t *state = state_;
+  cpuworker_job_t *job = work_;
 
   /* variables for onion processing */
-  server_onion_keys_t onion_keys;
+  server_onion_keys_t *onion_keys = state->onion_keys;
   cpuworker_request_t req;
   cpuworker_reply_t rpl;
 
-  fd = fdarray[1]; /* this side is ours */
-  tor_free(data);
-
-  setup_server_onion_keys(&onion_keys);
-
-  for (;;) {
-    if (read_all(fd, (void *)&req, sizeof(req), 1) != sizeof(req)) {
-      log_info(LD_OR, "read request failed. Exiting.");
-      goto end;
-    }
-    tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
+  memcpy(&req, &job->u.request, sizeof(req));
 
-    memset(&rpl, 0, sizeof(rpl));
+  tor_assert(req.magic == CPUWORKER_REQUEST_MAGIC);
+  memset(&rpl, 0, sizeof(rpl));
 
-    if (req.task == CPUWORKER_TASK_ONION) {
+  if (1) {
       const create_cell_t *cc = &req.create_cell;
       created_cell_t *cell_out = &rpl.created_cell;
       struct timeval tv_start = {0,0}, tv_end;
@@ -439,7 +420,7 @@ cpuworker_main(void *data)
         tor_gettimeofday(&tv_start);
       n = onion_skin_server_handshake(cc->handshake_type,
                                       cc->onionskin, cc->handshake_len,
-                                      &onion_keys,
+                                      onion_keys,
                                       cell_out->reply,
                                       rpl.keys, CPATH_KEY_MATERIAL_LEN,
                                       rpl.rend_auth_material);
@@ -447,12 +428,10 @@ cpuworker_main(void *data)
         /* failure */
         log_debug(LD_OR,"onion_skin_server_handshake failed.");
         memset(&rpl, 0, sizeof(rpl));
-        memcpy(rpl.tag, req.tag, TAG_LEN);
         rpl.success = 0;
       } else {
         /* success */
         log_debug(LD_OR,"onion_skin_server_handshake succeeded.");
-        memcpy(rpl.tag, req.tag, TAG_LEN);
         cell_out->handshake_len = n;
         switch (cc->cell_type) {
         case CELL_CREATE:
@@ -463,7 +442,7 @@ cpuworker_main(void *data)
           cell_out->cell_type = CELL_CREATED_FAST; break;
         default:
           tor_assert(0);
-          goto end;
+          return WQ_RPL_SHUTDOWN;
         }
         rpl.success = 1;
       }
@@ -479,187 +458,55 @@ cpuworker_main(void *data)
         else
           rpl.n_usec = (uint32_t) usec;
       }
-      if (write_all(fd, (void*)&rpl, sizeof(rpl), 1) != sizeof(rpl)) {
-        log_err(LD_BUG,"writing response buf failed. Exiting.");
-        goto end;
-      }
-      log_debug(LD_OR,"finished writing response.");
-    } else if (req.task == CPUWORKER_TASK_SHUTDOWN) {
-      log_info(LD_OR,"Clean shutdown: exiting");
-      goto end;
-    }
-    memwipe(&req, 0, sizeof(req));
-    memwipe(&rpl, 0, sizeof(req));
-  }
- end:
-  memwipe(&req, 0, sizeof(req));
-  memwipe(&rpl, 0, sizeof(req));
-  release_server_onion_keys(&onion_keys);
-  tor_close_socket(fd);
-  crypto_thread_cleanup();
-  spawn_exit();
-}
-
-/** Launch a new cpuworker. Return 0 if we're happy, -1 if we failed.
- */
-static int
-spawn_cpuworker(void)
-{
-  tor_socket_t *fdarray;
-  tor_socket_t fd;
-  connection_t *conn;
-  int err;
-
-  fdarray = tor_calloc(2, sizeof(tor_socket_t));
-  if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fdarray)) < 0) {
-    log_warn(LD_NET, "Couldn't construct socketpair for cpuworker: %s",
-             tor_socket_strerror(-err));
-    tor_free(fdarray);
-    return -1;
-  }
-
-  tor_assert(SOCKET_OK(fdarray[0]));
-  tor_assert(SOCKET_OK(fdarray[1]));
-
-  fd = fdarray[0];
-  if (spawn_func(cpuworker_main, (void*)fdarray) < 0) {
-    tor_close_socket(fdarray[0]);
-    tor_close_socket(fdarray[1]);
-    tor_free(fdarray);
-    return -1;
   }
-  log_debug(LD_OR,"just spawned a cpu worker.");
-
-  conn = connection_new(CONN_TYPE_CPUWORKER, AF_UNIX);
-
-  /* set up conn so it's got all the data we need to remember */
-  conn->s = fd;
-  conn->address = tor_strdup("localhost");
-  tor_addr_make_unspec(&conn->addr);
-
-  if (set_socket_nonblocking(fd) == -1) {
-    connection_free(conn); /* this closes fd */
-    return -1;
-  }
-
-  if (connection_add(conn) < 0) { /* no space, forget it */
-    log_warn(LD_NET,"connection_add for cpuworker failed. Giving up.");
-    connection_free(conn); /* this closes fd */
-    return -1;
-  }
-
-  conn->state = CPUWORKER_STATE_IDLE;
-  connection_start_reading(conn);
 
-  return 0; /* success */
-}
-
-/** If we have too few or too many active cpuworkers, try to spawn new ones
- * or kill idle ones.
- */
-static void
-spawn_enough_cpuworkers(void)
-{
-  int num_cpuworkers_needed = get_num_cpus(get_options());
-  int reseed = 0;
-
-  if (num_cpuworkers_needed < MIN_CPUWORKERS)
-    num_cpuworkers_needed = MIN_CPUWORKERS;
-  if (num_cpuworkers_needed > MAX_CPUWORKERS)
-    num_cpuworkers_needed = MAX_CPUWORKERS;
-
-  while (num_cpuworkers < num_cpuworkers_needed) {
-    if (spawn_cpuworker() < 0) {
-      log_warn(LD_GENERAL,"Cpuworker spawn failed. Will try again later.");
-      return;
-    }
-    num_cpuworkers++;
-    reseed++;
-  }
+  memcpy(&job->u.reply, &rpl, sizeof(rpl));
 
-  if (reseed)
-    crypto_seed_weak_rng(&request_sample_rng);
+  memwipe(&req, 0, sizeof(req));
+  memwipe(&rpl, 0, sizeof(req));
+  return WQ_RPL_REPLY;
 }
 
-/** Take a pending task from the queue and assign it to 'cpuworker'. */
+/** Take pending tasks from the queue and assign them to cpuworkers. */
 static void
-process_pending_task(connection_t *cpuworker)
+queue_pending_tasks(void)
 {
   or_circuit_t *circ;
   create_cell_t *onionskin = NULL;
 
-  tor_assert(cpuworker);
+  while (total_pending_tasks < max_pending_tasks) {
+    circ = onion_next_task(&onionskin);
 
-  /* for now only process onion tasks */
-
-  circ = onion_next_task(&onionskin);
-  if (!circ)
-    return;
-  if (assign_onionskin_to_cpuworker(cpuworker, circ, onionskin))
-    log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
-}
-
-/** How long should we let a cpuworker stay busy before we give
- * up on it and decide that we have a bug or infinite loop?
- * This value is high because some servers with low memory/cpu
- * sometimes spend an hour or more swapping, and Tor starves. */
-#define CPUWORKER_BUSY_TIMEOUT (60*60*12)
+    if (!circ)
+      return;
 
-/** We have a bug that I can't find. Sometimes, very rarely, cpuworkers get
- * stuck in the 'busy' state, even though the cpuworker process thinks of
- * itself as idle. I don't know why. But here's a workaround to kill any
- * cpuworker that's been busy for more than CPUWORKER_BUSY_TIMEOUT.
- */
-static void
-cull_wedged_cpuworkers(void)
-{
-  time_t now = time(NULL);
-  smartlist_t *conns = get_connection_array();
-  SMARTLIST_FOREACH_BEGIN(conns, connection_t *, conn) {
-    if (!conn->marked_for_close &&
-        conn->type == CONN_TYPE_CPUWORKER &&
-        conn->state == CPUWORKER_STATE_BUSY_ONION &&
-        conn->timestamp_lastwritten + CPUWORKER_BUSY_TIMEOUT < now) {
-      log_notice(LD_BUG,
-                 "closing wedged cpuworker. Can somebody find the bug?");
-      num_cpuworkers_busy--;
-      num_cpuworkers--;
-      connection_mark_for_close(conn);
-    }
-  } SMARTLIST_FOREACH_END(conn);
+    if (assign_onionskin_to_cpuworker(circ, onionskin))
+      log_warn(LD_OR,"assign_to_cpuworker failed. Ignoring.");
+  }
 }
 
 /** Try to tell a cpuworker to perform the public key operations necessary to
  * respond to <b>onionskin</b> for the circuit <b>circ</b>.
  *
- * If <b>cpuworker</b> is defined, assert that he's idle, and use him. Else,
- * look for an idle cpuworker and use him. If none idle, queue task onto the
- * pending onion list and return.  Return 0 if we successfully assign the
- * task, or -1 on failure.
+ * Return 0 if we successfully assign the task, or -1 on failure.
  */
 int
-assign_onionskin_to_cpuworker(connection_t *cpuworker,
-                              or_circuit_t *circ,
+assign_onionskin_to_cpuworker(or_circuit_t *circ,
                               create_cell_t *onionskin)
 {
+  workqueue_entry_t *queue_entry;
+  cpuworker_job_t *job;
   cpuworker_request_t req;
-  time_t now = approx_time();
-  static time_t last_culled_cpuworkers = 0;
   int should_time;
 
-  /* Checking for wedged cpuworkers requires a linear search over all
-   * connections, so let's do it only once a minute.
-   */
-#define CULL_CPUWORKERS_INTERVAL 60
-
-  if (last_culled_cpuworkers + CULL_CPUWORKERS_INTERVAL <= now) {
-    cull_wedged_cpuworkers();
-    spawn_enough_cpuworkers();
-    last_culled_cpuworkers = now;
-  }
-
   if (1) {
-    if (num_cpuworkers_busy == num_cpuworkers) {
+    if (!circ->p_chan) {
+      log_info(LD_OR,"circ->p_chan gone. Failing circ.");
+      tor_free(onionskin);
+      return -1;
+    }
+
+    if (total_pending_tasks >= max_pending_tasks) {
       log_debug(LD_OR,"No idle cpuworkers. Queuing.");
       if (onion_pending_add(circ, onionskin) < 0) {
         tor_free(onionskin);
@@ -668,36 +515,14 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker,
       return 0;
     }
 
-    if (!cpuworker)
-      cpuworker = connection_get_by_type_state(CONN_TYPE_CPUWORKER,
-                                               CPUWORKER_STATE_IDLE);
-
-    tor_assert(cpuworker);
-
-    if (!circ->p_chan) {
-      log_info(LD_OR,"circ->p_chan gone. Failing circ.");
-      tor_free(onionskin);
-      return -1;
-    }
-
     if (connection_or_digest_is_known_relay(circ->p_chan->identity_digest))
       rep_hist_note_circuit_handshake_assigned(onionskin->handshake_type);
 
     should_time = should_time_request(onionskin->handshake_type);
     memset(&req, 0, sizeof(req));
     req.magic = CPUWORKER_REQUEST_MAGIC;
-    tag_pack(req.tag, circ->p_chan->global_identifier,
-             circ->p_circ_id);
     req.timed = should_time;
 
-    cpuworker->state = CPUWORKER_STATE_BUSY_ONION;
-    /* touch the lastwritten timestamp, since that's how we check to
-     * see how long it's been since we asked the question, and sometimes
-     * we check before the first call to connection_handle_write(). */
-    cpuworker->timestamp_lastwritten = now;
-    num_cpuworkers_busy++;
-
-    req.task = CPUWORKER_TASK_ONION;
     memcpy(&req.create_cell, onionskin, sizeof(create_cell_t));
 
     tor_free(onionskin);
@@ -705,9 +530,46 @@ assign_onionskin_to_cpuworker(connection_t *cpuworker,
     if (should_time)
       tor_gettimeofday(&req.started_at);
 
-    connection_write_to_buf((void*)&req, sizeof(req), cpuworker);
+    job = tor_malloc_zero(sizeof(cpuworker_job_t));
+    job->chan_id = circ->p_chan->global_identifier;
+    job->circ_id = circ->p_circ_id;
+    memcpy(&job->u.request, &req, sizeof(req));
     memwipe(&req, 0, sizeof(req));
+
+    ++total_pending_tasks;
+    queue_entry = threadpool_queue_work(threadpool,
+                                        cpuworker_onion_handshake_threadfn,
+                                        cpuworker_onion_handshake_replyfn,
+                                        job);
+    if (!queue_entry) {
+      log_warn(LD_BUG, "Couldn't queue work on threadpool");
+      tor_free(job);
+      return -1;
+    }
+    log_debug(LD_OR, "Queued task %p (qe=%p, chanid="U64_FORMAT", circid=%u)",
+              job, queue_entry, U64_PRINTF_ARG(job->chan_id), job->circ_id);
+
+    circ->workqueue_entry = queue_entry;
   }
   return 0;
 }
 
+/** If <b>circ</b> has a pending handshake that hasn't been processed yet,
+ * remove it from the worker queue. */
+void
+cpuworker_cancel_circ_handshake(or_circuit_t *circ)
+{
+  cpuworker_job_t *job;
+  if (circ->workqueue_entry == NULL)
+    return;
+
+  job = workqueue_entry_cancel(circ->workqueue_entry);
+  if (job) {
+    /* It successfully cancelled. */
+    memwipe(job, 0xe0, sizeof(*job));
+    tor_free(job);
+  }
+
+  circ->workqueue_entry = NULL;
+}
+
diff --git a/src/or/cpuworker.h b/src/or/cpuworker.h
index 2a2b37a..70a595e 100644
--- a/src/or/cpuworker.h
+++ b/src/or/cpuworker.h
@@ -13,19 +13,17 @@
 #define TOR_CPUWORKER_H
 
 void cpu_init(void);
-void cpuworkers_rotate(void);
-int connection_cpu_finished_flushing(connection_t *conn);
-int connection_cpu_reached_eof(connection_t *conn);
-int connection_cpu_process_inbuf(connection_t *conn);
+void cpuworkers_rotate_keyinfo(void);
+
 struct create_cell_t;
-int assign_onionskin_to_cpuworker(connection_t *cpuworker,
-                                  or_circuit_t *circ,
+int assign_onionskin_to_cpuworker(or_circuit_t *circ,
                                   struct create_cell_t *onionskin);
 
 uint64_t estimated_usec_for_onionskins(uint32_t n_requests,
                                        uint16_t onionskin_type);
 void cpuworker_log_onionskin_overhead(int severity, int onionskin_type,
                                       const char *onionskin_type_name);
+void cpuworker_cancel_circ_handshake(or_circuit_t *circ);
 
 #endif
 
diff --git a/src/or/main.c b/src/or/main.c
index abf3230..136043c 100644
--- a/src/or/main.c
+++ b/src/or/main.c
@@ -1271,7 +1271,7 @@ run_scheduled_events(time_t now)
       get_onion_key_set_at()+MIN_ONION_KEY_LIFETIME < now) {
     log_info(LD_GENERAL,"Rotating onion key.");
     rotate_onion_key();
-    cpuworkers_rotate();
+    cpuworkers_rotate_keyinfo();
     if (router_rebuild_descriptor(1)<0) {
       log_info(LD_CONFIG, "Couldn't rebuild router descriptor");
     }
@@ -1960,9 +1960,9 @@ do_hup(void)
    * force a retry there. */
 
   if (server_mode(options)) {
-    /* Restart cpuworker and dnsworker processes, so they get up-to-date
+    /* Update cpuworker and dnsworker processes, so they get up-to-date
      * configuration options. */
-    cpuworkers_rotate();
+    cpuworkers_rotate_keyinfo();
     dns_reset();
   }
   return 0;
diff --git a/src/or/onion.c b/src/or/onion.c
index 3723a3e..43fb63c 100644
--- a/src/or/onion.c
+++ b/src/or/onion.c
@@ -295,6 +295,8 @@ onion_pending_remove(or_circuit_t *circ)
   victim = circ->onionqueue_entry;
   if (victim)
     onion_queue_entry_remove(victim);
+
+  cpuworker_cancel_circ_handshake(circ);
 }
 
 /** Remove a queue entry <b>victim</b> from the queue, unlinking it from
@@ -339,25 +341,25 @@ clear_pending_onions(void)
 
 /* ============================================================ */
 
-/** Fill in a server_onion_keys_t object at <b>keys</b> with all of the keys
+/** Return a new server_onion_keys_t object with all of the keys
  * and other info we might need to do onion handshakes.  (We make a copy of
  * our keys for each cpuworker to avoid race conditions with the main thread,
  * and to avoid locking) */
-void
-setup_server_onion_keys(server_onion_keys_t *keys)
+server_onion_keys_t *
+server_onion_keys_new(void)
 {
-  memset(keys, 0, sizeof(server_onion_keys_t));
+  server_onion_keys_t *keys = tor_malloc_zero(sizeof(server_onion_keys_t));
   memcpy(keys->my_identity, router_get_my_id_digest(), DIGEST_LEN);
   dup_onion_keys(&keys->onion_key, &keys->last_onion_key);
   keys->curve25519_key_map = construct_ntor_key_map();
   keys->junk_keypair = tor_malloc_zero(sizeof(curve25519_keypair_t));
   curve25519_keypair_generate(keys->junk_keypair, 0);
+  return keys;
 }
 
-/** Release all storage held in <b>keys</b>, but do not free <b>keys</b>
- * itself (as it's likely to be stack-allocated.) */
+/** Release all storage held in <b>keys</b>. */
 void
-release_server_onion_keys(server_onion_keys_t *keys)
+server_onion_keys_free(server_onion_keys_t *keys)
 {
   if (! keys)
     return;
@@ -366,7 +368,8 @@ release_server_onion_keys(server_onion_keys_t *keys)
   crypto_pk_free(keys->last_onion_key);
   ntor_key_map_free(keys->curve25519_key_map);
   tor_free(keys->junk_keypair);
-  memset(keys, 0, sizeof(server_onion_keys_t));
+  memwipe(keys, 0, sizeof(server_onion_keys_t));
+  tor_free(keys);
 }
 
 /** Release whatever storage is held in <b>state</b>, depending on its
diff --git a/src/or/onion.h b/src/or/onion.h
index 3561987..9605008 100644
--- a/src/or/onion.h
+++ b/src/or/onion.h
@@ -30,8 +30,8 @@ typedef struct server_onion_keys_t {
 #define MAX_ONIONSKIN_CHALLENGE_LEN 255
 #define MAX_ONIONSKIN_REPLY_LEN 255
 
-void setup_server_onion_keys(server_onion_keys_t *keys);
-void release_server_onion_keys(server_onion_keys_t *keys);
+server_onion_keys_t *server_onion_keys_new(void);
+void server_onion_keys_free(server_onion_keys_t *keys);
 
 void onion_handshake_state_release(onion_handshake_state_t *state);
 
diff --git a/src/or/or.h b/src/or/or.h
index 8a15529..4ff3555 100644
--- a/src/or/or.h
+++ b/src/or/or.h
@@ -213,8 +213,7 @@ typedef enum {
 #define CONN_TYPE_DIR_LISTENER 8
 /** Type for HTTP connections to the directory server. */
 #define CONN_TYPE_DIR 9
-/** Connection from the main process to a CPU worker process. */
-#define CONN_TYPE_CPUWORKER 10
+/* Type 10 is unused. */
 /** Type for listening for connections from user interface process. */
 #define CONN_TYPE_CONTROL_LISTENER 11
 /** Type for connections from user interface process. */
@@ -276,17 +275,6 @@ typedef enum {
 /** State for any listener connection. */
 #define LISTENER_STATE_READY 0
 
-#define CPUWORKER_STATE_MIN_ 1
-/** State for a connection to a cpuworker process that's idle. */
-#define CPUWORKER_STATE_IDLE 1
-/** State for a connection to a cpuworker process that's processing a
- * handshake. */
-#define CPUWORKER_STATE_BUSY_ONION 2
-#define CPUWORKER_STATE_MAX_ 2
-
-#define CPUWORKER_TASK_ONION CPUWORKER_STATE_BUSY_ONION
-#define CPUWORKER_TASK_SHUTDOWN 255
-
 #define OR_CONN_STATE_MIN_ 1
 /** State for a connection to an OR: waiting for connect() to finish. */
 #define OR_CONN_STATE_CONNECTING 1
@@ -3158,6 +3146,9 @@ typedef struct or_circuit_t {
   /** Pointer to an entry on the onion queue, if this circuit is waiting for a
    * chance to give an onionskin to a cpuworker. Used only in onion.c */
   struct onion_queue_t *onionqueue_entry;
+  /** Pointer to a workqueue entry, if this circuit has given an onionskin to
+   * a cpuworker and is waiting for a response. Used only in cpuworker.c */
+  struct workqueue_entry_s *workqueue_entry;
 
   /** The circuit_id used in the previous (backward) hop of this circuit. */
   circid_t p_circ_id;



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits