[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [snowflake/master] Ported snowflake client to work with pion/webrtc
commit b5c50b69d080fe488620367a1203ec3c67bb93b1
Author: Cecylia Bocovich <cohosh@xxxxxxxxxxxxxx>
Date: Tue Jun 25 16:35:37 2019 -0400
Ported snowflake client to work with pion/webrtc
Modified the snowflake client to use pion/webrtc as the webrtc library.
This involved a few small changes to match function signatures as well
as several larger ones:
- OnNegotiationNeeded is no longer supported, so CreateOffer and
SetLocalDescription have been moved to a goroutine started after the
other peer connection callbacks are set
- We need our own deserialize/serialize functions
- We need to use a SettingEngine in order to access the
OnICEGatheringStateChange callback
---
client/lib/interfaces.go | 2 +-
client/lib/lib_test.go | 14 +++----
client/lib/rendezvous.go | 21 ++++++-----
client/lib/util.go | 59 +++++++++++++++++++++++++----
client/lib/webrtc.go | 98 ++++++++++++++++++++++++++----------------------
client/snowflake.go | 28 ++++++++++----
6 files changed, 146 insertions(+), 76 deletions(-)
diff --git a/client/lib/interfaces.go b/client/lib/interfaces.go
index f62d4f5..609e610 100644
--- a/client/lib/interfaces.go
+++ b/client/lib/interfaces.go
@@ -52,5 +52,5 @@ type SocksConnector interface {
// Interface for the Snowflake's transport. (Typically just webrtc.DataChannel)
type SnowflakeDataChannel interface {
io.Closer
- Send([]byte)
+ Send([]byte) error
}
diff --git a/client/lib/lib_test.go b/client/lib/lib_test.go
index 4f74cb3..4e9e2c7 100644
--- a/client/lib/lib_test.go
+++ b/client/lib/lib_test.go
@@ -8,7 +8,7 @@ import (
"net/http"
"testing"
- "github.com/keroserene/go-webrtc"
+ "github.com/pion/webrtc"
. "github.com/smartystreets/goconvey/convey"
)
@@ -17,9 +17,10 @@ type MockDataChannel struct {
done chan bool
}
-func (m *MockDataChannel) Send(data []byte) {
+func (m *MockDataChannel) Send(data []byte) error {
m.destination.Write(data)
m.done <- true
+ return nil
}
func (*MockDataChannel) Close() error { return nil }
@@ -217,11 +218,11 @@ func TestSnowflakeClient(t *testing.T) {
c.offerChannel = make(chan *webrtc.SessionDescription, 1)
c.answerChannel = make(chan *webrtc.SessionDescription, 1)
- c.config = webrtc.NewConfiguration()
+ c.config = &webrtc.Configuration{}
c.preparePeerConnection()
c.offerChannel <- nil
- answer := webrtc.DeserializeSessionDescription(
+ answer := deserializeSessionDescription(
`{"type":"answer","sdp":""}`)
c.answerChannel <- answer
c.exchangeSDP()
@@ -264,12 +265,11 @@ func TestSnowflakeClient(t *testing.T) {
})
Convey("Rendezvous", t, func() {
- webrtc.SetLoggingVerbosity(0)
transport := &MockTransport{
http.StatusOK,
[]byte(`{"type":"answer","sdp":"fake"}`),
}
- fakeOffer := webrtc.DeserializeSessionDescription("test")
+ fakeOffer := deserializeSessionDescription(`{"type":"offer","sdp":"test"}`)
Convey("Construct BrokerChannel with no front domain", func() {
b := NewBrokerChannel("test.broker", "", transport)
@@ -291,7 +291,7 @@ func TestSnowflakeClient(t *testing.T) {
answer, err := b.Negotiate(fakeOffer)
So(err, ShouldBeNil)
So(answer, ShouldNotBeNil)
- So(answer.Sdp, ShouldResemble, "fake")
+ So(answer.SDP, ShouldResemble, "fake")
})
Convey("BrokerChannel.Negotiate fails with 503", func() {
diff --git a/client/lib/rendezvous.go b/client/lib/rendezvous.go
index 54ce459..8f994f4 100644
--- a/client/lib/rendezvous.go
+++ b/client/lib/rendezvous.go
@@ -17,7 +17,7 @@ import (
"net/http"
"net/url"
- "github.com/keroserene/go-webrtc"
+ "github.com/pion/webrtc"
)
const (
@@ -84,7 +84,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
*webrtc.SessionDescription, error) {
log.Println("Negotiating via BrokerChannel...\nTarget URL: ",
bc.Host, "\nFront URL: ", bc.url.Host)
- data := bytes.NewReader([]byte(offer.Serialize()))
+ data := bytes.NewReader([]byte(serializeSessionDescription(offer)))
// Suffix with broker's client registration handler.
clientURL := bc.url.ResolveReference(&url.URL{Path: "client"})
request, err := http.NewRequest("POST", clientURL.String(), data)
@@ -107,7 +107,7 @@ func (bc *BrokerChannel) Negotiate(offer *webrtc.SessionDescription) (
if nil != err {
return nil, err
}
- answer := webrtc.DeserializeSessionDescription(string(body))
+ answer := deserializeSessionDescription(string(body))
return answer, nil
case http.StatusServiceUnavailable:
@@ -126,15 +126,18 @@ type WebRTCDialer struct {
}
+// NewWebRTCDialer returns a WebRTCDialer that wraps broker together with a
+// WebRTC configuration built from iceServers. A nil iceServers list yields
+// the zero-value configuration.
func NewWebRTCDialer(
- broker *BrokerChannel, iceServers IceServerList) *WebRTCDialer {
- config := webrtc.NewConfiguration(iceServers...)
- if nil == config {
- log.Println("Unable to prepare WebRTC configuration.")
- return nil
+ broker *BrokerChannel, iceServers []webrtc.ICEServer) *WebRTCDialer {
+ var config webrtc.Configuration
+ // Only populate ICEServers when a list was actually supplied; the
+ // previous "== nil" test discarded every caller-provided server.
+ if iceServers != nil {
+ config = webrtc.Configuration{
+ ICEServers: iceServers,
+ }
+ } else {
+ config = webrtc.Configuration{}
}
return &WebRTCDialer{
BrokerChannel: broker,
- webrtcConfig: config,
+ webrtcConfig: &config,
}
}
diff --git a/client/lib/util.go b/client/lib/util.go
index 028fb1c..f385279 100644
--- a/client/lib/util.go
+++ b/client/lib/util.go
@@ -1,23 +1,17 @@
package lib
import (
- "fmt"
+ "encoding/json"
"log"
"time"
- "github.com/keroserene/go-webrtc"
+ "github.com/pion/webrtc"
)
const (
LogTimeInterval = 5
)
-type IceServerList []webrtc.ConfigurationOption
-
-func (i *IceServerList) String() string {
- return fmt.Sprint(*i)
-}
-
type BytesLogger interface {
Log()
AddOutbound(int)
@@ -93,3 +87,52 @@ func (b *BytesSyncLogger) AddInbound(amount int) {
}
b.InboundChan <- amount
}
+// deserializeSessionDescription parses a JSON object of the form
+// {"type": "...", "sdp": "..."} into a webrtc.SessionDescription.
+//
+// It logs the reason and returns nil when msg is not valid JSON, when either
+// field is missing or is not a JSON string, or when type is not one of
+// "offer", "pranswer", "answer" or "rollback".
+func deserializeSessionDescription(msg string) *webrtc.SessionDescription {
+	var parsed map[string]interface{}
+	if err := json.Unmarshal([]byte(msg), &parsed); err != nil {
+		log.Println(err)
+		return nil
+	}
+	rawType, ok := parsed["type"]
+	if !ok {
+		log.Println("Cannot deserialize SessionDescription without type field.")
+		return nil
+	}
+	rawSDP, ok := parsed["sdp"]
+	if !ok {
+		log.Println("Cannot deserialize SessionDescription without sdp field.")
+		return nil
+	}
+
+	// Use checked type assertions: msg may be a broker response body, and a
+	// non-string value such as {"type": 1} must not panic the client.
+	typeName, ok := rawType.(string)
+	if !ok {
+		log.Println("Unknown SDP type")
+		return nil
+	}
+	sdp, ok := rawSDP.(string)
+	if !ok {
+		log.Println("Cannot deserialize SessionDescription with non-string sdp field.")
+		return nil
+	}
+
+	var stype webrtc.SDPType
+	switch typeName {
+	case "offer":
+		stype = webrtc.SDPTypeOffer
+	case "pranswer":
+		stype = webrtc.SDPTypePranswer
+	case "answer":
+		stype = webrtc.SDPTypeAnswer
+	case "rollback":
+		stype = webrtc.SDPTypeRollback
+	default:
+		log.Println("Unknown SDP type")
+		return nil
+	}
+
+	return &webrtc.SessionDescription{
+		Type: stype,
+		SDP:  sdp,
+	}
+}
+
+// serializeSessionDescription encodes desc as JSON and returns the result as
+// a string. It logs the problem and returns "" when desc is nil or cannot be
+// marshalled.
+func serializeSessionDescription(desc *webrtc.SessionDescription) string {
+	// Guard the dereference below: a nil description would otherwise panic.
+	if desc == nil {
+		log.Println("Cannot serialize nil SessionDescription.")
+		return ""
+	}
+	encoded, err := json.Marshal(*desc)
+	if err != nil {
+		log.Println(err)
+		return ""
+	}
+	return string(encoded)
+}
diff --git a/client/lib/webrtc.go b/client/lib/webrtc.go
index 6406da5..dbc205e 100644
--- a/client/lib/webrtc.go
+++ b/client/lib/webrtc.go
@@ -9,7 +9,7 @@ import (
"time"
"github.com/dchest/uniuri"
- "github.com/keroserene/go-webrtc"
+ "github.com/pion/webrtc"
)
// Remote WebRTC peer.
@@ -151,48 +151,54 @@ func (c *WebRTCPeer) Connect() error {
// Create and prepare callbacks on a new WebRTC PeerConnection.
func (c *WebRTCPeer) preparePeerConnection() error {
if nil != c.pc {
- c.pc.Destroy()
+ c.pc.Close()
c.pc = nil
}
- pc, err := webrtc.NewPeerConnection(c.config)
+ s := webrtc.SettingEngine{}
+ s.SetTrickle(true)
+ api := webrtc.NewAPI(webrtc.WithSettingEngine(s))
+ pc, err := api.NewPeerConnection(*c.config)
if err != nil {
log.Printf("NewPeerConnection ERROR: %s", err)
return err
}
// Prepare PeerConnection callbacks.
- pc.OnNegotiationNeeded = func() {
- log.Println("WebRTC: OnNegotiationNeeded")
- go func() {
- offer, err := pc.CreateOffer()
- // TODO: Potentially timeout and retry if ICE isn't working.
- if err != nil {
- c.errorChannel <- err
- return
- }
- err = pc.SetLocalDescription(offer)
- if err != nil {
- c.errorChannel <- err
- return
- }
- }()
- }
- // Allow candidates to accumulate until IceGatheringStateComplete.
- pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
- log.Printf(candidate.Candidate)
- }
- pc.OnIceGatheringStateChange = func(state webrtc.IceGatheringState) {
- if state == webrtc.IceGatheringStateComplete {
- log.Printf("WebRTC: IceGatheringStateComplete")
+ // Allow candidates to accumulate until ICEGatheringStateComplete.
+ pc.OnICECandidate(func(candidate *webrtc.ICECandidate) {
+ if candidate == nil {
+ log.Printf("WebRTC: Done gathering candidates")
+ } else {
+ log.Printf("WebRTC: Got ICE candidate: %s", candidate.String())
+ }
+ })
+ pc.OnICEGatheringStateChange(func(state webrtc.ICEGathererState) {
+ if state == webrtc.ICEGathererStateComplete {
+ log.Println("WebRTC: ICEGatheringStateComplete")
c.offerChannel <- pc.LocalDescription()
}
- }
+ })
// This callback is not expected, as the Client initiates the creation
// of the data channel, not the remote peer.
- pc.OnDataChannel = func(channel *webrtc.DataChannel) {
+ pc.OnDataChannel(func(channel *webrtc.DataChannel) {
log.Println("OnDataChannel")
panic("Unexpected OnDataChannel!")
- }
+ })
c.pc = pc
+ go func() {
+ offer, err := pc.CreateOffer(nil)
+ // TODO: Potentially timeout and retry if ICE isn't working.
+ if err != nil {
+ c.errorChannel <- err
+ return
+ }
+ log.Println("WebRTC: Created offer")
+ err = pc.SetLocalDescription(offer)
+ if err != nil {
+ c.errorChannel <- err
+ return
+ }
+ log.Println("WebRTC: Set local description")
+ }()
log.Println("WebRTC: PeerConnection created.")
return nil
}
@@ -204,7 +210,11 @@ func (c *WebRTCPeer) establishDataChannel() error {
if c.transport != nil {
panic("Unexpected datachannel already exists!")
}
- dc, err := c.pc.CreateDataChannel(c.id)
+ ordered := true
+ dataChannelOptions := &webrtc.DataChannelInit{
+ Ordered: &ordered,
+ }
+ dc, err := c.pc.CreateDataChannel(c.id, dataChannelOptions)
// Triggers "OnNegotiationNeeded" on the PeerConnection, which will prepare
// an SDP offer while other goroutines operating on this struct handle the
// signaling. Eventually fires "OnOpen".
@@ -212,7 +222,7 @@ func (c *WebRTCPeer) establishDataChannel() error {
log.Printf("CreateDataChannel ERROR: %s", err)
return err
}
- dc.OnOpen = func() {
+ dc.OnOpen(func() {
c.lock.Lock()
defer c.lock.Unlock()
log.Println("WebRTC: DataChannel.OnOpen")
@@ -227,8 +237,8 @@ func (c *WebRTCPeer) establishDataChannel() error {
}
// Then enable the datachannel.
c.transport = dc
- }
- dc.OnClose = func() {
+ })
+ dc.OnClose(func() {
c.lock.Lock()
// Future writes will go to the buffer until a new DataChannel is available.
if nil == c.transport {
@@ -241,29 +251,29 @@ func (c *WebRTCPeer) establishDataChannel() error {
// Disable the DataChannel as a write destination.
log.Println("WebRTC: DataChannel.OnClose [remotely]")
c.transport = nil
- c.pc.DeleteDataChannel(dc)
+ dc.Close()
// Unlock before Close'ing, since it calls cleanup and asks for the
// lock to check if the transport needs to be be deleted.
c.lock.Unlock()
c.Close()
- }
- dc.OnMessage = func(msg []byte) {
- if len(msg) <= 0 {
+ })
+ dc.OnMessage(func(msg webrtc.DataChannelMessage) {
+ if len(msg.Data) <= 0 {
log.Println("0 length message---")
}
- c.BytesLogger.AddInbound(len(msg))
- n, err := c.writePipe.Write(msg)
+ c.BytesLogger.AddInbound(len(msg.Data))
+ n, err := c.writePipe.Write(msg.Data)
if err != nil {
// TODO: Maybe shouldn't actually close.
log.Println("Error writing to SOCKS pipe")
c.writePipe.CloseWithError(err)
}
- if n != len(msg) {
+ if n != len(msg.Data) {
log.Println("Error: short write")
panic("short write")
}
c.lastReceive = time.Now()
- }
+ })
log.Println("WebRTC: DataChannel created.")
return nil
}
@@ -304,7 +314,7 @@ func (c *WebRTCPeer) exchangeSDP() error {
}
}
log.Printf("Received Answer.\n")
- err := c.pc.SetRemoteDescription(answer)
+ err := c.pc.SetRemoteDescription(*answer)
if nil != err {
log.Println("WebRTC: Unable to SetRemoteDescription:", err)
return err
@@ -342,13 +352,13 @@ func (c *WebRTCPeer) cleanup() {
if c.pc == nil {
panic("DataChannel w/o PeerConnection, not good.")
}
- c.pc.DeleteDataChannel(dataChannel.(*webrtc.DataChannel))
+ dataChannel.(*webrtc.DataChannel).Close()
} else {
c.lock.Unlock()
}
if nil != c.pc {
log.Printf("WebRTC: closing PeerConnection")
- err := c.pc.Destroy()
+ err := c.pc.Close()
if nil != err {
log.Printf("Error closing peerconnection...")
}
diff --git a/client/snowflake.go b/client/snowflake.go
index 9098de7..01c89d8 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -17,7 +17,7 @@ import (
"git.torproject.org/pluggable-transports/goptlib.git"
sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib"
"git.torproject.org/pluggable-transports/snowflake.git/common/safelog"
- "github.com/keroserene/go-webrtc"
+ "github.com/pion/webrtc"
)
const (
@@ -65,6 +65,25 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) err
}
}
+// parseIceServers converts s, a comma-separated list of ICE server URLs, into
+// one webrtc.ICEServer per URL. Whitespace around each URL is trimmed and
+// empty entries (for example from a trailing comma) are skipped. It returns
+// nil when s contains no URLs.
+func parseIceServers(s string) []webrtc.ICEServer {
+	if len(strings.TrimSpace(s)) == 0 {
+		return nil
+	}
+	var servers []webrtc.ICEServer
+	log.Printf("Using ICE Servers:")
+	for _, entry := range strings.Split(s, ",") {
+		// Trim each entry so "a, b" does not produce a URL with a leading
+		// space, and drop blanks rather than emit a server with an empty URL.
+		server := strings.TrimSpace(entry)
+		if server == "" {
+			continue
+		}
+		log.Printf("url: %s", server)
+		servers = append(servers, webrtc.ICEServer{
+			URLs: []string{server},
+		})
+	}
+	return servers
+}
+
func main() {
iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers")
brokerURL := flag.String("url", "", "URL of signaling broker")
@@ -75,7 +94,6 @@ func main() {
"capacity for number of multiplexed WebRTC peers")
flag.Parse()
- webrtc.SetLoggingVerbosity(1)
log.SetFlags(log.LstdFlags | log.LUTC)
// Don't write to stderr; versions of tor earlier than about
@@ -105,11 +123,7 @@ func main() {
log.Println("\n\n\n --- Starting Snowflake Client ---")
- var iceServers sf.IceServerList
- if len(strings.TrimSpace(*iceServersCommas)) > 0 {
- option := webrtc.OptionIceServer(*iceServersCommas)
- iceServers = append(iceServers, option)
- }
+ iceServers := parseIceServers(*iceServersCommas)
// Prepare to collect remote WebRTC peers.
snowflakes := sf.NewPeers(*max)
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits