[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [snowflake/master] Add context to HTTP handlers, attempt to support localhost Broker.



commit b04d1f67fb9ec8888da295a0fcf908fd8c08b84b
Author: Serene Han <keroserene+git@xxxxxxxxx>
Date:   Fri Feb 12 15:38:28 2016 -0800

    Add context to HTTP handlers, attempt to support localhost Broker.
    Seems unlikely to work due to dev_appserver's single in-flight request limitation.
---
 broker/broker.go                | 127 +++++++++++++++++++++++++++++-----------
 broker/snowflake-broker_test.go |   8 +++
 client/torrc-localhost          |   7 +++
 proxy/broker.coffee             |   3 +-
 4 files changed, 110 insertions(+), 35 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index ae039a5..a83e78a 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -11,6 +11,7 @@ package snowflake_broker
 
 import (
 	"container/heap"
+	"fmt"
 	"io/ioutil"
 	"log"
 	"net"
@@ -23,27 +24,68 @@ const (
 	ProxyTimeout  = 10
 )
 
-// This is minimum viable client-proxy registration.
-// TODO(#13): better, more secure registration corresponding to what's in
-// the python flashproxy facilitator.
-var snowflakes *SnowflakeHeap
+type SnowflakeContext struct {
+	snowflakes *SnowflakeHeap
+	// Map keeping track of snowflakeIDs required to match SDP answers from
+	// the second http POST.
+	snowflakeMap map[string]*Snowflake
+}
+
+type SnowflakeHandler struct {
+	*SnowflakeContext
+	h func(*SnowflakeContext, http.ResponseWriter, *http.Request)
+}
+
+func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	sh.h(sh.SnowflakeContext, w, r)
+}
 
-// Map keeping track of snowflakeIDs required to match SDP answers from
-// the second http POST.
-var snowflakeMap map[string]*Snowflake
+type ProxyRequest struct {
+	id        string
+	offerChan chan []byte
+}
+
+var createChan = make(chan *ProxyRequest)
 
 // Create and add a Snowflake to the heap.
-func AddSnowflake(id string) *Snowflake {
+func (sc *SnowflakeContext) AddSnowflake(id string) *Snowflake {
+	log.Println(sc.snowflakes)
+
 	snowflake := new(Snowflake)
 	snowflake.id = id
 	snowflake.clients = 0
 	snowflake.offerChannel = make(chan []byte)
 	snowflake.answerChannel = make(chan []byte)
-	heap.Push(snowflakes, snowflake)
-	snowflakeMap[id] = snowflake
+	heap.Push(sc.snowflakes, snowflake)
+	sc.snowflakeMap[id] = snowflake
+
+	log.Println("Total snowflakes available: ", sc.snowflakes.Len())
+	log.Println(sc.snowflakes)
+	log.Println(sc.snowflakeMap)
 	return snowflake
 }
 
+func (sc *SnowflakeContext) Broker(proxies <-chan *ProxyRequest) {
+	for p := range proxies {
+		log.Println("adding ", p.id)
+		snowflake := sc.AddSnowflake(p.id)
+		// Wait for a client to avail an offer to the snowflake, or timeout
+		// and ask the snowflake to poll later.
+		go func(p *ProxyRequest) {
+			select {
+			case offer := <-snowflake.offerChannel:
+				log.Println("Passing client offer to snowflake.")
+				p.offerChan <- offer
+			case <-time.After(time.Second * ProxyTimeout):
+				// This snowflake is no longer available to serve clients.
+				heap.Remove(sc.snowflakes, snowflake.index)
+				delete(sc.snowflakeMap, snowflake.id)
+				p.offerChan <- nil
+			}
+		}(p)
+	}
+}
+
 func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
 	w.Write([]byte("User-agent: *\nDisallow:\n"))
@@ -73,7 +115,7 @@ Expects a WebRTC SDP offer in the Request to give to an assigned
 snowflake proxy, which responds with the SDP answer to be sent in
 the HTTP response back to the client.
 */
-func clientHandler(w http.ResponseWriter, r *http.Request) {
+func clientHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) {
 	offer, err := ioutil.ReadAll(r.Body)
 	if nil != err {
 		log.Println("Invalid data.")
@@ -85,23 +127,24 @@ func clientHandler(w http.ResponseWriter, r *http.Request) {
 
 	// Find the most available snowflake proxy, and pass the offer to it.
 	// TODO: Needs improvement - maybe shouldn'
-	snowflake := heap.Pop(snowflakes).(*Snowflake)
-	if nil == snowflake {
+	if ctx.snowflakes.Len() <= 0 {
+		log.Println("Client: No snowflake proxies available.")
 		w.WriteHeader(http.StatusServiceUnavailable)
-		// w.Write([]byte("no snowflake proxies available"))
 		return
 	}
+	snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
 	snowflake.offerChannel <- offer
 
 	// Wait for the answer to be returned on the channel.
 	select {
 	case answer := <-snowflake.answerChannel:
-		log.Println("Retrieving answer")
+		log.Println("Client: Retrieving answer")
 		w.Write(answer)
 		// Only remove from the snowflake map once the answer is set.
-		delete(snowflakeMap, snowflake.id)
+		delete(ctx.snowflakeMap, snowflake.id)
 
 	case <-time.After(time.Second * ClientTimeout):
+		log.Println("Client: Timed out.")
 		w.WriteHeader(http.StatusGatewayTimeout)
 		w.Write([]byte("timed out waiting for answer!"))
 	}
@@ -110,7 +153,7 @@ func clientHandler(w http.ResponseWriter, r *http.Request) {
 /*
 For snowflake proxies to request a client from the Broker.
 */
-func proxyHandler(w http.ResponseWriter, r *http.Request) {
+func proxyHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) {
 	if isPreflight(w, r) {
 		return
 	}
@@ -126,21 +169,22 @@ func proxyHandler(w http.ResponseWriter, r *http.Request) {
 	}
 	// Maybe confirm that X-Session-ID is the same.
 	log.Println("Received snowflake: ", id)
-	snowflake := AddSnowflake(id)
+
+	p := new(ProxyRequest)
+	p.id = id
+	p.offerChan = make(chan []byte)
+	createChan <- p
 
 	// Wait for a client to avail an offer to the snowflake, or timeout
 	// and ask the snowflake to poll later.
-	select {
-	case offer := <-snowflake.offerChannel:
-		log.Println("Passing client offer to snowflake.")
-		w.Write(offer)
-
-	case <-time.After(time.Second * ProxyTimeout):
-		// This snowflake is no longer available to serve clients.
-		heap.Remove(snowflakes, snowflake.index)
-		delete(snowflakeMap, snowflake.id)
+	offer := <-p.offerChan
+	if nil == offer {
+		log.Println("Proxy " + id + " did not receive a Client offer.")
 		w.WriteHeader(http.StatusGatewayTimeout)
+		return
 	}
+	log.Println("Passing client offer to snowflake.")
+	w.Write(offer)
 }
 
 /*
@@ -148,12 +192,12 @@ Expects snowflake proxes which have previously successfully received
 an offer from proxyHandler to respond with an answer in an HTTP POST,
 which the broker will pass back to the original client.
 */
-func answerHandler(w http.ResponseWriter, r *http.Request) {
+func answerHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) {
 	if isPreflight(w, r) {
 		return
 	}
 	id := r.Header.Get("X-Session-ID")
-	snowflake, ok := snowflakeMap[id]
+	snowflake, ok := ctx.snowflakeMap[id]
 	if !ok || nil == snowflake {
 		// The snowflake took too long to respond with an answer,
 		// and the designated client is no longer around / recognized by the Broker.
@@ -170,15 +214,30 @@ func answerHandler(w http.ResponseWriter, r *http.Request) {
 	snowflake.answerChannel <- body
 }
 
+func debugHandler(ctx *SnowflakeContext, w http.ResponseWriter, r *http.Request) {
+	s := fmt.Sprintf("current: %d", ctx.snowflakes.Len())
+	w.Write([]byte(s))
+}
+
 func init() {
-	snowflakes = new(SnowflakeHeap)
-	snowflakeMap = make(map[string]*Snowflake)
+	// snowflakeMap = make(map[string]*Snowflake)
+	snowflakes := new(SnowflakeHeap)
 	heap.Init(snowflakes)
+	ctx := &SnowflakeContext{
+		snowflakes:   snowflakes,
+		snowflakeMap: make(map[string]*Snowflake),
+	}
+
+	go ctx.Broker(createChan)
 
 	http.HandleFunc("/robots.txt", robotsTxtHandler)
 	http.HandleFunc("/ip", ipHandler)
 
-	http.HandleFunc("/client", clientHandler)
-	http.HandleFunc("/proxy", proxyHandler)
-	http.HandleFunc("/answer", answerHandler)
+	// http.HandleFunc("/client", clientHandler)
+	// http.HandleFunc("/proxy", proxyHandler)
+	// http.HandleFunc("/answer", answerHandler)
+	http.Handle("/client", SnowflakeHandler{ctx, clientHandler})
+	http.Handle("/proxy", SnowflakeHandler{ctx, proxyHandler})
+	http.Handle("/answer", SnowflakeHandler{ctx, answerHandler})
+	http.Handle("/debug", SnowflakeHandler{ctx, debugHandler})
 }
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
index 03f876b..47533d0 100644
--- a/broker/snowflake-broker_test.go
+++ b/broker/snowflake-broker_test.go
@@ -22,6 +22,8 @@ func TestSnowflakeHeap(t *testing.T) {
 	s4.clients = 1
 
 	heap.Push(h, s1)
+	if 1 != h.Len() {
+	}
 	heap.Push(h, s2)
 	heap.Push(h, s3)
 	heap.Push(h, s4)
@@ -36,6 +38,9 @@ func TestSnowflakeHeap(t *testing.T) {
 	}
 
 	r := heap.Pop(h).(*Snowflake)
+	if 2 != h.Len() {
+		t.Error("Unexpected length.")
+	}
 	if r.clients != 3 {
 		t.Error("Unexpected clients: ", r.clients)
 	}
@@ -44,6 +49,9 @@ func TestSnowflakeHeap(t *testing.T) {
 	}
 
 	r = heap.Pop(h).(*Snowflake)
+	if 1 != h.Len() {
+		t.Error("Unexpected length.")
+	}
 	if r.clients != 4 {
 		t.Error("Unexpected clients: ", r.clients)
 	}
diff --git a/client/torrc-localhost b/client/torrc-localhost
new file mode 100644
index 0000000..7d539fb
--- /dev/null
+++ b/client/torrc-localhost
@@ -0,0 +1,7 @@
+UseBridges 1
+DataDirectory datadir
+
+ClientTransportPlugin snowflake exec ./client \
+-url http://localhost:8080/ \
+
+Bridge snowflake 0.0.3.0:1
diff --git a/proxy/broker.coffee b/proxy/broker.coffee
index e989d57..71c3186 100644
--- a/proxy/broker.coffee
+++ b/proxy/broker.coffee
@@ -28,7 +28,8 @@ class Broker
     @clients = 0
     @id = genSnowflakeID()
     # Ensure url has the right protocol + trailing slash.
-    @url = 'https://' + @url if 0 != @url.indexOf('https://', 0)
+    @url = 'http://' + @url if 0 == @url.indexOf('localhost', 0)
+    @url = 'https://' + @url if 0 != @url.indexOf('http', 0)
     @url += '/' if '/' != @url.substr -1
 
   # Promises some client SDP Offer.



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits