[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [snowflake/master] Add synchronization to prevent race in broker
commit dccc15a6e9d620298f77fb7ae14692723b434306
Author: Cecylia Bocovich <cohosh@xxxxxxxxxxxxxx>
Date: Fri Nov 22 17:15:06 2019 -0500
Add synchronization to prevent race in broker
There's a race condition in the broker where both the proxy and the
client processes try to pop/remove the same snowflake from the heap.
This patch adds synchronization to prevent simultaneous accesses to
snowflakes.
---
broker/broker.go | 34 +++++++++++++++++++++++++++-------
1 file changed, 27 insertions(+), 7 deletions(-)
diff --git a/broker/broker.go b/broker/broker.go
index c166f1a..a5b0edf 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -18,6 +18,7 @@ import (
"os"
"os/signal"
"strings"
+ "sync"
"syscall"
"time"
@@ -37,6 +38,8 @@ type BrokerContext struct {
// Map keeping track of snowflakeIDs required to match SDP answers from
// the second http POST.
idToSnowflake map[string]*Snowflake
+ // Synchronization for the snowflake map and heap
+ snowflakeLock sync.Mutex
proxyPolls chan *ProxyPoll
metrics *Metrics
}
@@ -127,10 +130,13 @@ func (ctx *BrokerContext) Broker() {
request.offerChannel <- offer
case <-time.After(time.Second * ProxyTimeout):
// This snowflake is no longer available to serve clients.
- // TODO: Fix race using a delete channel
- heap.Remove(ctx.snowflakes, snowflake.index)
- delete(ctx.idToSnowflake, snowflake.id)
- request.offerChannel <- nil
+ ctx.snowflakeLock.Lock()
+ defer ctx.snowflakeLock.Unlock()
+ if snowflake.index != -1 {
+ heap.Remove(ctx.snowflakes, snowflake.index)
+ delete(ctx.idToSnowflake, snowflake.id)
+ close(request.offerChannel)
+ }
}
}(request)
}
@@ -146,7 +152,9 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake {
snowflake.proxyType = proxyType
snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte)
+ ctx.snowflakeLock.Lock()
heap.Push(ctx.snowflakes, snowflake)
+ ctx.snowflakeLock.Unlock()
ctx.idToSnowflake[id] = snowflake
return snowflake
}
@@ -215,15 +223,19 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return
}
// Immediately fail if there are no snowflakes available.
- if ctx.snowflakes.Len() <= 0 {
+ ctx.snowflakeLock.Lock()
+ numSnowflakes := ctx.snowflakes.Len()
+ ctx.snowflakeLock.Unlock()
+ if numSnowflakes <= 0 {
ctx.metrics.clientDeniedCount++
w.WriteHeader(http.StatusServiceUnavailable)
return
}
// Otherwise, find the most available snowflake proxy, and pass the offer to it.
// Delete must be deferred in order to correctly process answer request later.
+ ctx.snowflakeLock.Lock()
snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
- defer delete(ctx.idToSnowflake, snowflake.id)
+ ctx.snowflakeLock.Unlock()
snowflake.offerChannel <- offer
// Wait for the answer to be returned on the channel or timeout.
@@ -243,6 +255,10 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
log.Printf("unable to write timeout error, failed with error: %v", err)
}
}
+
+ ctx.snowflakeLock.Lock()
+ delete(ctx.idToSnowflake, snowflake.id)
+ ctx.snowflakeLock.Unlock()
}
/*
@@ -266,7 +282,9 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
}
var success = true
+ ctx.snowflakeLock.Lock()
snowflake, ok := ctx.idToSnowflake[id]
+ ctx.snowflakeLock.Unlock()
if !ok || nil == snowflake {
// The snowflake took too long to respond with an answer, so its client
// disappeared / the snowflake is no longer recognized by the Broker.
@@ -287,9 +305,10 @@ func proxyAnswers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
}
func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
- s := fmt.Sprintf("current snowflakes available: %d\n", ctx.snowflakes.Len())
var webexts, browsers, standalones, unknowns int
+ ctx.snowflakeLock.Lock()
+ s := fmt.Sprintf("current snowflakes available: %d\n", len(ctx.idToSnowflake))
for _, snowflake := range ctx.idToSnowflake {
if snowflake.proxyType == "badge" {
browsers++
@@ -302,6 +321,7 @@ func debugHandler(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
}
}
+ ctx.snowflakeLock.Unlock()
s += fmt.Sprintf("\tstandalone proxies: %d", standalones)
s += fmt.Sprintf("\n\tbrowser proxies: %d", browsers)
s += fmt.Sprintf("\n\twebext proxies: %d", webexts)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits