[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]

[tor-commits] [pluggable-transports/meek] 04/09: Import RemoteMap from Champa/dnstt.



This is an automated email from the git hooks/post-receive script.

dcf pushed a commit to branch turbotunnel
in repository pluggable-transports/meek.

commit 70b33a768d66497e4ed8f02e8e4bba10f2e26ab7
Author: David Fifield <david@xxxxxxxxxxxxxxx>
AuthorDate: Thu Oct 27 18:33:57 2022 -0600

    Import RemoteMap from Champa/dnstt.
    
    Adds support for a one-packet "stash," and closes send queues when
    expiring.
---
 common/turbotunnel/client_map.go      | 144 ---------------------------
 common/turbotunnel/queuepacketconn.go |  37 +++++--
 common/turbotunnel/remotemap.go       | 177 ++++++++++++++++++++++++++++++++++
 3 files changed, 208 insertions(+), 150 deletions(-)

diff --git a/common/turbotunnel/client_map.go b/common/turbotunnel/client_map.go
deleted file mode 100644
index 733480a..0000000
--- a/common/turbotunnel/client_map.go
+++ /dev/null
@@ -1,144 +0,0 @@
-package turbotunnel
-
-import (
-	"container/heap"
-	"net"
-	"sync"
-	"time"
-)
-
-// clientRecord is a record of a recently seen client, with the time it was last
-// seen and a send queue.
-type clientRecord struct {
-	Addr      net.Addr
-	LastSeen  time.Time
-	SendQueue chan []byte
-}
-
-// ClientMap manages a mapping of live clients (keyed by address, which will be
-// a ClientID) to their respective send queues. ClientMap's functions are safe
-// to call from multiple goroutines.
-type ClientMap struct {
-	// We use an inner structure to avoid exposing public heap.Interface
-	// functions to users of clientMap.
-	inner clientMapInner
-	// Synchronizes access to inner.
-	lock sync.Mutex
-}
-
-// NewClientMap creates a ClientMap that expires clients after a timeout.
-//
-// The timeout does not have to be kept in sync with smux's internal idle
-// timeout. If a client is removed from the client map while the smux session is
-// still live, the worst that can happen is a loss of whatever packets were in
-// the send queue at the time. If smux later decides to send more packets to the
-// same client, we'll instantiate a new send queue, and if the client ever
-// connects again with the proper client ID, we'll deliver them.
-func NewClientMap(timeout time.Duration) *ClientMap {
-	m := &ClientMap{
-		inner: clientMapInner{
-			byAge:  make([]*clientRecord, 0),
-			byAddr: make(map[net.Addr]int),
-		},
-	}
-	go func() {
-		for {
-			time.Sleep(timeout / 2)
-			now := time.Now()
-			m.lock.Lock()
-			m.inner.removeExpired(now, timeout)
-			m.lock.Unlock()
-		}
-	}()
-	return m
-}
-
-// SendQueue returns the send queue corresponding to addr, creating it if
-// necessary.
-func (m *ClientMap) SendQueue(addr net.Addr) chan []byte {
-	m.lock.Lock()
-	defer m.lock.Unlock()
-	return m.inner.SendQueue(addr, time.Now())
-}
-
-// clientMapInner is the inner type of ClientMap, implementing heap.Interface.
-// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
-// expiring old client records. byAddr is a map from addresses (i.e., ClientIDs)
-// to heap indices, to allow looking up by address. Unlike ClientMap,
-// clientMapInner requires external synchonization.
-type clientMapInner struct {
-	byAge  []*clientRecord
-	byAddr map[net.Addr]int
-}
-
-// removeExpired removes all client records whose LastSeen timestamp is more
-// than timeout in the past.
-func (inner *clientMapInner) removeExpired(now time.Time, timeout time.Duration) {
-	for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
-		heap.Pop(inner)
-	}
-}
-
-// SendQueue finds the existing client record corresponding to addr, or creates
-// a new one if none exists yet. It updates the client record's LastSeen time
-// and returns its SendQueue.
-func (inner *clientMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
-	var record *clientRecord
-	i, ok := inner.byAddr[addr]
-	if ok {
-		// Found one, update its LastSeen.
-		record = inner.byAge[i]
-		record.LastSeen = now
-		heap.Fix(inner, i)
-	} else {
-		// Not found, create a new one.
-		record = &clientRecord{
-			Addr:      addr,
-			LastSeen:  now,
-			SendQueue: make(chan []byte, queueSize),
-		}
-		heap.Push(inner, record)
-	}
-	return record.SendQueue
-}
-
-// heap.Interface for clientMapInner.
-
-func (inner *clientMapInner) Len() int {
-	if len(inner.byAge) != len(inner.byAddr) {
-		panic("inconsistent clientMap")
-	}
-	return len(inner.byAge)
-}
-
-func (inner *clientMapInner) Less(i, j int) bool {
-	return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
-}
-
-func (inner *clientMapInner) Swap(i, j int) {
-	inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
-	inner.byAddr[inner.byAge[i].Addr] = i
-	inner.byAddr[inner.byAge[j].Addr] = j
-}
-
-func (inner *clientMapInner) Push(x interface{}) {
-	record := x.(*clientRecord)
-	if _, ok := inner.byAddr[record.Addr]; ok {
-		panic("duplicate address in clientMap")
-	}
-	// Insert into byAddr map.
-	inner.byAddr[record.Addr] = len(inner.byAge)
-	// Insert into byAge slice.
-	inner.byAge = append(inner.byAge, record)
-}
-
-func (inner *clientMapInner) Pop() interface{} {
-	n := len(inner.byAddr)
-	// Remove from byAge slice.
-	record := inner.byAge[n-1]
-	inner.byAge[n-1] = nil
-	inner.byAge = inner.byAge[:n-1]
-	// Remove from byAddr map.
-	delete(inner.byAddr, record.Addr)
-	return record
-}
diff --git a/common/turbotunnel/queuepacketconn.go b/common/turbotunnel/queuepacketconn.go
index acb04c1..eb4df4b 100644
--- a/common/turbotunnel/queuepacketconn.go
+++ b/common/turbotunnel/queuepacketconn.go
@@ -21,8 +21,19 @@ type taggedPacket struct {
 // method inserts a packet into the incoming queue, to eventually be returned by
 // ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
 // which can later by accessed through the OutgoingQueue method.
+//
+// Besides the outgoing queues, there is also a one-element "stash" for each
+// remote peer address. You can stash a packet using the Stash method, and get
+// it back later by receiving from the channel returned by Unstash. The stash is
+// meant as a convenient place to temporarily store a single packet, such as
+// when you've read one too many packets from the send queue and need to store
+// the extra packet to be processed first in the next pass. It's the caller's
+// responsibility to Unstash what they have Stashed. Calling Stash does not put
+// the packet at the head of the send queue; if there is the possibility that a
+// packet has been stashed, it must be checked for by calling Unstash in
+// addition to OutgoingQueue.
 type QueuePacketConn struct {
-	clients   *ClientMap
+	remotes   *RemoteMap
 	localAddr net.Addr
 	recvQueue chan taggedPacket
 	closeOnce sync.Once
@@ -31,11 +42,11 @@ type QueuePacketConn struct {
 	err atomic.Value
 }
 
-// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers for
-// at least a duration of timeout.
+// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers
+// for at least a duration of timeout.
 func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
 	return &QueuePacketConn{
-		clients:   NewClientMap(timeout),
+		remotes:   NewRemoteMap(timeout),
 		localAddr: localAddr,
 		recvQueue: make(chan taggedPacket, queueSize),
 		closed:    make(chan struct{}),
@@ -65,7 +76,21 @@ func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
 // creating it if necessary. The contents of the queue will be packets that are
 // written to the address in question using WriteTo.
 func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
-	return c.clients.SendQueue(addr)
+	return c.remotes.SendQueue(addr)
+}
+
+// Stash places p in the stash for addr, if the stash is not already occupied.
+// Returns true if the packet was placed in the stash, or false if the stash was
+// already occupied. This method is similar to WriteTo, except that it puts the
+// packet in the stash queue (accessible via Unstash), rather than the outgoing
+// queue (accessible via OutgoingQueue).
+func (c *QueuePacketConn) Stash(p []byte, addr net.Addr) bool {
+	return c.remotes.Stash(addr, p)
+}
+
+// Unstash returns the channel that represents the stash for addr.
+func (c *QueuePacketConn) Unstash(addr net.Addr) <-chan []byte {
+	return c.remotes.Unstash(addr)
 }
 
 // ReadFrom returns a packet and address previously stored by QueueIncoming.
@@ -95,7 +120,7 @@ func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
 	buf := make([]byte, len(p))
 	copy(buf, p)
 	select {
-	case c.clients.SendQueue(addr) <- buf:
+	case c.remotes.SendQueue(addr) <- buf:
 		return len(buf), nil
 	default:
 		// Drop the outgoing packet if the send queue is full.
diff --git a/common/turbotunnel/remotemap.go b/common/turbotunnel/remotemap.go
new file mode 100644
index 0000000..c679bfa
--- /dev/null
+++ b/common/turbotunnel/remotemap.go
@@ -0,0 +1,177 @@
+package turbotunnel
+
+import (
+	"container/heap"
+	"net"
+	"sync"
+	"time"
+)
+
+// remoteRecord is a record of a recently seen remote peer, with the time it was
+// last seen and queues of outgoing packets.
+type remoteRecord struct {
+	Addr      net.Addr
+	LastSeen  time.Time
+	SendQueue chan []byte
+	Stash     chan []byte
+}
+
+// RemoteMap manages a mapping of live remote peers, keyed by address, to their
+// respective send queues. Each peer has two queues: a primary send queue, and a
+// "stash". The primary send queue is returned by the SendQueue method. The
+// stash is an auxiliary one-element queue accessed using the Stash and Unstash
+// methods. The stash is meant for use by callers that need to "unread" a packet
+// that's already been removed from the primary send queue.
+//
+// RemoteMap's functions are safe to call from multiple goroutines.
+type RemoteMap struct {
+	// We use an inner structure to avoid exposing public heap.Interface
+	// functions to users of remoteMap.
+	inner remoteMapInner
+	// Synchronizes access to inner.
+	lock sync.Mutex
+}
+
+// NewRemoteMap creates a RemoteMap that expires peers after a timeout.
+//
+// If the timeout is 0, peers never expire.
+//
+// The timeout does not have to be kept in sync with smux's idle timeout. If a
+// peer is removed from the map while the smux session is still live, the worst
+// that can happen is a loss of whatever packets were in the send queue at the
+// time. If smux later decides to send more packets to the same peer, we'll
+// instantiate a new send queue, and if the peer is ever seen again with a
+// matching address, we'll deliver them.
+func NewRemoteMap(timeout time.Duration) *RemoteMap {
+	m := &RemoteMap{
+		inner: remoteMapInner{
+			byAge:  make([]*remoteRecord, 0),
+			byAddr: make(map[net.Addr]int),
+		},
+	}
+	if timeout > 0 {
+		go func() {
+			for {
+				time.Sleep(timeout / 2)
+				now := time.Now()
+				m.lock.Lock()
+				m.inner.removeExpired(now, timeout)
+				m.lock.Unlock()
+			}
+		}()
+	}
+	return m
+}
+
+// SendQueue returns the send queue corresponding to addr, creating it if
+// necessary.
+func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return m.inner.Lookup(addr, time.Now()).SendQueue
+}
+
+// Stash places p in the stash corresponding to addr, if the stash is not
+// already occupied. Returns true if the p was placed in the stash, false
+// otherwise.
+func (m *RemoteMap) Stash(addr net.Addr, p []byte) bool {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	select {
+	case m.inner.Lookup(addr, time.Now()).Stash <- p:
+		return true
+	default:
+		return false
+	}
+}
+
+// Unstash returns the channel that reads from the stash for addr.
+func (m *RemoteMap) Unstash(addr net.Addr) <-chan []byte {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return m.inner.Lookup(addr, time.Now()).Stash
+}
+
+// remoteMapInner is the inner type of RemoteMap, implementing heap.Interface.
+// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
+// expiring old records. byAddr is a map from addresses to heap indices, to
+// allow looking up by address. Unlike RemoteMap, remoteMapInner requires
+// external synchonization.
+type remoteMapInner struct {
+	byAge  []*remoteRecord
+	byAddr map[net.Addr]int
+}
+
+// removeExpired removes all records whose LastSeen timestamp is more than
+// timeout in the past.
+func (inner *remoteMapInner) removeExpired(now time.Time, timeout time.Duration) {
+	for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
+		record := heap.Pop(inner).(*remoteRecord)
+		close(record.SendQueue)
+	}
+}
+
+// Lookup finds the existing record corresponding to addr, or creates a new
+// one if none exists yet. It updates the record's LastSeen time and returns the
+// record.
+func (inner *remoteMapInner) Lookup(addr net.Addr, now time.Time) *remoteRecord {
+	var record *remoteRecord
+	i, ok := inner.byAddr[addr]
+	if ok {
+		// Found one, update its LastSeen.
+		record = inner.byAge[i]
+		record.LastSeen = now
+		heap.Fix(inner, i)
+	} else {
+		// Not found, create a new one.
+		record = &remoteRecord{
+			Addr:      addr,
+			LastSeen:  now,
+			SendQueue: make(chan []byte, queueSize),
+			Stash:     make(chan []byte, 1),
+		}
+		heap.Push(inner, record)
+	}
+	return record
+}
+
+// heap.Interface for remoteMapInner.
+
+func (inner *remoteMapInner) Len() int {
+	if len(inner.byAge) != len(inner.byAddr) {
+		panic("inconsistent remoteMap")
+	}
+	return len(inner.byAge)
+}
+
+func (inner *remoteMapInner) Less(i, j int) bool {
+	return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
+}
+
+func (inner *remoteMapInner) Swap(i, j int) {
+	inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
+	inner.byAddr[inner.byAge[i].Addr] = i
+	inner.byAddr[inner.byAge[j].Addr] = j
+}
+
+func (inner *remoteMapInner) Push(x interface{}) {
+	record := x.(*remoteRecord)
+	if _, ok := inner.byAddr[record.Addr]; ok {
+		panic("duplicate address in remoteMap")
+	}
+	// Insert into byAddr map.
+	inner.byAddr[record.Addr] = len(inner.byAge)
+	// Insert into byAge slice.
+	inner.byAge = append(inner.byAge, record)
+}
+
+func (inner *remoteMapInner) Pop() interface{} {
+	n := len(inner.byAddr)
+	// Remove from byAge slice.
+	record := inner.byAge[n-1]
+	inner.byAge[n-1] = nil
+	inner.byAge = inner.byAge[:n-1]
+	// Remove from byAddr map.
+	delete(inner.byAddr, record.Addr)
+	return record
+}

-- 
To stop receiving notification emails like this one, please contact
the administrator of this repository.
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits