[Author Prev][Author Next][Thread Prev][Thread Next][Author Index][Thread Index]
[tor-commits] [snowflake/master] Start refactoring out a client and library
commit cce7ee64a77b89b8af6760a0e0a0682d2c9331b7
Author: Arlo Breault <arlolra@xxxxxxxxx>
Date: Tue Nov 20 22:17:24 2018 -0500
Start refactoring out a client and library
---
client/{ => lib}/interfaces.go | 2 +-
client/{client_test.go => lib/lib_test.go} | 4 +-
client/{ => lib}/peers.go | 2 +-
client/{ => lib}/rendezvous.go | 2 +-
client/lib/snowflake.go | 69 ++++++++++++++++++++++
client/{ => lib}/util.go | 50 ++++++++--------
client/{ => lib}/webrtc.go | 2 +-
client/snowflake.go | 94 +++++++-----------------------
8 files changed, 120 insertions(+), 105 deletions(-)
diff --git a/client/interfaces.go b/client/lib/interfaces.go
similarity index 98%
rename from client/interfaces.go
rename to client/lib/interfaces.go
index f18987a..f62d4f5 100644
--- a/client/interfaces.go
+++ b/client/lib/interfaces.go
@@ -1,4 +1,4 @@
-package main
+package lib
import (
"io"
diff --git a/client/client_test.go b/client/lib/lib_test.go
similarity index 99%
rename from client/client_test.go
rename to client/lib/lib_test.go
index cfc8cbf..5a9a2e5 100644
--- a/client/client_test.go
+++ b/client/lib/lib_test.go
@@ -1,4 +1,4 @@
-package main
+package lib
import (
"bytes"
@@ -179,7 +179,7 @@ func TestSnowflakeClient(t *testing.T) {
So(socks.rejected, ShouldEqual, false)
snowflakes.toRelease = nil
- handler(socks, snowflakes)
+ Handler(socks, snowflakes)
So(socks.rejected, ShouldEqual, true)
})
diff --git a/client/peers.go b/client/lib/peers.go
similarity index 99%
rename from client/peers.go
rename to client/lib/peers.go
index 3187f09..21411ed 100644
--- a/client/peers.go
+++ b/client/lib/peers.go
@@ -1,4 +1,4 @@
-package main
+package lib
import (
"container/list"
diff --git a/client/rendezvous.go b/client/lib/rendezvous.go
similarity index 99%
rename from client/rendezvous.go
rename to client/lib/rendezvous.go
index cab7f5a..7436f54 100644
--- a/client/rendezvous.go
+++ b/client/lib/rendezvous.go
@@ -9,7 +9,7 @@
//
// - Manual copy-paste signaling. User must create a signaling pipe.
// (The flags in torrc-manual allow this)
-package main
+package lib
import (
"bufio"
diff --git a/client/lib/snowflake.go b/client/lib/snowflake.go
new file mode 100644
index 0000000..900af88
--- /dev/null
+++ b/client/lib/snowflake.go
@@ -0,0 +1,69 @@
+package lib
+
+import (
+ "errors"
+ "io"
+ "log"
+ "net"
+ "sync"
+)
+
+const (
+ ReconnectTimeout = 10
+ SnowflakeTimeout = 30
+)
+
+// When a connection handler starts, +1 is written to this channel; when it
+// ends, -1 is written.
+var HandlerChan = make(chan int)
+
+// Given an accepted SOCKS connection, establish a WebRTC connection to the
+// remote peer and exchange traffic.
+func Handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
+ HandlerChan <- 1
+ defer func() {
+ HandlerChan <- -1
+ }()
+ // Obtain an available WebRTC remote. May block.
+ snowflake := snowflakes.Pop()
+ if nil == snowflake {
+ socks.Reject()
+ return errors.New("handler: Received invalid Snowflake")
+ }
+ defer socks.Close()
+ defer snowflake.Close()
+ log.Println("---- Handler: snowflake assigned ----")
+ err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
+ if err != nil {
+ return err
+ }
+
+ go func() {
+ // When WebRTC resets, close the SOCKS connection too.
+ snowflake.WaitForReset()
+ socks.Close()
+ }()
+
+ // Begin exchanging data. Either WebRTC or localhost SOCKS will close first.
+ // In eithercase, this closes the handler and induces a new handler.
+ copyLoop(socks, snowflake)
+ log.Println("---- Handler: closed ---")
+ return nil
+}
+
+// Exchanges bytes between two ReadWriters.
+// (In this case, between a SOCKS and WebRTC connection.)
+func copyLoop(a, b io.ReadWriter) {
+ var wg sync.WaitGroup
+ wg.Add(2)
+ go func() {
+ io.Copy(b, a)
+ wg.Done()
+ }()
+ go func() {
+ io.Copy(a, b)
+ wg.Done()
+ }()
+ wg.Wait()
+ log.Println("copy loop ended")
+}
diff --git a/client/util.go b/client/lib/util.go
similarity index 69%
rename from client/util.go
rename to client/lib/util.go
index 20817c3..028fb1c 100644
--- a/client/util.go
+++ b/client/lib/util.go
@@ -1,4 +1,4 @@
-package main
+package lib
import (
"fmt"
@@ -34,46 +34,46 @@ func (b BytesNullLogger) AddInbound(amount int) {}
// BytesSyncLogger uses channels to safely log from multiple sources with output
// occuring at reasonable intervals.
type BytesSyncLogger struct {
- outboundChan chan int
- inboundChan chan int
- outbound int
- inbound int
- outEvents int
- inEvents int
- isLogging bool
+ OutboundChan chan int
+ InboundChan chan int
+ Outbound int
+ Inbound int
+ OutEvents int
+ InEvents int
+ IsLogging bool
}
func (b *BytesSyncLogger) Log() {
- b.isLogging = true
+ b.IsLogging = true
var amount int
output := func() {
log.Printf("Traffic Bytes (in|out): %d | %d -- (%d OnMessages, %d Sends)",
- b.inbound, b.outbound, b.inEvents, b.outEvents)
- b.outbound = 0
- b.outEvents = 0
- b.inbound = 0
- b.inEvents = 0
+ b.Inbound, b.Outbound, b.InEvents, b.OutEvents)
+ b.Outbound = 0
+ b.OutEvents = 0
+ b.Inbound = 0
+ b.InEvents = 0
}
last := time.Now()
for {
select {
- case amount = <-b.outboundChan:
- b.outbound += amount
- b.outEvents++
+ case amount = <-b.OutboundChan:
+ b.Outbound += amount
+ b.OutEvents++
last := time.Now()
if time.Since(last) > time.Second*LogTimeInterval {
last = time.Now()
output()
}
- case amount = <-b.inboundChan:
- b.inbound += amount
- b.inEvents++
+ case amount = <-b.InboundChan:
+ b.Inbound += amount
+ b.InEvents++
if time.Since(last) > time.Second*LogTimeInterval {
last = time.Now()
output()
}
case <-time.After(time.Second * LogTimeInterval):
- if b.inEvents > 0 || b.outEvents > 0 {
+ if b.InEvents > 0 || b.OutEvents > 0 {
output()
}
}
@@ -81,15 +81,15 @@ func (b *BytesSyncLogger) Log() {
}
func (b *BytesSyncLogger) AddOutbound(amount int) {
- if !b.isLogging {
+ if !b.IsLogging {
return
}
- b.outboundChan <- amount
+ b.OutboundChan <- amount
}
func (b *BytesSyncLogger) AddInbound(amount int) {
- if !b.isLogging {
+ if !b.IsLogging {
return
}
- b.inboundChan <- amount
+ b.InboundChan <- amount
}
diff --git a/client/webrtc.go b/client/lib/webrtc.go
similarity index 99%
rename from client/webrtc.go
rename to client/lib/webrtc.go
index 8c7cb4c..e71a407 100644
--- a/client/webrtc.go
+++ b/client/lib/webrtc.go
@@ -1,4 +1,4 @@
-package main
+package lib
import (
"bytes"
diff --git a/client/snowflake.go b/client/snowflake.go
index a9841be..b2dea5c 100644
--- a/client/snowflake.go
+++ b/client/snowflake.go
@@ -2,7 +2,6 @@
package main
import (
- "errors"
"flag"
"io"
"io/ioutil"
@@ -12,36 +11,30 @@ import (
"os/signal"
"path/filepath"
"strings"
- "sync"
"syscall"
"time"
"git.torproject.org/pluggable-transports/goptlib.git"
+ sf "git.torproject.org/pluggable-transports/snowflake.git/client/lib"
"github.com/keroserene/go-webrtc"
)
const (
- ReconnectTimeout = 10
DefaultSnowflakeCapacity = 1
- SnowflakeTimeout = 30
)
-// When a connection handler starts, +1 is written to this channel; when it
-// ends, -1 is written.
-var handlerChan = make(chan int)
-
// Maintain |SnowflakeCapacity| number of available WebRTC connections, to
// transfer to the Tor SOCKS handler when needed.
-func ConnectLoop(snowflakes SnowflakeCollector) {
+func ConnectLoop(snowflakes sf.SnowflakeCollector) {
for {
// Check if ending is necessary.
_, err := snowflakes.Collect()
if nil != err {
log.Println("WebRTC:", err,
- " Retrying in", ReconnectTimeout, "seconds...")
+ " Retrying in", sf.ReconnectTimeout, "seconds...")
}
select {
- case <-time.After(time.Second * ReconnectTimeout):
+ case <-time.After(time.Second * sf.ReconnectTimeout):
continue
case <-snowflakes.Melted():
log.Println("ConnectLoop: stopped.")
@@ -51,7 +44,7 @@ func ConnectLoop(snowflakes SnowflakeCollector) {
}
// Accept local SOCKS connections and pass them to the handler.
-func socksAcceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error {
+func socksAcceptLoop(ln *pt.SocksListener, snowflakes sf.SnowflakeCollector) error {
defer ln.Close()
log.Println("Started SOCKS listener.")
for {
@@ -64,64 +57,13 @@ func socksAcceptLoop(ln *pt.SocksListener, snowflakes SnowflakeCollector) error
return err
}
log.Println("SOCKS accepted: ", conn.Req)
- err = handler(conn, snowflakes)
+ err = sf.Handler(conn, snowflakes)
if err != nil {
log.Printf("handler error: %s", err)
}
}
}
-// Given an accepted SOCKS connection, establish a WebRTC connection to the
-// remote peer and exchange traffic.
-func handler(socks SocksConnector, snowflakes SnowflakeCollector) error {
- handlerChan <- 1
- defer func() {
- handlerChan <- -1
- }()
- // Obtain an available WebRTC remote. May block.
- snowflake := snowflakes.Pop()
- if nil == snowflake {
- socks.Reject()
- return errors.New("handler: Received invalid Snowflake")
- }
- defer socks.Close()
- defer snowflake.Close()
- log.Println("---- Handler: snowflake assigned ----")
- err := socks.Grant(&net.TCPAddr{IP: net.IPv4zero, Port: 0})
- if err != nil {
- return err
- }
-
- go func() {
- // When WebRTC resets, close the SOCKS connection too.
- snowflake.WaitForReset()
- socks.Close()
- }()
-
- // Begin exchanging data. Either WebRTC or localhost SOCKS will close first.
- // In eithercase, this closes the handler and induces a new handler.
- copyLoop(socks, snowflake)
- log.Println("---- Handler: closed ---")
- return nil
-}
-
-// Exchanges bytes between two ReadWriters.
-// (In this case, between a SOCKS and WebRTC connection.)
-func copyLoop(a, b io.ReadWriter) {
- var wg sync.WaitGroup
- wg.Add(2)
- go func() {
- io.Copy(b, a)
- wg.Done()
- }()
- go func() {
- io.Copy(a, b)
- wg.Done()
- }()
- wg.Wait()
- log.Println("copy loop ended")
-}
-
func main() {
iceServersCommas := flag.String("ice", "", "comma-separated list of ICE servers")
brokerURL := flag.String("url", "", "URL of signaling broker")
@@ -156,30 +98,34 @@ func main() {
log.Println("\n\n\n --- Starting Snowflake Client ---")
- var iceServers IceServerList
+ var iceServers sf.IceServerList
if len(strings.TrimSpace(*iceServersCommas)) > 0 {
option := webrtc.OptionIceServer(*iceServersCommas)
iceServers = append(iceServers, option)
}
// Prepare to collect remote WebRTC peers.
- snowflakes := NewPeers(*max)
+ snowflakes := sf.NewPeers(*max)
if "" != *brokerURL {
// Use potentially domain-fronting broker to rendezvous.
- broker := NewBrokerChannel(*brokerURL, *frontDomain, CreateBrokerTransport())
- snowflakes.Tongue = NewWebRTCDialer(broker, iceServers)
+ broker := sf.NewBrokerChannel(*brokerURL, *frontDomain, sf.CreateBrokerTransport())
+ snowflakes.Tongue = sf.NewWebRTCDialer(broker, iceServers)
} else {
// Otherwise, use manual copy and pasting of SDP messages.
- snowflakes.Tongue = NewCopyPasteDialer(iceServers)
+ snowflakes.Tongue = sf.NewCopyPasteDialer(iceServers)
}
if nil == snowflakes.Tongue {
log.Fatal("Unable to prepare rendezvous method.")
return
}
// Use a real logger to periodically output how much traffic is happening.
- snowflakes.BytesLogger = &BytesSyncLogger{
- inboundChan: make(chan int, 5), outboundChan: make(chan int, 5),
- inbound: 0, outbound: 0, inEvents: 0, outEvents: 0,
+ snowflakes.BytesLogger = &sf.BytesSyncLogger{
+ InboundChan: make(chan int, 5),
+ OutboundChan: make(chan int, 5),
+ Inbound: 0,
+ Outbound: 0,
+ InEvents: 0,
+ OutEvents: 0,
}
go snowflakes.BytesLogger.Log()
@@ -232,7 +178,7 @@ func main() {
sig = nil
for sig == nil {
select {
- case n := <-handlerChan:
+ case n := <-sf.HandlerChan:
numHandlers += n
case sig = <-sigChan:
}
@@ -244,7 +190,7 @@ func main() {
}
snowflakes.End()
for numHandlers > 0 {
- numHandlers += <-handlerChan
+ numHandlers += <-sf.HandlerChan
}
log.Println("snowflake is done.")
}
_______________________________________________
tor-commits mailing list
tor-commits@xxxxxxxxxxxxxxxxxxxx
https://lists.torproject.org/cgi-bin/mailman/listinfo/tor-commits