From 031b04a04be05b8b620c3334f2d41b4702aaf9f3 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 4 Dec 2025 13:51:06 +1030 Subject: [PATCH] x-pack/filebeat/input/cel: log unpublished event count and exit publish loop on context cancellation (#47730) (cherry picked from commit ab3d2c3fb53d8d9d233e169ae00cf5799b29d584) --- changelog/fragments/1763583500-47717-cel.yaml | 45 +++++++++++++++++++ x-pack/filebeat/input/cel/input.go | 20 +++++++++ 2 files changed, 65 insertions(+) create mode 100644 changelog/fragments/1763583500-47717-cel.yaml diff --git a/changelog/fragments/1763583500-47717-cel.yaml b/changelog/fragments/1763583500-47717-cel.yaml new file mode 100644 index 000000000000..33898f823672 --- /dev/null +++ b/changelog/fragments/1763583500-47717-cel.yaml @@ -0,0 +1,45 @@ +# REQUIRED +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# REQUIRED for all kinds +# Change summary; a 80ish characters long description of the change. +summary: Log unpublished event count and exit publish loop on input context cancellation. + +# REQUIRED for breaking-change, deprecation, known-issue +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# description: + +# REQUIRED for breaking-change, deprecation, known-issue +# impact: + +# REQUIRED for breaking-change, deprecation, known-issue +# action: + +# REQUIRED for all kinds +# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. +component: filebeat + +# AUTOMATED +# OPTIONAL to manually add other PR URLs +# PR URL: A link the PR that added the changeset. +# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. +# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. +# Please provide it if you are adding a fragment for a different PR. +# pr: https://github.com/owner/repo/1234 + +# AUTOMATED +# OPTIONAL to manually add other issue URLs +# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). +# If not present is automatically filled by the tooling with the issue linked to the PR number. +# issue: https://github.com/owner/repo/1234 diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index a5d37c509307..8e7d0ef2a179 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -514,6 +514,7 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p start = time.Now() var hadPublicationError bool + loop: for i, e := range events { event, ok := e.(map[string]interface{}) if !ok { @@ -541,6 +542,25 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p pubCursor = cursor } } + // This is checked prior to the publish attempt since the + // cursor.Publisher interface does not document the behaviour + // related to context cancellation and the context is not + // explicitly passed in, so favour this explicit clarity. + switch err := ctx.Err(); { + case err == nil: + case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): + log.Infow("context cancelled with unpublished events", "unpublished", len(events)-i) + // Don't update status, since we are about to pass + // through the Running state and then fall through + // to the input exit with a change to Stopped. + break loop + default: + // This should never happen. + log.Warnw("failed with unpublished events", "error", err, "unpublished", len(events)-i) + health.UpdateStatus(status.Degraded, "error publishing events: "+err.Error()) + isDegraded = true + break loop + } err = pub.Publish(beat.Event{ Timestamp: time.Now(), Fields: event,