[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [snowflake/master] Add synchronization around destroying DataChannels and PeerConnections



commit 1114acbcb4acb82174b293983ced8afcaf9e2a93
Author: Arlo Breault <arlolra@xxxxxxxxx>
Date:   Wed Mar 14 13:35:39 2018 -0400

    Add synchronization around destroying DataChannels and PeerConnections
    
    From https://trac.torproject.org/projects/tor/ticket/21312#comment:33
---
 client/webrtc.go           | 36 ++++++++++++++++++++++++++++--------
 proxy-go/snowflake.go      | 17 +++++++++++++----
 server-webrtc/snowflake.go | 17 +++++++++++++----
 3 files changed, 54 insertions(+), 16 deletions(-)

diff --git a/client/webrtc.go b/client/webrtc.go
index e35c47d..8c7cb4c 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -5,6 +5,7 @@ import (
 	"errors"
 	"io"
 	"log"
+	"sync"
 	"time"
 
 	"github.com/dchest/uniuri"
@@ -35,6 +36,9 @@ type WebRTCPeer struct {
 
 	closed bool
 
+	lock sync.Mutex // Synchronization for DataChannel destruction
+	once sync.Once  // Synchronization for PeerConnection destruction
+
 	BytesLogger
 }
 
@@ -69,6 +73,8 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) {
 // Writes bytes out to remote WebRTC.
 // As part of |io.ReadWriter|
 func (c *WebRTCPeer) Write(b []byte) (int, error) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	c.BytesLogger.AddOutbound(len(b))
 	// TODO: Buffering could be improved / separated out of WebRTCPeer.
 	if nil == c.transport {
@@ -82,14 +88,12 @@ func (c *WebRTCPeer) Write(b []byte) (int, error) {
 
 // As part of |Snowflake|
 func (c *WebRTCPeer) Close() error {
-	if c.closed { // Skip if already closed.
-		return nil
-	}
-	// Mark for deletion.
-	c.closed = true
-	c.cleanup()
-	c.Reset()
-	log.Printf("WebRTC: Closing")
+	c.once.Do(func() {
+		c.closed = true
+		c.cleanup()
+		c.Reset()
+		log.Printf("WebRTC: Closing")
+	})
 	return nil
 }
 
@@ -194,6 +198,8 @@ func (c *WebRTCPeer) preparePeerConnection() error {
 
 // Create a WebRTC DataChannel locally.
 func (c *WebRTCPeer) establishDataChannel() error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	if c.transport != nil {
 		panic("Unexpected datachannel already exists!")
 	}
@@ -206,6 +212,8 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		return err
 	}
 	dc.OnOpen = func() {
+		c.lock.Lock()
+		defer c.lock.Unlock()
 		log.Println("WebRTC: DataChannel.OnOpen")
 		if nil != c.transport {
 			panic("WebRTC: transport already exists.")
@@ -220,10 +228,12 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		c.transport = dc
 	}
 	dc.OnClose = func() {
+		c.lock.Lock()
 		// Future writes will go to the buffer until a new DataChannel is available.
 		if nil == c.transport {
 			// Closed locally, as part of a reset.
 			log.Println("WebRTC: DataChannel.OnClose [locally]")
+			c.lock.Unlock()
 			return
 		}
 		// Closed remotely, need to reset everything.
@@ -231,6 +241,9 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		log.Println("WebRTC: DataChannel.OnClose [remotely]")
 		c.transport = nil
 		c.pc.DeleteDataChannel(dc)
+		// Unlock before Close'ing, since it calls cleanup and asks for the
+		// lock to check if the transport needs to be be deleted.
+		c.lock.Unlock()
 		c.Close()
 	}
 	dc.OnMessage = func(msg []byte) {
@@ -321,16 +334,23 @@ func (c *WebRTCPeer) cleanup() {
 		c.writePipe.Close()
 		c.writePipe = nil
 	}
+	c.lock.Lock()
 	if nil != c.transport {
 		log.Printf("WebRTC: closing DataChannel")
 		dataChannel := c.transport
 		// Setting transport to nil *before* dc Close indicates to OnClose that
 		// this was locally triggered.
 		c.transport = nil
+		// Release the lock before calling DeleteDataChannel (which in turn
+		// calls Close on the dataChannel), but after nil'ing out the transport,
+		// since otherwise we'll end up in the onClose handler in a deadlock.
+		c.lock.Unlock()
 		if c.pc == nil {
 			panic("DataChannel w/o PeerConnection, not good.")
 		}
 		c.pc.DeleteDataChannel(dataChannel.(*webrtc.DataChannel))
+	} else {
+		c.lock.Unlock()
 	}
 	if nil != c.pc {
 		log.Printf("WebRTC: closing PeerConnection")
diff --git a/proxy-go/snowflake.go b/proxy-go/snowflake.go
index a12cfb2..5a5925e 100644
--- a/proxy-go/snowflake.go
+++ b/proxy-go/snowflake.go
@@ -62,6 +62,9 @@ type webRTCConn struct {
 	dc *webrtc.DataChannel
 	pc *webrtc.PeerConnection
 	pr *io.PipeReader
+
+	lock sync.Mutex // Synchronization for DataChannel destruction
+	once sync.Once  // Synchronization for PeerConnection destruction
 }
 
 func (c *webRTCConn) Read(b []byte) (int, error) {
@@ -69,6 +72,8 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 }
 
 func (c *webRTCConn) Write(b []byte) (int, error) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	// log.Printf("webrtc Write %d %+q", len(b), string(b))
 	log.Printf("Write %d bytes --> WebRTC", len(b))
 	if c.dc != nil {
@@ -77,8 +82,11 @@ func (c *webRTCConn) Write(b []byte) (int, error) {
 	return len(b), nil
 }
 
-func (c *webRTCConn) Close() error {
-	return c.pc.Destroy()
+func (c *webRTCConn) Close() (err error) {
+	c.once.Do(func() {
+		err = c.pc.Destroy()
+	})
+	return
 }
 
 func (c *webRTCConn) LocalAddr() net.Addr {
@@ -255,17 +263,18 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.
 		log.Println("OnDataChannel")
 
 		pr, pw := io.Pipe()
-
 		conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
 
 		dc.OnOpen = func() {
 			log.Println("OnOpen channel")
 		}
 		dc.OnClose = func() {
+			conn.lock.Lock()
+			defer conn.lock.Unlock()
 			log.Println("OnClose channel")
-			pw.Close()
 			conn.dc = nil
 			pc.DeleteDataChannel(dc)
+			pw.Close()
 		}
 		dc.OnMessage = func(msg []byte) {
 			log.Printf("OnMessage <--- %d bytes", len(msg))
diff --git a/server-webrtc/snowflake.go b/server-webrtc/snowflake.go
index 82b6afe..0484d94 100644
--- a/server-webrtc/snowflake.go
+++ b/server-webrtc/snowflake.go
@@ -43,6 +43,9 @@ type webRTCConn struct {
 	dc *webrtc.DataChannel
 	pc *webrtc.PeerConnection
 	pr *io.PipeReader
+
+	lock sync.Mutex // Synchronization for DataChannel destruction
+	once sync.Once  // Synchronization for PeerConnection destruction
 }
 
 func (c *webRTCConn) Read(b []byte) (int, error) {
@@ -50,6 +53,8 @@ func (c *webRTCConn) Read(b []byte) (int, error) {
 }
 
 func (c *webRTCConn) Write(b []byte) (int, error) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
 	// log.Printf("webrtc Write %d %+q", len(b), string(b))
 	log.Printf("Write %d bytes --> WebRTC", len(b))
 	if c.dc != nil {
@@ -58,8 +63,11 @@ func (c *webRTCConn) Write(b []byte) (int, error) {
 	return len(b), nil
 }
 
-func (c *webRTCConn) Close() error {
-	return c.pc.Destroy()
+func (c *webRTCConn) Close() (err error) {
+	c.once.Do(func() {
+		err = c.pc.Destroy()
+	})
+	return
 }
 
 func (c *webRTCConn) LocalAddr() net.Addr {
@@ -122,17 +130,18 @@ func makePeerConnectionFromOffer(sdp *webrtc.SessionDescription, config *webrtc.
 		log.Println("OnDataChannel")
 
 		pr, pw := io.Pipe()
-
 		conn := &webRTCConn{pc: pc, dc: dc, pr: pr}
 
 		dc.OnOpen = func() {
 			log.Println("OnOpen channel")
 		}
 		dc.OnClose = func() {
+			conn.lock.Lock()
+			defer conn.lock.Unlock()
 			log.Println("OnClose channel")
-			pw.Close()
 			conn.dc = nil
 			pc.DeleteDataChannel(dc)
+			pw.Close()
 		}
 		dc.OnMessage = func(msg []byte) {
 			log.Printf("OnMessage <--- %d bytes", len(msg))

_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits