[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] checkpoint: start working on bandwidth tracking and



Update of /home/or/cvsroot/src/or
In directory moria.mit.edu:/home2/arma/work/onion/cvs/src/or

Modified Files:
	config.c connection.c main.c or.h rephist.c 
Log Message:
checkpoint: start working on bandwidth tracking and
letting clients evaluate whether they're suitable servers


Index: config.c
===================================================================
RCS file: /home/or/cvsroot/src/or/config.c,v
retrieving revision 1.123
retrieving revision 1.124
diff -u -d -r1.123 -r1.124
--- config.c	4 Jul 2004 22:48:11 -0000	1.123
+++ config.c	13 Jul 2004 07:42:20 -0000	1.124
@@ -193,6 +193,7 @@
     config_compare(list, "BandwidthRate",  CONFIG_TYPE_INT, &options->BandwidthRate) ||
     config_compare(list, "BandwidthBurst", CONFIG_TYPE_INT, &options->BandwidthBurst) ||
 
+    config_compare(list, "ClientOnly",     CONFIG_TYPE_BOOL, &options->ClientOnly) ||
     config_compare(list, "ContactInfo",    CONFIG_TYPE_STRING, &options->ContactInfo) ||
 
     config_compare(list, "DebugLogFile",   CONFIG_TYPE_STRING, &options->DebugLogFile) ||
@@ -678,6 +679,10 @@
     result = -1;
   }
 
+  /* XXX008 if AuthDir and !ORPort then fail */
+
+  /* XXX008 if AuthDir and ClientOnly then fail */
+
   if(options->SocksPort > 1 &&
      (options->PathlenCoinWeight < 0.0 || options->PathlenCoinWeight >= 1.0)) {
     log(LOG_WARN,"PathlenCoinWeight option must be >=0.0 and <1.0.");

Index: connection.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection.c,v
retrieving revision 1.237
retrieving revision 1.238
diff -u -d -r1.237 -r1.238
--- connection.c	12 Jul 2004 20:39:40 -0000	1.237
+++ connection.c	13 Jul 2004 07:42:20 -0000	1.238
@@ -820,10 +820,12 @@
  */
 int connection_handle_write(connection_t *conn) {
   int e, len=sizeof(e);
+  int result;
+  time_t now = time(NULL);
 
   tor_assert(!connection_is_listener(conn));
 
-  conn->timestamp_lastwritten = time(NULL);
+  conn->timestamp_lastwritten = now;
 
   /* Sometimes, "writeable" means "connected". */
   if (connection_state_is_connecting(conn)) {
@@ -859,7 +861,8 @@
     }
 
     /* else open, or closing */
-    switch(flush_buf_tls(conn->tls, conn->outbuf, &conn->outbuf_flushlen)) {
+    result = flush_buf_tls(conn->tls, conn->outbuf, &conn->outbuf_flushlen);
+    switch(result) {
       case TOR_TLS_ERROR:
       case TOR_TLS_CLOSE:
         log_fn(LOG_INFO,"tls error. breaking.");
@@ -888,7 +891,8 @@
        */
     }
   } else {
-    if (flush_buf(conn->s, conn->outbuf, &conn->outbuf_flushlen) < 0) {
+    result = flush_buf(conn->s, conn->outbuf, &conn->outbuf_flushlen);
+    if (result < 0) {
       connection_close_immediate(conn); /* Don't flush; connection is dead. */
       conn->has_sent_end = 1;
       connection_mark_for_close(conn);
@@ -896,6 +900,10 @@
     }
   }
 
+  if(result > 0) { /* remember it */
+    rep_hist_note_bytes_written(result, now);
+  }
+
   if(!connection_wants_to_flush(conn)) { /* it's done flushing */
     if(connection_finished_flushing(conn) < 0) {
       /* already marked */

Index: main.c
===================================================================
RCS file: /home/or/cvsroot/src/or/main.c,v
retrieving revision 1.289
retrieving revision 1.290
diff -u -d -r1.289 -r1.290
--- main.c	13 Jul 2004 00:38:08 -0000	1.289
+++ main.c	13 Jul 2004 07:42:20 -0000	1.290
@@ -366,16 +366,14 @@
     return;
   }
 
-  /* check connections to see whether we should send a keepalive, expire, or wait */
-  if(!connection_speaks_cells(conn))
-    return;
-
   /* If we haven't written to an OR connection for a while, then either nuke
      the connection or send a keepalive, depending. */
-  if(now >= conn->timestamp_lastwritten + options.KeepalivePeriod) {
+  if(connection_speaks_cells(conn) &&
+     now >= conn->timestamp_lastwritten + options.KeepalivePeriod) {
     if((!options.ORPort && !circuit_get_by_conn(conn)) ||
        (!connection_state_is_open(conn))) {
-      /* we're an onion proxy, with no circuits; or our handshake has expired. kill it. */
+      /* we're an onion proxy, with no circuits;
+       * or our handshake has expired. kill it. */
       log_fn(LOG_INFO,"Expiring connection to %d (%s:%d).",
              i,conn->address, conn->port);
       /* flush anything waiting, e.g. a destroy for a just-expired circ */
@@ -392,6 +390,36 @@
   }
 }
 
+#define MIN_BW_TO_PUBLISH_DESC 5000 /* 5000 bytes sustained */
+#define MIN_UPTIME_TO_PUBLISH_DESC (30*60) /* half an hour */
+
+/** Decide if we're a server or just a client. We are a server if:
+ * - We have the AuthoritativeDirectory option set.
+ * or
+ * - We don't have the ClientOnly option set; and
+ * - We have ORPort set; and
+ * - We have been up for at least MIN_UPTIME_TO_PUBLISH_DESC seconds; and
+ * - We have processed some suitable minimum bandwidth recently; and
+ * - We believe we are reachable from the outside.
+ */
+static int decide_if_server(time_t now) {
+
+  if(options.AuthoritativeDir)
+    return 1;
+  if(options.ClientOnly)
+    return 0;
+  if(!options.ORPort)
+    return 0;
+
+  /* here, determine if we're reachable */
+
+  if(stats_n_seconds_uptime < MIN_UPTIME_TO_PUBLISH_DESC)
+    return 0;
+  if(rep_hist_bandwidth_assess(now) < MIN_BW_TO_PUBLISH_DESC)
+    return 0;
+  return 1;
+}
+
 /** Perform regular maintenance tasks.  This function gets run once per
  * second by prepare_for_poll.
  */
@@ -401,7 +429,6 @@
   static time_t last_rotated_certificate = 0;
   int i;
 
-
   /** 1a. Every MIN_ONION_KEY_LIFETIME seconds, rotate the onion keys,
    *  shut down and restart all cpuworkers, and update the directory if
    *  necessary.
@@ -430,37 +457,42 @@
      * XXXX them at all. */
   }
 
-  /** 1c. Every DirFetchPostPeriod seconds, we get a new directory and upload
-   *    our descriptor (if any). */
+  /** 2. Every DirFetchPostPeriod seconds, we get a new directory and upload
+   *    our descriptor (if we've passed our internal checks). */
   if(time_to_fetch_directory < now) {
-    /* it's time to fetch a new directory and/or post our descriptor */
-    if(options.ORPort) {
+
+    if(decide_if_server(now)) {
       router_rebuild_descriptor();
       router_upload_dir_desc_to_dirservers();
     }
+
     routerlist_remove_old_routers(); /* purge obsolete entries */
+
     if(options.AuthoritativeDir) {
       /* We're a directory; dump any old descriptors. */
       dirserv_remove_old_servers();
       /* dirservers try to reconnect too, in case connections have failed */
       router_retry_connections();
     }
+
     directory_get_from_dirserver(DIR_PURPOSE_FETCH_DIR, NULL, 0);
-    /* Force an upload of our descriptors every DirFetchPostPeriod seconds. */
+
+    /* Force an upload of our rend descriptors every DirFetchPostPeriod seconds. */
     rend_services_upload(1);
     last_uploaded_services = now;
     rend_cache_clean(); /* should this go elsewhere? */
+
     time_to_fetch_directory = now + options.DirFetchPostPeriod;
   }
 
-  /** 2. Every second, we examine pending circuits and prune the
+  /** 3a. Every second, we examine pending circuits and prune the
    *    ones which have been pending for more than a few seconds.
    *    We do this before step 3, so it can try building more if
    *    it's not comfortable with the number of available circuits.
    */
   circuit_expire_building(now);
 
-  /** 2b. Also look at pending streams and prune the ones that 'began'
+  /** 3b. Also look at pending streams and prune the ones that 'began'
    *     a long time ago but haven't gotten a 'connected' yet.
    *     Do this before step 3, so we can put them back into pending
    *     state to be picked up by the new circuit.
@@ -468,11 +500,11 @@
   connection_ap_expire_beginning();
 
 
-  /** 2c. And expire connections that we've held open for too long.
+  /** 3c. And expire connections that we've held open for too long.
    */
   connection_expire_held_open();
 
-  /** 3. Every second, we try a new circuit if there are no valid
+  /** 4. Every second, we try a new circuit if there are no valid
    *    circuits. Every NewCircuitPeriod seconds, we expire circuits
    *    that became dirty more than NewCircuitPeriod seconds ago,
    *    and we make a new circ if there are no clean circuits.
@@ -480,22 +512,22 @@
   if(has_fetched_directory)
     circuit_build_needed_circs(now);
 
-  /** 4. We do housekeeping for each connection... */
+  /** 5. We do housekeeping for each connection... */
   for(i=0;i<nfds;i++) {
     run_connection_housekeeping(i, now);
   }
 
-  /** 5. And remove any marked circuits... */
+  /** 6. And remove any marked circuits... */
   circuit_close_all_marked();
 
-  /** 6. And upload service descriptors for any services whose intro points
+  /** 7. And upload service descriptors for any services whose intro points
    *    have changed in the last second. */
   if (last_uploaded_services < now-5) {
     rend_services_upload(0);
     last_uploaded_services = now;
   }
 
-  /** 7. and blow away any connections that need to die. have to do this now,
+  /** 8. and blow away any connections that need to die. have to do this now,
    * because if we marked a conn for close and left its socket -1, then
    * we'll pass it to poll/select and bad things will happen.
    */
@@ -510,6 +542,7 @@
 static int prepare_for_poll(void) {
   static long current_second = 0; /* from previous calls to gettimeofday */
   connection_t *conn;
+  int bytes_read;
   struct timeval now;
   int i;
 
@@ -517,8 +550,12 @@
 
   /* Check how much bandwidth we've consumed, and increment the token
    * buckets. */
-  stats_n_bytes_read += stats_prev_global_read_bucket-global_read_bucket;
+  bytes_read = stats_prev_global_read_bucket - global_read_bucket;
+  stats_n_bytes_read += bytes_read;
   connection_bucket_refill(&now);
+  if (bytes_read > 0) {
+    rep_hist_note_bytes_read(bytes_read, now.tv_sec);
+  }
   stats_prev_global_read_bucket = global_read_bucket;
 
   if(now.tv_sec > current_second) { /* the second has rolled over. check more stuff. */

Index: or.h
===================================================================
RCS file: /home/or/cvsroot/src/or/or.h,v
retrieving revision 1.377
retrieving revision 1.378
diff -u -d -r1.377 -r1.378
--- or.h	12 Jul 2004 20:39:40 -0000	1.377
+++ or.h	13 Jul 2004 07:42:20 -0000	1.378
@@ -840,6 +840,7 @@
   int SocksPort; /**< Port to listen on for SOCKS connections. */
   int DirPort; /**< Port to listen on for directory connections. */
   int AuthoritativeDir; /**< Boolean: is this an authoritative directory? */
+  int ClientOnly; /**< Boolean: should we never evolve into a server role? */
   int MaxConn; /**< Maximum number of simultaneous connections. */
   int TrafficShaping; /**< Unused. */
   int LinkPadding; /**< Unused. */

Index: rephist.c
===================================================================
RCS file: /home/or/cvsroot/src/or/rephist.c,v
retrieving revision 1.10
retrieving revision 1.11
diff -u -d -r1.10 -r1.11
--- rephist.c	25 Jun 2004 00:29:31 -0000	1.10
+++ rephist.c	13 Jul 2004 07:42:20 -0000	1.11
@@ -287,6 +287,54 @@
 }
 #endif
 
+#define NUM_SECS_ROLLING_MEASURE 10
+#define NUM_SECS_BW_SUM_IS_VALID (12*60*60) /* half a day */
+
+/** We wrote <b>num_bytes</b> more bytes in second <b>when</b>.
+ *
+ * Add num_bytes to the current running total for <b>when</b>.
+ *
+ * <b>when</b> can go back in time, but it's safe to ignore calls
+ * earlier than the latest <b>when</b> you've heard of.
+ */
+void rep_hist_note_bytes_written(int num_bytes, time_t when) {
+/* Maybe a circular array for recent seconds, and step to a new point
+ * every time a new second shows up. Or simpler is just to have
+ * a normal array and push down each item every second; it's short.
+ */
+/* When a new second has rolled over, compute the sum of the bytes we've
+ * seen over when-1 to when-1-NUM_SECS_ROLLING_MEASURE, and stick it
+ * somewhere. See rep_hist_bandwidth_assess() below.
+ */
+
+}
+
+/** We read <b>num_bytes</b> more bytes in second <b>when</b>.
+ * (like rep_hist_note_bytes_written() above)
+ */
+void rep_hist_note_bytes_read(int num_bytes, time_t when) {
+/* if we're smart, we can make this func and the one above share code */
+}
+
+/**
+ * Find the largest sums in the past NUM_SECS_BW_SUM_IS_VALID (roughly)
+ * seconds. Find one sum for reading and one for writing. (They don't have
+ * to be at the same time.)
+ *
+ * Return the smaller of these sums, divided by NUM_SECS_ROLLING_MEASURE.
+ */
+int rep_hist_bandwidth_assess(time_t when) {
+/* To get a handle on space complexity, I promise I will call this
+ * function at most every options.DirFetchPostPeriod seconds. So in
+ * rep_hist_note_bytes_foo() above, you could keep a running max sum
+ * for the current period, and when the period ends you can tuck it away
+ * in a circular array of more manageable size. We lose a bit of precision,
+ * but this is all guesswork anyway.
+ */
+
+  return 0;
+}
+
 /*
   Local Variables:
   mode:c