[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r7014: fixed up socketpair (bsockets/trunk)
Author: chiussi
Date: 2006-08-10 05:18:41 -0400 (Thu, 10 Aug 2006)
New Revision: 7014
Modified:
bsockets/trunk/bsocket.c
bsockets/trunk/callback.c
bsockets/trunk/callback.h
bsockets/trunk/event.c
bsockets/trunk/event.h
bsockets/trunk/io.c
bsockets/trunk/io.h
bsockets/trunk/select.c
bsockets/trunk/socket.c
bsockets/trunk/socket.h
bsockets/trunk/sync.c
bsockets/trunk/test.c
bsockets/trunk/unix.h
Log:
fixed up socketpair
Modified: bsockets/trunk/bsocket.c
===================================================================
--- bsockets/trunk/bsocket.c 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/bsocket.c 2006-08-10 09:18:41 UTC (rev 7014)
@@ -2,7 +2,6 @@
#include "bsocket.h"
-//
#ifdef USE_WIN32
Modified: bsockets/trunk/callback.c
===================================================================
--- bsockets/trunk/callback.c 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/callback.c 2006-08-10 09:18:41 UTC (rev 7014)
@@ -18,6 +18,7 @@
//todo -- if write is incomplete, reiterate
//todo -- if error, raise exception
+
}
void CALLBACK callback_read(
@@ -82,7 +83,7 @@
}
//caller must guarantee list atomicity
-void make_connected(struct bsocket *b, struct bsocket *partner, struct socket_env *env) {
+void make_connected(struct bsocket *b, int partner, struct socket_env *env) {
b->connected = TRUE;
b->partner = partner;
@@ -90,8 +91,8 @@
bsocket_raise(b,IS_READABLE,FALSE,env);
bsocket_raise(b,IS_WRITABLE,TRUE,env);
- post_read(b,env);
-
+ if (partner == -1)
+ post_read(b,env);
}
void complete_connect(int fd, int err, struct socket_env *env) {
@@ -112,7 +113,7 @@
} else {
- make_connected(l[fd],NULL,env);
+ make_connected(l[fd],-1,env);
}
}
Modified: bsockets/trunk/callback.h
===================================================================
--- bsockets/trunk/callback.h 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/callback.h 2006-08-10 09:18:41 UTC (rev 7014)
@@ -1,11 +1,15 @@
#ifndef _CALLBACK_H_
#define _CALLBACK_H_
+#include "socket.h"
+
void CALLBACK callback_write(DWORD, DWORD, OVERLAPPED*, DWORD);
void CALLBACK callback_read (DWORD, DWORD, OVERLAPPED*, DWORD);
void complete_connect(int, int, struct socket_env *);
+void make_connected(struct bsocket *, int, struct socket_env *);
+
#endif
Modified: bsockets/trunk/event.c
===================================================================
--- bsockets/trunk/event.c 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/event.c 2006-08-10 09:18:41 UTC (rev 7014)
@@ -144,6 +144,7 @@
if (b != NULL) {
e->s = b->s;
e->fd = b->fd;
+ e->p_fd = b->partner;
}
e->type = type;
@@ -244,6 +245,7 @@
return out;
}
+
int post_write(struct bsocket *b, int *ret, int *err, struct socket_env *env) {
ASSERT(b->write_buf != NULL);
@@ -253,7 +255,6 @@
return event_post(b,EV_WRITE,&b->write_wb,ret,err,TRUE,env);
-
}
int post_exception(struct bsocket *b, int err, struct socket_env *env) {
@@ -345,8 +346,9 @@
WSAEVENT we[WSA_MAXIMUM_WAIT_EVENTS];
struct __ev ev[WSA_MAXIMUM_WAIT_EVENTS];
+ WSABUF *wb;
+
WSANETWORKEVENTS ne;
-
WSAOVERLAPPED *wo;
struct _connect_data *c;
@@ -357,6 +359,8 @@
struct _msg *msg;
+ struct bsocket *p;
+
int i, _j;
int err;
int out;
@@ -422,6 +426,7 @@
z = 0;
+ //todo -- this is fishy
if ((i = connect_lookup[(SOCKET) ev->fd]) != -1) {
E_DELETE(i);
z++;
@@ -435,8 +440,11 @@
ASSERT(z <= 1);
//todo cancel any outstanding i/o (do we need to?)
- ASSERT(closesocket(e->s) == 0);
+ if (e->p_fd == -1)
+ ASSERT(closesocket(e->s) == 0);
+
+
//todo -- this is a major sync violation, but it feels safe, why?
SetEvent(env->b[e->fd]->exception_e);
@@ -529,10 +537,10 @@
case EV_WRITE:
- wo = (WSAOVERLAPPED*)
- malloc(sizeof(WSAOVERLAPPED));
+ if (e->p_fd == -1) {
- if (e->s != INVALID_SOCKET) {
+ wo = (WSAOVERLAPPED*)
+ malloc(sizeof(WSAOVERLAPPED));
r = WSASend(
e->s,
@@ -543,41 +551,92 @@
wo,
callback_write);
- } else {
+ if (r == 0) {
- //todo -- assert is connected
- msg = msg_new();
+ event_respond(e,z,0);
- if (msg != NULL) {
+ } else {
- } else{
+ switch(err = WSAGetLastError()) {
- r = -1;
+ case WSA_IO_PENDING:
+ event_respond(e,z,0);
+ break;
+ default:
+ event_respond(e,-1,unixify_wsaerr(err));
+ break;
+ }
}
+ } else {
+ //todo -- i hate this branching nonsense, come up with a
+ //local CHECK
- }
+ //it's tempting to use callback_read, but it's a bad
+ //move. what if partner excepts inbetween calls?
+ wb = e->data;
- if (r == 0) {
+ p = bsocket_get(e->p_fd,AS_READ,env);
+ i = 0;
- event_respond(e,z,0);
+ if (p != NULL) {
- } else {
+ while (wb->len) {
- switch(err = WSAGetLastError()) {
+ ASSERT(wb->len > 0);
- case WSA_IO_PENDING:
- event_respond(e,z,0);
- break;
+ msg = msg_new();
- default:
- event_respond(e,-1,unixify_wsaerr(err));
- break;
+ if (msg != NULL) {
+
+ msg->len = min(MSG_SIZE,wb->len);
+ memcpy(msg->o_data,wb->buf,msg->len);
+
+ if (list_enqueue(msg,p->in_q) != NULL) {
+
+ i += msg->len;
+ wb->len -= msg->len;
+ wb->buf += msg->len;
+
+ } else {
+ wb->len = 0;
+ }
+
+ } else {
+
+ //nothing exception worthy has happened here,
+ //we are just low on memory
+ wb->len = 0;
+ }
+
+ }
+
+ ASSERT(wb->len == 0);
+
+ if (i > 0) {
+ socket_raise(e->p_fd,IS_READABLE,TRUE,env);
+ }
+
+ socket_raise(e->fd,IS_WRITABLE,TRUE,env);
+
+
+ bsocket_release(e->p_fd,AS_READ,env);
+
+ event_respond(e,i,0);
+
+
+ } else {
+
+ //todo -- what is a better errno?
+ event_respond(e,-1,errno);
+
}
+
+
}
break;
Modified: bsockets/trunk/event.h
===================================================================
--- bsockets/trunk/event.h 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/event.h 2006-08-10 09:18:41 UTC (rev 7014)
@@ -34,6 +34,8 @@
int type;
int fd;
+ int p_fd;
+
};
struct _connect_data *connect_data_new(struct sockaddr*,size_t);
Modified: bsockets/trunk/io.c
===================================================================
--- bsockets/trunk/io.c 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/io.c 2006-08-10 09:18:41 UTC (rev 7014)
@@ -112,15 +112,6 @@
}
-//put EOF in input stream
-//todo -- is atomicity needed here?
-void bsocket_eof(struct bsocket *b) {
-
- //NULL data indicates EOF
- list_enqueue(NULL,b->in_q);
-
-}
-
//todo -- check is connected
int recv_win32(int fd, void *buf, size_t len, int flags, struct socket_env *env) {
@@ -161,6 +152,7 @@
pos = buf;
data_read = 0;
+
while (space_left) {
r = list_queuepeek(&msg,b->in_q);
@@ -187,6 +179,8 @@
if (msg->len == 0 ) {
//todo -- msg is being recycled when it shouldn't be
//msg_free(msg);
+
+
ASSERT(list_dequeue(NULL,b->in_q) == 0);
} else {
@@ -199,12 +193,14 @@
} else {
+
+
space_left = 0;
}
} else {
- socket_raise(fd,IS_READABLE,0,env);
+ socket_raise(fd,IS_READABLE,FALSE,env);
}
@@ -222,7 +218,6 @@
}
-
int send_win32 (int fd, void *buf, size_t len, int flags, struct socket_env *env) {
struct bsocket *b;
@@ -234,6 +229,8 @@
b = bsocket_get(fd,AS_WRITE,env);
CHECK(b != NULL,0);
+ //todo -- possible sync fault
+ CHECK(b->connected,ENOTCONN);
//todo -- is this good enough to guarantee the socket is writable?
if (b->blocking == FALSE) {
@@ -242,8 +239,7 @@
r = wait_until(b,IS_WRITABLE,env);
- //todo
- //bsocket_raise(b,IS_WRITABLE,0,env);
+ socket_raise(fd,IS_WRITABLE,0,env);
CHECK(r == 0,0);
Modified: bsockets/trunk/io.h
===================================================================
--- bsockets/trunk/io.h 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/io.h 2006-08-10 09:18:41 UTC (rev 7014)
@@ -21,7 +21,6 @@
};
-void bsocket_eof(struct bsocket *);
void invoke_read(SOCKET s, int fd, struct socket_env *);
struct _msg *msg_new();
Modified: bsockets/trunk/select.c
===================================================================
--- bsockets/trunk/select.c 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/select.c 2006-08-10 09:18:41 UTC (rev 7014)
@@ -6,6 +6,7 @@
#include "select.h"
#include "misc.h"
#include "io.h"
+#include "list.h"
//atomicity -- must own list to call this
void bsocket_raise(struct bsocket *b, int type, int active, struct socket_env *env ) {
@@ -106,24 +107,24 @@
bsocket_list_release(env);
}
-;
-
}
-//atomicity is provided by caller -- must own list
+//requires list ownership to call
void bsocket_exception(struct bsocket *b, int err, struct socket_env *env) {
- if (b->s != INVALID_SOCKET) {
+ if (!b->excepted) {
if (err) {
b->err = err;
}
- b->closed = TRUE;
+ b->excepted = TRUE;
+ b->connected = FALSE;
b->eof = TRUE;
- bsocket_eof(b);
+ //NULL represents EOF
+ list_enqueue(NULL,b->in_q);
bsocket_raise(b,IS_READABLE,TRUE,env);
bsocket_raise(b,IS_WRITABLE,TRUE,env);
@@ -133,9 +134,13 @@
b->s = INVALID_SOCKET;
+
+ if (b->partner != -1) {
+ bsocket_exception(env->b[b->partner],err,env);
+ }
+
}
-
}
void socket_exception(int fd, int err, struct socket_env *env) {
Modified: bsockets/trunk/socket.c
===================================================================
--- bsockets/trunk/socket.c 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/socket.c 2006-08-10 09:18:41 UTC (rev 7014)
@@ -244,7 +244,6 @@
MUTEX_ACQUIRE(env->b[fd]->read_m);
MUTEX_ACQUIRE(env->b[fd]->write_m);
-
//todo -- if this fails fd is lost forever (TS, or cope?)
list_enqueue((void*) b->fd, env->free_q);
@@ -289,7 +288,8 @@
b->except_wl = NULL;
b->in_q = NULL;
- b->partner = NULL;
+ b->partner = -1;
+ b->excepted = FALSE;
b->read_m = CreateMutex(NULL,FALSE,NULL);
CHECK_(b->read_m != NULL);
@@ -339,6 +339,7 @@
ASSERT(FALSE);
}
+//todo -- make sure type is SOCK_STREAM
int socketpair_win32(int domain, int type, int protocol, int fd[2], struct socket_env *env){
@@ -374,8 +375,8 @@
b[0]->fd = s[0];
b[1]->fd = s[1];
- make_connected(b[0],b[1],env);
- make_connected(b[1],b[0],env);
+ make_connected(b[0],b[1]->fd,env);
+ make_connected(b[1],b[0]->fd,env);
fd[0] = s[0];
fd[1] = s[1];
@@ -516,7 +517,6 @@
event_ping(env);
fail:
- //todo fix memory leak on failure
if (h != NULL && out == -1) {
TerminateThread(h,1);
CloseHandle(h);
@@ -536,11 +536,7 @@
out = 0;
was_null = FALSE;
- //printf("==shutdown\n");
- //fflush(stdout);
-
//todo -- i can smell a sync issue here
-
if (env == NULL) {
env = __GLOBAL_BSOCKET_ENV_;
was_null = TRUE;
@@ -548,6 +544,7 @@
}
for (i=0; i<MAX_BSOCKETS; i++) {
+
close_win32(i,env);
}
@@ -563,7 +560,6 @@
ASSERT(r == WAIT_OBJECT_0);
-
//make sure we have cleaned up properly
//todo - get rid of this when we are confident it isn't a problem
for (i=0; i<MAX_BSOCKETS; i++) {
Modified: bsockets/trunk/socket.h
===================================================================
--- bsockets/trunk/socket.h 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/socket.h 2006-08-10 09:18:41 UTC (rev 7014)
@@ -49,6 +49,8 @@
int err;
int eof;
+ int excepted;
+
int fd;
/*writing business*/
@@ -62,8 +64,8 @@
/*reading business*/
struct _list *in_q;
- /*this is non-null if the socket is part of a socketpair*/
- struct bsocket *partner;
+ //-1 if not in a socketpair, >0 otherwise*/
+ int partner;
};
Modified: bsockets/trunk/sync.c
===================================================================
--- bsockets/trunk/sync.c 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/sync.c 2006-08-10 09:18:41 UTC (rev 7014)
@@ -93,6 +93,7 @@
out = l[fd];
}
+
LR
bsocket_list_release(env);
Modified: bsockets/trunk/test.c
===================================================================
--- bsockets/trunk/test.c 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/test.c 2006-08-10 09:18:41 UTC (rev 7014)
@@ -1675,7 +1675,6 @@
int test_simple_socketpair() {
-
int s[2];
int r;
@@ -1687,14 +1686,10 @@
TEST(r == 0);
r = bsend(s[0],&msg,sizeof(msg),0);
- printf("%d\n",r);
- fflush(stdout);
-
TEST(r == sizeof(msg));
r = brecv(s[1],&msg,sizeof(msg),0);
TEST(r == sizeof(msg));
-
TEST(msg == 666);
TEST(bsocket_shutdown(NULL) == 0);
@@ -1702,10 +1697,66 @@
return 0;
}
-int test_simple_socketpair_nb() {
- TEST(FALSE);
+int test_socketpair_exception() {
+ int s[2];
+ int r;
+
+ int i;
+
+ int msg_base=666;
+ int msg_count = 100;
+ int msg;
+
+ TEST(bsocket_init(NULL) == 0);
+
+
+ r = bsocketpair(AF_INET,SOCK_STREAM,0,s);
+
+ for (i=0; i<msg_count; i++) {
+ msg = msg_base+i;
+ r = bsend(s[0],&msg,sizeof(msg),0);
+ TEST(r == sizeof(msg));
+ }
+
+ for (i=0; i<msg_count; i++) {
+ msg = msg_base*2+i;
+ r = bsend(s[1],&msg,sizeof(msg),0);
+ TEST(r == sizeof(msg));
+ }
+
+ socket_exception(s[0],1, __GLOBAL_BSOCKET_ENV_);
+
+ for (i=0; i<msg_count; i++) {
+ r = brecv(s[1],&msg,sizeof(msg),0);
+ TEST(r == sizeof(msg));
+ TEST(msg == (msg_base+i));
+ }
+
+ for (i=0; i<msg_count; i++) {
+ r = brecv(s[0],&msg,sizeof(msg),0);
+ TEST(r == sizeof(msg));
+ TEST(msg == (msg_base*2)+i);
+ }
+
+ r = brecv(s[0],&msg,sizeof(msg),0);
+ TEST(r == 0);
+
+ r = brecv(s[1],&msg,sizeof(msg),0);
+ TEST(r == 0);
+
+ r = bsend(s[0],&msg,sizeof(msg),0);
+ TEST(r == -1);
+ TEST(errno == ENOTCONN);
+
+ r = bsend(s[1],&msg,sizeof(msg),0);
+ TEST(r == -1);
+ TEST(errno == ENOTCONN);
+
+ TEST(bsocket_shutdown(NULL) == 0);
+
return 0;
+
}
int test_multiread() {
@@ -1761,8 +1812,8 @@
{test_multi_connect,NULL,"Test rapid firing server with connections."},
{test_simple_write,NULL,"Connect and send some data to a server."},
{test_simple_read,NULL,"Connect and read some data from a server."},
- {test_simple_socketpair,NULL,"Test simple transfer of data between socketpairs.\n"},
- {test_simple_socketpair_nb,NULL,"\t(non-blocking"},
+ {test_simple_socketpair,NULL,"Test simple transfer of data between socketpairs."},
+ {test_socketpair_exception,NULL,"Test is socketpair exception behaves properly."},
//{test_verify_npp_usage,NULL,"Check that the NPP is not being used for large sends."},
{NULL,NULL,NULL}
};
Modified: bsockets/trunk/unix.h
===================================================================
--- bsockets/trunk/unix.h 2006-08-10 09:14:57 UTC (rev 7013)
+++ bsockets/trunk/unix.h 2006-08-10 09:18:41 UTC (rev 7014)
@@ -65,4 +65,9 @@
#define ECLOSED (BSOCK_PRIVATE_ERRNO+14)
#endif
+#ifndef ENOTCONN
+#define ENOTCONN (BSOCK_PRIVATE_ERRNO+15)
#endif
+
+
+#endif