[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[Libevent-users] libevent + libpthread_workqueue = EV_PARALLEL



See attached for an experimental patch that adds a new flag EV_PARALLEL for the ev_events field of 'struct event'. If this flag is turned on, the callback will be invoked asynchronously using a threadpool. The caller is responsible for ensuring that the callback function can safely be executed by multiple threads concurrently.

My patch uses libpthread_workqueue as the underlying threadpool implementation. I am the main author of libpthread_workqueue and probably biased towards it :) Despite the name, it has been ported to Windows, and has been used in the portable version of Grand Central Dispatch (a.k.a. libdispatch).

For more information, see here:

  http://mark.heily.com/project/libpthread_workqueue

I had to manually add '-lpthread_workqueue' to the dependency list in libevent_core.la in order to get everything to build correctly. As a test, I hacked up test/test-time.c to demonstrate that the threadpool is functional.

This is a very preliminary patch and should not be considered for inclusion in libevent "as is"; however, I would like feedback on the general idea, along with additional things that may be needed.

Cheers,

 - Mark
diff --git a/Makefile.am b/Makefile.am
index 8f0907f..b5d906a 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -129,6 +129,10 @@ SYS_INCLUDES =
 
 endif
 
+if USE_PTHREAD_WORKQUEUE
+SYS_LIBS += -lpthread_workqueue
+endif
+
 if SELECT_BACKEND
 SYS_SRC += select.c
 endif
@@ -190,11 +194,11 @@ endif
 GENERIC_LDFLAGS = -version-info $(VERSION_INFO) $(RELEASE) $(NO_UNDEFINED)
 
 libevent_la_SOURCES = $(CORE_SRC) $(EXTRA_SRC)
-libevent_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS)
+libevent_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS) -lpthread_workqueue
 libevent_la_LDFLAGS = $(GENERIC_LDFLAGS)
 
 libevent_core_la_SOURCES = $(CORE_SRC)
-libevent_core_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS)
+libevent_core_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS) -lpthread_workqueue
 libevent_core_la_LDFLAGS = $(GENERIC_LDFLAGS)
 
 if PTHREADS
diff --git a/configure.in b/configure.in
index 8a9211d..e81ae29 100644
--- a/configure.in
+++ b/configure.in
@@ -188,6 +188,7 @@ AC_CHECK_HEADERS([ \
   netinet/in6.h \
   poll.h \
   port.h \
+  pthread_workqueue.h \
   stdarg.h \
   stddef.h \
   stdint.h \
@@ -574,6 +575,12 @@ fi
 
 AM_CONDITIONAL(SIGNAL_SUPPORT, [test "x$needsignal" = "xyes"])
 
+if test "x$ac_cv_header_pthread_workqueue_h" = "xyes"; then
+	AC_DEFINE(HAVE_PTHREAD_WORKQUEUE, 1,
+		    [Define if pthread_workqueue API is available])
+fi
+AM_CONDITIONAL(USE_PTHREAD_WORKQUEUE, [test "x$ac_cv_header_pthread_workqueue_h" = "xyes"])
+
 AC_TYPE_PID_T
 AC_TYPE_SIZE_T
 AC_TYPE_SSIZE_T
diff --git a/event.c b/event.c
index 575f8f5..4c4011d 100644
--- a/event.c
+++ b/event.c
@@ -49,6 +49,9 @@
 #ifdef _EVENT_HAVE_SYS_EVENTFD_H
 #include <sys/eventfd.h>
 #endif
+#if _EVENT_HAVE_PTHREAD_WORKQUEUE
+#include <pthread_workqueue.h>
+#endif
 #include <ctype.h>
 #include <errno.h>
 #include <signal.h>
@@ -125,6 +128,12 @@ struct event_base *event_global_current_base_ = NULL;
 
 /* Global state */
 
+#if _EVENT_HAVE_PTHREAD_WORKQUEUE
+static pthread_workqueue_t event_global_workqueue;
+#endif
+
+/* Global state */
+
 static int use_monotonic;
 
 /* Prototypes */
@@ -156,6 +165,13 @@ static int	evthread_notify_base(struct event_base *base);
 static void insert_common_timeout_inorder(struct common_timeout_list *ctl,
     struct event *ev);
 
+struct event_continuation {
+    event_callback_fn   ev_callback;
+    evutil_socket_t     ev_fd;
+    short               ev_res;
+    void               *ev_arg;
+};
+
 #ifndef _EVENT_DISABLE_DEBUG_MODE
 /* These functions implement a hashtable of which 'struct event *' structures
  * have been setup or added.  We don't want to trust the content of the struct
@@ -451,6 +467,42 @@ event_base_update_cache_time(struct event_base *base)
 	return 0;
 }
 
+static int
+event_parallelism_init(void)
+{
+#if _EVENT_HAVE_PTHREAD_WORKQUEUE
+    static int once = 0;
+    pthread_workqueue_attr_t pwq_attr;
+    int rv;
+
+    // FIXME -- Not as good as pthread_once()
+    if (once > 0)
+        return 0;
+    once = 1;
+
+    rv = pthread_workqueue_attr_init_np(&pwq_attr);
+    if (rv != 0) {
+		event_errx(1, "%s: Unable to initialize attribute structure", __func__);
+		return -1;
+	}
+
+    rv = pthread_workqueue_attr_setqueuepriority_np(&pwq_attr, 
+            WORKQ_DEFAULT_PRIOQUEUE);
+    if (rv != 0) {
+		event_errx(1, "%s: Unable to set queue priority", __func__);
+		return -1;
+    }
+
+    rv = pthread_workqueue_create_np(&event_global_workqueue, &pwq_attr);
+    if (rv != 0) {
+		event_errx(1, "%s: Unable to create workqueue", __func__);
+		return -1;
+	}
+#endif
+
+    return 0;
+}
+
 struct event_base *
 event_init(void)
 {
@@ -463,6 +515,11 @@ event_init(void)
 
 	current_base = base;
 
+    if (event_parallelism_init() < 0) {
+        event_base_free(base);
+        return NULL;
+    }
+
 	return (base);
 }
 
@@ -1110,6 +1167,49 @@ event_haveevents(struct event_base *base)
 	return (base->virtual_event_count > 0 || base->event_count > 0);
 }
 
+/* Dequeue and execute a callback function.
+   This runs on a worker thread. 
+ */
+static void
+event_callback_dequeue(void *arg)
+{
+    struct event_continuation *c = arg;
+
+    (*c->ev_callback)(c->ev_fd, c->ev_res, c->ev_arg);
+    mm_free(arg);
+}
+
+/* Enqueue a callback function */
+static inline void
+event_callback_enqueue(struct event *ev)
+{
+#if _EVENT_HAVE_PTHREAD_WORKQUEUE
+    struct event_continuation *c;
+    int rv;
+
+    if (ev->ev_events & EV_PARALLEL) {
+        if ((c = mm_calloc(1, sizeof(struct event_continuation))) == NULL) {
+            event_errx(1, "%s: calloc", __func__);
+            return;
+        }
+        c->ev_callback = ev->ev_callback;
+        c->ev_fd = ev->ev_fd;
+        c->ev_res = ev->ev_res;
+        c->ev_arg = ev->ev_arg;
+
+        rv = pthread_workqueue_additem_np(event_global_workqueue, 
+                event_callback_dequeue, c, NULL, NULL);
+        if (rv != 0) {
+            event_errx(1, "%s: Unable to add a workqueue item", __func__);
+        }
+        return;
+    }
+#endif
+
+    /* By default, the callback will be executed in the current thread */
+    (*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
+}
+
 /* "closure" function called when processing active signal events */
 static inline void
 event_signal_closure(struct event_base *base, struct event *ev)
@@ -1127,7 +1227,7 @@ event_signal_closure(struct event_base *base, struct event *ev)
 		ev->ev_ncalls = ncalls;
 		if (ncalls == 0)
 			ev->ev_pncalls = NULL;
-		(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
+        event_callback_enqueue(ev);
 
 		EVBASE_ACQUIRE_LOCK(base, th_base_lock);
 		should_break = base->event_break;
@@ -1318,6 +1418,7 @@ done:
 	return result;
 }
 
+
 /* Closure function invoked when we're activating a persistent event. */
 static inline void
 event_persist_closure(struct event_base *base, struct event *ev)
@@ -1358,7 +1459,7 @@ event_persist_closure(struct event_base *base, struct event *ev)
 		event_add_internal(ev, &run_at, 1);
 	}
 	EVBASE_RELEASE_LOCK(base, th_base_lock);
-	(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
+    event_callback_enqueue(ev);
 }
 
 /*
@@ -1408,8 +1509,7 @@ event_process_active_single_queue(struct event_base *base,
 		default:
 		case EV_CLOSURE_NONE:
 			EVBASE_RELEASE_LOCK(base, th_base_lock);
-			(*ev->ev_callback)(
-				ev->ev_fd, ev->ev_res, ev->ev_arg);
+            event_callback_enqueue(ev);
 			break;
 		}
 
diff --git a/include/event2/event.h b/include/event2/event.h
index 01018fa..f5cdb5a 100644
--- a/include/event2/event.h
+++ b/include/event2/event.h
@@ -793,6 +793,8 @@ int event_base_got_break(struct event_base *);
 #define EV_PERSIST	0x10
 /** Select edge-triggered behavior, if supported by the backend. */
 #define EV_ET       0x20
+/** Invoke the callback function asynchronously using a thread pool. */
+#define EV_PARALLEL 0x40
 /**@}*/
 
 /**
diff --git a/test/test-time.c b/test/test-time.c
index 8e43148..01e4566 100644
--- a/test/test-time.c
+++ b/test/test-time.c
@@ -2,11 +2,16 @@
  * Compile with:
  * cc -I/usr/local/include -o time-test time-test.c -L/usr/local/lib -levent
  */
+#define _GNU_SOURCE
+
 #include "event2/event-config.h"
+#include "event2/event_struct.h"
 
 #include <sys/types.h>
 #include <sys/stat.h>
+#include <sys/syscall.h>
 #include <fcntl.h>
+#include <pthread.h>
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
@@ -20,9 +25,10 @@
 #include "event2/event_compat.h"
 #include "event2/event_struct.h"
 
+pthread_mutex_t called_mtx = PTHREAD_MUTEX_INITIALIZER;
 int called = 0;
 
-#define NEVENT	20000
+#define NEVENT 1000
 
 struct event *ev[NEVENT];
 
@@ -41,20 +47,19 @@ time_cb(evutil_socket_t fd, short event, void *arg)
 {
 	struct timeval tv;
 	int i, j;
+    int _called;
 
+    pthread_mutex_lock(&called_mtx);
 	called++;
+    _called = called;
+    pthread_mutex_unlock(&called_mtx);
 
-	if (called < 10*NEVENT) {
-		for (i = 0; i < 10; i++) {
-			j = rand_int(NEVENT);
-			tv.tv_sec = 0;
-			tv.tv_usec = rand_int(50000);
-			if (tv.tv_usec % 2)
-				evtimer_add(ev[j], &tv);
-			else
-				evtimer_del(ev[j]);
-		}
-	}
+    printf ("thread %d - called == %d\n", syscall(SYS_gettid), _called);
+
+    if (_called == NEVENT) {
+        puts("Done.");
+        exit(0);
+    }
 }
 
 int
@@ -80,13 +85,15 @@ main(int argc, char **argv)
 
 		/* Initalize one event */
 		evtimer_set(ev[i], time_cb, ev[i]);
-		tv.tv_sec = 0;
+		tv.tv_sec = 3;
 		tv.tv_usec = rand_int(50000);
+        ev[i]->ev_events |= EV_PARALLEL;
 		evtimer_add(ev[i], &tv);
 	}
 
 	event_dispatch();
 
-	return (called < NEVENT);
+    sleep(10);  /* Ensures all timers will fire */
+	return (1);
 }