[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [obfs4/master] transports/meeklite: Cleanups, bugfixes and improvements



commit 816cff15f425d0cb87a1b996366989aa01833f99
Author: Yawning Angel <yawning@xxxxxxxxxxxxxxx>
Date:   Sun Jan 20 16:14:28 2019 +0000

    transports/meeklite: Cleanups, bugfixes and improvements
    
     * Properly close the response body on HTTP error.
     * Cleanup close signaling.
     * Write() should return faster on closed connections.
---
 transports/meeklite/meek.go | 70 +++++++++++++++++++++------------------------
 1 file changed, 32 insertions(+), 38 deletions(-)

diff --git a/transports/meeklite/meek.go b/transports/meeklite/meek.go
index a99556b..39a6d4b 100644
--- a/transports/meeklite/meek.go
+++ b/transports/meeklite/meek.go
@@ -39,6 +39,7 @@ import (
 	"net"
 	"net/http"
 	gourl "net/url"
+	"os"
 	"runtime"
 	"sync"
 	"time"
@@ -107,16 +108,14 @@ func newClientArgs(args *pt.Args) (ca *meekClientArgs, err error) {
 }
 
 type meekConn struct {
-	sync.Mutex
-
 	args      *meekClientArgs
 	sessionID string
 	transport *http.Transport
 
-	workerRunning   bool
+	closeOnce       sync.Once
 	workerWrChan    chan []byte
 	workerRdChan    chan []byte
-	workerCloseChan chan bool
+	workerCloseChan chan struct{}
 	rdBuf           *bytes.Buffer
 }
 
@@ -154,11 +153,10 @@ func (c *meekConn) Read(p []byte) (n int, err error) {
 
 func (c *meekConn) Write(b []byte) (n int, err error) {
 	// Check to see if the connection is actually open.
-	c.Lock()
-	closed := !c.workerRunning
-	c.Unlock()
-	if closed {
+	select {
+	case <-c.workerCloseChan:
 		return 0, io.ErrClosedPipe
+	default:
 	}
 
 	if len(b) == 0 {
@@ -168,9 +166,7 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
 	// Copy the data to be written to a new slice, since
 	// we return immediately after queuing and the peer can
 	// happily reuse `b` before data has been sent.
-	toWrite := len(b)
-	b2 := make([]byte, toWrite)
-	copy(b2, b)
+	b2 := append([]byte{}, b...)
 	if ok := c.enqueueWrite(b2); !ok {
 		// Technically we did enqueue data, but the worker's
 		// got closed out from under us.
@@ -181,18 +177,15 @@ func (c *meekConn) Write(b []byte) (n int, err error) {
 }
 
 func (c *meekConn) Close() error {
-	// Ensure that we do this once and only once.
-	c.Lock()
-	defer c.Unlock()
-	if !c.workerRunning {
-		return nil
-	}
+	err := os.ErrClosed
 
-	// Tear down the worker.
-	c.workerRunning = false
-	c.workerCloseChan <- true
+	c.closeOnce.Do(func() {
+		// Tear down the worker, if it is still running.
+		close(c.workerCloseChan)
+		err = nil
+	})
 
-	return nil
+	return err
 }
 
 func (c *meekConn) LocalAddr() net.Addr {
@@ -216,7 +209,11 @@ func (c *meekConn) SetWriteDeadline(t time.Time) error {
 }
 
 func (c *meekConn) enqueueWrite(b []byte) (ok bool) {
-	defer func() { _ = recover() }()
+	defer func() {
+		if err := recover(); err != nil {
+			ok = false
+		}
+	}()
 	c.workerWrChan <- b
 	return true
 }
@@ -249,14 +246,16 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
 		if err != nil {
 			return nil, err
 		}
-		if resp.StatusCode != http.StatusOK {
-			err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
-			time.Sleep(retryDelay)
-		} else {
-			defer resp.Body.Close()
+
+		if resp.StatusCode == http.StatusOK {
 			recvBuf, err = ioutil.ReadAll(io.LimitReader(resp.Body, maxPayloadLength))
+			resp.Body.Close()
 			return
 		}
+
+		resp.Body.Close()
+		err = fmt.Errorf("status code was %d, not %d", resp.StatusCode, http.StatusOK)
+		time.Sleep(retryDelay)
 	}
 	return
 }
@@ -264,8 +263,8 @@ func (c *meekConn) roundTrip(sndBuf []byte) (recvBuf []byte, err error) {
 func (c *meekConn) ioWorker() {
 	interval := initPollInterval
 	var sndBuf, leftBuf []byte
-loop:
 
+loop:
 	for {
 		sndBuf = nil
 		select {
@@ -316,7 +315,7 @@ loop:
 			// Sent data, poll immediately.
 			interval = 0
 		} else if interval == 0 {
-			// Neither sent nor received data, initialize the delay.
+			// Neither sent nor received data after a poll, re-initialize the delay.
 			interval = initPollInterval
 		} else {
 			// Apply a multiplicative backoff.
@@ -334,11 +333,8 @@ loop:
 	close(c.workerRdChan)
 	close(c.workerWrChan)
 
-	// In case the close was done on an error condition, update the state
-	// variable so that further calls to Write() will fail.
-	c.Lock()
-	defer c.Unlock()
-	c.workerRunning = false
+	// Close the connection (extra calls to Close() are harmless).
+	_ = c.Close()
 }
 
 func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs) (net.Conn, error) {
@@ -347,15 +343,13 @@ func newMeekConn(network, addr string, dialFn base.DialFunc, ca *meekClientArgs)
 		return nil, err
 	}
 
-	tr := &http.Transport{Dial: dialFn}
 	conn := &meekConn{
 		args:            ca,
 		sessionID:       id,
-		transport:       tr,
-		workerRunning:   true,
+		transport:       &http.Transport{Dial: dialFn},
 		workerWrChan:    make(chan []byte, maxChanBacklog),
 		workerRdChan:    make(chan []byte, maxChanBacklog),
-		workerCloseChan: make(chan bool),
+		workerCloseChan: make(chan struct{}),
 	}
 
 	// Start the I/O worker.



_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits