Skip to content

Commit

Permalink
Tailer: add option to start tailing at specified seek offset
Browse files Browse the repository at this point in the history
  • Loading branch information
virtuald committed Oct 10, 2018
1 parent 691b510 commit d6093c0
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 7 deletions.
35 changes: 28 additions & 7 deletions tailer/fileTailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,39 @@ func (f *fileTailer) Errors() chan Error {
return f.errors
}

func getSeekArgs(readall bool) (bool, int64, int) {
if readall {
return false, 0, 0
} else {
return true, 0, io.SeekEnd
}
}

func RunFseventFileTailer(path string, readall bool, failOnMissingFile bool, logger simpleLogger) Tailer {
return runFileTailer(path, readall, failOnMissingFile, logger, NewFseventWatcher)
seek, offset, whence := getSeekArgs(readall)
return runFileTailer(path, failOnMissingFile, logger, seek, offset, whence, NewFseventWatcher)
}

func RunFseventFileTailerWithSeek(path string, failOnMissingFile bool, logger simpleLogger, offset int64, whence int) Tailer {
return runFileTailer(path, failOnMissingFile, logger, true, offset, whence, NewFseventWatcher)
}

func RunPollingFileTailer(path string, readall bool, failOnMissingFile bool, pollIntervall time.Duration, logger simpleLogger) Tailer {
seek, offset, whence := getSeekArgs(readall)
makeWatcher := func(abspath string, _ *File) (Watcher, error) {
return NewPollingWatcher(abspath, pollIntervall)
}
return runFileTailer(path, failOnMissingFile, logger, seek, offset, whence, makeWatcher)
}

func RunPollingFileTailerWithSeek(path string, failOnMissingFile bool, pollIntervall time.Duration, logger simpleLogger, offset int64, whence int) Tailer {
makeWatcher := func(abspath string, _ *File) (Watcher, error) {
return NewPollingWatcher(abspath, pollIntervall)
}
return runFileTailer(path, readall, failOnMissingFile, logger, makeWatcher)
return runFileTailer(path, failOnMissingFile, logger, true, offset, whence, makeWatcher)
}

func runFileTailer(path string, readall bool, failOnMissingFile bool, logger simpleLogger, makeWatcher func(string, *File) (Watcher, error)) Tailer {
func runFileTailer(path string, failOnMissingFile bool, logger simpleLogger, seek bool, seekOffset int64, seekWhence int, makeWatcher func(string, *File) (Watcher, error)) Tailer {
if logger == nil {
logger = &nilLogger{}
}
Expand All @@ -71,7 +92,7 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
closed: false,
}

file, abspath, err := openLogfile(path, readall, failOnMissingFile) // file may be nil if failOnMissingFile is false and the file doesn't exist yet.
file, abspath, err := openLogfile(path, failOnMissingFile, seek, seekOffset, seekWhence) // file may be nil if failOnMissingFile is false and the file doesn't exist yet.
if err != nil {
go func(err error) {
writeError(errors, done, err, "failed to initialize file system watcher for %v", path)
Expand Down Expand Up @@ -173,7 +194,7 @@ func runFileTailer(path string, readall bool, failOnMissingFile bool, logger sim
}

// may return *File == nil if the file does not exist and failOnMissingFile == false
func openLogfile(path string, readall bool, failOnMissingFile bool) (*File, string, error) {
func openLogfile(path string, failOnMissingFile bool, seek bool, seekOffset int64, seekWhence int) (*File, string, error) {
abspath, err := filepath.Abs(path)
if err != nil {
return nil, "", err
Expand All @@ -182,8 +203,8 @@ func openLogfile(path string, readall bool, failOnMissingFile bool) (*File, stri
if err != nil && (failOnMissingFile || !os.IsNotExist(err)) {
return nil, "", err
}
if !readall && file != nil {
_, err = file.Seek(0, io.SeekEnd)
if seek && file != nil {
_, err = file.Seek(seekOffset, seekWhence)
if err != nil {
if file != nil {
file.Close()
Expand Down
29 changes: 29 additions & 0 deletions tailer/fileTailer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tailer

import (
"fmt"
"io"
"io/ioutil"
"os"
"os/user"
Expand Down Expand Up @@ -211,6 +212,34 @@ func TestFileMissingOnStartup(t *testing.T) {
expect(t, log, tail.Lines(), "test line 2", 1*time.Second)
}

func TestFileSeek(t *testing.T) {
const logfileName = "grok_exporter_seek_logfile.log"
log := NewTestRunLogger(400)
tmpDir := mkTmpDirOrFail(t)
defer cleanUp(t, tmpDir)
var logfile = fmt.Sprintf("%s%c%s", tmpDir, os.PathSeparator, logfileName)

logFileWriter := newLogFileWriter(t, logfile, closeFileAfterEachLine)
logFileWriter.writeLine(t, log, "test line 1")
logFileWriter.writeLine(t, log, "test line 2")

tail := RunFseventFileTailerWithSeek(logfile, true, log, 12, io.SeekStart)
defer tail.Close()

// We don't expect errors. However, start a go-routine listening on
// the tailer's errorChannel in case something goes wrong.
go func() {
for err := range tail.Errors() {
t.Errorf("Tailer failed: %v", err.Error()) // Cannot call t.Fatalf() in other goroutine.
}
}()

expect(t, log, tail.Lines(), "test line 2", 1*time.Second)

logFileWriter.writeLine(t, log, "test line 3")
expect(t, log, tail.Lines(), "test line 3", 1*time.Second)
}

func TestShutdownDuringSyscall(t *testing.T) {
runTestShutdown(t, "shutdown while the watcher is hanging in the blocking kevent() or syscall.Read() call")
}
Expand Down

0 comments on commit d6093c0

Please sign in to comment.