Skip to content

Commit 8f1cfe0

Browse files
mergify[bot]efd6
andauthored
[8.19](backport #47730) x-pack/filebeat/input/cel: log unpublished event count and exit publish loop on context cancellation (#47907)
* x-pack/filebeat/input/cel: log unpublished event count and exit publish loop on context cancellation (#47730) (cherry picked from commit ab3d2c3) * remove changelog fragment * add changelog entry --------- Co-authored-by: Dan Kortschak <[email protected]>
1 parent cfeda4c commit 8f1cfe0

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
266266
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
267267
- Enhanced HTTPJSON input error logging with structured error metadata conforming to Elastic Common Schema (ECS) conventions. {pull}45653[45653]19
268268
- Add support for DPoP authentication for the CEL and HTTP JSON inputs. {pull}47441[47441]
269+
- Log unpublished event count and exit publish loop on input context cancellation. {issue}47717[47717] {pull}47730[47730]
269270

270271
*Auditbeat*
271272

x-pack/filebeat/input/cel/input.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
512512

513513
start = time.Now()
514514
var hadPublicationError bool
515+
loop:
515516
for i, e := range events {
516517
event, ok := e.(map[string]interface{})
517518
if !ok {
@@ -539,6 +540,25 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p
539540
pubCursor = cursor
540541
}
541542
}
543+
// This is checked prior to the publish attempt since the
544+
// cursor.Publisher interface does not document the behaviour
545+
// related to context cancellation and the context is not
546+
// explicitly passed in, so favour this explicit clarity.
547+
switch err := ctx.Err(); {
548+
case err == nil:
549+
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
550+
log.Infow("context cancelled with unpublished events", "unpublished", len(events)-i)
551+
// Don't update status, since we are about to pass
552+
// through the Running state and then fall through
553+
// to the input exit with a change to Stopped.
554+
break loop
555+
default:
556+
// This should never happen.
557+
log.Warnw("failed with unpublished events", "error", err, "unpublished", len(events)-i)
558+
health.UpdateStatus(status.Degraded, "error publishing events: "+err.Error())
559+
isDegraded = true
560+
break loop
561+
}
542562
err = pub.Publish(beat.Event{
543563
Timestamp: time.Now(),
544564
Fields: event,

0 commit comments

Comments
 (0)