
[or-cvs] r9995: Initial version of code to stop using socket pairs for linke (in tor/trunk: . doc src/or)



Author: nickm
Date: 2007-04-21 13:26:12 -0400 (Sat, 21 Apr 2007)
New Revision: 9995

Modified:
   tor/trunk/
   tor/trunk/ChangeLog
   tor/trunk/doc/TODO
   tor/trunk/src/or/buffers.c
   tor/trunk/src/or/connection.c
   tor/trunk/src/or/connection_edge.c
   tor/trunk/src/or/directory.c
   tor/trunk/src/or/main.c
   tor/trunk/src/or/or.h
   tor/trunk/src/or/test.c
Log:
 r12763@Kushana:  nickm | 2007-04-20 18:42:58 -0400
 Initial version of code to stop using socket pairs for linked connections.  Superficially, it seems to work, but it probably needs a lot more testing and attention.
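
 For readers who want the idea in isolation, here is a minimal sketch of an in-process link, assuming a simplified fixed-size byte queue in place of Tor's real buf_t. Every demo_* name is hypothetical and appears nowhere in this commit; only the general shape (each side keeps a pointer to its peer, and a "read" drains the peer's outbuf through a bounded scratch chunk into our inbuf) follows move_buf_to_buf() and the linked_conn field in the diff below.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a buffer; Tor's real buf_t is not shaped like
 * this. */
#define DEMO_BUF_CAP 8192

typedef struct demo_buf {
  char data[DEMO_BUF_CAP];
  size_t datalen; /* bytes currently stored, starting at data[0] */
} demo_buf_t;

/* Append up to len bytes to buf; return how many were actually stored. */
static size_t
demo_buf_write(demo_buf_t *buf, const char *src, size_t len)
{
  size_t room = DEMO_BUF_CAP - buf->datalen;
  if (len > room)
    len = room;
  memcpy(buf->data + buf->datalen, src, len);
  buf->datalen += len;
  return len;
}

/* Remove len bytes from the front of buf into dst. */
static void
demo_buf_fetch(demo_buf_t *buf, char *dst, size_t len)
{
  assert(len <= buf->datalen);
  memcpy(dst, buf->data, len);
  memmove(buf->data, buf->data + len, buf->datalen - len);
  buf->datalen -= len;
}

/* Hypothetical connection: no socket at all, just buffers and a peer. */
typedef struct demo_conn {
  struct demo_conn *linked_conn;
  demo_buf_t inbuf, outbuf;
  size_t outbuf_flushlen; /* bytes of outbuf that are ready to be flushed */
} demo_conn_t;

/* Point each connection at the other. */
static void
demo_link(demo_conn_t *a, demo_conn_t *b)
{
  a->linked_conn = b;
  b->linked_conn = a;
}

/* "Read" on a linked connection: move flushable bytes from the peer's
 * outbuf to our inbuf, one scratch chunk at a time.  Return bytes moved. */
static size_t
demo_read_from_link(demo_conn_t *conn)
{
  char chunk[512];
  demo_conn_t *peer = conn->linked_conn;
  size_t moved = 0;
  if (!peer)
    return 0; /* the other side has gone away */
  while (peer->outbuf_flushlen > 0) {
    size_t n = peer->outbuf_flushlen;
    size_t room = DEMO_BUF_CAP - conn->inbuf.datalen;
    if (n > sizeof(chunk))
      n = sizeof(chunk);
    if (n > room)
      n = room;
    if (n == 0)
      break; /* our inbuf is full; leave the rest for later */
    demo_buf_fetch(&peer->outbuf, chunk, n);
    demo_buf_write(&conn->inbuf, chunk, n);
    peer->outbuf_flushlen -= n;
    moved += n;
  }
  return moved;
}
```

 As in the real patch, the data is copied twice (peer outbuf to scratch, scratch to inbuf). The sketch leaves out everything about scheduling, EOF handling, and closing, which is where most of the actual change lives.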



Property changes on: tor/trunk
___________________________________________________________________
 svk:merge ticket from /tor/branches/socketpair [r12763] on c95137ef-5f19-0410-b913-86e773d04f59

Modified: tor/trunk/ChangeLog
===================================================================
--- tor/trunk/ChangeLog	2007-04-21 17:24:18 UTC (rev 9994)
+++ tor/trunk/ChangeLog	2007-04-21 17:26:12 UTC (rev 9995)
@@ -21,6 +21,11 @@
     - Count the number of open sockets separately from the number of active
       connection_t objects.  This will let us avoid underusing our
       allocated connection limit.
+    - We no longer use socket pairs to link an edge connection to an
+      anonymous directory connection.  Instead, we track the link
+      internally and transfer the data in-process.  This saves two
+      sockets per anonymous directory connection (at the client and at
+      the server), and avoids the nasty Windows socketpair() workaround.
 
   o Minor features (build):
     - Make autoconf search for libevent, openssl, and zlib consistently.

Modified: tor/trunk/doc/TODO
===================================================================
--- tor/trunk/doc/TODO	2007-04-21 17:24:18 UTC (rev 9994)
+++ tor/trunk/doc/TODO	2007-04-21 17:26:12 UTC (rev 9995)
@@ -54,6 +54,19 @@
 Things we'd like to do in 0.2.0.x:
   - Proposals:
     - 101: Voting on the Tor Directory System
+      - Prepare ASAP for new voting formats
+        - Don't flip out with warnings when voting-related URLs are
+          uploaded/downloaded.
+      - Finalize proposal
+      - Get authorities voting
+        - Implement parsing for new document formats
+        - Code to generate votes
+        - Code to generate consensus from a list of votes
+        - Add a signature to a consensus.
+        - Code to check signatures on a consensus
+        - Push/pull documents as appropriate.
+      - Start caching consensus documents once authorities make them
+      - Start downloading and using consensus documents once caches serve them
     . 104: Long and Short Router Descriptors (by Jun 1)
       . Finalize proposal
       o Implement parsing for extra-info documents
@@ -109,14 +122,24 @@
           to make sure that we call the event base dispatch function enough.)
       . Implement
         o Count connections and sockets separately
-        - Allow connections with s == -1
-        - Add a linked_conn field; it should get marked when we're marked.
-        - Add a function to move bytes from buffer to buffer.
-        - Have handle_read dtrt for linked connections
-        - Have an activate/deactivate_linked_connection function.
-        - Have activated functions added to a list on first activation, and
+        . Allow connections with s == -1
+        o Add a linked_conn field; it should get marked when we're marked.
+        o Add a function to move bytes from buffer to buffer.
+        o Have read_to_buf dtrt for linked connections
+        o Have handle_read dtrt for linked connections
+        o Have an activate/deactivate_linked_connection function.
+        o Have activated connections added to a list on first activation, and
           that list made active before calls to event_loop.
-    - Generate torrc.{complete|sample}.in, tor.1.in, the HTML manual, and the
+        o Have connections get deactivated when no more data to write on
+          linked conn outbuf.
+        o Handle closing connections properly.
+        o Actually create and use linked connections.
+        - Handle rate-limiting on directory writes to linked directory
+          connections in a more sensible manner.
+        - Rename want_to_read and want_to_write; they're actually about
+          being blocked, not about wanting to read/write.
+        - Find more ways to test this.
+    D Generate torrc.{complete|sample}.in, tor.1.in, the HTML manual, and the
       online config documentation from a single source.
     - Have clients do TLS connection rotation less often than "every 10
       minutes" in the thrashy case, and more often than "once a week" in the
@@ -172,6 +195,8 @@
     - Blocking-resistance.
     - It would be potentially helpful to https requests on the OR port by
       acting like an HTTPS server.
+    - Audit how much RAM we're using for buffers and cell pools; try to
+      trim down a lot.
   o Deprecations:
     o Remove v0 control protocol.
 P  - Packaging:
@@ -280,7 +305,7 @@
 
 Minor items for 0.1.2.x as time permits:
   - include bandwidth breakdown by conn->type in BW events.
-  - Unify autoconf search code for libevent and openssl.  Make code
+  o Unify autoconf search code for libevent and openssl.  Make code
     suggest platform-appropriate "devel" / "dev" / whatever packages
     if we can link but we can't find the headers.
   - Recommend polipo? Please?
@@ -318,7 +343,7 @@
     the solution is to have a separate 'extend-data' cell type
     which is used for the first N data cells, and only
     extend-data cells can be extend requests.
-    - Specify, including thought about anonymity implications.
+    . Specify, including thought about anonymity implications. [proposal 110]
   - Display the reasons in 'destroy' and 'truncated' cells under some
     circumstances?
   - If the server is spewing complaints about raising your ulimit -n,
@@ -373,7 +398,7 @@
     I can say "banana" as my bandwidthcapacity, and it won't even squeak.
   o Include the output of svn info in the binary, so it's trivial to see what
     version a binary was built from.
-    - Do the same for svk info.
+    o Do the same for svk info.
   - Add a doxygen style checker to make check-spaces so nick doesn't drift
     too far from arma's undocumented styleguide.  Also, document that
     styleguide in HACKING.  (See r9634 for example.)
@@ -388,7 +413,10 @@
   - Should TrackHostExits expire TrackHostExitsExpire seconds after their
     *last* use, not their *first* use?
   X Configuration format really wants sections.
-  - Good RBL substitute.
+  . Good RBL substitute.
+    - Play with the implementations; link them from somewhere; add a
+      round-robin link from torel.torproject.org; describe how to
+      use them in the FAQ.
   - Authorities should try using exits for http to connect to some URLS
     (specified in a configuration file, so as not to make the List Of Things
     Not To Censor completely obvious) and ask them for results.  Exits that

Modified: tor/trunk/src/or/buffers.c
===================================================================
--- tor/trunk/src/or/buffers.c	2007-04-21 17:24:18 UTC (rev 9994)
+++ tor/trunk/src/or/buffers.c	2007-04-21 17:26:12 UTC (rev 9995)
@@ -817,6 +817,32 @@
   return buf->datalen;
 }
 
+/** Move up to <b>buf_flushlen</b> bytes from <b>buf_in</b> to <b>buf_out</b>.
+ * Return the number of bytes actually copied.
+ */
+int
+move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen)
+{
+  char b[4096];
+  size_t cp, len;
+  len = *buf_flushlen;
+  if (len > buf_in->datalen)
+    len = buf_in->datalen;
+
+  cp = len; /* Remember the number of bytes we intend to copy. */
+  while (len) {
+    /* This isn't the most efficient implementation one could imagine, since
+     * it does two copies instead of 1, but I kinda doubt that this will be
+     * critical path. */
+    size_t n = len > sizeof(b) ? sizeof(b) : len;
+    fetch_from_buf(b, n, buf_in);
+    write_to_buf(b, n, buf_out);
+    len -= n;
+  }
+  *buf_flushlen -= cp;
+  return cp;
+}
+
 /** There is a (possibly incomplete) http statement on <b>buf</b>, of the
  * form "\%s\\r\\n\\r\\n\%s", headers, body. (body may contain nuls.)
  * If a) the headers include a Content-Length field and all bytes in

Modified: tor/trunk/src/or/connection.c
===================================================================
--- tor/trunk/src/or/connection.c	2007-04-21 17:24:18 UTC (rev 9994)
+++ tor/trunk/src/or/connection.c	2007-04-21 17:26:12 UTC (rev 9995)
@@ -113,6 +113,7 @@
         case DIR_CONN_STATE_CONNECTING: return "connecting";
         case DIR_CONN_STATE_CLIENT_SENDING: return "client sending";
         case DIR_CONN_STATE_CLIENT_READING: return "client reading";
+        case DIR_CONN_STATE_CLIENT_FINISHED: return "client finished";
         case DIR_CONN_STATE_SERVER_COMMAND_WAIT: return "waiting for command";
         case DIR_CONN_STATE_SERVER_WRITING: return "writing";
       }
@@ -212,9 +213,22 @@
   return conn;
 }
 
+/** Create a link between <b>conn_a</b> and <b>conn_b</b> */
+void
+connection_link_connections(connection_t *conn_a, connection_t *conn_b)
+{
+  tor_assert(conn_a->s < 0);
+  tor_assert(conn_b->s < 0);
+
+  conn_a->linked = 1;
+  conn_b->linked = 1;
+  conn_a->linked_conn = conn_b;
+  conn_b->linked_conn = conn_a;
+}
+
 /** Tell libevent that we don't care about <b>conn</b> any more. */
 void
-connection_unregister(connection_t *conn)
+connection_unregister_events(connection_t *conn)
 {
   if (conn->read_event) {
     if (event_del(conn->read_event))
@@ -260,6 +274,17 @@
       break;
   }
 
+  if (conn->linked) {
+    int severity = buf_datalen(conn->inbuf)+buf_datalen(conn->outbuf)
+      ? LOG_NOTICE : LOG_INFO;
+    log_fn(severity, LD_GENERAL, "Freeing linked %s connection [%s] with %d "
+           "bytes on inbuf, %d on outbuf.",
+           conn_type_to_string(conn->type),
+           conn_state_to_string(conn->type, conn->state),
+           (int)buf_datalen(conn->inbuf), (int)buf_datalen(conn->outbuf));
+    // tor_assert(!buf_datalen(conn->outbuf)); /*XXXX020 remove me.*/
+  }
+
   if (!connection_is_listener(conn)) {
     buf_free(conn->inbuf);
     buf_free(conn->outbuf);
@@ -325,6 +350,15 @@
   tor_assert(conn);
   tor_assert(!connection_is_on_closeable_list(conn));
   tor_assert(!connection_in_array(conn));
+  if (conn->linked_conn) {
+    log_err(LD_BUG, "Called with conn->linked_conn still set.");
+    tor_fragile_assert();
+    conn->linked_conn->linked_conn = NULL;
+    if (! conn->linked_conn->marked_for_close &&
+        conn->linked_conn->reading_from_linked_conn)
+      connection_start_reading(conn->linked_conn);
+    conn->linked_conn = NULL;
+  }
   if (connection_speaks_cells(conn)) {
     if (conn->state == OR_CONN_STATE_OPEN)
       directory_set_dirty();
@@ -336,7 +370,7 @@
     TO_CONTROL_CONN(conn)->event_mask = 0;
     control_update_global_event_mask();
   }
-  connection_unregister(conn);
+  connection_unregister_events(conn);
   _connection_free(conn);
 }
 
@@ -486,7 +520,7 @@
 connection_close_immediate(connection_t *conn)
 {
   assert_connection_ok(conn,0);
-  if (conn->s < 0) {
+  if (conn->s < 0 && !conn->linked) {
     log_err(LD_BUG,"Attempt to close already-closed connection.");
     tor_fragile_assert();
     return;
@@ -498,9 +532,10 @@
              (int)conn->outbuf_flushlen);
   }
 
-  connection_unregister(conn);
+  connection_unregister_events(conn);
 
-  tor_close_socket(conn->s);
+  if (conn->s >= 0)
+    tor_close_socket(conn->s);
   conn->s = -1;
   if (!connection_is_listener(conn)) {
     buf_clear(conn->outbuf);
@@ -529,6 +564,12 @@
   conn->marked_for_close_file = file;
   add_connection_to_closeable_list(conn);
 
+#if 0
+  /* XXXX020 Actually, I don't think this is right. */
+  if (conn->linked_conn && !conn->linked_conn->marked_for_close)
+    _connection_mark_for_close(conn->linked_conn, line, file);
+#endif
+
   /* in case we're going to be held-open-til-flushed, reset
    * the number of seconds since last successful write, so
    * we get our whole 15 seconds */
@@ -1101,11 +1142,14 @@
 
 /** Return 1 if we should apply rate limiting to <b>conn</b>,
  * and 0 otherwise. Right now this just checks if it's an internal
- * IP address. */
+ * IP address or an internal connection. */
 static int
 connection_is_rate_limited(connection_t *conn)
 {
-  return !is_internal_IP(conn->addr, 0);
+  if (conn->linked || is_internal_IP(conn->addr, 0))
+    return 0;
+  else
+    return 1;
 }
 
 extern int global_read_bucket, global_write_bucket;
@@ -1483,6 +1527,7 @@
 connection_handle_read(connection_t *conn)
 {
   int max_to_read=-1, try_to_read;
+  size_t before, n_read = 0;
 
   if (conn->marked_for_close)
     return 0; /* do nothing */
@@ -1505,6 +1550,8 @@
 loop_again:
   try_to_read = max_to_read;
   tor_assert(!conn->marked_for_close);
+
+  before = buf_datalen(conn->inbuf);
   if (connection_read_to_buf(conn, &max_to_read) < 0) {
     /* There's a read error; kill the connection.*/
     connection_close_immediate(conn); /* Don't flush; connection is dead. */
@@ -1517,6 +1564,7 @@
     connection_mark_for_close(conn);
     return -1;
   }
+  n_read += buf_datalen(conn->inbuf) - before;
   if (CONN_IS_EDGE(conn) && try_to_read != max_to_read) {
     /* instruct it not to try to package partial cells. */
     if (connection_process_inbuf(conn, 0) < 0) {
@@ -1533,6 +1581,27 @@
       connection_process_inbuf(conn, 1) < 0) {
     return -1;
   }
+  if (conn->linked_conn) {
+    /* The other side's handle_write will never actually get called, so
+     * we need to invoke the appropriate callbacks ourself. */
+    connection_t *linked = conn->linked_conn;
+    /* XXXX020 Do we need to ensure that this stuff is called even if
+     * conn dies in a way that causes us to return -1 earlier? */
+
+    if (n_read) {
+      /* Probably a no-op, but hey. */
+      connection_buckets_decrement(linked, time(NULL), 0, n_read);
+
+      if (connection_flushed_some(linked) < 0)
+        connection_mark_for_close(linked);
+      if (!connection_wants_to_flush(linked))
+        connection_finished_flushing(linked);
+    }
+
+    if (!buf_datalen(linked->outbuf) && conn->active_on_link)
+      connection_stop_reading_from_linked_conn(conn);
+  }
+  /* If we hit the EOF, call connection_reached_eof. */
   if (!conn->marked_for_close &&
       conn->inbuf_reached_eof &&
       connection_reached_eof(conn) < 0) {
@@ -1541,9 +1610,9 @@
   return 0;
 }
 
-/** Pull in new bytes from conn-\>s onto conn-\>inbuf, either
- * directly or via TLS. Reduce the token buckets by the number of
- * bytes read.
+/** Pull in new bytes from conn-\>s or conn-\>linked_conn onto conn-\>inbuf,
+ * either directly or via TLS. Reduce the token buckets by the number of bytes
+ * read.
  *
  * If *max_to_read is -1, then decide it ourselves, else go with the
  * value passed to us. When returning, if it's changed, subtract the
@@ -1633,7 +1702,24 @@
     tor_tls_get_n_raw_bytes(or_conn->tls, &n_read, &n_written);
     log_debug(LD_GENERAL, "After TLS read of %d: %ld read, %ld written",
               result, (long)n_read, (long)n_written);
+  } else if (conn->linked) {
+    if (conn->linked_conn) {
+      result = move_buf_to_buf(conn->inbuf, conn->linked_conn->outbuf,
+                               &conn->linked_conn->outbuf_flushlen);
+    } else {
+      result = 0;
+    }
+    //log_notice(LD_GENERAL, "Moved %d bytes on an internal link!", result);
+    /* If the other side has disappeared, or if it's been marked for close and
+     * we flushed its outbuf, then we should set our inbuf_reached_eof. */
+    if (!conn->linked_conn ||
+        (conn->linked_conn->marked_for_close &&
+         buf_datalen(conn->linked_conn->outbuf) == 0))
+      conn->inbuf_reached_eof = 1;
+
+    n_read = (size_t) result;
   } else {
+    /* !connection_speaks_cells, !conn->linked. */
     int reached_eof = 0;
     CONN_LOG_PROTECT(conn,
         result = read_to_buf(conn->s, at_most, conn->inbuf, &reached_eof));
@@ -1687,7 +1773,7 @@
 int
 connection_wants_to_flush(connection_t *conn)
 {
-  return conn->outbuf_flushlen;
+  return conn->outbuf_flushlen > 0;
 }
 
 /** Are there too many bytes on edge connection <b>conn</b>'s outbuf to
@@ -2203,6 +2289,19 @@
   return 0;
 }
 
+/** Return 1 iff linked <b>conn</b> wants to read and has data or EOF ready. */
+int
+connection_should_read_from_linked_conn(connection_t *conn)
+{
+  if (conn->linked && conn->reading_from_linked_conn) {
+    if (! conn->linked_conn ||
+        (conn->linked_conn->writing_to_linked_conn &&
+         buf_datalen(conn->linked_conn->outbuf)))
+      return 1;
+  }
+  return 0;
+}
+
 /** Allocates a base64'ed authenticator for use in http or https
  * auth, based on the input string <b>authenticator</b>. Returns it
  * if success, else returns NULL. */
@@ -2433,6 +2532,13 @@
       break;
   }
 
+  if (conn->linked_conn) {
+    tor_assert(conn->linked_conn->linked_conn == conn);
+    tor_assert(conn->linked != 0);
+  }
+  if (conn->linked)
+    tor_assert(conn->s < 0);
+
   if (conn->outbuf_flushlen > 0) {
     tor_assert(connection_is_writing(conn) || conn->wants_to_write ||
                conn->edge_blocked_on_circ);

Modified: tor/trunk/src/or/connection_edge.c
===================================================================
--- tor/trunk/src/or/connection_edge.c	2007-04-21 17:24:18 UTC (rev 9994)
+++ tor/trunk/src/or/connection_edge.c	2007-04-21 17:26:12 UTC (rev 9995)
@@ -1860,32 +1860,21 @@
  * and call connection_ap_handshake_attach_circuit(conn) on it.
  *
  * Return the other end of the socketpair, or -1 if error.
+ *
+ * DOCDOC The above is now wrong; we use links.
+ * DOCDOC start_reading
  */
-int
+edge_connection_t *
 connection_ap_make_bridge(char *address, uint16_t port,
                           const char *digest, int command)
 {
-  int fd[2];
   edge_connection_t *conn;
-  int err;
 
-  log_info(LD_APP,"Making AP bridge to %s:%d ...",safe_str(address),port);
+  log_notice(LD_APP,"Making internal anonymized tunnel to %s:%d ...",
+             safe_str(address),port); /* XXXX020 Downgrade back to info. */
 
-  if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) < 0) {
-    log_warn(LD_NET,
-             "Couldn't construct socketpair (%s). Network down? Delaying.",
-             tor_socket_strerror(-err));
-    return -1;
-  }
-
-  tor_assert(fd[0] >= 0);
-  tor_assert(fd[1] >= 0);
-
-  set_socket_nonblocking(fd[0]);
-  set_socket_nonblocking(fd[1]);
-
   conn = TO_EDGE_CONN(connection_new(CONN_TYPE_AP));
-  conn->_base.s = fd[0];
+  conn->_base.linked = 1; /* so that we can add it safely below. */
 
   /* populate conn->socks_request */
 
@@ -1903,28 +1892,25 @@
                   digest, DIGEST_LEN);
   }
 
-  conn->_base.address = tor_strdup("(local bridge)");
+  conn->_base.address = tor_strdup("(local bridge)"); /*XXXX020 no "bridge"*/
   conn->_base.addr = 0;
   conn->_base.port = 0;
 
   if (connection_add(TO_CONN(conn)) < 0) { /* no space, forget it */
-    connection_free(TO_CONN(conn)); /* this closes fd[0] */
-    tor_close_socket(fd[1]);
-    return -1;
+    connection_free(TO_CONN(conn));
+    return NULL;
   }
 
   conn->_base.state = AP_CONN_STATE_CIRCUIT_WAIT;
-  connection_start_reading(TO_CONN(conn));
 
   /* attaching to a dirty circuit is fine */
   if (connection_ap_handshake_attach_circuit(conn) < 0) {
     connection_mark_unattached_ap(conn, END_STREAM_REASON_CANT_ATTACH);
-    tor_close_socket(fd[1]);
-    return -1;
+    return NULL;
   }
 
   log_info(LD_APP,"... AP bridge created and connected.");
-  return fd[1];
+  return conn;
 }
 
 /** Send an answer to an AP connection that has requested a DNS lookup
@@ -2406,37 +2392,19 @@
  * back an end cell for).  Return -(some circuit end reason) if the circuit
  * needs to be torn down.  Either connects exit_conn, frees it, or marks it,
  * as appropriate.
+ *
+ * DOCDOC no longer uses socketpair
  */
 static int
 connection_exit_connect_dir(edge_connection_t *exit_conn)
 {
-  int fd[2];
-  int err;
   dir_connection_t *dir_conn = NULL;
 
-  log_info(LD_EXIT, "Opening dir bridge");
+  log_info(LD_EXIT, "Opening local connection for anonymized directory exit");
 
-  if ((err = tor_socketpair(AF_UNIX, SOCK_STREAM, 0, fd)) < 0) {
-    log_warn(LD_NET,
-             "Couldn't construct socketpair (%s). "
-             "Network down? Out of sockets?",
-             tor_socket_strerror(-err));
-    connection_edge_end(exit_conn, END_STREAM_REASON_RESOURCELIMIT);
-    connection_free(TO_CONN(exit_conn));
-    return 0;
-  }
-
-  tor_assert(fd[0] >= 0);
-  tor_assert(fd[1] >= 0);
-
-  set_socket_nonblocking(fd[0]);
-  set_socket_nonblocking(fd[1]);
-
-  exit_conn->_base.s = fd[0];
   exit_conn->_base.state = EXIT_CONN_STATE_OPEN;
 
   dir_conn = TO_DIR_CONN(connection_new(CONN_TYPE_DIR));
-  dir_conn->_base.s = fd[1];
 
   dir_conn->_base.addr = 0x7f000001;
   dir_conn->_base.port = 0;
@@ -2445,6 +2413,8 @@
   dir_conn->_base.purpose = DIR_PURPOSE_SERVER;
   dir_conn->_base.state = DIR_CONN_STATE_SERVER_COMMAND_WAIT;
 
+  connection_link_connections(TO_CONN(dir_conn), TO_CONN(exit_conn));
+
   if (connection_add(TO_CONN(exit_conn))<0) {
     connection_edge_end(exit_conn, END_STREAM_REASON_RESOURCELIMIT);
     connection_free(TO_CONN(exit_conn));

Modified: tor/trunk/src/or/directory.c
===================================================================
--- tor/trunk/src/or/directory.c	2007-04-21 17:24:18 UTC (rev 9994)
+++ tor/trunk/src/or/directory.c	2007-04-21 17:26:12 UTC (rev 9995)
@@ -451,22 +451,25 @@
            error indicates broken link in windowsland. */
     }
   } else { /* we want to connect via tor */
+    edge_connection_t *linked_conn;
     /* make an AP connection
      * populate it and add it at the right state
      * socketpair and hook up both sides
      */
     conn->dirconn_direct = 0;
-    conn->_base.s =
+    linked_conn =
       connection_ap_make_bridge(conn->_base.address, conn->_base.port,
                                 digest,
                                 private_connection ?
                                   SOCKS_COMMAND_CONNECT :
                                   SOCKS_COMMAND_CONNECT_DIR);
-    if (conn->_base.s < 0) {
+    if (!linked_conn) {
       log_warn(LD_NET,"Making AP bridge to dirserver failed.");
       connection_mark_for_close(TO_CONN(conn));
+      /* linked_conn is NULL here, so only conn needs to be marked. */
       return;
     }
+    connection_link_connections(TO_CONN(conn), TO_CONN(linked_conn));
 
     if (connection_add(TO_CONN(conn)) < 0) {
       log_warn(LD_NET,"Unable to add AP bridge to dirserver.");
@@ -478,6 +481,7 @@
     directory_send_command(conn, purpose, 0, resource,
                            payload, payload_len);
     connection_watch_events(TO_CONN(conn), EV_READ | EV_WRITE);
+    connection_start_reading(TO_CONN(linked_conn));
   }
 }
 
@@ -1297,7 +1301,8 @@
 {
   int retval;
   if (conn->_base.state != DIR_CONN_STATE_CLIENT_READING) {
-    log_info(LD_HTTP,"conn reached eof, not reading. Closing.");
+    log_info(LD_HTTP,"conn reached eof, not reading. [state=%d] Closing.",
+             conn->_base.state);
     connection_close_immediate(TO_CONN(conn)); /* error: give up on flushing */
     connection_mark_for_close(TO_CONN(conn));
     return -1;

Modified: tor/trunk/src/or/main.c
===================================================================
--- tor/trunk/src/or/main.c	2007-04-21 17:24:18 UTC (rev 9994)
+++ tor/trunk/src/or/main.c	2007-04-21 17:26:12 UTC (rev 9995)
@@ -74,6 +74,10 @@
 /** List of connections that have been marked for close and need to be freed
  * and removed from connection_array. */
 static smartlist_t *closeable_connection_lst = NULL;
+/** Linked connections currently set to read from their partner's outbuf. */
+static smartlist_t *active_linked_connection_lst = NULL;
+/** True iff we entered the current libevent loop in LOOP_ONCE mode. */
+static int called_loop_once = 0;
 
 static int n_conns=0; /**< Number of connections currently active. */
 
@@ -155,7 +159,7 @@
 connection_add(connection_t *conn)
 {
   tor_assert(conn);
-  tor_assert(conn->s >= 0);
+  tor_assert(conn->s >= 0 || conn->linked);
 
   tor_assert(conn->conn_array_index == -1); /* can only connection_add once */
   if (n_conns == MAXCONNECTIONS) {
@@ -198,13 +202,12 @@
 
   tor_assert(conn->conn_array_index >= 0);
   current_index = conn->conn_array_index;
+  connection_unregister_events(conn); /* This is redundant, but cheap. */
   if (current_index == n_conns-1) { /* this is the end */
     n_conns--;
     return 0;
   }
 
-  connection_unregister(conn);
-
   /* replace this one with the one at the end */
   n_conns--;
   connection_array[current_index] = connection_array[n_conns];
@@ -213,23 +216,31 @@
   return 0;
 }
 
-/** If it's an edge conn, remove it from the list
+/** If <b>conn</b> is an edge conn, remove it from the list
  * of conn's on this circuit. If it's not on an edge,
  * flush and send destroys for all circuits on this conn.
  *
- * If <b>remove</b> is non-zero, then remove it from the
- * connection_array and closeable_connection_lst.
+ * Remove it from connection_array (if applicable) and
+ * from closeable_connection_list.
  *
  * Then free it.
  */
 static void
-connection_unlink(connection_t *conn, int remove)
+connection_unlink(connection_t *conn)
 {
   connection_about_to_close_connection(conn);
-  if (remove) {
+  if (conn->conn_array_index >= 0) {
     connection_remove(conn);
   }
+  if (conn->linked_conn) {
+    conn->linked_conn->linked_conn = NULL;
+    if (! conn->linked_conn->marked_for_close &&
+        conn->linked_conn->reading_from_linked_conn)
+      connection_start_reading(conn->linked_conn);
+    conn->linked_conn = NULL;
+  }
   smartlist_remove(closeable_connection_lst, conn);
+  smartlist_remove(active_linked_connection_lst, conn);
   if (conn->type == CONN_TYPE_EXIT) {
     assert_connection_edge_not_dns_pending(TO_EDGE_CONN(conn));
   }
@@ -286,16 +297,23 @@
 void
 connection_watch_events(connection_t *conn, short events)
 {
-  int r;
+  int r = 0;
 
   tor_assert(conn);
   tor_assert(conn->read_event);
   tor_assert(conn->write_event);
 
-  if (events & EV_READ) {
-    r = event_add(conn->read_event, NULL);
+  if (conn->linked) {
+    if (events & EV_READ)
+      connection_start_reading(conn);
+    else
+      connection_stop_reading(conn);
   } else {
-    r = event_del(conn->read_event);
+    if (events & EV_READ) {
+      r = event_add(conn->read_event, NULL);
+    } else {
+      r = event_del(conn->read_event);
+    }
   }
 
   if (r<0)
@@ -305,10 +323,17 @@
              conn->s, (events & EV_READ)?"":"un",
              tor_socket_strerror(tor_socket_errno(conn->s)));
 
-  if (events & EV_WRITE) {
-    r = event_add(conn->write_event, NULL);
+  if (conn->linked) {
+    if (events & EV_WRITE)
+      connection_start_writing(conn);
+    else
+      connection_stop_writing(conn);
   } else {
-    r = event_del(conn->write_event);
+    if (events & EV_WRITE) {
+      r = event_add(conn->write_event, NULL);
+    } else {
+      r = event_del(conn->write_event);
+    }
   }
 
   if (r<0)
@@ -325,7 +350,8 @@
 {
   tor_assert(conn);
 
-  return conn->read_event && event_pending(conn->read_event, EV_READ, NULL);
+  return conn->reading_from_linked_conn ||
+    (conn->read_event && event_pending(conn->read_event, EV_READ, NULL));
 }
 
 /** Tell the main loop to stop notifying <b>conn</b> of any read events. */
@@ -335,12 +361,16 @@
   tor_assert(conn);
   tor_assert(conn->read_event);
 
-  log_debug(LD_NET,"entering.");
-  if (event_del(conn->read_event))
-    log_warn(LD_NET, "Error from libevent setting read event state for %d "
-             "to unwatched: %s",
-             conn->s,
-             tor_socket_strerror(tor_socket_errno(conn->s)));
+  if (conn->linked) {
+    conn->reading_from_linked_conn = 0;
+    connection_stop_reading_from_linked_conn(conn);
+  } else {
+    if (event_del(conn->read_event))
+      log_warn(LD_NET, "Error from libevent setting read event state for %d "
+               "to unwatched: %s",
+               conn->s,
+               tor_socket_strerror(tor_socket_errno(conn->s)));
+  }
 }
 
 /** Tell the main loop to start notifying <b>conn</b> of any read events. */
@@ -350,11 +380,17 @@
   tor_assert(conn);
   tor_assert(conn->read_event);
 
-  if (event_add(conn->read_event, NULL))
-    log_warn(LD_NET, "Error from libevent setting read event state for %d "
-             "to watched: %s",
-             conn->s,
-             tor_socket_strerror(tor_socket_errno(conn->s)));
+  if (conn->linked) {
+    conn->reading_from_linked_conn = 1;
+    if (connection_should_read_from_linked_conn(conn))
+      connection_start_reading_from_linked_conn(conn);
+  } else {
+    if (event_add(conn->read_event, NULL))
+      log_warn(LD_NET, "Error from libevent setting read event state for %d "
+               "to watched: %s",
+               conn->s,
+               tor_socket_strerror(tor_socket_errno(conn->s)));
+  }
 }
 
 /** Return true iff <b>conn</b> is listening for write events. */
@@ -363,7 +399,8 @@
 {
   tor_assert(conn);
 
-  return conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL);
+  return conn->writing_to_linked_conn ||
+    (conn->write_event && event_pending(conn->write_event, EV_WRITE, NULL));
 }
 
 /** Tell the main loop to stop notifying <b>conn</b> of any write events. */
@@ -373,11 +410,17 @@
   tor_assert(conn);
   tor_assert(conn->write_event);
 
-  if (event_del(conn->write_event))
-    log_warn(LD_NET, "Error from libevent setting write event state for %d "
-             "to unwatched: %s",
-             conn->s,
-             tor_socket_strerror(tor_socket_errno(conn->s)));
+  if (conn->linked) {
+    conn->writing_to_linked_conn = 0;
+    if (conn->linked_conn)
+      connection_stop_reading_from_linked_conn(conn->linked_conn);
+  } else {
+    if (event_del(conn->write_event))
+      log_warn(LD_NET, "Error from libevent setting write event state for %d "
+               "to unwatched: %s",
+               conn->s,
+               tor_socket_strerror(tor_socket_errno(conn->s)));
+  }
 }
 
 /** Tell the main loop to start notifying <b>conn</b> of any write events. */
@@ -387,13 +430,60 @@
   tor_assert(conn);
   tor_assert(conn->write_event);
 
-  if (event_add(conn->write_event, NULL))
-    log_warn(LD_NET, "Error from libevent setting write event state for %d "
-             "to watched: %s",
-             conn->s,
-             tor_socket_strerror(tor_socket_errno(conn->s)));
+  if (conn->linked) {
+    conn->writing_to_linked_conn = 1;
+    if (conn->linked_conn &&
+        connection_should_read_from_linked_conn(conn->linked_conn))
+      connection_start_reading_from_linked_conn(conn->linked_conn);
+  } else {
+    if (event_add(conn->write_event, NULL))
+      log_warn(LD_NET, "Error from libevent setting write event state for %d "
+               "to watched: %s",
+               conn->s,
+               tor_socket_strerror(tor_socket_errno(conn->s)));
+  }
 }
 
+/** Put linked <b>conn</b> on the active list so the main loop reads it. */
+void
+connection_start_reading_from_linked_conn(connection_t *conn)
+{
+  tor_assert(conn);
+  tor_assert(conn->linked == 1);
+
+  if (!conn->active_on_link) {
+    conn->active_on_link = 1;
+    smartlist_add(active_linked_connection_lst, conn);
+    if (!called_loop_once) {
+      /* This is the first event on the list; we won't be in LOOP_ONCE mode,
+       * so we need to make sure that the event_loop() actually exits at the
+       * end of its run through the current connections and
+       * lets us activate read events for linked connections.  */
+      struct timeval tv = { 0, 0 };
+      event_loopexit(&tv);
+    }
+  } else {
+    tor_assert(smartlist_isin(active_linked_connection_lst, conn));
+  }
+}
+
+/** Take linked <b>conn</b> off the active list, if it is on it. */
+void
+connection_stop_reading_from_linked_conn(connection_t *conn)
+{
+  tor_assert(conn);
+  tor_assert(conn->linked == 1);
+
+  if (conn->active_on_link) {
+    conn->active_on_link = 0;
+    /* XXXX020 maybe we should keep an index here so we can smartlist_del
+     * cleanly. */
+    smartlist_remove(active_linked_connection_lst, conn);
+  } else {
+    tor_assert(!smartlist_isin(active_linked_connection_lst, conn));
+  }
+}
+
 /** Close all connections that have been scheduled to get closed */
 static void
 close_closeable_connections(void)
@@ -402,7 +492,7 @@
   for (i = 0; i < smartlist_len(closeable_connection_lst); ) {
     connection_t *conn = smartlist_get(closeable_connection_lst, i);
     if (conn->conn_array_index < 0) {
-      connection_unlink(conn, 0); /* blow it away right now */
+      connection_unlink(conn); /* blow it away right now */
     } else {
       if (!conn_close_if_marked(conn->conn_array_index))
         ++i;
@@ -500,7 +590,7 @@
   assert_all_pending_dns_resolves_ok();
 
   log_debug(LD_NET,"Cleaning up connection (fd %d).",conn->s);
-  if (conn->s >= 0 && connection_wants_to_flush(conn)) {
+  if ((conn->s >= 0 || conn->linked_conn) && connection_wants_to_flush(conn)) {
     /* s == -1 means it's an incomplete edge connection, or that the socket
      * has already been closed as unflushable. */
     int sz = connection_bucket_write_limit(conn);
@@ -512,7 +602,21 @@
                conn->s, conn_type_to_string(conn->type), conn->state,
                (int)conn->outbuf_flushlen,
                 conn->marked_for_close_file, conn->marked_for_close);
-    if (connection_speaks_cells(conn)) {
+    if (conn->linked_conn) {
+      retval = move_buf_to_buf(conn->linked_conn->inbuf, conn->outbuf,
+                               &conn->outbuf_flushlen);
+      if (retval >= 0) {
+        /* The linked conn will notice that it has data when it notices that
+         * we're gone. */
+        connection_start_reading_from_linked_conn(conn->linked_conn);
+      }
+      /* XXXX020 Downgrade to debug. */
+      log_info(LD_GENERAL, "Flushed last %d bytes from a linked conn; "
+               "%d left; flushlen %d; wants-to-flush==%d", retval,
+               (int)buf_datalen(conn->outbuf),
+               (int)conn->outbuf_flushlen,
+               connection_wants_to_flush(conn));
+    } else if (connection_speaks_cells(conn)) {
       if (conn->state == OR_CONN_STATE_OPEN) {
         retval = flush_buf_tls(TO_OR_CONN(conn)->tls, conn->outbuf, sz,
                                &conn->outbuf_flushlen);
@@ -553,7 +657,7 @@
              conn->marked_for_close);
     }
   }
-  connection_unlink(conn, 1); /* unlink, remove, free */
+  connection_unlink(conn); /* unlink, remove, free */
   return 1;
 }
 
@@ -1270,9 +1374,15 @@
     /* Make it easier to tell whether libevent failure is our fault or not. */
     errno = 0;
 #endif
-    /* poll until we have an event, or the second ends */
-    loop_result = event_dispatch();
+    /* All active linked conns should get their read events activated. */
+    SMARTLIST_FOREACH(active_linked_connection_lst, connection_t *, conn,
+                      event_active(conn->read_event, EV_READ, 1));
+    called_loop_once = smartlist_len(active_linked_connection_lst) ? 1 : 0;
 
+    /* poll until we have an event, or the second ends, or until we have
+     * some active linked connections to trigger events for. */
+    loop_result = event_loop(called_loop_once ? EVLOOP_ONCE : 0);
+
     /* let catch() handle things like ^c, and otherwise don't worry about it */
     if (loop_result < 0) {
       int e = tor_socket_errno(-1);
@@ -1601,6 +1711,8 @@
   time_of_process_start = time(NULL);
   if (!closeable_connection_lst)
     closeable_connection_lst = smartlist_create();
+  if (!active_linked_connection_lst)
+    active_linked_connection_lst = smartlist_create();
   /* Initialize the history structures. */
   rep_hist_init();
   /* Initialize the service cache. */
@@ -1673,6 +1785,7 @@
   tor_tls_free_all();
   /* stuff in main.c */
   smartlist_free(closeable_connection_lst);
+  smartlist_free(active_linked_connection_lst);
   tor_free(timeout_event);
   /* Stuff in util.c */
   escaped(NULL);
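
The main.c changes above keep a list of active linked connections and use the active_on_link bit to avoid adding a connection twice. The following is only a minimal sketch of that bookkeeping, not the Tor code: toy_conn_t, active_list, MAX_ACTIVE and the toy_* functions are hypothetical names, a fixed array stands in for the smartlist, and the libevent side (event_loopexit(), event_active()) is left out entirely.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_ACTIVE 16  /* hypothetical fixed capacity for this sketch */

/* Hypothetical stand-in for connection_t; only the flag matters here. */
typedef struct toy_conn_t {
  unsigned int active_on_link:1; /* set iff the conn is in active_list */
} toy_conn_t;

static toy_conn_t *active_list[MAX_ACTIVE];
static size_t n_active = 0;

/* Return 1 if conn is in active_list, else 0. */
static int
toy_list_isin(const toy_conn_t *conn)
{
  size_t i;
  for (i = 0; i < n_active; ++i)
    if (active_list[i] == conn)
      return 1;
  return 0;
}

/* Put conn on the active list unless its flag says it is already there. */
static void
toy_start_reading(toy_conn_t *conn)
{
  assert(conn);
  if (!conn->active_on_link) {
    assert(n_active < MAX_ACTIVE);
    conn->active_on_link = 1;
    active_list[n_active++] = conn;
  } else {
    assert(toy_list_isin(conn));
  }
}

/* Take conn off the active list if its flag says it is there. */
static void
toy_stop_reading(toy_conn_t *conn)
{
  size_t i;
  assert(conn);
  if (!conn->active_on_link) {
    assert(!toy_list_isin(conn));
    return;
  }
  conn->active_on_link = 0;
  for (i = 0; i < n_active; ++i) {
    if (active_list[i] == conn) {
      /* Fill the hole with the last element; order is not preserved. */
      active_list[i] = active_list[--n_active];
      return;
    }
  }
}
```

Calling toy_start_reading() twice on the same connection leaves a single list entry, and toy_stop_reading() on an inactive connection is a no-op. The linear search on removal is the cost that the XXXX020 comment in connection_stop_reading_from_linked_conn() suggests an index could avoid.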

Modified: tor/trunk/src/or/or.h
===================================================================
--- tor/trunk/src/or/or.h	2007-04-21 17:24:18 UTC (rev 9994)
+++ tor/trunk/src/or/or.h	2007-04-21 17:26:12 UTC (rev 9995)
@@ -748,6 +748,7 @@
   /* The next fields are all one-bit booleans. Some are only applicable
    * to connection subtypes, but we hold them here anyway, to save space.
    * (Currently, they all fit into a single byte.) */
+  /*XXXX020 rename wants_to_*; the names are misleading. */
   unsigned wants_to_read:1; /**< Boolean: should we start reading again once
                             * the bandwidth throttler allows it? */
   unsigned wants_to_write:1; /**< Boolean: should we start writing again once
@@ -771,7 +772,7 @@
   unsigned int chosen_exit_optional:1;
 
   int s; /**< Our socket; -1 if this connection is closed, or has no
-          * sockets. */
+          * socket. */
   int conn_array_index; /**< Index into the global connection array. */
   struct event *read_event; /**< Libevent event structure. */
   struct event *write_event; /**< Libevent event structure. */
@@ -797,6 +798,13 @@
                                       * we marked for close? */
   char *address; /**< FQDN (or IP) of the guy on the other end.
                   * strdup into this, because free_connection frees it. */
+  /** Another connection that's connected to this one in lieu of a socket. */
+  struct connection_t *linked_conn;
+  /* XXXX020 NM move these up to the other 1-bit flags. */
+  unsigned int linked:1; /**< True if there is, or has been, a linked_conn. */
+  unsigned int reading_from_linked_conn:1; /**DOCDOC*/
+  unsigned int writing_to_linked_conn:1; /**DOCDOC*/
+  unsigned int active_on_link:1; /**DOCDOC*/
 
 } connection_t;
 
@@ -1967,6 +1975,7 @@
 int write_to_buf(const char *string, size_t string_len, buf_t *buf);
 int write_to_buf_zlib(buf_t *buf, tor_zlib_state_t *state,
                       const char *data, size_t data_len, int done);
+int move_buf_to_buf(buf_t *buf_out, buf_t *buf_in, size_t *buf_flushlen);
 int fetch_from_buf(char *string, size_t string_len, buf_t *buf);
 int fetch_from_buf_http(buf_t *buf,
                         char **headers_out, size_t max_headerlen,
@@ -2163,7 +2172,8 @@
 const char *conn_state_to_string(int type, int state);
 
 connection_t *connection_new(int type);
-void connection_unregister(connection_t *conn);
+void connection_link_connections(connection_t *conn_a, connection_t *conn_b);
+void connection_unregister_events(connection_t *conn);
 void connection_free(connection_t *conn);
 void connection_free_all(void);
 void connection_about_to_close_connection(connection_t *conn);
@@ -2227,6 +2237,7 @@
 int connection_is_listener(connection_t *conn);
 int connection_state_is_open(connection_t *conn);
 int connection_state_is_connecting(connection_t *conn);
+int connection_should_read_from_linked_conn(connection_t *conn);
 
 char *alloc_http_authenticator(const char *authenticator);
 
@@ -2252,8 +2263,8 @@
 int connection_ap_handshake_send_begin(edge_connection_t *ap_conn);
 int connection_ap_handshake_send_resolve(edge_connection_t *ap_conn);
 
-int connection_ap_make_bridge(char *address, uint16_t port,
-                              const char *digest, int command);
+edge_connection_t  *connection_ap_make_bridge(char *address, uint16_t port,
+                                              const char *digest, int command);
 void connection_ap_handshake_socks_reply(edge_connection_t *conn, char *reply,
                                          size_t replylen,
                                          int endreason);
@@ -2580,6 +2591,9 @@
 void connection_stop_writing(connection_t *conn);
 void connection_start_writing(connection_t *conn);
 
+void connection_stop_reading_from_linked_conn(connection_t *conn);
+void connection_start_reading_from_linked_conn(connection_t *conn);
+
 void directory_all_unreachable(time_t now);
 void directory_info_has_arrived(time_t now, int from_cache);
 

Modified: tor/trunk/src/or/test.c
===================================================================
--- tor/trunk/src/or/test.c	2007-04-21 17:24:18 UTC (rev 9994)
+++ tor/trunk/src/or/test.c	2007-04-21 17:26:12 UTC (rev 9995)
@@ -111,9 +111,10 @@
   char str[256];
   char str2[256];
 
-  buf_t *buf;
+  buf_t *buf, *buf2;
 
   int j;
+  size_t r;
 
   /****
    * buf_new
@@ -218,6 +219,37 @@
     test_memeq(str2, str, 255);
   }
 
+  /* Move from buf to buf. */
+  buf_free(buf);
+  buf = buf_new_with_capacity(4096);
+  buf2 = buf_new_with_capacity(4096);
+  for (j=0;j<100;++j)
+    write_to_buf(str, 255, buf);
+  test_eq(buf_datalen(buf), 25500);
+  for (j=0;j<100;++j) {
+    r = 10;
+    move_buf_to_buf(buf2, buf, &r);
+    test_eq(r, 0);
+  }
+  test_eq(buf_datalen(buf), 24500);
+  test_eq(buf_datalen(buf2), 1000);
+  for (j=0;j<3;++j) {
+    fetch_from_buf(str2, 255, buf2);
+    test_memeq(str2, str, 255);
+  }
+  r = 8192; /*big move*/
+  move_buf_to_buf(buf2, buf, &r);
+  test_eq(r, 0);
+  r = 30000; /* incomplete move */
+  move_buf_to_buf(buf2, buf, &r);
+  test_eq(r, 13692);
+  for (j=0;j<97;++j) {
+    fetch_from_buf(str2, 255, buf2);
+    test_memeq(str2, str, 255);
+  }
+  buf_free(buf);
+  buf_free(buf2);
+
 #if 0
   {
   int s;
@@ -285,8 +317,6 @@
   test_eq(eof, 1);
   }
 #endif
-
-  buf_free(buf);
 }
 
 static void
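
The new test above implies the contract of move_buf_to_buf(): it moves at most *buf_flushlen bytes from buf_in to buf_out and reduces *buf_flushlen by the amount moved. That is why asking for 30000 bytes when only 16308 remain leaves r at 13692. The sketch below models that contract on a toy buffer. It is not the buffers.c implementation: toy_buf_t, TOY_CAP, toy_write() and toy_move() are hypothetical names, and the return value (bytes moved, or -1 on failure) is an assumption based on how conn_close_if_marked() checks and logs retval.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_CAP 32768 /* hypothetical fixed capacity, enough for the test */

/* Hypothetical flat byte queue; Tor's real buf_t is more involved. */
typedef struct toy_buf_t {
  char data[TOY_CAP];
  size_t len; /* number of valid bytes at the start of data */
} toy_buf_t;

/* Append n bytes to buf.  Return 0 on success, -1 if they would not fit. */
static int
toy_write(toy_buf_t *buf, const char *src, size_t n)
{
  if (n > TOY_CAP - buf->len)
    return -1;
  memcpy(buf->data + buf->len, src, n);
  buf->len += n;
  return 0;
}

/* Move up to *flushlen bytes from the front of in to the end of out,
 * and subtract the number moved from *flushlen.  Return the number of
 * bytes moved, or -1 if out lacks room. */
static int
toy_move(toy_buf_t *out, toy_buf_t *in, size_t *flushlen)
{
  size_t n = *flushlen < in->len ? *flushlen : in->len;
  if (toy_write(out, in->data, n) < 0)
    return -1;
  memmove(in->data, in->data + n, in->len - n);
  in->len -= n;
  *flushlen -= n;
  return (int)n;
}
```

A caller can tell a complete move from a partial one by checking whether *flushlen reached zero, which is what the test_eq(r, ...) checks in the test do.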