From e02e8ebe7d320e573864141fe6d0a74dcdecf2fd Mon Sep 17 00:00:00 2001
From: Steven Hartland
Date: Wed, 25 Sep 2024 07:46:09 -0400
Subject: [PATCH] fix: container logging deadlocks (#2791)

Refactor container log handling, simplifying the logic and fixing
various issues with error handling and race conditions between the
complex combinations of multiple channels that have been causing random
deadlocks in tests.

The new version uses a simple for loop with an inner call to
ContainerLogs and stdcopy.StdCopy, leveraging an adapter between
io.Writer and LogConsumer. This could be used to easily expose separate
stdout and stderr handlers.
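For illustration only, not part of the diff below: a minimal,
self-contained sketch of that adapter idea. Log, LogConsumer and
printConsumer are simplified stand-ins for the package types, and the
"STDOUT"/"STDERR" labels stand in for the package's StdoutLog and
StderrLog constants:

    package main

    import (
        "bytes"
        "fmt"

        "github.com/docker/docker/pkg/stdcopy"
    )

    // Log and LogConsumer are simplified stand-ins for the package types.
    type Log struct {
        LogType string
        Content []byte
    }

    type LogConsumer interface {
        Accept(Log)
    }

    // printConsumer prints every log entry it receives.
    type printConsumer struct{}

    func (printConsumer) Accept(l Log) {
        fmt.Printf("%s: %s", l.LogType, l.Content)
    }

    // logConsumerWriter adapts a set of LogConsumers to io.Writer so it
    // can be used as the stdout or stderr destination of stdcopy.StdCopy.
    type logConsumerWriter struct {
        log       Log
        consumers []LogConsumer
    }

    func (lw *logConsumerWriter) Write(p []byte) (int, error) {
        lw.log.Content = p
        for _, c := range lw.consumers {
            c.Accept(lw.log)
        }
        return len(p), nil
    }

    func main() {
        consumers := []LogConsumer{printConsumer{}}
        stdout := &logConsumerWriter{log: Log{LogType: "STDOUT"}, consumers: consumers}
        stderr := &logConsumerWriter{log: Log{LogType: "STDERR"}, consumers: consumers}

        // Build a multiplexed stream framed the way the daemon frames
        // container output when no TTY is attached.
        var stream bytes.Buffer
        _, _ = stdcopy.NewStdWriter(&stream, stdcopy.Stdout).Write([]byte("hello from stdout\n"))
        _, _ = stdcopy.NewStdWriter(&stream, stdcopy.Stderr).Write([]byte("oops on stderr\n"))

        // Demultiplex into the per-stream adapters, as the new logProducer does.
        if _, err := stdcopy.StdCopy(stdout, stderr, &stream); err != nil {
            fmt.Println("stdcopy:", err)
        }
    }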
---
 docker.go           | 261 +++++++++++++++++++++++---------------------
 docker_test.go      |  15 +++
 lifecycle.go        |   1 -
 logconsumer_test.go |  46 ++++----
 4 files changed, 172 insertions(+), 151 deletions(-)

diff --git a/docker.go b/docker.go
index dcd962ffc8..9319c630dd 100644
--- a/docker.go
+++ b/docker.go
@@ -5,7 +5,6 @@ import (
 	"bufio"
 	"context"
 	"encoding/base64"
-	"encoding/binary"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -17,7 +16,6 @@ import (
 	"path/filepath"
 	"regexp"
 	"strings"
-	"sync"
 	"time"

 	"github.com/cenkalti/backoff/v4"
@@ -30,6 +28,7 @@ import (
 	"github.com/docker/docker/client"
 	"github.com/docker/docker/errdefs"
 	"github.com/docker/docker/pkg/jsonmessage"
+	"github.com/docker/docker/pkg/stdcopy"
 	"github.com/docker/go-connections/nat"
 	"github.com/moby/term"
 	specs "github.com/opencontainers/image-spec/specs-go/v1"
@@ -48,11 +47,21 @@ const (
 	Podman        = "podman"
 	ReaperDefault = "reaper_default" // Default network name when bridge is not available
 	packagePath   = "github.com/testcontainers/testcontainers-go"
-
-	logStoppedForOutOfSyncMessage = "Stopping log consumer: Headers out of sync"
 )

-var createContainerFailDueToNameConflictRegex = regexp.MustCompile("Conflict. The container name .* is already in use by container .*")
+var (
+	// createContainerFailDueToNameConflictRegex is a regular expression that matches the container name already in use error.
+	createContainerFailDueToNameConflictRegex = regexp.MustCompile("Conflict. The container name .* is already in use by container .*")
+
+	// minLogProductionTimeout is the minimum log production timeout.
+	minLogProductionTimeout = time.Duration(5 * time.Second)
+
+	// maxLogProductionTimeout is the maximum log production timeout.
+	maxLogProductionTimeout = time.Duration(60 * time.Second)
+
+	// errLogProductionStop is the cause for stopping log production.
+	errLogProductionStop = errors.New("log production stopped")
+)

 // DockerContainer represents a container started using Docker
 type DockerContainer struct {
@@ -65,23 +74,19 @@ type DockerContainer struct {
 	isRunning     bool
 	imageWasBuilt bool
 	// keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
-	keepBuiltImage     bool
-	provider           *DockerProvider
-	sessionID          string
-	terminationSignal  chan bool
-	consumers          []LogConsumer
-	logProductionError chan error
+	keepBuiltImage    bool
+	provider          *DockerProvider
+	sessionID         string
+	terminationSignal chan bool
+	consumers         []LogConsumer

 	// TODO: Remove locking and wait group once the deprecated StartLogProducer and
 	// StopLogProducer have been removed and hence logging can only be started and
 	// stopped once.
-	// logProductionWaitGroup is used to signal when the log production has stopped.
-	// This allows stopLogProduction to safely set logProductionStop to nil.
-	// See simplification in https://go.dev/play/p/x0pOElF2Vjf
-	logProductionWaitGroup sync.WaitGroup
-
-	logProductionStop chan struct{}
+	// logProductionCancel is used to signal the log production to stop.
+	logProductionCancel context.CancelCauseFunc
+	logProductionCtx    context.Context

 	logProductionTimeout *time.Duration
 	logger               Logging
@@ -263,7 +268,6 @@ func (c *DockerContainer) Stop(ctx context.Context, timeout *time.Duration) erro
 	// without exposing the ability to fully initialize the container state.
 	// See: https://github.com/testcontainers/testcontainers-go/issues/2667
 	// TODO: Add a check for isRunning when the above issue is resolved.
-
 	err := c.stoppingHook(ctx)
 	if err != nil {
 		return fmt.Errorf("stopping hook: %w", err)
@@ -310,7 +314,7 @@ func (c *DockerContainer) Terminate(ctx context.Context) error {
 	}

 	select {
-	// close reaper if it was created
+	// Close reaper connection if it was attached.
 	case c.terminationSignal <- true:
 	default:
 	}
@@ -690,6 +694,29 @@ func (c *DockerContainer) copyToContainer(ctx context.Context, fileContent func(
 	return nil
 }

+// logConsumerWriter is a writer that writes to a LogConsumer.
+type logConsumerWriter struct {
+	log       Log
+	consumers []LogConsumer
+}
+
+// newLogConsumerWriter creates a new logConsumerWriter for logType that sends messages to all consumers.
+func newLogConsumerWriter(logType string, consumers []LogConsumer) *logConsumerWriter {
+	return &logConsumerWriter{
+		log:       Log{LogType: logType},
+		consumers: consumers,
+	}
+}
+
+// Write writes the content of p to all consumers.
+func (lw logConsumerWriter) Write(p []byte) (int, error) {
+	lw.log.Content = p
+	for _, consumer := range lw.consumers {
+		consumer.Accept(lw.log)
+	}
+	return len(p), nil
+}
+
 type LogProductionOption func(*DockerContainer)

 // WithLogProductionTimeout is a functional option that sets the timeout for the log production.
@@ -707,124 +734,94 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu

 // startLogProduction will start a concurrent process that will continuously read logs
 // from the container and will send them to each added LogConsumer.
+//
 // Default log production timeout is 5s. It is used to set the context timeout
-// which means that each log-reading loop will last at least the specified timeout
-// and that it cannot be cancelled earlier.
+// which means that each log-reading loop will last up to the specified timeout.
+//
 // Use functional option WithLogProductionTimeout() to override default timeout. If it's
 // lower than 5s or greater than 60s it will be set to 5s or 60s respectively.
 func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogProductionOption) error {
-	c.logProductionStop = make(chan struct{}, 1) // buffered channel to avoid blocking
-	c.logProductionWaitGroup.Add(1)
-
 	for _, opt := range opts {
 		opt(c)
 	}

-	minLogProductionTimeout := time.Duration(5 * time.Second)
-	maxLogProductionTimeout := time.Duration(60 * time.Second)
-
-	if c.logProductionTimeout == nil {
+	// Validate the log production timeout.
+	switch {
+	case c.logProductionTimeout == nil:
 		c.logProductionTimeout = &minLogProductionTimeout
-	}
-
-	if *c.logProductionTimeout < minLogProductionTimeout {
+	case *c.logProductionTimeout < minLogProductionTimeout:
 		c.logProductionTimeout = &minLogProductionTimeout
-	}
-
-	if *c.logProductionTimeout > maxLogProductionTimeout {
+	case *c.logProductionTimeout > maxLogProductionTimeout:
 		c.logProductionTimeout = &maxLogProductionTimeout
 	}

-	c.logProductionError = make(chan error, 1)
+	// Set up the log writers.
+	stdout := newLogConsumerWriter(StdoutLog, c.consumers)
+	stderr := newLogConsumerWriter(StderrLog, c.consumers)
+
+	// Set up the log production context, which is used to stop the log production.
+	c.logProductionCtx, c.logProductionCancel = context.WithCancelCause(ctx)

 	go func() {
-		defer func() {
-			close(c.logProductionError)
-			c.logProductionWaitGroup.Done()
-		}()
-
-		since := ""
-		// if the socket is closed we will make additional logs request with updated Since timestamp
-	BEGIN:
-		options := container.LogsOptions{
-			ShowStdout: true,
-			ShowStderr: true,
-			Follow:     true,
-			Since:      since,
-		}
+		err := c.logProducer(stdout, stderr)
+		// Set the context cancel cause, if not already set.
+		c.logProductionCancel(err)
+	}()

-		ctx, cancel := context.WithTimeout(ctx, *c.logProductionTimeout)
+	return nil
+}
+
+// logProducer reads logs from the container and writes them to stdout and stderr until either:
+//   - logProductionCtx is done
+//   - A fatal error occurs
+//   - No more logs are available
+func (c *DockerContainer) logProducer(stdout, stderr io.Writer) error {
+	// Clean up idle client connections.
+	defer c.provider.Close()
+
+	// Set up the log options, starting from the beginning.
+	options := container.LogsOptions{
+		ShowStdout: true,
+		ShowStderr: true,
+		Follow:     true,
+	}
+
+	for {
+		timeoutCtx, cancel := context.WithTimeout(c.logProductionCtx, *c.logProductionTimeout)
 		defer cancel()

-		r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
-		if err != nil {
-			c.logProductionError <- err
-			return
+		err := c.copyLogs(timeoutCtx, stdout, stderr, options)
+		switch {
+		case err == nil:
+			// No more logs available.
+			return nil
+		case c.logProductionCtx.Err() != nil:
+			// Log production was stopped or the caller context is done.
+			return nil
+		case timeoutCtx.Err() != nil, errors.Is(err, net.ErrClosed):
+			// Timeout or client connection closed, retry.
+		default:
+			// Unexpected error, retry.
+			Logger.Printf("Unexpected error reading logs: %v", err)
 		}
-		defer c.provider.Close()

-		for {
-			select {
-			case <-c.logProductionStop:
-				c.logProductionError <- r.Close()
-				return
-			default:
-			}
-			h := make([]byte, 8)
-			_, err := io.ReadFull(r, h)
-			if err != nil {
-				switch {
-				case err == io.EOF:
-					// No more logs coming
-				case errors.Is(err, net.ErrClosed):
-					now := time.Now()
-					since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond()))
-					goto BEGIN
-				case errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled):
-					// Probably safe to continue here
-					continue
-				default:
-					_, _ = fmt.Fprintf(os.Stderr, "container log error: %+v. %s", err, logStoppedForOutOfSyncMessage)
-					// if we would continue here, the next header-read will result into random data...
- } - return - } - - count := binary.BigEndian.Uint32(h[4:]) - if count == 0 { - continue - } - logType := h[0] - if logType > 2 { - _, _ = fmt.Fprintf(os.Stderr, "received invalid log type: %d", logType) - // sometimes docker returns logType = 3 which is an undocumented log type, so treat it as stdout - logType = 1 - } + // Retry from the last log received. + now := time.Now() + options.Since = fmt.Sprintf("%d.%09d", now.Unix(), int64(now.Nanosecond())) + } +} - // a map of the log type --> int representation in the header, notice the first is blank, this is stdin, but the go docker client doesn't allow following that in logs - logTypes := []string{"", StdoutLog, StderrLog} +// copyLogs copies logs from the container to stdout and stderr. +func (c *DockerContainer) copyLogs(ctx context.Context, stdout, stderr io.Writer, options container.LogsOptions) error { + rc, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options) + if err != nil { + return fmt.Errorf("container logs: %w", err) + } + defer rc.Close() - b := make([]byte, count) - _, err = io.ReadFull(r, b) - if err != nil { - // TODO: add-logger: use logger to log out this error - _, _ = fmt.Fprintf(os.Stderr, "error occurred reading log with known length %s", err.Error()) - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - // Probably safe to continue here - continue - } - // we can not continue here as the next read most likely will not be the next header - _, _ = fmt.Fprintln(os.Stderr, logStoppedForOutOfSyncMessage) - return - } - for _, c := range c.consumers { - c.Accept(Log{ - LogType: logTypes[logType], - Content: b, - }) - } - } - }() + if _, err = stdcopy.StdCopy(stdout, stderr, rc); err != nil { + return fmt.Errorf("stdcopy: %w", err) + } return nil } @@ -837,18 +834,25 @@ func (c *DockerContainer) StopLogProducer() error { // stopLogProduction will stop the concurrent process that is reading logs // and sending them to each added LogConsumer func (c *DockerContainer) stopLogProduction() error { - // signal the log production to stop - c.logProductionStop <- struct{}{} + if c.logProductionCancel == nil { + return nil + } - c.logProductionWaitGroup.Wait() + // Signal the log production to stop. + c.logProductionCancel(errLogProductionStop) - if err := <-c.logProductionError; err != nil { - if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) { - // Returning context errors is not useful for the consumer. + if err := context.Cause(c.logProductionCtx); err != nil { + switch { + case errors.Is(err, errLogProductionStop): + // Log production was stopped. return nil + case errors.Is(err, context.DeadlineExceeded), + errors.Is(err, context.Canceled): + // Parent context is done. + return nil + default: + return err } - - return err } return nil @@ -857,7 +861,16 @@ func (c *DockerContainer) stopLogProduction() error { // GetLogProductionErrorChannel exposes the only way for the consumer // to be able to listen to errors and react to them. 
 func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
-	return c.logProductionError
+	if c.logProductionCtx == nil {
+		return nil
+	}
+
+	errCh := make(chan error, 1)
+	go func() {
+		<-c.logProductionCtx.Done()
+		errCh <- context.Cause(c.logProductionCtx)
+	}()
+	return errCh
 }

 // DockerNetwork represents a network started using Docker
diff --git a/docker_test.go b/docker_test.go
index b0a78b346d..bbbe519c28 100644
--- a/docker_test.go
+++ b/docker_test.go
@@ -240,6 +240,15 @@ func TestContainerReturnItsContainerID(t *testing.T) {
 	}
 }

+// testLogConsumer is a simple implementation of LogConsumer that logs to the test output.
+type testLogConsumer struct {
+	t *testing.T
+}
+
+func (l *testLogConsumer) Accept(log Log) {
+	l.t.Log(log.LogType + ": " + strings.TrimSpace(string(log.Content)))
+}
+
 func TestContainerTerminationResetsState(t *testing.T) {
 	ctx := context.Background()

@@ -250,6 +259,9 @@ func TestContainerTerminationResetsState(t *testing.T) {
 			ExposedPorts: []string{
 				nginxDefaultPort,
 			},
+			LogConsumerCfg: &LogConsumerConfig{
+				Consumers: []LogConsumer{&testLogConsumer{t: t}},
+			},
 		},
 		Started: true,
 	})
@@ -274,6 +286,9 @@ func TestContainerStateAfterTermination(t *testing.T) {
 			ExposedPorts: []string{
 				nginxDefaultPort,
 			},
+			LogConsumerCfg: &LogConsumerConfig{
+				Consumers: []LogConsumer{&testLogConsumer{t: t}},
+			},
 		},
 		Started: true,
 	})
diff --git a/lifecycle.go b/lifecycle.go
index c38e60240d..ff1472d043 100644
--- a/lifecycle.go
+++ b/lifecycle.go
@@ -190,7 +190,6 @@ var defaultLogConsumersHook = func(cfg *LogConsumerConfig) ContainerLifecycleHoo
 		}

 		dockerContainer := c.(*DockerContainer)
-
 		return dockerContainer.stopLogProduction()
 	},
 },
diff --git a/logconsumer_test.go b/logconsumer_test.go
index 855a849914..9f4b0b61f9 100644
--- a/logconsumer_test.go
+++ b/logconsumer_test.go
@@ -520,17 +520,24 @@ func Test_StartLogProductionStillStartsWithTooHighTimeout(t *testing.T) {
 	require.NoError(t, dc.stopLogProduction())
 }

+// bufLogger is a Logging implementation that writes to a bytes.Buffer.
+type bufLogger struct {
+	bytes.Buffer
+}
+
+// Printf implements Logging.
+func (l *bufLogger) Printf(format string, v ...any) {
+	fmt.Fprintf(l, format, v...)
+}
+
 func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {
-	// Redirect stderr to a buffer
-	r, w, err := os.Pipe()
-	require.NoError(t, err)
-	oldStderr := os.Stderr
-	os.Stderr = w
-	defer func() {
-		// Restore stderr
-		os.Stderr = oldStderr
-		w.Close()
-	}()
+	// Capture the global logger, saving the original so it can be restored.
+	logger := &bufLogger{}
+	oldLogger := Logger
+	Logger = logger
+	t.Cleanup(func() {
+		Logger = oldLogger
+	})

 	// Context with cancellation functionality for simulating user interruption
 	ctx, cancel := context.WithCancel(context.Background())
@@ -613,23 +620,10 @@ func Test_MultiContainerLogConsumer_CancelledContext(t *testing.T) {

 	// We check log size due to context cancellation causing
 	// varying message counts, leading to test failure.
-	assert.GreaterOrEqual(t, len(first.Msgs()), 2)
-	assert.GreaterOrEqual(t, len(second.Msgs()), 2)
-
-	// Close the pipe so as not to block on empty.
-	w.Close()
-
-	// Read the stderr output from the buffer
-	var buf bytes.Buffer
-	_, _ = buf.ReadFrom(r)
-
-	// Check the stderr message
-	actual := buf.String()
+	require.GreaterOrEqual(t, len(first.Msgs()), 2)
+	require.GreaterOrEqual(t, len(second.Msgs()), 2)

-	// The context cancel shouldn't cause the system to throw a
-	// logStoppedForOutOfSyncMessage, as it hangs the system with
-	// the multiple containers.
- require.NotContains(t, actual, logStoppedForOutOfSyncMessage) + require.NotContains(t, logger.String(), "Unexpected error reading logs") } // FooLogConsumer is a test log consumer that accepts logs from the