[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [meek/master] Use persistent connections in the WSGI reflector.
commit e06bcf2c849075cf241addf2182dfc0125a35c92
Author: David Fifield <david@xxxxxxxxxxxxxxx>
Date: Tue Apr 7 09:36:35 2015 -0700
Use persistent connections in the WSGI reflector.
This improves performance quite a lot--previously we were doing a
complete TCP and TLS handshake to meek-server for every single request,
which, apart from increasing latency, also caused a lot of CPU usage on
meek-server. it was up above 80% when I checked it.
Now we reuse connections until they error out, making a new connection
if all others are currently busy.
---
wsgi/reflect.py | 90 ++++++++++++++++++++++++++++++++++++-------------------
1 file changed, 60 insertions(+), 30 deletions(-)
diff --git a/wsgi/reflect.py b/wsgi/reflect.py
index e9bff42..5b89f6a 100644
--- a/wsgi/reflect.py
+++ b/wsgi/reflect.py
@@ -2,35 +2,18 @@
import httplib
import urlparse
+import threading
FORWARD_URL = "https://meek.bamsoftware.com/"
TIMEOUT = 20
BUFSIZ = 2048
+MAX_REQUEST_LENGTH = 0x10000
REFLECTED_HEADER_FIELDS = [
"Content-Type",
"X-Session-Id",
]
-# Limits a file-like object to reading only n bytes. Used to limit wsgi.input to
-# the Content-Length, otherwise it blocks.
-class LimitedReader(object):
- def __init__(self, f, n):
- self.f = f
- self.n = n
-
- def __getattr__(self, name):
- return getattr(self.f, name)
-
- def read(self, size=None):
- if self.n <= 0:
- return ""
- if size is None or size > self.n:
- size = self.n
- data = self.f.read(size)
- self.n -= len(data)
- return data
-
# Join two URL paths.
def path_join(a, b):
if a.endswith("/"):
@@ -60,8 +43,14 @@ def copy_request(environ, url):
content_length = environ.get("CONTENT_LENGTH")
if content_length:
- body = LimitedReader(environ["wsgi.input"], int(content_length))
- headers.append(("Content-Length", content_length))
+ content_length = int(content_length)
+ # We read the whole response body (and limit its length). Normally we
+ # would just pass environ["wsgi.input"] as the body to
+ # HTTPSConnection.request. But make_request may need to try the request
+ # twice, in which case it needs to send the same body the second time.
+ if content_length > MAX_REQUEST_LENGTH:
+ raise ValueError("Content-Length too large: %d" % content_length)
+ body = environ["wsgi.input"].read(content_length)
else:
body = ""
@@ -73,18 +62,56 @@ def copy_request(environ, url):
return method, url, body, headers
-def make_conn(url):
- u = urlparse.urlsplit(url)
- create_connection = httplib.HTTPConnection
- if u.scheme == "https":
- create_connection = httplib.HTTPSConnection
- return create_connection(u.hostname, u.port, strict=True, timeout=TIMEOUT)
+# We want to reuse persistent HTTPSConnections. If we don't then every request
+# will start a branch new TCP and TLS connection, leading to increased latency
+# and high CPU use on meek-server. A pool just locks connections so only one
+# thread can use a connection at a time. If the connection is still good after
+# use, then the caller should put it back by calling restore_conn.
+class ConnectionPool(object):
+ def __init__(self, url):
+ self.url = urlparse.urlsplit(url)
+ self.conns = []
+ self.lock = threading.RLock()
+
+ def new_conn(self):
+ create_connection = httplib.HTTPConnection
+ if self.url.scheme == "https":
+ create_connection = httplib.HTTPSConnection
+ return create_connection(self.url.hostname, self.url.port, strict=True, timeout=TIMEOUT)
+
+ def get_conn(self):
+ with self.lock:
+ try:
+ return self.conns.pop(0)
+ except IndexError:
+ pass
+ return self.new_conn()
+
+ def restore_conn(self, conn):
+ with self.lock:
+ self.conns.append(conn)
def make_request(conn, method, url, body, headers):
u = urlparse.urlsplit(url)
path = urlparse.urlunsplit(("", "", u.path, u.query, ""))
conn.request(method, path, body, headers)
- return conn.getresponse()
+ try:
+ return conn.getresponse()
+ except httplib.BadStatusLine, e:
+ if e.message != "":
+ raise
+ # There's a very common error with httplib persistent connections. If
+ # you let a connection idle until it times out, then issue a request,
+ # you will get a BadStatusLine("") exception, not when the request is
+ # sent, but when getresponse tries to read from a closed socket. When
+ # that happens, we reinitialize the connection by first closing it,
+ # which will cause a new TCP and TLS handshake to happen for the next
+ # request.
+ conn.close()
+ conn.request(method, path, body, headers)
+ return conn.getresponse()
+
+pool = ConnectionPool(FORWARD_URL)
def main(environ, start_response):
try:
@@ -93,10 +120,12 @@ def main(environ, start_response):
start_response("400 Bad Request", [("Content-Type", "text/plain; charset=utf-8")])
yield "Bad Request"
return
+
try:
- conn = make_conn(url)
+ conn = pool.get_conn()
resp = make_request(conn, method, url, body, headers)
except Exception, e:
+ # Discard conn.
start_response("500 Internal Server Error", [("Content-Type", "text/plain; charset=utf-8")])
yield "Internal Server Error"
return
@@ -114,7 +143,8 @@ def main(environ, start_response):
break
yield data
- conn.close()
+ resp.close()
+ pool.restore_conn(conn)
if __name__ == "__main__":
import wsgiref.simple_server
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits