
[Libevent-users] bufferevent_pair in multithreaded application



Hi,

I've got the following use case in a multithreaded application:
* some threads run an event loop (with an event_base)
* some threads do not; they just run as pthreads with a 'while(1)' loop
or a 'select/epoll' on a set of fds

In order to communicate between these threads in a memory-safe way,
with clean locking, I was thinking of using a bufferevent_pair
between such an event-loop thread and a 'pthread' thread.

Questions:
* If I understand it correctly, the bufferevent pair would be created
and handled by the event-loop thread, and the pthread thread can write
to and read from one of the bufferevents in the pair. Is this
correct? Is this a supported way to use bufferevent pairs?
* In the pthread (the 'other' thread), is there a way to
'select/poll/epoll' on an fd for this buffer (or to wait for read/write
events on this buffer) without using libevent or an event loop? If not,
how would this be used, leaving aside a busy poll loop like the one in
the example below? Do you need to create a separate pipe (for example)
to wake the pthread (which does not use libevent apart from
bufferevent_write and bufferevent_read)?
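
To make the 'separate pipe' option concrete, here is a minimal sketch of
what I have in mind. It assumes a plain POSIX pipe next to the
bufferevent pair: the event-loop thread writes one byte to the pipe
after each bufferevent_write, and the pthread blocks in poll() on the
read end before calling bufferevent_read. The names wake_pipe,
wake_pipe_init, wake_pipe_notify and wake_pipe_wait are hypothetical
helpers, not libevent API.

```c
#include <assert.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper type, not part of libevent. */
struct wake_pipe {
    int rfd; /* read end, polled by the plain pthread */
    int wfd; /* write end, written by the event-loop thread */
};

/* Create the pipe. Returns 0 on success, -1 on failure. */
static int wake_pipe_init(struct wake_pipe *wp)
{
    int fds[2];

    assert(wp != NULL);
    if (pipe(fds) != 0)
        return -1;
    wp->rfd = fds[0];
    wp->wfd = fds[1];
    return 0;
}

/* Event-loop side: call after writing data into the bufferevent pair.
 * Returns 0 on success, -1 on failure. */
static int wake_pipe_notify(struct wake_pipe *wp)
{
    char byte = 1;

    return write(wp->wfd, &byte, 1) == 1 ? 0 : -1;
}

/* pthread side: wait up to timeout_ms for a notification.
 * Returns 1 if woken, 0 on timeout, -1 on error. */
static int wake_pipe_wait(struct wake_pipe *wp, int timeout_ms)
{
    struct pollfd pfd = { .fd = wp->rfd, .events = POLLIN };
    char drain[64];
    int n = poll(&pfd, 1, timeout_ms);

    if (n <= 0)
        return n;
    /* Drain pending bytes so several notifications collapse into one wake-up. */
    if (read(wp->rfd, drain, sizeof(drain)) < 0)
        return -1;
    return 1;
}
```

In the pthread, the loop would then be wake_pipe_wait(&wp, -1) followed
by bufferevent_read until it returns 0, instead of sleep(1). Whether
this extra pipe is really needed is exactly my question.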

Example code for a small PoC is below.

br,

Jan

PoC example (the thread sends data to the event-loop thread, and the
event loop sends data to the thread every 4 seconds):

#include <pthread.h>
#include <stdio.h>   /* printf, fprintf */
#include <unistd.h>  /* sleep */

#include "event2/bufferevent.h"
#include "event2/buffer.h"
#include "event2/event.h"
#include "event2/thread.h"

struct bufferevent *bev1 = NULL, *bev2 = NULL;
char buffer[8333];

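void *read_thread(void *x_void_ptr)
{
    char dump[9000];

    (void)x_void_ptr; /* unused */

    /* send something to the event loop */
    bufferevent_write(bev1, buffer, sizeof(buffer));

    while (1) {
        sleep(1);
        /* bufferevent_read() returns the number of bytes read as a size_t */
        size_t size = bufferevent_read(bev1, dump, sizeof(dump));
        printf("size read: %zu\n", size);
    }

    return NULL; /* not reached */
}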

static void
readcb(struct bufferevent *bev, void *arg)
{
    /* pthread_t is opaque; the cast assumes it is an integer type, as on Linux */
    printf("readcb: %p, %lu\n", arg, (unsigned long)pthread_self());
}

static void
writecb(struct bufferevent *bev, void *arg)
{
    /* pthread_t is opaque; the cast assumes it is an integer type, as on Linux */
    printf("writecb: %p, %lu\n", arg, (unsigned long)pthread_self());
}

static void dummy(evutil_socket_t fd, short what, void *arg)
{
    /* FIXME: same buffer as in the thread, but only used as a data source in this PoC */
    bufferevent_write(bev2, buffer, sizeof(buffer));
}


int main()
{
    int i;
    //put something in the buffer
    for (i = 0; i < (int)sizeof(buffer); i++)
        buffer[i] = i;

    evthread_use_pthreads();

    struct event_base *base_loop = event_base_new();

    struct bufferevent *pair[2];
    if (bufferevent_pair_new(base_loop, BEV_OPT_THREADSAFE, pair) != 0) {
        fprintf(stderr, "Error creating pair\n");
        return 1; /* non-zero exit status on failure */
    }

    bev1 = pair[0];
    bev2 = pair[1];
    bufferevent_setcb(bev1, readcb, writecb, NULL, bev1);
    bufferevent_setcb(bev2, readcb, writecb, NULL, bev2);

    struct event* ev = event_new(base_loop, -1, EV_PERSIST, dummy, NULL);
    struct timeval timeout_val;
    timeout_val.tv_sec = 4;
    timeout_val.tv_usec = 0;
    evtimer_add(ev, &timeout_val);

    bufferevent_enable(bev1, EV_READ);
    bufferevent_enable(bev2, EV_READ);

    pthread_t read_thread_p;
    if (pthread_create(&read_thread_p, NULL, read_thread, NULL)) {
        fprintf(stderr, "Error creating thread\n");
        return 1; /* non-zero exit status on failure */
    }

    event_base_dispatch(base_loop);

    return 0;
}