[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [snowflake/master] Rewrite websocketconn with synchronous pipes.



commit 01e28aa4604fea7a2af0259c8b18be1bd5f9b3d7
Author: David Fifield <david@xxxxxxxxxxxxxxx>
Date:   Mon Feb 3 12:31:00 2020 -0700

    Rewrite websocketconn with synchronous pipes.
    
    Makes the following changes:
     * permits concurrent Read/Write/Close
     * converts certain CloseErrors into io.EOF
    
    https://bugs.torproject.org/33144
---
 common/websocketconn/websocketconn.go | 122 +++++++++++++++++++++++-----------
 1 file changed, 82 insertions(+), 40 deletions(-)

diff --git a/common/websocketconn/websocketconn.go b/common/websocketconn/websocketconn.go
index b87e657..fa2b0da 100644
--- a/common/websocketconn/websocketconn.go
+++ b/common/websocketconn/websocketconn.go
@@ -10,61 +10,103 @@ import (
 // An abstraction that makes an underlying WebSocket connection look like an
 // io.ReadWriteCloser.
 type Conn struct {
-	Ws *websocket.Conn
-	r  io.Reader
+	ws     *websocket.Conn
+	Reader io.Reader
+	Writer io.Writer
 }
 
 // Implements io.Reader.
 func (conn *Conn) Read(b []byte) (n int, err error) {
-	var opCode int
-	if conn.r == nil {
-		// New message
-		var r io.Reader
-		for {
-			if opCode, r, err = conn.Ws.NextReader(); err != nil {
-				return
-			}
-			if opCode != websocket.BinaryMessage && opCode != websocket.TextMessage {
-				continue
-			}
-
-			conn.r = r
-			break
-		}
-	}
-
-	n, err = conn.r.Read(b)
-	if err == io.EOF {
-		// Message finished
-		conn.r = nil
-		err = nil
-	}
-	return
+	return conn.Reader.Read(b)
 }
 
 // Implements io.Writer.
 func (conn *Conn) Write(b []byte) (n int, err error) {
-	var w io.WriteCloser
-	if w, err = conn.Ws.NextWriter(websocket.BinaryMessage); err != nil {
-		return
-	}
-	if n, err = w.Write(b); err != nil {
-		return
-	}
-	err = w.Close()
-	return
+	return conn.Writer.Write(b)
 }
 
 // Implements io.Closer.
 func (conn *Conn) Close() error {
 	// Ignore any error in trying to write a Close frame.
-	_ = conn.Ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
-	return conn.Ws.Close()
+	_ = conn.ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
+	return conn.ws.Close()
+}
+
+func readLoop(w io.Writer, ws *websocket.Conn) error {
+	for {
+		messageType, r, err := ws.NextReader()
+		if err != nil {
+			return err
+		}
+		if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage {
+			continue
+		}
+		_, err = io.Copy(w, r)
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func writeLoop(ws *websocket.Conn, r io.Reader) error {
+	for {
+		var buf [2048]byte
+		n, err := r.Read(buf[:])
+		if err != nil {
+			return err
+		}
+		data := buf[:n]
+		w, err := ws.NextWriter(websocket.BinaryMessage)
+		if err != nil {
+			return err
+		}
+		n, err = w.Write(data)
+		if err != nil {
+			return err
+		}
+		err = w.Close()
+		if err != nil {
+			return err
+		}
+	}
+}
+
+// websocket.Conn methods start returning websocket.CloseError after the
+// connection has been closed. We want to instead interpret that as io.EOF, just
+// as you would find with a normal net.Conn. This only converts CloseErrors with
+// the codes CloseNormalClosure and CloseNoStatusReceived; other codes like
+// CloseProtocolError and CloseAbnormalClosure are still reported as anomalous.
+func closeErrorToEOF(err error) error {
+	if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
+		err = io.EOF
+	}
+	return err
 }
 
 // Create a new Conn.
 func New(ws *websocket.Conn) *Conn {
-	var conn Conn
-	conn.Ws = ws
-	return &conn
+	// Set up synchronous pipes to serialize reads and writes to the
+	// underlying websocket.Conn.
+	//
+	// https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency
+	// "Connections support one concurrent reader and one concurrent writer.
+	// Applications are responsible for ensuring that no more than one
+	// goroutine calls the write methods (NextWriter, etc.) concurrently and
+	// that no more than one goroutine calls the read methods (NextReader,
+	// etc.) concurrently. The Close and WriteControl methods can be called
+	// concurrently with all other methods."
+	pr1, pw1 := io.Pipe()
+	go func() {
+		pw1.CloseWithError(closeErrorToEOF(readLoop(pw1, ws)))
+	}()
+	pr2, pw2 := io.Pipe()
+	go func() {
+		pr2.CloseWithError(closeErrorToEOF(writeLoop(ws, pr2)))
+	}()
+	return &Conn{
+		ws:     ws,
+		Reader: pr1,
+		Writer: pw2,
+	}
 }



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits