[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[or-cvs] Make AP connections wait for a circuit if none exists.



Update of /home/or/cvsroot/src/or
In directory moria.mit.edu:/tmp/cvs-serv14699/src/or

Modified Files:
	buffers.c circuit.c connection.c connection_edge.c or.h 
Log Message:
Make AP connections wait for a circuit if none exists.
Also:
  - Refactor socks request into a separate struct
  - Add a separate 'waiting for circuit' state to AP connections
    between 'waiting for socks' and 'open'.

Arma: can you check out the XXX comments I've added to connection_edge.c?
I may be mishandling some of the async and close logic.


Index: buffers.c
===================================================================
RCS file: /home/or/cvsroot/src/or/buffers.c,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -d -r1.51 -r1.52
--- buffers.c	19 Oct 2003 01:10:38 -0000	1.51
+++ buffers.c	11 Nov 2003 02:41:31 -0000	1.52
@@ -395,19 +395,16 @@
  *   socks4a: "socksheader username\0 destaddr\0"
  *   socks5 phase one: "version #methods methods"
  *   socks5 phase two: "version command 0 addresstype..."
- * If it's a complete and valid handshake, and destaddr fits in addr_out,
- *   then pull the handshake off the buf, assign to addr_out and port_out,
- *   and return 1.
+ * If it's a complete and valid handshake, and destaddr fits in
+ *   MAX_SOCKS_ADDR_LEN bytes, then pull the handshake off the buf,
+ *   assign to *req, and return 1.
  * If it's invalid or too big, return -1.
  * Else it's not all there yet, leave buf alone and return 0.
  * If you want to specify the socks reply, write it into *reply
  *   and set *replylen, else leave *replylen alone.
  * If returning 0 or -1, *addr_out and *port_out are undefined.
  */
-int fetch_from_buf_socks(buf_t *buf, char *socks_version,
-                         char *reply, int *replylen,
-                         char *addr_out, int max_addrlen,
-                         uint16_t *port_out) {
+int fetch_from_buf_socks(buf_t *buf, socks_request_t *req) {
   unsigned char len;
   char *tmpbuf=NULL;
   uint32_t destip;
@@ -421,25 +418,25 @@
 
     case 5: /* socks5 */
 
-      if(*socks_version != 5) { /* we need to negotiate a method */
+      if(req->socks_version != 5) { /* we need to negotiate a method */
         unsigned char nummethods = (unsigned char)*(buf->mem+1);
-        assert(!*socks_version);
+        assert(!req->socks_version);
         log_fn(LOG_DEBUG,"socks5: learning offered methods");
         if(buf->datalen < 2+nummethods)
           return 0;
         if(!nummethods || !memchr(buf->mem+2, 0, nummethods)) {
           log_fn(LOG_WARN,"socks5: offered methods don't include 'no auth'. Rejecting.");
-          *replylen = 2; /* 2 bytes of response */
-          *reply = 5; /* socks5 reply */
-          *(reply+1) = 0xFF; /* reject all methods */
+          req->replylen = 2; /* 2 bytes of response */
+          req->reply[0] = 5; /* socks5 reply */
+          req->reply[1] = 0xFF; /* reject all methods */
           return -1;
         }          
         buf_remove_from_front(buf,2+nummethods);/* remove packet from buf */
 
-        *replylen = 2; /* 2 bytes of response */
-        *reply = 5; /* socks5 reply */
-        *(reply+1) = 0; /* choose the 'no auth' method */
-        *socks_version = 5; /* remember that we've already negotiated auth */
+        req->replylen = 2; /* 2 bytes of response */
+        req->reply[0] = 5; /* socks5 reply */
+        req->reply[1] = 0; /* choose the 'no auth' method */
+        req->socks_version = 5; /* remember that we've already negotiated auth */
         log_fn(LOG_DEBUG,"socks5: accepted method 0");
         return 0;
       }
@@ -459,13 +456,13 @@
           destip = ntohl(*(uint32_t*)(buf->mem+4));
           in.s_addr = htonl(destip);
           tmpbuf = inet_ntoa(in);
-          if(strlen(tmpbuf)+1 > max_addrlen) {
+          if(strlen(tmpbuf)+1 > MAX_SOCKS_ADDR_LEN) {
             log_fn(LOG_WARN,"socks5 IP takes %d bytes, which doesn't fit in %d",
-                   strlen(tmpbuf)+1,max_addrlen);
+                   strlen(tmpbuf)+1,MAX_SOCKS_ADDR_LEN);
             return -1;
           }
-          strcpy(addr_out,tmpbuf);
-          *port_out = ntohs(*(uint16_t*)(buf->mem+8));
+          strcpy(req->addr,tmpbuf);
+          req->port = ntohs(*(uint16_t*)(buf->mem+8));
           buf_remove_from_front(buf, 10);
           return 1;
         case 3: /* fqdn */
@@ -473,14 +470,14 @@
           len = (unsigned char)*(buf->mem+4);
           if(buf->datalen < 7+len) /* addr/port there? */
             return 0; /* not yet */
-          if(len+1 > max_addrlen) {
+          if(len+1 > MAX_SOCKS_ADDR_LEN) {
             log_fn(LOG_WARN,"socks5 hostname is %d bytes, which doesn't fit in %d",
-                   len+1,max_addrlen);
+                   len+1,MAX_SOCKS_ADDR_LEN);
             return -1;
           }
-          memcpy(addr_out,buf->mem+5,len);
-          addr_out[len] = 0;
-          *port_out = ntohs(*(uint16_t*)(buf->mem+5+len));
+          memcpy(req->addr,buf->mem+5,len);
+          req->addr[len] = 0;
+          req->port = ntohs(*(uint16_t*)(buf->mem+5+len));
           buf_remove_from_front(buf, 5+len+2);
           return 1;
         default: /* unsupported */
@@ -490,7 +487,7 @@
       assert(0);
     case 4: /* socks4 */
 
-       *socks_version = 4;
+      req->socks_version = 4;
       if(buf->datalen < SOCKS4_NETWORK_LEN) /* basic info available? */
         return 0; /* not yet */
 
@@ -499,9 +496,9 @@
         return -1;
       }
 
-      *port_out = ntohs(*(uint16_t*)(buf->mem+2));
+      req->port = ntohs(*(uint16_t*)(buf->mem+2));
       destip = ntohl(*(uint32_t*)(buf->mem+4));
-      if(!*port_out || !destip) {
+      if(!req->port || !destip) {
         log_fn(LOG_WARN,"socks4: Port or DestIP is zero.");
         return -1;
       }
@@ -509,7 +506,7 @@
         log_fn(LOG_DEBUG,"socks4: destip not in form 0.0.0.x.");
         in.s_addr = htonl(destip);
         tmpbuf = inet_ntoa(in);
-        if(strlen(tmpbuf)+1 > max_addrlen) {
+        if(strlen(tmpbuf)+1 > MAX_SOCKS_ADDR_LEN) {
           log_fn(LOG_WARN,"socks4 addr (%d bytes) too long.", strlen(tmpbuf));
           return -1;
         }
@@ -530,13 +527,13 @@
           log_fn(LOG_DEBUG,"Destaddr not here yet.");
           return 0;
         }
-        if(max_addrlen <= next-startaddr) {
+        if(MAX_SOCKS_ADDR_LEN <= next-startaddr) {
           log_fn(LOG_WARN,"Destaddr too long.");
           return -1;
         }
       }
       log_fn(LOG_DEBUG,"Everything is here. Success.");
-      strcpy(addr_out, socks4_prot == socks4 ? tmpbuf : startaddr);
+      strcpy(req->addr, socks4_prot == socks4 ? tmpbuf : startaddr);
       buf_remove_from_front(buf, next-buf->mem+1); /* next points to the final \0 on inbuf */
       return 1;
 

Index: circuit.c
===================================================================
RCS file: /home/or/cvsroot/src/or/circuit.c,v
retrieving revision 1.81
retrieving revision 1.82
diff -u -d -r1.81 -r1.82
--- circuit.c	21 Oct 2003 09:48:17 -0000	1.81
+++ circuit.c	11 Nov 2003 02:41:31 -0000	1.82
@@ -755,6 +755,9 @@
     if(hop == circ->cpath) { /* done building the circuit. whew. */
       circ->state = CIRCUIT_STATE_OPEN;
       log_fn(LOG_INFO,"circuit built!");
+      /* Tell any AP connections that have been waiting for a new
+       * circuit that one is ready. */
+      connection_ap_attach_pending();
       return 0;
     }
 

Index: connection.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection.c,v
retrieving revision 1.127
retrieving revision 1.128
diff -u -d -r1.127 -r1.128
--- connection.c	10 Nov 2003 08:06:53 -0000	1.127
+++ connection.c	11 Nov 2003 02:41:31 -0000	1.128
@@ -85,6 +85,10 @@
     conn->inbuf = buf_new();
     conn->outbuf = buf_new();
   }
+  if (type == CONN_TYPE_AP) {
+    conn->socks_request = tor_malloc(sizeof(socks_request_t));
+    memset(conn->socks_request, 0, sizeof(socks_request_t));
+  }
 
   conn->timestamp_created = now;
   conn->timestamp_lastread = now;
@@ -115,6 +119,7 @@
   if (conn->identity_pkey)
     crypto_free_pk_env(conn->identity_pkey);
   tor_free(conn->nickname);
+  tor_free(conn->socks_request);
 
   if(conn->s >= 0) {
     log_fn(LOG_INFO,"closing fd %d.",conn->s);
@@ -760,6 +765,9 @@
       assert_cpath_layer_ok(conn->cpath_layer);
     /* XXX unchecked, package window, deliver window. */
   }
+  if (conn->type != CONN_TYPE_AP) {
+    assert(!conn->socks_request);
+  }
 
   switch(conn->type) 
     {
@@ -779,6 +787,12 @@
     case CONN_TYPE_AP:
       assert(conn->state >= _AP_CONN_STATE_MIN &&
              conn->state <= _AP_CONN_STATE_MAX);
+      if (conn->state == AP_CONN_STATE_SOCKS_WAIT ||
+          conn->state == AP_CONN_STATE_CIRCUIT_WAIT) {
+        assert(conn->socks_request);
+      } else {
+        assert(!conn->socks_request);
+      }
       break;
     case CONN_TYPE_DIR:
       assert(conn->state >= _DIR_CONN_STATE_MIN &&

Index: connection_edge.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_edge.c,v
retrieving revision 1.45
retrieving revision 1.46
diff -u -d -r1.45 -r1.46
--- connection_edge.c	10 Nov 2003 08:06:53 -0000	1.45
+++ connection_edge.c	11 Nov 2003 02:41:31 -0000	1.46
@@ -7,8 +7,8 @@
 extern or_options_t options; /* command-line and config-file options */
 
 static int connection_ap_handshake_process_socks(connection_t *conn);
-static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ,
-                                              char *destaddr, uint16_t destport);
+static int connection_ap_handshake_attach_circuit(connection_t *conn);
+static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ);
 static int connection_ap_handshake_socks_reply(connection_t *conn, char *reply,
                                                int replylen, char success);
 
@@ -465,6 +465,31 @@
   goto repeat_connection_edge_package_raw_inbuf;
 }
 
+/* Tell any APs that are waiting for a new circuit that one is available */
+void connection_ap_attach_pending(void)
+{
+  connection_t *conn;
+  int r;
+
+  while ((conn = connection_get_by_type_state(CONN_TYPE_AP,
+                                              AP_CONN_STATE_CIRCUIT_WAIT))) {
+    r = connection_ap_handshake_attach_circuit(conn);
+    if (r == 0) {
+      /* r==0: We're attached; do nothing. */
+    } else if (r>0) {
+      /* r>0: There was no circuit to attach to: stop the loop. */
+      break;
+    } else {
+      /* r<0: There was an error sending the begin cell; other pending  
+       *   AP connections may succeed.
+       */
+      /* XXX Is this right? How do we say that the connection failed?
+       * Should I close it?  mark it for close? -NM */
+      connection_ap_handshake_socks_reply(conn, NULL, 0, 0);
+    }
+  }
+}
+
 static void connection_edge_consider_sending_sendme(connection_t *conn) {
   circuit_t *circ;
  
@@ -491,23 +516,23 @@
 }
 
 static int connection_ap_handshake_process_socks(connection_t *conn) {
-  circuit_t *circ;
-  char destaddr[200]; /* XXX why 200? but not 256, because it won't fit in a cell */
-  char reply[256];
-  uint16_t destport;
-  int replylen=0;
+  socks_request_t *socks;
   int sockshere;
 
   assert(conn);
+  assert(conn->type == CONN_TYPE_AP);
+  assert(conn->state == AP_CONN_STATE_SOCKS_WAIT);
+  assert(conn->socks_request);
+  socks = conn->socks_request;
 
   log_fn(LOG_DEBUG,"entered.");
 
-  sockshere = fetch_from_buf_socks(conn->inbuf, &conn->socks_version, reply, &replylen,
-                                   destaddr, sizeof(destaddr), &destport);
+  sockshere = fetch_from_buf_socks(conn->inbuf, socks);
+  conn->socks_version = socks->socks_version;
   if(sockshere == -1 || sockshere == 0) {
-    if(replylen) { /* we should send reply back */
+    if(socks->replylen) { /* we should send reply back */
       log_fn(LOG_DEBUG,"reply is already set for us. Using it.");
-      connection_ap_handshake_socks_reply(conn, reply, replylen, 0);
+      connection_ap_handshake_socks_reply(conn, socks->reply, socks->replylen, 0);
     } else if(sockshere == -1) { /* send normal reject */
       log_fn(LOG_WARN,"Fetching socks handshake failed. Closing.");
       connection_ap_handshake_socks_reply(conn, NULL, 0, 0);
@@ -517,13 +542,35 @@
     return sockshere;
   } /* else socks handshake is done, continue processing */
 
+  conn->state = AP_CONN_STATE_CIRCUIT_WAIT;
+  if (connection_ap_handshake_attach_circuit(conn)<0)
+    return -1;
+  return 0;
+}
+
+/* Try to find a live circuit.  If we don't find one, tell 'conn' to
+ * stop reading and return 1.  Otherwise, associate the CONN_TYPE_AP
+ * connection 'conn' with the newest live circuit, and start sending a
+ * BEGIN cell down the circuit.  Returns 0 on success, and -1 on
+ * error.
+ */
+static int connection_ap_handshake_attach_circuit(connection_t *conn) {
+  circuit_t *circ;
+  
+  assert(conn);
+  assert(conn->type == CONN_TYPE_AP);
+  assert(conn->state == AP_CONN_STATE_CIRCUIT_WAIT);
+  assert(conn->socks_request);
+  
   /* find the circuit that we should use, if there is one. */
   circ = circuit_get_newest_open();
 
   if(!circ) {
-    log_fn(LOG_INFO,"No circuit ready. Closing.");
-    return -1;
+    log_fn(LOG_INFO,"No circuit ready for edge connection; delaying.");
+    connection_stop_reading(conn); /* XXX Is this correct? -NM */
+    return 1;
   }
+  connection_start_reading(conn); /* XXX Is this correct? -NM */
 
   circ->dirty = 1;
 
@@ -536,7 +583,7 @@
   assert(circ->cpath->prev->state == CPATH_STATE_OPEN);
   conn->cpath_layer = circ->cpath->prev;
 
-  if(connection_ap_handshake_send_begin(conn, circ, destaddr, destport) < 0) {
+  if(connection_ap_handshake_send_begin(conn, circ) < 0) {
     circuit_close(circ);
     return -1;
   }
@@ -545,11 +592,15 @@
 }
 
 /* deliver the destaddr:destport in a relay cell */
-static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ,
-                                              char *destaddr, uint16_t destport) {
+static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ)
+{
   char payload[CELL_PAYLOAD_SIZE];
   int payload_len;
 
+  assert(ap_conn->type == CONN_TYPE_AP);
+  assert(ap_conn->state == AP_CONN_STATE_CIRCUIT_WAIT);
+  assert(ap_conn->socks_request);
+
   if(crypto_pseudo_rand(STREAM_ID_SIZE, ap_conn->stream_id) < 0)
     return -1;
   /* FIXME check for collisions */
@@ -557,7 +608,7 @@
   memcpy(payload, ap_conn->stream_id, STREAM_ID_SIZE);
   payload_len = STREAM_ID_SIZE + 1 +
     snprintf(payload+STREAM_ID_SIZE,CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE-STREAM_ID_SIZE,
-             "%s:%d", destaddr, destport);
+             "%s:%d", ap_conn->socks_request->addr, ap_conn->socks_request->port);
 
   log_fn(LOG_DEBUG,"Sending relay cell to begin stream %d.",*(int *)ap_conn->stream_id);
 
@@ -568,6 +619,8 @@
   ap_conn->package_window = STREAMWINDOW_START;
   ap_conn->deliver_window = STREAMWINDOW_START;
   ap_conn->state = AP_CONN_STATE_OPEN;
+  tor_free(ap_conn->socks_request);
+  ap_conn->socks_request = NULL;
   log_fn(LOG_INFO,"Address/port sent, ap socket %d, n_aci %d",ap_conn->s,circ->n_aci);
   return 0;
 }

Index: or.h
===================================================================
RCS file: /home/or/cvsroot/src/or/or.h,v
retrieving revision 1.175
retrieving revision 1.176
diff -u -d -r1.175 -r1.176
--- or.h	10 Nov 2003 08:06:53 -0000	1.175
+++ or.h	11 Nov 2003 02:41:31 -0000	1.176
@@ -160,7 +160,7 @@
 /* the AP state values must be disjoint from the EXIT state values */
 #define _AP_CONN_STATE_MIN 4
 #define AP_CONN_STATE_SOCKS_WAIT 4
-#define AP_CONN_STATE_OR_WAIT 5
+#define AP_CONN_STATE_CIRCUIT_WAIT 5
 #define AP_CONN_STATE_OPEN 6
 #define _AP_CONN_STATE_MAX 6
 
@@ -254,6 +254,7 @@
 #define ZERO_STREAM "\0\0\0\0\0\0\0\0"
 
 typedef struct buf_t buf_t;
+typedef struct socks_request_t socks_request_t;
 
 struct connection_t { 
 
@@ -304,7 +305,6 @@
                         */
 
 /* Used only by edge connections: */
-  char socks_version;
   char stream_id[STREAM_ID_SIZE];
   struct connection_t *next_stream; /* points to the next stream at this edge, if any */
   struct crypt_path_t *cpath_layer; /* a pointer to which node in the circ this conn exits at */
@@ -315,6 +315,10 @@
   int done_receiving;
   char has_sent_end; /* for debugging: set once we've set the stream end,
                         and check in circuit_about_to_close_connection() */
+  
+  /* Used only by AP connections */
+  char socks_version;
+  socks_request_t *socks_request;
 };
 
 typedef struct connection_t connection_t;
@@ -455,6 +459,17 @@
   int loglevel;
 } or_options_t;
 
+#define MAX_SOCKS_REPLY_LEN 256
+/* Not 256; addresses must fit in a begin cell. */
+#define MAX_SOCKS_ADDR_LEN 200
+struct socks_request_t {
+  char socks_version;
+  int replylen;
+  char reply[MAX_SOCKS_REPLY_LEN];
+  char addr[MAX_SOCKS_ADDR_LEN];
+  uint16_t port;
+};
+
 /* all the function prototypes go here */
 
 /********************************* buffers.c ***************************/
@@ -480,10 +495,7 @@
 int fetch_from_buf_http(buf_t *buf,
                         char *headers_out, int max_headerlen,
                         char *body_out, int max_bodylen);
-int fetch_from_buf_socks(buf_t *buf, char *socks_version,
-                         char *reply, int *replylen,
-                         char *addr_out, int max_addrlen,
-                         uint16_t *port_out);
+int fetch_from_buf_socks(buf_t *buf, socks_request_t *req);
 
 /********************************* circuit.c ***************************/
 
@@ -602,6 +614,8 @@
 int connection_edge_package_raw_inbuf(connection_t *conn);
 
 void connection_exit_connect(connection_t *conn);
+
+void connection_ap_attach_pending(void);
 
 extern uint64_t stats_n_data_cells_packaged;
 extern uint64_t stats_n_data_bytes_packaged;