Skip to content

Commit

Permalink
feat(wait): log sub match callback
Browse files Browse the repository at this point in the history
Add support for a sub match callback to wait.LogStrategy which allows containers
to process the matched pattern storing details or otherwise validating them.

The callback can return a PermanentError if no more retries should be attempted.
  • Loading branch information
stevenh committed Dec 20, 2024
1 parent abe0f82 commit 50bfda7
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 74 deletions.
40 changes: 39 additions & 1 deletion docs/features/wait/log.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
The Log wait strategy will check if a string occurs in the container logs for a desired number of times, and allows to set the following conditions:

- the string to be waited for in the container log.
- the number of occurrences of the string to wait for, default is `1`.
- the number of occurrences of the string to wait for, default is `1` (ignored for Submatch).
- look for the string using a regular expression, default is `false`.
- the startup timeout to be used in seconds, default is 60 seconds.
- the poll interval to be used in milliseconds, default is 100 milliseconds.
- the regular expression submatch callback, default nil (occurrences is ignored).

```golang
req := ContainerRequest{
Expand All @@ -33,3 +34,40 @@ req := ContainerRequest{
WaitingFor: wait.ForLog(`.*MySQL Community Server`).AsRegexp(),
}
```

Using regular expression with submatch:

```golang
var host, port string
req := ContainerRequest{
Image: "ollama/ollama:0.1.25",
ExposedPorts: []string{"11434/tcp"},
WaitingFor: wait.ForLog(`Listening on (.*:\d+) \(version\s(.*)\)`).Submatch(func(pattern string, submatches [][][]byte) error {
var err error
for _, matches := range submatches {
if len(matches) != 3 {
err = fmt.Errorf("`%s` matched %d times, expected %d", pattern, len(matches), 3)
continue
}
host, port, err = net.SplitHostPort(string(matches[1]))
if err != nil {
return wait.NewPermanentError(fmt.Errorf("split host port: %w", err))
}

// Host and port successfully extracted from log.
return nil
}

if err != nil {
// Return the last error encountered.
return err
}

return fmt.Errorf("address and version not found: `%s` no matches", pattern)
}),
}
```

If the return from a Submatch callback function is a `wait.PermanentError` the
wait will stop and the error will be returned. Use `wait.NewPermanentError(err error)`
to achieve this.
111 changes: 90 additions & 21 deletions wait/log.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package wait

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"regexp"
"strings"
"time"
)

Expand All @@ -14,6 +16,21 @@ var (
_ StrategyTimeout = (*LogStrategy)(nil)
)

// PermanentError is a special error that will stop the wait and return an error.
type PermanentError struct {
err error
}

// Error implements the error interface.
func (e *PermanentError) Error() string {
return e.err.Error()
}

// NewPermanentError creates a new PermanentError.
func NewPermanentError(err error) *PermanentError {
return &PermanentError{err: err}
}

// LogStrategy will wait until a given log entry shows up in the docker logs
type LogStrategy struct {
// all Strategies should have a startupTimeout to avoid waiting infinitely
Expand All @@ -24,6 +41,18 @@ type LogStrategy struct {
IsRegexp bool
Occurrence int
PollInterval time.Duration

// check is the function that will be called to check if the log entry is present.
check func([]byte) error

// submatchCallback is a callback that will be called with the sub matches of the regexp.
submatchCallback func(pattern string, matches [][][]byte) error

// re is the optional compiled regexp.
re *regexp.Regexp

// log byte slice version of [LogStrategy.Log] used for count checks.
log []byte
}

// NewLogStrategy constructs with polling interval of 100 milliseconds and startup timeout of 60 seconds by default
Expand All @@ -46,6 +75,18 @@ func (ws *LogStrategy) AsRegexp() *LogStrategy {
return ws
}

// Submatch configures a function that will be called with the result of
// [regexp.Regexp.FindAllSubmatch], allowing the caller to process the results.
// If the callback returns nil, the strategy will be considered successful.
// Returning a [PermanentError] will stop the wait and return an error, otherwise
// it will retry until the timeout is reached.
// [LogStrategy.Occurrence] is ignored if this option is set.
func (ws *LogStrategy) Submatch(callback func(pattern string, matches [][][]byte) error) *LogStrategy {
ws.submatchCallback = callback

return ws
}

// WithStartupTimeout can be used to change the default startup timeout
func (ws *LogStrategy) WithStartupTimeout(timeout time.Duration) *LogStrategy {
ws.timeout = &timeout
Expand Down Expand Up @@ -89,57 +130,85 @@ func (ws *LogStrategy) WaitUntilReady(ctx context.Context, target StrategyTarget
timeout = *ws.timeout
}

switch {
case ws.submatchCallback != nil:
ws.re = regexp.MustCompile(ws.Log)
ws.check = ws.checkSubmatch
case ws.IsRegexp:
ws.re = regexp.MustCompile(ws.Log)
ws.check = ws.checkRegexp
default:
ws.log = []byte(ws.Log)
ws.check = ws.checkCount
}

ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()

length := 0

LOOP:
var lastLen int
var lastError error
for {
select {
case <-ctx.Done():
return ctx.Err()
return errors.Join(lastError, ctx.Err())
default:
checkErr := checkTarget(ctx, target)

reader, err := target.Logs(ctx)
if err != nil {
// TODO: fix as this will wait for timeout if the logs are not available.
time.Sleep(ws.PollInterval)
continue
}

b, err := io.ReadAll(reader)
if err != nil {
// TODO: fix as this will wait for timeout if the logs are not readable.
time.Sleep(ws.PollInterval)
continue
}

logs := string(b)

switch {
case length == len(logs) && checkErr != nil:
if lastLen == len(b) && checkErr != nil {
// Log length hasn't changed so we're not making progress.
return checkErr
case checkLogsFn(ws, b):
break LOOP
default:
length = len(logs)
}

if err := ws.check(b); err != nil {
var errPermanent *PermanentError
if errors.As(err, &errPermanent) {
return err
}

lastError = err
lastLen = len(b)
time.Sleep(ws.PollInterval)
continue
}

return nil
}
}
}

// checkCount checks if the log entry is present in the logs using a string count.
func (ws *LogStrategy) checkCount(b []byte) error {
if count := bytes.Count(b, ws.log); count < ws.Occurrence {
return fmt.Errorf("%q matched %d times, expected %d", ws.Log, count, ws.Occurrence)
}

return nil
}

func checkLogsFn(ws *LogStrategy, b []byte) bool {
if ws.IsRegexp {
re := regexp.MustCompile(ws.Log)
occurrences := re.FindAll(b, -1)

return len(occurrences) >= ws.Occurrence
// checkRegexp checks if the log entry is present in the logs using a regexp count.
func (ws *LogStrategy) checkRegexp(b []byte) error {
if matches := ws.re.FindAll(b, -1); len(matches) < ws.Occurrence {
return fmt.Errorf("`%s` matched %d times, expected %d", ws.Log, len(matches), ws.Occurrence)
}

logs := string(b)
return strings.Count(logs, ws.Log) >= ws.Occurrence
return nil
}

// checkSubmatch checks if the log entry is present in the logs using a regexp sub match callback.
func (ws *LogStrategy) checkSubmatch(b []byte) error {
return ws.submatchCallback(ws.Log, ws.re.FindAllSubmatch(b, -1))
}
Loading

0 comments on commit 50bfda7

Please sign in to comment.