Skip to content

Commit

Permalink
fix: container logging deadlocks (testcontainers#2791)
Browse files Browse the repository at this point in the history
Refactor container log handling simplifying the logic fixing various issues
with error handling and race conditions between the complex combinations of
multiple channels that have been causing random deadlocks in tests.

The new version has simple for loop with an inter call to ContainerLogs
and stdcopy.StdCopy leveraging an adapter between io.Writer and
LogConsumer.

This could be used to easily expose separate stdout and stderr handlers.
stevenh authored Sep 25, 2024
1 parent 738e8fc commit e02e8eb
Showing 4 changed files with 172 additions and 151 deletions.
261 changes: 137 additions & 124 deletions docker.go
Original file line number Diff line number Diff line change
@@ -5,7 +5,6 @@ import (
"bufio"
"context"
"encoding/base64"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
@@ -17,7 +16,6 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
@@ -30,6 +28,7 @@ import (
"github.com/docker/docker/client"
"github.com/docker/docker/errdefs"
"github.com/docker/docker/pkg/jsonmessage"
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/go-connections/nat"
"github.com/moby/term"
specs "github.com/opencontainers/image-spec/specs-go/v1"
@@ -48,11 +47,21 @@ const (
Podman = "podman"
ReaperDefault = "reaper_default" // Default network name when bridge is not available
packagePath = "github.com/testcontainers/testcontainers-go"

logStoppedForOutOfSyncMessage = "Stopping log consumer: Headers out of sync"
)

var createContainerFailDueToNameConflictRegex = regexp.MustCompile("Conflict. The container name .* is already in use by container .*")
var (
// createContainerFailDueToNameConflictRegex is a regular expression that matches the container is already in use error.
createContainerFailDueToNameConflictRegex = regexp.MustCompile("Conflict. The container name .* is already in use by container .*")

// minLogProductionTimeout is the minimum log production timeout.
minLogProductionTimeout = time.Duration(5 * time.Second)

// maxLogProductionTimeout is the maximum log production timeout.
maxLogProductionTimeout = time.Duration(60 * time.Second)

// errLogProductionStop is the cause for stopping log production.
errLogProductionStop = errors.New("log production stopped")
)

// DockerContainer represents a container started using Docker
type DockerContainer struct {
@@ -65,23 +74,19 @@ type DockerContainer struct {
isRunning bool
imageWasBuilt bool
// keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer
logProductionError chan error
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer

// TODO: Remove locking and wait group once the deprecated StartLogProducer and
// StopLogProducer have been removed and hence logging can only be started and
// stopped once.

// logProductionWaitGroup is used to signal when the log production has stopped.
// This allows stopLogProduction to safely set logProductionStop to nil.
// See simplification in https://go.dev/play/p/x0pOElF2Vjf
logProductionWaitGroup sync.WaitGroup

logProductionStop chan struct{}
// logProductionCancel is used to signal the log production to stop.
logProductionCancel context.CancelCauseFunc
logProductionCtx context.Context

logProductionTimeout *time.Duration
logger Logging
@@ -263,7 +268,6 @@ func (c *DockerContainer) Stop(ctx context.Context, timeout *time.Duration) erro
// without exposing the ability to fully initialize the container state.
// See: https://github.com/testcontainers/testcontainers-go/issues/2667
// TODO: Add a check for isRunning when the above issue is resolved.

err := c.stoppingHook(ctx)
if err != nil {
return fmt.Errorf("stopping hook: %w", err)
@@ -310,7 +314,7 @@ func (c *DockerContainer) Terminate(ctx context.Context) error {
}

select {
// close reaper if it was created
// Close reaper connection if it was attached.
case c.terminationSignal <- true:
default:
}
@@ -690,6 +694,29 @@ func (c *DockerContainer) copyToContainer(ctx context.Context, fileContent func(
return nil
}

// logConsumerWriter is a writer that writes to a LogConsumer.
type logConsumerWriter struct {
log Log
consumers []LogConsumer
}

// newLogConsumerWriter creates a new logConsumerWriter for logType that sends messages to all consumers.
func newLogConsumerWriter(logType string, consumers []LogConsumer) *logConsumerWriter {
return &logConsumerWriter{
log: Log{LogType: logType},
consumers: consumers,
}
}

// Write writes the p content to all consumers.
func (lw logConsumerWriter) Write(p []byte) (int, error) {
lw.log.Content = p
for _, consumer := range lw.consumers {
consumer.Accept(lw.log)
}
return len(p), nil
}

type LogProductionOption func(*DockerContainer)

// WithLogProductionTimeout is a functional option that sets the timeout for the log production.
@@ -707,124 +734,94 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu

// startLogProduction will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer.
//
// Default log production timeout is 5s. It is used to set the context timeout
// which means that each log-reading loop will last at least the specified timeout
// and that it cannot be cancelled earlier.
// which means that each log-reading loop will last at up to the specified timeout.
//
// Use functional option WithLogProductionTimeout() to override default timeout. If it's
// lower than 5s and greater than 60s it will be set to 5s or 60s respectively.
func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogProductionOption) error {
c.logProductionStop = make(chan struct{}, 1) // buffered channel to avoid blocking
c.logProductionWaitGroup.Add(1)

for _, opt := range opts {
opt(c)
}

minLogProductionTimeout := time.Duration(5 * time.Second)
maxLogProductionTimeout := time.Duration(60 * time.Second)

if c.logProductionTimeout == nil {
// Validate the log production timeout.
switch {
case c.logProductionTimeout == nil:
c.logProductionTimeout = &minLogProductionTimeout
}

if *c.logProductionTimeout < minLogProductionTimeout {
case *c.logProductionTimeout < minLogProductionTimeout:
c.logProductionTimeout = &minLogProductionTimeout
}

if *c.logProductionTimeout > maxLogProductionTimeout {
case *c.logProductionTimeout > maxLogProductionTimeout:
c.logProductionTimeout = &maxLogProductionTimeout
}

c.logProductionError = make(chan error, 1)
// Setup the log writers.
stdout := newLogConsumerWriter(StdoutLog, c.consumers)
stderr := newLogConsumerWriter(StderrLog, c.consumers)

// Setup the log production context which will be used to stop the log production.
c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx)

go func() {
defer func() {
close(c.logProductionError)
c.logProductionWaitGroup.Done()
}()

since := ""
// if the socket is closed we will make additional logs request with updated Since timestamp
BEGIN:
options := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
Since: since,
}
err := c.logProducer(stdout, stderr)
// Set context cancel cause, if not already set.
c.logProductionCancel(err)
}()

ctx, cancel := context.WithTimeout(ctx, *c.logProductionTimeout)
return nil
}

// logProducer read logs from the container and writes them to stdout, stderr until either:
// - logProductionCtx is done
// - A fatal error occurs
// - No more logs are available
func (c *DockerContainer) logProducer(stdout, stderr io.Writer) error {
// Clean up idle client connections.
defer c.provider.Close()

// Setup the log options, start from the beginning.
options := container.LogsOptions{
ShowStdout: true,
ShowStderr: true,
Follow: true,
}

for {
timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
defer cancel()

r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
c.logProductionError <- err
return
err := c.copyLogs(timeoutCtx, stdout, stderr, options)
switch {
case err == nil:
// No more logs available.
return nil
case c.logProductionCtx.Err() != nil:
// Log production was stopped or caller context is done.
return nil
case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
// Timeout or client connection closed, retry.
default:
// Unexpected error, retry.
Logger.Printf("Unexpected error reading logs: %v", err)
}
defer c.provider.Close()

for {
select {
case <-c.logProductionStop:
c.logProductionError <- r.Close()
return
default:
}
h := make([]byte, 8)
_, err := io.ReadFull(r, h)
if err != nil {
switch {
case err == io.EOF:
// No more logs coming
case errors.Is(err, net.ErrClosed):
now := time.Now()
since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
goto BEGIN
case errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled):
// Probably safe to continue here
continue
default:
_, _ = fmt.Fprintf(os.Stderr, "container log error: %+v. %s", err, logStoppedForOutOfSyncMessage)
// if we would continue here, the next header-read will result into random data...
}
return
}

count := binary.BigEndian.Uint32(h[4:])
if count == 0 {
continue
}
logType := h[0]
if logType > 2 {
_, _ = fmt.Fprintf(os.Stderr, "received invalid log type: %d", logType)
// sometimes docker returns logType = 3 which is an undocumented log type, so treat it as stdout
logType = 1
}
// Retry from the last log received.
now := time.Now()
options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
}
}

// a map of the log type --> int representation in the header, notice the first is blank, this is stdin, but the go docker client doesn't allow following that in logs
logTypes := []string{"", StdoutLog, StderrLog}
// copyLogs copies logs from the container to stdout and stderr.
func (c *DockerContainer) copyLogs(ctx context.Context, stdout, stderr io.Writer, options container.LogsOptions) error {
rc, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
return fmt.Errorf("container logs: %w", err)
}
defer rc.Close()

b := make([]byte, count)
_, err = io.ReadFull(r, b)
if err != nil {
// TODO: add-logger: use logger to log out this error
_, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error())
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Probably safe to continue here
continue
}
// we can not continue here as the next read most likely will not be the next header
_, _ = fmt.Fprintln(os.Stderr, logStoppedForOutOfSyncMessage)
return
}
for _, c := range c.consumers {
c.Accept(Log{
LogType: logTypes[logType],
Content: b,
})
}
}
}()
if _, err = stdcopy.StdCopy(stdout, stderr, rc); err != nil {
return fmt.Errorf("stdcopy: %w", err)
}

return nil
}
@@ -837,18 +834,25 @@ func (c *DockerContainer) StopLogProducer() error {
// stopLogProduction will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) stopLogProduction() error {
// signal the log production to stop
c.logProductionStop <- struct{}{}
if c.logProductionCancel == nil {
return nil
}

c.logProductionWaitGroup.Wait()
// Signal the log production to stop.
c.logProductionCancel(errLogProductionStop)

if err := <-c.logProductionError; err != nil {
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Returning context errors is not useful for the consumer.
if err := context.Cause(c.logProductionCtx); err != nil {
switch {
case errors.Is(err, errLogProductionStop):
// Log production was stopped.
return nil
case errors.Is(err, context.DeadlineExceeded),
errors.Is(err, context.Canceled):
// Parent context is done.
return nil
default:
return err
}

return err
}

return nil
@@ -857,7 +861,16 @@ func (c *DockerContainer) stopLogProduction() error {
// GetLogProductionErrorChannel exposes the only way for the consumer
// to be able to listen to errors and react to them.
func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
return c.logProductionError
if c.logProductionCtx == nil {
return nil
}

errCh := make(chan error, 1)
go func() {
<-c.logProductionCtx.Done()
errCh <- context.Cause(c.logProductionCtx)
}()
return errCh
}

// DockerNetwork represents a network started using Docker
15 changes: 15 additions & 0 deletions docker_test.go
Original file line number Diff line number Diff line change
@@ -240,6 +240,15 @@ func TestContainerReturnItsContainerID(t *testing.T) {
}
}

// testLogConsumer is a simple implementation of LogConsumer that logs to the test output.
type testLogConsumer struct {
t *testing.T
}

func (l *testLogConsumer) Accept(log Log) {
l.t.Log(log.LogType + ": " + strings.TrimSpace(string(log.Content)))
}

func TestContainerTerminationResetsState(t *testing.T) {
ctx := context.Background()

@@ -250,6 +259,9 @@ func TestContainerTerminationResetsState(t *testing.T) {
ExposedPorts: []string{
nginxDefaultPort,
},
LogConsumerCfg: &LogConsumerConfig{
Consumers: []LogConsumer{&testLogConsumer{t: t}},
},
},
Started: true,
})
@@ -274,6 +286,9 @@ func TestContainerStateAfterTermination(t *testing.T) {
ExposedPorts: []string{
nginxDefaultPort,
},
LogConsumerCfg: &LogConsumerConfig{
Consumers: []LogConsumer{&testLogConsumer{t: t}},
},
},
Started: true,
})
1 change: 0 additions & 1 deletion lifecycle.go
Original file line number Diff line number Diff line change
@@ -190,7 +190,6 @@ var defaultLogConsumersHook = func(cfg *LogConsumerConfig) ContainerLifecycleHoo
}

dockerContainer := c.(*DockerContainer)

return dockerContainer.stopLogProduction()
},
},
46 changes: 20 additions & 26 deletions logconsumer_test.go
Original file line number Diff line number Diff line change
@@ -520,17 +520,24 @@ func Test_StartLogProductionStillStartsWithTooHighTimeout(t *testing.T) {
require.NoError(t, dc.stopLogProduction())
}

// bufLogger is a Logging implementation that writes to a bytes.Buffer.
type bufLogger struct {
bytes.Buffer
}

// Printf implements Logging.
func (l *bufLogger) Printf(format string, v ...any) {
fmt.Fprintf(l, format, v...)
}

func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {
// Redirect stderr to a buffer
r, w, err := os.Pipe()
require.NoError(t, err)
oldStderr := os.Stderr
os.Stderr = w
defer func() {
// Restore stderr
os.Stderr = oldStderr
w.Close()
}()
// Capture global logger.
logger := &bufLogger{}
Logger = logger
oldLogger := Logger
t.Cleanup(func() {
Logger = oldLogger
})

// Context with cancellation functionality for simulating user interruption
ctx, cancel := context.WithCancel(context.Background())
@@ -613,23 +620,10 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {

// We check log size due to context cancellation causing
// varying message counts, leading to test failure.
assert.GreaterOrEqual(t, len(first.Msgs()), 2)
assert.GreaterOrEqual(t, len(second.Msgs()), 2)

// Close the pipe so as not to block on empty.
w.Close()

// Read the stderr output from the buffer
var buf bytes.Buffer
_, _ = buf.ReadFrom(r)

// Check the stderr message
actual := buf.String()
require.GreaterOrEqual(t, len(first.Msgs()), 2)
require.GreaterOrEqual(t, len(second.Msgs()), 2)

// The context cancel shouldn't cause the system to throw a
// logStoppedForOutOfSyncMessage, as it hangs the system with
// the multiple containers.
require.NotContains(t, actual, logStoppedForOutOfSyncMessage)
require.NotContains(t, logger.String(), "Unexpected error reading logs")
}

// FooLogConsumer is a test log consumer that accepts logs from the

0 comments on commit e02e8eb

Please sign in to comment.