[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [snowflake/master] Use a channel to safely synchronize datachannel writes, (#12)
commit c3ada1b54521927809b2ae67b45684dd412dc612
Author: Serene Han <keroserene+git@xxxxxxxxx>
Date: Fri Feb 19 16:17:17 2016 -0800
Use a channel to safely synchronize datachannel writes, (#12)
Clean up the ICE candidate log message.
Still need to debug the copy loop break.
---
client/client_test.go | 11 +++++++++--
client/snowflake.go | 54 ++++++++++++++++++++++++++++++++++++---------------
2 files changed, 47 insertions(+), 18 deletions(-)
diff --git a/client/client_test.go b/client/client_test.go
index 6ee36d9..85c2144 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -9,10 +9,12 @@ import (
type MockDataChannel struct {
destination bytes.Buffer
+ done chan bool
}
func (m *MockDataChannel) Send(data []byte) {
m.destination.Write(data)
+ m.done <- true
}
func (*MockDataChannel) Close() error {
@@ -24,6 +26,7 @@ func TestConnect(t *testing.T) {
Convey("WebRTC Connection", func() {
c := new(webRTCConn)
+
c.BytesInfo = &BytesInfo{
inboundChan: make(chan int), outboundChan: make(chan int),
inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
@@ -31,15 +34,19 @@ func TestConnect(t *testing.T) {
So(c.buffer.Bytes(), ShouldEqual, nil)
Convey("SendData buffers when datachannel is nil", func() {
- c.sendData([]byte("test"))
+ c.SendData([]byte("test"))
c.snowflake = nil
So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
})
Convey("SendData sends to datachannel when not nil", func() {
mock := new(MockDataChannel)
+ mock.done = make(chan bool)
+ go c.SendLoop()
+ c.writeChannel = make(chan []byte)
c.snowflake = mock
- c.sendData([]byte("test"))
+ c.SendData([]byte("test"))
+ <-mock.done
So(c.buffer.Bytes(), ShouldEqual, nil)
So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
})
diff --git a/client/snowflake.go b/client/snowflake.go
index eba68eb..d01ec5e 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -36,12 +36,15 @@ const (
func copyLoop(a, b net.Conn) {
var wg sync.WaitGroup
wg.Add(2)
+ // TODO fix the copy loop.
go func() {
io.Copy(b, a)
+ log.Println("copy loop b-a break")
wg.Done()
}()
go func() {
io.Copy(a, b)
+ log.Println("copy loop a-b break")
wg.Done()
}()
wg.Wait()
@@ -63,6 +66,7 @@ type webRTCConn struct {
offerChannel chan *webrtc.SessionDescription
answerChannel chan *webrtc.SessionDescription
errorChannel chan error
+ writeChannel chan []byte
recvPipe *io.PipeReader
writePipe *io.PipeWriter
buffer bytes.Buffer
@@ -77,7 +81,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
}
func (c *webRTCConn) Write(b []byte) (int, error) {
- c.sendData(b)
+ c.SendData(b)
return len(b), nil
}
@@ -133,9 +137,9 @@ func (c *webRTCConn) PreparePeerConnection() {
}
}()
}
+ // Allow candidates to accumulate until OnIceComplete.
pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
- log.Printf("WebRTC: OnIceCandidate %s", candidate.Serialize())
- // Allow candidates to accumulate until OnIceComplete.
+ log.Printf(candidate.Candidate)
}
// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
pc.OnIceComplete = func() {
@@ -169,10 +173,11 @@ func (c *webRTCConn) EstablishDataChannel() error {
// }
// Flush the buffer, then enable datachannel.
// TODO: Make this more safe
- dc.Send(c.buffer.Bytes())
- log.Println("Flushed", c.buffer.Len(), "bytes")
- c.buffer.Reset()
+ // dc.Send(c.buffer.Bytes())
+ // log.Println("Flushed", c.buffer.Len(), "bytes")
+ // c.buffer.Reset()
c.snowflake = dc
+ c.SendData(nil)
}
dc.OnClose = func() {
// Disable the DataChannel as a write destination.
@@ -180,7 +185,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
log.Println("WebRTC: DataChannel.OnClose")
if nil != c.snowflake {
c.snowflake = nil
- // Only reset if this OnClose triggered
+ // Only reset if this OnClose was triggered remotely.
c.Reset()
}
}
@@ -247,21 +252,32 @@ func (c *webRTCConn) ReceiveAnswer() {
}()
}
-func (c *webRTCConn) sendData(data []byte) {
- c.BytesInfo.AddOutbound(len(data))
+func (c *webRTCConn) SendData(data []byte) {
// Buffer the data in case datachannel isn't available yet.
if nil == c.snowflake {
log.Printf("Buffered %d bytes --> WebRTC", len(data))
c.buffer.Write(data)
return
}
- // Otherwise, flush buffer if necessary.
- for c.buffer.Len() > 0 {
- c.snowflake.Send(c.buffer.Bytes())
- log.Println("Flushed", c.buffer.Len(), "bytes")
- c.buffer.Reset()
+ go func() {
+ c.writeChannel <- data
+ }()
+}
+
+// Expected in own goroutine.
+func (c *webRTCConn) SendLoop() {
+ log.Println("send loop")
+ for data := range c.writeChannel {
+ // Flush buffer if necessary.
+ for c.buffer.Len() > 0 {
+ c.snowflake.Send(c.buffer.Bytes())
+ log.Println("Flushed", c.buffer.Len(), "bytes")
+ c.buffer.Reset()
+ }
+
+ c.BytesInfo.AddOutbound(len(data))
+ c.snowflake.Send(data)
}
- c.snowflake.Send(data)
}
// WebRTC re-establishment loop. Expected in own goroutine.
@@ -296,6 +312,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
connection.broker = broker
connection.offerChannel = make(chan *webrtc.SessionDescription)
connection.answerChannel = make(chan *webrtc.SessionDescription)
+ connection.writeChannel = make(chan []byte)
connection.errorChannel = make(chan error)
connection.reset = make(chan struct{})
connection.BytesInfo = &BytesInfo{
@@ -308,6 +325,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
connection.recvPipe, connection.writePipe = io.Pipe()
go connection.ConnectLoop()
+ go connection.SendLoop()
return connection, nil
}
@@ -317,8 +335,10 @@ func endWebRTC() {
return
}
if nil != webrtcRemote.snowflake {
+ s := webrtcRemote.snowflake
+ webrtcRemote.snowflake = nil
log.Printf("WebRTC: closing DataChannel")
- webrtcRemote.snowflake.Close()
+ s.Close()
}
if nil != webrtcRemote.pc {
log.Printf("WebRTC: closing PeerConnection")
@@ -333,6 +353,7 @@ func handler(conn *pt.SocksConn) error {
handlerChan <- -1
}()
defer conn.Close()
+ log.Println("handler", conn)
// TODO: [#3] Fetch ICE server information from Broker.
// TODO: [#18] Consider TURN servers here too.
@@ -357,6 +378,7 @@ func handler(conn *pt.SocksConn) error {
}
copyLoop(conn, remote)
+ log.Println("----END---")
return nil
}
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits