Skip to content

Commit

Permalink
refactor: make the flows testable
Browse files Browse the repository at this point in the history
  • Loading branch information
massix committed May 15, 2024
1 parent fd22bb5 commit d280dab
Show file tree
Hide file tree
Showing 3 changed files with 406 additions and 109 deletions.
113 changes: 4 additions & 109 deletions cmd/protrans/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import (
"github.com/hekmon/transmissionrpc"
natpmp "github.com/jackpal/go-nat-pmp"
"github.com/massix/protrans/pkg/config"
"github.com/massix/protrans/pkg/nat"
"github.com/massix/protrans/pkg/transmission"
"github.com/massix/protrans/pkg/flow"
"github.com/sirupsen/logrus"
)

Expand All @@ -24,110 +23,6 @@ func dumpTransmissionConfiguration(conf *config.ProtransConfiguration) string {
return fmt.Sprintf("\tHost: %s\n\tPort: %d\n\tUsername: %s\n", conf.Transmission.Host, conf.Transmission.Port, conf.Transmission.Username)
}

func fetchExternalIP(natClient nat.NatClientI, wg *sync.WaitGroup, ipChan chan<- string, done chan os.Signal) {
running := true

for running {
ip, err := nat.GetExternalIP(natClient)
if err != nil {
logrus.Warn(err)
} else {
logrus.Debugf("Retrieved IP: %s, sending to channel", ip)
ipChan <- ip
}

select {
case s := <-done:
logrus.Info("Gracefully stopping gateway detector")
running = false
done <- s // Make sure everyone is leaving
case <-time.After(30 * time.Second):
logrus.Debug("No signals received in 30 seconds, refreshing IP...")
}
}

wg.Done()
}

func mapPorts(natClient nat.NatClientI, portLifetime int, wg *sync.WaitGroup, ipChan <-chan string, portChan chan<- int, done chan os.Signal) {
running := true

for running {
select {
case ip := <-ipChan:
var mappedTcpPort int
var mappedUdpPort int
var err error
logrus.Debugf("Mapping port for external IP: %s", ip)

mappedTcpPort, err = nat.AddPortMapping(natClient, "tcp", portLifetime)
if err != nil {
logrus.Errorf("Unable to create port mapping: %s", err)
continue
}

mappedUdpPort, err = nat.AddPortMapping(natClient, "udp", portLifetime)
if err != nil {
logrus.Errorf("Unable to create port mapping: %s", err)
continue
}

if mappedTcpPort != mappedUdpPort {
logrus.Errorf("Ports differ in range: (tcp %d) and (udp %d)", mappedTcpPort, mappedUdpPort)
continue
}

logrus.Debugf("Sending port %d to channel", mappedTcpPort)
portChan <- mappedTcpPort
case s := <-done:
logrus.Info("Gracefully stopping port mapper")
running = false
done <- s
}
}

wg.Done()
}

func transmissionArgSetter(transmissionClient transmission.TransmissionClient, wg *sync.WaitGroup, portChan <-chan int, done chan os.Signal) {
running := true

for running {
select {
case mappedPort := <-portChan:
if transmission.IsConnected(transmissionClient) {
logrus.Debug("Transmission is connected")
if transmission.IsPortOpen(transmissionClient) {
logrus.Debug("Port is already set in Transmission, nothing to do")
continue
}

if err := transmission.SetPeerPort(transmissionClient, mappedPort); err != nil {
logrus.Error(err)
continue
}

logrus.Debug("Port set!")
time.Sleep(3 * time.Second)

if transmission.IsPortOpen(transmissionClient) {
logrus.Infof("Successfully set port %d to Transmission and checked network connectivity", mappedPort)
} else {
logrus.Warnf("Set port %d to Transmission but was unable to check connectivity (this may be normal, it might take some time before the NAT is recognised)", mappedPort)
}
} else {
logrus.Warnf("Should set port: %d but Transmission is not connected", mappedPort)
}
case s := <-done:
logrus.Info("Gracefully stopping Transmission Client")
running = false
done <- s
}
}

wg.Done()
}

func main() {
var configurationPath string

Expand Down Expand Up @@ -174,13 +69,13 @@ func main() {
wg.Add(3)

// This goroutine will constantly check the external IP address and send it to a channel
go fetchExternalIP(natClient, &wg, ipChan, done)
go flow.FetchExternalIP(natClient, &wg, ipChan, done)

// This goroutine will receive the IP address and create a port mapping which will be sent to another channel
go mapPorts(natClient, int(conf.Nat.PortLifetime), &wg, ipChan, portChan, done)
go flow.MapPorts(natClient, int(conf.Nat.PortLifetime), &wg, ipChan, portChan, done)

// This goroutine will receive the mapped port and send it to Transmission if connected
go transmissionArgSetter(transmissionClient, &wg, portChan, done)
go flow.TransmissionArgSetter(transmissionClient, &wg, portChan, done)

wg.Wait()
}
115 changes: 115 additions & 0 deletions pkg/flow/flow.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package flow

import (
"os"
"sync"
"time"

"github.com/massix/protrans/pkg/nat"
"github.com/massix/protrans/pkg/transmission"
"github.com/sirupsen/logrus"
)

func FetchExternalIP(natClient nat.NatClientI, wg *sync.WaitGroup, ipChan chan<- string, done chan os.Signal) {
running := true

for running {
ip, err := nat.GetExternalIP(natClient)
if err != nil {
logrus.Warn(err)
} else {
logrus.Debugf("Retrieved IP: %s, sending to channel", ip)
ipChan <- ip
}

select {
case s := <-done:
logrus.Info("Gracefully stopping gateway detector")
running = false
done <- s // Make sure everyone is leaving
case <-time.After(30 * time.Second):
logrus.Debug("No signals received in 30 seconds, refreshing IP...")
}
}

wg.Done()
}

func MapPorts(natClient nat.NatClientI, portLifetime int, wg *sync.WaitGroup, ipChan <-chan string, portChan chan<- int, done chan os.Signal) {
running := true

for running {
select {
case ip := <-ipChan:
var mappedTcpPort int
var mappedUdpPort int
var err error
logrus.Debugf("Mapping port for external IP: %s", ip)

mappedTcpPort, err = nat.AddPortMapping(natClient, "tcp", portLifetime)
if err != nil {
logrus.Errorf("Unable to create port mapping: %s", err)
continue
}

mappedUdpPort, err = nat.AddPortMapping(natClient, "udp", portLifetime)
if err != nil {
logrus.Errorf("Unable to create port mapping: %s", err)
continue
}

if mappedTcpPort != mappedUdpPort {
logrus.Errorf("Ports differ in range: (tcp %d) and (udp %d)", mappedTcpPort, mappedUdpPort)
continue
}

logrus.Debugf("Sending port %d to channel", mappedTcpPort)
portChan <- mappedTcpPort
case s := <-done:
logrus.Info("Gracefully stopping port mapper")
running = false
done <- s
}
}

wg.Done()
}

func TransmissionArgSetter(transmissionClient transmission.TransmissionClient, wg *sync.WaitGroup, portChan <-chan int, done chan os.Signal) {
running := true

for running {
select {
case mappedPort := <-portChan:
if transmission.IsConnected(transmissionClient) {
logrus.Debug("Transmission is connected")
if transmission.IsPortOpen(transmissionClient) {
logrus.Debug("Port is already set in Transmission, nothing to do")
continue
}

if err := transmission.SetPeerPort(transmissionClient, mappedPort); err != nil {
logrus.Error(err)
continue
}

logrus.Debug("Port set!")
time.Sleep(3 * time.Second)

if transmission.IsPortOpen(transmissionClient) {
logrus.Infof("Successfully set port %d to Transmission and checked network connectivity", mappedPort)
} else {
logrus.Warnf("Set port %d to Transmission but was unable to check connectivity (this may be normal, it might take some time before the NAT is recognised)", mappedPort)
}
} else {
logrus.Warnf("Should set port: %d but Transmission is not connected", mappedPort)
}
case s := <-done:
logrus.Info("Gracefully stopping Transmission Client")
running = false
done <- s
}
}

wg.Done()
}
Loading

0 comments on commit d280dab

Please sign in to comment.