Re: [Libevent-users] Help with progress thread



On Fri, Nov 12, 2010 at 10:47 AM, Ralph Castain <rhc@xxxxxxxxxxxx> wrote:
> Hi Nick
>
> Appreciate the help. Shame on me for the buglet re open.
>
> Using your revisions, I was able to get this to work with our integration (i.e., using the opal abstractions). I'm a little puzzled by the procedure, though, so if you don't mind my taxing your patience, I do have a question.
>
> I gather that the standard procedure in threaded scenarios is to signal the "progress thread" every time a new event is assigned to that event base by another thread.

No.  There is internal notification code that is called when you
event_add or event_del an event from another thread, and this code is
supposed to wake up the thread that's in the event loop automatically
so it can restart the loop.  See my messages of Nov 9 to Nov 10.
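
To make the wakeup idea concrete, here is a minimal sketch of the
general self-pipe technique in plain C, using only POSIX calls.  It is
not Libevent's code: the names mini_base, mini_add, mini_loop and
mini_demo are hypothetical, and it assumes a single watched fd.  It
only illustrates how an "add" from another thread can wake a thread
that is blocked in its loop so that it rebuilds its set of watched fds.

```c
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>

/* Hypothetical miniature "event base": one notification pipe plus at
 * most one watched fd.  Illustrative only; not Libevent's structures. */
struct mini_base {
    pthread_mutex_t lock;
    int notify_pipe[2];   /* other threads write here to wake the loop */
    int watched_fd;       /* -1 until mini_add() registers an fd */
    int bytes_seen;       /* bytes the loop thread read from watched_fd */
};

/* Callable from any thread: register fd, then wake the loop thread. */
static int mini_add(struct mini_base *b, int fd)
{
    char wake = 'w';
    pthread_mutex_lock(&b->lock);
    b->watched_fd = fd;
    pthread_mutex_unlock(&b->lock);
    return write(b->notify_pipe[1], &wake, 1) == 1 ? 0 : -1;
}

/* Loop thread: rebuild the fd set on every pass, then block in poll(). */
static void *mini_loop(void *arg)
{
    struct mini_base *b = arg;
    for (;;) {
        struct pollfd fds[2];
        nfds_t n = 1;
        char c;

        fds[0].fd = b->notify_pipe[0];
        fds[0].events = POLLIN;
        pthread_mutex_lock(&b->lock);
        if (b->watched_fd >= 0) {
            fds[1].fd = b->watched_fd;
            fds[1].events = POLLIN;
            n = 2;
        }
        pthread_mutex_unlock(&b->lock);

        if (poll(fds, n, -1) < 0) {
            if (errno == EINTR)
                continue;
            return NULL;
        }
        if (fds[0].revents & POLLIN) {
            /* Woken by mini_add(): drain the byte and restart the loop. */
            if (read(fds[0].fd, &c, 1) != 1)
                return NULL;
            continue;
        }
        if (n == 2 && (fds[1].revents & POLLIN)) {
            if (read(fds[1].fd, &c, 1) == 1)
                b->bytes_seen++;
            return NULL;   /* one event is enough for this sketch */
        }
    }
}

/* Starts the loop thread first, adds the fd afterwards from the calling
 * thread, and returns how many bytes the loop thread read. */
int mini_demo(void)
{
    struct mini_base b;
    pthread_t tid;
    int data_pipe[2];
    char byte = 'a';

    b.watched_fd = -1;
    b.bytes_seen = 0;
    if (pthread_mutex_init(&b.lock, NULL) || pipe(b.notify_pipe) ||
        pipe(data_pipe))
        return -1;
    if (pthread_create(&tid, NULL, mini_loop, &b))
        return -1;
    /* The loop may already be blocked in poll(); mini_add() wakes it. */
    if (mini_add(&b, data_pipe[0]) < 0 ||
        write(data_pipe[1], &byte, 1) != 1)
        return -1;
    pthread_join(tid, NULL);
    close(data_pipe[0]);
    close(data_pipe[1]);
    close(b.notify_pipe[0]);
    close(b.notify_pipe[1]);
    return b.bytes_seen;
}
```

Calling mini_demo() from a main() should return 1 whether the loop
thread reaches poll() before or after the fd is added, because the
wakeup byte stays in the notification pipe until the loop drains it.
In Libevent the caller never writes such a byte itself; event_add() is
the only call needed.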

> Your "event_active" method is nicer than writing a pipe - thanks for pointing it out - but bottom line is that you have to use an "update" event to force the event lib out of event_loop and then re-enter event_loop. True?

False... I think?  I think I am confused as to what you think is going on here.

Look at the code in evthread-test.c.  The event_add for the write
event happens from thread 1 (the initial thread) while thread 2 (the
one running event_base_loop) is running the event loop.  If I needed
to do something special in order to make an event_add get counted, it
would already be too late.

Here is a simpler variation.  The main thread launches a thread that
runs event_base_loop to watch for events, waits a little, then adds an
event to the event_base that thread is looping on.  It then shows that
it can trigger the event several times without having done anything
besides event_add() to tell the new thread about it.  After that it
waits for the events to be processed, tells the subthread to exit using
event_base_loopexit (which also adds an internal event to the
event_base), and returns.  *At no point is any magic required to add
the events, even though they are added from a different thread.*

If this isn't what you had in mind, could you post a small sample of
the code that you think Libevent doesn't support, but should?

yrs,
-- 
Nick
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdbool.h>
#ifndef WIN32
#include <unistd.h>
#include <sys/time.h>
#endif
#include <errno.h>
#include <time.h>   /* struct timespec, nanosleep() */

#include <pthread.h>

#include <event2/util.h>
#include <event2/thread.h>
#include <event2/event.h>
#include <event2/event_struct.h>

static struct event_base *my_base=NULL;
static pthread_t progress_thread;
static bool progress_thread_stop=false;
static int progress_thread_pipe[2];
static pthread_mutex_t lock;
static struct event read_event;
static bool fd_written=false;

static void* progress_engine(void *obj);
static void read_handler(int sd, short flags, void* cbdata);

int main(int argc, char **argv)
{
    char byte='a';
    struct timespec tp={0, 100};
    int count=0;

    /* setup for threads */
    evthread_use_pthreads();

    /* create a new base */
    my_base = event_base_new();

    /* create the pipe the subthread will read from */
    if (pipe(progress_thread_pipe) < 0) {
        perror("pipe");
        exit(1);
    }

    /* launch a progress thread on that base */
    if (pthread_mutex_init(&lock, NULL)) {
        fprintf(stderr, "pthread_mutex_init failed\n");
        exit(1);
    }
    if (pthread_create(&progress_thread, NULL, progress_engine,
                       NULL)) {
        fprintf(stderr, "pthread_create failed\n");
        exit(1);
    }
    /*
      pthread starts the thread running itself; no need to do anything to
      launch it.
    */

    /* wait a little while - reflects reality in an async system */
    while (count < 100) {
        nanosleep(&tp, NULL);
        count++;
    }
    count=0;

    /* Add an event for the subthread to read from the pipe. */
    event_assign(&read_event, my_base,
                 progress_thread_pipe[0], EV_READ|EV_PERSIST,
                 read_handler, NULL);
    event_add(&read_event, NULL);

    while (count < 5) {
        /* Write to the pipe; see if the subthread gets the message */
        if (write(progress_thread_pipe[1], &byte, 1) < 0) {
            perror("write");
            exit(1);
        }
        nanosleep(&tp, NULL);
        count++;
    }
    count=0;

    /* wait for it to trigger */
    while (!fd_written && count < 1000) {
        if (0 == (count % 100)) {
            fprintf(stderr, "Waiting...\n");
        }
        nanosleep(&tp, NULL);
        count++;
    }

    /* stop the thread */
    pthread_mutex_lock(&lock);
    progress_thread_stop = true;
    pthread_mutex_unlock(&lock);

    event_base_loopexit(my_base, NULL);

    pthread_join(progress_thread, NULL);

    return 0;
}

static int n_bytes_read = 0;

static void read_handler(int sd, short flags, void *cbdata)
{
    char byte;
    int n;
    if ((n = read(progress_thread_pipe[0], &byte, 1)) <= 0) {
        if (n == 0)
            fprintf(stderr, "got a close\n");
        else
            perror("read");
    } else {
        fprintf(stderr, "I read %d bytes successfully\n", n);
        n_bytes_read += n;
        /* let the main thread's wait loop know the event fired */
        fd_written = true;
    }
}

static void timeout_cb(int sd, short flags, void *cbdata)
{
    /* This should never happen */
    abort();
}

static void* progress_engine(void *obj)
{
    /* Add a timeout event on the event base.  There needs to be at least
     * one event added, or event_base_loop will exit immediately.  We'll give
     * it a huge timeout. */
    struct timeval tv = { 10*60, 0 };
    struct event timer;
    evtimer_assign(&timer, my_base, timeout_cb, NULL);
    if (event_add(&timer, &tv) < 0) {
        fprintf(stderr, "couldn't add timer\n");
        return (void*)0;
    }

    while (1) {
        pthread_mutex_lock(&lock);
        if (progress_thread_stop) {
            fprintf(stderr, "Thread stopping\n");
            pthread_mutex_unlock(&lock);
            event_del(&timer);
            return (void*)1;
        }
        pthread_mutex_unlock(&lock);

        fprintf(stderr, "Looping...\n");
        event_base_loop(my_base, EVLOOP_ONCE);
    }
}