-
Notifications
You must be signed in to change notification settings - Fork 4.9k
x-pack/filebeat/input/awss3: allow a grace time on shutdown #43369
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
944fa21
to
6e73b10
Compare
Pinging @elastic/obs-ds-hosted-services (Team:obs-ds-hosted-services) |
Pinging @elastic/security-service-integrations (Team:Security-Service Integrations) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Suggest to merge after someone more familiar with input approves.
@Kavindu-Dodan could you help with the review please ? |
@@ -226,19 +270,23 @@ func (w *sqsWorker) processMessage(ctx context.Context, msg types.Message) { | |||
w.client.Publish(e) | |||
publishCount++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really, this var should not be needed since the implementation already returns the sum of to-publish events in result.eventCount
, but it seems that this is not implemented in testing due to the approach taken in mock testing.
func cancelWithGrace(parent context.Context, timeout time.Duration) (context.Context, context.CancelFunc) { | ||
ctx, cancel := context.WithCancel(context.WithoutCancel(parent)) | ||
stop := context.AfterFunc(parent, func() { | ||
time.AfterFunc(timeout, cancel) | ||
}) | ||
return ctx, func() { | ||
stop() | ||
cancel() | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it'd be good to have a unit test for this function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good for me :)
case <-time.After(tooLong): | ||
t.Fatal("parent context failed to cancel within timeout") | ||
case <-parentCtx.Done(): | ||
parentCancelled = time.Now() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[NIT]
parent
is already cancelled, you don't need the wait here. If <-parentCtx.Done()
does not get selected, you could fail the test immediately.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it's here for symmetry.
This pull request is now in conflicts. Could you fix it? 🙏
|
We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. The approach is a little blunt; it merely waits for the grace period after the requester has been stopped to allow for incoming messages to be processed and published. Ideally, we would finish as soon as the set of pending requests had been received and published, cut short after the grace time. However, the current structure of data flow does not lend itself sharing that information between parts of the input, so a minimal change is made.
This fixes handling of the case where graceCtx has not yet been cancelled, but ctx has. This would leave the first select in a blocked state when and so starve the second select, resulting in failure to work through the pending messages. This also adds an opportunity to exit the loop early when all the pending messages have been handled.
We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. We finish as soon as the set of pending requests had been received and published, cut short after the grace time. (cherry picked from commit c0d439c) # Conflicts: # docs/reference/filebeat/filebeat-input-aws-s3.md
We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. We finish as soon as the set of pending requests had been received and published, cut short after the grace time. (cherry picked from commit c0d439c) # Conflicts: # docs/reference/filebeat/filebeat-input-aws-s3.md
We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. We finish as soon as the set of pending requests had been received and published, cut short after the grace time. (cherry picked from commit c0d439c) # Conflicts: # docs/reference/filebeat/filebeat-input-aws-s3.md
We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. We finish as soon as the set of pending requests had been received and published, cut short after the grace time. (cherry picked from commit c0d439c)
…e on shutdown (#43529) * x-pack/filebeat/input/awss3: allow a grace time on shutdown (#43369) We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. We finish as soon as the set of pending requests had been received and published, cut short after the grace time. (cherry picked from commit c0d439c) # Conflicts: # docs/reference/filebeat/filebeat-input-aws-s3.md * resolve conflicts * remove irrelevant changelog entry * apply doc changes to asciidoc and remove markdown addition --------- Co-authored-by: Dan Kortschak <[email protected]>
…me on shutdown (#43530) * x-pack/filebeat/input/awss3: allow a grace time on shutdown (#43369) We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. We finish as soon as the set of pending requests had been received and published, cut short after the grace time. (cherry picked from commit c0d439c) # Conflicts: # docs/reference/filebeat/filebeat-input-aws-s3.md * resolve conflicts * apply doc changes to asciidoc and remove markdown addition --------- Co-authored-by: Dan Kortschak <[email protected]>
…me on shutdown (#43531) * x-pack/filebeat/input/awss3: allow a grace time on shutdown (#43369) We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. We finish as soon as the set of pending requests had been received and published, cut short after the grace time. (cherry picked from commit c0d439c) # Conflicts: # docs/reference/filebeat/filebeat-input-aws-s3.md * resolve conflicts * remove irrelevant changelog entry * apply doc changes to asciidoc and remove markdown addition --------- Co-authored-by: Dan Kortschak <[email protected]>
We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. We finish as soon as the set of pending requests had been received and published, cut short after the grace time. (cherry picked from commit c0d439c)
…e on shutdown (#43532) * x-pack/filebeat/input/awss3: allow a grace time on shutdown (#43369) We may have SQS message requests in flight that are being serviced when the input's context is cancelled. This allows the user to specify a time to wait after the context is cancelled before the processing and publication work is terminated. We finish as soon as the set of pending requests had been received and published, cut short after the grace time. (cherry picked from commit c0d439c) * remove irrelevant changelog entries --------- Co-authored-by: Dan Kortschak <[email protected]>
Proposed commit message
Checklist
CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Disruptive User Impact
Author's Checklist
How to test this PR locally
Related issues
Use cases
Screenshots
Logs