[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[or-cvs] Make AP connections wait for a circuit if none exists.
Update of /home/or/cvsroot/src/or
In directory moria.mit.edu:/tmp/cvs-serv14699/src/or
Modified Files:
buffers.c circuit.c connection.c connection_edge.c or.h
Log Message:
Make AP connections wait for a circuit if none exists.
Also:
- Refactor socks request into a separate struct
- Add a separate 'waiting for circuit' state to AP connections
  between 'waiting for socks' and 'open'.
Arma: can you check out the XXX comments I've added to connection_edge.c?
I may be mishandling some of the async and close logic.
Index: buffers.c
===================================================================
RCS file: /home/or/cvsroot/src/or/buffers.c,v
retrieving revision 1.51
retrieving revision 1.52
diff -u -d -r1.51 -r1.52
--- buffers.c 19 Oct 2003 01:10:38 -0000 1.51
+++ buffers.c 11 Nov 2003 02:41:31 -0000 1.52
@@ -395,19 +395,16 @@
* socks4a: "socksheader username\0 destaddr\0"
* socks5 phase one: "version #methods methods"
* socks5 phase two: "version command 0 addresstype..."
- * If it's a complete and valid handshake, and destaddr fits in addr_out,
- * then pull the handshake off the buf, assign to addr_out and port_out,
- * and return 1.
+ * If it's a complete and valid handshake, and destaddr fits in
+ * MAX_SOCKS_ADDR_LEN bytes, then pull the handshake off the buf,
+ * assign to *req, and return 1.
* If it's invalid or too big, return -1.
* Else it's not all there yet, leave buf alone and return 0.
* If you want to specify the socks reply, write it into *reply
* and set *replylen, else leave *replylen alone.
* If returning 0 or -1, *addr_out and *port_out are undefined.
*/
-int fetch_from_buf_socks(buf_t *buf, char *socks_version,
- char *reply, int *replylen,
- char *addr_out, int max_addrlen,
- uint16_t *port_out) {
+int fetch_from_buf_socks(buf_t *buf, socks_request_t *req) {
unsigned char len;
char *tmpbuf=NULL;
uint32_t destip;
@@ -421,25 +418,25 @@
case 5: /* socks5 */
- if(*socks_version != 5) { /* we need to negotiate a method */
+ if(req->socks_version != 5) { /* we need to negotiate a method */
unsigned char nummethods = (unsigned char)*(buf->mem+1);
- assert(!*socks_version);
+ assert(!req->socks_version);
log_fn(LOG_DEBUG,"socks5: learning offered methods");
if(buf->datalen < 2+nummethods)
return 0;
if(!nummethods || !memchr(buf->mem+2, 0, nummethods)) {
log_fn(LOG_WARN,"socks5: offered methods don't include 'no auth'. Rejecting.");
- *replylen = 2; /* 2 bytes of response */
- *reply = 5; /* socks5 reply */
- *(reply+1) = 0xFF; /* reject all methods */
+ req->replylen = 2; /* 2 bytes of response */
+ req->reply[0] = 5; /* socks5 reply */
+ req->reply[1] = 0xFF; /* reject all methods */
return -1;
}
buf_remove_from_front(buf,2+nummethods);/* remove packet from buf */
- *replylen = 2; /* 2 bytes of response */
- *reply = 5; /* socks5 reply */
- *(reply+1) = 0; /* choose the 'no auth' method */
- *socks_version = 5; /* remember that we've already negotiated auth */
+ req->replylen = 2; /* 2 bytes of response */
+ req->reply[0] = 5; /* socks5 reply */
+ req->reply[1] = 0; /* choose the 'no auth' method */
+ req->socks_version = 5; /* remember that we've already negotiated auth */
log_fn(LOG_DEBUG,"socks5: accepted method 0");
return 0;
}
@@ -459,13 +456,13 @@
destip = ntohl(*(uint32_t*)(buf->mem+4));
in.s_addr = htonl(destip);
tmpbuf = inet_ntoa(in);
- if(strlen(tmpbuf)+1 > max_addrlen) {
+ if(strlen(tmpbuf)+1 > MAX_SOCKS_ADDR_LEN) {
log_fn(LOG_WARN,"socks5 IP takes %d bytes, which doesn't fit in %d",
- strlen(tmpbuf)+1,max_addrlen);
+ strlen(tmpbuf)+1,MAX_SOCKS_ADDR_LEN);
return -1;
}
- strcpy(addr_out,tmpbuf);
- *port_out = ntohs(*(uint16_t*)(buf->mem+8));
+ strcpy(req->addr,tmpbuf);
+ req->port = ntohs(*(uint16_t*)(buf->mem+8));
buf_remove_from_front(buf, 10);
return 1;
case 3: /* fqdn */
@@ -473,14 +470,14 @@
len = (unsigned char)*(buf->mem+4);
if(buf->datalen < 7+len) /* addr/port there? */
return 0; /* not yet */
- if(len+1 > max_addrlen) {
+ if(len+1 > MAX_SOCKS_ADDR_LEN) {
log_fn(LOG_WARN,"socks5 hostname is %d bytes, which doesn't fit in %d",
- len+1,max_addrlen);
+ len+1,MAX_SOCKS_ADDR_LEN);
return -1;
}
- memcpy(addr_out,buf->mem+5,len);
- addr_out[len] = 0;
- *port_out = ntohs(*(uint16_t*)(buf->mem+5+len));
+ memcpy(req->addr,buf->mem+5,len);
+ req->addr[len] = 0;
+ req->port = ntohs(*(uint16_t*)(buf->mem+5+len));
buf_remove_from_front(buf, 5+len+2);
return 1;
default: /* unsupported */
@@ -490,7 +487,7 @@
assert(0);
case 4: /* socks4 */
- *socks_version = 4;
+ req->socks_version = 4;
if(buf->datalen < SOCKS4_NETWORK_LEN) /* basic info available? */
return 0; /* not yet */
@@ -499,9 +496,9 @@
return -1;
}
- *port_out = ntohs(*(uint16_t*)(buf->mem+2));
+ req->port = ntohs(*(uint16_t*)(buf->mem+2));
destip = ntohl(*(uint32_t*)(buf->mem+4));
- if(!*port_out || !destip) {
+ if(!req->port || !destip) {
log_fn(LOG_WARN,"socks4: Port or DestIP is zero.");
return -1;
}
@@ -509,7 +506,7 @@
log_fn(LOG_DEBUG,"socks4: destip not in form 0.0.0.x.");
in.s_addr = htonl(destip);
tmpbuf = inet_ntoa(in);
- if(strlen(tmpbuf)+1 > max_addrlen) {
+ if(strlen(tmpbuf)+1 > MAX_SOCKS_ADDR_LEN) {
log_fn(LOG_WARN,"socks4 addr (%d bytes) too long.", strlen(tmpbuf));
return -1;
}
@@ -530,13 +527,13 @@
log_fn(LOG_DEBUG,"Destaddr not here yet.");
return 0;
}
- if(max_addrlen <= next-startaddr) {
+ if(MAX_SOCKS_ADDR_LEN <= next-startaddr) {
log_fn(LOG_WARN,"Destaddr too long.");
return -1;
}
}
log_fn(LOG_DEBUG,"Everything is here. Success.");
- strcpy(addr_out, socks4_prot == socks4 ? tmpbuf : startaddr);
+ strcpy(req->addr, socks4_prot == socks4 ? tmpbuf : startaddr);
buf_remove_from_front(buf, next-buf->mem+1); /* next points to the final \0 on inbuf */
return 1;
Index: circuit.c
===================================================================
RCS file: /home/or/cvsroot/src/or/circuit.c,v
retrieving revision 1.81
retrieving revision 1.82
diff -u -d -r1.81 -r1.82
--- circuit.c 21 Oct 2003 09:48:17 -0000 1.81
+++ circuit.c 11 Nov 2003 02:41:31 -0000 1.82
@@ -755,6 +755,9 @@
if(hop == circ->cpath) { /* done building the circuit. whew. */
circ->state = CIRCUIT_STATE_OPEN;
log_fn(LOG_INFO,"circuit built!");
+ /* Tell any AP connections that have been waiting for a new
+ * circuit that one is ready. */
+ connection_ap_attach_pending();
return 0;
}
Index: connection.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection.c,v
retrieving revision 1.127
retrieving revision 1.128
diff -u -d -r1.127 -r1.128
--- connection.c 10 Nov 2003 08:06:53 -0000 1.127
+++ connection.c 11 Nov 2003 02:41:31 -0000 1.128
@@ -85,6 +85,10 @@
conn->inbuf = buf_new();
conn->outbuf = buf_new();
}
+ if (type == CONN_TYPE_AP) {
+ conn->socks_request = tor_malloc(sizeof(socks_request_t));
+ memset(conn->socks_request, 0, sizeof(socks_request_t));
+ }
conn->timestamp_created = now;
conn->timestamp_lastread = now;
@@ -115,6 +119,7 @@
if (conn->identity_pkey)
crypto_free_pk_env(conn->identity_pkey);
tor_free(conn->nickname);
+ tor_free(conn->socks_request);
if(conn->s >= 0) {
log_fn(LOG_INFO,"closing fd %d.",conn->s);
@@ -760,6 +765,9 @@
assert_cpath_layer_ok(conn->cpath_layer);
/* XXX unchecked, package window, deliver window. */
}
+ if (conn->type != CONN_TYPE_AP) {
+ assert(!conn->socks_request);
+ }
switch(conn->type)
{
@@ -779,6 +787,12 @@
case CONN_TYPE_AP:
assert(conn->state >= _AP_CONN_STATE_MIN &&
conn->state <= _AP_CONN_STATE_MAX);
+ if (conn->state == AP_CONN_STATE_SOCKS_WAIT ||
+ conn->state == AP_CONN_STATE_CIRCUIT_WAIT) {
+ assert(conn->socks_request);
+ } else {
+ assert(!conn->socks_request);
+ }
break;
case CONN_TYPE_DIR:
assert(conn->state >= _DIR_CONN_STATE_MIN &&
Index: connection_edge.c
===================================================================
RCS file: /home/or/cvsroot/src/or/connection_edge.c,v
retrieving revision 1.45
retrieving revision 1.46
diff -u -d -r1.45 -r1.46
--- connection_edge.c 10 Nov 2003 08:06:53 -0000 1.45
+++ connection_edge.c 11 Nov 2003 02:41:31 -0000 1.46
@@ -7,8 +7,8 @@
extern or_options_t options; /* command-line and config-file options */
static int connection_ap_handshake_process_socks(connection_t *conn);
-static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ,
- char *destaddr, uint16_t destport);
+static int connection_ap_handshake_attach_circuit(connection_t *conn);
+static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ);
static int connection_ap_handshake_socks_reply(connection_t *conn, char *reply,
int replylen, char success);
@@ -465,6 +465,31 @@
goto repeat_connection_edge_package_raw_inbuf;
}
+/* Tell any APs that are waiting for a new circuit that one is available */
+void connection_ap_attach_pending(void)
+{
+ connection_t *conn;
+ int r;
+
+ while ((conn = connection_get_by_type_state(CONN_TYPE_AP,
+ AP_CONN_STATE_CIRCUIT_WAIT))) {
+ r = connection_ap_handshake_attach_circuit(conn);
+ if (r == 0) {
+ /* r==0: We're attached; do nothing. */
+ } else if (r>0) {
+ /* r>0: There was no circuit to attach to: stop the loop. */
+ break;
+ } else {
+ /* r<0: There was an error sending the begin cell; other pending
+ * AP connections may succeed.
+ */
+ /* XXX Is this right? How do we say that the connection failed?
+ * Should I close it? mark it for close? -NM */
+ connection_ap_handshake_socks_reply(conn, NULL, 0, 0);
+ }
+ }
+}
+
static void connection_edge_consider_sending_sendme(connection_t *conn) {
circuit_t *circ;
@@ -491,23 +516,23 @@
}
static int connection_ap_handshake_process_socks(connection_t *conn) {
- circuit_t *circ;
- char destaddr[200]; /* XXX why 200? but not 256, because it won't fit in a cell */
- char reply[256];
- uint16_t destport;
- int replylen=0;
+ socks_request_t *socks;
int sockshere;
assert(conn);
+ assert(conn->type == CONN_TYPE_AP);
+ assert(conn->state == AP_CONN_STATE_SOCKS_WAIT);
+ assert(conn->socks_request);
+ socks = conn->socks_request;
log_fn(LOG_DEBUG,"entered.");
- sockshere = fetch_from_buf_socks(conn->inbuf, &conn->socks_version, reply, &replylen,
- destaddr, sizeof(destaddr), &destport);
+ sockshere = fetch_from_buf_socks(conn->inbuf, socks);
+ conn->socks_version = socks->socks_version;
if(sockshere == -1 || sockshere == 0) {
- if(replylen) { /* we should send reply back */
+ if(socks->replylen) { /* we should send reply back */
log_fn(LOG_DEBUG,"reply is already set for us. Using it.");
- connection_ap_handshake_socks_reply(conn, reply, replylen, 0);
+ connection_ap_handshake_socks_reply(conn, socks->reply, socks->replylen, 0);
} else if(sockshere == -1) { /* send normal reject */
log_fn(LOG_WARN,"Fetching socks handshake failed. Closing.");
connection_ap_handshake_socks_reply(conn, NULL, 0, 0);
@@ -517,13 +542,35 @@
return sockshere;
} /* else socks handshake is done, continue processing */
+ conn->state = AP_CONN_STATE_CIRCUIT_WAIT;
+ if (connection_ap_handshake_attach_circuit(conn)<0)
+ return -1;
+ return 0;
+}
+
+/* Try to find a live circuit. If we don't find one, tell 'conn' to
+ * stop reading and return 1. Otherwise, associate the CONN_TYPE_AP
+ * connection 'conn' with the newest live circuit, and start sending a
+ * BEGIN cell down the circuit. Returns 0 on success, and -1 on
+ * error.
+ */
+static int connection_ap_handshake_attach_circuit(connection_t *conn) {
+ circuit_t *circ;
+
+ assert(conn);
+ assert(conn->type == CONN_TYPE_AP);
+ assert(conn->state == AP_CONN_STATE_CIRCUIT_WAIT);
+ assert(conn->socks_request);
+
/* find the circuit that we should use, if there is one. */
circ = circuit_get_newest_open();
if(!circ) {
- log_fn(LOG_INFO,"No circuit ready. Closing.");
- return -1;
+ log_fn(LOG_INFO,"No circuit ready for edge connection; delaying.");
+ connection_stop_reading(conn); /* XXX Is this correct? -NM */
+ return 1;
}
+ connection_start_reading(conn); /* XXX Is this correct? -NM */
circ->dirty = 1;
@@ -536,7 +583,7 @@
assert(circ->cpath->prev->state == CPATH_STATE_OPEN);
conn->cpath_layer = circ->cpath->prev;
- if(connection_ap_handshake_send_begin(conn, circ, destaddr, destport) < 0) {
+ if(connection_ap_handshake_send_begin(conn, circ) < 0) {
circuit_close(circ);
return -1;
}
@@ -545,11 +592,15 @@
}
/* deliver the destaddr:destport in a relay cell */
-static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ,
- char *destaddr, uint16_t destport) {
+static int connection_ap_handshake_send_begin(connection_t *ap_conn, circuit_t *circ)
+{
char payload[CELL_PAYLOAD_SIZE];
int payload_len;
+ assert(ap_conn->type == CONN_TYPE_AP);
+ assert(ap_conn->state == AP_CONN_STATE_CIRCUIT_WAIT);
+ assert(ap_conn->socks_request);
+
if(crypto_pseudo_rand(STREAM_ID_SIZE, ap_conn->stream_id) < 0)
return -1;
/* FIXME check for collisions */
@@ -557,7 +608,7 @@
memcpy(payload, ap_conn->stream_id, STREAM_ID_SIZE);
payload_len = STREAM_ID_SIZE + 1 +
snprintf(payload+STREAM_ID_SIZE,CELL_PAYLOAD_SIZE-RELAY_HEADER_SIZE-STREAM_ID_SIZE,
- "%s:%d", destaddr, destport);
+ "%s:%d", ap_conn->socks_request->addr, ap_conn->socks_request->port);
log_fn(LOG_DEBUG,"Sending relay cell to begin stream %d.",*(int *)ap_conn->stream_id);
@@ -568,6 +619,8 @@
ap_conn->package_window = STREAMWINDOW_START;
ap_conn->deliver_window = STREAMWINDOW_START;
ap_conn->state = AP_CONN_STATE_OPEN;
+ tor_free(ap_conn->socks_request);
+ ap_conn->socks_request = NULL;
log_fn(LOG_INFO,"Address/port sent, ap socket %d, n_aci %d",ap_conn->s,circ->n_aci);
return 0;
}
Index: or.h
===================================================================
RCS file: /home/or/cvsroot/src/or/or.h,v
retrieving revision 1.175
retrieving revision 1.176
diff -u -d -r1.175 -r1.176
--- or.h 10 Nov 2003 08:06:53 -0000 1.175
+++ or.h 11 Nov 2003 02:41:31 -0000 1.176
@@ -160,7 +160,7 @@
/* the AP state values must be disjoint from the EXIT state values */
#define _AP_CONN_STATE_MIN 4
#define AP_CONN_STATE_SOCKS_WAIT 4
-#define AP_CONN_STATE_OR_WAIT 5
+#define AP_CONN_STATE_CIRCUIT_WAIT 5
#define AP_CONN_STATE_OPEN 6
#define _AP_CONN_STATE_MAX 6
@@ -254,6 +254,7 @@
#define ZERO_STREAM "\0\0\0\0\0\0\0\0"
typedef struct buf_t buf_t;
+typedef struct socks_request_t socks_request_t;
struct connection_t {
@@ -304,7 +305,6 @@
*/
/* Used only by edge connections: */
- char socks_version;
char stream_id[STREAM_ID_SIZE];
struct connection_t *next_stream; /* points to the next stream at this edge, if any */
struct crypt_path_t *cpath_layer; /* a pointer to which node in the circ this conn exits at */
@@ -315,6 +315,10 @@
int done_receiving;
char has_sent_end; /* for debugging: set once we've set the stream end,
and check in circuit_about_to_close_connection() */
+
+ /* Used only by AP connections */
+ char socks_version;
+ socks_request_t *socks_request;
};
typedef struct connection_t connection_t;
@@ -455,6 +459,17 @@
int loglevel;
} or_options_t;
+#define MAX_SOCKS_REPLY_LEN 256
+/* Not 256; addresses must fit in a begin cell. */
+#define MAX_SOCKS_ADDR_LEN 200
+struct socks_request_t {
+ char socks_version;
+ int replylen;
+ char reply[MAX_SOCKS_REPLY_LEN];
+ char addr[MAX_SOCKS_ADDR_LEN];
+ uint16_t port;
+};
+
/* all the function prototypes go here */
/********************************* buffers.c ***************************/
@@ -480,10 +495,7 @@
int fetch_from_buf_http(buf_t *buf,
char *headers_out, int max_headerlen,
char *body_out, int max_bodylen);
-int fetch_from_buf_socks(buf_t *buf, char *socks_version,
- char *reply, int *replylen,
- char *addr_out, int max_addrlen,
- uint16_t *port_out);
+int fetch_from_buf_socks(buf_t *buf, socks_request_t *req);
/********************************* circuit.c ***************************/
@@ -602,6 +614,8 @@
int connection_edge_package_raw_inbuf(connection_t *conn);
void connection_exit_connect(connection_t *conn);
+
+void connection_ap_attach_pending(void);
extern uint64_t stats_n_data_cells_packaged;
extern uint64_t stats_n_data_bytes_packaged;