[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r6992: - fixed race condition in notification that was resulting in (bsockets/trunk)



Author: chiussi
Date: 2006-08-08 19:15:44 -0400 (Tue, 08 Aug 2006)
New Revision: 6992

Added:
   bsockets/trunk/bsocket.c
Modified:
   bsockets/trunk/Makefile
   bsockets/trunk/bsocket.h
   bsockets/trunk/callback.c
   bsockets/trunk/callback.h
   bsockets/trunk/event.c
   bsockets/trunk/io.c
   bsockets/trunk/io.h
   bsockets/trunk/list.c
   bsockets/trunk/misc.c
   bsockets/trunk/misc.h
   bsockets/trunk/select.c
   bsockets/trunk/socket.c
   bsockets/trunk/socket.h
   bsockets/trunk/sync.c
   bsockets/trunk/test.c
   bsockets/trunk/wait.c
Log:
- fixed race condition in notification that was resulting in deadlock when a callback was invoked while a event was being processed
- cleaned up memory leak on socket write
- removed some todos
- socketpair init complete
- initial work on socketpair send 



Modified: bsockets/trunk/Makefile
===================================================================
--- bsockets/trunk/Makefile	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/Makefile	2006-08-08 23:15:44 UTC (rev 6992)
@@ -1,7 +1,7 @@
 test_HEADERS= test.h
 bsock_HEADERS = list.h test.h socket.h bsocket.h misc.h
 
-sock_OBJS = list.o socket.o unix.o event.o sync.o select.o wait.o misc.o io.o callback.o
+sock_OBJS = list.o socket.o unix.o event.o sync.o select.o wait.o misc.o io.o callback.o bsocket.o
 test_OBJS = test.o ${sock_OBJS}
 
 BIN_SUFFIX = .exe

Added: bsockets/trunk/bsocket.c
===================================================================
--- bsockets/trunk/bsocket.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/bsocket.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -0,0 +1,23 @@
+#include <stdio.h>
+
+#include "bsocket.h"
+
+#ifdef USE_WIN32
+
+int bsocketpair(int domain, int type, int protocol, int fd[2]) {
+
+	return socketpair_win32(domain,type,protocol,fd,__GLOBAL_BSOCKET_ENV_);
+}
+
+#else
+
+int bsocketpair(int domain, int type, int protocol, int fd[2]) {
+
+	return socketpair(domain,type,protocol,fd);
+
+}
+
+#endif
+
+
+


Property changes on: bsockets/trunk/bsocket.c
___________________________________________________________________
Name: svn:executable
   + *

Modified: bsockets/trunk/bsocket.h
===================================================================
--- bsockets/trunk/bsocket.h	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/bsocket.h	2006-08-08 23:15:44 UTC (rev 6992)
@@ -1,6 +1,8 @@
 #ifndef _BSOCKET_H_
 #define _BSOCKET_H_
 
+#include <windows.h>
+
 #define USE_WIN32
 
 #ifdef USE_WIN32
@@ -53,7 +55,9 @@
 int getsockopt_win32(int, int, int, void*, int *, struct socket_env*);
 int send_win32(int, void*, size_t, int, struct socket_env*);
 int recv_win32(int, void*, size_t, int, struct socket_env*);
+int socketpair_win32(int, int, int, int*, struct socket_env*);
 
+
 #define bsocket_init(X)			socket_init_win32(X)
 #define bsocket_shutdown(X)		socket_cleanup_win32(X)
 
@@ -70,6 +74,9 @@
 #define bsend(A,B,C,D)			send_win32(A,B,C,D,__GLOBAL_BSOCKET_ENV_)
 #define brecv(A,B,C,D)			recv_win32(A,B,C,D,__GLOBAL_BSOCKET_ENV_);
 
+
+int bsocketpair(int,int,int,int*);
+
 #define F_SETFL					1
 
 #define	O_NONBLOCK				1
@@ -85,7 +92,7 @@
 #define MAX_SIMUL_CONNECT 		9
 #define MAX_SIMUL_LISTEN  		5
 
-#if (MAX_SIMUL_CONNECT + MAX_SIMUL_LISTEN + 2 > WSA_MAXIMUM_WAIT_EVENTS )
+#if ((MAX_SIMUL_CONNECT + MAX_SIMUL_LISTEN + 2 )> WSA_MAXIMUM_WAIT_EVENTS )
 #error "Reduce MAX_SIMUL_CONNECT or MAX_SIMUL_LISTEN"
 #endif
 

Modified: bsockets/trunk/callback.c
===================================================================
--- bsockets/trunk/callback.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/callback.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -29,37 +29,51 @@
 	struct bsocket *b;
 	struct _msg *msg;
 
+	int r;
+
+	//todo -- do we need to free wo when we're done?
 	msg = (struct _msg*) wo->hEvent;
 
 	if (err == 0) {
 
 		ASSERT(len >= 0);
 
-
 		ASSERT(len <= MSG_SIZE);
 
-		b = bsocket_get(msg->fd,AS_READ,msg->env);
+		if (len > 0) {
 
-		ASSERT(b != NULL);
-		ASSERT(b->fd == msg->fd);
+			b = bsocket_get(msg->fd,AS_READ,msg->env);
 
-		if ( list_enqueue(msg,b->in_q ) == 0) {
+			ASSERT(b != NULL);
+			ASSERT(b->fd == msg->fd);
 
 			msg->len = len;
-			printf("%d\n",msg->len);
-			fflush(stdout);
 
-			post_read(b,msg->env);
-			socket_raise(b->fd,IS_READABLE,TRUE,msg->env);
+			r = (int) list_enqueue(msg,b->in_q );
 
+			bsocket_release(msg->fd,AS_READ,msg->env);
+
+			if (r != 0) {
+
+				ASSERT(len > 0);
+
+				post_read(b,msg->env);
+
+				socket_raise(msg->fd,IS_READABLE,TRUE,msg->env);
+
+			} else {
+
+				//todo -- dealloc msg?
+				socket_exception(b->fd,errno,msg->env);
+
+			}
+
 		} else {
 
-			//todo -- dealloc msg?
-			socket_exception(b->fd,errno,msg->env);
+ 			socket_exception(msg->fd,0,msg->env);
+
 		}
 
-		bsocket_release(msg->fd,AS_READ,msg->env);
-
 	} else {
 
 		socket_exception(msg->fd,unixify_wsaerr(err),msg->env);
@@ -67,13 +81,12 @@
 
 }
 
-//caller must guarantee atomicity
+//caller must guarantee list atomicity
 void make_connected(struct bsocket *b, struct bsocket *partner, struct socket_env *env) {
 
 	b->connected = TRUE;
 	b->partner = partner;
 
-	//todo -- start reading
 	bsocket_raise(b,IS_READABLE,FALSE,env);
 	bsocket_raise(b,IS_WRITABLE,TRUE,env);
 
@@ -100,7 +113,7 @@
 			} else {
 
 				make_connected(l[fd],NULL,env);
-		}
+			}
 
 		}
 
@@ -108,4 +121,5 @@
 
 	}
 
+
 }

Modified: bsockets/trunk/callback.h
===================================================================
--- bsockets/trunk/callback.h	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/callback.h	2006-08-08 23:15:44 UTC (rev 6992)
@@ -6,5 +6,6 @@
 void CALLBACK callback_read (DWORD, DWORD, OVERLAPPED*, DWORD);
 
 void complete_connect(int, int, struct socket_env *);
+
 #endif
 

Modified: bsockets/trunk/event.c
===================================================================
--- bsockets/trunk/event.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/event.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -77,6 +77,10 @@
 
 	}
 
+	if (e->sig != NULL) {
+		//CloseHandle(e->sig);
+	}
+
 	free(e);
 
 }
@@ -128,6 +132,8 @@
 
 	CHECK(e != NULL,ENOMEM);
 
+	w =  NULL;
+
 	if (wait) {
 		w = CreateEvent(NULL,TRUE,FALSE,NULL);
 		CHECK_(w != NULL);
@@ -157,19 +163,21 @@
 
 	MUTEX_RELEASE(env->post_m);
 
-	if (w != NULL) {
+	if (w != NULL ) {
 		ASSERT( WaitForSingleObject(w,INFINITE) == WAIT_OBJECT_0);
 	}
 
 	fail:
 
+	if (w != NULL) {
+		ASSERT(CloseHandle(w));
+	}
 
 	if (out == -1) {
 		e->type = EV_PING;
 		event_free(e);
 	}
 
-
 	return out;
 
 }
@@ -266,6 +274,7 @@
 
 }
 
+
 struct _event *event_next(struct socket_env *env) {
 
 	struct _event *e;
@@ -346,6 +355,8 @@
 
 	struct _list *connect_q;
 
+	struct _msg *msg;
+
 	int i, _j;
 	int err;
 	int out;
@@ -426,6 +437,9 @@
 						//todo cancel any outstanding i/o (do we need to?)
 						ASSERT(closesocket(e->s) == 0);
 
+						//todo -- this is a major sync violation, but it feels safe, why?
+						SetEvent(env->b[e->fd]->exception_e);
+
 						event_respond(e,0,0);
 
 					break;
@@ -463,6 +477,7 @@
 
 						} else {
 
+							//todo -- errno should = 0?
 							event_respond(e,-1,errno);
 
 						}
@@ -517,16 +532,35 @@
 						wo = (WSAOVERLAPPED*)
 							malloc(sizeof(WSAOVERLAPPED));
 
+						if (e->s != INVALID_SOCKET) {
 
-						r = WSASend(
-								e->s,
-								(WSABUF*) (e->data),
-								1,
-								(DWORD*) &z,
-								0,
-								wo,
-								callback_write);
+							r = WSASend(
+									e->s,
+									(WSABUF*) (e->data),
+									1,
+									(DWORD*) &z,
+									0,
+									wo,
+									callback_write);
 
+						} else {
+
+							//todo -- assert is connected
+							msg = msg_new();
+
+							if (msg != NULL) {
+
+							} else{
+
+								r = -1;
+
+
+							}
+
+
+
+						}
+
 						if (r == 0) {
 
 							event_respond(e,z,0);
@@ -636,6 +670,16 @@
 
 	}
 
