[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [pluggable-transports/meek] 04/09: Import RemoteMap from Champa/dnstt.
This is an automated email from the git hooks/post-receive script.
dcf pushed a commit to branch turbotunnel
in repository pluggable-transports/meek.
commit 70b33a768d66497e4ed8f02e8e4bba10f2e26ab7
Author: David Fifield <david@xxxxxxxxxxxxxxx>
AuthorDate: Thu Oct 27 18:33:57 2022 -0600
Import RemoteMap from Champa/dnstt.
Adds support for a one-packet "stash," and closes send queues when
expiring.
---
common/turbotunnel/client_map.go | 144 ---------------------------
common/turbotunnel/queuepacketconn.go | 37 +++++--
common/turbotunnel/remotemap.go | 177 ++++++++++++++++++++++++++++++++++
3 files changed, 208 insertions(+), 150 deletions(-)
diff --git a/common/turbotunnel/client_map.go b/common/turbotunnel/client_map.go
deleted file mode 100644
index 733480a..0000000
--- a/common/turbotunnel/client_map.go
+++ /dev/null
@@ -1,144 +0,0 @@
-package turbotunnel
-
-import (
- "container/heap"
- "net"
- "sync"
- "time"
-)
-
-// clientRecord is a record of a recently seen client, with the time it was last
-// seen and a send queue.
-type clientRecord struct {
- Addr net.Addr
- LastSeen time.Time
- SendQueue chan []byte
-}
-
-// ClientMap manages a mapping of live clients (keyed by address, which will be
-// a ClientID) to their respective send queues. ClientMap's functions are safe
-// to call from multiple goroutines.
-type ClientMap struct {
- // We use an inner structure to avoid exposing public heap.Interface
- // functions to users of clientMap.
- inner clientMapInner
- // Synchronizes access to inner.
- lock sync.Mutex
-}
-
-// NewClientMap creates a ClientMap that expires clients after a timeout.
-//
-// The timeout does not have to be kept in sync with smux's internal idle
-// timeout. If a client is removed from the client map while the smux session is
-// still live, the worst that can happen is a loss of whatever packets were in
-// the send queue at the time. If smux later decides to send more packets to the
-// same client, we'll instantiate a new send queue, and if the client ever
-// connects again with the proper client ID, we'll deliver them.
-func NewClientMap(timeout time.Duration) *ClientMap {
- m := &ClientMap{
- inner: clientMapInner{
- byAge: make([]*clientRecord, 0),
- byAddr: make(map[net.Addr]int),
- },
- }
- go func() {
- for {
- time.Sleep(timeout / 2)
- now := time.Now()
- m.lock.Lock()
- m.inner.removeExpired(now, timeout)
- m.lock.Unlock()
- }
- }()
- return m
-}
-
-// SendQueue returns the send queue corresponding to addr, creating it if
-// necessary.
-func (m *ClientMap) SendQueue(addr net.Addr) chan []byte {
- m.lock.Lock()
- defer m.lock.Unlock()
- return m.inner.SendQueue(addr, time.Now())
-}
-
-// clientMapInner is the inner type of ClientMap, implementing heap.Interface.
-// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
-// expiring old client records. byAddr is a map from addresses (i.e., ClientIDs)
-// to heap indices, to allow looking up by address. Unlike ClientMap,
-// clientMapInner requires external synchonization.
-type clientMapInner struct {
- byAge []*clientRecord
- byAddr map[net.Addr]int
-}
-
-// removeExpired removes all client records whose LastSeen timestamp is more
-// than timeout in the past.
-func (inner *clientMapInner) removeExpired(now time.Time, timeout time.Duration) {
- for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
- heap.Pop(inner)
- }
-}
-
-// SendQueue finds the existing client record corresponding to addr, or creates
-// a new one if none exists yet. It updates the client record's LastSeen time
-// and returns its SendQueue.
-func (inner *clientMapInner) SendQueue(addr net.Addr, now time.Time) chan []byte {
- var record *clientRecord
- i, ok := inner.byAddr[addr]
- if ok {
- // Found one, update its LastSeen.
- record = inner.byAge[i]
- record.LastSeen = now
- heap.Fix(inner, i)
- } else {
- // Not found, create a new one.
- record = &clientRecord{
- Addr: addr,
- LastSeen: now,
- SendQueue: make(chan []byte, queueSize),
- }
- heap.Push(inner, record)
- }
- return record.SendQueue
-}
-
-// heap.Interface for clientMapInner.
-
-func (inner *clientMapInner) Len() int {
- if len(inner.byAge) != len(inner.byAddr) {
- panic("inconsistent clientMap")
- }
- return len(inner.byAge)
-}
-
-func (inner *clientMapInner) Less(i, j int) bool {
- return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
-}
-
-func (inner *clientMapInner) Swap(i, j int) {
- inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
- inner.byAddr[inner.byAge[i].Addr] = i
- inner.byAddr[inner.byAge[j].Addr] = j
-}
-
-func (inner *clientMapInner) Push(x interface{}) {
- record := x.(*clientRecord)
- if _, ok := inner.byAddr[record.Addr]; ok {
- panic("duplicate address in clientMap")
- }
- // Insert into byAddr map.
- inner.byAddr[record.Addr] = len(inner.byAge)
- // Insert into byAge slice.
- inner.byAge = append(inner.byAge, record)
-}
-
-func (inner *clientMapInner) Pop() interface{} {
- n := len(inner.byAddr)
- // Remove from byAge slice.
- record := inner.byAge[n-1]
- inner.byAge[n-1] = nil
- inner.byAge = inner.byAge[:n-1]
- // Remove from byAddr map.
- delete(inner.byAddr, record.Addr)
- return record
-}
diff --git a/common/turbotunnel/queuepacketconn.go b/common/turbotunnel/queuepacketconn.go
index acb04c1..eb4df4b 100644
--- a/common/turbotunnel/queuepacketconn.go
+++ b/common/turbotunnel/queuepacketconn.go
@@ -21,8 +21,19 @@ type taggedPacket struct {
// method inserts a packet into the incoming queue, to eventually be returned by
// ReadFrom. WriteTo inserts a packet into an address-specific outgoing queue,
// which can later by accessed through the OutgoingQueue method.
+//
+// Besides the outgoing queues, there is also a one-element "stash" for each
+// remote peer address. You can stash a packet using the Stash method, and get
+// it back later by receiving from the channel returned by Unstash. The stash is
+// meant as a convenient place to temporarily store a single packet, such as
+// when you've read one too many packets from the send queue and need to store
+// the extra packet to be processed first in the next pass. It's the caller's
+// responsibility to Unstash what they have Stashed. Calling Stash does not put
+// the packet at the head of the send queue; if there is the possibility that a
+// packet has been stashed, it must be checked for by calling Unstash in
+// addition to OutgoingQueue.
type QueuePacketConn struct {
- clients *ClientMap
+ remotes *RemoteMap
localAddr net.Addr
recvQueue chan taggedPacket
closeOnce sync.Once
@@ -31,11 +42,11 @@ type QueuePacketConn struct {
err atomic.Value
}
-// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers for
-// at least a duration of timeout.
+// NewQueuePacketConn makes a new QueuePacketConn, set to track recent peers
+// for at least a duration of timeout.
func NewQueuePacketConn(localAddr net.Addr, timeout time.Duration) *QueuePacketConn {
return &QueuePacketConn{
- clients: NewClientMap(timeout),
+ remotes: NewRemoteMap(timeout),
localAddr: localAddr,
recvQueue: make(chan taggedPacket, queueSize),
closed: make(chan struct{}),
@@ -65,7 +76,21 @@ func (c *QueuePacketConn) QueueIncoming(p []byte, addr net.Addr) {
// creating it if necessary. The contents of the queue will be packets that are
// written to the address in question using WriteTo.
func (c *QueuePacketConn) OutgoingQueue(addr net.Addr) <-chan []byte {
- return c.clients.SendQueue(addr)
+ return c.remotes.SendQueue(addr)
+}
+
+// Stash places p in the stash for addr, if the stash is not already occupied.
+// Returns true if the packet was placed in the stash, or false if the stash was
+// already occupied. This method is similar to WriteTo, except that it puts the
+// packet in the stash queue (accessible via Unstash), rather than the outgoing
+// queue (accessible via OutgoingQueue).
+func (c *QueuePacketConn) Stash(p []byte, addr net.Addr) bool {
+ return c.remotes.Stash(addr, p)
+}
+
+// Unstash returns the channel that represents the stash for addr.
+func (c *QueuePacketConn) Unstash(addr net.Addr) <-chan []byte {
+ return c.remotes.Unstash(addr)
}
// ReadFrom returns a packet and address previously stored by QueueIncoming.
@@ -95,7 +120,7 @@ func (c *QueuePacketConn) WriteTo(p []byte, addr net.Addr) (int, error) {
buf := make([]byte, len(p))
copy(buf, p)
select {
- case c.clients.SendQueue(addr) <- buf:
+ case c.remotes.SendQueue(addr) <- buf:
return len(buf), nil
default:
// Drop the outgoing packet if the send queue is full.
diff --git a/common/turbotunnel/remotemap.go b/common/turbotunnel/remotemap.go
new file mode 100644
index 0000000..c679bfa
--- /dev/null
+++ b/common/turbotunnel/remotemap.go
@@ -0,0 +1,177 @@
+package turbotunnel
+
+import (
+ "container/heap"
+ "net"
+ "sync"
+ "time"
+)
+
+// remoteRecord is a record of a recently seen remote peer, with the time it was
+// last seen and queues of outgoing packets.
+type remoteRecord struct {
+ Addr net.Addr
+ LastSeen time.Time
+ SendQueue chan []byte
+ Stash chan []byte
+}
+
+// RemoteMap manages a mapping of live remote peers, keyed by address, to their
+// respective send queues. Each peer has two queues: a primary send queue, and a
+// "stash". The primary send queue is returned by the SendQueue method. The
+// stash is an auxiliary one-element queue accessed using the Stash and Unstash
+// methods. The stash is meant for use by callers that need to "unread" a packet
+// that's already been removed from the primary send queue.
+//
+// RemoteMap's functions are safe to call from multiple goroutines.
+type RemoteMap struct {
+ // We use an inner structure to avoid exposing public heap.Interface
+ // functions to users of remoteMap.
+ inner remoteMapInner
+ // Synchronizes access to inner.
+ lock sync.Mutex
+}
+
+// NewRemoteMap creates a RemoteMap that expires peers after a timeout.
+//
+// If the timeout is 0, peers never expire.
+//
+// The timeout does not have to be kept in sync with smux's idle timeout. If a
+// peer is removed from the map while the smux session is still live, the worst
+// that can happen is a loss of whatever packets were in the send queue at the
+// time. If smux later decides to send more packets to the same peer, we'll
+// instantiate a new send queue, and if the peer is ever seen again with a
+// matching address, we'll deliver them.
+func NewRemoteMap(timeout time.Duration) *RemoteMap {
+ m := &RemoteMap{
+ inner: remoteMapInner{
+ byAge: make([]*remoteRecord, 0),
+ byAddr: make(map[net.Addr]int),
+ },
+ }
+ if timeout > 0 {
+ go func() {
+ for {
+ time.Sleep(timeout / 2)
+ now := time.Now()
+ m.lock.Lock()
+ m.inner.removeExpired(now, timeout)
+ m.lock.Unlock()
+ }
+ }()
+ }
+ return m
+}
+
+// SendQueue returns the send queue corresponding to addr, creating it if
+// necessary.
+func (m *RemoteMap) SendQueue(addr net.Addr) chan []byte {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ return m.inner.Lookup(addr, time.Now()).SendQueue
+}
+
+// Stash places p in the stash corresponding to addr, if the stash is not
+// already occupied. Returns true if the p was placed in the stash, false
+// otherwise.
+func (m *RemoteMap) Stash(addr net.Addr, p []byte) bool {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ select {
+ case m.inner.Lookup(addr, time.Now()).Stash <- p:
+ return true
+ default:
+ return false
+ }
+}
+
+// Unstash returns the channel that reads from the stash for addr.
+func (m *RemoteMap) Unstash(addr net.Addr) <-chan []byte {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ return m.inner.Lookup(addr, time.Now()).Stash
+}
+
+// remoteMapInner is the inner type of RemoteMap, implementing heap.Interface.
+// byAge is the backing store, a heap ordered by LastSeen time, to facilitate
+// expiring old records. byAddr is a map from addresses to heap indices, to
+// allow looking up by address. Unlike RemoteMap, remoteMapInner requires
+// external synchonization.
+type remoteMapInner struct {
+ byAge []*remoteRecord
+ byAddr map[net.Addr]int
+}
+
+// removeExpired removes all records whose LastSeen timestamp is more than
+// timeout in the past.
+func (inner *remoteMapInner) removeExpired(now time.Time, timeout time.Duration) {
+ for len(inner.byAge) > 0 && now.Sub(inner.byAge[0].LastSeen) >= timeout {
+ record := heap.Pop(inner).(*remoteRecord)
+ close(record.SendQueue)
+ }
+}
+
+// Lookup finds the existing record corresponding to addr, or creates a new
+// one if none exists yet. It updates the record's LastSeen time and returns the
+// record.
+func (inner *remoteMapInner) Lookup(addr net.Addr, now time.Time) *remoteRecord {
+ var record *remoteRecord
+ i, ok := inner.byAddr[addr]
+ if ok {
+ // Found one, update its LastSeen.
+ record = inner.byAge[i]
+ record.LastSeen = now
+ heap.Fix(inner, i)
+ } else {
+ // Not found, create a new one.
+ record = &remoteRecord{
+ Addr: addr,
+ LastSeen: now,
+ SendQueue: make(chan []byte, queueSize),
+ Stash: make(chan []byte, 1),
+ }
+ heap.Push(inner, record)
+ }
+ return record
+}
+
+// heap.Interface for remoteMapInner.
+
+func (inner *remoteMapInner) Len() int {
+ if len(inner.byAge) != len(inner.byAddr) {
+ panic("inconsistent remoteMap")
+ }
+ return len(inner.byAge)
+}
+
+func (inner *remoteMapInner) Less(i, j int) bool {
+ return inner.byAge[i].LastSeen.Before(inner.byAge[j].LastSeen)
+}
+
+func (inner *remoteMapInner) Swap(i, j int) {
+ inner.byAge[i], inner.byAge[j] = inner.byAge[j], inner.byAge[i]
+ inner.byAddr[inner.byAge[i].Addr] = i
+ inner.byAddr[inner.byAge[j].Addr] = j
+}
+
+func (inner *remoteMapInner) Push(x interface{}) {
+ record := x.(*remoteRecord)
+ if _, ok := inner.byAddr[record.Addr]; ok {
+ panic("duplicate address in remoteMap")
+ }
+ // Insert into byAddr map.
+ inner.byAddr[record.Addr] = len(inner.byAge)
+ // Insert into byAge slice.
+ inner.byAge = append(inner.byAge, record)
+}
+
+func (inner *remoteMapInner) Pop() interface{} {
+ n := len(inner.byAddr)
+ // Remove from byAge slice.
+ record := inner.byAge[n-1]
+ inner.byAge[n-1] = nil
+ inner.byAge = inner.byAge[:n-1]
+ // Remove from byAddr map.
+ delete(inner.byAddr, record.Addr)
+ return record
+}
--
To stop receiving notification emails like this one, please contact
the administrator of this repository.
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits