[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] implemented total read rate limiting



Update of /home/or/cvsroot/src/or
In directory moria.mit.edu:/home2/arma/work/onion/cvs/src/or

Modified Files:
	buffers.c config.c connection.c main.c or.h 
Log Message:
implemented total read rate limiting


Index: buffers.c
===================================================================
RCS file: /home/or/cvsroot/src/or/buffers.c,v
retrieving revision 1.25
retrieving revision 1.26
diff -u -d -r1.25 -r1.26
--- buffers.c	25 Jun 2003 00:31:41 -0000	1.25
+++ buffers.c	5 Jul 2003 07:10:34 -0000	1.26
@@ -28,7 +28,7 @@
   free(buf);
 }
 
-/* read from socket s, writing onto buf+buf_datalen. If at_most is >= 0 then
+/* read from socket s, writing onto buf+buf_datalen.
  * read at most 'at_most' bytes, and in any case don't read more than will fit based on buflen.
  * If read() returns 0, set *reached_eof to 1 and return 0. If you want to tear
  * down the connection return -1, else return the number of bytes read.
@@ -41,9 +41,8 @@
 
   /* this is the point where you would grow the buffer, if you want to */
 
-  if(at_most < 0 || *buflen - *buf_datalen < at_most)
+  if(at_most > *buflen - *buf_datalen)
     at_most = *buflen - *buf_datalen; /* take the min of the two */
-    /* (note that this only modifies at_most inside this function) */
 
   if(at_most == 0)
     return 0; /* we shouldn't read anything */

Index: config.c
===================================================================
RCS file: /home/or/cvsroot/src/or/config.c,v
retrieving revision 1.35
retrieving revision 1.36
diff -u -d -r1.35 -r1.36
--- config.c	17 Jun 2003 22:18:26 -0000	1.35
+++ config.c	5 Jul 2003 07:10:34 -0000	1.36
@@ -188,6 +188,7 @@
     config_compare(list, "KeepalivePeriod", CONFIG_TYPE_INT, &options->KeepalivePeriod) ||
     config_compare(list, "MaxOnionsPending",CONFIG_TYPE_INT, &options->MaxOnionsPending) ||
     config_compare(list, "NewCircuitPeriod",CONFIG_TYPE_INT, &options->NewCircuitPeriod) ||
+    config_compare(list, "TotalBandwidth",  CONFIG_TYPE_INT, &options->TotalBandwidth) ||
 
     config_compare(list, "OnionRouter",     CONFIG_TYPE_BOOL, &options->OnionRouter) ||
     config_compare(list, "Daemon",          CONFIG_TYPE_BOOL, &options->Daemon) ||
@@ -216,18 +217,17 @@
   const char *cmd;
   int result = 0;
 
-/* give reasonable defaults for each option */
+/* give reasonable values for each option. Defaults to zero. */
   memset(options,0,sizeof(or_options_t));
-  options->Daemon = 0;
   options->LogLevel = "debug";
   options->loglevel = LOG_DEBUG;
   options->CoinWeight = 0.8;
-  options->LinkPadding = 0;
   options->MaxConn = 900;
   options->DirFetchPeriod = 600;
   options->KeepalivePeriod = 300;
   options->MaxOnionsPending = 10;
   options->NewCircuitPeriod = 60; /* once a minute */
+  options->TotalBandwidth = 800000; /* at most 800kB/s total sustained incoming */
 //  options->ReconnectPeriod = 6001;
 
 /* get config lines from /etc/torrc and assign them */

Index: connection.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection.c,v
retrieving revision 1.70
retrieving revision 1.71
diff -u -d -r1.70 -r1.71
--- connection.c	3 Jul 2003 03:40:47 -0000	1.70
+++ connection.c	5 Jul 2003 07:10:34 -0000	1.71
@@ -8,6 +8,8 @@
 
 extern or_options_t options; /* command-line and config-file options */
 
+extern int global_read_bucket;
+
 char *conn_type_to_string[] = {
   "",            /* 0 */
   "OP listener", /* 1 */
@@ -265,6 +267,7 @@
 int connection_read_to_buf(connection_t *conn) {
   int read_result;
   struct timeval now;
+  int at_most = global_read_bucket;
 
   if(connection_speaks_cells(conn)) {
     assert(conn->receiver_bucket >= 0);
@@ -277,21 +280,25 @@
 
   conn->timestamp_lastread = now.tv_sec;
 
-  read_result = read_to_buf(conn->s, conn->receiver_bucket, &conn->inbuf, &conn->inbuflen,
+  if(conn->receiver_bucket >= 0 && at_most > conn->receiver_bucket)
+    at_most = conn->receiver_bucket;
+
+  read_result = read_to_buf(conn->s, at_most, &conn->inbuf, &conn->inbuflen,
                             &conn->inbuf_datalen, &conn->inbuf_reached_eof);
 //  log(LOG_DEBUG,"connection_read_to_buf(): read_to_buf returned %d.",read_result);
-  if(read_result >= 0 && connection_speaks_cells(conn)) {
-//    log(LOG_DEBUG,"connection_read_to_buf(): Read %d, bucket now %d.",read_result,conn->receiver_bucket);
-    conn->receiver_bucket -= read_result;
-    if(conn->receiver_bucket <= 0) {
-
-//      log(LOG_DEBUG,"connection_read_to_buf() stopping reading, receiver bucket full.");
+  if(read_result >= 0) {
+    global_read_bucket -= read_result; assert(global_read_bucket >= 0);
+    if(connection_speaks_cells(conn))
+      conn->receiver_bucket -= read_result;
+    if(conn->receiver_bucket == 0 || global_read_bucket == 0) {
+      log_fn(LOG_DEBUG,"buckets (%d, %d) exhausted. Pausing.", global_read_bucket, conn->receiver_bucket);
+      conn->wants_to_read = 1;
       connection_stop_reading(conn);
 
       /* If we're not in 'open' state here, then we're never going to finish the
        * handshake, because we'll never increment the receiver_bucket. But we
        * can't check for that here, because the buf we just read might have enough
-       * on it to finish the handshake. So we check for that in check_conn_read().
+       * on it to finish the handshake. So we check for that in conn_read().
        */
     }
   }
@@ -350,26 +357,12 @@
   if(!connection_speaks_cells(conn))
     return 0; /* edge connections don't use receiver_buckets */
 
-  if(conn->receiver_bucket > 10*conn->bandwidth)
+  if(conn->receiver_bucket > 9*conn->bandwidth)
     return 0;
 
   return 1;
 }
 
-void connection_increment_receiver_bucket(connection_t *conn) {
-  assert(conn);
-
-  if(connection_receiver_bucket_should_increase(conn)) {
-    /* yes, the receiver_bucket can become overfull here. But not by much. */
-    conn->receiver_bucket += conn->bandwidth*1.1;
-//    log(LOG_DEBUG,"connection_increment_receiver_bucket(): Bucket now %d.",conn->receiver_bucket);
-    if(connection_state_is_open(conn)) {
-      /* if we're in state 'open', then start reading again */
-      connection_start_reading(conn);
-    }
-  }
-}
-
 int connection_is_listener(connection_t *conn) {
   if(conn->type == CONN_TYPE_OR_LISTENER ||
      conn->type == CONN_TYPE_AP_LISTENER ||
@@ -392,6 +385,8 @@
 void connection_send_cell(connection_t *conn) {
   cell_t cell;
   int bytes_in_full_flushlen;
+
+  assert(0); /* this function has decayed. rewrite before using. */
 
   /* this function only gets called if options.LinkPadding is 1 */
   assert(options.LinkPadding == 1);

Index: main.c
===================================================================
RCS file: /home/or/cvsroot/src/or/main.c,v
retrieving revision 1.76
retrieving revision 1.77
diff -u -d -r1.76 -r1.77
--- main.c	5 Jul 2003 05:46:05 -0000	1.76
+++ main.c	5 Jul 2003 07:10:34 -0000	1.77
@@ -11,6 +11,7 @@
 /********* START VARIABLES **********/
 
 or_options_t options; /* command-line and config-file options */
+int global_read_bucket; /* max number of bytes I can read this second */
 
 static connection_t *connection_array[MAXCONNECTIONS] =
         { NULL };
@@ -367,10 +368,25 @@
       time_to_new_circuit = now.tv_sec + options.NewCircuitPeriod;
     }
 
+    if(global_read_bucket < 9*options.TotalBandwidth) {
+      global_read_bucket += options.TotalBandwidth;
+      log_fn(LOG_DEBUG,"global_read_bucket now %d.", global_read_bucket);
+    }
+
     /* do housekeeping for each connection */
     for(i=0;i<nfds;i++) {
       tmpconn = connection_array[i];
-      connection_increment_receiver_bucket(tmpconn);
+      if(connection_receiver_bucket_should_increase(tmpconn)) {
+        tmpconn->receiver_bucket += tmpconn->bandwidth;
+//        log_fn(LOG_DEBUG,"Receiver bucket %d now %d.", i, tmpconn->receiver_bucket);
+      }
+
+      if(tmpconn->wants_to_read == 1 /* it's marked to turn reading back on now */
+         && global_read_bucket > 0 /* and we're allowed to read */
+         && tmpconn->receiver_bucket != 0) { /* and either an edge conn or non-empty bucket */
+        tmpconn->wants_to_read = 0;
+        connection_start_reading(tmpconn);
+      }
 
       /* check connections to see whether we should send a keepalive, expire, or wait */
       if(!connection_speaks_cells(tmpconn))
@@ -811,6 +827,7 @@
   if(getconfig(argc,argv,&options))
     exit(1);
   log_set_severity(options.loglevel);     /* assign logging severity level from options */
+  global_read_bucket = options.TotalBandwidth; /* start it at 1 second of traffic */
 
   if(options.Daemon)
     daemonize();

Index: or.h
===================================================================
RCS file: /home/or/cvsroot/src/or/or.h,v
retrieving revision 1.97
retrieving revision 1.98
diff -u -d -r1.97 -r1.98
--- or.h	5 Jul 2003 05:46:06 -0000	1.97
+++ or.h	5 Jul 2003 07:10:34 -0000	1.98
@@ -550,7 +550,6 @@
 void connection_send_cell(connection_t *conn);
 
 int connection_receiver_bucket_should_increase(connection_t *conn);
-void connection_increment_receiver_bucket (connection_t *conn);
 
 void connection_increment_send_timeval(connection_t *conn);
 void connection_init_timeval(connection_t *conn);