[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [snowflake/master] Use a channel to safely synchronize datachannel writes (#12)



commit c3ada1b54521927809b2ae67b45684dd412dc612
Author: Serene Han <keroserene+git@xxxxxxxxx>
Date:   Fri Feb 19 16:17:17 2016 -0800

    Use a channel to safely synchronize datachannel writes (#12).
    Clean up ICE candidate log message.
    Still need to debug the copy loop break.
---
 client/client_test.go | 11 +++++++++--
 client/snowflake.go   | 54 ++++++++++++++++++++++++++++++++++++---------------
 2 files changed, 47 insertions(+), 18 deletions(-)

diff --git a/client/client_test.go b/client/client_test.go
index 6ee36d9..85c2144 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -9,10 +9,12 @@ import (
 
 type MockDataChannel struct {
 	destination bytes.Buffer
+	done        chan bool
 }
 
 func (m *MockDataChannel) Send(data []byte) {
 	m.destination.Write(data)
+	m.done <- true
 }
 
 func (*MockDataChannel) Close() error {
@@ -24,6 +26,7 @@ func TestConnect(t *testing.T) {
 
 		Convey("WebRTC Connection", func() {
 			c := new(webRTCConn)
+
 			c.BytesInfo = &BytesInfo{
 				inboundChan: make(chan int), outboundChan: make(chan int),
 				inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
@@ -31,15 +34,19 @@ func TestConnect(t *testing.T) {
 			So(c.buffer.Bytes(), ShouldEqual, nil)
 
 			Convey("SendData buffers when datachannel is nil", func() {
-				c.sendData([]byte("test"))
+				c.SendData([]byte("test"))
 				c.snowflake = nil
 				So(c.buffer.Bytes(), ShouldResemble, []byte("test"))
 			})
 
 			Convey("SendData sends to datachannel when not nil", func() {
 				mock := new(MockDataChannel)
+				mock.done = make(chan bool)
+				go c.SendLoop()
+				c.writeChannel = make(chan []byte)
 				c.snowflake = mock
-				c.sendData([]byte("test"))
+				c.SendData([]byte("test"))
+				<-mock.done
 				So(c.buffer.Bytes(), ShouldEqual, nil)
 				So(mock.destination.Bytes(), ShouldResemble, []byte("test"))
 			})
diff --git a/client/snowflake.go b/client/snowflake.go
index eba68eb..d01ec5e 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -36,12 +36,15 @@ const (
 func copyLoop(a, b net.Conn) {
 	var wg sync.WaitGroup
 	wg.Add(2)
+	// TODO fix the copy loop.
 	go func() {
 		io.Copy(b, a)
+		log.Println("copy loop b-a break")
 		wg.Done()
 	}()
 	go func() {
 		io.Copy(a, b)
+		log.Println("copy loop a-b break")
 		wg.Done()
 	}()
 	wg.Wait()
@@ -63,6 +66,7 @@ type webRTCConn struct {
 	offerChannel  chan *webrtc.SessionDescription
 	answerChannel chan *webrtc.SessionDescription
 	errorChannel  chan error
+	writeChannel  chan []byte
 	recvPipe      *io.PipeReader
 	writePipe     *io.PipeWriter
 	buffer        bytes.Buffer
@@ -77,7 +81,7 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 }
 
 func (c *webRTCConn) Write(b []byte) (int, error) {
-	c.sendData(b)
+	c.SendData(b)
 	return len(b), nil
 }
 
@@ -133,9 +137,9 @@ func (c *webRTCConn) PreparePeerConnection() {
 			}
 		}()
 	}
+	// Allow candidates to accumulate until OnIceComplete.
 	pc.OnIceCandidate = func(candidate webrtc.IceCandidate) {
-		log.Printf("WebRTC: OnIceCandidate %s", candidate.Serialize())
-		// Allow candidates to accumulate until OnIceComplete.
+		log.Printf(candidate.Candidate)
 	}
 	// TODO: This may soon be deprecated, consider OnIceGatheringStateChange.
 	pc.OnIceComplete = func() {
@@ -169,10 +173,11 @@ func (c *webRTCConn) EstablishDataChannel() error {
 		// }
 		// Flush the buffer, then enable datachannel.
 		// TODO: Make this more safe
-		dc.Send(c.buffer.Bytes())
-		log.Println("Flushed", c.buffer.Len(), "bytes")
-		c.buffer.Reset()
+		// dc.Send(c.buffer.Bytes())
+		// log.Println("Flushed", c.buffer.Len(), "bytes")
+		// c.buffer.Reset()
 		c.snowflake = dc
+		c.SendData(nil)
 	}
 	dc.OnClose = func() {
 		// Disable the DataChannel as a write destination.
@@ -180,7 +185,7 @@ func (c *webRTCConn) EstablishDataChannel() error {
 		log.Println("WebRTC: DataChannel.OnClose")
 		if nil != c.snowflake {
 			c.snowflake = nil
-			// Only reset if this OnClose triggered
+			// Only reset if this OnClose was triggered remotely.
 			c.Reset()
 		}
 	}
@@ -247,21 +252,32 @@ func (c *webRTCConn) ReceiveAnswer() {
 	}()
 }
 
-func (c *webRTCConn) sendData(data []byte) {
-	c.BytesInfo.AddOutbound(len(data))
+func (c *webRTCConn) SendData(data []byte) {
 	// Buffer the data in case datachannel isn't available yet.
 	if nil == c.snowflake {
 		log.Printf("Buffered %d bytes --> WebRTC", len(data))
 		c.buffer.Write(data)
 		return
 	}
-	// Otherwise, flush buffer if necessary.
-	for c.buffer.Len() > 0 {
-		c.snowflake.Send(c.buffer.Bytes())
-		log.Println("Flushed", c.buffer.Len(), "bytes")
-		c.buffer.Reset()
+	go func() {
+		c.writeChannel <- data
+	}()
+}
+
+// Expected in own goroutine.
+func (c *webRTCConn) SendLoop() {
+	log.Println("send loop")
+	for data := range c.writeChannel {
+		// Flush buffer if necessary.
+		for c.buffer.Len() > 0 {
+			c.snowflake.Send(c.buffer.Bytes())
+			log.Println("Flushed", c.buffer.Len(), "bytes")
+			c.buffer.Reset()
+		}
+
+		c.BytesInfo.AddOutbound(len(data))
+		c.snowflake.Send(data)
 	}
-	c.snowflake.Send(data)
 }
 
 // WebRTC re-establishment loop. Expected in own goroutine.
@@ -296,6 +312,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	connection.broker = broker
 	connection.offerChannel = make(chan *webrtc.SessionDescription)
 	connection.answerChannel = make(chan *webrtc.SessionDescription)
+	connection.writeChannel = make(chan []byte)
 	connection.errorChannel = make(chan error)
 	connection.reset = make(chan struct{})
 	connection.BytesInfo = &BytesInfo{
@@ -308,6 +325,7 @@ func dialWebRTC(config *webrtc.Configuration, broker *BrokerChannel) (
 	connection.recvPipe, connection.writePipe = io.Pipe()
 
 	go connection.ConnectLoop()
+	go connection.SendLoop()
 	return connection, nil
 }
 
@@ -317,8 +335,10 @@ func endWebRTC() {
 		return
 	}
 	if nil != webrtcRemote.snowflake {
+		s := webrtcRemote.snowflake
+		webrtcRemote.snowflake = nil
 		log.Printf("WebRTC: closing DataChannel")
-		webrtcRemote.snowflake.Close()
+		s.Close()
 	}
 	if nil != webrtcRemote.pc {
 		log.Printf("WebRTC: closing PeerConnection")
@@ -333,6 +353,7 @@ func handler(conn *pt.SocksConn) error {
 		handlerChan <- -1
 	}()
 	defer conn.Close()
+	log.Println("handler", conn)
 
 	// TODO: [#3] Fetch ICE server information from Broker.
 	// TODO: [#18] Consider TURN servers here too.
@@ -357,6 +378,7 @@ func handler(conn *pt.SocksConn) error {
 	}
 
 	copyLoop(conn, remote)
+	log.Println("----END---")
 	return nil
 }
 



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits