From 6a46dd15da1e1851828fd4f0918300f7cc273d7b Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 14 Oct 2025 13:24:55 +0200 Subject: [PATCH 01/13] Split consumer loop and target update loop --- internal/component/loki/source/file/file.go | 97 +++++++++------------ 1 file changed, 43 insertions(+), 54 deletions(-) diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 96179e3a93..7c7054873b 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -2,6 +2,7 @@ package file import ( "context" + "errors" "fmt" "os" "path/filepath" @@ -10,6 +11,7 @@ import ( "time" "github.com/grafana/tail/watch" + "github.com/oklog/run" "github.com/prometheus/common/model" "go.uber.org/atomic" @@ -143,6 +145,7 @@ func (c *Component) Run(ctx context.Context) error { reader: t.reader, } }) + defer func() { level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping readers and positions file") @@ -167,66 +170,53 @@ func (c *Component) Run(ctx context.Context) error { c.tasksMut.RUnlock() }() - for { - select { - case <-ctx.Done(): - return nil - case entry := <-c.handler.Chan(): - c.receiversMut.RLock() - for _, receiver := range c.receivers { - select { - case <-ctx.Done(): - return nil - case receiver.Chan() <- entry: - } - } - c.receiversMut.RUnlock() - case <-c.updateReaders: - // It's important to have the same lock order in Update and Run to avoid - // deadlocks. - c.tasksMut.Lock() - c.receiversMut.RLock() - - // When we are updating tasks we need to continue to read from handler.Chan(). - // This is done to avoid a race condition where stopping a reader is - // flushing its data, but nothing is reading from handler.Chan(). 
- readCtx, cancel := context.WithCancel(ctx) - go func() { - for { + var rg run.Group + + rg.Add(func() error { + for { + select { + case <-ctx.Done(): + return nil + case entry := <-c.handler.Chan(): + c.receiversMut.RLock() + for _, receiver := range c.receivers { select { - case entry := <-c.handler.Chan(): - for _, receiver := range c.receivers { - select { - case <-readCtx.Done(): - return - case receiver.Chan() <- entry: - } - } - case <-readCtx.Done(): - return + case <-ctx.Done(): + return nil + case receiver.Chan() <- entry: } } - }() - - var tasks []*runnerTask - level.Debug(c.opts.Logger).Log("msg", "updating tasks", "tasks", len(c.tasks)) - for _, entry := range c.tasks { - tasks = append(tasks, &entry) + c.receiversMut.RUnlock() } - err := runner.ApplyTasks(ctx, tasks) - - // We cancel readCtx because we are done updating tasks and the main loop will continue to - // read from it. - cancel() - level.Debug(c.opts.Logger).Log("msg", "workers successfully updated", "workers", len(runner.Workers())) - c.receiversMut.RUnlock() - c.tasksMut.Unlock() + } + }, func(err error) {}) + + rg.Add(func() error { + for { + select { + case <-ctx.Done(): + return nil + case <-c.updateReaders: + level.Debug(c.opts.Logger).Log("msg", "updating tasks", "tasks", len(c.tasks)) + + c.tasksMut.RLock() + var tasks []*runnerTask + for _, entry := range c.tasks { + tasks = append(tasks, &entry) + } + c.tasksMut.RUnlock() - if err != nil && err != context.Canceled { - return err + if err := runner.ApplyTasks(ctx, tasks); err != nil && !errors.Is(err, context.Canceled) { + level.Error(c.opts.Logger).Log("msg", "failed to apply tasks", "err", err) + } else { + level.Debug(c.opts.Logger).Log("msg", "workers successfully updated", "workers", len(runner.Workers())) + } } + } - } + }, func(err error) {}) + + return rg.Run() } // Update implements component.Component. 
@@ -248,7 +238,6 @@ func (c *Component) Update(args component.Arguments) error { } else { c.receiversMut.RUnlock() } - c.tasks = make(map[positions.Entry]runnerTask) if len(newArgs.Targets) == 0 { level.Debug(c.opts.Logger).Log("msg", "no files targets were passed, nothing will be tailed") From 4bfe5eb945f793287a2ea59dc8d13587935144da Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 14 Oct 2025 13:43:08 +0200 Subject: [PATCH 02/13] Use waitgroup --- internal/component/loki/source/file/file.go | 26 +++++++++++---------- 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 7c7054873b..dbbc043b49 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -11,7 +11,6 @@ import ( "time" "github.com/grafana/tail/watch" - "github.com/oklog/run" "github.com/prometheus/common/model" "go.uber.org/atomic" @@ -170,32 +169,35 @@ func (c *Component) Run(ctx context.Context) error { c.tasksMut.RUnlock() }() - var rg run.Group - - rg.Add(func() error { + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() for { select { case <-ctx.Done(): - return nil + return case entry := <-c.handler.Chan(): c.receiversMut.RLock() for _, receiver := range c.receivers { select { case <-ctx.Done(): - return nil + return case receiver.Chan() <- entry: } } c.receiversMut.RUnlock() } } - }, func(err error) {}) + }() - rg.Add(func() error { + wg.Add(1) + go func() { + defer wg.Done() for { select { case <-ctx.Done(): - return nil + return case <-c.updateReaders: level.Debug(c.opts.Logger).Log("msg", "updating tasks", "tasks", len(c.tasks)) @@ -212,11 +214,11 @@ func (c *Component) Run(ctx context.Context) error { level.Debug(c.opts.Logger).Log("msg", "workers successfully updated", "workers", len(runner.Workers())) } } - } - }, func(err error) {}) + }() - return rg.Run() + wg.Wait() + return 
nil } // Update implements component.Component. From cff887e8342895d7f33ffb5e01bf632ab61ad45e Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 14 Oct 2025 13:46:41 +0200 Subject: [PATCH 03/13] fix log --- internal/component/loki/source/file/file.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index dbbc043b49..e5ce9f5bd8 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -210,7 +210,7 @@ func (c *Component) Run(ctx context.Context) error { if err := runner.ApplyTasks(ctx, tasks); err != nil && !errors.Is(err, context.Canceled) { level.Error(c.opts.Logger).Log("msg", "failed to apply tasks", "err", err) - } else { + } else if err == nil { level.Debug(c.opts.Logger).Log("msg", "workers successfully updated", "workers", len(runner.Workers())) } } From ce225c1ca49f04f17513e0ead507f6eb854bc4b7 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Thu, 16 Oct 2025 14:54:56 +0200 Subject: [PATCH 04/13] file_match: only export targets if anything has changed We also make sure to collect all targets and perform the export every time component is updated with new discovery data --- internal/component/local/file_match/file.go | 75 +++++++++++++++------ 1 file changed, 56 insertions(+), 19 deletions(-) diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index 102f81beee..6281f4828c 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -37,20 +37,24 @@ var _ component.Component = (*Component)(nil) type Component struct { opts component.Options - mut sync.RWMutex - args Arguments - watches []watch - watchDog *time.Ticker + mut sync.Mutex + args Arguments + watches []watch + watchDog *time.Ticker + previousTargets []discovery.Target 
+ triggerChan chan struct{} } // New creates a new local.file_match component. func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ - opts: o, - mut: sync.RWMutex{}, - args: args, - watches: make([]watch, 0), - watchDog: time.NewTicker(args.SyncPeriod), + opts: o, + mut: sync.Mutex{}, + args: args, + watches: make([]watch, 0), + watchDog: time.NewTicker(args.SyncPeriod), + previousTargets: make([]discovery.Target, 0), + triggerChan: make(chan struct{}, 1), // Buffered channel to avoid blocking } if err := c.Update(args); err != nil { @@ -73,11 +77,16 @@ func (c *Component) Update(args component.Arguments) error { c.mut.Lock() defer c.mut.Unlock() + newArgs := args.(Arguments) + // Check to see if our ticker timer needs to be reset. - if args.(Arguments).SyncPeriod != c.args.SyncPeriod { - c.watchDog.Reset(c.args.SyncPeriod) + if newArgs.SyncPeriod != c.args.SyncPeriod { + c.watchDog.Reset(newArgs.SyncPeriod) } - c.args = args.(Arguments) + + c.args = newArgs + + // Rebuild watches c.watches = c.watches[:0] for _, v := range c.args.PathTargets { c.watches = append(c.watches, watch{ @@ -87,27 +96,43 @@ func (c *Component) Update(args component.Arguments) error { }) } + // Always trigger immediate check when Update is called + select { + case c.triggerChan <- struct{}{}: + default: + } + return nil } // Run satisfies the component interface. func (c *Component) Run(ctx context.Context) error { - update := func() { + expand := func(export func(paths []discovery.Target)) { c.mut.Lock() defer c.mut.Unlock() paths := c.getWatchedFiles() - // The component node checks to see if exports have actually changed. - c.opts.OnStateChange(discovery.Exports{Targets: paths}) + export(paths) } - // Trigger initial check - update() + defer c.watchDog.Stop() for { select { case <-c.watchDog.C: - // This triggers a check for any new paths, along with pushing new targets. 
- update() + // Only export if targets are different + expand(func(paths []discovery.Target) { + if !equalTargets(c.previousTargets, paths) { + c.previousTargets = make([]discovery.Target, len(paths)) + copy(c.previousTargets, paths) + c.opts.OnStateChange(discovery.Exports{Targets: paths}) + } + }) + case <-c.triggerChan: + // We got new targets so we should export them directly. + expand(func(paths []discovery.Target) { + c.watchDog.Reset(c.args.SyncPeriod) + c.opts.OnStateChange(discovery.Exports{Targets: paths}) + }) case <-ctx.Done(): return nil } @@ -126,3 +151,15 @@ func (c *Component) getWatchedFiles() []discovery.Target { } return paths } + +func equalTargets(a, b []discovery.Target) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if !a[i].Equals(b[i]) { + return false + } + } + return true +} From 9ac201674ae73b18043e130ee754312142439de9 Mon Sep 17 00:00:00 2001 From: Karl Persson <23356117+kalleep@users.noreply.github.com> Date: Thu, 16 Oct 2025 15:01:44 +0200 Subject: [PATCH 05/13] Update internal/component/loki/source/file/file.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- internal/component/loki/source/file/file.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index e5ce9f5bd8..313bc7e811 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -208,9 +208,11 @@ func (c *Component) Run(ctx context.Context) error { } c.tasksMut.RUnlock() - if err := runner.ApplyTasks(ctx, tasks); err != nil && !errors.Is(err, context.Canceled) { - level.Error(c.opts.Logger).Log("msg", "failed to apply tasks", "err", err) - } else if err == nil { + if err := runner.ApplyTasks(ctx, tasks); err != nil { + if !errors.Is(err, context.Canceled) { + level.Error(c.opts.Logger).Log("msg", "failed to apply tasks", "err", err) + } + } else { 
level.Debug(c.opts.Logger).Log("msg", "workers successfully updated", "workers", len(runner.Workers())) } } From 36db133687b8a417306534979d542d7c99aef58f Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Thu, 16 Oct 2025 15:02:06 +0200 Subject: [PATCH 06/13] Update order --- internal/component/local/file_match/file.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index 6281f4828c..9d767829f4 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -118,6 +118,13 @@ func (c *Component) Run(ctx context.Context) error { defer c.watchDog.Stop() for { select { + case <-c.triggerChan: + // We got new targets so we should export them directly. + expand(func(paths []discovery.Target) { + c.watchDog.Reset(c.args.SyncPeriod) + c.opts.OnStateChange(discovery.Exports{Targets: paths}) + }) + case <-c.watchDog.C: // Only export if targets are different expand(func(paths []discovery.Target) { @@ -127,12 +134,6 @@ func (c *Component) Run(ctx context.Context) error { c.opts.OnStateChange(discovery.Exports{Targets: paths}) } }) - case <-c.triggerChan: - // We got new targets so we should export them directly. 
- expand(func(paths []discovery.Target) { - c.watchDog.Reset(c.args.SyncPeriod) - c.opts.OnStateChange(discovery.Exports{Targets: paths}) - }) case <-ctx.Done(): return nil } From 716a5fb969fc476d0d7af9e9211798781d6f9370 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Thu, 16 Oct 2025 15:23:41 +0200 Subject: [PATCH 07/13] fix equal check --- internal/component/local/file_match/file.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index 9d767829f4..80bb59c20c 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -158,7 +158,7 @@ func equalTargets(a, b []discovery.Target) bool { return false } for i := range a { - if !a[i].Equals(b[i]) { + if !a[i].EqualsTarget(&b[i]) { return false } } From d6aba4fc7b846286bf53683cb5e9b409af63d9a6 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Thu, 16 Oct 2025 15:48:10 +0200 Subject: [PATCH 08/13] Rework so we don't stat path twice --- internal/component/loki/source/file/file.go | 36 ++++++++------------- 1 file changed, 14 insertions(+), 22 deletions(-) diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index 313bc7e811..f9931ab892 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -258,7 +258,20 @@ func (c *Component) Update(args component.Arguments) error { continue } - c.reportSize(path) + fi, err := os.Stat(path) + if err != nil { + level.Error(c.opts.Logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", path) + c.metrics.totalBytes.DeleteLabelValues(path) + continue + } + + if fi.IsDir() { + level.Info(c.opts.Logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", path) + c.metrics.totalBytes.DeleteLabelValues(path) + 
continue + } + + c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size())) reader, err := c.createReader(readerOptions{ path: path, @@ -333,19 +346,6 @@ type readerOptions struct { // For most files, createReader returns a tailer implementation. If the file suffix alludes to it being // a compressed file, then a decompressor will be created instead. func (c *Component) createReader(opts readerOptions) (reader, error) { - fi, err := os.Stat(opts.path) - if err != nil { - level.Error(c.opts.Logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", opts.path) - c.metrics.totalBytes.DeleteLabelValues(opts.path) - return nil, fmt.Errorf("failed to stat path %s", opts.path) - } - - if fi.IsDir() { - level.Info(c.opts.Logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", opts.path) - c.metrics.totalBytes.DeleteLabelValues(opts.path) - return nil, fmt.Errorf("failed to tail file, it was a directory %s", opts.path) - } - var reader reader if opts.decompressionConfig.Enabled { decompressor, err := newDecompressor( @@ -396,14 +396,6 @@ func (c *Component) IsStopping() bool { return c.stopping.Load() } -func (c *Component) reportSize(path string) { - fi, err := os.Stat(path) - if err != nil { - return - } - c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size())) -} - func receiversChanged(prev, next []loki.LogsReceiver) bool { if len(prev) != len(next) { return true From 46103e4edb9ef8d615d7587823df61e0034cd62e Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Fri, 17 Oct 2025 00:12:33 +0200 Subject: [PATCH 09/13] always update state --- internal/component/local/file_match/file.go | 20 ++------------------ 1 file changed, 2 insertions(+), 18 deletions(-) diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index 80bb59c20c..947ab9fbb1 100644 --- a/internal/component/local/file_match/file.go +++ 
b/internal/component/local/file_match/file.go @@ -126,13 +126,9 @@ func (c *Component) Run(ctx context.Context) error { }) case <-c.watchDog.C: - // Only export if targets are different + // Update state in each interval expand(func(paths []discovery.Target) { - if !equalTargets(c.previousTargets, paths) { - c.previousTargets = make([]discovery.Target, len(paths)) - copy(c.previousTargets, paths) - c.opts.OnStateChange(discovery.Exports{Targets: paths}) - } + c.opts.OnStateChange(discovery.Exports{Targets: paths}) }) case <-ctx.Done(): return nil @@ -152,15 +148,3 @@ func (c *Component) getWatchedFiles() []discovery.Target { } return paths } - -func equalTargets(a, b []discovery.Target) bool { - if len(a) != len(b) { - return false - } - for i := range a { - if !a[i].EqualsTarget(&b[i]) { - return false - } - } - return true -} From a6e3b56ff250691962f2169c4db28bb0b4713fbc Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Fri, 17 Oct 2025 00:25:21 +0200 Subject: [PATCH 10/13] cleanup --- internal/component/local/file_match/file.go | 59 +++++++++------------ 1 file changed, 26 insertions(+), 33 deletions(-) diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index 947ab9fbb1..6beb24fbed 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -37,24 +37,22 @@ var _ component.Component = (*Component)(nil) type Component struct { opts component.Options - mut sync.Mutex - args Arguments - watches []watch - watchDog *time.Ticker - previousTargets []discovery.Target - triggerChan chan struct{} + mut sync.Mutex + args Arguments + watches []watch + watchDog *time.Ticker + targetsChanged chan struct{} } // New creates a new local.file_match component. 
func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ - opts: o, - mut: sync.Mutex{}, - args: args, - watches: make([]watch, 0), - watchDog: time.NewTicker(args.SyncPeriod), - previousTargets: make([]discovery.Target, 0), - triggerChan: make(chan struct{}, 1), // Buffered channel to avoid blocking + opts: o, + mut: sync.Mutex{}, + args: args, + watches: make([]watch, 0), + watchDog: time.NewTicker(args.SyncPeriod), + targetsChanged: make(chan struct{}, 1), // Buffered channel to avoid blocking } if err := c.Update(args); err != nil { @@ -98,7 +96,7 @@ func (c *Component) Update(args component.Arguments) error { // Always trigger immediate check when Update is called select { - case c.triggerChan <- struct{}{}: + case c.targetsChanged <- struct{}{}: default: } @@ -107,29 +105,24 @@ func (c *Component) Update(args component.Arguments) error { // Run satisfies the component interface. func (c *Component) Run(ctx context.Context) error { - expand := func(export func(paths []discovery.Target)) { - c.mut.Lock() - defer c.mut.Unlock() - - paths := c.getWatchedFiles() - export(paths) - } - defer c.watchDog.Stop() for { select { - case <-c.triggerChan: - // We got new targets so we should export them directly. - expand(func(paths []discovery.Target) { - c.watchDog.Reset(c.args.SyncPeriod) - c.opts.OnStateChange(discovery.Exports{Targets: paths}) - }) - + case <-c.targetsChanged: + // When we get a signal that we have new targets we will get all watched files and + // reset the timer. + c.mut.Lock() + c.watchDog.Reset(c.args.SyncPeriod) + targets := c.getWatchedFiles() + c.mut.Unlock() + c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-c.watchDog.C: - // Update state in each interval - expand(func(paths []discovery.Target) { - c.opts.OnStateChange(discovery.Exports{Targets: paths}) - }) + // If we have not received a signal that we have new targets watch job will periodically + // get all files that we should watch. 
+ c.mut.Lock() + targets := c.getWatchedFiles() + c.mut.Unlock() + c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-ctx.Done(): return nil } From 2dfe22a2ca93ba42eb22811899d464b17f294857 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Fri, 17 Oct 2025 00:27:34 +0200 Subject: [PATCH 11/13] fix --- internal/component/local/file_match/file.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index 6beb24fbed..e115bf5015 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -47,12 +47,13 @@ type Component struct { // New creates a new local.file_match component. func New(o component.Options, args Arguments) (*Component, error) { c := &Component{ - opts: o, - mut: sync.Mutex{}, - args: args, - watches: make([]watch, 0), - watchDog: time.NewTicker(args.SyncPeriod), - targetsChanged: make(chan struct{}, 1), // Buffered channel to avoid blocking + opts: o, + mut: sync.Mutex{}, + args: args, + watches: make([]watch, 0), + watchDog: time.NewTicker(args.SyncPeriod), + // Buffered channel to avoid blocking + targetsChanged: make(chan struct{}, 1), } if err := c.Update(args); err != nil { From 494f6a762169ddcc22f86a8b3af6af6438a1992e Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Mon, 20 Oct 2025 16:24:29 +0200 Subject: [PATCH 12/13] Prevent checking targets multiple time in cases where we cannot create a new task --- internal/component/loki/source/file/file.go | 22 +++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go index f9931ab892..4d3f3f3202 100644 --- a/internal/component/loki/source/file/file.go +++ b/internal/component/loki/source/file/file.go @@ -242,10 +242,14 @@ func (c *Component) 
Update(args component.Arguments) error { } else { c.receiversMut.RUnlock() } + c.tasks = make(map[positions.Entry]runnerTask) - if len(newArgs.Targets) == 0 { - level.Debug(c.opts.Logger).Log("msg", "no files targets were passed, nothing will be tailed") - } + + // There are cases where we have several targets with the same path + public labels + // but the path no longer exists so we cannot create a task for it. So we need to track + // what we have checked separately from the task map to prevent performing checks that + // will fail multiple times. + checked := make(map[positions.Entry]struct{}) for _, target := range newArgs.Targets { path, _ := target.Get(pathLabel) @@ -253,11 +257,13 @@ func (c *Component) Update(args component.Arguments) error { labels := target.NonReservedLabelSet() // Deduplicate targets which have the same public label set. - readersKey := positions.Entry{Path: path, Labels: labels.String()} - if _, exist := c.tasks[readersKey]; exist { + key := positions.Entry{Path: path, Labels: labels.String()} + if _, exists := checked[key]; exists { continue } + checked[key] = struct{}{} + fi, err := os.Stat(path) if err != nil { level.Error(c.opts.Logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", path) @@ -286,7 +292,7 @@ func (c *Component) Update(args component.Arguments) error { continue } - c.tasks[readersKey] = runnerTask{ + c.tasks[key] = runnerTask{ reader: reader, path: path, labels: labels.String(), @@ -295,6 +301,10 @@ func (c *Component) Update(args component.Arguments) error { } } + if len(newArgs.Targets) == 0 { + level.Debug(c.opts.Logger).Log("msg", "no files targets were passed, nothing will be tailed") + } + select { case c.updateReaders <- struct{}{}: default: From c24f4a7c95ff50bbbc3999daff87c72f48d5a469 Mon Sep 17 00:00:00 2001 From: Kalle <23356117+kalleep@users.noreply.github.com> Date: Tue, 21 Oct 2025 11:13:25 +0200 Subject: [PATCH 13/13] Add debug logging --- 
internal/component/local/file_match/file.go | 13 ++++++++--- .../component/local/file_match/file_test.go | 22 +++++++++---------- 2 files changed, 21 insertions(+), 14 deletions(-) diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go index e115bf5015..4acd594a90 100644 --- a/internal/component/local/file_match/file.go +++ b/internal/component/local/file_match/file.go @@ -114,14 +114,14 @@ func (c *Component) Run(ctx context.Context) error { // reset the timer. c.mut.Lock() c.watchDog.Reset(c.args.SyncPeriod) - targets := c.getWatchedFiles() + targets := c.getWatchedFiles(true) c.mut.Unlock() c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-c.watchDog.C: // If we have not received a signal that we have new targets watch job will periodically // get all files that we should watch. c.mut.Lock() - targets := c.getWatchedFiles() + targets := c.getWatchedFiles(false) c.mut.Unlock() c.opts.OnStateChange(discovery.Exports{Targets: targets}) case <-ctx.Done(): @@ -130,8 +130,12 @@ func (c *Component) Run(ctx context.Context) error { } } -func (c *Component) getWatchedFiles() []discovery.Target { +func (c *Component) getWatchedFiles(targetsUpdated bool) []discovery.Target { + start := time.Now() paths := make([]discovery.Target, 0) + + level.Info(c.opts.Logger).Log("msg", "expanding paths", "targets_updated", targetsUpdated) + // See if there is anything new we need to check. for _, w := range c.watches { newPaths, err := w.getPaths() @@ -140,5 +144,8 @@ func (c *Component) getWatchedFiles() []discovery.Target { } paths = append(paths, newPaths...) 
} + + level.Info(c.opts.Logger).Log("msg", "finish expanding paths", "paths", len(paths), "duration", time.Since(start)) + return paths } diff --git a/internal/component/local/file_match/file_test.go b/internal/component/local/file_match/file_test.go index 248f082e2b..988b4e6066 100644 --- a/internal/component/local/file_match/file_test.go +++ b/internal/component/local/file_match/file_test.go @@ -36,7 +36,7 @@ func TestFile(t *testing.T) { go c.Run(ct) time.Sleep(20 * time.Millisecond) ct.Done() - foundFiles := c.getWatchedFiles() + foundFiles := c.getWatchedFiles(false) require.Len(t, foundFiles, 1) require.True(t, contains(foundFiles, "t1.txt")) } @@ -58,7 +58,7 @@ func TestDirectoryFile(t *testing.T) { go c.Run(ct) time.Sleep(20 * time.Millisecond) ct.Done() - foundFiles := c.getWatchedFiles() + foundFiles := c.getWatchedFiles(false) require.Len(t, foundFiles, 1) require.True(t, contains(foundFiles, "t1.txt")) } @@ -80,14 +80,14 @@ func TestFileIgnoreOlder(t *testing.T) { c.Update(c.args) go c.Run(ct) - foundFiles := c.getWatchedFiles() + foundFiles := c.getWatchedFiles(false) require.Len(t, foundFiles, 1) require.True(t, contains(foundFiles, "t1.txt")) time.Sleep(150 * time.Millisecond) writeFile(t, dir, "t2.txt") ct.Done() - foundFiles = c.getWatchedFiles() + foundFiles = c.getWatchedFiles(false) require.Len(t, foundFiles, 1) require.True(t, contains(foundFiles, "t2.txt")) } @@ -110,7 +110,7 @@ func TestAddingFile(t *testing.T) { time.Sleep(20 * time.Millisecond) writeFile(t, dir, "t2.txt") ct.Done() - foundFiles := c.getWatchedFiles() + foundFiles := c.getWatchedFiles(false) require.Len(t, foundFiles, 2) require.True(t, contains(foundFiles, "t1.txt")) require.True(t, contains(foundFiles, "t2.txt")) @@ -138,7 +138,7 @@ func TestAddingFileInSubDir(t *testing.T) { require.NoError(t, err) time.Sleep(20 * time.Millisecond) ct.Done() - foundFiles := c.getWatchedFiles() + foundFiles := c.getWatchedFiles(false) require.Len(t, foundFiles, 3) require.True(t, 
contains(foundFiles, "t1.txt")) require.True(t, contains(foundFiles, "t2.txt")) @@ -175,7 +175,7 @@ func TestAddingFileInAnExcludedSubDir(t *testing.T) { require.NoError(t, err) time.Sleep(20 * time.Millisecond) ct.Done() - foundFiles := c.getWatchedFiles() + foundFiles := c.getWatchedFiles(false) require.Len(t, foundFiles, 3) require.True(t, contains(foundFiles, "t1.txt")) require.True(t, contains(foundFiles, "t2.txt")) @@ -204,7 +204,7 @@ func TestAddingRemovingFileInSubDir(t *testing.T) { err := os.WriteFile(path.Join(subdir, "t3.txt"), []byte("asdf"), 0664) require.NoError(t, err) time.Sleep(100 * time.Millisecond) - foundFiles := c.getWatchedFiles() + foundFiles := c.getWatchedFiles(false) require.Len(t, foundFiles, 3) require.True(t, contains(foundFiles, "t1.txt")) require.True(t, contains(foundFiles, "t2.txt")) @@ -213,7 +213,7 @@ func TestAddingRemovingFileInSubDir(t *testing.T) { err = os.RemoveAll(subdir) require.NoError(t, err) time.Sleep(1000 * time.Millisecond) - foundFiles = c.getWatchedFiles() + foundFiles = c.getWatchedFiles(false) require.Len(t, foundFiles, 2) require.True(t, contains(foundFiles, "t1.txt")) require.True(t, contains(foundFiles, "t2.txt")) @@ -237,7 +237,7 @@ func TestExclude(t *testing.T) { os.Mkdir(subdir, 0755) writeFile(t, subdir, "t3.txt") time.Sleep(100 * time.Millisecond) - foundFiles := c.getWatchedFiles() + foundFiles := c.getWatchedFiles(false) require.Len(t, foundFiles, 2) require.True(t, contains(foundFiles, "t1.txt")) require.True(t, contains(foundFiles, "t3.txt")) @@ -263,7 +263,7 @@ func TestMultiLabels(t *testing.T) { c.args.SyncPeriod = 10 * time.Millisecond go c.Run(ct) time.Sleep(100 * time.Millisecond) - foundFiles := c.getWatchedFiles() + foundFiles := c.getWatchedFiles(false) require.Len(t, foundFiles, 2) require.True(t, contains([]discovery.Target{foundFiles[0]}, "t1.txt")) require.True(t, contains([]discovery.Target{foundFiles[1]}, "t1.txt"))