Skip to content

Commit

Permalink
Merge pull request #30 from linyows/buffersize
Browse files Browse the repository at this point in the history
Default buffer size is 10MB for mail copy
  • Loading branch information
linyows authored Oct 15, 2023
2 parents c58e518 + eb9de1e commit 9523a8e
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
10 changes: 6 additions & 4 deletions cmd/warp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ var (
port = flag.Int("port", 0, "listen port")
oip = flag.String("outbound-ip", "0.0.0.0", "outbound ip")
storage = flag.String("storage", "", "sspecify extended storage from: mysql, sqlite, file")
maxSize = flag.Int("message-size-limit", 10240000, "The maximal size in bytes of a message")
verbose = flag.Bool("verbose", false, "verbose logging")
verFlag = flag.Bool("version", false, "show build version")
)
Expand All @@ -32,10 +33,11 @@ func main() {
}

w := &warp.Server{
Addr: *ip,
Port: *port,
OutboundAddr: *oip,
Verbose: *verbose,
Addr: *ip,
Port: *port,
OutboundAddr: *oip,
Verbose: *verbose,
MessageSizeLimit: *maxSize,
}

switch *storage {
Expand Down
29 changes: 16 additions & 13 deletions pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import (
)

type Pipe struct {
id string
sConn net.Conn
rConn net.Conn
id string
sConn net.Conn
rConn net.Conn
bufferSize int

rAddr *net.TCPAddr
sMailAddr []byte
Expand All @@ -29,6 +30,8 @@ type Pipe struct {
locked bool
blocker chan interface{}

isWaitedStarttlsRes bool

timeAtConnected time.Time
timeAtDataStarting time.Time

Expand All @@ -46,7 +49,6 @@ const (
mailFromPrefix string = "MAIL FROM:<"
rcptToPrefix string = "RCPT TO:<"
mailRegex string = `[A-z0-9.!#$%&'*+\-/=?^_\{|}~]{1,64}@[A-z0-9.\-]{1,255}`
bufferSize int = 32 * 1024
crlf string = "\r\n"
mailHeaderEnd string = crlf + crlf

Expand Down Expand Up @@ -91,6 +93,7 @@ func (p *Pipe) mediateOnUpstream(b []byte, i int) ([]byte, int, bool) {
if !p.tls && p.readytls {
p.locked = true
er := p.starttls()
p.isWaitedStarttlsRes = true
if er != nil {
go p.afterCommHook([]byte(fmt.Sprintf("starttls error: %s", er.Error())), pxyToDst)
}
Expand Down Expand Up @@ -123,15 +126,15 @@ func (p *Pipe) mediateOnDownstream(b []byte, i int) ([]byte, int, bool) {
}
}

// time before email input
p.setTimeAtDataStarting(b)

// remove buffering ready response
if p.tls && !p.readytls && p.locked {
// continue
// remove buffering "220 2.0.0 Ready to start TLS" response
if p.isWaitedStarttlsRes {
p.isWaitedStarttlsRes = false
return b, i, true
}

// time before email input
p.setTimeAtDataStarting(b)

if p.isResponseOfEHLOWithoutStartTLS(b) {
go p.afterCommHook(data, pxyToSrc)
} else {
Expand Down Expand Up @@ -213,7 +216,7 @@ func (p *Pipe) dst(d Flow) net.Conn {
}

func (p *Pipe) copy(dr Flow, fn Mediator) (written int64, err error) {
size := bufferSize
size := p.bufferSize
src, ok := p.src(dr).(io.Reader)
if !ok {
err = fmt.Errorf("io.Reader cast error")
Expand All @@ -226,7 +229,7 @@ func (p *Pipe) copy(dr Flow, fn Mediator) (written int64, err error) {
}
go p.afterCommHook([]byte(fmt.Sprintf("io.Reader size: %d", size)), onPxy)
}
buf := make([]byte, bufferSize)
buf := make([]byte, p.bufferSize)

for {
var isContinue bool
Expand Down Expand Up @@ -284,7 +287,7 @@ func (p *Pipe) starttls() error {
}

func (p *Pipe) readReceiverConn() error {
buf := make([]byte, bufferSize)
buf := make([]byte, 64*1024)
i, err := p.rConn.Read(buf)
if err != nil {
return err
Expand Down
26 changes: 16 additions & 10 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,13 @@ import (
const SO_ORIGINAL_DST = 80

type Server struct {
Addr string
Port int
Hooks []Hook
OutboundAddr string
Verbose bool
log *log.Logger
Addr string
Port int
Hooks []Hook
OutboundAddr string
Verbose bool
log *log.Logger
MessageSizeLimit int
}

// These are global variables for integration test.
Expand All @@ -30,6 +31,10 @@ func (s *Server) Start() error {
if s.log == nil {
s.log = log.New(os.Stderr, "", log.Ldate|log.Ltime|log.Lmicroseconds)
}
if s.MessageSizeLimit == 0 {
// default is around 10MB (https://www.postfix.org/postconf.5.html)
s.MessageSizeLimit = 10240000
}

pl := &Plugins{}
if err := pl.load(); err != nil {
Expand Down Expand Up @@ -109,10 +114,11 @@ func (s *Server) HandleConnection(conn net.Conn) {
}

p := &Pipe{
id: uuid,
sConn: conn,
rConn: dstConn,
rAddr: raddr,
id: uuid,
sConn: conn,
rConn: dstConn,
rAddr: raddr,
bufferSize: s.MessageSizeLimit,
}
p.afterCommHook = func(b Data, to Direction) {
now := time.Now()
Expand Down

0 comments on commit 9523a8e

Please sign in to comment.