[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [snowflake/master] Simplify proxy poll handler, and broker match test



commit 791f6925ec749a28ad95c76325f802bc4de2d75c
Author: Serene Han <keroserene+git@xxxxxxxxx>
Date:   Tue Feb 16 20:50:00 2016 -0800

    Simplify proxy poll handler, and broker match test
---
 broker/broker.go                | 89 ++++++++++++++++++++++-------------------
 broker/snowflake-broker_test.go | 25 +++++++++---
 broker/snowflake-heap.go        |  4 ++
 3 files changed, 70 insertions(+), 48 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index 69b8369..9e5ee30 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -28,17 +28,17 @@ type BrokerContext struct {
 	snowflakes *SnowflakeHeap
 	// Map keeping track of snowflakeIDs required to match SDP answers from
 	// the second http POST.
-	snowflakeMap  map[string]*Snowflake
-	createChannel chan *ProxyRequest
+	snowflakeMap map[string]*Snowflake
+	proxyPolls   chan *ProxyPoll
 }
 
 func NewBrokerContext() *BrokerContext {
 	snowflakes := new(SnowflakeHeap)
 	heap.Init(snowflakes)
 	return &BrokerContext{
-		snowflakes:    snowflakes,
-		snowflakeMap:  make(map[string]*Snowflake),
-		createChannel: make(chan *ProxyRequest),
+		snowflakes:   snowflakes,
+		snowflakeMap: make(map[string]*Snowflake),
+		proxyPolls:   make(chan *ProxyPoll),
 	}
 }
 
@@ -51,46 +51,58 @@ func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	sh.h(sh.BrokerContext, w, r)
 }
 
-type ProxyRequest struct {
-	id        string
-	offerChan chan []byte
+// Proxies may poll for client offers concurrently.
+type ProxyPoll struct {
+	id           string
+	offerChannel chan []byte
 }
 
-// Create and add a Snowflake to the heap.
-func (sc *BrokerContext) AddSnowflake(id string) *Snowflake {
-	snowflake := new(Snowflake)
-	snowflake.id = id
-	snowflake.clients = 0
-	snowflake.offerChannel = make(chan []byte)
-	snowflake.answerChannel = make(chan []byte)
-	heap.Push(sc.snowflakes, snowflake)
-	sc.snowflakeMap[id] = snowflake
-	return snowflake
+// Registers a Snowflake and waits for some Client to send an offer,
+// as part of the polling logic of the proxy handler.
+func (ctx *BrokerContext) RequestOffer(id string) []byte {
+	request := new(ProxyPoll)
+	request.id = id
+	request.offerChannel = make(chan []byte)
+	ctx.proxyPolls <- request
+	// Block until an offer is available...
+	offer := <-request.offerChannel
+	return offer
 }
 
-// Match proxies to clients.
-// func (ctx *BrokerContext) Broker(proxies <-chan *ProxyRequest) {
+// Broker is a goroutine which matches proxies to clients.
+// Safely processes proxy requests, responding to each with either an available
+// client offer, or nil if the poll times out before an offer becomes available.
 func (ctx *BrokerContext) Broker() {
-	// for p := range proxies {
-	for p := range ctx.createChannel {
-		snowflake := ctx.AddSnowflake(p.id)
-		// Wait for a client to avail an offer to the snowflake, or timeout
-		// and ask the snowflake to poll later.
-		go func(p *ProxyRequest) {
+	for request := range ctx.proxyPolls {
+		snowflake := ctx.AddSnowflake(request.id)
+		// Wait for a client to make an offer available to the snowflake.
+		go func(request *ProxyPoll) {
 			select {
 			case offer := <-snowflake.offerChannel:
-				log.Println("Passing client offer to snowflake.")
-				p.offerChan <- offer
+				log.Println("Passing client offer to snowflake proxy.")
+				request.offerChannel <- offer
 			case <-time.After(time.Second * ProxyTimeout):
 				// This snowflake is no longer available to serve clients.
 				heap.Remove(ctx.snowflakes, snowflake.index)
 				delete(ctx.snowflakeMap, snowflake.id)
-				p.offerChan <- nil
+				request.offerChannel <- nil
 			}
-		}(p)
+		}(request)
 	}
 }
 
+// Create and add a Snowflake to the heap.
+func (ctx *BrokerContext) AddSnowflake(id string) *Snowflake {
+	snowflake := new(Snowflake)
+	snowflake.id = id
+	snowflake.clients = 0
+	snowflake.offerChannel = make(chan []byte)
+	snowflake.answerChannel = make(chan []byte)
+	heap.Push(ctx.snowflakes, snowflake)
+	ctx.snowflakeMap[id] = snowflake
+	return snowflake
+}
+
 func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "text/plain; charset=utf-8")
 	w.Write([]byte("User-agent: *\nDisallow:\n"))
@@ -145,14 +157,15 @@ func clientHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 	case answer := <-snowflake.answerChannel:
 		log.Println("Client: Retrieving answer")
 		w.Write(answer)
-		// Only remove from the snowflake map once the answer is set.
-		delete(ctx.snowflakeMap, snowflake.id)
 
 	case <-time.After(time.Second * ClientTimeout):
 		log.Println("Client: Timed out.")
 		w.WriteHeader(http.StatusGatewayTimeout)
 		w.Write([]byte("timed out waiting for answer!"))
 	}
+	// Remove from the snowflake map whether answer was sent or not, because
+	// this client request is now over.
+	delete(ctx.snowflakeMap, snowflake.id)
 }
 
 /*
@@ -172,17 +185,9 @@ func proxyHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 	if string(body) != id { // Mismatched IDs!
 		w.WriteHeader(http.StatusBadRequest)
 	}
-	// Maybe confirm that X-Session-ID is the same.
 	log.Println("Received snowflake: ", id)
-
-	p := new(ProxyRequest)
-	p.id = id
-	p.offerChan = make(chan []byte)
-	ctx.createChannel <- p
-
-	// Wait for a client to avail an offer to the snowflake, or timeout
-	// and ask the snowflake to poll later.
-	offer := <-p.offerChan
+	// Wait for a client to make an offer available; a nil offer means timeout.
+	offer := ctx.RequestOffer(id)
 	if nil == offer {
 		log.Println("Proxy " + id + " did not receive a Client offer.")
 		w.WriteHeader(http.StatusGatewayTimeout)
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
index ee984b0..b9432d8 100644
--- a/broker/snowflake-broker_test.go
+++ b/broker/snowflake-broker_test.go
@@ -22,6 +22,19 @@ func TestBroker(t *testing.T) {
 			So(len(ctx.snowflakeMap), ShouldEqual, 1)
 		})
 
+		Convey("Broker goroutine matches clients with proxies", func() {
+			p := new(ProxyPoll)
+			p.id = "test"
+			p.offerChannel = make(chan []byte)
+			go func() {
+				ctx.proxyPolls <- p
+				close(ctx.proxyPolls)
+			}()
+			ctx.Broker()
+			So(ctx.snowflakes.Len(), ShouldEqual, 1)
+			So(ctx.snowflakes.Len(), ShouldEqual, 1)
+		})
+
 		Convey("Responds to client offers...", func() {
 			w := httptest.NewRecorder()
 			data := bytes.NewReader([]byte("test"))
@@ -83,9 +96,9 @@ func TestBroker(t *testing.T) {
 					done <- true
 				}(ctx)
 				// Pass a fake client offer to this proxy
-				p := <-ctx.createChannel
+				p := <-ctx.proxyPolls
 				So(p.id, ShouldEqual, "test")
-				p.offerChan <- []byte("fake offer")
+				p.offerChannel <- []byte("fake offer")
 				<-done
 				So(w.Code, ShouldEqual, http.StatusOK)
 				So(w.Body.String(), ShouldEqual, "fake offer")
@@ -96,10 +109,10 @@ func TestBroker(t *testing.T) {
 					proxyHandler(ctx, w, r)
 					done <- true
 				}(ctx)
-				p := <-ctx.createChannel
+				p := <-ctx.proxyPolls
 				So(p.id, ShouldEqual, "test")
 				// nil means timeout
-				p.offerChan <- nil
+				p.offerChannel <- nil
 				<-done
 				So(w.Body.String(), ShouldEqual, "")
 				So(w.Code, ShouldEqual, http.StatusGatewayTimeout)
@@ -159,12 +172,12 @@ func TestBroker(t *testing.T) {
 		}()
 
 		// Manually do the Broker goroutine action here for full control.
-		p := <-ctx.createChannel
+		p := <-ctx.proxyPolls
 		So(p.id, ShouldEqual, "test")
 		s := ctx.AddSnowflake(p.id)
 		go func() {
 			offer := <-s.offerChannel
-			p.offerChan <- offer
+			p.offerChannel <- offer
 		}()
 		So(ctx.snowflakeMap["test"], ShouldNotBeNil)
 
diff --git a/broker/snowflake-heap.go b/broker/snowflake-heap.go
index d37228f..cf249fe 100644
--- a/broker/snowflake-heap.go
+++ b/broker/snowflake-heap.go
@@ -4,6 +4,10 @@ Keeping track of pending available snowflake proxies.
 
 package snowflake_broker
 
+/*
+The Snowflake struct represents a single interaction
+over the offer and answer channels.
+*/
 type Snowflake struct {
 	id            string
 	offerChannel  chan []byte



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits