diff --git a/internal/component/local/file_match/file.go b/internal/component/local/file_match/file.go
index 102f81beee..4acd594a90 100644
--- a/internal/component/local/file_match/file.go
+++ b/internal/component/local/file_match/file.go
@@ -37,20 +37,23 @@ var _ component.Component = (*Component)(nil)
 type Component struct {
 	opts component.Options
 
-	mut      sync.RWMutex
-	args     Arguments
-	watches  []watch
-	watchDog *time.Ticker
+	mut            sync.Mutex
+	args           Arguments
+	watches        []watch
+	watchDog       *time.Ticker
+	targetsChanged chan struct{}
 }
 
 // New creates a new local.file_match component.
 func New(o component.Options, args Arguments) (*Component, error) {
 	c := &Component{
 		opts:     o,
-		mut:      sync.RWMutex{},
+		mut:      sync.Mutex{},
 		args:     args,
 		watches:  make([]watch, 0),
 		watchDog: time.NewTicker(args.SyncPeriod),
+		// Buffered channel to avoid blocking
+		targetsChanged: make(chan struct{}, 1),
 	}
 
 	if err := c.Update(args); err != nil {
@@ -73,11 +76,16 @@ func (c *Component) Update(args component.Arguments) error {
 	c.mut.Lock()
 	defer c.mut.Unlock()
 
+	newArgs := args.(Arguments)
+
 	// Check to see if our ticker timer needs to be reset.
-	if args.(Arguments).SyncPeriod != c.args.SyncPeriod {
-		c.watchDog.Reset(c.args.SyncPeriod)
+	if newArgs.SyncPeriod != c.args.SyncPeriod {
+		c.watchDog.Reset(newArgs.SyncPeriod)
 	}
-	c.args = args.(Arguments)
+
+	c.args = newArgs
+
+	// Rebuild watches
 	c.watches = c.watches[:0]
 	for _, v := range c.args.PathTargets {
 		c.watches = append(c.watches, watch{
@@ -87,35 +95,47 @@ func (c *Component) Update(args component.Arguments) error {
 		})
 	}
 
+	// Always trigger an immediate check when Update is called.
+	select {
+	case c.targetsChanged <- struct{}{}:
+	default:
+	}
+
 	return nil
 }
 
 // Run satisfies the component interface.
 func (c *Component) Run(ctx context.Context) error {
-	update := func() {
-		c.mut.Lock()
-		defer c.mut.Unlock()
-
-		paths := c.getWatchedFiles()
-		// The component node checks to see if exports have actually changed.
-		c.opts.OnStateChange(discovery.Exports{Targets: paths})
-	}
-	// Trigger initial check
-	update()
 	defer c.watchDog.Stop()
 	for {
 		select {
+		case <-c.targetsChanged:
+			// When we get a signal that there are new targets, gather all watched files and
+			// reset the timer.
+			c.mut.Lock()
+			c.watchDog.Reset(c.args.SyncPeriod)
+			targets := c.getWatchedFiles(true)
+			c.mut.Unlock()
+			c.opts.OnStateChange(discovery.Exports{Targets: targets})
 		case <-c.watchDog.C:
-			// This triggers a check for any new paths, along with pushing new targets.
-			update()
+			// If no signal about new targets has arrived, the watchdog periodically
+			// refreshes the full list of files to watch.
+			c.mut.Lock()
+			targets := c.getWatchedFiles(false)
+			c.mut.Unlock()
+			c.opts.OnStateChange(discovery.Exports{Targets: targets})
		case <-ctx.Done():
			return nil
		}
	}
}
 
-func (c *Component) getWatchedFiles() []discovery.Target {
+func (c *Component) getWatchedFiles(targetsUpdated bool) []discovery.Target {
+	start := time.Now()
 	paths := make([]discovery.Target, 0)
+
+	level.Info(c.opts.Logger).Log("msg", "expanding paths", "targets_updated", targetsUpdated)
+
 	// See if there is anything new we need to check.
 	for _, w := range c.watches {
 		newPaths, err := w.getPaths()
@@ -124,5 +144,8 @@ func (c *Component) getWatchedFiles() []discovery.Target {
 		}
 		paths = append(paths, newPaths...)
 	}
+
+	level.Info(c.opts.Logger).Log("msg", "finish expanding paths", "paths", len(paths), "duration", time.Since(start))
+
 	return paths
 }
diff --git a/internal/component/local/file_match/file_test.go b/internal/component/local/file_match/file_test.go
index 248f082e2b..988b4e6066 100644
--- a/internal/component/local/file_match/file_test.go
+++ b/internal/component/local/file_match/file_test.go
@@ -36,7 +36,7 @@ func TestFile(t *testing.T) {
 	go c.Run(ct)
 	time.Sleep(20 * time.Millisecond)
 	ct.Done()
-	foundFiles := c.getWatchedFiles()
+	foundFiles := c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 1)
 	require.True(t, contains(foundFiles, "t1.txt"))
 }
@@ -58,7 +58,7 @@ func TestDirectoryFile(t *testing.T) {
 	go c.Run(ct)
 	time.Sleep(20 * time.Millisecond)
 	ct.Done()
-	foundFiles := c.getWatchedFiles()
+	foundFiles := c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 1)
 	require.True(t, contains(foundFiles, "t1.txt"))
 }
@@ -80,14 +80,14 @@ func TestFileIgnoreOlder(t *testing.T) {
 	c.Update(c.args)
 	go c.Run(ct)
 
-	foundFiles := c.getWatchedFiles()
+	foundFiles := c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 1)
 	require.True(t, contains(foundFiles, "t1.txt"))
 
 	time.Sleep(150 * time.Millisecond)
 	writeFile(t, dir, "t2.txt")
 	ct.Done()
-	foundFiles = c.getWatchedFiles()
+	foundFiles = c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 1)
 	require.True(t, contains(foundFiles, "t2.txt"))
 }
@@ -110,7 +110,7 @@ func TestAddingFile(t *testing.T) {
 	time.Sleep(20 * time.Millisecond)
 	writeFile(t, dir, "t2.txt")
 	ct.Done()
-	foundFiles := c.getWatchedFiles()
+	foundFiles := c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 2)
 	require.True(t, contains(foundFiles, "t1.txt"))
 	require.True(t, contains(foundFiles, "t2.txt"))
@@ -138,7 +138,7 @@ func TestAddingFileInSubDir(t *testing.T) {
 	require.NoError(t, err)
 	time.Sleep(20 * time.Millisecond)
 	ct.Done()
-	foundFiles := c.getWatchedFiles()
+	foundFiles := c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 3)
 	require.True(t, contains(foundFiles, "t1.txt"))
 	require.True(t, contains(foundFiles, "t2.txt"))
@@ -175,7 +175,7 @@ func TestAddingFileInAnExcludedSubDir(t *testing.T) {
 	require.NoError(t, err)
 	time.Sleep(20 * time.Millisecond)
 	ct.Done()
-	foundFiles := c.getWatchedFiles()
+	foundFiles := c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 3)
 	require.True(t, contains(foundFiles, "t1.txt"))
 	require.True(t, contains(foundFiles, "t2.txt"))
@@ -204,7 +204,7 @@ func TestAddingRemovingFileInSubDir(t *testing.T) {
 	err := os.WriteFile(path.Join(subdir, "t3.txt"), []byte("asdf"), 0664)
 	require.NoError(t, err)
 	time.Sleep(100 * time.Millisecond)
-	foundFiles := c.getWatchedFiles()
+	foundFiles := c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 3)
 	require.True(t, contains(foundFiles, "t1.txt"))
 	require.True(t, contains(foundFiles, "t2.txt"))
@@ -213,7 +213,7 @@ func TestAddingRemovingFileInSubDir(t *testing.T) {
 	err = os.RemoveAll(subdir)
 	require.NoError(t, err)
 	time.Sleep(1000 * time.Millisecond)
-	foundFiles = c.getWatchedFiles()
+	foundFiles = c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 2)
 	require.True(t, contains(foundFiles, "t1.txt"))
 	require.True(t, contains(foundFiles, "t2.txt"))
@@ -237,7 +237,7 @@ func TestExclude(t *testing.T) {
 	os.Mkdir(subdir, 0755)
 	writeFile(t, subdir, "t3.txt")
 	time.Sleep(100 * time.Millisecond)
-	foundFiles := c.getWatchedFiles()
+	foundFiles := c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 2)
 	require.True(t, contains(foundFiles, "t1.txt"))
 	require.True(t, contains(foundFiles, "t3.txt"))
@@ -263,7 +263,7 @@ func TestMultiLabels(t *testing.T) {
 	c.args.SyncPeriod = 10 * time.Millisecond
 	go c.Run(ct)
 	time.Sleep(100 * time.Millisecond)
-	foundFiles := c.getWatchedFiles()
+	foundFiles := c.getWatchedFiles(false)
 	require.Len(t, foundFiles, 2)
 	require.True(t, contains([]discovery.Target{foundFiles[0]}, "t1.txt"))
 	require.True(t, contains([]discovery.Target{foundFiles[1]}, "t1.txt"))
diff --git a/internal/component/loki/source/file/file.go b/internal/component/loki/source/file/file.go
index 96179e3a93..4d3f3f3202 100644
--- a/internal/component/loki/source/file/file.go
+++ b/internal/component/loki/source/file/file.go
@@ -2,6 +2,7 @@ package file
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"os"
 	"path/filepath"
@@ -143,6 +144,7 @@ func (c *Component) Run(ctx context.Context) error {
 			reader: t.reader,
 		}
 	})
+
 
 	defer func() {
 		level.Info(c.opts.Logger).Log("msg", "loki.source.file component shutting down, stopping readers and positions file")
@@ -167,66 +169,58 @@ func (c *Component) Run(ctx context.Context) error {
 		c.tasksMut.RUnlock()
 	}()
 
-	for {
-		select {
-		case <-ctx.Done():
-			return nil
-		case entry := <-c.handler.Chan():
-			c.receiversMut.RLock()
-			for _, receiver := range c.receivers {
-				select {
-				case <-ctx.Done():
-					return nil
-				case receiver.Chan() <- entry:
-				}
-			}
-			c.receiversMut.RUnlock()
-		case <-c.updateReaders:
-			// It's important to have the same lock order in Update and Run to avoid
-			// deadlocks.
-			c.tasksMut.Lock()
-			c.receiversMut.RLock()
-
-			// When we are updating tasks we need to continue to read from handler.Chan().
-			// This is done to avoid a race condition where stopping a reader is
-			// flushing its data, but nothing is reading from handler.Chan().
-			readCtx, cancel := context.WithCancel(ctx)
-			go func() {
-				for {
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case entry := <-c.handler.Chan():
+				c.receiversMut.RLock()
+				for _, receiver := range c.receivers {
 					select {
-					case entry := <-c.handler.Chan():
-						for _, receiver := range c.receivers {
-							select {
-							case <-readCtx.Done():
-								return
-							case receiver.Chan() <- entry:
-							}
-						}
-					case <-readCtx.Done():
+					case <-ctx.Done():
 						return
+					case receiver.Chan() <- entry:
 					}
 				}
-			}()
-
-			var tasks []*runnerTask
-			level.Debug(c.opts.Logger).Log("msg", "updating tasks", "tasks", len(c.tasks))
-			for _, entry := range c.tasks {
-				tasks = append(tasks, &entry)
+				c.receiversMut.RUnlock()
 			}
-			err := runner.ApplyTasks(ctx, tasks)
+		}
+	}()
 
-			// We cancel readCtx because we are done updating tasks and the main loop will continue to
-			// read from it.
-			cancel()
-			level.Debug(c.opts.Logger).Log("msg", "workers successfully updated", "workers", len(runner.Workers()))
-			c.receiversMut.RUnlock()
-			c.tasksMut.Unlock()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-c.updateReaders:
+				level.Debug(c.opts.Logger).Log("msg", "updating tasks", "tasks", len(c.tasks))
+
+				c.tasksMut.RLock()
+				var tasks []*runnerTask
+				for _, entry := range c.tasks {
+					tasks = append(tasks, &entry)
+				}
+				c.tasksMut.RUnlock()
 
-			if err != nil && err != context.Canceled {
-				return err
+				if err := runner.ApplyTasks(ctx, tasks); err != nil {
+					if !errors.Is(err, context.Canceled) {
+						level.Error(c.opts.Logger).Log("msg", "failed to apply tasks", "err", err)
+					}
+				} else {
+					level.Debug(c.opts.Logger).Log("msg", "workers successfully updated", "workers", len(runner.Workers()))
+				}
 			}
 		}
-	}
+	}()
+
+	wg.Wait()
+	return nil
 }
 
 // Update implements component.Component.
@@ -250,9 +244,12 @@ func (c *Component) Update(args component.Arguments) error {
 	}
 
 	c.tasks = make(map[positions.Entry]runnerTask)
-	if len(newArgs.Targets) == 0 {
-		level.Debug(c.opts.Logger).Log("msg", "no files targets were passed, nothing will be tailed")
-	}
+
+	// There are cases where several targets share the same path and public labels
+	// but the path no longer exists, so we cannot create a task for it. Track what
+	// we have already checked separately from the task map to avoid repeating
+	// checks that are bound to fail.
+	checked := make(map[positions.Entry]struct{})
 
 	for _, target := range newArgs.Targets {
 		path, _ := target.Get(pathLabel)
@@ -260,12 +257,27 @@
 		labels := target.NonReservedLabelSet()
 
 		// Deduplicate targets which have the same public label set.
-		readersKey := positions.Entry{Path: path, Labels: labels.String()}
-		if _, exist := c.tasks[readersKey]; exist {
+		key := positions.Entry{Path: path, Labels: labels.String()}
+		if _, exists := checked[key]; exists {
 			continue
 		}
 
-		c.reportSize(path)
+		checked[key] = struct{}{}
+
+		fi, err := os.Stat(path)
+		if err != nil {
+			level.Error(c.opts.Logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", path)
+			c.metrics.totalBytes.DeleteLabelValues(path)
+			continue
+		}
+
+		if fi.IsDir() {
+			level.Info(c.opts.Logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", path)
+			c.metrics.totalBytes.DeleteLabelValues(path)
+			continue
+		}
+
+		c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size()))
 
 		reader, err := c.createReader(readerOptions{
 			path: path,
@@ -280,7 +292,7 @@
 			continue
 		}
 
-		c.tasks[readersKey] = runnerTask{
+		c.tasks[key] = runnerTask{
 			reader: reader,
 			path:   path,
 			labels: labels.String(),
@@ -289,6 +301,10 @@
 		}
 	}
+	if len(newArgs.Targets) == 0 {
+		level.Debug(c.opts.Logger).Log("msg", "no files targets were passed, nothing will be tailed")
+	}
+
 	select {
 	case c.updateReaders <- struct{}{}:
 	default:
 	}
@@ -340,19 +356,6 @@ type readerOptions struct {
 // For most files, createReader returns a tailer implementation. If the file suffix alludes to it being
 // a compressed file, then a decompressor will be created instead.
 func (c *Component) createReader(opts readerOptions) (reader, error) {
-	fi, err := os.Stat(opts.path)
-	if err != nil {
-		level.Error(c.opts.Logger).Log("msg", "failed to tail file, stat failed", "error", err, "filename", opts.path)
-		c.metrics.totalBytes.DeleteLabelValues(opts.path)
-		return nil, fmt.Errorf("failed to stat path %s", opts.path)
-	}
-
-	if fi.IsDir() {
-		level.Info(c.opts.Logger).Log("msg", "failed to tail file", "error", "file is a directory", "filename", opts.path)
-		c.metrics.totalBytes.DeleteLabelValues(opts.path)
-		return nil, fmt.Errorf("failed to tail file, it was a directory %s", opts.path)
-	}
-
 	var reader reader
 	if opts.decompressionConfig.Enabled {
 		decompressor, err := newDecompressor(
@@ -403,14 +406,6 @@ func (c *Component) IsStopping() bool {
 	return c.stopping.Load()
 }
 
-func (c *Component) reportSize(path string) {
-	fi, err := os.Stat(path)
-	if err != nil {
-		return
-	}
-	c.metrics.totalBytes.WithLabelValues(path).Set(float64(fi.Size()))
-}
-
 func receiversChanged(prev, next []loki.LogsReceiver) bool {
 	if len(prev) != len(next) {
 		return true
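Both files in this patch lean on the same concurrency idiom: a one-slot signal channel written to with a non-blocking send, so any number of target updates made while a refresh is pending coalesce into a single signal, paired with a ticker that is reset whenever a signalled refresh runs. The following is a minimal, standalone sketch of that idiom, not code from the patch; the names (`refresher`, `Notify`, `syncPeriod`) are illustrative only.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// refresher demonstrates the coalesced-update idiom: Notify performs a
// non-blocking send into a one-slot channel, so repeated calls made while a
// refresh is already pending collapse into a single signal.
type refresher struct {
	changed    chan struct{}
	syncPeriod time.Duration
}

func newRefresher(syncPeriod time.Duration) *refresher {
	return &refresher{
		// Capacity 1 so Notify never blocks the caller.
		changed:    make(chan struct{}, 1),
		syncPeriod: syncPeriod,
	}
}

// Notify requests an immediate refresh without blocking.
func (r *refresher) Notify() {
	select {
	case r.changed <- struct{}{}:
	default: // a refresh is already pending; coalesce
	}
}

// Run refreshes immediately when signalled and otherwise on a timer,
// resetting the timer after a signalled refresh so work is not duplicated.
func (r *refresher) Run(ctx context.Context, refresh func(triggered bool)) {
	ticker := time.NewTicker(r.syncPeriod)
	defer ticker.Stop()
	for {
		select {
		case <-r.changed:
			ticker.Reset(r.syncPeriod)
			refresh(true)
		case <-ticker.C:
			refresh(false)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	r := newRefresher(200 * time.Millisecond)
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	go r.Run(ctx, func(triggered bool) {
		fmt.Println("refresh, triggered by update:", triggered)
	})

	r.Notify() // immediate refresh
	r.Notify() // coalesced with the pending signal above
	<-ctx.Done()
}
```

Because the channel has capacity 1, a burst of calls to Notify while a refresh is still pending produces exactly one additional refresh, which is the behaviour Update relies on here to signal Run without ever blocking.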