[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[or-cvs] Implemented congestion control



Update of /home/or/cvsroot/src/or
In directory moria.seul.org:/home/arma/work/onion/cvs/src/or

Modified Files:
	buffers.c circuit.c command.c connection.c connection_ap.c 
	connection_exit.c connection_op.c connection_or.c main.c or.h 
Log Message:
Implemented congestion control

Servers are allowed to send 100 cells initially, and can't send more until
they receive a 'sendme' cell from that direction, indicating that they
can send 10 more cells. As it currently stands, the exit node quickly
runs out of window, and sends a burst of 10 cells whenever a sendme cell
reaches it. This is much, much faster (and more flexible) than the old
"give each circuit 1 kB/s and hope nothing overflows" approach.

Also split most uses of connection_watch_events into separate stop_reading,
start_writing, etc. calls. That way we can control reading and writing separately.



Index: buffers.c
===================================================================
RCS file: /home/or/cvsroot/src/or/buffers.c,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -d -r1.5 -r1.6
--- buffers.c	16 Jul 2002 02:12:58 -0000	1.5
+++ buffers.c	18 Jul 2002 06:37:58 -0000	1.6
@@ -26,26 +26,26 @@
 
 int read_to_buf(int s, int at_most, char **buf, size_t *buflen, size_t *buf_datalen, int *reached_eof) {
 
-  /* read from socket s, writing onto buf+buf_datalen. Read at most
-   * 'at_most' bytes, and also don't read more than will fit based on buflen.
+  /* read from socket s, writing onto buf+buf_datalen. If at_most is >= 0 then
+   * read at most 'at_most' bytes, and in any case don't read more than will fit based on buflen.
    * If read() returns 0, set *reached_eof to 1 and return 0. If you want to tear
    * down the connection return -1, else return the number of bytes read.
    */
 
   int read_result;
 
-  assert(buf && *buf && buflen && buf_datalen && reached_eof && (s>=0) && (at_most >= 0));
+  assert(buf && *buf && buflen && buf_datalen && reached_eof && (s>=0));
 
   /* this is the point where you would grow the buffer, if you want to */
 
-  if(*buflen - *buf_datalen < at_most)
+  if(at_most < 0 || *buflen - *buf_datalen < at_most)
     at_most = *buflen - *buf_datalen; /* take the min of the two */
     /* (note that this only modifies at_most inside this function) */
 
   if(at_most == 0)
     return 0; /* we shouldn't read anything */
 
-  log(LOG_DEBUG,"read_to_buf(): reading at most %d bytes.",at_most);
+//  log(LOG_DEBUG,"read_to_buf(): reading at most %d bytes.",at_most);
   read_result = read(s, *buf+*buf_datalen, at_most);
   if (read_result < 0) {
     if(errno!=EAGAIN) { /* it's a real error */
@@ -58,7 +58,7 @@
     return 0;
   } else { /* we read some bytes */
     *buf_datalen += read_result;
-    log(LOG_DEBUG,"read_to_buf(): Read %d bytes. %d on inbuf.",read_result, *buf_datalen);
+//    log(LOG_DEBUG,"read_to_buf(): Read %d bytes. %d on inbuf.",read_result, *buf_datalen);
     return read_result;
   }
 
@@ -90,8 +90,8 @@
     *buf_datalen -= write_result;
     *buf_flushlen -= write_result;
     memmove(*buf, *buf+write_result, *buf_datalen);
-    log(LOG_DEBUG,"flush_buf(): flushed %d bytes, %d ready to flush, %d remain.",
-        write_result,*buf_flushlen,*buf_datalen);
+//    log(LOG_DEBUG,"flush_buf(): flushed %d bytes, %d ready to flush, %d remain.",
+//       write_result,*buf_flushlen,*buf_datalen);
     return *buf_flushlen;
   }
 }
@@ -114,7 +114,7 @@
 
   memcpy(*buf+*buf_datalen, string, string_len);
   *buf_datalen += string_len;
-  log(LOG_DEBUG,"write_to_buf(): added %d bytes to buf (now %d total).",string_len, *buf_datalen);
+//  log(LOG_DEBUG,"write_to_buf(): added %d bytes to buf (now %d total).",string_len, *buf_datalen);
   return *buf_datalen;
 
 }

Index: circuit.c
===================================================================
RCS file: /home/or/cvsroot/src/or/circuit.c,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -d -r1.6 -r1.7
--- circuit.c	16 Jul 2002 01:12:15 -0000	1.6
+++ circuit.c	18 Jul 2002 06:37:58 -0000	1.7
@@ -57,6 +57,9 @@
   circ->p_aci = p_aci;
   /* circ->n_aci remains 0 because we haven't identified the next hop yet */
 
+  circ->n_receive_window = RECEIVE_WINDOW_START;
+  circ->p_receive_window = RECEIVE_WINDOW_START;
+
   circuit_add(circ);
 
   return circ;

Index: command.c
===================================================================
RCS file: /home/or/cvsroot/src/or/command.c,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -d -r1.6 -r1.7
--- command.c	16 Jul 2002 01:12:15 -0000	1.6
+++ command.c	18 Jul 2002 06:37:58 -0000	1.7
@@ -22,6 +22,12 @@
     case CELL_DESTROY:
       command_process_destroy_cell(cell, conn);
       break;
+    case CELL_SENDME:
+      command_process_sendme_cell(cell, conn);
+      break;
+    default:
+      log(LOG_DEBUG,"Cell of unknown type (%d) received. Dropping.", cell->command);
+      break;
   }
 }
 
@@ -147,6 +153,8 @@
       return;
     }
     n_conn->state = EXIT_CONN_STATE_CONNECTING_WAIT;
+    n_conn->receiver_bucket = -1; /* edge connections don't do receiver buckets */
+    n_conn->bandwidth = -1;
     n_conn->s = -1; /* not yet valid */
     if(connection_add(n_conn) < 0) { /* no space, forget it */
       log(LOG_DEBUG,"command_process_create_cell(): connection_add failed. Closing.");
@@ -159,15 +167,65 @@
   }
 }
 
-void command_process_data_cell(cell_t *cell, connection_t *conn) {
+void command_process_sendme_cell(cell_t *cell, connection_t *conn) {
   circuit_t *circ;
 
-  /* FIXME do something with 'close' state, here */
+  circ = circuit_get_by_aci_conn(cell->aci, conn);
+
+  if(!circ) {
+    log(LOG_DEBUG,"command_process_sendme_cell(): unknown circuit %d. Dropping.", cell->aci);
+    return;
+  }
+
+  if(circ->state == CIRCUIT_STATE_OPEN_WAIT) {
+    log(LOG_DEBUG,"command_process_sendme_cell(): circuit in open_wait. Dropping.");
+    return;
+  }
+  if(circ->state == CIRCUIT_STATE_OR_WAIT) {
+    log(LOG_DEBUG,"command_process_sendme_cell(): circuit in or_wait. Dropping.");
+    return;
+  }
+
+  /* at this point both circ->n_conn and circ->p_conn are guaranteed to be set */
+
+  assert(cell->length == RECEIVE_WINDOW_INCREMENT);
+
+  if(cell->aci == circ->p_aci) { /* it's an outgoing cell */
+    circ->n_receive_window += cell->length;
+    log(LOG_DEBUG,"connection_process_sendme_cell(): n_receive_window for aci %d is %d.",circ->n_aci,circ->n_receive_window);
+    if(circ->n_conn->type == CONN_TYPE_EXIT) {
+      connection_start_reading(circ->n_conn);
+      connection_package_raw_inbuf(circ->n_conn); /* handle whatever might still be on the inbuf */
+    } else {
+      cell->aci = circ->n_aci; /* switch it */
+      if(connection_write_cell_to_buf(cell, circ->n_conn) < 0) { /* (clobbers cell) */
+        circuit_close(circ);
+        return;
+      }
+    }
+  } else { /* it's an ingoing cell */
+    circ->p_receive_window += cell->length;
+    log(LOG_DEBUG,"connection_process_sendme_cell(): p_receive_window for aci %d is %d.",circ->p_aci,circ->p_receive_window);
+    if(circ->p_conn->type == CONN_TYPE_AP) {
+      connection_start_reading(circ->p_conn);
+      connection_package_raw_inbuf(circ->p_conn); /* handle whatever might still be on the inbuf */
+    } else {
+      cell->aci = circ->p_aci; /* switch it */
+      if(connection_write_cell_to_buf(cell, circ->p_conn) < 0) { /* (clobbers cell) */
+        circuit_close(circ);
+        return;
+      }
+    }
+  } 
+}
+
+void command_process_data_cell(cell_t *cell, connection_t *conn) {
+  circuit_t *circ;
 
   circ = circuit_get_by_aci_conn(cell->aci, conn);
 
   if(!circ) {
-    log(LOG_DEBUG,"command_process_data_cell(): received DATA cell for unknown circuit. Dropping.");
+    log(LOG_DEBUG,"command_process_data_cell(): unknown circuit %d. Dropping.", cell->aci);
     return;
   }
 
@@ -184,6 +242,12 @@
 
   if(cell->aci == circ->p_aci) { /* it's an outgoing cell */
     cell->aci = circ->n_aci; /* switch it */
+    if(--circ->p_receive_window < 0) { /* is it less than 0 after decrement? */
+      log(LOG_DEBUG,"connection_process_data_cell(): Too many data cells on aci %d. Closing.", circ->p_aci);
+      circuit_close(circ);
+      return;
+    }
+    log(LOG_DEBUG,"connection_process_data_cell(): p_receive_window for aci %d is %d.",circ->p_aci,circ->p_receive_window);
     if(circuit_deliver_data_cell(cell, circ, circ->n_conn, 'd') < 0) {
       log(LOG_DEBUG,"command_process_data_cell(): circuit_deliver_data_cell (forward) failed. Closing.");
       circuit_close(circ);
@@ -191,6 +255,12 @@
     }
   } else { /* it's an ingoing cell */
     cell->aci = circ->p_aci; /* switch it */
+    if(--circ->n_receive_window < 0) { /* is it less than 0 after decrement? */
+      log(LOG_DEBUG,"connection_process_data_cell(): Too many data cells on aci %d. Closing.", circ->n_aci);
+      circuit_close(circ);
+      return;
+    }
+    log(LOG_DEBUG,"connection_process_data_cell(): n_receive_window for aci %d is %d.",circ->n_aci,circ->n_receive_window);
     if(circ->p_conn->type == CONN_TYPE_AP) { /* we want to decrypt, not encrypt */
       if(circuit_deliver_data_cell(cell, circ, circ->p_conn, 'd') < 0) {
         log(LOG_DEBUG,"command_process_data_cell(): circuit_deliver_data_cell (backward to AP) failed. Closing.");
@@ -213,7 +283,7 @@
   circ = circuit_get_by_aci_conn(cell->aci, conn);
 
   if(!circ) {
-    log(LOG_DEBUG,"command_process_destroy_cell(): received DESTROY cell for unknown circuit. Dropping.");
+    log(LOG_DEBUG,"command_process_destroy_cell(): unknown circuit %d. Dropping.", cell->aci);
     return;
   }
 

Index: connection.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection.c,v
retrieving revision 1.8
retrieving revision 1.9
diff -u -d -r1.8 -r1.9
--- connection.c	16 Jul 2002 18:24:12 -0000	1.8
+++ connection.c	18 Jul 2002 06:37:58 -0000	1.9
@@ -101,9 +101,7 @@
   if(conn->dest_port)
     free(conn->dest_port);
 
- /* FIXME should we do these for all connections, or just ORs, or what */
-  if(conn->type == CONN_TYPE_OR ||
-     conn->type == CONN_TYPE_OP) {
+  if(connection_speaks_cells(conn)) {
     EVP_CIPHER_CTX_cleanup(&conn->f_ctx);
     EVP_CIPHER_CTX_cleanup(&conn->b_ctx);
   }
@@ -158,7 +156,7 @@
   log(LOG_DEBUG,"connection_create_listener(): Listening on local port %u.",ntohs(local->sin_port));
 
   conn->state = LISTENER_STATE_READY;
-  connection_watch_events(conn, POLLIN);
+  connection_start_reading(conn);
 
   return 0;
 }
@@ -185,6 +183,11 @@
   newconn = connection_new(new_type);
   newconn->s = news;
 
+  if(!connection_speaks_cells(newconn)) {
+    newconn->receiver_bucket = -1;
+    newconn->bandwidth = -1;
+  }
+
   /* learn things from parent, so we can perform auth */
   memcpy(&newconn->local,&conn->local,sizeof(struct sockaddr_in));
   newconn->prkey = conn->prkey;
@@ -197,7 +200,7 @@
 
   log(LOG_DEBUG,"connection_handle_listener_read(): socket %d entered state %d.",newconn->s, new_state);
   newconn->state = new_state;
-  connection_watch_events(newconn, POLLIN);
+  connection_start_reading(newconn);
 
   return 0;
 }
@@ -284,13 +287,20 @@
 int connection_read_to_buf(connection_t *conn) {
   int read_result;
 
+  if(connection_speaks_cells(conn)) {
+    assert(conn->receiver_bucket >= 0);
+  }
+  if(!connection_speaks_cells(conn)) {
+    assert(conn->receiver_bucket < 0);
+  }
   read_result = read_to_buf(conn->s, conn->receiver_bucket, &conn->inbuf, &conn->inbuflen,
                             &conn->inbuf_datalen, &conn->inbuf_reached_eof);
-  log(LOG_DEBUG,"connection_read_to_buf(): read_to_buf returned %d.",read_result);
-  if(read_result >= 0) {
+//  log(LOG_DEBUG,"connection_read_to_buf(): read_to_buf returned %d.",read_result);
+  if(read_result >= 0 && connection_speaks_cells(conn)) {
     conn->receiver_bucket -= read_result;
     if(conn->receiver_bucket <= 0) {
 
+//      log(LOG_DEBUG,"connection_read_to_buf() stopping reading, receiver bucket full.");
       connection_stop_reading(conn);
 
       /* If we're not in 'open' state here, then we're never going to finish the
@@ -308,6 +318,14 @@
   return fetch_from_buf(string, len, &conn->inbuf, &conn->inbuflen, &conn->inbuf_datalen);
 }
 
+int connection_wants_to_flush(connection_t *conn) {
+  return conn->outbuf_flushlen;
+}
+
+int connection_outbuf_too_full(connection_t *conn) {
+  return (conn->outbuf_flushlen > 10*CELL_PAYLOAD_SIZE);
+}
+
 int connection_flush_buf(connection_t *conn) {
   return flush_buf(conn->s, &conn->outbuf, &conn->outbuflen, &conn->outbuf_flushlen, &conn->outbuf_datalen);
 }
@@ -321,7 +339,7 @@
       (options.LinkPadding == 0) ) {
     /* connection types other than or and op, or or/op not in 'open' state, should flush immediately */
     /* also flush immediately if we're not doing LinkPadding, since otherwise it will never flush */
-    connection_watch_events(conn, POLLOUT | POLLIN);
+    connection_start_writing(conn);
     conn->outbuf_flushlen += len;
   }
 
@@ -331,6 +349,9 @@
 int connection_receiver_bucket_should_increase(connection_t *conn) {
   assert(conn);
 
+  if(!connection_speaks_cells(conn))
+    return 0; /* edge connections don't use receiver_buckets */
+
   if(conn->receiver_bucket > 10*conn->bandwidth)
     return 0;
 
@@ -350,6 +371,15 @@
   }
 }
 
+int connection_speaks_cells(connection_t *conn) {
+  assert(conn);
+
+  if(conn->type == CONN_TYPE_OR || conn->type == CONN_TYPE_OP)
+    return 1;
+
+  return 0;
+}
+
 int connection_state_is_open(connection_t *conn) {
   assert(conn);
 
@@ -371,7 +401,7 @@
 
   assert(conn);
 
-  if(conn->type != CONN_TYPE_OR && conn->type != CONN_TYPE_OP) {
+  if(!connection_speaks_cells(conn)) {
     /* this conn doesn't speak cells. do nothing. */
     return;
   }
@@ -385,7 +415,7 @@
 #if 0 /* use to send evenly spaced cells, but not padding */
   if(conn->outbuf_datalen - conn->outbuf_flushlen >= sizeof(cell_t)) {
     conn->outbuf_flushlen += sizeof(cell_t); /* instruct it to send a cell */
-    connection_watch_events(conn, POLLOUT | POLLIN);
+    connection_start_writing(conn);
   }
 #endif
 
@@ -408,7 +438,7 @@
   }
 
   conn->outbuf_flushlen += sizeof(cell_t); /* instruct it to send a cell */
-  connection_watch_events(conn, POLLOUT | POLLIN);
+  connection_start_writing(conn);
 
 }
 
@@ -434,16 +464,12 @@
 
   assert(conn);
 
-  if(conn->type == CONN_TYPE_OP ||
-     conn->type == CONN_TYPE_AP ||
-     conn->type == CONN_TYPE_EXIT) {
+  if(!connection_speaks_cells(conn)) {
      log(LOG_DEBUG,"connection_send_destroy(): At an edge. Marking connection for close.");
      conn->marked_for_close = 1;
      return 0;
   }
 
-  assert(conn->type == CONN_TYPE_OR);
-
   cell.aci = aci;
   cell.command = CELL_DESTROY;
   log(LOG_DEBUG,"connection_send_destroy(): Sending destroy (aci %d).",aci);
@@ -452,7 +478,6 @@
 }
 
 int connection_write_cell_to_buf(cell_t *cellp, connection_t *conn) {
-  /* FIXME in the future, we should modify windows, etc, here */
  
   if(connection_encrypt_cell_header(cellp,conn)<0) {
     return -1;
@@ -464,10 +489,10 @@
 int connection_encrypt_cell_header(cell_t *cellp, connection_t *conn) {
   char newheader[8];
   int newsize;
+#if 0
   int x;
   char *px;
 
-#if 0
   printf("Sending: Cell header plaintext: ");
   px = (char *)cellp;
   for(x=0;x<8;x++) {
@@ -517,7 +542,8 @@
   circuit_t *circ;
 
   assert(conn);
-  assert(conn->type == CONN_TYPE_EXIT || conn->type == CONN_TYPE_AP);
+  assert(!connection_speaks_cells(conn));
+  /* this function should never get called if the receiver_window is 0 */
 
   amount_to_process = conn->inbuf_datalen;
 
@@ -548,6 +574,13 @@
       circuit_close(circ);
       return 0;
     }
+    assert(circ->n_receive_window > 0);
+    if(--circ->n_receive_window <= 0) { /* is it 0 after decrement? */
+      connection_stop_reading(circ->n_conn);
+      log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at exit reached 0.");
+      return 0; /* don't process the inbuf any more */
+    }
+    log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at exit is %d",circ->n_receive_window);
   } else { /* send it forward. we're an AP */
     cell.aci = circ->n_aci;
     cell.command = CELL_DATA;
@@ -557,17 +590,58 @@
       circuit_close(circ);
       return 0;
     }
+    assert(circ->p_receive_window > 0);
+    if(--circ->p_receive_window <= 0) { /* is it 0 after decrement? */
+      connection_stop_reading(circ->p_conn);
+      log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at AP reached 0.");
+      return 0; /* don't process the inbuf any more */
+    }
+    log(LOG_DEBUG,"connection_raw_package_inbuf(): receive_window at AP is %d",circ->p_receive_window);
   }
   if(amount_to_process > CELL_PAYLOAD_SIZE)
+    log(LOG_DEBUG,"connection_raw_package_inbuf(): recursing.");
     return connection_package_raw_inbuf(conn);
   return 0;
 }
 
+int connection_consider_sending_sendme(connection_t *conn) {
+  circuit_t *circ;
+  cell_t sendme;
+
+  if(connection_outbuf_too_full(conn))
+    return 0;
+
+  circ = circuit_get_by_conn(conn);
+  if(!circ) {
+    log(LOG_DEBUG,"connection_consider_sending_sendme(): Bug: no circuit associated with conn. Closing.");
+    return -1;
+  }
+  sendme.command = CELL_SENDME;
+  sendme.length = RECEIVE_WINDOW_INCREMENT;
+
+  if(circ->n_conn == conn) { /* we're at an exit */
+    if(circ->p_receive_window < RECEIVE_WINDOW_START-RECEIVE_WINDOW_INCREMENT) {
+      log(LOG_DEBUG,"connection_consider_sending_sendme(): Queueing sendme back.");
+      circ->p_receive_window += RECEIVE_WINDOW_INCREMENT;
+      sendme.aci = circ->p_aci;
+      return connection_write_cell_to_buf(&sendme, circ->p_conn); /* (clobbers sendme) */
+    }
+  } else { /* we're at an AP */
+    if(circ->n_receive_window < RECEIVE_WINDOW_START-RECEIVE_WINDOW_INCREMENT) {
+      log(LOG_DEBUG,"connection_consider_sending_sendme(): Queueing sendme forward.");
+      circ->n_receive_window += RECEIVE_WINDOW_INCREMENT;
+      sendme.aci = circ->n_aci;
+      return connection_write_cell_to_buf(&sendme, circ->n_conn); /* (clobbers sendme) */
+    }
+  }
+  return 0;
+} 
+
 int connection_finished_flushing(connection_t *conn) {
 
   assert(conn);
 
-  log(LOG_DEBUG,"connection_finished_flushing() entered. Socket %u.", conn->s);
+//  log(LOG_DEBUG,"connection_finished_flushing() entered. Socket %u.", conn->s);
 
   switch(conn->type) {
     case CONN_TYPE_AP:
@@ -591,7 +665,7 @@
   char crypted[128];
   char outbuf[1024];
   int outlen;
-  int x;
+//  int x;
   cell_t *cellp;
 
   if(conn->inbuf_datalen < 128) /* entire response available? */
@@ -613,7 +687,7 @@
     log(LOG_ERR,"connection_process_cell_from_inbuf(): Decryption failed, dropping.");
     return connection_process_inbuf(conn); /* process the remainder of the buffer */
   }
-  log(LOG_DEBUG,"connection_process_cell_from_inbuf(): Cell decrypted (%d bytes).",outlen);
+//  log(LOG_DEBUG,"connection_process_cell_from_inbuf(): Cell decrypted (%d bytes).",outlen);
 #if 0
   printf("Cell header plaintext: ");
   for(x=0;x<8;x++) {
@@ -625,7 +699,7 @@
   /* copy the rest of the cell */
   memcpy((char *)outbuf+8, (char *)crypted+8, sizeof(cell_t)-8);
   cellp = (cell_t *)outbuf;
-  log(LOG_DEBUG,"connection_process_cell_from_inbuf(): Decrypted cell is of type %u (ACI %u).",cellp->command,cellp->aci);
+//  log(LOG_DEBUG,"connection_process_cell_from_inbuf(): Decrypted cell is of type %u (ACI %u).",cellp->command,cellp->aci);
   command_process_cell(cellp, conn);
 
   return connection_process_inbuf(conn); /* process the remainder of the buffer */

Index: connection_ap.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_ap.c,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- connection_ap.c	16 Jul 2002 01:12:15 -0000	1.4
+++ connection_ap.c	18 Jul 2002 06:37:58 -0000	1.5
@@ -343,14 +343,17 @@
 
   assert(conn && conn->type == CONN_TYPE_AP);
 
-  if(conn->state == AP_CONN_STATE_OPEN) {
-    log(LOG_DEBUG,"connection_ap_process_data_cell(): In state 'open', writing to buf.");
-    return connection_write_to_buf(cell->payload, cell->length, conn);
+  if(conn->state != AP_CONN_STATE_OPEN) {
+    /* we should not have gotten this cell */
+    log(LOG_DEBUG,"connection_ap_process_data_cell(): Got a data cell when not in 'open' state. Closing.");
+    return -1;
   }
 
-  /* else we shouldn't have gotten this cell */
-  log(LOG_DEBUG,"connection_ap_process_data_cell(): Got a data cell when not in 'open' state. Closing.");
-  return -1;
+  log(LOG_DEBUG,"connection_ap_process_data_cell(): In state 'open', writing to buf.");
+
+  if(connection_write_to_buf(cell->payload, cell->length, conn) < 0)
+    return -1;
+  return connection_consider_sending_sendme(conn);
 }     
 
 int connection_ap_finished_flushing(connection_t *conn) {
@@ -360,7 +363,8 @@
   switch(conn->state) {
     case AP_CONN_STATE_OPEN:
       /* FIXME down the road, we'll clear out circuits that are pending to close */
-      connection_watch_events(conn, POLLIN);
+      connection_stop_writing(conn);
+      connection_consider_sending_sendme(conn);
       return 0;
     default:
       log(LOG_DEBUG,"Bug: connection_ap_finished_flushing() called in unexpected state.");
@@ -377,7 +381,7 @@
 }
 
 int connection_ap_handle_listener_read(connection_t *conn) {
-  log(LOG_NOTICE,"AP: Received a connection request. Waiting for keys.");
+  log(LOG_NOTICE,"AP: Received a connection request. Waiting for SS.");
   return connection_handle_listener_read(conn, CONN_TYPE_AP, AP_CONN_STATE_SS_WAIT);
 } 
 

Index: connection_exit.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_exit.c,v
retrieving revision 1.7
retrieving revision 1.8
diff -u -d -r1.7 -r1.8
--- connection_exit.c	16 Jul 2002 01:12:15 -0000	1.7
+++ connection_exit.c	18 Jul 2002 06:37:58 -0000	1.8
@@ -50,12 +50,14 @@
           conn->address,ntohs(conn->port));
       
       conn->state = EXIT_CONN_STATE_OPEN;
-      connection_flush_buf(conn); /* in case there are any queued data cells */
-      connection_watch_events(conn, POLLIN);
+      if(connection_wants_to_flush(conn)) /* in case there are any queued data cells */
+        connection_start_writing(conn);
+      connection_start_reading(conn);
       return 0;
     case EXIT_CONN_STATE_OPEN:
       /* FIXME down the road, we'll clear out circuits that are pending to close */
-      connection_watch_events(conn, POLLIN);
+      connection_stop_writing(conn);
+      connection_consider_sending_sendme(conn);
       return 0;
     default:
       log(LOG_DEBUG,"Bug: connection_exit_finished_flushing() called in unexpected state.");
@@ -101,7 +103,7 @@
           return -1;
         }
         memcpy(&conn->addr, rent->h_addr,rent->h_length); 
-	log(LOG_DEBUG,"connection_exit_process_data_cell(): addr %s resolves to %d.",cell->payload,conn->addr);
+	log(LOG_DEBUG,"connection_exit_process_data_cell(): addr is %s.",cell->payload);
       } else if (!conn->port) { /* this cell contains the dest port */
         if(!memchr(cell->payload,'\0',cell->length)) {
           log(LOG_DEBUG,"connection_exit_process_data_cell(): dest_port cell has no \\0. Closing.");
@@ -139,7 +141,6 @@
             connection_set_poll_socket(conn);
             conn->state = EXIT_CONN_STATE_CONNECTING;
       
-            /* i think only pollout is needed, but i'm curious if pollin ever gets caught -RD */
             log(LOG_DEBUG,"connection_exit_process_data_cell(): connect in progress, socket %d.",s);
             connection_watch_events(conn, POLLOUT | POLLIN);
             return 0;
@@ -161,11 +162,12 @@
       return 0;
     case EXIT_CONN_STATE_CONNECTING:
       log(LOG_DEBUG,"connection_exit_process_data_cell(): Data receiving while connecting. Queueing.");
-      /* this sets us to POLLOUT | POLLIN, which is ok because we need to keep listening for
-       * writable for connect() to finish */
-      return connection_write_to_buf(cell->payload, cell->length, conn);
+      /* we stay listening for writable, so connect() can finish */
+      /* fall through to the next state -- write the cell and consider sending back a sendme */
     case EXIT_CONN_STATE_OPEN:
-      return connection_write_to_buf(cell->payload, cell->length, conn);
+      if(connection_write_to_buf(cell->payload, cell->length, conn) < 0)
+        return -1;
+      return connection_consider_sending_sendme(conn);
   }
 
   return 0;

Index: connection_op.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_op.c,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -d -r1.4 -r1.5
--- connection_op.c	16 Jul 2002 01:12:15 -0000	1.4
+++ connection_op.c	18 Jul 2002 06:37:58 -0000	1.5
@@ -98,7 +98,7 @@
   switch(conn->state) {
     case OP_CONN_STATE_OPEN:
       /* FIXME down the road, we'll clear out circuits that are pending to close */
-      connection_watch_events(conn, POLLIN);
+      connection_stop_writing(conn);
       return 0;
     default:
       log(LOG_DEBUG,"Bug: connection_op_finished_flushing() called in unexpected state.");

Index: connection_or.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_or.c,v
retrieving revision 1.6
retrieving revision 1.7
diff -u -d -r1.6 -r1.7
--- connection_or.c	16 Jul 2002 01:12:15 -0000	1.6
+++ connection_or.c	18 Jul 2002 06:37:58 -0000	1.7
@@ -97,7 +97,7 @@
       return 0;
     case OR_CONN_STATE_OPEN:
       /* FIXME down the road, we'll clear out circuits that are pending to close */
-      connection_watch_events(conn, POLLIN);
+      connection_stop_writing(conn);
       return 0;
     default:
       log(LOG_DEBUG,"Bug: connection_or_finished_flushing() called in unexpected state.");
@@ -187,9 +187,8 @@
         return NULL;
       }
 
-      /* i think only pollout is needed, but i'm curious if pollin ever gets caught -RD */
       log(LOG_DEBUG,"connection_or_connect() : connect in progress.");
-      connection_watch_events(conn, POLLOUT | POLLIN);
+      connection_watch_events(conn, POLLIN | POLLOUT); /* writable indicates finish, readable indicates broken link */
       *result = 1; /* connecting */
       return conn;
 

Index: main.c
===================================================================
RCS file: /home/or/cvsroot/src/or/main.c,v
retrieving revision 1.12
retrieving revision 1.13
diff -u -d -r1.12 -r1.13
--- main.c	16 Jul 2002 18:24:12 -0000	1.12
+++ main.c	18 Jul 2002 06:37:58 -0000	1.13
@@ -198,6 +198,7 @@
 
   assert(conn && conn->poll_index < nfds);
 
+  log(LOG_DEBUG,"connection_stop_reading() called.");
   if(poll_array[conn->poll_index].events & POLLIN)
     poll_array[conn->poll_index].events -= POLLIN;
 }
@@ -209,6 +210,22 @@
   poll_array[conn->poll_index].events |= POLLIN;
 }
 
+void connection_stop_writing(connection_t *conn) {
+
+  assert(conn && conn->poll_index < nfds);
+
+  if(poll_array[conn->poll_index].events & POLLOUT)
+    poll_array[conn->poll_index].events -= POLLOUT;
+}
+
+void connection_start_writing(connection_t *conn) {
+
+  assert(conn && conn->poll_index < nfds);
+
+  poll_array[conn->poll_index].events |= POLLOUT;
+}
+
+
 void check_conn_read(int i) {
   int retval;
   connection_t *conn;
@@ -257,7 +274,7 @@
   if(poll_array[i].revents & POLLOUT) { /* something to write */
 
     conn = connection_array[i];
-    log(LOG_DEBUG,"check_conn_write(): socket %d wants to write.",conn->s);
+//    log(LOG_DEBUG,"check_conn_write(): socket %d wants to write.",conn->s);
 
     if(conn->type == CONN_TYPE_OP_LISTENER ||
        conn->type == CONN_TYPE_OR_LISTENER) {
@@ -277,7 +294,7 @@
       connection_free(conn);
       if(i<nfds) { /* we just replaced the one at i with a new one.
                       process it too. */
-        check_conn_read(i);
+        check_conn_write(i);
       }
     }
   }
@@ -327,8 +344,9 @@
 
   if(need_to_refill_buckets) {
     if(now.tv_sec > current_second) { /* the second has already rolled over! */
-      log(LOG_DEBUG,"prepare_for_poll(): The second has rolled over, immediately refilling.");
-      increment_receiver_buckets();
+//      log(LOG_DEBUG,"prepare_for_poll(): The second has rolled over, immediately refilling.");
+      for(i=0;i<nfds;i++)
+        connection_increment_receiver_bucket(connection_array[i]);
       current_second = now.tv_sec; /* remember which second it is, for next time */
     }
     *timeout = 1000 - (now.tv_usec / 1000); /* how many milliseconds til the next second? */
@@ -339,7 +357,7 @@
     /* now check which conn wants to speak soonest */
     for(i=0;i<nfds;i++) {
       tmpconn = connection_array[i];
-      if(tmpconn->type != CONN_TYPE_OR && tmpconn->type != CONN_TYPE_OP)
+      if(!connection_speaks_cells(tmpconn))
         continue; /* this conn type doesn't send cells */
       if(!connection_state_is_open(tmpconn))
         continue; /* only conns in state 'open' have a valid send_timeval */ 
@@ -370,13 +388,6 @@
   }
 
   return 0;
-}
-
-void increment_receiver_buckets(void) {
-  int i;
-
-  for(i=0;i<nfds;i++)
-    connection_increment_receiver_bucket(connection_array[i]);
 }
 
 int do_main_loop(void) {

Index: or.h
===================================================================
RCS file: /home/or/cvsroot/src/or/or.h,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- or.h	16 Jul 2002 01:12:15 -0000	1.10
+++ or.h	18 Jul 2002 06:37:58 -0000	1.11
@@ -112,6 +112,9 @@
 /* default cipher function */
 #define ONION_DEFAULT_CIPHER ONION_CIPHER_DES
 
+#define RECEIVE_WINDOW_START 100
+#define RECEIVE_WINDOW_INCREMENT 10
+
 typedef uint16_t aci_t;
 
 typedef struct
@@ -240,6 +243,8 @@
   uint16_t n_port;
   connection_t *p_conn;
   connection_t *n_conn;
+  int n_receive_window;
+  int p_receive_window;
 
   aci_t p_aci; /* connection identifiers */
   aci_t n_aci;
@@ -370,6 +375,7 @@
 void command_process_cell(cell_t *cell, connection_t *conn);
 
 void command_process_create_cell(cell_t *cell, connection_t *conn);
+void command_process_sendme_cell(cell_t *cell, connection_t *conn);
 void command_process_data_cell(cell_t *cell, connection_t *conn);
 void command_process_destroy_cell(cell_t *cell, connection_t *conn);
 
@@ -402,6 +408,8 @@
 
 int connection_fetch_from_buf(char *string, int len, connection_t *conn);
 
+int connection_outbuf_too_full(connection_t *conn);
+int connection_wants_to_flush(connection_t *conn);
 int connection_flush_buf(connection_t *conn);
 
 int connection_write_to_buf(char *string, int len, connection_t *conn);
@@ -413,6 +421,7 @@
 void connection_increment_send_timeval(connection_t *conn);
 void connection_init_timeval(connection_t *conn);
 
+int connection_speaks_cells(connection_t *conn);
 int connection_state_is_open(connection_t *conn);
 
 int connection_send_destroy(aci_t aci, connection_t *conn);
@@ -423,6 +432,7 @@
 int connection_package_raw_inbuf(connection_t *conn);
 int connection_process_cell_from_inbuf(connection_t *conn);
 
+int connection_consider_sending_sendme(connection_t *conn);
 int connection_finished_flushing(connection_t *conn);
 
 /********************************* connection_ap.c ****************************/
@@ -513,13 +523,14 @@
 void connection_watch_events(connection_t *conn, short events);
 void connection_stop_reading(connection_t *conn);
 void connection_start_reading(connection_t *conn);
+void connection_stop_writing(connection_t *conn);
+void connection_start_writing(connection_t *conn);
 
 void check_conn_read(int i);
 void check_conn_marked(int i);
 void check_conn_write(int i);
 
 int prepare_for_poll(int *timeout);
-void increment_receiver_buckets(void);
 
 int do_main_loop(void);