Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834]
- Enhanced HTTPJSON input error logging with structured error metadata conforming to Elastic Common Schema (ECS) conventions. {pull}45653[45653]19
- Add support for DPoP authentication for the CEL and HTTP JSON inputs. {pull}47441[47441]
- Log unpublished event count and exit publish loop on input context cancellation. {issue}47717[47717] {pull}47730[47730]

*Auditbeat*

Expand Down
20 changes: 20 additions & 0 deletions x-pack/filebeat/input/cel/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@
func (input) Name() string { return inputName }

func (input) Test(src inputcursor.Source, _ v2.TestContext) error {
cfg := src.(*source).cfg

Check failure on line 102 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value is not checked (errcheck)
if !wantClient(cfg) {
return nil
}
Expand All @@ -109,7 +109,7 @@
// Run starts the input and blocks until it ends completes. It will return on
// context cancellation or type invalidity errors, any other error will be retried.
func (input) Run(env v2.Context, src inputcursor.Source, crsr inputcursor.Cursor, pub inputcursor.Publisher) error {
dataStreamName := src.(*source).cfg.DataStream // May be empty.

Check failure on line 112 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value is not checked (errcheck)

var cursor map[string]interface{}
env.UpdateStatus(status.Starting, dataStreamName)
Expand All @@ -127,7 +127,7 @@
parent: &env,
}
}
err := input{}.run(env, src.(*source), cursor, pub, health)

Check failure on line 130 in x-pack/filebeat/input/cel/input.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

Error return value is not checked (errcheck)
if err != nil {
msg := "failed to run: " + err.Error()
if dataStreamName != "" {
Expand Down Expand Up @@ -512,6 +512,7 @@

start = time.Now()
var hadPublicationError bool
loop:
for i, e := range events {
event, ok := e.(map[string]interface{})
if !ok {
Expand Down Expand Up @@ -539,6 +540,25 @@
pubCursor = cursor
}
}
// This is checked prior to the publish attempt since the
// cursor.Publisher interface does not document the behaviour
// related to context cancellation and the context is not
// explicitly passed in, so favour this explicit clarity.
switch err := ctx.Err(); {
case err == nil:
case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded):
log.Infow("context cancelled with unpublished events", "unpublished", len(events)-i)
// Don't update status, since we are about to pass
// through the Running state and then fall through
// to the input exit with a change to Stopped.
break loop
default:
// This should never happen.
log.Warnw("failed with unpublished events", "error", err, "unpublished", len(events)-i)
health.UpdateStatus(status.Degraded, "error publishing events: "+err.Error())
isDegraded = true
break loop
}
err = pub.Publish(beat.Event{
Timestamp: time.Now(),
Fields: event,
Expand Down
Loading