[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [snowflake/master] Add synchronization to prevent race in broker



commit dccc15a6e9d620298f77fb7ae14692723b434306
Author: Cecylia Bocovich <cohosh@xxxxxxxxxxxxxx>
Date:   Fri Nov 22 17:15:06 2019 -0500

    Add synchronization to prevent race in broker
    
    There's a race condition in the broker where both the proxy and the
    client processes try to pop/remove the same snowflake from the heap.
    This patch adds synchronization to prevent simultaneous accesses to
    snowflakes.
---
 broker/broker.go | 34 +++++++++++++++++++++++++++-------
 1 file changed, 27 insertions(+), 7 deletions(-)

diff --git a/broker/broker.go b/broker/broker.go
index c166f1a..a5b0edf 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -18,6 +18,7 @@ import (
 	"os"
 	"os/signal"
 	"strings"
+	"sync"
 	"syscall"
 	"time"
 
@@ -37,6 +38,8 @@ type BrokerContext struct {
 	// Map keeping track of snowflakeIDs required to match SDP answers from
 	// the second http POST.
 	idToSnowflake map[string]*Snowflake
+	// Synchronization for the snowflakes heap and the idToSnowflake map
+	snowflakeLock sync.Mutex
 	proxyPolls    chan *ProxyPoll
 	metrics       *Metrics
 }
@@ -127,10 +130,13 @@ func (ctx *BrokerContext) Broker() {
 				request.offerChannel <- offer
 			case <-time.After(time.Second * ProxyTimeout):
 				// This snowflake is no longer available to serve clients.
-				// TODO: Fix race using a delete channel
-				heap.Remove(ctx.snowflakes, snowflake.index)
-				delete(ctx.idToSnowflake, snowflake.id)
-				request.offerChannel <- nil
+				ctx.snowflakeLock.Lock()
+				defer ctx.snowflakeLock.Unlock()
+				if snowflake.index != -1 {
+					heap.Remove(ctx.snowflakes, snowflake.index)
+					delete(ctx.idToSnowflake, snowflake.id)
+					close(request.offerChannel)
+				}
 			}
 		}(request)
 	}
@@ -146,7 +152,9 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake {
 	snowflake.proxyType = proxyType
 	snowflake.offerChannel = make(chan []byte)
 	snowflake.answerChannel = make(chan []byte)
+	ctx.snowflakeLock.Lock()
 	heap.Push(ctx.snowflakes, snowflake)
+	ctx.snowflakeLock.Unlock()
 	ctx.idToSnowflake[id] = snowflake
 	return snowflake
 }
@@ -215,15 +223,19 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	// Immediately fail if there are no snowflakes available.
-	if ctx.snowflakes.Len() <= 0 {
+	ctx.snowflakeLock.Lock()
+	numSnowflakes := ctx.snowflakes.Len()
+	ctx.snowflakeLock.Unlock()
+	if numSnowflakes <= 0 {
 		ctx.metrics.clientDeniedCount++
 		w.WriteHeader(http.StatusServiceUnavailable)
 		return
 	}
 	// Otherwise, find the most available snowflake proxy, and pass the offer to it.
 	// Delete must be deferred in order to correctly process answer request later.
+	ctx.snowflakeLock.Lock()
 	snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
-	defer delete(ctx.idToSnowflake, snowflake.id)
+	ctx.snowflakeLock.Unlock()
 	snowflake.offerChannel <- offer
 
 	// Wait for the answer to be returned on the channel or timeout.
@@ -243,6 +255,10 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 			log.Printf("unable to write timeout error, failed with error: %v", err)
 		}
 	}
+
+	ctx.snowflakeLock.Lock()
+	delete(ctx.idToSnowflake, snowflake.id)
+	ctx.snowflakeLock.Unlock()
 }
 
 /*
@@ -266,7 +282,9 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 	}
 
 	var success = true
+	ctx.snowflakeLock.Lock()
 	snowflake, ok := ctx.idToSnowflake[id]
+	ctx.snowflakeLock.Unlock()
 	if !ok || nil == snowflake {
 		// The snowflake took too long to respond with an answer, so its client
 		// disappeared / the snowflake is no longer recognized by the Broker.
@@ -287,9 +305,10 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 }
 
 func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
-	s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
 
 	var webexts, browsers, standalones, unknowns int
+	ctx.snowflakeLock.Lock()
+	s := fmt.Sprintf("current snowflakes available: %d\n", len(ctx.idToSnowflake))
 	for _, snowflake := range ctx.idToSnowflake {
 		if snowflake.proxyType == "badge" {
 			browsers++
@@ -302,6 +321,7 @@ func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
 		}
 
 	}
+	ctx.snowflakeLock.Unlock()
 	s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
 	s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
 	s += fmt.Sprintf("\n\twebext proxies: %d", webexts)



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits