[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [snowflake/main] Ensure turbotunnel read and write loop terminate



commit 7c9005bed3e353c4e108355abd1ed4b35099f2ea
Author: Cecylia Bocovich <cohosh@xxxxxxxxxxxxxx>
Date:   Wed May 12 09:32:07 2021 -0400

    Ensure turbotunnel read and write loop terminate
    
    Introduce a waitgroup and done channel to ensure that both the read and
    write goroutines for turbotunnel connections terminate when the
    connection is closed.
---
 common/turbotunnel/clientmap.go |  1 +
 server/lib/http.go              | 40 ++++++++++++++++++++++++++--------------
 2 files changed, 27 insertions(+), 14 deletions(-)

diff --git a/common/turbotunnel/clientmap.go b/common/turbotunnel/clientmap.go
index fa12915..53d0302 100644
--- a/common/turbotunnel/clientmap.go
+++ b/common/turbotunnel/clientmap.go
@@ -140,5 +140,6 @@ func (inner *clientMapInner) Pop() interface{} {
 	inner.byAge = inner.byAge[:n-1]
 	// Remove from byAddr map.
 	delete(inner.byAddr, record.Addr)
+	close(record.SendQueue)
 	return record
 }
diff --git a/server/lib/http.go b/server/lib/http.go
index b1c453c..c612422 100644
--- a/server/lib/http.go
+++ b/server/lib/http.go
@@ -8,6 +8,7 @@ import (
 	"log"
 	"net"
 	"net/http"
+	"sync"
 	"time"
 
 	"git.torproject.org/pluggable-transports/snowflake.git/common/encapsulation"
@@ -139,18 +140,21 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke
 	// credited for the entire KCP session.
 	clientIDAddrMap.Set(clientID, addr.String())
 
-	errCh := make(chan error)
+	var wg sync.WaitGroup
+	wg.Add(2)
+	done := make(chan struct{})
 
 	// The remainder of the WebSocket stream consists of encapsulated
 	// packets. We read them one by one and feed them into the
 	// QueuePacketConn on which kcp.ServeConn was set up, which eventually
 	// leads to KCP-level sessions in the acceptSessions function.
 	go func() {
+		defer wg.Done()
+		defer close(done) // Signal the write loop to finish
 		for {
 			p, err := encapsulation.ReadData(conn)
 			if err != nil {
-				errCh <- err
-				break
+				return
 			}
 			pconn.QueueIncoming(p, clientID)
 		}
@@ -159,24 +163,32 @@ func turbotunnelMode(conn net.Conn, addr net.Addr, pconn *turbotunnel.QueuePacke
 	// At the same time, grab packets addressed to this ClientID and
 	// encapsulate them into the downstream.
 	go func() {
+		defer wg.Done()
+		defer conn.Close() // Signal the read loop to finish
+
 		// Buffer encapsulation.WriteData operations to keep length
 		// prefixes in the same send as the data that follows.
 		bw := bufio.NewWriter(conn)
-		for p := range pconn.OutgoingQueue(clientID) {
-			_, err := encapsulation.WriteData(bw, p)
-			if err == nil {
-				err = bw.Flush()
-			}
-			if err != nil {
-				errCh <- err
-				break
+		for {
+			select {
+			case <-done:
+				return
+			case p, ok := <-pconn.OutgoingQueue(clientID):
+				if !ok {
+					return
+				}
+				_, err := encapsulation.WriteData(bw, p)
+				if err == nil {
+					err = bw.Flush()
+				}
+				if err != nil {
+					return
+				}
 			}
 		}
 	}()
 
-	// Wait until one of the above loops terminates. The closing of the
-	// WebSocket connection will terminate the other one.
-	<-errCh
+	wg.Wait()
 
 	return nil
 }

_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits