Skip to content
65 changes: 44 additions & 21 deletions internal/component/local/file_match/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,23 @@ var _ component.Component = (*Component)(nil)
type Component struct {
opts component.Options

mut sync.RWMutex
args Arguments
watches []watch
watchDog *time.Ticker
mut sync.Mutex
args Arguments
watches []watch
watchDog *time.Ticker
targetsChanged chan struct{}
}

// New creates a new local.file_match component.
func New(o component.Options, args Arguments) (*Component, error) {
c := &Component{
opts: o,
mut: sync.RWMutex{},
mut: sync.Mutex{},
args: args,
watches: make([]watch, 0),
watchDog: time.NewTicker(args.SyncPeriod),
// Buffered channel to avoid blocking
targetsChanged: make(chan struct{}, 1),
}

if err := c.Update(args); err != nil {
Expand All @@ -73,11 +76,16 @@ func (c *Component) Update(args component.Arguments) error {
c.mut.Lock()
defer c.mut.Unlock()

newArgs := args.(Arguments)

// Check to see if our ticker timer needs to be reset.
if args.(Arguments).SyncPeriod != c.args.SyncPeriod {
c.watchDog.Reset(c.args.SyncPeriod)
if newArgs.SyncPeriod != c.args.SyncPeriod {
c.watchDog.Reset(newArgs.SyncPeriod)
}
c.args = args.(Arguments)

c.args = newArgs

// Rebuild watches
c.watches = c.watches[:0]
for _, v := range c.args.PathTargets {
c.watches = append(c.watches, watch{
Expand All @@ -87,35 +95,47 @@ func (c *Component) Update(args component.Arguments) error {
})
}

// Always trigger immediate check when Update is called
select {
case c.targetsChanged <- struct{}{}:
default:
}

return nil
}

// Run satisfies the component interface.
func (c *Component) Run(ctx context.Context) error {
update := func() {
c.mut.Lock()
defer c.mut.Unlock()

paths := c.getWatchedFiles()
// The component node checks to see if exports have actually changed.
c.opts.OnStateChange(discovery.Exports{Targets: paths})
}
// Trigger initial check
update()
defer c.watchDog.Stop()
for {
select {
case <-c.targetsChanged:
// When we get a signal that we have new targets we will get all watched files and
// reset the timer.
c.mut.Lock()
c.watchDog.Reset(c.args.SyncPeriod)
targets := c.getWatchedFiles(true)
c.mut.Unlock()
c.opts.OnStateChange(discovery.Exports{Targets: targets})
case <-c.watchDog.C:
// This triggers a check for any new paths, along with pushing new targets.
update()
// If we have not received a signal that we have new targets watch job will periodically
// get all files that we should watch.
c.mut.Lock()
targets := c.getWatchedFiles(false)
c.mut.Unlock()
c.opts.OnStateChange(discovery.Exports{Targets: targets})
case <-ctx.Done():
return nil
}
}
}

func (c *Component) getWatchedFiles() []discovery.Target {
func (c *Component) getWatchedFiles(targetsUpdated bool) []discovery.Target {
start := time.Now()
paths := make([]discovery.Target, 0)

level.Info(c.opts.Logger).Log("msg", "expanding paths", "targets_updated", targetsUpdated)

// See if there is anything new we need to check.
for _, w := range c.watches {
newPaths, err := w.getPaths()
Expand All @@ -124,5 +144,8 @@ func (c *Component) getWatchedFiles() []discovery.Target {
}
paths = append(paths, newPaths...)
}

level.Info(c.opts.Logger).Log("msg", "finish expanding paths", "paths", len(paths), "duration", time.Since(start))

return paths
}
22 changes: 11 additions & 11 deletions internal/component/local/file_match/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestFile(t *testing.T) {
go c.Run(ct)
time.Sleep(20 * time.Millisecond)
ct.Done()
foundFiles := c.getWatchedFiles()
foundFiles := c.getWatchedFiles(false)
require.Len(t, foundFiles, 1)
require.True(t, contains(foundFiles, "t1.txt"))
}
Expand All @@ -58,7 +58,7 @@ func TestDirectoryFile(t *testing.T) {
go c.Run(ct)
time.Sleep(20 * time.Millisecond)
ct.Done()
foundFiles := c.getWatchedFiles()
foundFiles := c.getWatchedFiles(false)
require.Len(t, foundFiles, 1)
require.True(t, contains(foundFiles, "t1.txt"))
}
Expand All @@ -80,14 +80,14 @@ func TestFileIgnoreOlder(t *testing.T) {
c.Update(c.args)
go c.Run(ct)

foundFiles := c.getWatchedFiles()
foundFiles := c.getWatchedFiles(false)
require.Len(t, foundFiles, 1)
require.True(t, contains(foundFiles, "t1.txt"))
time.Sleep(150 * time.Millisecond)

writeFile(t, dir, "t2.txt")
ct.Done()
foundFiles = c.getWatchedFiles()
foundFiles = c.getWatchedFiles(false)
require.Len(t, foundFiles, 1)
require.True(t, contains(foundFiles, "t2.txt"))
}
Expand All @@ -110,7 +110,7 @@ func TestAddingFile(t *testing.T) {
time.Sleep(20 * time.Millisecond)
writeFile(t, dir, "t2.txt")
ct.Done()
foundFiles := c.getWatchedFiles()
foundFiles := c.getWatchedFiles(false)
require.Len(t, foundFiles, 2)
require.True(t, contains(foundFiles, "t1.txt"))
require.True(t, contains(foundFiles, "t2.txt"))
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestAddingFileInSubDir(t *testing.T) {
require.NoError(t, err)
time.Sleep(20 * time.Millisecond)
ct.Done()
foundFiles := c.getWatchedFiles()
foundFiles := c.getWatchedFiles(false)
require.Len(t, foundFiles, 3)
require.True(t, contains(foundFiles, "t1.txt"))
require.True(t, contains(foundFiles, "t2.txt"))
Expand Down Expand Up @@ -175,7 +175,7 @@ func TestAddingFileInAnExcludedSubDir(t *testing.T) {
require.NoError(t, err)
time.Sleep(20 * time.Millisecond)
ct.Done()
foundFiles := c.getWatchedFiles()
foundFiles := c.getWatchedFiles(false)
require.Len(t, foundFiles, 3)
require.True(t, contains(foundFiles, "t1.txt"))
require.True(t, contains(foundFiles, "t2.txt"))
Expand Down Expand Up @@ -204,7 +204,7 @@ func TestAddingRemovingFileInSubDir(t *testing.T) {
err := os.WriteFile(path.Join(subdir, "t3.txt"), []byte("asdf"), 0664)
require.NoError(t, err)
time.Sleep(100 * time.Millisecond)
foundFiles := c.getWatchedFiles()
foundFiles := c.getWatchedFiles(false)
require.Len(t, foundFiles, 3)
require.True(t, contains(foundFiles, "t1.txt"))
require.True(t, contains(foundFiles, "t2.txt"))
Expand All @@ -213,7 +213,7 @@ func TestAddingRemovingFileInSubDir(t *testing.T) {
err = os.RemoveAll(subdir)
require.NoError(t, err)
time.Sleep(1000 * time.Millisecond)
foundFiles = c.getWatchedFiles()
foundFiles = c.getWatchedFiles(false)
require.Len(t, foundFiles, 2)
require.True(t, contains(foundFiles, "t1.txt"))
require.True(t, contains(foundFiles, "t2.txt"))
Expand All @@ -237,7 +237,7 @@ func TestExclude(t *testing.T) {
os.Mkdir(subdir, 0755)
writeFile(t, subdir, "t3.txt")
time.Sleep(100 * time.Millisecond)
foundFiles := c.getWatchedFiles()
foundFiles := c.getWatchedFiles(false)
require.Len(t, foundFiles, 2)
require.True(t, contains(foundFiles, "t1.txt"))
require.True(t, contains(foundFiles, "t3.txt"))
Expand All @@ -263,7 +263,7 @@ func TestMultiLabels(t *testing.T) {
c.args.SyncPeriod = 10 * time.Millisecond
go c.Run(ct)
time.Sleep(100 * time.Millisecond)
foundFiles := c.getWatchedFiles()
foundFiles := c.getWatchedFiles(false)
require.Len(t, foundFiles, 2)
require.True(t, contains([]discovery.Target{foundFiles[0]}, "t1.txt"))
require.True(t, contains([]discovery.Target{foundFiles[1]}, "t1.txt"))
Expand Down
Loading