[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [snowflake/master] ensure closing stale remotes from the client side



commit ac9d49b8727b953c12a76e3645fe71a9ec3aab75
Author: Serene H <git@xxxxxxxxxxxxxx>
Date:   Mon Aug 1 12:17:28 2016 -0700

    ensure closing stale remotes from the client side
---
 client/snowflake.go |  3 +-
 client/webrtc.go    | 81 ++++++++++++++++++++++++++++++++++-------------------
 2 files changed, 54 insertions(+), 30 deletions(-)

diff --git a/client/snowflake.go b/client/snowflake.go
index 8dfa390..75e999f 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -20,6 +20,7 @@ import (
 const (
 	ReconnectTimeout         = 10
 	DefaultSnowflakeCapacity = 1
+	SnowflakeTimeout         = 30
 )
 
 // When a connection handler starts, +1 is written to this channel; when it
@@ -81,7 +82,7 @@ func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
 		return errors.New("handler: Received invalid Snowflake")
 	}
 	defer socks.Close()
-	defer snowflake.Reset()
+	defer snowflake.Close()
 	log.Println("---- Handler: snowflake assigned ----")
 	err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
 	if err != nil {
diff --git a/client/webrtc.go b/client/webrtc.go
index 1f7ac00..0492466 100644
--- a/client/webrtc.go
+++ b/client/webrtc.go
@@ -29,6 +29,7 @@ type WebRTCPeer struct {
 	errorChannel  chan error
 	recvPipe      *io.PipeReader
 	writePipe     *io.PipeWriter
+	lastReceive   time.Time
 	buffer        bytes.Buffer
 	reset         chan struct{}
 
@@ -37,6 +38,28 @@ type WebRTCPeer struct {
 	BytesLogger
 }
 
+// Construct a WebRTC PeerConnection.
+func NewWebRTCPeer(config *webrtc.Configuration,
+	broker *BrokerChannel) *WebRTCPeer {
+	connection := new(WebRTCPeer)
+	connection.id = "snowflake-" + uniuri.New()
+	connection.config = config
+	connection.broker = broker
+	connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
+	connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
+	// Error channel is mostly for reporting during the initial SDP offer
+	// creation & local description setting, which happens asynchronously.
+	connection.errorChannel = make(chan error, 1)
+	connection.reset = make(chan struct{}, 1)
+
+	// Override with something that's not NullLogger to have real logging.
+	connection.BytesLogger = &BytesNullLogger{}
+
+	// Pipes remain the same even when DataChannel gets switched.
+	connection.recvPipe, connection.writePipe = io.Pipe()
+	return connection
+}
+
 // Read bytes from local SOCKS.
 // As part of |io.ReadWriter|
 func (c *WebRTCPeer) Read(b []byte) (int, error) {
@@ -47,6 +70,7 @@ func (c *WebRTCPeer) Read(b []byte) (int, error) {
 // As part of |io.ReadWriter|
 func (c *WebRTCPeer) Write(b []byte) (int, error) {
 	c.BytesLogger.AddOutbound(len(b))
+	// TODO: Buffering could be improved / separated out of WebRTCPeer.
 	if nil == c.transport {
 		log.Printf("Buffered %d bytes --> WebRTC", len(b))
 		c.buffer.Write(b)
@@ -61,45 +85,42 @@ func (c *WebRTCPeer) Close() error {
 	if c.closed { // Skip if already closed.
 		return nil
 	}
-	log.Printf("WebRTC: Closing")
-	c.cleanup()
 	// Mark for deletion.
 	c.closed = true
+	c.cleanup()
+	c.Reset()
+	log.Printf("WebRTC: Closing")
 	return nil
 }
 
 // As part of |Resetter|
 func (c *WebRTCPeer) Reset() {
-	c.Close()
-	go func() {
-		c.reset <- struct{}{}
-		log.Println("WebRTC resetting...")
-	}()
+	if nil == c.reset {
+		return
+	}
+	c.reset <- struct{}{}
 }
 
 // As part of |Resetter|
 func (c *WebRTCPeer) WaitForReset() { <-c.reset }
 
-// Construct a WebRTC PeerConnection.
-func NewWebRTCPeer(config *webrtc.Configuration,
-	broker *BrokerChannel) *WebRTCPeer {
-	connection := new(WebRTCPeer)
-	connection.id = "snowflake-" + uniuri.New()
-	connection.config = config
-	connection.broker = broker
-	connection.offerChannel = make(chan *webrtc.SessionDescription, 1)
-	connection.answerChannel = make(chan *webrtc.SessionDescription, 1)
-	// Error channel is mostly for reporting during the initial SDP offer
-	// creation & local description setting, which happens asynchronously.
-	connection.errorChannel = make(chan error, 1)
-	connection.reset = make(chan struct{}, 1)
-
-	// Override with something that's not NullLogger to have real logging.
-	connection.BytesLogger = &BytesNullLogger{}
-
-	// Pipes remain the same even when DataChannel gets switched.
-	connection.recvPipe, connection.writePipe = io.Pipe()
-	return connection
+// Prevent long-lived broken remotes.
+// The DataChannel in the underlying go-webrtc library should also be updated
+// to make Closes more immediate / responsive.
+func (c *WebRTCPeer) checkForStaleness() {
+	c.lastReceive = time.Now()
+	for {
+		if c.closed {
+			return
+		}
+		if time.Since(c.lastReceive).Seconds() > SnowflakeTimeout {
+			log.Println("WebRTC: No messages received for", SnowflakeTimeout,
+				"seconds -- closing stale connection.")
+			c.Close()
+			return
+		}
+		<-time.After(time.Second)
+	}
 }
 
 // As part of |Connector| interface.
@@ -119,6 +140,7 @@ func (c *WebRTCPeer) Connect() error {
 	if err != nil {
 		return err
 	}
+	go c.checkForStaleness()
 	return nil
 }
 
@@ -208,7 +230,7 @@ func (c *WebRTCPeer) establishDataChannel() error {
 		// Disable the DataChannel as a write destination.
 		log.Println("WebRTC: DataChannel.OnClose [remotely]")
 		c.transport = nil
-		c.Reset()
+		c.Close()
 	}
 	dc.OnMessage = func(msg []byte) {
 		if len(msg) <= 0 {
@@ -225,6 +247,7 @@ func (c *WebRTCPeer) establishDataChannel() error {
 			log.Println("Error: short write")
 			panic("short write")
 		}
+		c.lastReceive = time.Now()
 	}
 	log.Println("WebRTC: DataChannel created.")
 	return nil
@@ -257,7 +280,7 @@ func (c *WebRTCPeer) exchangeSDP() error {
 		}
 	case err := <-c.errorChannel:
 		log.Println("Failed to prepare offer", err)
-		c.Reset()
+		c.Close()
 		return err
 	}
 	// Keep trying the same offer until a valid answer arrives.

_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits