[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [pluggable-transports/meek] 08/09: Halt requestLoop when PollingPacketConn is closed.



This is an automated email from the git hooks/post-receive script.

dcf pushed a commit to branch turbotunnel
in repository pluggable-transports/meek.

commit 46c988b2582fcfc9d6ff17114a1961a058edd556
Author: David Fifield <david@xxxxxxxxxxxxxxx>
AuthorDate: Fri Oct 28 01:01:17 2022 -0600

    Halt requestLoop when PollingPacketConn is closed.
---
 meek-client/meek-client.go      |  4 +++-
 meek-client/turbotunnel.go      |  4 ++--
 meek-client/turbotunnel_test.go | 50 +++++++++++++++++++++++++++++++++--------
 3 files changed, 46 insertions(+), 12 deletions(-)

diff --git a/meek-client/meek-client.go b/meek-client/meek-client.go
index 683aa90..813a373 100644
--- a/meek-client/meek-client.go
+++ b/meek-client/meek-client.go
@@ -27,6 +27,7 @@
 package main
 
 import (
+	"context"
 	"flag"
 	"fmt"
 	"io"
@@ -97,13 +98,14 @@ type RequestInfo struct {
 	RoundTripper http.RoundTripper
 }
 
-func (info *RequestInfo) Poll(out io.Reader) (in io.ReadCloser, err error) {
+func (info *RequestInfo) Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) {
 	req, err := http.NewRequest("POST", info.URL.String(), out)
 	// Prevent Content-Type sniffing by net/http and middleboxes.
 	req.Header.Set("Content-Type", "application/octet-stream")
 	if err != nil {
 		return nil, err
 	}
+	req = req.WithContext(ctx)
 	if info.Host != "" {
 		req.Host = info.Host
 	}
diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go
index cbb3380..43c0c46 100644
--- a/meek-client/turbotunnel.go
+++ b/meek-client/turbotunnel.go
@@ -40,7 +40,7 @@ const (
 // Poller is an abstract interface over an operation that writes a stream of
 // bytes and reads a stream of bytes in return, like an HTTP request.
 type Poller interface {
-	Poll(out io.Reader) (in io.ReadCloser, err error)
+	Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error)
 }
 
 // PollingPacketConn implements the net.PacketConn interface over a carrier of
@@ -173,7 +173,7 @@ func (c *PollingPacketConn) requestLoop() {
 		}
 
 		go func() {
-			resp, err := c.poller.Poll(&body)
+			resp, err := c.poller.Poll(c.ctx, &body)
 			if err != nil {
 				c.Close()
 				return
diff --git a/meek-client/turbotunnel_test.go b/meek-client/turbotunnel_test.go
index 11640ec..89aadab 100644
--- a/meek-client/turbotunnel_test.go
+++ b/meek-client/turbotunnel_test.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"bytes"
+	"context"
 	"errors"
 	"io"
 	"io/ioutil"
@@ -15,24 +16,58 @@ func (_ emptyAddr) Network() string { return "empty" }
 func (_ emptyAddr) String() string  { return "empty" }
 
 type funcPoller struct {
-	poll func(out io.Reader) (in io.ReadCloser, err error)
+	poll func(ctx context.Context, out io.Reader) (in io.ReadCloser, err error)
 }
 
-func (fp funcPoller) Poll(out io.Reader) (in io.ReadCloser, err error) {
-	return fp.poll(out)
+func (fp funcPoller) Poll(ctx context.Context, out io.Reader) (in io.ReadCloser, err error) {
+	return fp.poll(ctx, out)
+}
+
+// TestCloseCancelsPoll tests that calling Close cancels the context passed to
+// the poller.
+func TestCloseCancelsPoll(t *testing.T) {
+	beginCh := make(chan struct{})
+	resultCh := make(chan error)
+	// The poller returns immediately with a nil error when its context is
+	// canceled. It returns after a delay with a non-nil error if its
+	// context is not canceled.
+	poller := funcPoller{poll: func(ctx context.Context, _ io.Reader) (io.ReadCloser, error) {
+		defer close(resultCh)
+		beginCh <- struct{}{}
+		select {
+		case <-ctx.Done():
+			resultCh <- nil
+		case <-time.After(5 * time.Second):
+			resultCh <- errors.New("poll was not canceled")
+		}
+		return ioutil.NopCloser(bytes.NewReader(nil)), nil
+	}}
+	pconn := NewPollingPacketConn(emptyAddr{}, poller)
+	// Wait until the poll function has been called.
+	<-beginCh
+	// Close the connection.
+	err := pconn.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Observe what happened inside the poll function. Closing the
+	// connection should have canceled the context.
+	err = <-resultCh
+	if err != nil {
+		t.Fatal(err)
+	}
 }
 
 // TestCloseHaltsRequestLoop tests that requestLoop terminates and stops calling
 // its Poller after Close is called.
 func TestCloseHaltsRequestLoop(t *testing.T) {
-	closedCh := make(chan struct{})
 	resultCh := make(chan error)
 	// The poller returns immediately with a nil error as long as closedCh
 	// is not closed. When closedCh is closed, the poller returns
 	// immediately with a non-nil error.
-	poller := funcPoller{poll: func(_ io.Reader) (io.ReadCloser, error) {
+	poller := funcPoller{poll: func(ctx context.Context, _ io.Reader) (io.ReadCloser, error) {
 		select {
-		case <-closedCh:
+		case <-ctx.Done():
 			resultCh <- errors.New("poll called after close")
 		default:
 		}
@@ -44,9 +79,6 @@ func TestCloseHaltsRequestLoop(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	// Tell the poll function to return an error if it is called after this
-	// point.
-	close(closedCh)
 	// Wait a few seconds to see if the poll function is called after the
 	// conn is closed.
 	select {

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits