[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [snowflake/master] Rewrite websocketconn with synchronous pipes.
commit 01e28aa4604fea7a2af0259c8b18be1bd5f9b3d7
Author: David Fifield <david@xxxxxxxxxxxxxxx>
Date: Mon Feb 3 12:31:00 2020 -0700
Rewrite websocketconn with synchronous pipes.
Makes the following changes:
* permits concurrent Read/Write/Close
* converts certain CloseErrors into io.EOF
https://bugs.torproject.org/33144
---
common/websocketconn/websocketconn.go | 122 +++++++++++++++++++++++-----------
1 file changed, 82 insertions(+), 40 deletions(-)
diff --git a/common/websocketconn/websocketconn.go b/common/websocketconn/websocketconn.go
index b87e657..fa2b0da 100644
--- a/common/websocketconn/websocketconn.go
+++ b/common/websocketconn/websocketconn.go
@@ -10,61 +10,103 @@ import (
// An abstraction that makes an underlying WebSocket connection look like an
// io.ReadWriteCloser.
type Conn struct {
- Ws *websocket.Conn
- r io.Reader
+ ws *websocket.Conn
+ Reader io.Reader
+ Writer io.Writer
}
// Implements io.Reader.
func (conn *Conn) Read(b []byte) (n int, err error) {
- var opCode int
- if conn.r == nil {
- // New message
- var r io.Reader
- for {
- if opCode, r, err = conn.Ws.NextReader(); err != nil {
- return
- }
- if opCode != websocket.BinaryMessage && opCode != websocket.TextMessage {
- continue
- }
-
- conn.r = r
- break
- }
- }
-
- n, err = conn.r.Read(b)
- if err == io.EOF {
- // Message finished
- conn.r = nil
- err = nil
- }
- return
+ return conn.Reader.Read(b)
}
// Implements io.Writer.
func (conn *Conn) Write(b []byte) (n int, err error) {
- var w io.WriteCloser
- if w, err = conn.Ws.NextWriter(websocket.BinaryMessage); err != nil {
- return
- }
- if n, err = w.Write(b); err != nil {
- return
- }
- err = w.Close()
- return
+ return conn.Writer.Write(b)
}
// Implements io.Closer.
func (conn *Conn) Close() error {
// Ignore any error in trying to write a Close frame.
- _ = conn.Ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
- return conn.Ws.Close()
+ _ = conn.ws.WriteControl(websocket.CloseMessage, []byte{}, time.Now().Add(time.Second))
+ return conn.ws.Close()
+}
+
+func readLoop(w io.Writer, ws *websocket.Conn) error {
+ for {
+ messageType, r, err := ws.NextReader()
+ if err != nil {
+ return err
+ }
+ if messageType != websocket.BinaryMessage && messageType != websocket.TextMessage {
+ continue
+ }
+ _, err = io.Copy(w, r)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func writeLoop(ws *websocket.Conn, r io.Reader) error {
+ for {
+ var buf [2048]byte
+ n, err := r.Read(buf[:])
+ if err != nil {
+ return err
+ }
+ data := buf[:n]
+ w, err := ws.NextWriter(websocket.BinaryMessage)
+ if err != nil {
+ return err
+ }
+ n, err = w.Write(data)
+ if err != nil {
+ return err
+ }
+ err = w.Close()
+ if err != nil {
+ return err
+ }
+ }
+}
+
+// websocket.Conn methods start returning websocket.CloseError after the
+// connection has been closed. We want to instead interpret that as io.EOF, just
+// as you would find with a normal net.Conn. This only converts
+// websocket.CloseErrors with known codes; other codes like CloseProtocolError
+// and CloseAbnormalClosure will still be reported as anomalous.
+func closeErrorToEOF(err error) error {
+ if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseNoStatusReceived) {
+ err = io.EOF
+ }
+ return err
}
// Create a new Conn.
func New(ws *websocket.Conn) *Conn {
- var conn Conn
- conn.Ws = ws
- return &conn
+ // Set up synchronous pipes to serialize reads and writes to the
+ // underlying websocket.Conn.
+ //
+ // https://godoc.org/github.com/gorilla/websocket#hdr-Concurrency
+ // "Connections support one concurrent reader and one concurrent writer.
+ // Applications are responsible for ensuring that no more than one
+ // goroutine calls the write methods (NextWriter, etc.) concurrently and
+ // that no more than one goroutine calls the read methods (NextReader,
+ // etc.) concurrently. The Close and WriteControl methods can be called
+ // concurrently with all other methods."
+ pr1, pw1 := io.Pipe()
+ go func() {
+ pw1.CloseWithError(closeErrorToEOF(readLoop(pw1, ws)))
+ }()
+ pr2, pw2 := io.Pipe()
+ go func() {
+ pr2.CloseWithError(closeErrorToEOF(writeLoop(ws, pr2)))
+ }()
+ return &Conn{
+ ws: ws,
+ Reader: pr1,
+ Writer: pw2,
+ }
}
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits