[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [pluggable-transports/meek] 05/09: Import server send loop from Champa.



This is an automated email from the git hooks/post-receive script.

dcf pushed a commit to branch turbotunnel
in repository pluggable-transports/meek.

commit 0b83ea61d699eaed82af0b6e1c1a8a70d92f44fe
Author: David Fifield <david@xxxxxxxxxxxxxxx>
AuthorDate: Thu Oct 27 18:54:21 2022 -0600

    Import server send loop from Champa.
    
    Enforces maxPayloadLength (by stashing packets that exceed the limit)
    and should have lower latency because it does not wait for the maximum
    delay every time.
---
 meek-server/meek-server.go | 70 ++++++++++++++++++++++++++++++++++++----------
 1 file changed, 55 insertions(+), 15 deletions(-)

diff --git a/meek-server/meek-server.go b/meek-server/meek-server.go
index f971ab8..f715107 100644
--- a/meek-server/meek-server.go
+++ b/meek-server/meek-server.go
@@ -55,7 +55,7 @@ const (
 	// chunk of data we'll send back in a response.
 	maxPayloadLength = 0x10000
 	// How long we try to read from the OR port before closing a response.
-	turnaroundTimeout = 10 * time.Millisecond
+	turnaroundTimeout = 100 * time.Millisecond
 	// Passed as ReadTimeout and WriteTimeout when constructing the
 	// http.Server.
 	readWriteTimeout = 20 * time.Second
@@ -169,26 +169,66 @@ func (state *State) Post(w http.ResponseWriter, req *http.Request) {
 		state.conn.QueueIncoming(p, clientID)
 	}
 
-	// Write outgoing packets, if any, up to turnaroundTimeout.
 	w.Header().Set("Content-Type", "application/octet-stream")
-	outgoing := state.conn.OutgoingQueue(clientID)
+	// Write outgoing packets, if any. We wait up to turnaroundTimeout for
+	// the first available packet; after that we only include whatever
+	// packets are immediately available.
+	limit := maxPayloadLength
 	timer := time.NewTimer(turnaroundTimeout)
-loop:
+	defer timer.Stop()
+	first := true
 	for {
+		var p []byte
+		unstash := state.conn.Unstash(clientID)
+		outgoing := state.conn.OutgoingQueue(clientID)
+		// Prioritize taking a packet first from the stash, then from
+		// the outgoing queue, then finally check for expiration of the
+		// timer. (We continue to bundle packets even after the timer
+		// expires, as long as the packets are immediately available.)
 		select {
-		case <-timer.C:
-			break loop
-		case p := <-outgoing:
-			_, err := encapsulation.WriteData(w, p)
-			if err != nil {
-				break loop
-			}
-			// Flush after each chunk, this is important for
-			// latency.
-			if w, ok := w.(http.Flusher); ok {
-				w.Flush()
+		case p = <-unstash:
+		default:
+			select {
+			case p = <-unstash:
+			case p = <-outgoing:
+			default:
+				select {
+				case p = <-unstash:
+				case p = <-outgoing:
+				case <-timer.C:
+				}
 			}
 		}
+		// We wait for the first packet only. Later packets must be
+		// immediately available.
+		timer.Reset(0)
+
+		if len(p) == 0 {
+			// Timer expired, we are done bundling packets into this
+			// response.
+			break
+		}
+
+		limit -= len(p)
+		if !first && limit < 0 {
+			// This packet doesn't fit in the payload size limit.
+			// Stash it so that it will be first in line for the
+			// next response.
+			state.conn.Stash(p, clientID)
+			break
+		}
+		first = false
+
+		// Write the packet to the HTTP response.
+		_, err := encapsulation.WriteData(w, p)
+		if err != nil {
+			log.Printf("encapsulation.WriteData: %v", err)
+			break
+		}
+		// Flush after each chunk, this is important for latency.
+		if rw, ok := w.(http.Flusher); ok {
+			rw.Flush()
+		}
 	}
 }
 

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits