[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r10499: Commit for 5th June Delayed callbacks mostly implemented. (in libevent-urz/trunk: . doc)
Author: Urz
Date: 2007-06-05 04:57:15 -0400 (Tue, 05 Jun 2007)
New Revision: 10499
Modified:
libevent-urz/trunk/buffer.c
libevent-urz/trunk/doc/plan.txt
libevent-urz/trunk/event-internal.h
libevent-urz/trunk/event.h
libevent-urz/trunk/signal.c
Log:
Commit for 5th June
Delayed callbacks mostly implemented.
Modified: libevent-urz/trunk/buffer.c
===================================================================
--- libevent-urz/trunk/buffer.c 2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/buffer.c 2007-06-05 08:57:15 UTC (rev 10499)
@@ -54,19 +54,68 @@
#ifdef HAVE_UNISTD_H
#include <unistd.h>
#endif
+#ifdef WIN32
+#include <windows.h>
+#include <process.h>
+#endif
#include "event.h"
+#include "event_internal.h"
-int del_notifier_status = NOTIFIER_UNINIT;
-int del_notifier[2];
-static struct event evbuffer_del_event;
+void evbuffer_lock(struct evbuffer *buffer)
+{
+#ifdef WIN32
+ /* Code borrowed and changed from Tor's compat.c */
+ /*
+ * Note (from http://msdn2.microsoft.com/en-us/library/ms684266.aspx):
+ * After a thread obtains ownership of a mutex,
+ * it can specify the same mutex in repeated calls
+ * to the wait-functions without blocking its execution.
+ * This prevents a thread from deadlocking itself while
+ * waiting for a mutex that it already owns. To release
+ * its ownership under such circumstances, the thread must
+ * call ReleaseMutex once for each time that the mutex
+ * satisfied the conditions of a wait function.
+ *
+ * Thus, evbuffer_lock can be called (safely) repeatedly
+ * by the same thread, meaning we don't have problems with
+ * a thread deadlocking itself.
+ */
+ DWORD ret;
+ do {
+ ret = WaitForSingleObject(buffer->lock, INFINITE);
+ if(ret != WAIT_ABANDONED && ret != WAIT_OBJECT_0) {
+ printf("Failed to acquire mutex: %d",(int) GetLastError());
+ }
+ } while(ret != WAIT_ABANDONED && ret != WAIT_OBJECT_0);
+#endif
+}
+void evbuffer_unlock(struct evbuffer *buffer)
+{
+#ifdef WIN32
+ /* Code borrowed and changed from Tor's compat.c */
+ BOOL ret;
+ ret = 0;
+ do {
+ ret = ReleaseMutex(buffer->lock);
+ if(!ret) {
+ printf("Failed to release mutex: %d", (int) GetLastError());
+ }
+ } while(!ret);
+#endif
+}
+
struct evbuffer *
evbuffer_new(void)
{
struct evbuffer *buffer;
buffer = calloc(1, sizeof(struct evbuffer));
+
+ #ifdef WIN32
+ buffer->lock = CreateMutex(NULL, FALSE, NULL);
+ #endif
return (buffer);
}
@@ -97,6 +146,9 @@
{
int res;
+ evbuffer_lock(outbuf);
+ evbuffer_lock(inbuf);
+
/* Short cut for better performance */
if (outbuf->off == 0) {
struct evbuffer tmp;
@@ -117,6 +169,9 @@
if (oldoff && outbuf->cb != NULL)
(*outbuf->cb)(outbuf, 0, oldoff, outbuf->cbarg);
+ evbuffer_unlock(outbuf);
+ evbuffer_unlock(inbuf);
+
return (0);
}
@@ -126,6 +181,8 @@
evbuffer_drain(inbuf, inbuf->off);
}
+ evbuffer_unlock(outbuf);
+ evbuffer_unlock(inbuf);
return (res);
}
@@ -137,6 +194,8 @@
size_t oldoff = buf->off;
int sz;
va_list aq;
+
+ evbuffer_lock(buf);
for (;;) {
buffer = (char *)buf->buffer + buf->off;
@@ -157,14 +216,17 @@
va_end(aq);
if (sz == -1)
+ evbuffer_unlock(buf);
return (-1);
if (sz < space) {
buf->off += sz;
if (buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+ evbuffer_unlock(buf);
return (sz);
}
if (evbuffer_expand(buf, sz + 1) == -1)
+ evbuffer_unlock(buf);
return (-1);
}
@@ -176,6 +238,8 @@
{
int res = -1;
va_list ap;
+
+ /* No need to do lock / unlock here, it is done in evbuffer_add_vprintf */
va_start(ap, fmt);
res = evbuffer_add_vprintf(buf, fmt, ap);
@@ -190,11 +254,16 @@
evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen)
{
size_t nread = datlen;
+
+ evbuffer_lock(buf);
+
if (nread >= buf->off)
nread = buf->off;
memcpy(data, buf->buffer, nread);
evbuffer_drain(buf, nread);
+
+ evbuffer_unlock(buf);
return (nread);
}
@@ -207,10 +276,15 @@
char *
evbuffer_readline(struct evbuffer *buffer)
{
- u_char *data = EVBUFFER_DATA(buffer);
- size_t len = EVBUFFER_LENGTH(buffer);
+ u_char *data;
+ size_t len;
char *line;
unsigned int i;
+
+ evbuffer_lock(buffer);
+
+ data = EVBUFFER_DATA(buffer);
+ len = EVBUFFER_LENGTH(buffer);
for (i = 0; i < len; i++) {
if (data[i] == '\r' || data[i] == '\n')
@@ -242,7 +316,9 @@
}
evbuffer_drain(buffer, i + 1);
-
+
+ evbuffer_unlock(buffer);
+
return (line);
}
@@ -251,6 +327,14 @@
static inline void
evbuffer_align(struct evbuffer *buf)
{
+ evbuffer_lock(buf);
+ evbuffer_align(buf);
+ evbuffer_unlock(buf);
+}
+
+static inline void
+evbuffer_align(struct evbuffer *buf)
+{
memmove(buf->orig_buffer, buf->buffer, buf->off);
buf->buffer = buf->orig_buffer;
buf->misalign = 0;
@@ -261,8 +345,12 @@
int
evbuffer_expand(struct evbuffer *buf, size_t datlen)
{
- size_t need = buf->misalign + buf->off + datlen;
+ size_t need;
+ evbuffer_lock(buf);
+
+ need = buf->misalign + buf->off + datlen;
+
/* If we can fit all the data, then we don't have to do anything */
if (buf->totallen >= need)
return (0);
@@ -284,25 +372,35 @@
if (buf->orig_buffer != buf->buffer)
evbuffer_align(buf);
- if ((newbuf = realloc(buf->buffer, length)) == NULL)
+ if ((newbuf = realloc(buf->buffer, length)) == NULL) {
+ evbuffer_unlock(buf);
return (-1);
+ }
buf->orig_buffer = buf->buffer = newbuf;
buf->totallen = length;
}
+ evbuffer_unlock(buf);
return (0);
}
int
evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen)
{
- size_t need = buf->misalign + buf->off + datlen;
- size_t oldoff = buf->off;
+ size_t need;
+ size_t oldoff;
+
+ evbuffer_lock(buf);
+
+ need = buf->misalign + buf->off + datlen;
+ oldoff = buf->off;
if (buf->totallen < need) {
- if (evbuffer_expand(buf, datlen) == -1)
+ if (evbuffer_expand(buf, datlen) == -1) {
+ evbuffer_unlock(buf);
return (-1);
+ }
}
memcpy(buf->buffer + buf->off, data, datlen);
@@ -311,13 +409,18 @@
if (datlen && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+ evbuffer_unlock(buf);
return (0);
}
void
evbuffer_drain(struct evbuffer *buf, size_t len)
{
- size_t oldoff = buf->off;
+ size_t oldoff;
+
+ evbuffer_lock(buf);
+
+ oldoff = buf->off;
if (len >= buf->off) {
buf->off = 0;
@@ -335,6 +438,8 @@
/* Tell someone about changes in this buffer */
if (buf->off != oldoff && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+
+ evbuffer_unlock(buf);
}
@@ -348,11 +453,14 @@
evbuffer_read(struct evbuffer *buf, int fd, int howmuch)
{
u_char *p;
- size_t oldoff = buf->off;
+ size_t oldoff;
int n = EVBUFFER_MAX_READ;
#ifdef WIN32
DWORD dwBytesRead;
#endif
+ evbuffer_lock(buf);
+
+ oldoff = buf->off;
#ifdef FIONREAD
if (ioctl(fd, FIONREAD, &n) == -1 || n == 0) {
@@ -375,32 +483,44 @@
howmuch = n;
/* If we don't have FIONREAD, we might waste some space here */
- if (evbuffer_expand(buf, howmuch) == -1)
+ if (evbuffer_expand(buf, howmuch) == -1) {
+ evbuffer_unlock(buf);
return (-1);
+ }
/* We can append new data at this point */
p = buf->buffer + buf->off;
#ifndef WIN32
n = read(fd, p, howmuch);
- if (n == -1)
+ if (n == -1) {
+ evbuffer_unlock(buf);
return (-1);
- if (n == 0)
+ }
+ if (n == 0) {
+ evbuffer_unlock(buf);
return (0);
+ }
#else
n = ReadFile((HANDLE)fd, p, howmuch, &dwBytesRead, NULL);
- if (n == 0)
+ if (n == 0) {
+ evbuffer_unlock(buf);
return (-1);
- if (dwBytesRead == 0)
+ }
+ if (dwBytesRead == 0) {
+ evbuffer_lock(buf);
return (0);
+ }
n = dwBytesRead;
#endif
buf->off += n;
-
+
/* Tell someone about changes in this buffer */
if (buf->off != oldoff && buf->cb != NULL)
(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+
+ evbuffer_unlock(buf);
return (n);
}
@@ -413,21 +533,33 @@
DWORD dwBytesWritten;
#endif
+ evbuffer_lock(buffer);
+
#ifndef WIN32
n = write(fd, buffer->buffer, buffer->off);
- if (n == -1)
+ if (n == -1) {
+ evbuffer_unlock(buffer);
return (-1);
- if (n == 0)
+ }
+ if (n == 0) {
+ evbuffer_unlock(buffer);
return (0);
+ }
#else
n = WriteFile((HANDLE)fd, buffer->buffer, buffer->off, &dwBytesWritten, NULL);
- if (n == 0)
+ if (n == 0) {
+ evbuffer_unlock(buffer);
return (-1);
- if (dwBytesWritten == 0)
+ }
+ if (dwBytesWritten == 0) {
+ evbuffer_unlock(buffer);
return (0);
+ }
n = dwBytesWritten;
#endif
evbuffer_drain(buffer, n);
+
+ evbuffer_unlock(buffer);
return (n);
}
@@ -435,19 +567,27 @@
u_char *
evbuffer_find(struct evbuffer *buffer, const u_char *what, size_t len)
{
- size_t remain = buffer->off;
- u_char *search = buffer->buffer;
+ size_t remain;
+ u_char *search;
u_char *p;
+
+ evbuffer_lock(buffer);
+
+ remain = buffer->off;
+ search = buffer->buffer;
while ((p = memchr(search, *what, remain)) != NULL) {
remain = buffer->off - (size_t)(search - buffer->buffer);
if (remain < len)
break;
- if (memcmp(p, what, len) == 0)
+ if (memcmp(p, what, len) == 0) {
+ evbuffer_unlock(buffer);
return (p);
+ }
search = p + 1;
}
+ evbuffer_unlock(buffer);
return (NULL);
}
@@ -455,52 +595,10 @@
void (*cb)(struct evbuffer *, size_t, size_t, void *),
void *cbarg)
{
+ evbuffer_lock(buffer);
+
buffer->cb = cb;
buffer->cbarg = cbarg;
-}
-
-/* Code to do setup / initialization of the evbuffer delayed callbacks
- * Initially copied from ev_signal_init, then modified.
- */
-void evbuffer_del_init(void)
-{
- /* create the delayed notifier socketpair */
- if (socketpair(AF_UNIX, SOCK_STREAM, 0, del_notifier) == -1)
- event_err(1, "%s: socketpair", __func__);
-
- /*
- * I don't understand what this does, and these declarations are in
- * signal.c, so it is commented out until I do.
- FD_CLOSEONEXEC(del_notifier[0]);
- FD_CLOSEONEXEC(del_notifier[1]);
- */
-
- /*
- * If I'm not mistaken that means that calls to write on
- * del_notifier[EVBUFFER_END] will be non-blocking
- */
- fcntl(del_notifier[EVBUFFER_END], F_SETFL, O_NONBLOCK);
-
- event_set(&evbuffer_del_event, del_notifier[DISPATCH_END], EV_READ,
- evsignal_cb, NULL);
- /* &ev_signal); */
- /* I can't find any documentation for this, what does it do? */
- evbuffer_del_event.ev_flags |= EVLIST_INTERNAL;
-}
-
-void evbuffer_set_del_read_cb(struct evbuffer *buffer,
- void (*del_read_event)(struct evbuffer *, void *),
- void *del_read_event_arg)
-{
- buffer->del_read_event = del_read_event;
- buffer->del_read_event_arg = del_read_event_arg;
-}
-
-void evbuffer_set_del_write_cb(struct evbuffer *buffer,
- void (*del_write_event)(struct evbuffer *, void *),
- void *del_write_event_arg)
-{
- buffer->del_write_event = del_write_event;
- buffer->del_write_event_arg = del_write_event_arg;
+ evbuffer_unlock(buffer);
}
\ No newline at end of file
Modified: libevent-urz/trunk/doc/plan.txt
===================================================================
--- libevent-urz/trunk/doc/plan.txt 2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/doc/plan.txt 2007-06-05 08:57:15 UTC (rev 10499)
@@ -22,30 +22,44 @@
Coding:
Alter evbuffer (event.h:212) struct.
- - Mostly done. Place to store callbacks + their data, flags if callback should occur.
- - Needs a mutex/lock added. Look at compat files to see if there is a type we can use.
+ - Mutex added. Most of the data is stored in the bufferevent struct now, as per
+ IRC conversation with nickm.
Add a new event_add / event_del for delayed evbuffer events
(so the user doesn't have to know about the socketpair trick).
- - Done (evbuffer_set_del_read_cb and evbuffer_set_del_write_cb, buffer.c:492/500
+ - Done by sa_bufferevent_new, at sa_evbuffer.c:270
Alter the evbuffer functions (add, drain, etc) to use the mutex on the evbuffer struct,
-to set the evbuffer flags to let the main thread know callbacks are ready, and to call
+to set the bufferevent flags to let the main thread know callbacks are ready, and to call
a function to ensure the main event-loop is notified of events.
- - Not started
+ - Mutex work is done.
+ - Notification function written, but not yet used.
+ - bufferevent pending callback flags not yet set.
Write a short read event handler for the event_loop half of the socketpair which
simply reads all there is to be read (no blocking!) and calls the real read/write callbacks
depending on what it reads.
- - Not started
+ - Done
+
+Functions yet to modify:
+int sa_bufferevent_base_set(struct event_base *base, struct bufferevent *bufev);
+int sa_bufferevent_priority_set(struct bufferevent *bufev, int pri);
+void sa_bufferevent_free(struct bufferevent *bufev);
+int sa_bufferevent_write(struct bufferevent *bufev, void *data, size_t size);
+int sa_bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
+size_t sa_bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
+int sa_bufferevent_enable(struct bufferevent *bufev, short event);
+int sa_bufferevent_disable(struct bufferevent *bufev, short event);
+void sa_bufferevent_settimeout(struct bufferevent *bufev,
+ int timeout_read, int timeout_write);
+
TODOs for tomorrow:
-Adding the mutex to evbuffer
-Make a list of all evbuffers so our event_loop handler can look through it.
-evbuffer create/destroy functions need to add/remove evbuffers from this list.
-Write event_loop handler.
+Modify functions above where needed.
+Write regression tests.
+Compile and Test.
+Write sample code
-
Testing:
Add test cases to libevent testing code... test/regress.c I believe
Modified: libevent-urz/trunk/event-internal.h
===================================================================
--- libevent-urz/trunk/event-internal.h 2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/event-internal.h 2007-06-05 08:57:15 UTC (rev 10499)
@@ -27,6 +27,8 @@
#ifndef _EVENT_INTERNAL_H_
#define _EVENT_INTERNAL_H_
+#include "compat/sys/queue.h"
+
#ifdef __cplusplus
extern "C" {
#endif
@@ -49,6 +51,25 @@
RB_HEAD(event_tree, event) timetree;
};
+extern u_char del_notifier_status;
+#define NOTIFIER_UNINIT 0
+#define NOTIFIER_READY 1
+#define NOTIFIER_PENDING 2
+extern int del_notifier[2];
+#define EVBUFFER_END 0
+#define DISPATCH_END 1
+
+#ifdef HAVE_SETFD
+#define FD_CLOSEONEXEC(x) do { \
+ if (fcntl(x, F_SETFD, 1) == -1) \
+ event_warn("fcntl(%d, F_SETFD)", x); \
+} while (0)
+#else
+#define FD_CLOSEONEXEC(x)
+#endif
+
+LIST_HEAD(sa_evbuf_list_elem, struct sa_bufferevent) bufev_list_head;
+
#ifdef __cplusplus
}
#endif
Modified: libevent-urz/trunk/event.h
===================================================================
--- libevent-urz/trunk/event.h 2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/event.h 2007-06-05 08:57:15 UTC (rev 10499)
@@ -36,6 +36,7 @@
#ifdef WIN32
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
+#include <process.h>
#undef WIN32_LEAN_AND_MEAN
typedef unsigned char u_char;
typedef unsigned short u_short;
@@ -220,32 +221,12 @@
void (*cb)(struct evbuffer *, size_t, size_t, void *);
void *cbarg;
- /*
- * Callback Function pointers for 'delayed' callback -
- * That is, callbacks which occur at the next dispatch.
- *
- * The Read event is called after data has been writen in,
- * The Write event is called after data has been read out.
- */
- void (*del_read_event)(struct evbuffer *, void *);
- void *del_read_event_arg;
-
- void (*del_write_event)(struct evbuffer *, void *);
- void *del_write_event_arg;
-
- /* 1 if callback should occur, 0 if it should not */
- u_char del_read_event_set;
- u_char del_write_event_set;
+ #ifdef WIN32
+ HANDLE lock
+ #endif
+
};
-extern u_char del_notifier_status;
-#define NOTIFIER_UNINIT 0
-#define NOTIFIER_READY 1
-#define NOTIFIER_PENDING 2
-extern int del_notifier[2];
-#define EVBUFFER_END 0
-#define DISPATCH_END 1
-
/* Just for error reporting - use other constants otherwise */
#define EVBUFFER_READ 0x01
#define EVBUFFER_WRITE 0x02
@@ -296,6 +277,51 @@
void bufferevent_settimeout(struct bufferevent *bufev,
int timeout_read, int timeout_write);
+
+/*
+ * 'Semi-automatic' bufferevents - events work as normal but your code must
+ * write to the input buffer and read from the output buffer
+ */
+struct sa_bufferevent {
+ struct event ev_read;
+ struct event ev_write;
+
+ struct evbuffer *input;
+ struct evbuffer *output;
+
+ struct event_watermark wm_read;
+ struct event_watermark wm_write;
+
+ evbuffercb readcb;
+ evbuffercb writecb;
+ everrorcb errorcb;
+ void *cbarg;
+
+ int timeout_read; /* in seconds */
+ int timeout_write; /* in seconds */
+
+ short enabled; /* events that are currently enabled */
+
+ /* 1 if callback should occur, 0 if it should not */
+ u_char del_read_event_set;
+ u_char del_write_event_set;
+
+ LIST_ENTRY(struct sa_bufferevent) list_elem;
+};
+
+struct bufferevent *sa_bufferevent_new(evbuffercb readcb,
+ evbuffercb writecb, everrorcb errorcb, void *cbarg);
+int sa_bufferevent_base_set(struct event_base *base, struct bufferevent *bufev);
+int sa_bufferevent_priority_set(struct bufferevent *bufev, int pri);
+void sa_bufferevent_free(struct bufferevent *bufev);
+int sa_bufferevent_write(struct bufferevent *bufev, void *data, size_t size);
+int sa_bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
+size_t sa_bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
+int sa_bufferevent_enable(struct bufferevent *bufev, short event);
+int sa_bufferevent_disable(struct bufferevent *bufev, short event);
+void sa_bufferevent_settimeout(struct bufferevent *bufev,
+ int timeout_read, int timeout_write);
+
#define EVBUFFER_LENGTH(x) (x)->off
#define EVBUFFER_DATA(x) (x)->buffer
#define EVBUFFER_INPUT(x) (x)->input
Modified: libevent-urz/trunk/signal.c
===================================================================
--- libevent-urz/trunk/signal.c 2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/signal.c 2007-06-05 08:57:15 UTC (rev 10499)
@@ -49,6 +49,7 @@
#endif
#include "event.h"
+#include "event-internal.h"
#include "evsignal.h"
#include "log.h"
@@ -77,15 +78,6 @@
event_add(ev, NULL);
}
-#ifdef HAVE_SETFD
-#define FD_CLOSEONEXEC(x) do { \
- if (fcntl(x, F_SETFD, 1) == -1) \
- event_warn("fcntl(%d, F_SETFD)", x); \
-} while (0)
-#else
-#define FD_CLOSEONEXEC(x)
-#endif
-
void
evsignal_init(void)
{