+	//make sure there are no outstanding events
+	ASSERT(list_dequeue(NULL,env->event_q) != 0);
+
+	for (i=1; i<WSA_MAXIMUM_WAIT_EVENTS; i++) {
+
+		//todo -- assert nothing is using these events
+		ASSERT(WSACloseEvent(we[i]));
+	}
+
+
 	fail:
 
 	if (connect_q != NULL)

Modified: bsockets/trunk/io.c
===================================================================
--- bsockets/trunk/io.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/io.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -61,6 +61,7 @@
 
 	CHECK(msg != NULL,0);
 
+	//todo we dont want to have to allocate a wo each bloody time
 	wo = (WSAOVERLAPPED*)
 		malloc(sizeof(WSAOVERLAPPED));
 
@@ -78,44 +79,23 @@
 
 	flags = 0;
 
-	while (resume) {
+	z = 0;
 
-		resume = FALSE;
+	r = WSARecv(
+			s,
+			&msg->buf_wb,
+			1,
+			(DWORD*) &z,
+			(DWORD*) &flags,
+			wo,
+			callback_read
+	);
 
-		z = 0;
+	if (r == SOCKET_ERROR) {
 
-		r = WSARecv(
-				s,
-				&msg->buf_wb,
-				1,
-				(DWORD*) &z,
-				(DWORD*) &flags,
-				wo,
-				callback_read
-		);
+		err = WSAGetLastError();
 
-		if (r != SOCKET_ERROR) {
-
-			if (z == 0) {
-
-				out = -1;
-				err = 0;
-				resume = FALSE;
-
-			} else {
-				resume = TRUE;
-
-			}
-
-
-		} else {
-
-			err = WSAGetLastError();
-
-			CHECK(err == WSA_IO_PENDING,err);
-
-		}
-
+		CHECK(err == WSA_IO_PENDING,err);
 	}
 
 	fail:
@@ -133,7 +113,7 @@
 }
 
 //put EOF in input stream
-//atomicity must have write permissions for socket, list ownership not needed (todo ?)
+//todo -- is atomicity needed here?
 void bsocket_eof(struct bsocket *b) {
 
 	//NULL data indicates EOF
@@ -161,6 +141,7 @@
 	CHECK(len > 0, EINVAL);
 
 	b = bsocket_get(fd,AS_READ,env);
+
 	CHECK(b != NULL,0);
 
 	if ( list_is_empty(b->in_q)) {
@@ -172,55 +153,61 @@
 		r = wait_until(b,IS_READABLE,env);
 
 		b = bsocket_get(fd,AS_READ,env);
+
 		CHECK(b != NULL,0);
 	}
 
-	space_left = len;
-	pos = buf;
-	data_read = 0;
+	space_left	= len;
+	pos			= buf;
+	data_read	= 0;
 
-	//todo -- if next message is EOF, return everything up until it
-
 	while (space_left) {
 
 		r = list_queuepeek(&msg,b->in_q);
-		ASSERT(r == 0);
 
-		if (msg != NULL) {
+		if (r == 0) {
 
-			copy_len = min(space_left,msg->len);
-			ASSERT(copy_len > 0);
+			if (msg != NULL) {
 
-			memcpy(pos,msg->data,copy_len);
+				copy_len = min(space_left,msg->len);
 
-			msg->len -= copy_len;
+				if (copy_len <= 0 ) {
+					printf("--%d\n",copy_len);
+					fflush(stdout);
+				}
 
-			ASSERT(msg->len >= 0);
+				ASSERT(copy_len > 0);
 
-			if (msg->len == 0 ) {
+				memcpy(pos,msg->data,copy_len);
 
-				msg_free(msg);
-				ASSERT(list_dequeue(NULL,b->in_q) == 0);
+				msg->len -= copy_len;
 
-			} else {
-				msg->data += copy_len;
+				ASSERT(msg->len >= 0);
 
-			}
+				if (msg->len == 0 ) {
+					//todo -- msg is being recycled when it shouldn't be
+					//msg_free(msg);
+					ASSERT(list_dequeue(NULL,b->in_q) == 0);
 
-			pos 		+= copy_len;
-			space_left	-= copy_len;
-			data_read	+= copy_len;
+				} else {
+					msg->data += copy_len;
+				}
 
+				pos 		+= copy_len;
+				space_left	-= copy_len;
+				data_read	+= copy_len;
 
-		} else {
+			} else {
 
-			space_left = 0;
+				space_left = 0;
 
-		}
+			}
 
+		} else {
+			socket_raise(fd,IS_READABLE,0,env);
 
+		}
 
-
 	}
 
 	out = data_read;
@@ -234,12 +221,8 @@
 	return out;
 }
 
-int send_socketpair_win32() {
 
-	return -1;
 
-}
-
 int send_win32 (int fd, void *buf, size_t len, int flags, struct socket_env *env) {
 
 	struct bsocket *b;
@@ -252,8 +235,6 @@
 	b = bsocket_get(fd,AS_WRITE,env);
 	CHECK(b != NULL,0);
 
-	//todo -- if socketpair, send_socketpair_win32
-
 	//todo -- is this good enough to guarantee the socket is writable?
 	if (b->blocking == FALSE) {
 		CHECK(b->write_buf == NULL,EAGAIN);
@@ -274,16 +255,16 @@
 
 	r = post_write(b,&z,&err,env);
 
-	CHECK(r == 0,0);
-
-	out = z;
-
-	if (out == -1)
+	if (r != 0) {
 		errno = err;
+		out = -1;
+	} else {
+		out = z;
+	}
 
 	fail:
 
-	if ( (out == -1) && (b->write_buf != NULL )) {
+	if ( (out == -1) && (b != NULL) && (b->write_buf != NULL )) {
 		free(b->write_buf);
 	}
 

Modified: bsockets/trunk/io.h
===================================================================
--- bsockets/trunk/io.h	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/io.h	2006-08-08 23:15:44 UTC (rev 6992)
@@ -24,4 +24,7 @@
 void bsocket_eof(struct bsocket *);
 void invoke_read(SOCKET s, int fd, struct socket_env *);
 
+struct _msg *msg_new();
+void msg_free(struct _msg *);
+
 #endif

Modified: bsockets/trunk/list.c
===================================================================
--- bsockets/trunk/list.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/list.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -11,8 +11,12 @@
 
 void list_data(void *data, struct _list_node *n) {
 		//todo -- this is ugly, why?
-		*((void**) data) = n->data;
+		if (data != NULL) {
 
+			*((void**) data) = n->data;
+
+		}
+
 }
 
 struct _list_node *list_enqueue(void *data, struct _list *list) {

Modified: bsockets/trunk/misc.c
===================================================================
--- bsockets/trunk/misc.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/misc.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -2,12 +2,8 @@
 
 #include "misc.h"
 
-///
-
-
 char _G_VERY_BAD_NEWS_[] = "\nSerious error, can't cope. Here is as much information as we can give...\n\tfile:%s line:%d errno:%d win32 err:%d\n";
 
-
 HANDLE make_thread (void *func, void *data) {
 
 	return CreateThread(NULL,0,func, data, 0, NULL);

Modified: bsockets/trunk/misc.h
===================================================================
--- bsockets/trunk/misc.h	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/misc.h	2006-08-08 23:15:44 UTC (rev 6992)
@@ -28,8 +28,9 @@
 #define MUTEX_ACQUIRE(X)	ASSERT(WaitForSingleObject(X,INFINITE) == WAIT_OBJECT_0)
 #define MUTEX_RELEASE(X)	ASSERT(ReleaseMutex(X))
 
+#define 	LA  //printf("list acquire: %s:%d\n",__func__,__LINE__); fflush(stdout);
+#define		LR	//printf("list release: %s:%d\n",__func__,__LINE__); fflush(stdout);
 
-
 int winsock_start();
 HANDLE make_thread(void*,void*);
 

Modified: bsockets/trunk/select.c
===================================================================
--- bsockets/trunk/select.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/select.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -7,6 +7,7 @@
 #include "misc.h"
 #include "io.h"
 
+//atomicity -- must own list to call this
 void bsocket_raise(struct bsocket *b, int type, int active, struct socket_env *env ) {
 
 	struct _wait_list *wl;
@@ -28,7 +29,6 @@
 		break;
 
 		default:
-
 			ASSERT(FALSE);
 		break;
 
@@ -48,6 +48,8 @@
 
 	struct bsocket **l;
 
+	LA;
+
 	l = bsocket_list_get(env);
 
 	if (l != NULL) {
@@ -56,6 +58,7 @@
 			bsocket_raise(l[fd],type,active,env);
 		}
 
+		LR;
 		bsocket_list_release(env);
 
 	}
@@ -66,6 +69,7 @@
 
 	struct bsocket **l;
 
+	LA
 	l = bsocket_list_get(env);
 
 	if (l != NULL) {
@@ -76,6 +80,7 @@
 
 		}
 
+		LR
 		bsocket_list_release(env);
 
 	}
@@ -86,6 +91,7 @@
 
 	struct bsocket **l;
 
+	LA
 	l = bsocket_list_get(env);
 
 	if (l != NULL) {
@@ -96,24 +102,29 @@
 			l[fd]->err = 0;
 
 		}
-
+		LR
 		bsocket_list_release(env);
 
 	}
+;
 
+
 }
 
 //atomicity is provided by caller -- must own list
 void bsocket_exception(struct bsocket *b, int err, struct socket_env *env) {
 
-//	ASSERT(err != 0);
-
 	if (b->s != INVALID_SOCKET) {
 
-		b->err		= err;
+		if (err) {
+			b->err		= err;
+		}
+
 		b->closed	= TRUE;
 		b->eof		= TRUE;
 
+		bsocket_eof(b);
+
 		bsocket_raise(b,IS_READABLE,TRUE,env);
 		bsocket_raise(b,IS_WRITABLE,TRUE,env);
 		bsocket_raise(b,IS_EXCEPTED,TRUE,env);
@@ -131,6 +142,8 @@
 
 	struct bsocket **l;
 
+	LA
+
 	l = bsocket_list_get(env);
 
 	if (l != NULL) {
@@ -139,6 +152,8 @@
 			bsocket_exception(l[fd],err,env);
 		}
 
+		LR
+
 		bsocket_list_release(env);
 
 	}
@@ -298,6 +313,7 @@
 	CHECK(b != NULL,ENOMEM);
 	CHECK(t != NULL,ENOMEM);
 
+	LA
 	l = bsocket_list_get(env);
 
 	CHECK(l != NULL,0);
@@ -318,6 +334,7 @@
 		}
 	}
 
+	LR
 	bsocket_list_release(env);
 	l = NULL;
 
@@ -389,7 +406,6 @@
 
 	}
 
-
 	fail:
 	if (b != NULL) {
 		free(b);
@@ -402,6 +418,3 @@
 	return out;
 
 }
-
-
-

Modified: bsockets/trunk/socket.c
===================================================================
--- bsockets/trunk/socket.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/socket.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -13,6 +13,7 @@
 #include "sync.h"
 #include "select.h"
 #include "misc.h"
+#include "callback.h"
 
 //support only turning off and on socket blocking, will add more features as needed
 int fcntl_win32(int fd, int cmd, long args, struct socket_env *env ) {
@@ -69,8 +70,8 @@
 
 	int out;
 
-
 	l = NULL;
+	LA
 	l = bsocket_list_get(env);
 
 	CHECK(l[fd] != NULL,EBADF);
@@ -85,13 +86,11 @@
 
 				case SO_ERROR:
 
-
 					CHECK(*optlen >= sizeof(int),EINVAL);
 
 					*((int*) optval) = l[fd]->err;
 					*optlen = sizeof(int);
 
-
 				break;
 
 				default:
@@ -109,7 +108,7 @@
 	}
 
 	fail:
-
+	LR
 	bsocket_list_release(env);
 
 	return out;
@@ -125,8 +124,10 @@
 
 	out = 0;
 
-	b = bsocket_get(fd,AS_RW,env);
 
+	b = bsocket_get(fd,AS_WRITE,env);
+
+
 	CHECK(b != NULL,0);
 	CHECK(b->connected == FALSE,EISCONN);
 	CHECK(b->connecting == FALSE,EALREADY);
@@ -164,7 +165,7 @@
 	fail:
 
 	if (b != NULL) {
-		bsocket_release(fd,AS_RW,env);
+		bsocket_release(fd,AS_WRITE,env);
 	}
 
 
@@ -172,6 +173,7 @@
 
 }
 
+
 void bsocket_free(struct bsocket *b) {
 
 	if (b->read_wl != NULL) {
@@ -189,55 +191,71 @@
 		wl_free(b->except_wl);
 	}
 
-}
+	if (b->write_m != NULL) {
+		CloseHandle(b->write_m);
+	}
 
+	if (b->read_m != NULL) {
+		CloseHandle(b->read_m);
+	}
 
-//todo handle SO_LINGER
-int bsocket_close(struct bsocket *b, struct socket_env *env) {
+	if (b->close_e != NULL) {
+		CloseHandle(b->close_e);
+	}
 
-	SetEvent(b->close_e);
+	if (b->exception_e != NULL) {
+		CloseHandle(b->exception_e);
+	}
 
-	bsocket_exception(b,ECLOSED,env);
+	//todo -- remove any messages waiting
 
-	bsocket_list_release(env);
+	list_free(b->in_q);
 
-	//manually aquire all socket mutexes (don't use atomic!)
-	MUTEX_ACQUIRE(b->read_m);
-	MUTEX_ACQUIRE(b->write_m);
+	free(b);
+
+}
+
+int close_win32(int fd, struct socket_env *env) {
+
+	struct bsocket *b;
+	int out;
+	int r;
+
+	//manually ensure that socket exists
+	CHECK(fd >= 0,EBADF);
+	CHECK(fd < MAX_BSOCKETS,EBADF);
+
 	MUTEX_ACQUIRE(env->list_m);
+	b = env->b[fd];
+	MUTEX_RELEASE(env->list_m);
 
-	if (!b->closed) {
-		ASSERT(post_close(b,env) == 0);
-	}
+	CHECK(b != NULL,EBADF);
 
-	//return fd
-	list_enqueue((void*) b->fd, env->free_q);
+	out = 0;
 
-	bsocket_free(b);
+	socket_exception(fd,ECLOSED,env);
 
-	env->b[b->fd] = NULL;
+	r = WaitForSingleObject(env->b[fd]->exception_e,INFINITE);
+	ASSERT(r == WAIT_OBJECT_0);
 
-	//todo if there is an outstanding error, that becomes the output of close
+	SetEvent(b->close_e);
 
-	return 0;
+	MUTEX_ACQUIRE(env->list_m);
+	MUTEX_ACQUIRE(env->b[fd]->read_m);
+	MUTEX_ACQUIRE(env->b[fd]->write_m);
 
-}
 
-//todo -- fix so that close_win32 does not require any new memory
-int close_win32(int fd, struct socket_env *env) {
+	//todo -- if this fails fd is lost forever (TS, or cope?)
+	list_enqueue((void*) b->fd, env->free_q);
 
-	struct bsocket **l;
+	env->b[fd] = NULL;
 
-	int out;
+	MUTEX_RELEASE(env->list_m);
 
-	l = bsocket_list_get(env);
-	ASSERT(l != NULL);
-	CHECK(l[fd] != NULL,EBADF);
+	bsocket_free(b);
 
-	out = bsocket_close(l[fd],env);
+	//todo out = last error
 
-	bsocket_list_release(env);
-
 	fail:
 
 	return out;
@@ -264,6 +282,7 @@
 	b->write_m 		= NULL;
 
 	b->close_e		= NULL;
+	b->exception_e	= NULL;
 
 	b->read_wl		= NULL;
 	b->write_wl		= NULL;
@@ -281,6 +300,9 @@
 	b->close_e = CreateEvent(NULL,TRUE,FALSE,NULL);
 	CHECK_(b->close_e != NULL);
 
+	b->exception_e = CreateEvent(NULL,TRUE,FALSE,NULL);
+	CHECK_(b->close_e != NULL);
+
 	b->err = 0;
 	b->connected = FALSE;
 	b->connecting = FALSE;
@@ -317,7 +339,79 @@
 	ASSERT(FALSE);
 }
 
+int socketpair_win32(int domain, int type, int protocol, int fd[2], struct socket_env *env){
 
+
+	struct bsocket *b[2];
+	struct bsocket **l;
+
+	int s[2];
+	int out;
+
+	b[0] = NULL;
+	b[1] = NULL;
+
+	b[0] = bsocket_new(INVALID_SOCKET);
+	CHECK(b[0] != NULL,0);
+
+	b[1] = bsocket_new(INVALID_SOCKET);
+	CHECK(b[0] != NULL,0);
+
+	s[0] = -1;
+	s[1] = -1;
+
+	l = NULL;
+
+	l = bsocket_list_get(env);
+	CHECK(l != NULL,0);
+
+	s[0] = claim_free_fd(b[0],env);
+	CHECK(s[0] != -1,0);
+
+	s[1] = claim_free_fd(b[1],env);
+	CHECK(s[1] != -1,0);
+
+	b[0]->fd = s[0];
+	b[1]->fd = s[1];
+
+	make_connected(b[0],b[1],env);
+	make_connected(b[1],b[0],env);
+
+	fd[0] = s[0];
+	fd[1] = s[1];
+	out = 0;
+
+	fail:
+
+	if (out == -1) {
+
+		if (s[0] != -1) {
+			release_fd(s[0],env);
+		}
+
+		if (s[1] != -1) {
+			release_fd(s[1],env);
+		}
+
+		if (b[0] != NULL) {
+			bsocket_free(b[0]);
+		}
+
+		if (b[1] != NULL) {
+			bsocket_free(b[1]);
+		}
+
+	}
+
+	if (l != NULL) {
+		bsocket_list_release(env);
+	}
+
+	return out;
+
+}
+
+
 int socket_win32(int af, int type, int protocol, struct socket_env *env) {
 
 	SOCKET s;
@@ -377,7 +471,6 @@
 
 	ASSERT( sizeof(int) == sizeof(DWORD));
 
-
 	if (env == NULL) {
 
 		env = (struct socket_env *)
@@ -413,7 +506,6 @@
 	env->list_m = CreateMutex(NULL,FALSE,NULL);
 	CHECK_(env->list_m != NULL);
 
-
 	env->wait_m = CreateMutex(NULL,FALSE,NULL);
 	CHECK_(env->wait_m != NULL);
 
@@ -436,8 +528,6 @@
 
 int socket_cleanup_win32(struct socket_env *env) {
 
-	struct bsocket **l;
-
 	int i;
 	int out;
 	int r;
@@ -446,6 +536,9 @@
 	out = 0;
 	was_null = FALSE;
 
+	//printf("==shutdown\n");
+	//fflush(stdout);
+
 	//todo -- i can smell a sync issue here
 
 	if (env == NULL) {
@@ -454,26 +547,18 @@
 
 	}
 
-	ASSERT(SetEvent(env->shutdown_e));
+	for (i=0; i<MAX_BSOCKETS; i++) {
+		close_win32(i,env);
+	}
 
+	LA
 	MUTEX_ACQUIRE(env->list_m);
 
-	l = env->b;
+	ASSERT(SetEvent(env->shutdown_e));
 
-	for (i=0; i<MAX_BSOCKETS; i++) {
-
-		if (l[i] != NULL) {
-
-			r = bsocket_close(l[i],env);
-
-		}
-
-	}
-
 	//todo replace with post_shutdown
 	ASSERT(event_post(NULL,EV_SHUTDOWN,NULL,NULL,NULL,FALSE,env) == 0);
 
-
 	r = WaitForSingleObject (env->event_t,1000);
 
 	ASSERT(r == WAIT_OBJECT_0);
@@ -486,13 +571,15 @@
 		ASSERT(env->b[i] == NULL);
 	}
 
-
 	ASSERT(CloseHandle(env->shutdown_e));
 	//ASSERT(CloseHandle(env->free_m));
 	ASSERT(CloseHandle(env->list_m));
 	ASSERT(CloseHandle(env->post_m));
 	ASSERT(CloseHandle(env->wait_m));
 
+	WSACloseEvent(env->post_e);
+	CloseHandle(env->event_t);
+
 	list_free(env->free_q);
 
 	free(env);

Modified: bsockets/trunk/socket.h
===================================================================
--- bsockets/trunk/socket.h	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/socket.h	2006-08-08 23:15:44 UTC (rev 6992)
@@ -26,7 +26,6 @@
 
 #define AE_GLOBAL		(AE_POST|AE_LIST|AE_FREE|AE_WAIT)
 
-
 struct bsocket {
 
 	SOCKET s;
@@ -35,6 +34,7 @@
 	HANDLE write_m;
 
 	HANDLE close_e;
+	HANDLE exception_e;
 
 	struct _wait_list	*read_wl;
 	struct _wait_list	*write_wl;
@@ -65,7 +65,6 @@
 	/*this is non-null if the socket is part of a socketpair*/
 	struct bsocket *partner;
 
-
 };
 
 struct socket_env {

Modified: bsockets/trunk/sync.c
===================================================================
--- bsockets/trunk/sync.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/sync.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -9,44 +9,45 @@
 
 int claim_free_fd(struct bsocket *b, struct socket_env *env) {
 
-	struct bsocket **l;
-
 	int out;
 
-	//we don't care about the output, this just gives us list
-	//ownership
-	l = bsocket_list_get(env);
-
-	CHECK(l != NULL, 0);
-
 	if (list_dequeue(&out,env->free_q) == 0) {
 
-		l[out] = b;
+		env->b[out] = b;
 
 	} else {
 		errno = EMFILE;
 		out = -1;
 	}
 
-	bsocket_list_release(env);
-
-	fail:
 	return out;
 }
 
+void release_fd(int fd, struct socket_env *env) {
 
+	struct bsocket **l;
+
+	ASSERT(fd >= 0);
+	ASSERT(fd < MAX_BSOCKETS);
+
+	l[fd] = NULL;
+
+
+}
+
+
 struct bsocket **bsocket_list_get(struct socket_env *env) {
 
 	if (atomic(-1,AE_LIST)) {
 		return NULL;
 	}
 
-
 	return env->b;
 
 }
 
 void bsocket_list_release(struct socket_env *env) {
+
 	release(-1,AE_LIST);
 }
 
@@ -65,6 +66,7 @@
 
 }
 
+//todo -- FOOL! don't own list while waiting on socket mutexes (change!!)
 struct bsocket *bsocket_get(int fd, int access, struct socket_env *env) {
 
 	struct bsocket **l;
@@ -81,21 +83,21 @@
 	}
 
 	out = NULL;
+	LA
 	l = bsocket_list_get(env);
 
 	if (l != NULL) {
+		//todo -- if l[fd] == NULL?
 
 		if (!atomic(fd,access)) {
 			out = l[fd];
 		}
 
+		LR
 		bsocket_list_release(env);
 
 	}
 
-//	printf("bsocket_get out\n");
-	//fflush(stdout);
-
 	return out;
 
 
@@ -231,6 +233,7 @@
 			case WAIT_ABANDONED:
 			//there is no acceptable reason for wfmo() to fail
 			case WAIT_FAILED:
+				printf("! %d\n",r);
 				ASSERT(FALSE);
 			break;
 
@@ -239,6 +242,7 @@
 				//not a critical issue but we want to know if we claimed any mutexes
 				//(we shouldn't have)
 				ASSERT(mutex_count_2 == mutex_count);
+
 				errno = ECLOSED;
 				mutex_count = 0;
 				out = -1;
@@ -252,7 +256,6 @@
 		}
 	}
 
-
 	if (out != -1) {
 
 		ASSERT(mutex_count == 1);

Modified: bsockets/trunk/test.c
===================================================================
--- bsockets/trunk/test.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/test.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -371,17 +371,11 @@
 
 	errno = 0;
 
-
-
-
 	TEST(bclose(fd[0]) == 0);
 	TEST(errno == 0);
 
-
-
 	f = bsocket(AF_INET,SOCK_STREAM,0);
 
-
 	TEST(f != -1);
 	TEST(errno == 0);
 
@@ -392,9 +386,6 @@
 
 	}
 
-
-
-
 	f = bsocket(AF_INET,SOCK_STREAM,0);
 
 	TEST(f != -1);
@@ -405,16 +396,17 @@
 
 }
 
-//todo -- think of a way to check if there is a memory leak
+//todo -- there is still a very small memory leak in socket init
 int test_socketinit() {
 
 	int fd;
-	int tests = 1;
+	int tests = 50;
 
 	while (tests--) {
 
 		TEST(bsocket_init(NULL) == 0 );
 
+
 		fd = bsocket(AF_INET,SOCK_STREAM,0);
 
 		TEST(fd >= 0);
@@ -424,11 +416,13 @@
 
 		TEST(bsocket_shutdown(NULL) == 0 );
 
+
 	}
 
 	return 0;
 }
 
