[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r6992: - fixed race condition in notification that was resulting in (bsockets/trunk)
Author: chiussi
Date: 2006-08-08 19:15:44 -0400 (Tue, 08 Aug 2006)
New Revision: 6992
Added:
bsockets/trunk/bsocket.c
Modified:
bsockets/trunk/Makefile
bsockets/trunk/bsocket.h
bsockets/trunk/callback.c
bsockets/trunk/callback.h
bsockets/trunk/event.c
bsockets/trunk/io.c
bsockets/trunk/io.h
bsockets/trunk/list.c
bsockets/trunk/misc.c
bsockets/trunk/misc.h
bsockets/trunk/select.c
bsockets/trunk/socket.c
bsockets/trunk/socket.h
bsockets/trunk/sync.c
bsockets/trunk/test.c
bsockets/trunk/wait.c
Log:
- fixed race condition in notification that was resulting in deadlock when a callback was invoked while an event was being processed
- cleaned up memory leak on socket write
- removed some todos
- socketpair init complete
- initial work on socketpair send
Modified: bsockets/trunk/Makefile
===================================================================
--- bsockets/trunk/Makefile 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/Makefile 2006-08-08 23:15:44 UTC (rev 6992)
@@ -1,7 +1,7 @@
test_HEADERS= test.h
bsock_HEADERS = list.h test.h socket.h bsocket.h misc.h
-sock_OBJS = list.o socket.o unix.o event.o sync.o select.o wait.o misc.o io.o callback.o
+sock_OBJS = list.o socket.o unix.o event.o sync.o select.o wait.o misc.o io.o callback.o bsocket.o
test_OBJS = test.o ${sock_OBJS}
BIN_SUFFIX = .exe
Added: bsockets/trunk/bsocket.c
===================================================================
--- bsockets/trunk/bsocket.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/bsocket.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -0,0 +1,23 @@
+#include <stdio.h>
+
+#include "bsocket.h"
+
+#ifdef USE_WIN32
+
+int bsocketpair(int domain, int type, int protocol, int fd[2]) {
+
+ return socketpair_win32(domain,type,protocol,fd,__GLOBAL_BSOCKET_ENV_);
+}
+
+#else
+
+int bsocketpair(int domain, int type, int protocol, int fd[2]) {
+
+ return socketpair(domain,type,protocol,fd);
+
+}
+
+#endif
+
+
+
Property changes on: bsockets/trunk/bsocket.c
___________________________________________________________________
Name: svn:executable
+ *
Modified: bsockets/trunk/bsocket.h
===================================================================
--- bsockets/trunk/bsocket.h 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/bsocket.h 2006-08-08 23:15:44 UTC (rev 6992)
@@ -1,6 +1,8 @@
#ifndef _BSOCKET_H_
#define _BSOCKET_H_
+#include <windows.h>
+
#define USE_WIN32
#ifdef USE_WIN32
@@ -53,7 +55,9 @@
int getsockopt_win32(int, int, int, void*, int *, struct socket_env*);
int send_win32(int, void*, size_t, int, struct socket_env*);
int recv_win32(int, void*, size_t, int, struct socket_env*);
+int socketpair_win32(int, int, int, int*, struct socket_env*);
+
#define bsocket_init(X) socket_init_win32(X)
#define bsocket_shutdown(X) socket_cleanup_win32(X)
@@ -70,6 +74,9 @@
#define bsend(A,B,C,D) send_win32(A,B,C,D,__GLOBAL_BSOCKET_ENV_)
#define brecv(A,B,C,D) recv_win32(A,B,C,D,__GLOBAL_BSOCKET_ENV_);
+
+int bsocketpair(int,int,int,int*);
+
#define F_SETFL 1
#define O_NONBLOCK 1
@@ -85,7 +92,7 @@
#define MAX_SIMUL_CONNECT 9
#define MAX_SIMUL_LISTEN 5
-#if (MAX_SIMUL_CONNECT + MAX_SIMUL_LISTEN + 2 > WSA_MAXIMUM_WAIT_EVENTS )
+#if ((MAX_SIMUL_CONNECT + MAX_SIMUL_LISTEN + 2 )> WSA_MAXIMUM_WAIT_EVENTS )
#error "Reduce MAX_SIMUL_CONNECT or MAX_SIMUL_LISTEN"
#endif
Modified: bsockets/trunk/callback.c
===================================================================
--- bsockets/trunk/callback.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/callback.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -29,37 +29,51 @@
struct bsocket *b;
struct _msg *msg;
+ int r;
+
+ //todo -- do we need to free wo when we're done?
msg = (struct _msg*) wo->hEvent;
if (err == 0) {
ASSERT(len >= 0);
-
ASSERT(len <= MSG_SIZE);
- b = bsocket_get(msg->fd,AS_READ,msg->env);
+ if (len > 0) {
- ASSERT(b != NULL);
- ASSERT(b->fd == msg->fd);
+ b = bsocket_get(msg->fd,AS_READ,msg->env);
- if ( list_enqueue(msg,b->in_q ) == 0) {
+ ASSERT(b != NULL);
+ ASSERT(b->fd == msg->fd);
msg->len = len;
- printf("%d\n",msg->len);
- fflush(stdout);
- post_read(b,msg->env);
- socket_raise(b->fd,IS_READABLE,TRUE,msg->env);
+ r = (int) list_enqueue(msg,b->in_q );
+ bsocket_release(msg->fd,AS_READ,msg->env);
+
+ if (r != 0) {
+
+ ASSERT(len > 0);
+
+ post_read(b,msg->env);
+
+ socket_raise(msg->fd,IS_READABLE,TRUE,msg->env);
+
+ } else {
+
+ //todo -- dealloc msg?
+ socket_exception(b->fd,errno,msg->env);
+
+ }
+
} else {
- //todo -- dealloc msg?
- socket_exception(b->fd,errno,msg->env);
+ socket_exception(msg->fd,0,msg->env);
+
}
- bsocket_release(msg->fd,AS_READ,msg->env);
-
} else {
socket_exception(msg->fd,unixify_wsaerr(err),msg->env);
@@ -67,13 +81,12 @@
}
-//caller must guarantee atomicity
+//caller must guarantee list atomicity
void make_connected(struct bsocket *b, struct bsocket *partner, struct socket_env *env) {
b->connected = TRUE;
b->partner = partner;
- //todo -- start reading
bsocket_raise(b,IS_READABLE,FALSE,env);
bsocket_raise(b,IS_WRITABLE,TRUE,env);
@@ -100,7 +113,7 @@
} else {
make_connected(l[fd],NULL,env);
- }
+ }
}
@@ -108,4 +121,5 @@
}
+
}
Modified: bsockets/trunk/callback.h
===================================================================
--- bsockets/trunk/callback.h 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/callback.h 2006-08-08 23:15:44 UTC (rev 6992)
@@ -6,5 +6,6 @@
void CALLBACK callback_read (DWORD, DWORD, OVERLAPPED*, DWORD);
void complete_connect(int, int, struct socket_env *);
+
#endif
Modified: bsockets/trunk/event.c
===================================================================
--- bsockets/trunk/event.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/event.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -77,6 +77,10 @@
}
+ if (e->sig != NULL) {
+ //CloseHandle(e->sig);
+ }
+
free(e);
}
@@ -128,6 +132,8 @@
CHECK(e != NULL,ENOMEM);
+ w = NULL;
+
if (wait) {
w = CreateEvent(NULL,TRUE,FALSE,NULL);
CHECK_(w != NULL);
@@ -157,19 +163,21 @@
MUTEX_RELEASE(env->post_m);
- if (w != NULL) {
+ if (w != NULL ) {
ASSERT( WaitForSingleObject(w,INFINITE) == WAIT_OBJECT_0);
}
fail:
+ if (w != NULL) {
+ ASSERT(CloseHandle(w));
+ }
if (out == -1) {
e->type = EV_PING;
event_free(e);
}
-
return out;
}
@@ -266,6 +274,7 @@
}
+
struct _event *event_next(struct socket_env *env) {
struct _event *e;
@@ -346,6 +355,8 @@
struct _list *connect_q;
+ struct _msg *msg;
+
int i, _j;
int err;
int out;
@@ -426,6 +437,9 @@
//todo cancel any outstanding i/o (do we need to?)
ASSERT(closesocket(e->s) == 0);
+ //todo -- this is a major sync violation, but it feels safe, why?
+ SetEvent(env->b[e->fd]->exception_e);
+
event_respond(e,0,0);
break;
@@ -463,6 +477,7 @@
} else {
+ //todo -- errno should = 0?
event_respond(e,-1,errno);
}
@@ -517,16 +532,35 @@
wo = (WSAOVERLAPPED*)
malloc(sizeof(WSAOVERLAPPED));
+ if (e->s != INVALID_SOCKET) {
- r = WSASend(
- e->s,
- (WSABUF*) (e->data),
- 1,
- (DWORD*) &z,
- 0,
- wo,
- callback_write);
+ r = WSASend(
+ e->s,
+ (WSABUF*) (e->data),
+ 1,
+ (DWORD*) &z,
+ 0,
+ wo,
+ callback_write);
+ } else {
+
+ //todo -- assert is connected
+ msg = msg_new();
+
+ if (msg != NULL) {
+
+ } else{
+
+ r = -1;
+
+
+ }
+
+
+
+ }
+
if (r == 0) {
event_respond(e,z,0);
@@ -636,6 +670,16 @@
}
+ //make sure there are no outstanding events
+ ASSERT(list_dequeue(NULL,env->event_q) != 0);
+
+ for (i=1; i<WSA_MAXIMUM_WAIT_EVENTS; i++) {
+
+ //todo -- assert nothing is using these events
+ ASSERT(WSACloseEvent(we[i]));
+ }
+
+
fail:
if (connect_q != NULL)
Modified: bsockets/trunk/io.c
===================================================================
--- bsockets/trunk/io.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/io.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -61,6 +61,7 @@
CHECK(msg != NULL,0);
+ //todo we dont want to have to allocate a wo each bloody time
wo = (WSAOVERLAPPED*)
malloc(sizeof(WSAOVERLAPPED));
@@ -78,44 +79,23 @@
flags = 0;
- while (resume) {
+ z = 0;
- resume = FALSE;
+ r = WSARecv(
+ s,
+ &msg->buf_wb,
+ 1,
+ (DWORD*) &z,
+ (DWORD*) &flags,
+ wo,
+ callback_read
+ );
- z = 0;
+ if (r == SOCKET_ERROR) {
- r = WSARecv(
- s,
- &msg->buf_wb,
- 1,
- (DWORD*) &z,
- (DWORD*) &flags,
- wo,
- callback_read
- );
+ err = WSAGetLastError();
- if (r != SOCKET_ERROR) {
-
- if (z == 0) {
-
- out = -1;
- err = 0;
- resume = FALSE;
-
- } else {
- resume = TRUE;
-
- }
-
-
- } else {
-
- err = WSAGetLastError();
-
- CHECK(err == WSA_IO_PENDING,err);
-
- }
-
+ CHECK(err == WSA_IO_PENDING,err);
}
fail:
@@ -133,7 +113,7 @@
}
//put EOF in input stream
-//atomicity must have write permissions for socket, list ownership not needed (todo ?)
+//todo -- is atomicity needed here?
void bsocket_eof(struct bsocket *b) {
//NULL data indicates EOF
@@ -161,6 +141,7 @@
CHECK(len > 0, EINVAL);
b = bsocket_get(fd,AS_READ,env);
+
CHECK(b != NULL,0);
if ( list_is_empty(b->in_q)) {
@@ -172,55 +153,61 @@
r = wait_until(b,IS_READABLE,env);
b = bsocket_get(fd,AS_READ,env);
+
CHECK(b != NULL,0);
}
- space_left = len;
- pos = buf;
- data_read = 0;
+ space_left = len;
+ pos = buf;
+ data_read = 0;
- //todo -- if next message is EOF, return everything up until it
-
while (space_left) {
r = list_queuepeek(&msg,b->in_q);
- ASSERT(r == 0);
- if (msg != NULL) {
+ if (r == 0) {
- copy_len = min(space_left,msg->len);
- ASSERT(copy_len > 0);
+ if (msg != NULL) {
- memcpy(pos,msg->data,copy_len);
+ copy_len = min(space_left,msg->len);
- msg->len -= copy_len;
+ if (copy_len <= 0 ) {
+ printf("--%d\n",copy_len);
+ fflush(stdout);
+ }
- ASSERT(msg->len >= 0);
+ ASSERT(copy_len > 0);
- if (msg->len == 0 ) {
+ memcpy(pos,msg->data,copy_len);
- msg_free(msg);
- ASSERT(list_dequeue(NULL,b->in_q) == 0);
+ msg->len -= copy_len;
- } else {
- msg->data += copy_len;
+ ASSERT(msg->len >= 0);
- }
+ if (msg->len == 0 ) {
+ //todo -- msg is being recycled when it shouldn't be
+ //msg_free(msg);
+ ASSERT(list_dequeue(NULL,b->in_q) == 0);
- pos += copy_len;
- space_left -= copy_len;
- data_read += copy_len;
+ } else {
+ msg->data += copy_len;
+ }
+ pos += copy_len;
+ space_left -= copy_len;
+ data_read += copy_len;
- } else {
+ } else {
- space_left = 0;
+ space_left = 0;
- }
+ }
+ } else {
+ socket_raise(fd,IS_READABLE,0,env);
+ }
-
}
out = data_read;
@@ -234,12 +221,8 @@
return out;
}
-int send_socketpair_win32() {
- return -1;
-}
-
int send_win32 (int fd, void *buf, size_t len, int flags, struct socket_env *env) {
struct bsocket *b;
@@ -252,8 +235,6 @@
b = bsocket_get(fd,AS_WRITE,env);
CHECK(b != NULL,0);
- //todo -- if socketpair, send_socketpair_win32
-
//todo -- is this good enough to guarantee the socket is writable?
if (b->blocking == FALSE) {
CHECK(b->write_buf == NULL,EAGAIN);
@@ -274,16 +255,16 @@
r = post_write(b,&z,&err,env);
- CHECK(r == 0,0);
-
- out = z;
-
- if (out == -1)
+ if (r != 0) {
errno = err;
+ out = -1;
+ } else {
+ out = z;
+ }
fail:
- if ( (out == -1) && (b->write_buf != NULL )) {
+ if ( (out == -1) && (b != NULL) && (b->write_buf != NULL )) {
free(b->write_buf);
}
Modified: bsockets/trunk/io.h
===================================================================
--- bsockets/trunk/io.h 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/io.h 2006-08-08 23:15:44 UTC (rev 6992)
@@ -24,4 +24,7 @@
void bsocket_eof(struct bsocket *);
void invoke_read(SOCKET s, int fd, struct socket_env *);
+struct _msg *msg_new();
+void msg_free(struct _msg *);
+
#endif
Modified: bsockets/trunk/list.c
===================================================================
--- bsockets/trunk/list.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/list.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -11,8 +11,12 @@
void list_data(void *data, struct _list_node *n) {
//todo -- this is ugly, why?
- *((void**) data) = n->data;
+ if (data != NULL) {
+ *((void**) data) = n->data;
+
+ }
+
}
struct _list_node *list_enqueue(void *data, struct _list *list) {
Modified: bsockets/trunk/misc.c
===================================================================
--- bsockets/trunk/misc.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/misc.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -2,12 +2,8 @@
#include "misc.h"
-///
-
-
char _G_VERY_BAD_NEWS_[] = "\nSerious error, can't cope. Here is as much information as we can give...\n\tfile:%s line:%d errno:%d win32 err:%d\n";
-
HANDLE make_thread (void *func, void *data) {
return CreateThread(NULL,0,func, data, 0, NULL);
Modified: bsockets/trunk/misc.h
===================================================================
--- bsockets/trunk/misc.h 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/misc.h 2006-08-08 23:15:44 UTC (rev 6992)
@@ -28,8 +28,9 @@
#define MUTEX_ACQUIRE(X) ASSERT(WaitForSingleObject(X,INFINITE) == WAIT_OBJECT_0)
#define MUTEX_RELEASE(X) ASSERT(ReleaseMutex(X))
+#define LA //printf("list acquire: %s:%d\n",__func__,__LINE__); fflush(stdout);
+#define LR //printf("list release: %s:%d\n",__func__,__LINE__); fflush(stdout);
-
int winsock_start();
HANDLE make_thread(void*,void*);
Modified: bsockets/trunk/select.c
===================================================================
--- bsockets/trunk/select.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/select.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -7,6 +7,7 @@
#include "misc.h"
#include "io.h"
+//atomicity -- must own list to call this
void bsocket_raise(struct bsocket *b, int type, int active, struct socket_env *env ) {
struct _wait_list *wl;
@@ -28,7 +29,6 @@
break;
default:
-
ASSERT(FALSE);
break;
@@ -48,6 +48,8 @@
struct bsocket **l;
+ LA;
+
l = bsocket_list_get(env);
if (l != NULL) {
@@ -56,6 +58,7 @@
bsocket_raise(l[fd],type,active,env);
}
+ LR;
bsocket_list_release(env);
}
@@ -66,6 +69,7 @@
struct bsocket **l;
+ LA
l = bsocket_list_get(env);
if (l != NULL) {
@@ -76,6 +80,7 @@
}
+ LR
bsocket_list_release(env);
}
@@ -86,6 +91,7 @@
struct bsocket **l;
+ LA
l = bsocket_list_get(env);
if (l != NULL) {
@@ -96,24 +102,29 @@
l[fd]->err = 0;
}
-
+ LR
bsocket_list_release(env);
}
+;
+
}
//atomicity is provided by caller -- must own list
void bsocket_exception(struct bsocket *b, int err, struct socket_env *env) {
-// ASSERT(err != 0);
-
if (b->s != INVALID_SOCKET) {
- b->err = err;
+ if (err) {
+ b->err = err;
+ }
+
b->closed = TRUE;
b->eof = TRUE;
+ bsocket_eof(b);
+
bsocket_raise(b,IS_READABLE,TRUE,env);
bsocket_raise(b,IS_WRITABLE,TRUE,env);
bsocket_raise(b,IS_EXCEPTED,TRUE,env);
@@ -131,6 +142,8 @@
struct bsocket **l;
+ LA
+
l = bsocket_list_get(env);
if (l != NULL) {
@@ -139,6 +152,8 @@
bsocket_exception(l[fd],err,env);
}
+ LR
+
bsocket_list_release(env);
}
@@ -298,6 +313,7 @@
CHECK(b != NULL,ENOMEM);
CHECK(t != NULL,ENOMEM);
+ LA
l = bsocket_list_get(env);
CHECK(l != NULL,0);
@@ -318,6 +334,7 @@
}
}
+ LR
bsocket_list_release(env);
l = NULL;
@@ -389,7 +406,6 @@
}
-
fail:
if (b != NULL) {
free(b);
@@ -402,6 +418,3 @@
return out;
}
-
-
-
Modified: bsockets/trunk/socket.c
===================================================================
--- bsockets/trunk/socket.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/socket.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -13,6 +13,7 @@
#include "sync.h"
#include "select.h"
#include "misc.h"
+#include "callback.h"
//support only turning off and on socket blocking, will add more features as needed
int fcntl_win32(int fd, int cmd, long args, struct socket_env *env ) {
@@ -69,8 +70,8 @@
int out;
-
l = NULL;
+ LA
l = bsocket_list_get(env);
CHECK(l[fd] != NULL,EBADF);
@@ -85,13 +86,11 @@
case SO_ERROR:
-
CHECK(*optlen >= sizeof(int),EINVAL);
*((int*) optval) = l[fd]->err;
*optlen = sizeof(int);
-
break;
default:
@@ -109,7 +108,7 @@
}
fail:
-
+ LR
bsocket_list_release(env);
return out;
@@ -125,8 +124,10 @@
out = 0;
- b = bsocket_get(fd,AS_RW,env);
+ b = bsocket_get(fd,AS_WRITE,env);
+
+
CHECK(b != NULL,0);
CHECK(b->connected == FALSE,EISCONN);
CHECK(b->connecting == FALSE,EALREADY);
@@ -164,7 +165,7 @@
fail:
if (b != NULL) {
- bsocket_release(fd,AS_RW,env);
+ bsocket_release(fd,AS_WRITE,env);
}
@@ -172,6 +173,7 @@
}
+
void bsocket_free(struct bsocket *b) {
if (b->read_wl != NULL) {
@@ -189,55 +191,71 @@
wl_free(b->except_wl);
}
-}
+ if (b->write_m != NULL) {
+ CloseHandle(b->write_m);
+ }
+ if (b->read_m != NULL) {
+ CloseHandle(b->read_m);
+ }
-//todo handle SO_LINGER
-int bsocket_close(struct bsocket *b, struct socket_env *env) {
+ if (b->close_e != NULL) {
+ CloseHandle(b->close_e);
+ }
- SetEvent(b->close_e);
+ if (b->exception_e != NULL) {
+ CloseHandle(b->exception_e);
+ }
- bsocket_exception(b,ECLOSED,env);
+ //todo -- remove any messages waiting
- bsocket_list_release(env);
+ list_free(b->in_q);
- //manually aquire all socket mutexes (don't use atomic!)
- MUTEX_ACQUIRE(b->read_m);
- MUTEX_ACQUIRE(b->write_m);
+ free(b);
+
+}
+
+int close_win32(int fd, struct socket_env *env) {
+
+ struct bsocket *b;
+ int out;
+ int r;
+
+ //manually ensure that socket exists
+ CHECK(fd >= 0,EBADF);
+ CHECK(fd < MAX_BSOCKETS,EBADF);
+
MUTEX_ACQUIRE(env->list_m);
+ b = env->b[fd];
+ MUTEX_RELEASE(env->list_m);
- if (!b->closed) {
- ASSERT(post_close(b,env) == 0);
- }
+ CHECK(b != NULL,EBADF);
- //return fd
- list_enqueue((void*) b->fd, env->free_q);
+ out = 0;
- bsocket_free(b);
+ socket_exception(fd,ECLOSED,env);
- env->b[b->fd] = NULL;
+ r = WaitForSingleObject(env->b[fd]->exception_e,INFINITE);
+ ASSERT(r == WAIT_OBJECT_0);
- //todo if there is an outstanding error, that becomes the output of close
+ SetEvent(b->close_e);
- return 0;
+ MUTEX_ACQUIRE(env->list_m);
+ MUTEX_ACQUIRE(env->b[fd]->read_m);
+ MUTEX_ACQUIRE(env->b[fd]->write_m);
-}
-//todo -- fix so that close_win32 does not require any new memory
-int close_win32(int fd, struct socket_env *env) {
+ //todo -- if this fails fd is lost forever (TS, or cope?)
+ list_enqueue((void*) b->fd, env->free_q);
- struct bsocket **l;
+ env->b[fd] = NULL;
- int out;
+ MUTEX_RELEASE(env->list_m);
- l = bsocket_list_get(env);
- ASSERT(l != NULL);
- CHECK(l[fd] != NULL,EBADF);
+ bsocket_free(b);
- out = bsocket_close(l[fd],env);
+ //todo out = last error
- bsocket_list_release(env);
-
fail:
return out;
@@ -264,6 +282,7 @@
b->write_m = NULL;
b->close_e = NULL;
+ b->exception_e = NULL;
b->read_wl = NULL;
b->write_wl = NULL;
@@ -281,6 +300,9 @@
b->close_e = CreateEvent(NULL,TRUE,FALSE,NULL);
CHECK_(b->close_e != NULL);
+ b->exception_e = CreateEvent(NULL,TRUE,FALSE,NULL);
+ CHECK_(b->close_e != NULL);
+
b->err = 0;
b->connected = FALSE;
b->connecting = FALSE;
@@ -317,7 +339,79 @@
ASSERT(FALSE);
}
+int socketpair_win32(int domain, int type, int protocol, int fd[2], struct socket_env *env){
+ //builds two connected bsockets with no underlying SOCKET; their descriptors are returned in fd[]
+ struct bsocket *b[2];
+ struct bsocket **l;
+ int s[2];
+ int out;
+
+ //everything the fail: path reads must be set before the first CHECK can jump there
+ b[0] = NULL;
+ b[1] = NULL;
+ s[0] = -1;
+ s[1] = -1;
+ l = NULL;
+ out = -1;
+
+ b[0] = bsocket_new(INVALID_SOCKET);
+ CHECK(b[0] != NULL,0);
+
+ b[1] = bsocket_new(INVALID_SOCKET);
+ CHECK(b[1] != NULL,0);
+
+ //claim_free_fd and make_connected rely on the caller owning the list
+ l = bsocket_list_get(env);
+ CHECK(l != NULL,0);
+
+ s[0] = claim_free_fd(b[0],env);
+ CHECK(s[0] != -1,0);
+
+ s[1] = claim_free_fd(b[1],env);
+ CHECK(s[1] != -1,0);
+
+ b[0]->fd = s[0];
+ b[1]->fd = s[1];
+
+ make_connected(b[0],b[1],env);
+ make_connected(b[1],b[0],env);
+
+ fd[0] = s[0];
+ fd[1] = s[1];
+ out = 0;
+
+ fail:
+
+ if (out == -1) {
+
+ if (s[0] != -1) {
+ release_fd(s[0],env);
+ }
+
+ if (s[1] != -1) {
+ release_fd(s[1],env);
+ }
+
+ if (b[0] != NULL) {
+ bsocket_free(b[0]);
+ }
+
+ if (b[1] != NULL) {
+ bsocket_free(b[1]);
+ }
+
+ }
+
+ if (l != NULL) {
+ bsocket_list_release(env);
+ }
+
+ return out;
+
+}
+
+
int socket_win32(int af, int type, int protocol, struct socket_env *env) {
SOCKET s;
@@ -377,7 +471,6 @@
ASSERT( sizeof(int) == sizeof(DWORD));
-
if (env == NULL) {
env = (struct socket_env *)
@@ -413,7 +506,6 @@
env->list_m = CreateMutex(NULL,FALSE,NULL);
CHECK_(env->list_m != NULL);
-
env->wait_m = CreateMutex(NULL,FALSE,NULL);
CHECK_(env->wait_m != NULL);
@@ -436,8 +528,6 @@
int socket_cleanup_win32(struct socket_env *env) {
- struct bsocket **l;
-
int i;
int out;
int r;
@@ -446,6 +536,9 @@
out = 0;
was_null = FALSE;
+ //printf("==shutdown\n");
+ //fflush(stdout);
+
//todo -- i can smell a sync issue here
if (env == NULL) {
@@ -454,26 +547,18 @@
}
- ASSERT(SetEvent(env->shutdown_e));
+ for (i=0; i<MAX_BSOCKETS; i++) {
+ close_win32(i,env);
+ }
+ LA
MUTEX_ACQUIRE(env->list_m);
- l = env->b;
+ ASSERT(SetEvent(env->shutdown_e));
- for (i=0; i<MAX_BSOCKETS; i++) {
-
- if (l[i] != NULL) {
-
- r = bsocket_close(l[i],env);
-
- }
-
- }
-
//todo replace with post_shutdown
ASSERT(event_post(NULL,EV_SHUTDOWN,NULL,NULL,NULL,FALSE,env) == 0);
-
r = WaitForSingleObject (env->event_t,1000);
ASSERT(r == WAIT_OBJECT_0);
@@ -486,13 +571,15 @@
ASSERT(env->b[i] == NULL);
}
-
ASSERT(CloseHandle(env->shutdown_e));
//ASSERT(CloseHandle(env->free_m));
ASSERT(CloseHandle(env->list_m));
ASSERT(CloseHandle(env->post_m));
ASSERT(CloseHandle(env->wait_m));
+ WSACloseEvent(env->post_e);
+ CloseHandle(env->event_t);
+
list_free(env->free_q);
free(env);
Modified: bsockets/trunk/socket.h
===================================================================
--- bsockets/trunk/socket.h 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/socket.h 2006-08-08 23:15:44 UTC (rev 6992)
@@ -26,7 +26,6 @@
#define AE_GLOBAL (AE_POST|AE_LIST|AE_FREE|AE_WAIT)
-
struct bsocket {
SOCKET s;
@@ -35,6 +34,7 @@
HANDLE write_m;
HANDLE close_e;
+ HANDLE exception_e;
struct _wait_list *read_wl;
struct _wait_list *write_wl;
@@ -65,7 +65,6 @@
/*this is non-null if the socket is part of a socketpair*/
struct bsocket *partner;
-
};
struct socket_env {
Modified: bsockets/trunk/sync.c
===================================================================
--- bsockets/trunk/sync.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/sync.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -9,44 +9,45 @@
int claim_free_fd(struct bsocket *b, struct socket_env *env) {
- struct bsocket **l;
-
int out;
- //we don't care about the output, this just gives us list
- //ownership
- l = bsocket_list_get(env);
-
- CHECK(l != NULL, 0);
-
if (list_dequeue(&out,env->free_q) == 0) {
- l[out] = b;
+ env->b[out] = b;
} else {
errno = EMFILE;
out = -1;
}
- bsocket_list_release(env);
-
- fail:
return out;
}
+//clears the list slot for fd; like claim_free_fd, it takes no lock itself
+void release_fd(int fd, struct socket_env *env) {
+
+ ASSERT(fd >= 0);
+ ASSERT(fd < MAX_BSOCKETS);
+
+ //write through env->b directly, as claim_free_fd does
+ //note: fd is not put back on env->free_q here
+ env->b[fd] = NULL;
+}
+
+
struct bsocket **bsocket_list_get(struct socket_env *env) {
if (atomic(-1,AE_LIST)) {
return NULL;
}
-
return env->b;
}
void bsocket_list_release(struct socket_env *env) {
+
release(-1,AE_LIST);
}
@@ -65,6 +66,7 @@
}
+//todo -- FOOL! don't own list while waiting on socket mutexes (change!!)
struct bsocket *bsocket_get(int fd, int access, struct socket_env *env) {
struct bsocket **l;
@@ -81,21 +83,21 @@
}
out = NULL;
+ LA
l = bsocket_list_get(env);
if (l != NULL) {
+ //todo -- if l[fd] == NULL?
if (!atomic(fd,access)) {
out = l[fd];
}
+ LR
bsocket_list_release(env);
}
-// printf("bsocket_get out\n");
- //fflush(stdout);
-
return out;
@@ -231,6 +233,7 @@
case WAIT_ABANDONED:
//there is no acceptable reason for wfmo() to fail
case WAIT_FAILED:
+ printf("! %d\n",r);
ASSERT(FALSE);
break;
@@ -239,6 +242,7 @@
//not a critical issue but we want to know if we claimed any mutexes
//(we shouldn't have)
ASSERT(mutex_count_2 == mutex_count);
+
errno = ECLOSED;
mutex_count = 0;
out = -1;
@@ -252,7 +256,6 @@
}
}
-
if (out != -1) {
ASSERT(mutex_count == 1);
Modified: bsockets/trunk/test.c
===================================================================
--- bsockets/trunk/test.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/test.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -371,17 +371,11 @@
errno = 0;
-
-
-
TEST(bclose(fd[0]) == 0);
TEST(errno == 0);
-
-
f = bsocket(AF_INET,SOCK_STREAM,0);
-
TEST(f != -1);
TEST(errno == 0);
@@ -392,9 +386,6 @@
}
-
-
-
f = bsocket(AF_INET,SOCK_STREAM,0);
TEST(f != -1);
@@ -405,16 +396,17 @@
}
-//todo -- think of a way to check if there is a memory leak
+//todo -- there is still a very small memory leak in socket init
int test_socketinit() {
int fd;
- int tests = 1;
+ int tests = 50;
while (tests--) {
TEST(bsocket_init(NULL) == 0 );
+
fd = bsocket(AF_INET,SOCK_STREAM,0);
TEST(fd >= 0);
@@ -424,11 +416,13 @@
TEST(bsocket_shutdown(NULL) == 0 );
+
}
return 0;
}
+
int test_createlist() {
struct _list *list;
@@ -1601,9 +1595,10 @@
int r;
int msg;
-
s = socket(AF_INET,SOCK_STREAM,0);
+ msg = 666;
+
SILENT_TEST(s != INVALID_SOCKET);
r = bind(s,(struct sockaddr*) &localhost,sizeof(localhost));
@@ -1612,17 +1607,18 @@
r = listen(s,10);
SILENT_TEST(r == 0);
- c = accept(s,NULL,0);
- SILENT_TEST(c != INVALID_SOCKET);
+ while (TRUE) {
+ c = accept(s,NULL,0);
+ SILENT_TEST(c != INVALID_SOCKET);
- msg = 666;
- Sleep(100);
+ r = send(c,(char*) &msg, sizeof(msg),0);
+ SILENT_TEST(r == sizeof(msg));
- r = send(c,(char*) &msg, sizeof(msg),0);
- SILENT_TEST(r == sizeof(msg));
+ closesocket(c);
+ }
+
closesocket(s);
- closesocket(c);
return 0;
@@ -1635,29 +1631,89 @@
int fd;
int r;
int msg;
+ int tests;
- TEST(bsocket_init(NULL) == 0);
+ int mega_tests = 10;
- h = new_thread(test_simple_read_helper,NULL);
- TEST(h != NULL);
+ while(mega_tests--) {
- fd = bsocket(AF_INET,SOCK_STREAM,0);
- TEST(fd == 0);
+ TEST(bsocket_init(NULL) == 0);
- r = bconnect(fd,(struct sockaddr*) &localhost,sizeof(localhost));
+ h = new_thread(test_simple_read_helper,NULL);
+ TEST(h != NULL);
+ tests = 50;
+
+ while (tests--) {
+
+ fd = bsocket(AF_INET,SOCK_STREAM,0);
+ TEST(fd >= 0);
+
+ r = bconnect(fd,(struct sockaddr*) &localhost,sizeof(localhost));
+ TEST(r == 0);
+
+ r = brecv(fd,&msg,sizeof(msg),0);
+
+ TEST(r == sizeof(msg));
+ TEST(msg == 666);
+
+ r = brecv(fd,&msg,sizeof(msg),0);
+ TEST(r == 0);
+
+ r = brecv(fd,&msg,sizeof(msg),0);
+ TEST(r == 0);
+
+ TEST(bclose(fd) == 0);
+ }
+
+ TEST(bsocket_shutdown(NULL) == 0);
+ CloseHandle(h);
+ }
+
+ return 0;
+
+}
+
+int test_simple_socketpair() {
+
+
+ int s[2];
+ int r;
+
+ int msg = 666;
+
+ TEST(bsocket_init(NULL) == 0);
+
+ r = bsocketpair(AF_INET,SOCK_STREAM,0,s);
TEST(r == 0);
- r = brecv(fd,&msg,sizeof(msg),0);
+ r = bsend(s[0],&msg,sizeof(msg),0);
+ printf("%d\n",r);
+ fflush(stdout);
TEST(r == sizeof(msg));
+
+ r = brecv(s[1],&msg,sizeof(msg),0);
+ TEST(r == sizeof(msg));
+
TEST(msg == 666);
TEST(bsocket_shutdown(NULL) == 0);
return 0;
+}
+int test_simple_socketpair_nb() {
+ TEST(FALSE);
+
+ return 0;
}
+int test_multiread() {
+
+ return 0;
+
+}
+
int test_verify_npp_usage() {
errno = 666;
return -1;
@@ -1692,6 +1748,7 @@
{test_waitclose,NULL,"Test if a wait object behaves properly on closure."},
{test_maxoverlapped,NULL,"Test if we can have a sufficient number of outstanding operations."},
{test_bselect_macros,NULL,"Testing bselect() macros."},
+ //todo close returns error if error has occured and not been collected
// {test_multieventselect,NULL,"Can we have more than one socket associated with a WSAevent."},
//{test_theory1,NULL,"Test theory #1 (WSAwfme does not block when socket closed."},
//todo -- find a way to reduce timeout
@@ -1704,7 +1761,9 @@
{test_multi_connect,NULL,"Test rapid firing server with connections."},
{test_simple_write,NULL,"Connect and send some data to a server."},
{test_simple_read,NULL,"Connect and read some data from a server."},
- {test_verify_npp_usage,NULL,"Check that the NPP is not being used for large sends."},
+ {test_simple_socketpair,NULL,"Test simple transfer of data between socketpairs.\n"},
+ {test_simple_socketpair_nb,NULL,"\t(non-blocking"},
+ //{test_verify_npp_usage,NULL,"Check that the NPP is not being used for large sends."},
{NULL,NULL,NULL}
};
Modified: bsockets/trunk/wait.c
===================================================================
--- bsockets/trunk/wait.c 2006-08-08 22:56:26 UTC (rev 6991)
+++ bsockets/trunk/wait.c 2006-08-08 23:15:44 UTC (rev 6992)
@@ -304,7 +304,7 @@
void wl_deactivate(struct _wait_list *l) {
ASSERT(l->waiting == NULL);
- ASSERT(l->active);
+ //ASSERT(l->active);
l->active = FALSE;
@@ -329,7 +329,7 @@
if (out != NULL) {
out->waiting = NULL;
- out->active = 0;
+ out->active = FALSE;
} else {