Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

nsqd: add support for configurable max-defer-delay for DPUB #1504

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ func nsqdFlagSet(opts *nsqd.Options) *flag.FlagSet {
flagSet.Int64("max-msg-size", opts.MaxMsgSize, "maximum size of a single message in bytes")
flagSet.Duration("max-req-timeout", opts.MaxReqTimeout, "maximum requeuing timeout for a message")
flagSet.Int64("max-body-size", opts.MaxBodySize, "maximum size of a single command body")
flagSet.Duration("max-defer-delay", opts.MaxDeferDelay, "maximum duration when deferring a message")

// client overridable configuration options
flagSet.Duration("max-heartbeat-interval", opts.MaxHeartbeatInterval, "maximum client configurable duration of time between client heartbeats")
Expand Down
2 changes: 1 addition & 1 deletion nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
return nil, http_api.Err{400, "INVALID_DEFER"}
}
deferred = time.Duration(di) * time.Millisecond
if deferred < 0 || deferred > s.nsqd.getOpts().MaxReqTimeout {
if deferred < 0 || deferred > s.nsqd.getOpts().MaxDeferDelay {
return nil, http_api.Err{400, "INVALID_DEFER"}
}
}
Expand Down
2 changes: 2 additions & 0 deletions nsqd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ type Options struct {
MaxBodySize int64 `flag:"max-body-size"`
MaxReqTimeout time.Duration `flag:"max-req-timeout"`
ClientTimeout time.Duration
MaxDeferDelay time.Duration `flag:"max-defer-delay"`

// client overridable configuration options
MaxHeartbeatInterval time.Duration `flag:"max-heartbeat-interval"`
Expand Down Expand Up @@ -133,6 +134,7 @@ func NewOptions() *Options {
MaxBodySize: 5 * 1024 * 1024,
MaxReqTimeout: 1 * time.Hour,
ClientTimeout: 60 * time.Second,
MaxDeferDelay: 1 * time.Hour,

MaxHeartbeatInterval: 60 * time.Second,
MaxRdyCount: 2500,
Expand Down
14 changes: 7 additions & 7 deletions nsqd/protocol_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,17 +878,17 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {
fmt.Sprintf("DPUB topic name %q is not valid", topicName))
}

timeoutMs, err := protocol.ByteToBase10(params[2])
delayMs, err := protocol.ByteToBase10(params[2])
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_INVALID",
fmt.Sprintf("DPUB could not parse timeout %s", params[2]))
fmt.Sprintf("DPUB could not parse defer delay %s", params[2]))
}
timeoutDuration := time.Duration(timeoutMs) * time.Millisecond
delayDuration := time.Duration(delayMs) * time.Millisecond

if timeoutDuration < 0 || timeoutDuration > p.nsqd.getOpts().MaxReqTimeout {
if delayDuration < 0 || delayDuration > p.nsqd.getOpts().MaxDeferDelay {
return nil, protocol.NewFatalClientErr(nil, "E_INVALID",
fmt.Sprintf("DPUB timeout %d out of range 0-%d",
timeoutMs, p.nsqd.getOpts().MaxReqTimeout/time.Millisecond))
fmt.Sprintf("DPUB defer delay %d out of range 0-%d",
delayMs, p.nsqd.getOpts().MaxDeferDelay/time.Millisecond))
}

bodyLen, err := readLen(client.Reader, client.lenSlice)
Expand Down Expand Up @@ -918,7 +918,7 @@ func (p *protocolV2) DPUB(client *clientV2, params [][]byte) ([]byte, error) {

topic := p.nsqd.GetTopic(topicName)
msg := NewMessage(topic.GenerateID(), messageBody)
msg.deferred = timeoutDuration
msg.deferred = delayDuration
err = topic.PutMessage(msg)
if err != nil {
return nil, protocol.NewFatalClientErr(err, "E_DPUB_FAILED", "DPUB failed "+err.Error())
Expand Down
4 changes: 2 additions & 2 deletions nsqd/protocol_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,12 @@ func TestDPUB(t *testing.T) {
test.Equal(t, 1, int(atomic.LoadUint64(&ch.messageCount)))

// duration out of range
nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
nsq.DeferredPublish(topicName, opts.MaxDeferDelay+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
resp, _ = nsq.ReadResponse(conn)
frameType, data, _ = nsq.UnpackResponse(resp)
t.Logf("frameType: %d, data: %s", frameType, data)
test.Equal(t, frameTypeError, frameType)
test.Equal(t, "E_INVALID DPUB timeout 3600100 out of range 0-3600000", string(data))
test.Equal(t, "E_INVALID DPUB defer delay 3600100 out of range 0-3600000", string(data))
}

func TestTouch(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions nsqd/protocol_v2_unixsocket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,12 +531,12 @@ func TestUnixSocketDPUB(t *testing.T) {
test.Equal(t, 1, int(atomic.LoadUint64(&ch.messageCount)))

// duration out of range
nsq.DeferredPublish(topicName, opts.MaxReqTimeout+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
nsq.DeferredPublish(topicName, opts.MaxDeferDelay+100*time.Millisecond, make([]byte, 100)).WriteTo(conn)
resp, _ = nsq.ReadResponse(conn)
frameType, data, _ = nsq.UnpackResponse(resp)
t.Logf("frameType: %d, data: %s", frameType, data)
test.Equal(t, frameTypeError, frameType)
test.Equal(t, "E_INVALID DPUB timeout 3600100 out of range 0-3600000", string(data))
test.Equal(t, "E_INVALID DPUB defer delay 3600100 out of range 0-3600000", string(data))
}

func TestUnixSocketTouch(t *testing.T) {
Expand Down