[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[Libevent-users] libevent + libpthread_workqueue = EV_PARALLEL
See attached for an experimental patch that adds a new flag EV_PARALLEL
for the ev_events field of 'struct event'. If this flag is turned on,
the callback will be invoked asynchronously using a threadpool. The
caller is responsible for ensuring that the callback function can safely
be executed by multiple threads concurrently.
My patch uses libpthread_workqueue as the underlying threadpool
implementation. I am the main author of libpthread_workqueue and
probably biased towards it :) Despite the name, it has been ported to
Windows, and has been used in the portable version of Grand Central
Dispatch (a.k.a. libdispatch).
For more information, see here:
http://mark.heily.com/project/libpthread_workqueue
I had to manually add '-lpthread_workqueue' to the dependency list in
libevent_core.la in order to get everything to build correctly. As a
test, I hacked up test/test-time.c to demonstrate that the threadpool is
functional.
This is a very preliminary patch and should not be considered for
inclusion in libevent "as is"; however, I would like feedback on the
general idea, along with additional things that may be needed.
Cheers,
- Mark
diff --git a/Makefile.am b/Makefile.am
index 8f0907f..b5d906a 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -129,6 +129,10 @@ SYS_INCLUDES =
endif
+if USE_PTHREAD_WORKQUEUE
+SYS_LIBS += -lpthread_workqueue
+endif
+
if SELECT_BACKEND
SYS_SRC += select.c
endif
@@ -190,11 +194,11 @@ endif
GENERIC_LDFLAGS = -version-info $(VERSION_INFO) $(RELEASE) $(NO_UNDEFINED)
libevent_la_SOURCES = $(CORE_SRC) $(EXTRA_SRC)
-libevent_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS)
+libevent_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS) -lpthread_workqueue
libevent_la_LDFLAGS = $(GENERIC_LDFLAGS)
libevent_core_la_SOURCES = $(CORE_SRC)
-libevent_core_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS)
+libevent_core_la_LIBADD = @LTLIBOBJS@ $(SYS_LIBS) -lpthread_workqueue
libevent_core_la_LDFLAGS = $(GENERIC_LDFLAGS)
if PTHREADS
diff --git a/configure.in b/configure.in
index 8a9211d..e81ae29 100644
--- a/configure.in
+++ b/configure.in
@@ -188,6 +188,7 @@ AC_CHECK_HEADERS([ \
netinet/in6.h \
poll.h \
port.h \
+ pthread_workqueue.h \
stdarg.h \
stddef.h \
stdint.h \
@@ -574,6 +575,12 @@ fi
AM_CONDITIONAL(SIGNAL_SUPPORT, [test "x$needsignal" = "xyes"])
+if test "x$ac_cv_header_pthread_workqueue_h" = "xyes"; then
+ AC_DEFINE(HAVE_PTHREAD_WORKQUEUE, 1,
+ [Define if pthread_workqueue API is available])
+fi
+AM_CONDITIONAL(USE_PTHREAD_WORKQUEUE, [test "x$ac_cv_header_pthread_workqueue_h" = "xyes"])
+
AC_TYPE_PID_T
AC_TYPE_SIZE_T
AC_TYPE_SSIZE_T
diff --git a/event.c b/event.c
index 575f8f5..4c4011d 100644
--- a/event.c
+++ b/event.c
@@ -49,6 +49,9 @@
#ifdef _EVENT_HAVE_SYS_EVENTFD_H
#include <sys/eventfd.h>
#endif
+#if _EVENT_HAVE_PTHREAD_WORKQUEUE
+#include <pthread_workqueue.h>
+#endif
#include <ctype.h>
#include <errno.h>
#include <signal.h>
@@ -125,6 +128,12 @@ struct event_base *event_global_current_base_ = NULL;
/* Global state */
+#if _EVENT_HAVE_PTHREAD_WORKQUEUE
+static pthread_workqueue_t event_global_workqueue;
+#endif
+
+/* Global state */
+
static int use_monotonic;
/* Prototypes */
@@ -156,6 +165,13 @@ static int evthread_notify_base(struct event_base *base);
static void insert_common_timeout_inorder(struct common_timeout_list *ctl,
struct event *ev);
+struct event_continuation { /* one deferred callback call; filled in by event_callback_enqueue() */
+ event_callback_fn ev_callback; /* function that event_callback_dequeue() invokes */
+ evutil_socket_t ev_fd; /* copy of the event's ev_fd; passed as the first argument */
+ short ev_res; /* copy of the event's ev_res; passed as the second argument */
+ void *ev_arg; /* copy of the event's ev_arg; passed as the third argument */
+};
+
#ifndef _EVENT_DISABLE_DEBUG_MODE
/* These functions implement a hashtable of which 'struct event *' structures
* have been setup or added. We don't want to trust the content of the struct
@@ -451,6 +467,42 @@ event_base_update_cache_time(struct event_base *base)
return 0;
}
+/* Create the global workqueue for EV_PARALLEL callbacks; 0 on success. */
+static int
+event_parallelism_init(void)
+{
+#if _EVENT_HAVE_PTHREAD_WORKQUEUE
+	static int once = 0;
+	pthread_workqueue_attr_t pwq_attr;
+	int rv;
+
+	/* FIXME -- Not as good as pthread_once() */
+	if (once > 0)
+		return 0;
+	once = 1;
+
+	if (pthread_workqueue_attr_init_np(&pwq_attr) != 0) {
+		event_errx(1, "%s: Unable to initialize attribute structure", __func__);
+		return -1;
+	}
+	if (pthread_workqueue_attr_setqueuepriority_np(&pwq_attr,
+	    WORKQ_DEFAULT_PRIOQUEUE) != 0) {
+		(void)pthread_workqueue_attr_destroy_np(&pwq_attr);
+		event_errx(1, "%s: Unable to set queue priority", __func__);
+		return -1;
+	}
+	/* The attributes are only needed while the queue is being created,
+	 * so release them before looking at the result. */
+	rv = pthread_workqueue_create_np(&event_global_workqueue, &pwq_attr);
+	(void)pthread_workqueue_attr_destroy_np(&pwq_attr);
+	if (rv != 0) {
+		event_errx(1, "%s: Unable to create workqueue", __func__);
+		return -1;
+	}
+#endif
+
+	return 0;
+}
+
struct event_base *
event_init(void)
{
@@ -463,6 +515,11 @@ event_init(void)
current_base = base;
+ if (event_parallelism_init() < 0) {
+ event_base_free(base);
+ return NULL;
+ }
+
return (base);
}
@@ -1110,6 +1167,49 @@ event_haveevents(struct event_base *base)
return (base->virtual_event_count > 0 || base->event_count > 0);
}
+#if _EVENT_HAVE_PTHREAD_WORKQUEUE
+/* Worker-thread entry point: run the continuation in arg, then free it. */
+static void
+event_callback_dequeue(void *arg)
+{
+	struct event_continuation *c = arg;
+
+	(*c->ev_callback)(c->ev_fd, c->ev_res, c->ev_arg);
+	mm_free(c);
+}
+#endif
+
+/* Invoke ev's callback, via the workqueue when EV_PARALLEL is set. */
+static inline void
+event_callback_enqueue(struct event *ev)
+{
+#if _EVENT_HAVE_PTHREAD_WORKQUEUE
+	if ((ev->ev_events & EV_PARALLEL) != 0) {
+		struct event_continuation *cont;
+
+		cont = mm_calloc(1, sizeof(struct event_continuation));
+		if (cont == NULL) {
+			event_errx(1, "%s: calloc", __func__);
+			return;
+		}
+		/* Copy what the worker needs so it never reads *ev. */
+		cont->ev_callback = ev->ev_callback;
+		cont->ev_fd = ev->ev_fd;
+		cont->ev_res = ev->ev_res;
+		cont->ev_arg = ev->ev_arg;
+
+		/* event_callback_dequeue() frees cont after the call. */
+		if (pthread_workqueue_additem_np(event_global_workqueue,
+		    event_callback_dequeue, cont, NULL, NULL) != 0)
+			event_errx(1, "%s: Unable to add a workqueue item", __func__);
+		return;
+	}
+#endif
+
+	/* Otherwise run the callback synchronously on this thread. */
+	(*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
+}
+
/* "closure" function called when processing active signal events */
static inline void
event_signal_closure(struct event_base *base, struct event *ev)
@@ -1127,7 +1227,7 @@ event_signal_closure(struct event_base *base, struct event *ev)
ev->ev_ncalls = ncalls;
if (ncalls == 0)
ev->ev_pncalls = NULL;
- (*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
+ event_callback_enqueue(ev);
EVBASE_ACQUIRE_LOCK(base, th_base_lock);
should_break = base->event_break;
@@ -1318,6 +1418,7 @@ done:
return result;
}
+
/* Closure function invoked when we're activating a persistent event. */
static inline void
event_persist_closure(struct event_base *base, struct event *ev)
@@ -1358,7 +1459,7 @@ event_persist_closure(struct event_base *base, struct event *ev)
event_add_internal(ev, &run_at, 1);
}
EVBASE_RELEASE_LOCK(base, th_base_lock);
- (*ev->ev_callback)(ev->ev_fd, ev->ev_res, ev->ev_arg);
+ event_callback_enqueue(ev);
}
/*
@@ -1408,8 +1509,7 @@ event_process_active_single_queue(struct event_base *base,
default:
case EV_CLOSURE_NONE:
EVBASE_RELEASE_LOCK(base, th_base_lock);
- (*ev->ev_callback)(
- ev->ev_fd, ev->ev_res, ev->ev_arg);
+ event_callback_enqueue(ev);
break;
}
diff --git a/include/event2/event.h b/include/event2/event.h
index 01018fa..f5cdb5a 100644
--- a/include/event2/event.h
+++ b/include/event2/event.h
@@ -793,6 +793,8 @@ int event_base_got_break(struct event_base *);
#define EV_PERSIST 0x10
/** Select edge-triggered behavior, if supported by the backend. */
#define EV_ET 0x20
+/** Invoke the callback function asynchronously using a thread pool. */
+#define EV_PARALLEL 0x40
/**@}*/
/**
diff --git a/test/test-time.c b/test/test-time.c
index 8e43148..01e4566 100644
--- a/test/test-time.c
+++ b/test/test-time.c
@@ -2,11 +2,16 @@
* Compile with:
* cc -I/usr/local/include -o time-test time-test.c -L/usr/local/lib -levent
*/
+#define _GNU_SOURCE
+
#include "event2/event-config.h"
+#include "event2/event_struct.h"
#include <sys/types.h>
#include <sys/stat.h>
+#include <sys/syscall.h>
#include <fcntl.h>
+#include <pthread.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
@@ -20,9 +25,10 @@
#include "event2/event_compat.h"
#include "event2/event_struct.h"
+pthread_mutex_t called_mtx = PTHREAD_MUTEX_INITIALIZER;
int called = 0;
-#define NEVENT 20000
+#define NEVENT 1000
struct event *ev[NEVENT];
@@ -41,20 +47,19 @@ time_cb(evutil_socket_t fd, short event, void *arg)
{
struct timeval tv;
int i, j;
+ int _called;
+ pthread_mutex_lock(&called_mtx);
called++;
+ _called = called;
+ pthread_mutex_unlock(&called_mtx);
- if (called < 10*NEVENT) {
- for (i = 0; i < 10; i++) {
- j = rand_int(NEVENT);
- tv.tv_sec = 0;
- tv.tv_usec = rand_int(50000);
- if (tv.tv_usec % 2)
- evtimer_add(ev[j], &tv);
- else
- evtimer_del(ev[j]);
- }
- }
+ printf ("thread %d - called == %d\n", syscall(SYS_gettid), _called);
+
+ if (_called == NEVENT) {
+ puts("Done.");
+ exit(0);
+ }
}
int
@@ -80,13 +85,15 @@ main(int argc, char **argv)
/* Initalize one event */
evtimer_set(ev[i], time_cb, ev[i]);
- tv.tv_sec = 0;
+ tv.tv_sec = 3;
tv.tv_usec = rand_int(50000);
+ ev[i]->ev_events |= EV_PARALLEL;
evtimer_add(ev[i], &tv);
}
event_dispatch();
- return (called < NEVENT);
+ sleep(10); /* Ensures all timers will fire */
+ return (1);
}