[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [snowflake/master] broker tracking a heap of snowflakes
commit 0cd6852ad0060fad40aaf31d5ba1f6951127fc35
Author: Serene Han <keroserene+git@xxxxxxxxx>
Date: Thu Jan 21 10:44:14 2016 -0800
broker tracking a heap of snowflakes
---
broker/snowflake-broker.go | 130 +++++++++++++++++++++++++++++++++++----
broker/snowflake-broker_test.go | 59 ++++++++++++++++++
client/meek-webrtc.go | 3 +-
proxy/broker.coffee | 7 +--
proxy/snowflake.coffee | 2 +-
5 files changed, 182 insertions(+), 19 deletions(-)
diff --git a/broker/snowflake-broker.go b/broker/snowflake-broker.go
index 4c0e62f..279b2b6 100644
--- a/broker/snowflake-broker.go
+++ b/broker/snowflake-broker.go
@@ -1,13 +1,13 @@
package snowflake_broker
import (
- // "io"
+ "container/heap"
+ "fmt"
"io/ioutil"
"log"
"net"
"net/http"
- "path"
-
+ "time"
// "appengine"
// "appengine/urlfetch"
)
@@ -15,7 +15,64 @@ import (
// This is an intermediate step - a basic hardcoded appengine rendezvous
// to a single browser snowflake.
-var snowflakeProxy = ""
+// This is a minimum viable client-proxy registration.
+// TODO: better, more secure registration corresponding to what's in
+// the python flashproxy facilitator.
+
+// Slice of available snowflake proxies.
+// var snowflakes []chan []byte
+
+type Snowflake struct {
+ id string
+ sigChannel chan []byte
+ clients int
+ index int
+}
+
+// Implements heap.Interface, and holds Snowflakes.
+type SnowflakeHeap []*Snowflake
+
+func (sh SnowflakeHeap) Len() int { return len(sh) }
+
+func (sh SnowflakeHeap) Less(i, j int) bool {
+ // Snowflakes serving fewer clients should sort earlier.
+ return sh[i].clients < sh[j].clients
+}
+
+func (sh SnowflakeHeap) Swap(i, j int) {
+ sh[i], sh[j] = sh[j], sh[i]
+ sh[i].index = i
+ sh[j].index = j
+}
+
+func (sh *SnowflakeHeap) Push(s interface{}) {
+ n := len(*sh)
+ snowflake := s.(*Snowflake)
+ snowflake.index = n
+ *sh = append(*sh, snowflake)
+}
+
+// Only valid when Len() > 0.
+func (sh *SnowflakeHeap) Pop() interface{} {
+ flakes := *sh
+ n := len(flakes)
+ snowflake := flakes[n-1]
+ snowflake.index = -1
+ *sh = flakes[0 : n-1]
+ return snowflake
+}
+
+var snowflakes *SnowflakeHeap
+
+// Create and add a Snowflake to the heap.
+func AddSnowflake(id string) *Snowflake {
+ snowflake := new(Snowflake)
+ snowflake.id = id
+ snowflake.clients = 0
+ snowflake.sigChannel = make(chan []byte)
+ heap.Push(snowflakes, snowflake)
+ return snowflake
+}
func robotsTxtHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
@@ -36,29 +93,76 @@ Expects a WebRTC SDP offer in the Request to give to an assigned
snowflake proxy, which responds with the SDP answer to be sent in
the HTTP response back to the client.
*/
-func regHandler(w http.ResponseWriter, r *http.Request) {
- // TODO: Maybe don't pass anything on path, since it will always be bidirectional
- dir, _ := path.Split(path.Clean(r.URL.Path))
- if dir != "/reg/" {
- http.NotFound(w, r)
+func clientHandler(w http.ResponseWriter, r *http.Request) {
+ offer, err := ioutil.ReadAll(r.Body)
+ if nil != err {
+ log.Println("Invalid data.")
+ return
+ }
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ // Pop the most available snowflake proxy, and pass the offer to it.
+ // TODO: Make this much better.
+ snowflake := heap.Pop(snowflakes).(*Snowflake)
+ if nil == snowflake {
+ w.Write([]byte("no snowflake proxies available"))
+ // w.WriteHeader(http.StatusServiceUnavailable)
return
}
+ // snowflakes = snowflakes[1:]
+ snowflake.sigChannel <- offer
+ w.Write([]byte("sent offer to proxy!"))
+ // TODO: Get browser snowflake to talk to this appengine instance
+ // so it can reply with an answer, and not just the offer again :)
+ // TODO: Real broker which matches clients and snowflake proxies.
+ w.Write(offer)
+}
+
+/*
+A snowflake browser proxy requests a client from the Broker.
+*/
+func proxyHandler(w http.ResponseWriter, r *http.Request) {
body, err := ioutil.ReadAll(r.Body)
if nil != err {
+ log.Println("Invalid data.")
return
+ }
+ w.Header().Set("Access-Control-Allow-Origin", "*")
+ snowflakeSession := body
+ log.Println("Received snowflake: ", snowflakeSession)
+ snowflake := AddSnowflake(string(snowflakeSession))
+ select {
+ case offer := <-snowflake.sigChannel:
+ log.Println("Passing client offer to snowflake.")
+ w.Write(offer)
+ case <-time.After(time.Second * 10):
+ s := fmt.Sprintf("%d snowflakes left.", snowflakes.Len())
+ w.Write([]byte("timed out. " + s))
+ heap.Remove(snowflakes, snowflake.index)
+ // w.WriteHeader(http.StatusRequestTimeout)
+ }
+}
+
+func reflectHandler(w http.ResponseWriter, r *http.Request) {
+ body, err := ioutil.ReadAll(r.Body)
+ if nil != err {
log.Println("Invalid data.")
+ return
}
w.Header().Set("Access-Control-Allow-Origin", "*")
- // TODO: Get browser snowflake to talkto this appengine instance
- // so it can reply with an answer, and not just the offer again :)
- // TODO: Real broker which matches clients and snowflake proxies.
w.Write(body)
}
func init() {
+ // snowflakes = make([]chan []byte, 0)
+ snowflakes = new(SnowflakeHeap)
+ heap.Init(snowflakes)
+
http.HandleFunc("/robots.txt", robotsTxtHandler)
http.HandleFunc("/ip", ipHandler)
- http.HandleFunc("/reg/", regHandler)
+
+ http.HandleFunc("/client", clientHandler)
+ http.HandleFunc("/proxy", proxyHandler)
+ http.HandleFunc("/reflect", reflectHandler)
// if SNOWFLAKE_BROKER == "" {
// panic("SNOWFLAKE_BROKER empty; did you forget to edit config.go?")
// }
diff --git a/broker/snowflake-broker_test.go b/broker/snowflake-broker_test.go
new file mode 100644
index 0000000..03f876b
--- /dev/null
+++ b/broker/snowflake-broker_test.go
@@ -0,0 +1,59 @@
+package snowflake_broker
+
+import (
+ "container/heap"
+ "testing"
+)
+
+func TestSnowflakeHeap(t *testing.T) {
+ h := new(SnowflakeHeap)
+ heap.Init(h)
+ if 0 != h.Len() {
+ t.Error("Unexpected length.")
+ }
+ s1 := new(Snowflake)
+ s2 := new(Snowflake)
+ s3 := new(Snowflake)
+ s4 := new(Snowflake)
+
+ s1.clients = 4
+ s2.clients = 5
+ s3.clients = 3
+ s4.clients = 1
+
+ heap.Push(h, s1)
+ heap.Push(h, s2)
+ heap.Push(h, s3)
+ heap.Push(h, s4)
+
+ if 4 != h.Len() {
+ t.Error("Unexpected length.")
+ }
+
+ heap.Remove(h, 0)
+ if 3 != h.Len() {
+ t.Error("Unexpected length.")
+ }
+
+ r := heap.Pop(h).(*Snowflake)
+ if r.clients != 3 {
+ t.Error("Unexpected clients: ", r.clients)
+ }
+ if r.index != -1 {
+ t.Error("Unexpected index: ", r.index)
+ }
+
+ r = heap.Pop(h).(*Snowflake)
+ if r.clients != 4 {
+ t.Error("Unexpected clients: ", r.clients)
+ }
+
+ r = heap.Pop(h).(*Snowflake)
+ if r.clients != 5 {
+ t.Error("Unexpected clients: ", r.clients)
+ }
+
+ if 0 != h.Len() {
+ t.Error("Unexpected length.")
+ }
+}
diff --git a/client/meek-webrtc.go b/client/meek-webrtc.go
index df59e98..2d68770 100644
--- a/client/meek-webrtc.go
+++ b/client/meek-webrtc.go
@@ -37,7 +37,7 @@ func NewMeekChannel(broker string, front string) *MeekChannel {
mc.Method = "POST"
mc.trueURL = targetUrl
- mc.externalUrl = front + "/reg/test" // TODO: Have a better suffix.
+ mc.externalUrl = front + "/client"
// We make a copy of DefaultTransport because we want the default Dial
// and TLSHandshakeTimeout settings. But we want to disable the default
@@ -70,6 +70,7 @@ func (mc *MeekChannel) Negotiate(offer *webrtc.SessionDescription) (
if nil != err {
return nil, err
}
+ log.Println("Body: ", string(body))
answer := webrtc.DeserializeSessionDescription(string(body))
return answer, nil
}
diff --git a/proxy/broker.coffee b/proxy/broker.coffee
index fb2add9..9432e5c 100644
--- a/proxy/broker.coffee
+++ b/proxy/broker.coffee
@@ -21,7 +21,6 @@ class Broker
xhr = new XMLHttpRequest()
try
xhr.open 'POST', @url
- xhr
catch err
###
An exception happens here when, for example, NoScript allows the domain on
@@ -35,14 +34,14 @@ class Broker
# xhr.responseType = 'text'
xhr.onreadystatechange = ->
if xhr.DONE == xhr.readyState
+ log 'Broker: ' + xhr.status
if 200 == xhr.status
- log 'Broker: success'
log 'Response: ' + xhr.responseText
- # @fac_complete xhr.responseText
+ log xhr
else
log 'Broker error ' + xhr.status + ' - ' + xhr.statusText
-
xhr.send 'snowflake-testing'
+ log "Broker: sent a registration message, waiting for reply..."
sendAnswer: (answer) ->
log 'Sending answer to broker.'
diff --git a/proxy/snowflake.coffee b/proxy/snowflake.coffee
index ac59419..1b837d5 100644
--- a/proxy/snowflake.coffee
+++ b/proxy/snowflake.coffee
@@ -8,7 +8,7 @@ Assume that the webrtc client plugin is always the offerer, in which case
this must always act as the answerer.
###
DEFAULT_WEBSOCKET = '192.81.135.242:9901'
-DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/reg/test'
+DEFAULT_BROKER = 'https://snowflake-reg.appspot.com/proxy'
DEFAULT_PORTS =
http: 80
https: 443
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits