[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] r7014: fixed up socketpair (bsockets/trunk)



Author: chiussi
Date: 2006-08-10 05:18:41 -0400 (Thu, 10 Aug 2006)
New Revision: 7014

Modified:
   bsockets/trunk/bsocket.c
   bsockets/trunk/callback.c
   bsockets/trunk/callback.h
   bsockets/trunk/event.c
   bsockets/trunk/event.h
   bsockets/trunk/io.c
   bsockets/trunk/io.h
   bsockets/trunk/select.c
   bsockets/trunk/socket.c
   bsockets/trunk/socket.h
   bsockets/trunk/sync.c
   bsockets/trunk/test.c
   bsockets/trunk/unix.h
Log:
fixed up socketpair



Modified: bsockets/trunk/bsocket.c
===================================================================
--- bsockets/trunk/bsocket.c	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/bsocket.c	2006-08-10 09:18:41 UTC (rev 7014)
@@ -2,7 +2,6 @@
 
 #include "bsocket.h"
 
-//
 
 #ifdef USE_WIN32
 

Modified: bsockets/trunk/callback.c
===================================================================
--- bsockets/trunk/callback.c	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/callback.c	2006-08-10 09:18:41 UTC (rev 7014)
@@ -18,6 +18,7 @@
 	//todo -- if write is incomplete, reiterate
 	//todo -- if error, raise exception
 
+
 }
 
 void CALLBACK callback_read(
@@ -82,7 +83,7 @@
 }
 
 //caller must guarantee list atomicity
-void make_connected(struct bsocket *b, struct bsocket *partner, struct socket_env *env) {
+void make_connected(struct bsocket *b, int partner, struct socket_env *env) {
 
 	b->connected = TRUE;
 	b->partner = partner;
@@ -90,8 +91,8 @@
 	bsocket_raise(b,IS_READABLE,FALSE,env);
 	bsocket_raise(b,IS_WRITABLE,TRUE,env);
 
-	post_read(b,env);
-
+	if (partner == -1)
+		post_read(b,env);
 }
 
 void complete_connect(int fd, int err, struct socket_env *env) {
@@ -112,7 +113,7 @@
 
 			} else {
 
-				make_connected(l[fd],NULL,env);
+				make_connected(l[fd],-1,env);
 			}
 
 		}

Modified: bsockets/trunk/callback.h
===================================================================
--- bsockets/trunk/callback.h	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/callback.h	2006-08-10 09:18:41 UTC (rev 7014)
@@ -1,11 +1,15 @@
 #ifndef _CALLBACK_H_
 #define _CALLBACK_H_
 
+#include "socket.h"
 
+
 void CALLBACK callback_write(DWORD, DWORD, OVERLAPPED*, DWORD);
 void CALLBACK callback_read (DWORD, DWORD, OVERLAPPED*, DWORD);
 
 void complete_connect(int, int, struct socket_env *);
 
+void make_connected(struct bsocket *, int, struct socket_env *);
+
 #endif
 

Modified: bsockets/trunk/event.c
===================================================================
--- bsockets/trunk/event.c	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/event.c	2006-08-10 09:18:41 UTC (rev 7014)
@@ -144,6 +144,7 @@
 	if (b != NULL) {
 		e->s = b->s;
 		e->fd = b->fd;
+		e->p_fd = b->partner;
 	}
 
 	e->type = type;
@@ -244,6 +245,7 @@
 	return out;
 }
 
+
 int post_write(struct bsocket *b, int *ret, int *err, struct socket_env *env) {
 
 	ASSERT(b->write_buf != NULL);
@@ -253,7 +255,6 @@
 
 	return  event_post(b,EV_WRITE,&b->write_wb,ret,err,TRUE,env);
 
-
 }
 
 int post_exception(struct bsocket *b, int err, struct socket_env *env) {
@@ -345,8 +346,9 @@
 	WSAEVENT we[WSA_MAXIMUM_WAIT_EVENTS];
 	struct __ev ev[WSA_MAXIMUM_WAIT_EVENTS];
 
+	WSABUF *wb;
+
 	WSANETWORKEVENTS ne;
-
 	WSAOVERLAPPED *wo;
 
 	struct _connect_data *c;
@@ -357,6 +359,8 @@
 
 	struct _msg *msg;
 
+	struct bsocket *p;
+
 	int i, _j;
 	int err;
 	int out;
@@ -422,6 +426,7 @@
 
 						z = 0;
 
+						//todo -- this is fishy
 						if ((i = connect_lookup[(SOCKET) ev->fd]) != -1) {
 							E_DELETE(i);
 							z++;
@@ -435,8 +440,11 @@
 						ASSERT(z <= 1);
 
 						//todo cancel any outstanding i/o (do we need to?)
-						ASSERT(closesocket(e->s) == 0);
+						if (e->p_fd == -1)
+							ASSERT(closesocket(e->s) == 0);
 
+
+
 						//todo -- this is a major sync violation, but it feels safe, why?
 						SetEvent(env->b[e->fd]->exception_e);
 
@@ -529,10 +537,10 @@
 
 					case EV_WRITE:
 
-						wo = (WSAOVERLAPPED*)
-							malloc(sizeof(WSAOVERLAPPED));
+						if (e->p_fd == -1) {
 
-						if (e->s != INVALID_SOCKET) {
+							wo = (WSAOVERLAPPED*)
+								malloc(sizeof(WSAOVERLAPPED));
 
 							r = WSASend(
 									e->s,
@@ -543,41 +551,92 @@
 									wo,
 									callback_write);
 
-						} else {
+							if (r == 0) {
 
-							//todo -- assert is connected
-							msg = msg_new();
+								event_respond(e,z,0);
 
-							if (msg != NULL) {
+							} else {
 
-							} else{
+								switch(err = WSAGetLastError()) {
 
-								r = -1;
+									case WSA_IO_PENDING:
+										event_respond(e,z,0);
+									break;
 
+									default:
+										event_respond(e,-1,unixify_wsaerr(err));
+									break;
+								}
 
 							}
 
+						} else {
 
+							//todo -- i hate this branching nonsense, come up with a
+							//local CHECK
 
-						}
+							//it's tempting to use callback_read, but it's a bad
+							//move. what if partner excepts in between calls?
+							wb = e->data;
 
-						if (r == 0) {
+							p = bsocket_get(e->p_fd,AS_READ,env);
+							i = 0;
 
-							event_respond(e,z,0);
+							if (p != NULL) {
 
-						} else {
+								while (wb->len) {
 
-							switch(err = WSAGetLastError()) {
+									ASSERT(wb->len > 0);
 
-								case WSA_IO_PENDING:
-									event_respond(e,z,0);
-								break;
+									msg = msg_new();
 
-								default:
-									event_respond(e,-1,unixify_wsaerr(err));
-								break;
+									if (msg != NULL) {
+
+										msg->len = min(MSG_SIZE,wb->len);
+										memcpy(msg->o_data,wb->buf,msg->len);
+
+										if (list_enqueue(msg,p->in_q) != NULL) {
+
+											i += msg->len;
+											wb->len -= msg->len;
+											wb->buf += msg->len;
+
+										} else {
+											wb->len = 0;
+										}
+
+									} else {
+
+										//nothing exception worthy has happened here,
+										//we are just low on memory
+										wb->len = 0;
+									}
+
+								}
+
+								ASSERT(wb->len == 0);
+
+								if (i > 0) {
+									socket_raise(e->p_fd,IS_READABLE,TRUE,env);
+								}
+
+								socket_raise(e->fd,IS_WRITABLE,TRUE,env);
+
+
+								bsocket_release(e->p_fd,AS_READ,env);
+
+								event_respond(e,i,0);
+
+
+							} else {
+
+								//todo -- what is a better errno?
+								event_respond(e,-1,errno);
+
 							}
 
+
+
 						}
 
 					break;

Modified: bsockets/trunk/event.h
===================================================================
--- bsockets/trunk/event.h	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/event.h	2006-08-10 09:18:41 UTC (rev 7014)
@@ -34,6 +34,8 @@
 	int type;
 	int fd;
 
+	int		p_fd;
+
 };
 
 struct _connect_data *connect_data_new(struct sockaddr*,size_t);

Modified: bsockets/trunk/io.c
===================================================================
--- bsockets/trunk/io.c	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/io.c	2006-08-10 09:18:41 UTC (rev 7014)
@@ -112,15 +112,6 @@
 
 }
 
-//put EOF in input stream
-//todo -- is atomicity needed here?
-void bsocket_eof(struct bsocket *b) {
-
-	//NULL data indicates EOF
-	list_enqueue(NULL,b->in_q);
-
-}
-
 //todo -- check is connected
 int recv_win32(int fd, void *buf, size_t len, int flags, struct socket_env *env) {
 
@@ -161,6 +152,7 @@
 	pos			= buf;
 	data_read	= 0;
 
+
 	while (space_left) {
 
 		r = list_queuepeek(&msg,b->in_q);
@@ -187,6 +179,8 @@
 				if (msg->len == 0 ) {
 					//todo -- msg is being recycled when it shouldn't be
 					//msg_free(msg);
+
+
 					ASSERT(list_dequeue(NULL,b->in_q) == 0);
 
 				} else {
@@ -199,12 +193,14 @@
 
 			} else {
 
+
+
 				space_left = 0;
 
 			}
 
 		} else {
-			socket_raise(fd,IS_READABLE,0,env);
+			socket_raise(fd,IS_READABLE,FALSE,env);
 
 		}
 
@@ -222,7 +218,6 @@
 }
 
 
-
 int send_win32 (int fd, void *buf, size_t len, int flags, struct socket_env *env) {
 
 	struct bsocket *b;
@@ -234,6 +229,8 @@
 
 	b = bsocket_get(fd,AS_WRITE,env);
 	CHECK(b != NULL,0);
+	//todo -- possible sync fault
+	CHECK(b->connected,ENOTCONN);
 
 	//todo -- is this good enough to guarantee the socket is writable?
 	if (b->blocking == FALSE) {
@@ -242,8 +239,7 @@
 
 	r = wait_until(b,IS_WRITABLE,env);
 
-	//todo
-	//bsocket_raise(b,IS_WRITABLE,0,env);
+	socket_raise(fd,IS_WRITABLE,0,env);
 
 	CHECK(r == 0,0);
 

Modified: bsockets/trunk/io.h
===================================================================
--- bsockets/trunk/io.h	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/io.h	2006-08-10 09:18:41 UTC (rev 7014)
@@ -21,7 +21,6 @@
 
 };
 
-void bsocket_eof(struct bsocket *);
 void invoke_read(SOCKET s, int fd, struct socket_env *);
 
 struct _msg *msg_new();

Modified: bsockets/trunk/select.c
===================================================================
--- bsockets/trunk/select.c	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/select.c	2006-08-10 09:18:41 UTC (rev 7014)
@@ -6,6 +6,7 @@
 #include "select.h"
 #include "misc.h"
 #include "io.h"
+#include "list.h"
 
 //atomicity -- must own list to call this
 void bsocket_raise(struct bsocket *b, int type, int active, struct socket_env *env ) {
@@ -106,24 +107,24 @@
 		bsocket_list_release(env);
 
 	}
-;
 
-
 }
 
-//atomicity is provided by caller -- must own list
+//requires list ownership to call
 void bsocket_exception(struct bsocket *b, int err, struct socket_env *env) {
 
-	if (b->s != INVALID_SOCKET) {
+	if (!b->excepted) {
 
 		if (err) {
 			b->err		= err;
 		}
 
-		b->closed	= TRUE;
+		b->excepted = TRUE;
+		b->connected = FALSE;
 		b->eof		= TRUE;
 
-		bsocket_eof(b);
+		//NULL represents EOF
+		list_enqueue(NULL,b->in_q);
 
 		bsocket_raise(b,IS_READABLE,TRUE,env);
 		bsocket_raise(b,IS_WRITABLE,TRUE,env);
@@ -133,9 +134,13 @@
 
 		b->s = INVALID_SOCKET;
 
+
+		if (b->partner != -1) {
+			bsocket_exception(env->b[b->partner],err,env);
+		}
+
 	}
 
-
 }
 
 void socket_exception(int fd, int err, struct socket_env *env) {

Modified: bsockets/trunk/socket.c
===================================================================
--- bsockets/trunk/socket.c	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/socket.c	2006-08-10 09:18:41 UTC (rev 7014)
@@ -244,7 +244,6 @@
 	MUTEX_ACQUIRE(env->b[fd]->read_m);
 	MUTEX_ACQUIRE(env->b[fd]->write_m);
 
-
 	//todo -- if this fails fd is lost forever (TS, or cope?)
 	list_enqueue((void*) b->fd, env->free_q);
 
@@ -289,7 +288,8 @@
 	b->except_wl	= NULL;
 
 	b->in_q			= NULL;
-	b->partner 		= NULL;
+	b->partner 		= -1;
+	b->excepted     = FALSE;
 
 	b->read_m = CreateMutex(NULL,FALSE,NULL);
 	CHECK_(b->read_m != NULL);
@@ -339,6 +339,7 @@
 	ASSERT(FALSE);
 }
 
+//todo -- make sure type is SOCK_STREAM
 int socketpair_win32(int domain, int type, int protocol, int fd[2], struct socket_env *env){
 
 
@@ -374,8 +375,8 @@
 	b[0]->fd = s[0];
 	b[1]->fd = s[1];
 
-	make_connected(b[0],b[1],env);
-	make_connected(b[1],b[0],env);
+	make_connected(b[0],b[1]->fd,env);
+	make_connected(b[1],b[0]->fd,env);
 
 	fd[0] = s[0];
 	fd[1] = s[1];
@@ -516,7 +517,6 @@
 	event_ping(env);
 
 	fail:
-	//todo fix memory leak on failure
 	if (h != NULL && out == -1) {
 		TerminateThread(h,1);
 		CloseHandle(h);
@@ -536,11 +536,7 @@
 	out = 0;
 	was_null = FALSE;
 
-	//printf("==shutdown\n");
-	//fflush(stdout);
-
 	//todo -- i can smell a sync issue here
-
 	if (env == NULL) {
 		env = __GLOBAL_BSOCKET_ENV_;
 		was_null = TRUE;
@@ -548,6 +544,7 @@
 	}
 
 	for (i=0; i<MAX_BSOCKETS; i++) {
+
 		close_win32(i,env);
 	}
 
@@ -563,7 +560,6 @@
 
 	ASSERT(r == WAIT_OBJECT_0);
 
-
 	//make sure we have cleaned up properly
 	//todo - get rid of this when we are confident it isn't a problem
 	for (i=0; i<MAX_BSOCKETS; i++) {

Modified: bsockets/trunk/socket.h
===================================================================
--- bsockets/trunk/socket.h	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/socket.h	2006-08-10 09:18:41 UTC (rev 7014)
@@ -49,6 +49,8 @@
 	int err;
 	int eof;
 
+	int excepted;
+
 	int fd;
 
 	/*writing business*/
@@ -62,8 +64,8 @@
 	/*reading business*/
 	struct _list *in_q;
 
-	/*this is non-null if the socket is part of a socketpair*/
-	struct bsocket *partner;
+	//-1 if not in a socketpair, otherwise the partner's fd
+	int partner;
 
 };
 

Modified: bsockets/trunk/sync.c
===================================================================
--- bsockets/trunk/sync.c	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/sync.c	2006-08-10 09:18:41 UTC (rev 7014)
@@ -93,6 +93,7 @@
 			out = l[fd];
 		}
 
+
 		LR
 		bsocket_list_release(env);
 

Modified: bsockets/trunk/test.c
===================================================================
--- bsockets/trunk/test.c	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/test.c	2006-08-10 09:18:41 UTC (rev 7014)
@@ -1675,7 +1675,6 @@
 
 int test_simple_socketpair() {
 
-
 	int s[2];
 	int r;
 
@@ -1687,14 +1686,10 @@
 	TEST(r == 0);
 
 	r = bsend(s[0],&msg,sizeof(msg),0);
-	printf("%d\n",r);
-	fflush(stdout);
-
 	TEST(r == sizeof(msg));
 
 	r = brecv(s[1],&msg,sizeof(msg),0);
 	TEST(r == sizeof(msg));
-
 	TEST(msg == 666);
 
 	TEST(bsocket_shutdown(NULL) == 0);
@@ -1702,10 +1697,66 @@
 	return 0;
 }
 
-int test_simple_socketpair_nb() {
-	TEST(FALSE);
+int test_socketpair_exception() {
 
+	int s[2];
+	int r;
+
+	int i;
+
+	int msg_base=666;
+	int msg_count = 100;
+	int msg;
+
+	TEST(bsocket_init(NULL) == 0);
+
+
+	r = bsocketpair(AF_INET,SOCK_STREAM,0,s);
+
+	for (i=0; i<msg_count; i++) {
+		msg = msg_base+i;
+		r = bsend(s[0],&msg,sizeof(msg),0);
+		TEST(r == sizeof(msg));
+	}
+
+	for (i=0; i<msg_count; i++) {
+		msg = msg_base*2+i;
+		r = bsend(s[1],&msg,sizeof(msg),0);
+		TEST(r == sizeof(msg));
+	}
+
+	socket_exception(s[0],1, __GLOBAL_BSOCKET_ENV_);
+
+	for (i=0; i<msg_count; i++) {
+		r = brecv(s[1],&msg,sizeof(msg),0);
+		TEST(r == sizeof(msg));
+		TEST(msg == (msg_base+i));
+	}
+
+	for (i=0; i<msg_count; i++) {
+		r = brecv(s[0],&msg,sizeof(msg),0);
+		TEST(r == sizeof(msg));
+		TEST(msg == (msg_base*2)+i);
+	}
+
+	r = brecv(s[0],&msg,sizeof(msg),0);
+	TEST(r == 0);
+
+	r = brecv(s[1],&msg,sizeof(msg),0);
+	TEST(r == 0);
+
+	r = bsend(s[0],&msg,sizeof(msg),0);
+	TEST(r == -1);
+	TEST(errno == ENOTCONN);
+
+	r = bsend(s[1],&msg,sizeof(msg),0);
+	TEST(r == -1);
+	TEST(errno == ENOTCONN);
+
+	TEST(bsocket_shutdown(NULL) == 0);
+
 	return 0;
+
 }
 
 int test_multiread() {
@@ -1761,8 +1812,8 @@
 	{test_multi_connect,NULL,"Test rapid firing server with connections."},
 	{test_simple_write,NULL,"Connect and send some data to a server."},
 	{test_simple_read,NULL,"Connect and read some data from a server."},
-	{test_simple_socketpair,NULL,"Test simple transfer of data between socketpairs.\n"},
-	{test_simple_socketpair_nb,NULL,"\t(non-blocking"},
+	{test_simple_socketpair,NULL,"Test simple transfer of data between socketpairs."},
+	{test_socketpair_exception,NULL,"Test is socketpair exception behaves properly."},
 	//{test_verify_npp_usage,NULL,"Check that the NPP is not being used for large sends."},
 	{NULL,NULL,NULL}
 };

Modified: bsockets/trunk/unix.h
===================================================================
--- bsockets/trunk/unix.h	2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/unix.h	2006-08-10 09:18:41 UTC (rev 7014)
@@ -65,4 +65,9 @@
 #define ECLOSED			(BSOCK_PRIVATE_ERRNO+14)
 #endif
 
+#ifndef ENOTCONN
+#define	ENOTCONN		(BSOCK_PRIVATE_ERRNO+15)
 #endif
+
+
+#endif