[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [snowflake/master] Add a new heap at the broker for restricted flakes
commit 0052c0e10cfd8e270a57d85711064e8d9e064bf5
Author: Cecylia Bocovich <cohosh@xxxxxxxxxxxxxx>
Date: Tue Jun 16 17:49:39 2020 -0400
Add a new heap at the broker for restricted flakes
Now when proxies poll, they provide their NAT type to the broker. This
introduces a new snowflake heap of just restricted snowflakes that the
broker can pull from if the client has a known, unrestricted NAT. All
other clients will pull from a heap of snowflakes with unrestricted or
unknown NAT topologies.
---
broker/broker.go | 67 ++++++++++++++++++++++++++++++-----------
broker/snowflake-broker_test.go | 14 ++++-----
2 files changed, 57 insertions(+), 24 deletions(-)
diff --git a/broker/broker.go b/broker/broker.go
index 2d3cd4b..9297980 100644
--- a/broker/broker.go
+++ b/broker/broker.go
@@ -31,12 +31,18 @@ const (
ClientTimeout = 10
ProxyTimeout = 10
readLimit = 100000 //Maximum number of bytes to be read from an HTTP request
+
+ NATUnknown = "unknown"
+ NATRestricted = "restricted"
+ NATUnrestricted = "unrestricted"
)
type BrokerContext struct {
- snowflakes *SnowflakeHeap
- // Map keeping track of snowflakeIDs required to match SDP answers from
- // the second http POST.
+ snowflakes *SnowflakeHeap
+ restrictedSnowflakes *SnowflakeHeap
+ // Maps keeping track of snowflakeIDs required to match SDP answers from
+ // the second http POST. Restricted snowflakes can only be matched up with
+ // clients behind an unrestricted NAT.
idToSnowflake map[string]*Snowflake
// Synchronization for the snowflake map and heap
snowflakeLock sync.Mutex
@@ -47,6 +53,8 @@ type BrokerContext struct {
func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
snowflakes := new(SnowflakeHeap)
heap.Init(snowflakes)
+ rSnowflakes := new(SnowflakeHeap)
+ heap.Init(rSnowflakes)
metrics, err := NewMetrics(metricsLogger)
if err != nil {
@@ -58,10 +66,11 @@ func NewBrokerContext(metricsLogger *log.Logger) *BrokerContext {
}
return &BrokerContext{
- snowflakes: snowflakes,
- idToSnowflake: make(map[string]*Snowflake),
- proxyPolls: make(chan *ProxyPoll),
- metrics: metrics,
+ snowflakes: snowflakes,
+ restrictedSnowflakes: rSnowflakes,
+ idToSnowflake: make(map[string]*Snowflake),
+ proxyPolls: make(chan *ProxyPoll),
+ metrics: metrics,
}
}
@@ -79,7 +88,7 @@ type MetricsHandler struct {
func (sh SnowflakeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
- w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID")
+ w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Session-ID, Snowflake-NAT-Type")
// Return early if it's CORS preflight.
if "OPTIONS" == r.Method {
return
@@ -101,15 +110,17 @@ func (mh MetricsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
type ProxyPoll struct {
id string
proxyType string
+ natType string
offerChannel chan []byte
}
// Registers a Snowflake and waits for some Client to send an offer,
// as part of the polling logic of the proxy handler.
-func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte {
+func (ctx *BrokerContext) RequestOffer(id string, proxyType string, natType string) []byte {
request := new(ProxyPoll)
request.id = id
request.proxyType = proxyType
+ request.natType = natType
request.offerChannel = make(chan []byte)
ctx.proxyPolls <- request
// Block until an offer is available, or timeout which sends a nil offer.
@@ -122,7 +133,7 @@ func (ctx *BrokerContext) RequestOffer(id string, proxyType string) []byte {
// client offer or nil on timeout / none are available.
func (ctx *BrokerContext) Broker() {
for request := range ctx.proxyPolls {
- snowflake := ctx.AddSnowflake(request.id, request.proxyType)
+ snowflake := ctx.AddSnowflake(request.id, request.proxyType, request.natType)
// Wait for a client to avail an offer to the snowflake.
go func(request *ProxyPoll) {
select {
@@ -133,7 +144,11 @@ func (ctx *BrokerContext) Broker() {
ctx.snowflakeLock.Lock()
defer ctx.snowflakeLock.Unlock()
if snowflake.index != -1 {
- heap.Remove(ctx.snowflakes, snowflake.index)
+ if request.natType == NATRestricted {
+ heap.Remove(ctx.restrictedSnowflakes, snowflake.index)
+ } else {
+ heap.Remove(ctx.snowflakes, snowflake.index)
+ }
delete(ctx.idToSnowflake, snowflake.id)
close(request.offerChannel)
}
@@ -145,7 +160,7 @@ func (ctx *BrokerContext) Broker() {
// Create and add a Snowflake to the heap.
// Required to keep track of proxies between providing them
// with an offer and awaiting their second POST with an answer.
-func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake {
+func (ctx *BrokerContext) AddSnowflake(id string, proxyType string, natType string) *Snowflake {
snowflake := new(Snowflake)
snowflake.id = id
snowflake.clients = 0
@@ -153,7 +168,11 @@ func (ctx *BrokerContext) AddSnowflake(id string, proxyType string) *Snowflake {
snowflake.offerChannel = make(chan []byte)
snowflake.answerChannel = make(chan []byte)
ctx.snowflakeLock.Lock()
- heap.Push(ctx.snowflakes, snowflake)
+ if natType == NATRestricted {
+ heap.Push(ctx.restrictedSnowflakes, snowflake)
+ } else {
+ heap.Push(ctx.snowflakes, snowflake)
+ }
ctx.snowflakeLock.Unlock()
ctx.idToSnowflake[id] = snowflake
return snowflake
@@ -170,7 +189,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
return
}
- sid, proxyType, _, err := messages.DecodePollRequest(body)
+ sid, proxyType, natType, err := messages.DecodePollRequest(body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
@@ -187,7 +206,7 @@ func proxyPolls(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
}
// Wait for a client to avail an offer to the snowflake, or timeout if nil.
- offer := ctx.RequestOffer(sid, proxyType)
+ offer := ctx.RequestOffer(sid, proxyType, natType)
var b []byte
if nil == offer {
ctx.metrics.lock.Lock()
@@ -226,9 +245,23 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusBadRequest)
return
}
+
+ natType := r.Header.Get("Snowflake-NAT-Type")
+ if natType == "" {
+ natType = NATUnknown
+ }
+
+ // Only hand out known restricted snowflakes to unrestricted clients
+ var snowflakeHeap *SnowflakeHeap
+ if natType == NATUnrestricted {
+ snowflakeHeap = ctx.restrictedSnowflakes
+ } else {
+ snowflakeHeap = ctx.snowflakes
+ }
+
// Immediately fail if there are no snowflakes available.
ctx.snowflakeLock.Lock()
- numSnowflakes := ctx.snowflakes.Len()
+ numSnowflakes := snowflakeHeap.Len()
ctx.snowflakeLock.Unlock()
if numSnowflakes <= 0 {
ctx.metrics.lock.Lock()
@@ -240,7 +273,7 @@ func clientOffers(ctx *BrokerContext, w http.ResponseWriter, r *http.Request) {
// Otherwise, find the most available snowflake proxy, and pass the offer to it.
// Delete must be deferred in order to correctly process answer request later.
ctx.snowflakeLock.Lock()
- snowflake := heap.Pop(ctx.snowflakes).(*Snowflake)
+ snowflake := heap.Pop(snowflakeHeap).(*Snowflake)
ctx.snowflakeLock.Unlock()
snowflake.offerChannel <- offer
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
index 18b83dd..91383a1 100644
--- a/broker/snowflake-broker_test.go
+++ b/broker/snowflake-broker_test.go
@@ -29,7 +29,7 @@ func TestBroker(t *testing.T) {
Convey("Adds Snowflake", func() {
So(ctx.snowflakes.Len(), ShouldEqual, 0)
So(len(ctx.idToSnowflake), ShouldEqual, 0)
- ctx.AddSnowflake("foo", "")
+ ctx.AddSnowflake("foo", "", NATUnknown)
So(ctx.snowflakes.Len(), ShouldEqual, 1)
So(len(ctx.idToSnowflake), ShouldEqual, 1)
})
@@ -55,7 +55,7 @@ func TestBroker(t *testing.T) {
Convey("Request an offer from the Snowflake Heap", func() {
done := make(chan []byte)
go func() {
- offer := ctx.RequestOffer("test", "")
+ offer := ctx.RequestOffer("test", "", NATUnknown)
done <- offer
}()
request := <-ctx.proxyPolls
@@ -79,7 +79,7 @@ func TestBroker(t *testing.T) {
Convey("with a proxy answer if available.", func() {
done := make(chan bool)
// Prepare a fake proxy to respond with.
- snowflake := ctx.AddSnowflake("fake", "")
+ snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
go func() {
clientOffers(ctx, w, r)
done <- true
@@ -97,7 +97,7 @@ func TestBroker(t *testing.T) {
return
}
done := make(chan bool)
- snowflake := ctx.AddSnowflake("fake", "")
+ snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
go func() {
clientOffers(ctx, w, r)
// Takes a few seconds here...
@@ -147,7 +147,7 @@ func TestBroker(t *testing.T) {
})
Convey("Responds to proxy answers...", func() {
- s := ctx.AddSnowflake("test", "")
+ s := ctx.AddSnowflake("test", "", NATUnknown)
w := httptest.NewRecorder()
data := bytes.NewReader([]byte(`{"Version":"1.0","Sid":"test","Answer":"test"}`))
@@ -260,7 +260,7 @@ func TestBroker(t *testing.T) {
// Manually do the Broker goroutine action here for full control.
p := <-ctx.proxyPolls
So(p.id, ShouldEqual, "ymbcCMto7KHNGYlp")
- s := ctx.AddSnowflake(p.id, "")
+ s := ctx.AddSnowflake(p.id, "", NATUnknown)
go func() {
offer := <-s.offerChannel
p.offerChannel <- offer
@@ -537,7 +537,7 @@ func TestMetrics(t *testing.T) {
So(err, ShouldBeNil)
// Prepare a fake proxy to respond with.
- snowflake := ctx.AddSnowflake("fake", "")
+ snowflake := ctx.AddSnowflake("fake", "", NATUnknown)
go func() {
clientOffers(ctx, w, r)
done <- true
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits