[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

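[or-cvs] r10828: IOCPloader works! The only remaining issue is: Is the fact that WSASend completes immediately an artefact of testing on a single machine, or is it an issue with my code? (in libevent-urz/trunk: loaders sample)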



Author: Urz
Date: 2007-07-14 22:35:25 -0400 (Sat, 14 Jul 2007)
New Revision: 10828

Modified:
   libevent-urz/trunk/loaders/IOCPloader.c
   libevent-urz/trunk/loaders/IOCPloader.h
   libevent-urz/trunk/sample/IOCPloader-test.c
Log:
IOCPloader works!
The only remaining issue is: Is the fact that WSASend completes immediately an artefact of testing on a single machine, or is it an issue with my code?

Modified: libevent-urz/trunk/loaders/IOCPloader.c
===================================================================
--- libevent-urz/trunk/loaders/IOCPloader.c	2007-07-14 13:52:45 UTC (rev 10827)
+++ libevent-urz/trunk/loaders/IOCPloader.c	2007-07-15 02:35:25 UTC (rev 10828)
@@ -6,6 +6,8 @@
 #include <stdio.h>
 
 void ReadComplete(connection *Conn, DWORD size);
+void ResetRead(connection *Conn);
+void SendComplete(connection *Conn, DWORD size);
 
 #define NO_WORKERS 2
 
@@ -23,16 +25,23 @@
  * loaders/IOCPloader.c: In function `iocp_worker_thread':
  * loaders/IOCPloader.c:209: warning: dereferencing type-punned pointer will break strict-aliasing rules
  * loaders/IOCPloader.c:213: warning: dereferencing type-punned pointer will break strict-aliasing rules
- *
- * Still need to talk to someone with knowledge about WSASend in re the size of the data to be sent.
  */
  
+void printhex(char *data, size_t len) { // Print len bytes of data to stdout as hex values inside [ ].
+    size_t i;
+    printf("[");
+    for(i = 0; i < (len-1); i++) { // NOTE(review): len == 0 makes len-1 wrap around, since size_t is unsigned.
+        printf("%x, ", (int) data[i]); // NOTE(review): if char is signed, bytes >= 0x80 print sign-extended.
+    }
+    printf("%x ]\n", (int) data[len-1]); // The last byte is printed without a trailing comma.
+}
+ 
 int IOCPloader_bind(SOCKET *s, struct sa_bufferevent * bufevent) {
     DWORD myListElem;
     DWORD listSearch;
     char found = 0;
     
-    printf("Bind Start\n");
+    //printf("Bind Start\n");
     
     ev_lock(listLock);
     
@@ -93,11 +102,12 @@
     
     // Despite the name, this call associates the socket with the I/O Completion port
     IOCP = CreateIoCompletionPort((HANDLE) *(connList[myListElem].sock), IOCP, (ULONG_PTR) myListElem, 0);
+    ResetRead(&connList[myListElem]);
     
     ev_unlock(connList[myListElem].lock);
     ev_unlock(listLock);
     
-    printf("Bind End\n");
+    //printf("Bind End\n");
     
     return (int) myListElem;
 }
@@ -146,6 +156,8 @@
     DWORD WSASendFlags = 0;
     DWORD localListSize;
     DWORD sizeRet = -1;
+    int sendRet;
+    int error;
     
     while(1) {
         ev_lock(listLock);
@@ -154,7 +166,7 @@
         ev_unlock(listLock);
         Sleep(1000);
         
-        printf("S");
+        //printf("S");
         
         for(listpos = 0; listpos < localListSize; listpos++) {
             
@@ -169,13 +181,17 @@
                     ev_unlock(connList[listpos].lock);
                     continue;
                 }
-                // TODO: How does WSASend know the amount of available data? Does it
-                // assume the WSABUF is full?
+                connList[listpos].sendbuf->len = unloaded;
+                // WSASend assumes the WSABUF is full.
+                // Thus, without reallocing the buffer, we just change the 'size' of the
+                // buffer. We know the size of the buffer is really SUGGESTED_BUF_SIZE tho.
                 // remind us that sending is in progress and we can't overwrite the buffer
                 connList[listpos].canSend = 0;
                 //printf("Sock : %d Overlapped %d (pos %d)\n", (int) connList[listpos].sock, (int) &(connList[listpos].sendol), (int) listpos);
                 //fflush(stdout);
-                WSASend(
+                //printf("Presend buf: ");
+                //printhex(connList[listpos].sendbuf->buf, unloaded);
+                sendRet = WSASend(
                     *(connList[listpos].sock),
                     // Socket to send on
                     connList[listpos].sendbuf,
@@ -191,9 +207,20 @@
                     NULL
                     // no completion routine
                 );
-                if(sizeRet != -1) {
-                    printf("WTF?: WSASend changed sizeRet\n");
+                if(sendRet == 0) {
+                    //printf("WSASend completed immediately\n");
+                    SendComplete(&connList[listpos], sizeRet);
+                } else if(sendRet == SOCKET_ERROR) {
+                    error = WSAGetLastError();
+                    if(error != WSA_IO_PENDING) {
+                        printf("WSASend error %d\n", error);
+                    } else {
+                        //printf("WSASend delayed completion\n");
+                    }
                 }
+                // if(sizeRet != -1) {
+                    // printf("WTF?: WSASend changed sizeRet\n");
+                // }
             }
             ev_unlock(connList[listpos].lock);
         }
@@ -208,8 +235,6 @@
     DWORD CompletionKey;
     BOOL GQCSRet;
     connection *Conn;
-    UINT WSARecvRet;
-    ULONG WSARecvFlags = 0;
 
     while(1) {
         Conn = NULL;
@@ -234,7 +259,7 @@
             continue;
         }
         
-        printf("C");
+        //printf("C");
         
         // This is no error, a completed read event has occured.
         // Get the connection ths applies to.
@@ -243,7 +268,7 @@
         ev_lock(Conn->lock);
         if(CompletedOverlapped->op == OP_SEND) {
             // the connection has sent all data and is ready for more
-            Conn->canSend = 1;
+            SendComplete(Conn, CompleteSize);
             printf("s");
         } else if(CompletedOverlapped->op == OP_RECV) {
             // Now we have a completed read event, perform the operations
@@ -252,25 +277,7 @@
             
             // Now the data has been removed from the WSABUF, we can reset the
             // read to continue reading.
-            // http://msdn2.microsoft.com/en-us/library/ms741688.aspx
-            WSARecvRet = WSARecv(
-                *(Conn->sock),
-                // The socket to recieve from
-                Conn->recvbuf,
-                // Pointer to an 'array' of WSABUFs.
-                1,
-                // The 'array' is of size 1
-                NULL, 
-                // This parameter would recieve the size in bytes of the
-                // read. However, because we are using overlapped, it doesn't.
-                &WSARecvFlags,
-                // Flags which control the operation of WSARecv. I belive MSG_WAITALL
-                // is the one we want.
-                (OVERLAPPED *) &(Conn->recvol), 
-                // The overlapped structure for the event.
-    			NULL
-                // The callback - we are using events for this, so that's not important
-                );
+            ResetRead(Conn);
             printf("r");
         }
         ev_unlock(Conn->lock);
@@ -278,10 +285,38 @@
     return 0;
 }
 
+void ResetRead(connection *Conn) {
+    // Post a new overlapped WSARecv on Conn's socket, receiving into Conn->recvbuf.
+    // http://msdn2.microsoft.com/en-us/library/ms741688.aspx
+    UINT WSARecvRet; // NOTE(review): assigned below but never checked in this function.
+    ULONG WSARecvFlags = 0;
+    WSARecvRet = WSARecv(
+        *(Conn->sock),
+        // The socket to receive from
+        Conn->recvbuf,
+        // Pointer to an 'array' of WSABUFs.
+        1,
+        // The 'array' is of size 1
+        NULL, 
+        // This parameter would receive the size in bytes of the
+        // read. However, because we are using overlapped, it doesn't.
+        &WSARecvFlags,
+        // Flags which control the operation of WSARecv. I believe MSG_WAITALL
+        // is the one we want. NOTE(review): the value passed here is 0.
+        (OVERLAPPED *) &(Conn->recvol), 
+        // The overlapped structure for the event.
+        NULL
+        // The callback - we are using events for this, so that's not important
+        );
+}
+
 void ReadComplete(connection *Conn, DWORD size) {
     size_t toload, loaded;
     char *upto;
         
+    //printf("Postrecv buf: ");
+    //printhex(Conn->recvbuf->buf, size);
+    
     upto = Conn->recvbuf->buf;
     toload = (size_t) size;
     
@@ -291,3 +326,7 @@
         upto += loaded;
     }
 }
+
+void SendComplete(connection *Conn, DWORD size) { // Called once a send on Conn has completed; size is not used.
+    Conn->canSend = 1; // Mark the connection as ready to send again.
+}

Modified: libevent-urz/trunk/loaders/IOCPloader.h
===================================================================
--- libevent-urz/trunk/loaders/IOCPloader.h	2007-07-14 13:52:45 UTC (rev 10827)
+++ libevent-urz/trunk/loaders/IOCPloader.h	2007-07-15 02:35:25 UTC (rev 10828)
@@ -9,6 +9,7 @@
 DWORD WINAPI iocp_worker_thread(LPVOID);
 DWORD WINAPI iocp_writer_thread(LPVOID);
 int IOCPloader_bind(SOCKET *, struct sa_bufferevent *);
+void printhex(char *data, size_t len);
 
 #define SUGGESTED_BUF_SIZE 4096
 

Modified: libevent-urz/trunk/sample/IOCPloader-test.c
===================================================================
--- libevent-urz/trunk/sample/IOCPloader-test.c	2007-07-14 13:52:45 UTC (rev 10827)
+++ libevent-urz/trunk/sample/IOCPloader-test.c	2007-07-15 02:35:25 UTC (rev 10828)
@@ -43,17 +43,25 @@
 #define BUF_SIZE 1000
 #define LISTEN_PORT 1025
 
+void doexit(int val) { // Read one byte from stdin, then exit with status val.
+    char data;
+    fread(&data, 1, 1, stdin); // Result is ignored. Presumably this keeps the console open until a key is pressed - TODO confirm.
+    exit(val);
+}
+
 void gen_pattern_a(char *buf, size_t len) {
     size_t upto;
+    static char cycle = 0;
     for(upto = 0; upto < len; upto++) {
-        buf[upto] = 'a';
+        buf[upto] = cycle++;
     }
 }
 
 int check_pattern_a(char *buf, size_t len) {
     size_t upto;
+    static char cycle = 0;
     for(upto = 0; upto < len; upto++) {
-        if(buf[upto] != 'a') {
+        if(buf[upto] != cycle++) {
             return 0;
         }
     }
@@ -62,15 +70,17 @@
 
 void gen_pattern_b(char *buf, size_t len) {
     size_t upto;
+    static char cycle = 255;
     for(upto = 0; upto < len; upto++) {
-        buf[upto] = 'b';
+        buf[upto] = cycle--;
     }
 }
 
 int check_pattern_b(char *buf, size_t len) {
     size_t upto;
+    static char cycle = 255;
     for(upto = 0; upto < len; upto++) {
-        if(buf[upto] != 'b') {
+        if(buf[upto] != cycle--) {
             return 0;
         }
     }
@@ -87,8 +97,9 @@
     
         if(!check_pattern_b(buf, len_read)) {
             buf[BUF_SIZE] = '\0';
-            printf("Recieved buffer failed pattern check b: recieved %s\n", buf);
-            exit(0);
+            printf("Recieved buffer failed pattern check b: recieved ");
+            printhex(buf, BUF_SIZE);
+            doexit(0);
         }
     } while (len_read != 0);
     
@@ -101,6 +112,8 @@
     size_t len_read;
     
     gen_pattern_a(buf, BUF_SIZE);
+    //printf("Sending buffer: ");
+    //printhex(buf, BUF_SIZE);
     sa_bufferevent_write(sabe, buf, BUF_SIZE);
     
     printf("W");
@@ -122,7 +135,7 @@
     WSAStartup(MAKEWORD( 2, 2 ), &wsaData);
     event_init();
     
-    lsabe = sa_bufferevent_new(listener_on_read, NULL, NULL, NULL);
+    lsabe = sa_bufferevent_new(listener_on_read, listener_on_write, NULL, NULL);
     // Obviously one or more of these has to be changed to be not-null
     
     Listen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
@@ -139,12 +152,12 @@
 
     if(bind(Listen, (struct sockaddr*) &listenAddr, sizeof(listenAddr)) == SOCKET_ERROR) {
         printf("Oh noes! Bind failed\n");
-        exit(0);
+        doexit(0);
     }
 
     if(listen(Listen, 1) == SOCKET_ERROR) {
         printf("Oh noes! Listen failed\n");
-        exit(0);
+        doexit(0);
     }
     
     printf("Listening...\n");
@@ -163,6 +176,8 @@
     
     gen_pattern_a(buf, BUF_SIZE);
     printf("Generating Initial Pattern\n");
+    //printf("Sending buffer: ");
+    //printhex(buf, BUF_SIZE);
     
     sa_bufferevent_write(lsabe, buf, BUF_SIZE);
     printf("Writing to buffer\n");
@@ -181,12 +196,15 @@
     
         if(!check_pattern_a(buf, len_read)) {
             buf[BUF_SIZE] = '\0';
-            printf("Recieved buffer failed pattern check a: recieved %s\n", buf);
-            exit(0);
+            printf("Recieved buffer failed pattern check a: recieved ");
+            printhex(buf, BUF_SIZE);
+            doexit(0);
         }
     } while (len_read != 0);
     
     gen_pattern_b(buf, BUF_SIZE);
+    //printf("Sending buffer: ");
+    //printhex(buf, BUF_SIZE);
     sa_bufferevent_write(sabe, buf, BUF_SIZE);
     
     printf(".");
@@ -221,14 +239,14 @@
     Connect = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED);
     if(Connect == INVALID_SOCKET) {
         printf("Oh noes! WSASocket failed\n");
-        exit(0);
+        doexit(0);
     }
 
     printf("Connecting...\n");
     conret = connect(Connect, (struct sockaddr*)&socketAddr, sizeof(socketAddr));
     if(conret == SOCKET_ERROR) {
         printf("Oh noes! Connect failed\n");
-        exit(0);
+        doexit(0);
     } else {
         printf("conret: %d\n", conret);
     }