Skip to content

Commit

Permalink
Merge pull request #31 from linyows/wait-goroutines
Browse files Browse the repository at this point in the history
Wait goroutines with channel
  • Loading branch information
linyows authored Oct 16, 2023
2 parents 9523a8e + b3d11c2 commit 23b23cf
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 22 deletions.
2 changes: 2 additions & 0 deletions integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log"
"testing"
"time"
)

func TestIntegration(t *testing.T) {
Expand Down Expand Up @@ -53,6 +54,7 @@ func TestIntegration(t *testing.T) {

c := &SMTPClient{IP: ip, Port: warpPort}
err := c.SendEmail()
time.Sleep(1 * time.Second)

fmt.Printf("\nWarp Server:\n%s", &warpLog)
fmt.Printf("\nSMTP Server:\n%s\n", &smtpLog)
Expand Down
30 changes: 15 additions & 15 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net"
"regexp"
"strings"
"sync"
"time"
)

Expand All @@ -31,6 +30,7 @@ type Pipe struct {
blocker chan interface{}

isWaitedStarttlsRes bool
isHeaderRemoved bool

timeAtConnected time.Time
timeAtDataStarting time.Time
Expand Down Expand Up @@ -75,9 +75,6 @@ var (
)

func (e Elapse) String() string {
if e < 0 {
return "nil"
}
return fmt.Sprintf("%d msec", e)
}

Expand Down Expand Up @@ -105,7 +102,9 @@ func (p *Pipe) mediateOnUpstream(b []byte, i int) ([]byte, int, bool) {
p.waitForTLSConn(b, i)
go p.afterCommHook(data, pxyToDst)
} else {
go p.afterCommHook(p.removeMailBody(data), srcToDst)
if !p.isHeaderRemoved {
go p.afterCommHook(p.removeMailBody(data), srcToDst)
}
}

return b, i, false
Expand Down Expand Up @@ -157,16 +156,16 @@ func (p *Pipe) Do() {
p.timeAtConnected = time.Now()
go p.afterCommHook([]byte(fmt.Sprintf("connected to %s", p.rAddr)), onPxy)

var once sync.Once
p.blocker = make(chan interface{})
done := make(chan bool)

// Sender --- packet --> Proxy
go func() {
_, err := p.copy(upstream, p.mediateOnUpstream)
if err != nil && !errors.Is(err, net.ErrClosed) {
go p.afterCommHook([]byte(fmt.Sprintf("io copy error: %s", err)), pxyToDst)
}
once.Do(p.close())
done <- true
}()

// Proxy <--- packet -- Receiver
Expand All @@ -175,8 +174,10 @@ func (p *Pipe) Do() {
if err != nil && !errors.Is(err, net.ErrClosed) {
go p.afterCommHook([]byte(fmt.Sprintf("io copy error: %s", err)), dstToPxy)
}
once.Do(p.close())
done <- true
}()

<-done
}

func (p *Pipe) setSenderServerName(b []byte) {
Expand Down Expand Up @@ -329,13 +330,11 @@ func (p *Pipe) escapeCRLF(b []byte) []byte {
return bytes.ReplaceAll(b, []byte(crlf), []byte("\\r\\n"))
}

func (p *Pipe) close() func() {
return func() {
defer p.afterConnHook()
defer p.afterCommHook([]byte("connections closed"), onPxy)
p.rConn.Close()
p.sConn.Close()
}
func (p *Pipe) Close() {
p.rConn.Close()
p.sConn.Close()
go p.afterCommHook([]byte("connections closed"), onPxy)
go p.afterConnHook()
}

func (p *Pipe) isResponseOfEHLOWithStartTLS(b []byte) bool {
Expand All @@ -355,6 +354,7 @@ func (p *Pipe) removeMailBody(b Data) Data {
if i == -1 {
return b
}
p.isHeaderRemoved = true
return b[:i]
}

Expand Down
4 changes: 0 additions & 4 deletions pipe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,6 @@ func TestElapseString(t *testing.T) {
elapse: 2147483647,
expect: "2147483647 msec",
},
{
elapse: -1,
expect: "nil",
},
}

for _, v := range tests {
Expand Down
17 changes: 14 additions & 3 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,23 +135,34 @@ func (s *Server) HandleConnection(conn net.Conn) {
}
}
p.afterConnHook = func() {
sM := p.sMailAddr
rM := p.rMailAddr
if len(sM) == 0 {
sM = []byte("unknown")
}
if len(rM) == 0 {
rM = []byte("unknown")
}

now := time.Now()
elapse := p.elapse()
b := fmt.Sprintf("from:%s to:%s elapse:%s", p.sMailAddr, p.rMailAddr, elapse)
if s.Verbose {
b := fmt.Sprintf("from:%s to:%s elapse:%s", sM, rM, elapse)
s.log.Printf("%s %s %s", p.id, onPxy, b)
}
for _, hook := range s.Hooks {
hook.AfterConn(&AfterConnData{
ConnID: p.id,
OccurredAt: now,
MailFrom: p.sMailAddr,
MailTo: p.rMailAddr,
MailFrom: sM,
MailTo: rM,
Elapse: elapse,
})
}
}

p.Do()
p.Close()
}

func (s *Server) OriginalAddrDst(conn net.Conn) (*net.TCPAddr, error) {
Expand Down

0 comments on commit 23b23cf

Please sign in to comment.