Skip to content

Commit

Permalink
upstream: add missing pipeline maxCq opt in udp/tcp
Browse files Browse the repository at this point in the history
  • Loading branch information
IrineSistiana committed Oct 29, 2023
1 parent 0b72323 commit a07b44f
Showing 1 changed file with 9 additions and 6 deletions.
15 changes: 9 additions & 6 deletions pkg/upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func NewUpstream(addr string, opt Opt) (_ Upstream, err error) {
switch addrURL.Scheme {
case "", "udp":
const defaultPort = 53
const maxConcurrentQueryPreConn = 4096 // Protocol limit is 65535.
host, port, err := parseDialAddr(addrUrlHost, opt.DialAddr, defaultPort)
if err != nil {
return nil, err
Expand All @@ -288,8 +289,9 @@ func NewUpstream(addr string, opt Opt) (_ Upstream, err error) {
return nil, err
}
to := transport.TraditionalDnsConnOpts{
WithLengthHeader: false,
IdleTimeout: time.Minute * 5,
WithLengthHeader: false,
IdleTimeout: time.Minute * 5,
MaxConcurrentQuery: maxConcurrentQueryPreConn,
}
return transport.NewDnsConn(to, wrapConn(c, opt.EventObserver)), nil
}
Expand All @@ -304,7 +306,7 @@ func NewUpstream(addr string, opt Opt) (_ Upstream, err error) {
return &udpWithFallback{
u: transport.NewPipelineTransport(transport.PipelineOpts{
DialContext: dialUdpPipeline,
MaxConcurrentQueryWhileDialing: 4096, // Protocol limit is 65535.
MaxConcurrentQueryWhileDialing: maxConcurrentQueryPreConn,
Logger: opt.Logger,
}),
t: transport.NewReuseConnTransport(transport.ReuseConnOpts{DialContext: dialTcpNetConn}),
Expand Down Expand Up @@ -378,8 +380,9 @@ func NewUpstream(addr string, opt Opt) (_ Upstream, err error) {

if opt.EnablePipeline {
to := transport.TraditionalDnsConnOpts{
WithLengthHeader: true,
IdleTimeout: opt.IdleTimeout,
WithLengthHeader: true,
IdleTimeout: opt.IdleTimeout,
MaxConcurrentQuery: pipelineConcurrentLimit,
}
dialDnsConn := func(ctx context.Context) (transport.DnsConn, error) {
c, err := dialNetConn(ctx)
Expand All @@ -390,7 +393,7 @@ func NewUpstream(addr string, opt Opt) (_ Upstream, err error) {
}
return transport.NewPipelineTransport(transport.PipelineOpts{
DialContext: dialDnsConn,
MaxConcurrentQueryWhileDialing: 16,
MaxConcurrentQueryWhileDialing: pipelineConcurrentLimit,
Logger: opt.Logger,
}), nil
}
Expand Down

0 comments on commit a07b44f

Please sign in to comment.