[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] r7041: - cleaned up some i/o code - got rid of some todos (bsockets/trunk)
Author: chiussi
Date: 2006-08-13 10:49:38 -0400 (Sun, 13 Aug 2006)
New Revision: 7041
Modified:
bsockets/trunk/callback.c
bsockets/trunk/event.c
bsockets/trunk/io.c
bsockets/trunk/io.h
bsockets/trunk/select.c
bsockets/trunk/socket.c
bsockets/trunk/socket.h
bsockets/trunk/sync.c
bsockets/trunk/test.c
bsockets/trunk/wait.c
Log:
- cleaned up some i/o code
- got rid of some todos
Modified: bsockets/trunk/callback.c
===================================================================
--- bsockets/trunk/callback.c 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/callback.c 2006-08-13 14:49:38 UTC (rev 7041)
@@ -14,10 +14,61 @@
IN WSAOVERLAPPED *wo,
IN DWORD flags) {
- //todo -- activate write_wait_list
- //todo -- if write is incomplete, reiterate
- //todo -- if error, raise exception
+ struct _msg *msg;
+ int err2;
+ int r;
+ int z;
+
+ msg = (struct _msg*) wo->hEvent;
+
+ if (err == 0) {
+
+ if (msg->buf_wb.len == 0) {
+
+ socket_raise(msg->fd,IS_WRITABLE,TRUE,msg->env);
+ msg_free(msg);
+
+ } else {
+
+ //it is assumed that on an incomplete operation,
+ //the buffer pointer is adjusted upward by the number
+ //of written bytes. tests seem to confirm this, although
+ //i can't find it documented anywhere
+
+ //it is pretty nasty to do the write operation twice, but
+ //its the method involving the least overhead right now
+
+ //todo -- figure out a way to test this, i can't figure out a
+ //way to force WSASend to do an incomplete write
+ r = WSASend(
+ msg->s,
+ &msg->buf_wb,
+ 1,
+ (DWORD*) &z,
+ 0,
+ wo,
+ callback_write
+ );
+
+ if (r == SOCKET_ERROR) {
+
+ if ((err2 = WSAGetLastError()) != WSA_IO_PENDING) {
+
+ socket_exception(msg->fd,unixify_wsaerr(err2),msg->env);
+
+ }
+
+ }
+
+ }
+
+ } else {
+
+ socket_exception(msg->fd,unixify_wsaerr(err),msg->env);
+
+ }
+
}
void CALLBACK callback_read(
@@ -34,6 +85,7 @@
//todo -- do we need to free wo when we're done?
msg = (struct _msg*) wo->hEvent;
+
if (err == 0) {
ASSERT(len >= 0);
Modified: bsockets/trunk/event.c
===================================================================
--- bsockets/trunk/event.c 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/event.c 2006-08-13 14:49:38 UTC (rev 7041)
@@ -77,10 +77,6 @@
}
- if (e->sig != NULL) {
- //CloseHandle(e->sig);
- }
-
free(e);
}
@@ -175,6 +171,7 @@
}
if (out == -1) {
+ //set to ping so it doesn't try to do any type specific free()s
e->type = EV_PING;
event_free(e);
}
@@ -248,12 +245,10 @@
int post_write(struct bsocket *b, int *ret, int *err, struct socket_env *env) {
- ASSERT(b->write_buf != NULL);
+ //todo assert is writable
- b->write_wb.buf = b->write_buf;
- b->write_wb.len = b->write_len;
- return event_post(b,EV_WRITE,&b->write_wb,ret,err,TRUE,env);
+ return event_post(b,EV_WRITE,&b->w_wo,ret,err,TRUE,env);
}
@@ -271,7 +266,7 @@
int post_read(struct bsocket *b, struct socket_env *env) {
- return event_post(b,EV_READ,NULL,NULL,NULL,FALSE,env);
+ return event_post(b,EV_READ,&b->r_wo,NULL,NULL,FALSE,env);
}
@@ -520,7 +515,6 @@
unixify_wsaerr(WSAGetLastError()),
env );
-
}
}
@@ -531,20 +525,22 @@
case EV_READ:
- invoke_read(e->s,e->fd,env);
+ invoke_read(e->s,e->fd,(WSAOVERLAPPED*) e->data,env);
break;
case EV_WRITE:
+ wo = e->data;
+ msg = wo->hEvent;
+
if (e->p_fd == -1) {
- wo = (WSAOVERLAPPED*)
- malloc(sizeof(WSAOVERLAPPED));
+ msg->s = e->s;
r = WSASend(
e->s,
- (WSABUF*) (e->data),
+ &msg->buf_wb,
1,
(DWORD*) &z,
0,
@@ -572,62 +568,29 @@
} else {
- //todo -- i hate this branching nonsense, come up with a
- //local CHECK
-
//it's tempting to use callback_read, but it's a bad
//move. what if partner excepts inbetween calls?
- wb = e->data;
p = bsocket_get(e->p_fd,AS_READ,env);
- i = 0;
if (p != NULL) {
- while (wb->len) {
+ if (list_enqueue(msg,p->in_q) != NULL) {
- ASSERT(wb->len > 0);
+ socket_raise(e->fd,IS_WRITABLE,TRUE,env);
- msg = msg_new();
+ bsocket_release(e->p_fd,AS_READ,env);
- if (msg != NULL) {
+ event_respond(e,msg->len,0);
- msg->len = min(MSG_SIZE,wb->len);
- memcpy(msg->o_data,wb->buf,msg->len);
+ } else {
- if (list_enqueue(msg,p->in_q) != NULL) {
+ event_respond(e,-1,errno);
- i += msg->len;
- wb->len -= msg->len;
- wb->buf += msg->len;
-
- } else {
- wb->len = 0;
- }
-
- } else {
-
- //nothing exception worthy has happened here,
- //we are just low on memory
- wb->len = 0;
- }
-
}
- ASSERT(wb->len == 0);
- if (i > 0) {
- socket_raise(e->p_fd,IS_READABLE,TRUE,env);
- }
- socket_raise(e->fd,IS_WRITABLE,TRUE,env);
-
-
- bsocket_release(e->p_fd,AS_READ,env);
-
- event_respond(e,i,0);
-
-
} else {
//todo -- what is a better errno?
@@ -704,7 +667,6 @@
if (ev[i].type == FD_CONNECT) {
//todo -- figure out a way to test the element is being deleted
-
E_DELETE(i);
//if (connect_count >= MAX_SIMUL_CONNECT) {
@@ -726,7 +688,6 @@
}
-
}
//make sure there are no outstanding events
Modified: bsockets/trunk/io.c
===================================================================
--- bsockets/trunk/io.c 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/io.c 2006-08-13 14:49:38 UTC (rev 7041)
@@ -9,7 +9,7 @@
#include "list.h"
#include "io.h"
-struct _msg *msg_new() {
+struct _msg *msg_new(int fd, int len, void *data, struct socket_env *env) {
struct _msg *out;
@@ -18,33 +18,56 @@
if (out != NULL) {
- out->data = out->o_data;
+ out->o_data = malloc(len);
- out->buf_wb.buf = out->o_data;
- out->buf_wb.len = MSG_SIZE;
+ out->len = len;
+ if (out->o_data != NULL) {
+ if (data != NULL) {
+ memcpy(out->o_data,data,len);
+ }
+
+ out->data = out->o_data;
+
+ out->buf_wb.buf = out->o_data;
+ out->buf_wb.len = len;
+
+
+
+ out->env = env;
+ out->fd = fd;
+
+ } else {
+
+ free(out);
+ out = NULL;
+
+ errno = ENOMEM;
+
+ }
+
} else {
errno = ENOMEM;
}
+
return out;
}
void msg_free(struct _msg *msg) {
+ free(msg->o_data);
free(msg);
}
-void invoke_read(SOCKET s, int fd, struct socket_env *env) {
+void invoke_read(SOCKET s, int fd, WSAOVERLAPPED *wo, struct socket_env *env) {
- WSAOVERLAPPED *wo;
-
struct _msg *msg;
int err;
@@ -53,42 +76,33 @@
int r,z;
int flags;
- int resume;
+ out = 0;
+ flags = 0;
- wo = NULL;
+ z = 0;
- msg = msg_new();
-
+ msg = msg_new(fd,4096,NULL,env);
CHECK(msg != NULL,0);
- //todo we dont want to have to allocate a wo each bloody time
- wo = (WSAOVERLAPPED*)
- malloc(sizeof(WSAOVERLAPPED));
+ wo->hEvent = (WSAEVENT) msg;
- CHECK(wo != NULL,ENOMEM);
+ WSABUF *wb;
+ wb = (WSABUF*) malloc(sizeof (WSABUF));
- msg->fd = fd;
- msg->s = s;
- msg->env = env;
- wo->hEvent = msg;
+ ASSERT(wb != NULL);
- resume = TRUE;
+ wb->buf = malloc(1000);
+ wb->len = 1000;
- out = 0;
-
- flags = 0;
-
- z = 0;
-
r = WSARecv(
- s,
- &msg->buf_wb,
- 1,
- (DWORD*) &z,
- (DWORD*) &flags,
- wo,
- callback_read
+ s,
+ &msg->buf_wb,
+ 1,
+ (DWORD*) &z,
+ (DWORD*) &flags,
+ wo,
+ callback_read
);
if (r == SOCKET_ERROR) {
@@ -102,10 +116,10 @@
if (out == -1) {
- socket_exception(fd,unixify_wsaerr(err), env);
+ socket_exception(msg->fd,unixify_wsaerr(err), env);
- if (wo != NULL) {
- free(wo);
+ if (msg != NULL) {
+ msg_free(msg);
}
}
@@ -177,10 +191,8 @@
ASSERT(msg->len >= 0);
if (msg->len == 0 ) {
- //todo -- msg is being recycled when it shouldn't be
- //msg_free(msg);
+ msg_free(msg);
-
ASSERT(list_dequeue(NULL,b->in_q) == 0);
} else {
@@ -193,8 +205,6 @@
} else {
-
-
space_left = 0;
}
@@ -221,20 +231,20 @@
int send_win32 (int fd, void *buf, size_t len, int flags, struct socket_env *env) {
struct bsocket *b;
+ struct _msg *msg;
int err;
int r;
int z;
int out;
+
b = bsocket_get(fd,AS_WRITE,env);
CHECK(b != NULL,0);
- //todo -- possible sync fault
CHECK(b->connected,ENOTCONN);
- //todo -- is this good enough to guarantee the socket is writable?
if (b->blocking == FALSE) {
- CHECK(b->write_buf == NULL,EAGAIN);
+ //todo we need a new way to check if this socket is writable
}
r = wait_until(b,IS_WRITABLE,env);
@@ -243,11 +253,10 @@
CHECK(r == 0,0);
- b->write_buf = malloc(len);
- CHECK(b != NULL,ENOMEM);
- memcpy(b->write_buf,buf,len);
+ msg = msg_new(fd,len,buf,env);
+ CHECK(msg != NULL,0);
- b->write_len = len;
+ b->w_wo.hEvent = (WSAEVENT) msg;
r = post_write(b,&z,&err,env);
@@ -260,10 +269,6 @@
fail:
- if ( (out == -1) && (b != NULL) && (b->write_buf != NULL )) {
- free(b->write_buf);
- }
-
if (b != NULL) {
bsocket_release(fd,AS_WRITE,env);
}
Modified: bsockets/trunk/io.h
===================================================================
--- bsockets/trunk/io.h 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/io.h 2006-08-13 14:49:38 UTC (rev 7041)
@@ -12,18 +12,17 @@
struct socket_env *env;
- char o_data[MSG_SIZE];
-
int len;
int fd;
void *data;
+ void *o_data;
};
-void invoke_read(SOCKET s, int fd, struct socket_env *);
+void invoke_read(SOCKET, int, WSAOVERLAPPED *, struct socket_env *);
-struct _msg *msg_new();
+struct _msg *msg_new(int, int, void*, struct socket_env*);
void msg_free(struct _msg *);
#endif
Modified: bsockets/trunk/select.c
===================================================================
--- bsockets/trunk/select.c 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/select.c 2006-08-13 14:49:38 UTC (rev 7041)
@@ -134,7 +134,6 @@
b->s = INVALID_SOCKET;
-
if (b->partner != -1) {
bsocket_exception(env->b[b->partner],err,env);
}
Modified: bsockets/trunk/socket.c
===================================================================
--- bsockets/trunk/socket.c 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/socket.c 2006-08-13 14:49:38 UTC (rev 7041)
@@ -340,6 +340,7 @@
}
//todo -- make sure type is SOCK_STREAM
+//todo -- review
int socketpair_win32(int domain, int type, int protocol, int fd[2], struct socket_env *env){
@@ -557,7 +558,6 @@
ASSERT(event_post(NULL,EV_SHUTDOWN,NULL,NULL,NULL,FALSE,env) == 0);
r = WaitForSingleObject (env->event_t,1000);
-
ASSERT(r == WAIT_OBJECT_0);
//make sure we have cleaned up properly
Modified: bsockets/trunk/socket.h
===================================================================
--- bsockets/trunk/socket.h 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/socket.h 2006-08-13 14:49:38 UTC (rev 7041)
@@ -54,14 +54,14 @@
int fd;
/*writing business*/
+ WSAOVERLAPPED w_wo;
- WSABUF write_wb;
-
void *write_buf;
void *write_pointer;
int write_len;
/*reading business*/
+ WSAOVERLAPPED r_wo;
struct _list *in_q;
//-1 if not in a socketpair, >0 otherwise*/
Modified: bsockets/trunk/sync.c
===================================================================
--- bsockets/trunk/sync.c 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/sync.c 2006-08-13 14:49:38 UTC (rev 7041)
@@ -6,6 +6,7 @@
#include "sync.h"
#include "list.h"
#include "misc.h"
+#include "io.h"
int claim_free_fd(struct bsocket *b, struct socket_env *env) {
@@ -14,6 +15,7 @@
if (list_dequeue(&out,env->free_q) == 0) {
env->b[out] = b;
+ env->b[out]->fd = out;
} else {
errno = EMFILE;
Modified: bsockets/trunk/test.c
===================================================================
--- bsockets/trunk/test.c 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/test.c 2006-08-13 14:49:38 UTC (rev 7041)
@@ -52,7 +52,6 @@
int out;
- //todo might cause overflow if RAND_MAX is too big on your system
out = ((rand()*max)-1)/(RAND_MAX);
ASSERT(out < max);
@@ -1170,12 +1169,9 @@
IN DWORD dwError, IN DWORD cbTransferred,
IN LPWSAOVERLAPPED lpOverlapped,
IN DWORD dwFlags
-) {
+) {}
-}
-
-
int test_maxoverlapped() {
const int msg_len = 4096;
@@ -1246,7 +1242,6 @@
return 0;
}
-//todo -- come up with more comprehensive test
int test_bselect_macros() {
bfd_set fd;
@@ -1355,9 +1350,6 @@
TEST(!BFD_ISSET(1000,&fd));
TEST(!BFD_ISSET(2000,&fd));
-// TEST(fd.hash[0] == -1);
-// TEST(fd.hash[1000] == -1);
-// TEST(fd.hash[2000] == -1);
return 0;
}
@@ -1567,13 +1559,11 @@
TEST(fd == 0);
r = bconnect(fd, (struct sockaddr *) &localhost,sizeof(localhost));
-
TEST(r == 0);
msg = 666;
r = bsend(fd,&msg,sizeof(msg),0);
-
TEST(r == sizeof(msg));
TEST(WaitForSingleObject(h,1000) == WAIT_OBJECT_0);
@@ -1635,6 +1625,8 @@
int mega_tests = 10;
+
+
while(mega_tests--) {
TEST(bsocket_init(NULL) == 0);
@@ -1759,23 +1751,22 @@
}
-int test_multiread() {
-
- return 0;
-
-}
-
int test_verify_npp_usage() {
errno = 666;
return -1;
}
-
int test_complex_select() {
return -1;
}
+int test_accept() {
+ TEST(FALSE);
+
+ return 0;
+}
+
struct test_case tc[] =
{
@@ -1814,6 +1805,7 @@
{test_simple_socketpair,NULL,"Test simple transfer of data between socketpairs."},
{test_socketpair_exception,NULL,"Test is socketpair exception behaves properly."},
//{test_verify_npp_usage,NULL,"Check that the NPP is not being used for large sends."},
+ {test_accept,NULL,"Test is baccept() behaves properly."},
{NULL,NULL,NULL}
};
@@ -1830,7 +1822,6 @@
srand(time(NULL));
- //todo uncomment
winsock_start();
//lookup googles address before starting (we want something that should be available)
Modified: bsockets/trunk/wait.c
===================================================================
--- bsockets/trunk/wait.c 2006-08-13 00:32:22 UTC (rev 7040)
+++ bsockets/trunk/wait.c 2006-08-13 14:49:38 UTC (rev 7041)
@@ -5,12 +5,9 @@
#include "wait.h"
#include "misc.h"
-
//todo, come up with a way to recycle monitor objects so we dont have
//keep allocating them for each wait operation
-//todo -- resolve external race conditions with AE_LIST
-
//todo -- when activated with error code, each wait should return with that error code
int monitor_link(struct _wait_list *l, struct _monitor *m) {
@@ -21,7 +18,6 @@
ASSERT(l != NULL);
ASSERT(m != NULL);
-
if (l->active) {
out = 1;