From f0a0be187acb78de74b428aee861f7e7d87b8f93 Mon Sep 17 00:00:00 2001 From: linyows Date: Mon, 16 Oct 2023 22:16:43 +0900 Subject: [PATCH 1/5] use channel instead of sync.Once --- pipe.go | 25 ++++++++++++++----------- server.go | 17 ++++++++++++++--- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/pipe.go b/pipe.go index 54ae663..12f51cb 100644 --- a/pipe.go +++ b/pipe.go @@ -9,7 +9,6 @@ import ( "net" "regexp" "strings" - "sync" "time" ) @@ -157,8 +156,8 @@ func (p *Pipe) Do() { p.timeAtConnected = time.Now() go p.afterCommHook([]byte(fmt.Sprintf("connected to %s", p.rAddr)), onPxy) - var once sync.Once p.blocker = make(chan interface{}) + done := make(chan bool) // Sender --- packet --> Proxy go func() { @@ -166,7 +165,9 @@ func (p *Pipe) Do() { if err != nil && !errors.Is(err, net.ErrClosed) { go p.afterCommHook([]byte(fmt.Sprintf("io copy error: %s", err)), pxyToDst) } - once.Do(p.close()) + select { + case done <- true: + } }() // Proxy <--- packet -- Receiver @@ -175,8 +176,12 @@ func (p *Pipe) Do() { if err != nil && !errors.Is(err, net.ErrClosed) { go p.afterCommHook([]byte(fmt.Sprintf("io copy error: %s", err)), dstToPxy) } - once.Do(p.close()) + select { + case done <- true: + } }() + + <-done } func (p *Pipe) setSenderServerName(b []byte) { @@ -329,13 +334,11 @@ func (p *Pipe) escapeCRLF(b []byte) []byte { return bytes.ReplaceAll(b, []byte(crlf), []byte("\\r\\n")) } -func (p *Pipe) close() func() { - return func() { - defer p.afterConnHook() - defer p.afterCommHook([]byte("connections closed"), onPxy) - p.rConn.Close() - p.sConn.Close() - } +func (p *Pipe) Close() { + p.rConn.Close() + p.sConn.Close() + go p.afterCommHook([]byte("connections closed"), onPxy) + go p.afterConnHook() } func (p *Pipe) isResponseOfEHLOWithStartTLS(b []byte) bool { diff --git a/server.go b/server.go index 06980dc..951565c 100644 --- a/server.go +++ b/server.go @@ -135,23 +135,34 @@ func (s *Server) HandleConnection(conn net.Conn) { } } p.afterConnHook = func() { + sM := p.sMailAddr + rM := p.rMailAddr + if len(sM) == 0 { + sM = []byte("unknown") + } + if len(rM) == 0 { + rM = []byte("unknown") + } + now := time.Now() elapse := p.elapse() - b := fmt.Sprintf("from:%s to:%s elapse:%s", p.sMailAddr, p.rMailAddr, elapse) if s.Verbose { + b := fmt.Sprintf("from:%s to:%s elapse:%s", sM, rM, elapse) s.log.Printf("%s %s %s", p.id, onPxy, b) } for _, hook := range s.Hooks { hook.AfterConn(&AfterConnData{ ConnID: p.id, OccurredAt: now, - MailFrom: p.sMailAddr, - MailTo: p.rMailAddr, + MailFrom: sM, + MailTo: rM, Elapse: elapse, }) } } + p.Do() + p.Close() } func (s *Server) OriginalAddrDst(conn net.Conn) (*net.TCPAddr, error) { From a3d9cf7bae6c71490c102a215607a107836aaa59 Mon Sep 17 00:00:00 2001 From: linyows Date: Mon, 16 Oct 2023 22:17:22 +0900 Subject: [PATCH 2/5] wait 1 min for test --- integration_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration_test.go b/integration_test.go index 93106dd..144e718 100644 --- a/integration_test.go +++ b/integration_test.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "testing" + "time" ) func TestIntegration(t *testing.T) { @@ -53,6 +54,7 @@ func TestIntegration(t *testing.T) { c := &SMTPClient{IP: ip, Port: warpPort} err := c.SendEmail() + time.Sleep(1 * time.Second) fmt.Printf("\nWarp Server:\n%s", &warpLog) fmt.Printf("\nSMTP Server:\n%s\n", &smtpLog) From fbc2cb6183da644ea2ad2d789959c0c94be04725 Mon Sep 17 00:00:00 2001 From: linyows Date: Mon, 16 Oct 2023 22:18:02 +0900 Subject: [PATCH 3/5] no logging after header logging --- pipe.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pipe.go b/pipe.go index 12f51cb..1a95635 100644 --- a/pipe.go +++ b/pipe.go @@ -30,6 +30,7 @@ type Pipe struct { blocker chan interface{} isWaitedStarttlsRes bool + isHeaderRemoved bool timeAtConnected time.Time timeAtDataStarting time.Time @@ -104,7 +105,9 @@ func (p *Pipe) mediateOnUpstream(b []byte, i int) ([]byte, int, bool) { p.waitForTLSConn(b, i) go p.afterCommHook(data, pxyToDst) } else { - go p.afterCommHook(p.removeMailBody(data), srcToDst) + if !p.isHeaderRemoved { + go p.afterCommHook(p.removeMailBody(data), srcToDst) + } } return b, i, false @@ -358,6 +361,7 @@ func (p *Pipe) removeMailBody(b Data) Data { if i == -1 { return b } + p.isHeaderRemoved = true return b[:i] } From 3e463e2f8de0fb8549e373c766874157b37f07b7 Mon Sep 17 00:00:00 2001 From: linyows Date: Mon, 16 Oct 2023 22:18:30 +0900 Subject: [PATCH 4/5] dont use string nil --- pipe.go | 3 --- pipe_test.go | 4 ---- 2 files changed, 7 deletions(-) diff --git a/pipe.go b/pipe.go index 1a95635..db34f81 100644 --- a/pipe.go +++ b/pipe.go @@ -75,9 +75,6 @@ var ( ) func (e Elapse) String() string { - if e < 0 { - return "nil" - } return fmt.Sprintf("%d msec", e) } diff --git a/pipe_test.go b/pipe_test.go index 3bdd4e0..2363a40 100644 --- a/pipe_test.go +++ b/pipe_test.go @@ -172,10 +172,6 @@ func TestElapseString(t *testing.T) { elapse: 2147483647, expect: "2147483647 msec", }, - { - elapse: -1, - expect: "nil", - }, } for _, v := range tests { From b3d11c22272b0df86d2f48a0199d1518fb7e3774 Mon Sep 17 00:00:00 2001 From: linyows Date: Mon, 16 Oct 2023 22:24:21 +0900 Subject: [PATCH 5/5] fix gosimple --- pipe.go | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/pipe.go b/pipe.go index db34f81..02aaa30 100644 --- a/pipe.go +++ b/pipe.go @@ -165,9 +165,7 @@ func (p *Pipe) Do() { if err != nil && !errors.Is(err, net.ErrClosed) { go p.afterCommHook([]byte(fmt.Sprintf("io copy error: %s", err)), pxyToDst) } - select { - case done <- true: - } + done <- true }() // Proxy <--- packet -- Receiver @@ -176,9 +174,7 @@ func (p *Pipe) Do() { if err != nil && !errors.Is(err, net.ErrClosed) { go p.afterCommHook([]byte(fmt.Sprintf("io copy error: %s", err)), dstToPxy) } - select { - case done <- true: - } + done <- true }() <-done