[tor-commits] [pluggable-transports/meek] 07/09: Rewrite PollingPacketConn in terms of QueuePacketConn.
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel
in repository pluggable-transports/meek.
commit a61ec4e3c8ab2a606f643f3a8159e3515295bcda
Author: David Fifield <david@xxxxxxxxxxxxxxx>
AuthorDate: Fri Oct 28 00:22:03 2022 -0600
Rewrite PollingPacketConn in terms of QueuePacketConn.
As in Champa. Use a single main polling loop rather than multiple
request loops, but execute each poll asynchronously in its own goroutine.
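
For reference, the shape of that change is sketched below (a minimal,
hypothetical illustration, not the code in this commit): a single
requestLoop owns the send/poll schedule, and each network round trip is
handed off to its own goroutine, so a slow poll never delays scheduling
the next one.

    package main

    import (
        "context"
        "fmt"
        "time"
    )

    const pollDelay = 1 * time.Second

    // requestLoop is the single scheduling loop. It decides when to poll
    // (on outgoing data or on a timer), but runs the potentially slow
    // network round trip in a separate goroutine each time.
    func requestLoop(ctx context.Context, outgoing <-chan []byte, poll func([]byte)) {
        for {
            var p []byte
            select {
            case <-ctx.Done():
                return
            case p = <-outgoing:
            case <-time.After(pollDelay):
                // No data for a while: poll anyway, with an empty body.
            }
            go poll(p) // asynchronous: the loop continues immediately
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()
        outgoing := make(chan []byte, 1)
        outgoing <- []byte("hello")
        go requestLoop(ctx, outgoing, func(p []byte) {
            fmt.Printf("poll carrying %d bytes\n", len(p))
        })
        time.Sleep(100 * time.Millisecond)
    }
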
---
meek-client/turbotunnel.go | 242 ++++++++++++++-------------------------------
1 file changed, 75 insertions(+), 167 deletions(-)
diff --git a/meek-client/turbotunnel.go b/meek-client/turbotunnel.go
index d4274e2..cbb3380 100644
--- a/meek-client/turbotunnel.go
+++ b/meek-client/turbotunnel.go
@@ -10,11 +10,9 @@ package main
import (
"bytes"
- "errors"
+ "context"
"io"
"net"
- "sync"
- "sync/atomic"
"time"
"git.torproject.org/pluggable-transports/meek.git/common/encapsulation"
@@ -22,18 +20,11 @@ import (
)
const (
- // The size of receive and send queues.
- queueSize = 256
-
// The size of the largest bundle of packets we will send in a poll.
// (Actually it's not quite a maximum; we will quit bundling as soon as
// it is exceeded.)
maxSendBundleLength = 0x10000
- // How many goroutines stand ready to do a poll when an outgoing packet
- // needs to be sent.
- numRequestLoops = 32
-
// We must poll the server to see if it has anything to send; there is
// no way for the server to push data back to us until we poll it. When
// a timer expires, we send a request even if it has an empty body. The
@@ -66,16 +57,13 @@ type PollingPacketConn struct {
clientID turbotunnel.ClientID
remoteAddr net.Addr
poller Poller
- recvQueue chan []byte
- sendQueue chan []byte
- // Sending on pollChan permits requestLoop to send an empty polling
- // query. requestLoop also does its own polling according to a time
- // schedule.
- pollChan chan struct{}
- closeOnce sync.Once
- closed chan struct{}
- // What error to return when the PollingPacketConn is closed.
- err atomic.Value
+ ctx context.Context
+ cancel context.CancelFunc
+ // QueuePacketConn is the direct receiver of ReadFrom and WriteTo calls.
+ // requestLoop removes messages from the outgoing queue that
+ // were placed there by WriteTo, and inserts messages into the incoming
+ // queue to be returned from ReadFrom.
+ *turbotunnel.QueuePacketConn
}
// NewPollingPacketConn creates a PollingPacketConn with a random ClientID as
@@ -83,99 +71,31 @@ type PollingPacketConn struct {
// ReadFrom, and the RemoteAddr method; it is poller that really controls the
// effective remote address.
func NewPollingPacketConn(remoteAddr net.Addr, poller Poller) *PollingPacketConn {
+ clientID := turbotunnel.NewClientID()
+ ctx, cancel := context.WithCancel(context.Background())
c := &PollingPacketConn{
- clientID: turbotunnel.NewClientID(),
- remoteAddr: remoteAddr,
- poller: poller,
- recvQueue: make(chan []byte, queueSize),
- sendQueue: make(chan []byte, queueSize),
- pollChan: make(chan struct{}),
- closed: make(chan struct{}),
- }
- for i := 0; i < numRequestLoops; i++ {
- go c.requestLoop()
+ clientID: clientID,
+ remoteAddr: remoteAddr,
+ poller: poller,
+ ctx: ctx,
+ cancel: cancel,
+ QueuePacketConn: turbotunnel.NewQueuePacketConn(clientID, 0),
}
+ go c.requestLoop()
return c
}
-var errClosedPacketConn = errors.New("operation on closed connection")
-var errNotImplemented = errors.New("not implemented")
-
-// ReadFrom returns a packet received from a previous poll, blocking until there
-// is a packet to return. Unless the returned error is non-nil, the returned
-// net.Addr is always c.RemoteAddr(),
-func (c *PollingPacketConn) ReadFrom(p []byte) (int, net.Addr, error) {
- select {
- case <-c.closed:
- return 0, nil, &net.OpError{Op: "read", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)}
- default:
- }
- select {
- case <-c.closed:
- return 0, nil, &net.OpError{Op: "read", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)}
- case buf := <-c.recvQueue:
- return copy(p, buf), c.RemoteAddr(), nil
- }
-}
-
-// WriteTo queues a packet to be sent (possibly batched) by the underlying
-// poller.
-func (c *PollingPacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
- // The addr argument is ignored.
- select {
- case <-c.closed:
- return 0, &net.OpError{Op: "write", Net: c.RemoteAddr().Network(), Source: c.LocalAddr(), Addr: c.RemoteAddr(), Err: c.err.Load().(error)}
- default:
- }
- // Copy the slice so that the caller may reuse it.
- buf := make([]byte, len(p))
- copy(buf, p)
- select {
- case c.sendQueue <- buf:
- return len(buf), nil
- default:
- // Drop the outgoing packet if the send queue is full.
- return len(buf), nil
- }
-}
-
-// closeWithError unblocks pending operations and makes future operations fail
-// with the given error. If err is nil, it becomes errClosedPacketConn.
-func (c *PollingPacketConn) closeWithError(err error) error {
- var newlyClosed bool
- c.closeOnce.Do(func() {
- newlyClosed = true
- // Store the error to be returned by future PacketConn
- // operations.
- if err == nil {
- err = errClosedPacketConn
- }
- c.err.Store(err)
- close(c.closed)
- })
- if !newlyClosed {
- return &net.OpError{Op: "close", Net: c.LocalAddr().Network(), Addr: c.LocalAddr(), Err: c.err.Load().(error)}
- }
- return nil
-}
-
-// Close unblocks pending operations and makes future operations fail with a
-// "closed connection" error.
+// Close cancels any in-progress polls and closes the underlying
+// QueuePacketConn.
func (c *PollingPacketConn) Close() error {
- return c.closeWithError(nil)
+ c.cancel()
+ return c.QueuePacketConn.Close()
}
-// LocalAddr returns this connection's random Client ID.
-func (c *PollingPacketConn) LocalAddr() net.Addr { return c.clientID }
-
// RemoteAddr returns the remoteAddr value that was passed to
// NewPollingPacketConn.
func (c *PollingPacketConn) RemoteAddr() net.Addr { return c.remoteAddr }
-func (c *PollingPacketConn) SetDeadline(t time.Time) error { return errNotImplemented }
-func (c *PollingPacketConn) SetReadDeadline(t time.Time) error { return errNotImplemented }
-func (c *PollingPacketConn) SetWriteDeadline(t time.Time) error { return errNotImplemented }
-
func (c *PollingPacketConn) requestLoop() {
pollDelay := initPollDelay
pollTimer := time.NewTimer(pollDelay)
@@ -184,52 +104,31 @@ func (c *PollingPacketConn) requestLoop() {
body.Write(c.clientID[:])
var p []byte
+ unstash := c.QueuePacketConn.Unstash(c.remoteAddr)
+ outgoing := c.QueuePacketConn.OutgoingQueue(c.remoteAddr)
pollTimerExpired := false
- // Block, waiting for at least one packet.
+ // Block, waiting for one packet or a demand to poll. Prioritize
+ // taking a packet from the stash, then taking one from the
+ // outgoing queue, then finally also consider polls.
select {
- case <-c.closed:
+ case <-c.ctx.Done():
return
- case p = <-c.sendQueue:
+ case p = <-unstash:
default:
select {
- case <-c.closed:
+ case <-c.ctx.Done():
return
- case p = <-c.sendQueue:
- case <-c.pollChan:
- p = nil
- case <-pollTimer.C:
- p = nil
- pollTimerExpired = true
- }
- }
-
- if len(p) > 0 {
- // Encapsulate the packet into the request body.
- encapsulation.WriteData(&body, p)
-
- // A data-carrying request displaces one pending poll
- // opportunity, if any.
- select {
- case <-c.pollChan:
- default:
- }
- }
-
- // Send any other packets that are immediately available, up to
- // maxSendBundleLength.
- loop:
- // TODO: It would be better if maxSendBundleLength were a true
- // maximum (we don't remove a packet from c.sendQueue unless it
- // fits in the remaining length). That would also allow for
- // arbitrary shaping, along with encapsulation.WritePadding.
- for body.Len() < maxSendBundleLength {
- select {
- case <-c.closed:
- return
- case p := <-c.sendQueue:
- encapsulation.WriteData(&body, p)
+ case p = <-unstash:
+ case p = <-outgoing:
default:
- break loop
+ select {
+ case <-c.ctx.Done():
+ return
+ case p = <-unstash:
+ case p = <-outgoing:
+ case <-pollTimer.C:
+ pollTimerExpired = true
+ }
}
}
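
The nested selects above are the usual Go idiom for imposing a priority
order among channels: a bare select chooses at random among ready cases,
so each level drains its preferred channel via a default case before
falling through to the next. A standalone sketch of the idiom, with
hypothetical high/low channels (not the meek code):

    package main

    import "fmt"

    // prefer returns a value from high if one is immediately available,
    // otherwise waits on both channels. A bare select would choose
    // between two ready channels at random; the outer select with a
    // default imposes the priority.
    func prefer(high, low <-chan int) int {
        select {
        case v := <-high:
            return v
        default:
            select {
            case v := <-high:
                return v
            case v := <-low:
                return v
            }
        }
    }

    func main() {
        high := make(chan int, 1)
        low := make(chan int, 1)
        high <- 1
        low <- 2
        fmt.Println(prefer(high, low)) // 1: high drains first, though low is ready
        fmt.Println(prefer(high, low)) // 2
    }
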
@@ -241,8 +140,7 @@ func (c *PollingPacketConn) requestLoop() {
pollDelay = maxPollDelay
}
} else {
- // We're sending an actual data packet, or we're polling
- // in response to a received packet. Reset the poll
+ // We're sending an actual data packet. Reset the poll
// delay to initial.
if !pollTimer.Stop() {
<-pollTimer.C
@@ -251,36 +149,46 @@ func (c *PollingPacketConn) requestLoop() {
}
pollTimer.Reset(pollDelay)
- resp, err := c.poller.Poll(&body)
- if err != nil {
- c.closeWithError(err)
- return
- }
- defer resp.Close()
+ // Grab as many more packets as are immediately available and
+ // fit in maxSendBundleLength. Always include the first packet,
+ // even if it doesn't fit.
+ first := true
+ for len(p) > 0 && (first || body.Len()+len(p) <= maxSendBundleLength) {
+ first = false
+
+ // Encapsulate the packet into the request body.
+ encapsulation.WriteData(&body, p)
- queuedPoll := false
- for {
- p, err := encapsulation.ReadData(resp)
- if err == io.EOF {
- break
- } else if err != nil {
- c.closeWithError(err)
- break
- }
select {
- case c.recvQueue <- p:
+ case p = <-outgoing:
default:
- // Drop incoming packets when queue is full.
+ p = nil
+ }
+ }
+ if len(p) > 0 {
+ // We read an actual packet, but it didn't fit under the
+ // limit. Stash it so that it will be first in line for
+ // the next poll.
+ c.QueuePacketConn.Stash(p, c.remoteAddr)
+ }
+
+ go func() {
+ resp, err := c.poller.Poll(&body)
+ if err != nil {
+ c.Close()
+ return
}
- if !queuedPoll {
- queuedPoll = true
- for i := 0; i < 2; i++ {
- select {
- case c.pollChan <- struct{}{}:
- default:
- }
+ defer resp.Close()
+ for {
+ p, err := encapsulation.ReadData(resp)
+ if err == io.EOF {
+ break
+ } else if err != nil {
+ c.Close()
+ return
}
+ c.QueuePacketConn.QueueIncoming(p, c.remoteAddr)
}
- }
+ }()
}
}
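
The bundling loop in the last hunk has one subtlety worth noting: the
first packet is always sent, even if it alone exceeds
maxSendBundleLength, and the first packet that would overflow the limit
is stashed so it leads the next poll. A self-contained sketch of that
rule, with a plain channel and a tiny limit standing in for
encapsulation.WriteData and the Stash/Unstash machinery:

    package main

    import (
        "bytes"
        "fmt"
    )

    const maxBundle = 16 // tiny stand-in for maxSendBundleLength

    // bundle always includes the first packet, then adds more only while
    // the running total stays under the limit. A packet that would
    // overflow is returned so the caller can stash it for the next poll.
    func bundle(first []byte, outgoing chan []byte) (body, stash []byte) {
        var buf bytes.Buffer
        p := first
        isFirst := true
        for len(p) > 0 && (isFirst || buf.Len()+len(p) <= maxBundle) {
            isFirst = false
            buf.Write(p) // stands in for encapsulation.WriteData
            select {
            case p = <-outgoing:
            default:
                p = nil
            }
        }
        return buf.Bytes(), p
    }

    func main() {
        outgoing := make(chan []byte, 2)
        outgoing <- []byte("7 bytes")
        bundled, stash := bundle([]byte("first packet"), outgoing)
        fmt.Printf("bundled %d bytes, stashed %q\n", len(bundled), stash)
        // Output: bundled 12 bytes, stashed "7 bytes"
    }
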