[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r10499: Commit for 5th June Delayed callbacks mostly implemented. (in libevent-urz/trunk: . doc)



Author: Urz
Date: 2007-06-05 04:57:15 -0400 (Tue, 05 Jun 2007)
New Revision: 10499

Modified:
   libevent-urz/trunk/buffer.c
   libevent-urz/trunk/doc/plan.txt
   libevent-urz/trunk/event-internal.h
   libevent-urz/trunk/event.h
   libevent-urz/trunk/signal.c
Log:
Commit for 5th June
Delayed callbacks mostly implemented.

Modified: libevent-urz/trunk/buffer.c
===================================================================
--- libevent-urz/trunk/buffer.c	2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/buffer.c	2007-06-05 08:57:15 UTC (rev 10499)
@@ -54,19 +54,68 @@
 #ifdef HAVE_UNISTD_H
 #include <unistd.h>
 #endif
+#ifdef WIN32
+#include <windows.h>
+#include <process.h>
+#endif
 
 #include "event.h"
+#include "event_internal.h"
 
-int del_notifier_status = NOTIFIER_UNINIT;
-int del_notifier[2];
-static struct event evbuffer_del_event;
+void evbuffer_lock(struct evbuffer *buffer)
+{
+#ifdef WIN32
+    /* Code borrowed and changed from Tor's compat.c */
+    /*
+     * Note (from http://msdn2.microsoft.com/en-us/library/ms684266.aspx):
+     * After a thread obtains ownership of a mutex, 
+     * it can specify the same mutex in repeated calls 
+     * to the wait-functions without blocking its execution. 
+     * This prevents a thread from deadlocking itself while 
+     * waiting for a mutex that it already owns. To release 
+     * its ownership under such circumstances, the thread must 
+     * call ReleaseMutex once for each time that the mutex 
+     * satisfied the conditions of a wait function.
+     *
+     * Thus, evbuffer_lock can be called (safely) repeatedly
+     * by the same thread, meaning we don't have problems with
+     * a thread deadlocking itself.
+     */
+    DWORD ret;
+    do {
+        ret = WaitForSingleObject(buffer->lock, INFINITE);
+        if(ret != WAIT_ABANDONED && ret != WAIT_OBJECT_0) {
+            printf("Failed to acquire mutex: %d",(int) GetLastError());
+        }
+    } while(ret != WAIT_ABANDONED && ret != WAIT_OBJECT_0);
+#endif
+}
 
+void evbuffer_unlock(struct evbuffer *buffer)
+{
+#ifdef WIN32
+    /* Code borrowed and changed from Tor's compat.c */
+    BOOL ret;
+    ret = 0;
+    do {
+        ret = ReleaseMutex(buffer->lock);
+        if(!ret) {
+            printf("Failed to release mutex: %d", (int) GetLastError());
+        }
+    } while(!ret);
+#endif
+}
+
 struct evbuffer *
 evbuffer_new(void)
 {
 	struct evbuffer *buffer;
 	
 	buffer = calloc(1, sizeof(struct evbuffer));
+    
+    #ifdef WIN32
+    buffer->lock = CreateMutex(NULL, FALSE, NULL);
+    #endif
 
 	return (buffer);
 }
@@ -97,6 +146,9 @@
 {
 	int res;
 
+    evbuffer_lock(outbuf);
+    evbuffer_lock(inbuf);
+    
 	/* Short cut for better performance */
 	if (outbuf->off == 0) {
 		struct evbuffer tmp;
@@ -117,6 +169,9 @@
 		if (oldoff && outbuf->cb != NULL)
 			(*outbuf->cb)(outbuf, 0, oldoff, outbuf->cbarg);
 		
+        evbuffer_unlock(outbuf);
+        evbuffer_unlock(inbuf);
+        
 		return (0);
 	}
 
@@ -126,6 +181,8 @@
 		evbuffer_drain(inbuf, inbuf->off);
 	}
 
+    evbuffer_unlock(outbuf);
+    evbuffer_unlock(inbuf);
 	return (res);
 }
 
@@ -137,6 +194,8 @@
 	size_t oldoff = buf->off;
 	int sz;
 	va_list aq;
+    
+    evbuffer_lock(buf);
 
 	for (;;) {
 		buffer = (char *)buf->buffer + buf->off;
@@ -157,14 +216,17 @@
 		va_end(aq);
 
 		if (sz == -1)
+            evbuffer_unlock(buf);
 			return (-1);
 		if (sz < space) {
 			buf->off += sz;
 			if (buf->cb != NULL)
 				(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+            evbuffer_unlock(buf);
 			return (sz);
 		}
 		if (evbuffer_expand(buf, sz + 1) == -1)
+            evbuffer_unlock(buf);
 			return (-1);
 
 	}
@@ -176,6 +238,8 @@
 {
 	int res = -1;
 	va_list ap;
+    
+    /* No need to do lock / unlock here, it is done in evbuffer_add_vprintf */
 
 	va_start(ap, fmt);
 	res = evbuffer_add_vprintf(buf, fmt, ap);
@@ -190,11 +254,16 @@
 evbuffer_remove(struct evbuffer *buf, void *data, size_t datlen)
 {
 	size_t nread = datlen;
+    
+    evbuffer_lock(buf);
+    
 	if (nread >= buf->off)
 		nread = buf->off;
 
 	memcpy(data, buf->buffer, nread);
 	evbuffer_drain(buf, nread);
+    
+    evbuffer_unlock(buf);
 	
 	return (nread);
 }
@@ -207,10 +276,15 @@
 char *
 evbuffer_readline(struct evbuffer *buffer)
 {
-	u_char *data = EVBUFFER_DATA(buffer);
-	size_t len = EVBUFFER_LENGTH(buffer);
+	u_char *data;
+	size_t len;
 	char *line;
 	unsigned int i;
+    
+    evbuffer_lock(buffer);
+    
+    data = EVBUFFER_DATA(buffer);
+    len = EVBUFFER_LENGTH(buffer);
 
 	for (i = 0; i < len; i++) {
 		if (data[i] == '\r' || data[i] == '\n')
@@ -242,7 +316,9 @@
 	}
 
 	evbuffer_drain(buffer, i + 1);
-
+    
+    evbuffer_unlock(buffer);
+    
 	return (line);
 }
 
@@ -251,6 +327,14 @@
 static inline void
 evbuffer_align(struct evbuffer *buf)
 {
+    evbuffer_lock(buf);
+    evbuffer_align(buf);
+    evbuffer_unlock(buf);
+}
+
+static inline void
+evbuffer_align(struct evbuffer *buf)
+{
 	memmove(buf->orig_buffer, buf->buffer, buf->off);
 	buf->buffer = buf->orig_buffer;
 	buf->misalign = 0;
@@ -261,8 +345,12 @@
 int
 evbuffer_expand(struct evbuffer *buf, size_t datlen)
 {
-	size_t need = buf->misalign + buf->off + datlen;
+	size_t need;
 
+    evbuffer_lock(buf);
+    
+    need = buf->misalign + buf->off + datlen;
+
 	/* If we can fit all the data, then we don't have to do anything */
 	if (buf->totallen >= need)
 		return (0);
@@ -284,25 +372,35 @@
 
 		if (buf->orig_buffer != buf->buffer)
 			evbuffer_align(buf);
-		if ((newbuf = realloc(buf->buffer, length)) == NULL)
+		if ((newbuf = realloc(buf->buffer, length)) == NULL) {
+            evbuffer_unlock(buf);
 			return (-1);
+        }
 
 		buf->orig_buffer = buf->buffer = newbuf;
 		buf->totallen = length;
 	}
 
+    evbuffer_unlock(buf);
 	return (0);
 }
 
 int
 evbuffer_add(struct evbuffer *buf, const void *data, size_t datlen)
 {
-	size_t need = buf->misalign + buf->off + datlen;
-	size_t oldoff = buf->off;
+	size_t need;
+	size_t oldoff;
+    
+    evbuffer_lock(buf);
+    
+    need = buf->misalign + buf->off + datlen;
+    oldoff = buf->off;
 
 	if (buf->totallen < need) {
-		if (evbuffer_expand(buf, datlen) == -1)
+		if (evbuffer_expand(buf, datlen) == -1) {
+            evbuffer_unlock(buf);
 			return (-1);
+        }
 	}
 
 	memcpy(buf->buffer + buf->off, data, datlen);
@@ -311,13 +409,18 @@
 	if (datlen && buf->cb != NULL)
 		(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
 
+    evbuffer_unlock(buf);
 	return (0);
 }
 
 void
 evbuffer_drain(struct evbuffer *buf, size_t len)
 {
-	size_t oldoff = buf->off;
+	size_t oldoff;
+    
+    evbuffer_lock(buf);
+    
+    oldoff = buf->off;
 
 	if (len >= buf->off) {
 		buf->off = 0;
@@ -335,6 +438,8 @@
 	/* Tell someone about changes in this buffer */
 	if (buf->off != oldoff && buf->cb != NULL)
 		(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+    
+    evbuffer_unlock(buf);
 
 }
 
@@ -348,11 +453,14 @@
 evbuffer_read(struct evbuffer *buf, int fd, int howmuch)
 {
 	u_char *p;
-	size_t oldoff = buf->off;
+	size_t oldoff;
 	int n = EVBUFFER_MAX_READ;
 #ifdef WIN32
 	DWORD dwBytesRead;
 #endif
+    evbuffer_lock(buf);
+    
+    oldoff = buf->off;
 
 #ifdef FIONREAD
 	if (ioctl(fd, FIONREAD, &n) == -1 || n == 0) {
@@ -375,32 +483,44 @@
 		howmuch = n;
 
 	/* If we don't have FIONREAD, we might waste some space here */
-	if (evbuffer_expand(buf, howmuch) == -1)
+	if (evbuffer_expand(buf, howmuch) == -1) {
+        evbuffer_unlock(buf);
 		return (-1);
+    }
 
 	/* We can append new data at this point */
 	p = buf->buffer + buf->off;
 
 #ifndef WIN32
 	n = read(fd, p, howmuch);
-	if (n == -1)
+	if (n == -1) {
+        evbuffer_unlock(buf);
 		return (-1);
-	if (n == 0)
+    }
+	if (n == 0) {
+        evbuffer_unlock(buf);
 		return (0);
+    }
 #else
 	n = ReadFile((HANDLE)fd, p, howmuch, &dwBytesRead, NULL);
-	if (n == 0)
+	if (n == 0) {
+        evbuffer_unlock(buf);
 		return (-1);
-	if (dwBytesRead == 0)
+    }
+	if (dwBytesRead == 0) {
+        evbuffer_lock(buf);
 		return (0);
+    }
 	n = dwBytesRead;
 #endif
 
 	buf->off += n;
-
+    
 	/* Tell someone about changes in this buffer */
 	if (buf->off != oldoff && buf->cb != NULL)
 		(*buf->cb)(buf, oldoff, buf->off, buf->cbarg);
+    
+    evbuffer_unlock(buf);
 
 	return (n);
 }
@@ -413,21 +533,33 @@
 	DWORD dwBytesWritten;
 #endif
 
+    evbuffer_lock(buffer);
+    
 #ifndef WIN32
 	n = write(fd, buffer->buffer, buffer->off);
-	if (n == -1)
+	if (n == -1) {
+        evbuffer_unlock(buffer);
 		return (-1);
-	if (n == 0)
+    }
+	if (n == 0) {
+        evbuffer_unlock(buffer);
 		return (0);
+    }
 #else
 	n = WriteFile((HANDLE)fd, buffer->buffer, buffer->off, &dwBytesWritten, NULL);
-	if (n == 0)
+	if (n == 0) {
+        evbuffer_unlock(buffer);
 		return (-1);
-	if (dwBytesWritten == 0)
+    }
+	if (dwBytesWritten == 0) {
+        evbuffer_unlock(buffer);
 		return (0);
+    }
 	n = dwBytesWritten;
 #endif
 	evbuffer_drain(buffer, n);
+    
+    evbuffer_unlock(buffer);
 
 	return (n);
 }
@@ -435,19 +567,27 @@
 u_char *
 evbuffer_find(struct evbuffer *buffer, const u_char *what, size_t len)
 {
-	size_t remain = buffer->off;
-	u_char *search = buffer->buffer;
+	size_t remain;
+	u_char *search;
 	u_char *p;
+    
+    evbuffer_lock(buffer);
+    
+    remain = buffer->off;
+    search = buffer->buffer;
 
 	while ((p = memchr(search, *what, remain)) != NULL) {
 		remain = buffer->off - (size_t)(search - buffer->buffer);
 		if (remain < len)
 			break;
-		if (memcmp(p, what, len) == 0)
+		if (memcmp(p, what, len) == 0) {
+            evbuffer_unlock(buffer);
 			return (p);
+        }
 		search = p + 1;
 	}
 
+    evbuffer_unlock(buffer);
 	return (NULL);
 }
 
@@ -455,52 +595,10 @@
     void (*cb)(struct evbuffer *, size_t, size_t, void *),
     void *cbarg)
 {
+    evbuffer_lock(buffer);
+    
 	buffer->cb = cb;
 	buffer->cbarg = cbarg;
-}
-
-/* Code to do setup / initialization of the evbuffer delayed callbacks 
- * Initially copied from ev_signal_init, then modified.
- */
-void evbuffer_del_init(void) 
-{
-    /* create the delayed notifier socketpair */
-	if (socketpair(AF_UNIX, SOCK_STREAM, 0, del_notifier) == -1)
-		event_err(1, "%s: socketpair", __func__);
-
-    /* 
-     * I don't understand what this does, and these declarations are in
-     * signal.c, so it is commented out until I do.
-	FD_CLOSEONEXEC(del_notifier[0]);
-	FD_CLOSEONEXEC(del_notifier[1]);
-     */
-
-    /* 
-     * If I'm not mistaken that means that calls to write on
-     * del_notifier[EVBUFFER_END] will be non-blocking
-     */
-	fcntl(del_notifier[EVBUFFER_END], F_SETFL, O_NONBLOCK);
-
     
-	event_set(&evbuffer_del_event, del_notifier[DISPATCH_END], EV_READ,
-	    evsignal_cb, NULL);
-        /* &ev_signal); */
-    /* I can't find any documentation for this, what does it do? */
-	evbuffer_del_event.ev_flags |= EVLIST_INTERNAL;
-}
-
-void evbuffer_set_del_read_cb(struct evbuffer *buffer,
-    void (*del_read_event)(struct evbuffer *, void *),
-    void *del_read_event_arg) 
-{
-    buffer->del_read_event = del_read_event;
-    buffer->del_read_event_arg = del_read_event_arg;
-}
-
-void evbuffer_set_del_write_cb(struct evbuffer *buffer,
-    void (*del_write_event)(struct evbuffer *, void *),
-    void *del_write_event_arg) 
-{
-    buffer->del_write_event = del_write_event;
-    buffer->del_write_event_arg = del_write_event_arg;
+    evbuffer_unlock(buffer);
 }
\ No newline at end of file

Modified: libevent-urz/trunk/doc/plan.txt
===================================================================
--- libevent-urz/trunk/doc/plan.txt	2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/doc/plan.txt	2007-06-05 08:57:15 UTC (rev 10499)
@@ -22,30 +22,44 @@
 
 Coding:
 Alter evbuffer (event.h:212) struct.
-    - Mostly done. Place to store callbacks + their data, flags if callback should occur.
-    - Needs a mutex/lock added. Look at compat files to see if there is a type we can use.
+    - Mutex added. Most of the data is stored in the bufferevent struct now, as per
+        IRC conversation with nickm.
     
 Add a new event_add / event_del for delayed evbuffer events
 (so the user doesn't have to know about the socketpair trick).
-    - Done (evbuffer_set_del_read_cb and evbuffer_set_del_write_cb, buffer.c:492/500
+    - Done by sa_bufferevent_new, at sa_evbuffer.c:270
 
 Alter the evbuffer functions (add, drain, etc) to use the mutex on the evbuffer struct,
-to set the evbuffer flags to let the main thread know callbacks are ready, and to call
+to set the bufferevent flags to let the main thread know callbacks are ready, and to call
 a function to ensure the main event-loop is notified of events.
-    - Not started
+    - Mutex work is done.
+    - Notification function written, but not yet used.
+    - bufferevent pending callback flags not yet set.
     
 Write a short read event handler for the event_loop half of the socketpair which
 simply reads all there is to be read (no blocking!) and calls the real read/write callbacks
 depending on what it reads.
-    - Not started
+    - Done
 
+    
+Functions yet to modify:
+int sa_bufferevent_base_set(struct event_base *base, struct bufferevent *bufev);
+int sa_bufferevent_priority_set(struct bufferevent *bufev, int pri);
+void sa_bufferevent_free(struct bufferevent *bufev);
+int sa_bufferevent_write(struct bufferevent *bufev, void *data, size_t size);
+int sa_bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
+size_t sa_bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
+int sa_bufferevent_enable(struct bufferevent *bufev, short event);
+int sa_bufferevent_disable(struct bufferevent *bufev, short event);
+void sa_bufferevent_settimeout(struct bufferevent *bufev,
+    int timeout_read, int timeout_write);
+
 TODOs for tomorrow:
-Adding the mutex to evbuffer
-Make a list of all evbuffers so our event_loop handler can look through it. 
-evbuffer create/destroy functions need to add/remove evbuffers from this list.
-Write event_loop handler.
+Modify functions above where needed.
+Write regression tests.
+Compile and Test.
+Write sample code
 
-
 Testing:
 Add test cases to libevent testing code... test/regress.c I believe
 

Modified: libevent-urz/trunk/event-internal.h
===================================================================
--- libevent-urz/trunk/event-internal.h	2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/event-internal.h	2007-06-05 08:57:15 UTC (rev 10499)
@@ -27,6 +27,8 @@
 #ifndef _EVENT_INTERNAL_H_
 #define _EVENT_INTERNAL_H_
 
+#include "compat/sys/queue.h"
+
 #ifdef __cplusplus
 extern "C" {
 #endif
@@ -49,6 +51,25 @@
 	RB_HEAD(event_tree, event) timetree;
 };
 
+extern u_char del_notifier_status;
+#define NOTIFIER_UNINIT 0
+#define NOTIFIER_READY 1
+#define NOTIFIER_PENDING 2
+extern int del_notifier[2];
+#define EVBUFFER_END 0
+#define DISPATCH_END 1
+
+#ifdef HAVE_SETFD
+#define FD_CLOSEONEXEC(x) do { \
+        if (fcntl(x, F_SETFD, 1) == -1) \
+                event_warn("fcntl(%d, F_SETFD)", x); \
+} while (0)
+#else
+#define FD_CLOSEONEXEC(x)
+#endif
+
+LIST_HEAD(sa_evbuf_list_elem, struct sa_bufferevent) bufev_list_head;
+
 #ifdef __cplusplus
 }
 #endif

Modified: libevent-urz/trunk/event.h
===================================================================
--- libevent-urz/trunk/event.h	2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/event.h	2007-06-05 08:57:15 UTC (rev 10499)
@@ -36,6 +36,7 @@
 #ifdef WIN32
 #define WIN32_LEAN_AND_MEAN
 #include <windows.h>
+#include <process.h>
 #undef WIN32_LEAN_AND_MEAN
 typedef unsigned char u_char;
 typedef unsigned short u_short;
@@ -220,32 +221,12 @@
 	void (*cb)(struct evbuffer *, size_t, size_t, void *);
 	void *cbarg;
     
-    /* 
-     * Callback Function pointers for 'delayed' callback -
-     * That is, callbacks which occur at the next dispatch.
-     * 
-     * The Read event is called after data has been writen in,
-     * The Write event is called after data has been read out.
-     */
-    void (*del_read_event)(struct evbuffer *, void *);
-    void *del_read_event_arg;
-    
-    void (*del_write_event)(struct evbuffer *, void *);
-    void *del_write_event_arg;
-    
-    /* 1 if callback should occur, 0 if it should not */
-    u_char del_read_event_set;
-    u_char del_write_event_set;
+    #ifdef WIN32
+    HANDLE lock
+    #endif
+  
 };
 
-extern u_char del_notifier_status;
-#define NOTIFIER_UNINIT 0
-#define NOTIFIER_READY 1
-#define NOTIFIER_PENDING 2
-extern int del_notifier[2];
-#define EVBUFFER_END 0
-#define DISPATCH_END 1
-
 /* Just for error reporting - use other constants otherwise */
 #define EVBUFFER_READ		0x01
 #define EVBUFFER_WRITE		0x02
@@ -296,6 +277,51 @@
 void bufferevent_settimeout(struct bufferevent *bufev,
     int timeout_read, int timeout_write);
 
+    
+/* 
+ * 'Semi-automatic' bufferevents - events work as normal but your code must
+ * write to the input buffer and read from the output buffer
+ */
+struct sa_bufferevent {
+	struct event ev_read;
+	struct event ev_write;
+
+	struct evbuffer *input;
+	struct evbuffer *output;
+
+	struct event_watermark wm_read;
+	struct event_watermark wm_write;
+
+	evbuffercb readcb;
+	evbuffercb writecb;
+	everrorcb errorcb;
+	void *cbarg;
+
+	int timeout_read;	/* in seconds */
+	int timeout_write;	/* in seconds */
+
+	short enabled;	/* events that are currently enabled */
+    
+    /* 1 if callback should occur, 0 if it should not */
+    u_char del_read_event_set;
+    u_char del_write_event_set;
+    
+    LIST_ENTRY(struct sa_bufferevent) list_elem;	
+};
+
+struct bufferevent *sa_bufferevent_new(evbuffercb readcb, 
+    evbuffercb writecb, everrorcb errorcb, void *cbarg);
+int sa_bufferevent_base_set(struct event_base *base, struct bufferevent *bufev);
+int sa_bufferevent_priority_set(struct bufferevent *bufev, int pri);
+void sa_bufferevent_free(struct bufferevent *bufev);
+int sa_bufferevent_write(struct bufferevent *bufev, void *data, size_t size);
+int sa_bufferevent_write_buffer(struct bufferevent *bufev, struct evbuffer *buf);
+size_t sa_bufferevent_read(struct bufferevent *bufev, void *data, size_t size);
+int sa_bufferevent_enable(struct bufferevent *bufev, short event);
+int sa_bufferevent_disable(struct bufferevent *bufev, short event);
+void sa_bufferevent_settimeout(struct bufferevent *bufev,
+    int timeout_read, int timeout_write);
+
 #define EVBUFFER_LENGTH(x)	(x)->off
 #define EVBUFFER_DATA(x)	(x)->buffer
 #define EVBUFFER_INPUT(x)	(x)->input

Modified: libevent-urz/trunk/signal.c
===================================================================
--- libevent-urz/trunk/signal.c	2007-06-05 07:44:52 UTC (rev 10498)
+++ libevent-urz/trunk/signal.c	2007-06-05 08:57:15 UTC (rev 10499)
@@ -49,6 +49,7 @@
 #endif
 
 #include "event.h"
+#include "event-internal.h"
 #include "evsignal.h"
 #include "log.h"
 
@@ -77,15 +78,6 @@
 	event_add(ev, NULL);
 }
 
-#ifdef HAVE_SETFD
-#define FD_CLOSEONEXEC(x) do { \
-        if (fcntl(x, F_SETFD, 1) == -1) \
-                event_warn("fcntl(%d, F_SETFD)", x); \
-} while (0)
-#else
-#define FD_CLOSEONEXEC(x)
-#endif
-
 void
 evsignal_init(void)
 {