+
 int test_createlist() {
 
 	struct _list *list;
@@ -1601,9 +1595,10 @@
 	int r;
 	int msg;
 
-
 	s = socket(AF_INET,SOCK_STREAM,0);
+	msg = 666;
 
+
 	SILENT_TEST(s != INVALID_SOCKET);
 
 	r = bind(s,(struct sockaddr*) &localhost,sizeof(localhost));
@@ -1612,17 +1607,18 @@
 	r = listen(s,10);
 	SILENT_TEST(r == 0);
 
-	c = accept(s,NULL,0);
-	SILENT_TEST(c != INVALID_SOCKET);
+	while (TRUE) {
+		c = accept(s,NULL,0);
+		SILENT_TEST(c != INVALID_SOCKET);
 
-	msg = 666;
-	Sleep(100);
+		r = send(c,(char*) &msg, sizeof(msg),0);
+		SILENT_TEST(r == sizeof(msg));
 
-	r = send(c,(char*) &msg, sizeof(msg),0);
-	SILENT_TEST(r == sizeof(msg));
+		closesocket(c);
 
+	}
+
 	closesocket(s);
-	closesocket(c);
 
 	return 0;
 
@@ -1635,29 +1631,89 @@
 	int fd;
 	int r;
 	int msg;
+	int tests;
 
-	TEST(bsocket_init(NULL) == 0);
+	int mega_tests = 10;
 
-	h = new_thread(test_simple_read_helper,NULL);
-	TEST(h != NULL);
+	while(mega_tests--) {
 
-	fd = bsocket(AF_INET,SOCK_STREAM,0);
-	TEST(fd == 0);
+		TEST(bsocket_init(NULL) == 0);
 
-	r = bconnect(fd,(struct sockaddr*) &localhost,sizeof(localhost));
+		h = new_thread(test_simple_read_helper,NULL);
+		TEST(h != NULL);
+		tests = 50;
+
+		while (tests--) {
+
+			fd = bsocket(AF_INET,SOCK_STREAM,0);
+			TEST(fd >= 0);
+
+			r = bconnect(fd,(struct sockaddr*) &localhost,sizeof(localhost));
+			TEST(r == 0);
+
+			r = brecv(fd,&msg,sizeof(msg),0);
+
+			TEST(r == sizeof(msg));
+			TEST(msg == 666);
+
+			r = brecv(fd,&msg,sizeof(msg),0);
+			TEST(r == 0);
+
+			r = brecv(fd,&msg,sizeof(msg),0);
+			TEST(r == 0);
+
+			TEST(bclose(fd) == 0);
+		}
+
+		TEST(bsocket_shutdown(NULL) == 0);
+		CloseHandle(h);
+	}
+
+	return 0;
+
+}
+
+int test_simple_socketpair() {
+
+
+	int s[2];
+	int r;
+
+	int msg = 666;
+
+	TEST(bsocket_init(NULL) == 0);
+
+	r = bsocketpair(AF_INET,SOCK_STREAM,0,s);
 	TEST(r == 0);
 
-	r = brecv(fd,&msg,sizeof(msg),0);
+	r = bsend(s[0],&msg,sizeof(msg),0);
+	printf("%d\n",r);
+	fflush(stdout);
 
 	TEST(r == sizeof(msg));
+
+	r = brecv(s[1],&msg,sizeof(msg),0);
+	TEST(r == sizeof(msg));
+
 	TEST(msg == 666);
 
 	TEST(bsocket_shutdown(NULL) == 0);
 
 	return 0;
+}
 
+int test_simple_socketpair_nb() {
+	TEST(FALSE);
+
+	return 0;
 }
 
+int test_multiread() {
+
+	return 0;
+
+}
+
 int test_verify_npp_usage() {
 	errno = 666;
 	return -1;
@@ -1692,6 +1748,7 @@
 	{test_waitclose,NULL,"Test if a wait object behaves properly on closure."},
 	{test_maxoverlapped,NULL,"Test if we can have a sufficient number of outstanding operations."},
 	{test_bselect_macros,NULL,"Testing bselect() macros."},
+	//todo close returns error if error has occured and not been collected
 //	{test_multieventselect,NULL,"Can we have more than one socket associated with a WSAevent."},
 	//{test_theory1,NULL,"Test theory #1 (WSAwfme does not block when socket closed."},
 	//todo -- find a way to reduce timeout
@@ -1704,7 +1761,9 @@
 	{test_multi_connect,NULL,"Test rapid firing server with connections."},
 	{test_simple_write,NULL,"Connect and send some data to a server."},
 	{test_simple_read,NULL,"Connect and read some data from a server."},
-	{test_verify_npp_usage,NULL,"Check that the NPP is not being used for large sends."},
+	{test_simple_socketpair,NULL,"Test simple transfer of data between socketpairs.\n"},
+	{test_simple_socketpair_nb,NULL,"\t(non-blocking"},
+	//{test_verify_npp_usage,NULL,"Check that the NPP is not being used for large sends."},
 	{NULL,NULL,NULL}
 };
 

Modified: bsockets/trunk/wait.c
===================================================================
--- bsockets/trunk/wait.c	2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/wait.c	2006-08-08 23:15:44 UTC (rev 6992)
@@ -304,7 +304,7 @@
 void wl_deactivate(struct _wait_list *l) {
 
 	ASSERT(l->waiting == NULL);
-	ASSERT(l->active);
+	//ASSERT(l->active);
 
 	l->active = FALSE;
 
@@ -329,7 +329,7 @@
 	if (out != NULL) {
 
 		out->waiting = NULL;
-		out->active = 0;
+		out->active = FALSE;
 
 	} else {