From 4182991eca6f7c502a403e446073c5ad948d7176 Mon Sep 17 00:00:00 2001 From: Steven Hartland Date: Wed, 24 Jul 2024 09:51:25 +0100 Subject: [PATCH] fix(reaper): refactor to allow retries and fix races Refactor how the reaper is created to allow for proper retries of temporary errors such as container not found issues during startup or shutdown races. This eliminates the use of sync.Once which wasn't solving the problem at hand and replaces it with a singleton spawner with locked access. Wrap reaper errors so we can determine the cause of failures more easily. Fix race condition in port wait when loading from container by always waiting for the port first. Remove unnecessary use of buffering and invalid retry logic in reaper connection handling which could never recover correctly from a partial read. Move the reaper creation just before connections are established in compose to ensure its still running when the Connect calls are made. Previously the reaper was requested in NewDockerComposeWith which means it could have already shutdown before connections are made during the later sections of Up if the startup took over 1 minute. This was causing consistent failures for: TestDockerComposeAPIWithWaitLogStrategy Ensure that resource labels are correct so that resources aren't reaped when the reaper is disabled by excluding session id when reaper is disabled. Error when creating a reaper when the config says it's disabled so that we avoid hard to debug issues because a reaper is running when it shouldn't be. --- container.go | 24 +- docker.go | 102 ++--- docker_mounts.go | 4 +- generic.go | 12 +- internal/core/labels.go | 25 +- lifecycle.go | 2 +- modules/compose/compose.go | 18 - modules/compose/compose_api.go | 96 +++-- reaper.go | 569 +++++++++++++++++----------- reaper_test.go | 669 +++++++++++++++------------------ testing.go | 4 +- 11 files changed, 823 insertions(+), 702 deletions(-) diff --git a/container.go b/container.go index 8747335a28e..272bb38d139 100644 --- a/container.go +++ b/container.go @@ -37,17 +37,17 @@ type DeprecatedContainer interface { // Container allows getting info about and controlling a single container instance type Container interface { - GetContainerID() string // get the container id from the provider - Endpoint(context.Context, string) (string, error) // get proto://ip:port string for the lowest exposed port - PortEndpoint(context.Context, nat.Port, string) (string, error) // get proto://ip:port string for the given exposed port - Host(context.Context) (string, error) // get host where the container port is exposed - Inspect(context.Context) (*types.ContainerJSON, error) // get container info - MappedPort(context.Context, nat.Port) (nat.Port, error) // get externally mapped port for a container port - Ports(context.Context) (nat.PortMap, error) // Deprecated: Use c.Inspect(ctx).NetworkSettings.Ports instead - SessionID() string // get session id - IsRunning() bool // IsRunning returns true if the container is running, false otherwise. - Start(context.Context) error // start the container - Stop(context.Context, *time.Duration) error // stop the container + GetContainerID() string // get the container id from the provider + Endpoint(context.Context, string) (string, error) // get proto://ip:port string for the lowest exposed port + PortEndpoint(ctx context.Context, port nat.Port, proto string) (string, error) // get proto://ip:port string for the given exposed port + Host(context.Context) (string, error) // get host where the container port is exposed + Inspect(context.Context) (*types.ContainerJSON, error) // get container info + MappedPort(context.Context, nat.Port) (nat.Port, error) // get externally mapped port for a container port + Ports(context.Context) (nat.PortMap, error) // Deprecated: Use c.Inspect(ctx).NetworkSettings.Ports instead + SessionID() string // get session id + IsRunning() bool // IsRunning returns true if the container is running, false otherwise. + Start(context.Context) error // start the container + Stop(context.Context, *time.Duration) error // stop the container // Terminate stops and removes the container and its image if it was built and not flagged as kept. Terminate(ctx context.Context) error @@ -460,7 +460,7 @@ func (c *ContainerRequest) BuildOptions() (types.ImageBuildOptions, error) { } if !c.ShouldKeepBuiltImage() { - buildOptions.Labels = core.DefaultLabels(core.SessionID()) + buildOptions.Labels = GenericLabels() } // Do this as late as possible to ensure we don't leak the context on error/panic. diff --git a/docker.go b/docker.go index dcd962ffc85..4b94de42965 100644 --- a/docker.go +++ b/docker.go @@ -178,7 +178,7 @@ func (c *DockerContainer) Inspect(ctx context.Context) (*types.ContainerJSON, er func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Port, error) { inspect, err := c.Inspect(ctx) if err != nil { - return "", err + return "", fmt.Errorf("inspect: %w", err) } if inspect.ContainerJSONBase.HostConfig.NetworkMode == "host" { return port, nil @@ -199,7 +199,7 @@ func (c *DockerContainer) MappedPort(ctx context.Context, port nat.Port) (nat.Po return nat.NewPort(k.Proto(), p[0].HostPort) } - return "", errors.New("port not found") + return "", errdefs.NotFound(fmt.Errorf("port %q not found", port)) } // Deprecated: use c.Inspect(ctx).NetworkSettings.Ports instead. @@ -967,9 +967,7 @@ func (p *DockerProvider) BuildImage(ctx context.Context, img ImageBuildInfo) (st } // CreateContainer fulfils a request for a container without starting it -func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (Container, error) { - var err error - +func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error checking. // defer the close of the Docker client connection the soonest defer p.Close() @@ -1014,22 +1012,23 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque // the reaper does not need to start a reaper for itself isReaperContainer := strings.HasSuffix(imageName, config.ReaperDefaultImage) if !p.config.RyukDisabled && !isReaperContainer { - r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p) + r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), core.SessionID(), p) if err != nil { - return nil, fmt.Errorf("%w: creating reaper failed", err) + return nil, fmt.Errorf("reaper: %w", err) } - termSignal, err = r.Connect() + + termSignal, err := r.Connect() if err != nil { - return nil, fmt.Errorf("%w: connecting to reaper failed", err) + return nil, fmt.Errorf("reaper connect: %w", err) } - } - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + // Cleanup on error. + defer func() { + if err != nil { + termSignal <- true + } + }() + } if err = req.Validate(); err != nil { return nil, err @@ -1095,10 +1094,9 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque } if !isReaperContainer { - // add the labels that the reaper will use to terminate the container to the request - for k, v := range core.DefaultLabels(core.SessionID()) { - req.Labels[k] = v - } + // Add the labels that identify this as a testcontainers container and + // allow the reaper to terminate it if requested. + AddGenericLabels(req.Labels) } dockerInput := &container.Config{ @@ -1192,9 +1190,6 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque return nil, err } - // Disable cleanup on success - termSignal = nil - return c, nil } @@ -1243,7 +1238,7 @@ func (p *DockerProvider) waitContainerCreation(ctx context.Context, name string) ) } -func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (Container, error) { +func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req ContainerRequest) (con Container, err error) { //nolint:nonamedreturns // Needed for error check. c, err := p.findContainerByName(ctx, req.Name) if err != nil { return nil, err @@ -1266,14 +1261,22 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain var termSignal chan bool if !p.config.RyukDisabled { - r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) + r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) if err != nil { return nil, fmt.Errorf("reaper: %w", err) } - termSignal, err = r.Connect() + + termSignal, err := r.Connect() if err != nil { - return nil, fmt.Errorf("%w: connecting to reaper failed", err) + return nil, fmt.Errorf("reaper connect: %w", err) } + + // Cleanup on error. + defer func() { + if err != nil { + termSignal <- true + } + }() } // default hooks include logger hook and pre-create hook @@ -1441,9 +1444,7 @@ func daemonHost(ctx context.Context, p *DockerProvider) (string, error) { // Deprecated: use network.New instead // CreateNetwork returns the object representing a new network identified by its name -func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (Network, error) { - var err error - +func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) (net Network, err error) { //nolint:nonamedreturns // Needed for error check. // defer the close of the Docker client connection the soonest defer p.Close() @@ -1472,31 +1473,30 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) var termSignal chan bool if !p.config.RyukDisabled { - r, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) + r, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, p.host), sessionID, p) if err != nil { - return nil, fmt.Errorf("%w: creating network reaper failed", err) + return nil, fmt.Errorf("reaper: %w", err) } - termSignal, err = r.Connect() + + termSignal, err := r.Connect() if err != nil { - return nil, fmt.Errorf("%w: connecting to network reaper failed", err) + return nil, fmt.Errorf("reaper connect: %w", err) } - } - // add the labels that the reaper will use to terminate the network to the request - for k, v := range core.DefaultLabels(sessionID) { - req.Labels[k] = v + // Cleanup on error. + defer func() { + if err != nil { + termSignal <- true + } + }() } - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + // add the labels that the reaper will use to terminate the network to the request + core.AddDefaultLabels(sessionID, req.Labels) response, err := p.client.NetworkCreate(ctx, req.Name, nc) if err != nil { - return &DockerNetwork{}, err + return &DockerNetwork{}, fmt.Errorf("create network: %w", err) } n := &DockerNetwork{ @@ -1507,9 +1507,6 @@ func (p *DockerProvider) CreateNetwork(ctx context.Context, req NetworkRequest) provider: p, } - // Disable cleanup on success - termSignal = nil - return n, nil } @@ -1579,9 +1576,12 @@ func (p *DockerProvider) getDefaultNetwork(ctx context.Context, cli client.APICl _, err = cli.NetworkCreate(ctx, reaperNetwork, network.CreateOptions{ Driver: Bridge, Attachable: true, - Labels: core.DefaultLabels(core.SessionID()), + Labels: GenericLabels(), }) - if err != nil { + // If the network already exists, we can ignore the error as that can + // happen if we are running multiple tests in parallel and we only + // need to ensure that the network exists. + if err != nil && !errdefs.IsConflict(err) { return "", err } } @@ -1619,7 +1619,7 @@ func containerFromDockerResponse(ctx context.Context, response types.Container) // populate the raw representation of the container jsonRaw, err := ctr.inspectRawContainer(ctx) if err != nil { - return nil, err + return nil, fmt.Errorf("inspect raw container: %w", err) } // the health status of the container, if any diff --git a/docker_mounts.go b/docker_mounts.go index aed30103618..d8af3fae3ef 100644 --- a/docker_mounts.go +++ b/docker_mounts.go @@ -126,9 +126,7 @@ func mapToDockerMounts(containerMounts ContainerMounts) []mount.Mount { Labels: make(map[string]string), } } - for k, v := range GenericLabels() { - containerMount.VolumeOptions.Labels[k] = v - } + AddGenericLabels(containerMount.VolumeOptions.Labels) } mounts = append(mounts, containerMount) diff --git a/generic.go b/generic.go index 9052287b518..fd13a607dee 100644 --- a/generic.go +++ b/generic.go @@ -101,7 +101,17 @@ type GenericProvider interface { ImageProvider } -// GenericLabels returns a map of labels that can be used to identify containers created by this library +// GenericLabels returns a map of labels that can be used to identify resources +// created by this library. This includes the standard LabelSessionID if the +// reaper is enabled, otherwise this is excluded to prevent resources being +// incorrectly reaped. func GenericLabels() map[string]string { return core.DefaultLabels(core.SessionID()) } + +// AddGenericLabels adds the generic labels to target. +func AddGenericLabels(target map[string]string) { + for k, v := range GenericLabels() { + target[k] = v + } +} diff --git a/internal/core/labels.go b/internal/core/labels.go index 58b054ab952..485770f99cb 100644 --- a/internal/core/labels.go +++ b/internal/core/labels.go @@ -2,6 +2,7 @@ package core import ( "github.com/testcontainers/testcontainers-go/internal" + "github.com/testcontainers/testcontainers-go/internal/config" ) const ( @@ -13,11 +14,25 @@ const ( LabelVersion = LabelBase + ".version" ) +// DefaultLabels returns the standard set of labels which +// includes LabelSessionID if the reaper is enabled. func DefaultLabels(sessionID string) map[string]string { - return map[string]string{ - LabelBase: "true", - LabelLang: "go", - LabelSessionID: sessionID, - LabelVersion: internal.Version, + labels := map[string]string{ + LabelBase: "true", + LabelLang: "go", + LabelVersion: internal.Version, + } + + if !config.Read().RyukDisabled { + labels[LabelSessionID] = sessionID + } + + return labels +} + +// AddDefaultLabels adds the default labels for sessionID to target. +func AddDefaultLabels(sessionID string, target map[string]string) { + for k, v := range DefaultLabels(sessionID) { + target[k] = v } } diff --git a/lifecycle.go b/lifecycle.go index c38e60240da..0ca6dd70a62 100644 --- a/lifecycle.go +++ b/lifecycle.go @@ -33,7 +33,7 @@ type ContainerRequestHook func(ctx context.Context, req ContainerRequest) error // - Terminating // - Terminated // For that, it will receive a Container, modify it and return an error if needed. -type ContainerHook func(ctx context.Context, container Container) error +type ContainerHook func(ctx context.Context, ctr Container) error // ContainerLifecycleHooks is a struct that contains all the hooks that can be used // to modify the container lifecycle. All the container lifecycle hooks except the PreCreates hooks diff --git a/modules/compose/compose.go b/modules/compose/compose.go index c63eb73bb1f..fa02cde0773 100644 --- a/modules/compose/compose.go +++ b/modules/compose/compose.go @@ -153,23 +153,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) { return nil, fmt.Errorf("initialize docker client: %w", err) } - reaperProvider, err := testcontainers.NewDockerProvider() - if err != nil { - return nil, fmt.Errorf("failed to create reaper provider for compose: %w", err) - } - - var composeReaper *testcontainers.Reaper - if !reaperProvider.Config().Config.RyukDisabled { - // NewReaper is deprecated: we need to find a way to create the reaper for compose - // bypassing the deprecation. - r, err := testcontainers.NewReaper(context.Background(), testcontainers.SessionID(), reaperProvider, "") - if err != nil { - return nil, fmt.Errorf("failed to create reaper for compose: %w", err) - } - - composeReaper = r - } - composeAPI := &dockerCompose{ name: composeOptions.Identifier, configs: composeOptions.Paths, @@ -182,7 +165,6 @@ func NewDockerComposeWith(opts ...ComposeStackOption) (*dockerCompose, error) { containers: make(map[string]*testcontainers.DockerContainer), networks: make(map[string]*testcontainers.DockerNetwork), sessionID: testcontainers.SessionID(), - reaper: composeReaper, } return composeAPI, nil diff --git a/modules/compose/compose_api.go b/modules/compose/compose_api.go index 9f21d09e875..0333fd9c63c 100644 --- a/modules/compose/compose_api.go +++ b/modules/compose/compose_api.go @@ -2,6 +2,7 @@ package compose import ( "context" + "errors" "fmt" "io" "os" @@ -228,9 +229,6 @@ type dockerCompose struct { // sessionID is used to identify the reaper session sessionID string - - // reaper is used to clean up containers after the stack is stopped - reaper *testcontainers.Reaper } func (d *dockerCompose) ServiceContainer(ctx context.Context, svcName string) (*testcontainers.DockerContainer, error) { @@ -269,12 +267,10 @@ func (d *dockerCompose) Down(ctx context.Context, opts ...StackDownOption) error return d.composeService.Down(ctx, d.name, options.DownOptions) } -func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { +func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) (err error) { d.lock.Lock() defer d.lock.Unlock() - var err error - d.project, err = d.compileProject(ctx) if err != nil { return err @@ -329,27 +325,61 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { return err } - if d.reaper != nil { + provider, err := testcontainers.NewDockerProvider(testcontainers.WithLogger(d.logger)) + if err != nil { + return fmt.Errorf("new docker provider: %w", err) + } + + var termSignals []chan bool + var reaper *testcontainers.Reaper + if !provider.Config().Config.RyukDisabled { + // NewReaper is deprecated: we need to find a way to create the reaper for compose + // bypassing the deprecation. + reaper, err = testcontainers.NewReaper(ctx, testcontainers.SessionID(), provider, "") + if err != nil { + return fmt.Errorf("create reaper: %w", err) + } + + // Cleanup on error, otherwise set termSignal to nil before successful return. + defer func() { + if len(termSignals) == 0 { + // Need to call Connect at least once to ensure the initial + // connection is cleaned up. + termSignal, errc := reaper.Connect() + if errc != nil { + err = errors.Join(err, fmt.Errorf("reaper connect: %w", errc)) + } else { + termSignal <- true + } + } + + if err == nil { + // No need to cleanup. + return + } + + for _, ts := range termSignals { + ts <- true + } + }() + + // Connect to the reaper and set the termination signal for each network. for _, n := range d.networks { - termSignal, err := d.reaper.Connect() + termSignal, err := reaper.Connect() if err != nil { - return fmt.Errorf("failed to connect to reaper: %w", err) + return fmt.Errorf("reaper connect: %w", err) } - n.SetTerminationSignal(termSignal) - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + n.SetTerminationSignal(termSignal) + termSignals = append(termSignals, termSignal) } } errGrpContainers, errGrpCtx := errgroup.WithContext(ctx) + // Lookup the containers for each service and connect them + // to the reaper if needed. for _, srv := range d.project.Services { - // we are going to connect each container to the reaper srv := srv errGrpContainers.Go(func() error { dc, err := d.lookupContainer(errGrpCtx, srv.Name) @@ -357,19 +387,14 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { return err } - if d.reaper != nil { - termSignal, err := d.reaper.Connect() + if reaper != nil { + termSignal, err := reaper.Connect() if err != nil { - return fmt.Errorf("failed to connect to reaper: %w", err) + return fmt.Errorf("reaper connect: %w", err) } - dc.SetTerminationSignal(termSignal) - // Cleanup on error, otherwise set termSignal to nil before successful return. - defer func() { - if termSignal != nil { - termSignal <- true - } - }() + dc.SetTerminationSignal(termSignal) + termSignals = append(termSignals, termSignal) } return nil @@ -401,7 +426,11 @@ func (d *dockerCompose) Up(ctx context.Context, opts ...StackUpOption) error { }) } - return errGrpWait.Wait() + if err := errGrpWait.Wait(); err != nil { + return err + } + + return nil } func (d *dockerCompose) WaitForService(s string, strategy wait.Strategy) ComposeStack { @@ -486,6 +515,9 @@ func (d *dockerCompose) lookupContainer(ctx context.Context, svcName string) (*t return ctr, nil } +// lookupNetworks is used to retrieve the networks that are part of the compose stack. +// +// Safe for concurrent calls. func (d *dockerCompose) lookupNetworks(ctx context.Context) error { networks, err := d.dockerClient.NetworkList(ctx, dockernetwork.ListOptions{ Filters: filters.NewArgs( @@ -543,9 +575,7 @@ func (d *dockerCompose) compileProject(ctx context.Context) (*types.Project, err api.OneoffLabel: "False", // default, will be overridden by `run` command } - for k, label := range testcontainers.GenericLabels() { - s.CustomLabels[k] = label - } + testcontainers.AddGenericLabels(s.CustomLabels) for i, envFile := range compiledOptions.EnvFiles { // add a label for each env file, indexed by its position @@ -562,9 +592,7 @@ func (d *dockerCompose) compileProject(ctx context.Context) (*types.Project, err api.VersionLabel: api.ComposeVersion, } - for k, label := range testcontainers.GenericLabels() { - n.Labels[k] = label - } + testcontainers.AddGenericLabels(n.Labels) proj.Networks[key] = n } diff --git a/reaper.go b/reaper.go index c41520b5b73..5aaca15b82a 100644 --- a/reaper.go +++ b/reaper.go @@ -1,13 +1,16 @@ package testcontainers import ( - "bufio" + "bytes" "context" + "errors" "fmt" - "math/rand" + "io" "net" + "os" "strings" "sync" + "syscall" "time" "github.com/cenkalti/backoff/v4" @@ -34,9 +37,22 @@ const ( var ( // Deprecated: it has been replaced by an internal value ReaperDefaultImage = config.ReaperDefaultImage - reaperInstance *Reaper // We would like to create reaper only once - reaperMutex sync.Mutex - reaperOnce sync.Once + + // reaperPort is the port that the reaper listens on. + reaperPort = nat.Port("8080/tcp") + + // errReaperNotFound is returned when no reaper container is found. + errReaperNotFound = errors.New("reaper not found") + + // errReaperDisabled is returned if a reaper is requested but the + // config has it disabled. + errReaperDisabled = errors.New("reaper disabled") + + // spawner is the singleton instance of reaperSpawner. + spawner = &reaperSpawner{} + + // reaperAck is the expected response from the reaper container. + reaperAck = []byte("ACK\n") ) // ReaperProvider represents a provider for the reaper to run itself with @@ -47,10 +63,18 @@ type ReaperProvider interface { } // NewReaper creates a Reaper with a sessionID to identify containers and a provider to use -// Deprecated: it's not possible to create a reaper anymore. Compose module uses this method +// Deprecated: it's not possible to create a reaper any more. Compose module uses this method // to create a reaper for the compose stack. +// +// The caller must call Connect at least once on the returned Reaper and use the returned +// result otherwise the reaper will be kept open until the process exits. func NewReaper(ctx context.Context, sessionID string, provider ReaperProvider, reaperImageName string) (*Reaper, error) { - return reuseOrCreateReaper(ctx, sessionID, provider) + reaper, err := spawner.reaper(ctx, sessionID, provider) + if err != nil { + return nil, fmt.Errorf("reaper: %w", err) + } + + return reaper, nil } // reaperContainerNameFromSessionID returns the container name that uniquely @@ -61,32 +85,68 @@ func reaperContainerNameFromSessionID(sessionID string) string { return fmt.Sprintf("reaper_%s", sessionID) } -// lookUpReaperContainer returns a DockerContainer type with the reaper container in the case +// reaperSpawner is a singleton that manages the reaper container. +type reaperSpawner struct { + instance *Reaper + mtx sync.Mutex +} + +// backoff returns a backoff policy for the reaper spawner. +// It will take at most 20 seconds, doing each attempt every 100ms - 250ms. +func (r *reaperSpawner) backoff() *backoff.ExponentialBackOff { + // We want random intervals between 100ms and 250ms for concurrent executions + // to not be synchronized: it could be the case that multiple executions of this + // function happen at the same time (specifically when called from a different test + // process execution), and we want to avoid that they all try to find the reaper + // container at the same time. + b := &backoff.ExponentialBackOff{ + InitialInterval: time.Millisecond * 100, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + // Adjust MaxInterval to compensate for randomization factor which can be added to + // returned interval so we have a maximum of 250ms. + MaxInterval: time.Duration(float64(time.Millisecond*250) * backoff.DefaultRandomizationFactor), + MaxElapsedTime: time.Second * 20, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } + b.Reset() + + return b +} + +// cleanup terminates the reaper container if set. +func (r *reaperSpawner) cleanup() error { + r.mtx.Lock() + defer r.mtx.Unlock() + + return r.cleanupLocked() +} + +// cleanupLocked terminates the reaper container if set. +// It must be called with the lock held. +func (r *reaperSpawner) cleanupLocked() error { + if r.instance == nil { + return nil + } + + err := TerminateContainer(r.instance.container) + r.instance = nil + + return err +} + +// lookupContainer returns a DockerContainer type with the reaper container in the case // it's found in the running state, and including the labels for sessionID, reaper, and ryuk. // It will perform a retry with exponential backoff to allow for the container to be started and // avoid potential false negatives. -func lookUpReaperContainer(ctx context.Context, sessionID string) (*DockerContainer, error) { +func (r *reaperSpawner) lookupContainer(ctx context.Context, sessionID string) (*DockerContainer, error) { dockerClient, err := NewDockerClientWithOpts(ctx) if err != nil { - return nil, err + return nil, fmt.Errorf("new client: %w", err) } defer dockerClient.Close() - // the backoff will take at most 5 seconds to find the reaper container - // doing each attempt every 100ms - exp := backoff.NewExponentialBackOff() - - // we want random intervals between 100ms and 500ms for concurrent executions - // to not be synchronized: it could be the case that multiple executions of this - // function happen at the same time (specifically when called from a different test - // process execution), and we want to avoid that they all try to find the reaper - // container at the same time. - exp.InitialInterval = time.Duration(rand.Intn(5)*100) * time.Millisecond - exp.RandomizationFactor = rand.Float64() * 0.5 - exp.Multiplier = rand.Float64() * 2.0 - exp.MaxInterval = 5.0 * time.Second // max interval between attempts - exp.MaxElapsedTime = 1 * time.Minute // max time to keep trying - opts := container.ListOptions{ All: true, Filters: filters.NewArgs( @@ -97,159 +157,213 @@ func lookUpReaperContainer(ctx context.Context, sessionID string) (*DockerContai ), } - return backoff.RetryNotifyWithData( + return backoff.RetryWithData( func() (*DockerContainer, error) { resp, err := dockerClient.ContainerList(ctx, opts) if err != nil { - return nil, err + return nil, fmt.Errorf("container list: %w", err) } if len(resp) == 0 { - // reaper container not found in the running state: do not look for it again - return nil, nil + // No reaper container not found. + return nil, backoff.Permanent(errReaperNotFound) } if len(resp) > 1 { - return nil, fmt.Errorf("not possible to have multiple reaper containers found for session ID %s", sessionID) + return nil, fmt.Errorf("multiple reaper containers found for session ID %s", sessionID) } - r, err := containerFromDockerResponse(ctx, resp[0]) + container := resp[0] + r, err := containerFromDockerResponse(ctx, container) if err != nil { - return nil, err + return nil, fmt.Errorf("from docker: %w", err) } - if r.healthStatus == types.Healthy || r.healthStatus == types.NoHealthcheck { + switch { + case r.healthStatus == types.Healthy, + r.healthStatus == types.NoHealthcheck: return r, nil - } - - // if a health status is present on the container, and the container is healthy, error - if r.healthStatus != "" { - return nil, fmt.Errorf("container %s is not healthy, wanted status=%s, got status=%s", resp[0].ID[:8], types.Healthy, r.healthStatus) + case r.healthStatus != "": + return nil, fmt.Errorf("container not healthy: %s", r.healthStatus) } return r, nil }, - backoff.WithContext(exp, ctx), - func(err error, duration time.Duration) { - Logger.Printf("Error looking up reaper container, will retry: %v", err) - }, + backoff.WithContext(r.backoff(), ctx), ) } -// reuseOrCreateReaper returns an existing Reaper instance if it exists and is running. Otherwise, a new Reaper instance -// will be created with a sessionID to identify containers in the same test session/program. -func reuseOrCreateReaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { - reaperMutex.Lock() - defer reaperMutex.Unlock() - - // 1. if the reaper instance has been already created, return it - if reaperInstance != nil { - // Verify this instance is still running by checking state. - // Can't use Container.IsRunning because the bool is not updated when Reaper is terminated - state, err := reaperInstance.container.State(ctx) - if err != nil { - if !errdefs.IsNotFound(err) { - return nil, err +// isRunning returns an error if the container is not running. +func (r *reaperSpawner) isRunning(ctx context.Context, ctr Container) error { + state, err := ctr.State(ctx) + if err != nil { + return fmt.Errorf("container state: %w", err) + } + + if !state.Running { + // Use NotFound error to indicate the container is not running + // and should be recreated. + return errdefs.NotFound(fmt.Errorf("container state: %s", state.Status)) + } + + return nil +} + +// retryError returns a permanent error if the error is not considered retryable. +func (r *reaperSpawner) retryError(err error) error { + var timeout interface { + Timeout() bool + } + switch { + case isCleanupSafe(err), + createContainerFailDueToNameConflictRegex.MatchString(err.Error()), + errors.Is(err, syscall.ECONNREFUSED), + errors.Is(err, syscall.ECONNRESET), + errors.Is(err, syscall.ECONNABORTED), + errors.Is(err, syscall.ETIMEDOUT), + errors.Is(err, os.ErrDeadlineExceeded), + errors.As(err, &timeout) && timeout.Timeout(), + errors.Is(err, context.DeadlineExceeded), + errors.Is(err, context.Canceled): + // Retryable error. + return err + default: + return backoff.Permanent(err) + } +} + +// reaper returns an existing Reaper instance if it exists and is running, otherwise +// a new Reaper instance will be created with a sessionID to identify containers in +// the same test session/program. If connect is true, the reaper will be connected +// to the reaper container. +// Returns an error if config.RyukDisabled is true. +// +// Safe for concurrent calls. +func (r *reaperSpawner) reaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { + if config.Read().RyukDisabled { + return nil, errReaperDisabled + } + + r.mtx.Lock() + defer r.mtx.Unlock() + + return backoff.RetryWithData( + r.retryLocked(ctx, sessionID, provider), + backoff.WithContext(r.backoff(), ctx), + ) +} + +// retryLocked returns a function that can be used to create or reuse a reaper container. +// If connect is true, the reaper will be connected to the reaper container. +// It must be called with the lock held. +func (r *reaperSpawner) retryLocked(ctx context.Context, sessionID string, provider ReaperProvider) func() (*Reaper, error) { + return func() (reaper *Reaper, err error) { //nolint:nonamedreturns // Needed for deferred error check. + reaper, err = r.reuseOrCreate(ctx, sessionID, provider) + // Ensure that the reaper is terminated if an error occurred. + defer func() { + if err != nil { + if reaper != nil { + err = errors.Join(err, TerminateContainer(reaper.container)) + } + err = r.retryError(errors.Join(err, r.cleanupLocked())) } - } else if state.Running { - return reaperInstance, nil - } - // else: the reaper instance has been terminated, so we need to create a new one - reaperOnce = sync.Once{} - } - - // 2. because the reaper instance has not been created yet, look for it in the Docker daemon, which - // will happen if the reaper container has been created in the same test session but in a different - // test process execution (e.g. when running tests in parallel), not having initialized the reaper - // instance yet. - reaperContainer, err := lookUpReaperContainer(context.Background(), sessionID) - if err == nil && reaperContainer != nil { - // The reaper container exists as a Docker container: re-use it - Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID) - reaperInstance, err = reuseReaperContainer(ctx, sessionID, provider, reaperContainer) + }() if err != nil { return nil, err } - return reaperInstance, nil - } + if err = r.isRunning(ctx, reaper.container); err != nil { + return nil, err + } - // 3. the reaper container does not exist in the Docker daemon: create it, and do it using the - // synchronization primitive to avoid multiple executions of this function to create the reaper - var reaperErr error - reaperOnce.Do(func() { - r, err := newReaper(ctx, sessionID, provider) + // Check we can still connect. + termSignal, err := reaper.connect(ctx) if err != nil { - reaperErr = err - return + return nil, fmt.Errorf("connect: %w", err) } - reaperInstance, reaperErr = r, nil - }) - if reaperErr != nil { - reaperOnce = sync.Once{} - return nil, reaperErr - } + reaper.setOrSignal(termSignal) - return reaperInstance, nil + r.instance = reaper + + return reaper, nil + } } -// reuseReaperContainer constructs a Reaper from an already running reaper -// DockerContainer. -func reuseReaperContainer(ctx context.Context, sessionID string, provider ReaperProvider, reaperContainer *DockerContainer) (*Reaper, error) { - endpoint, err := reaperContainer.PortEndpoint(ctx, "8080", "") +// reuseOrCreate returns an existing Reaper instance if it exists, otherwise a new Reaper instance. +func (r *reaperSpawner) reuseOrCreate(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { + if r.instance != nil { + // We already have an associated reaper. + return r.instance, nil + } + + // Look for an existing reaper created in the same test session but in a + // different test process execution e.g. when running tests in parallel. + container, err := r.lookupContainer(context.Background(), sessionID) + if err != nil { + if !errors.Is(err, errReaperNotFound) { + return nil, fmt.Errorf("look up container: %w", err) + } + + // The reaper container was not found, continue to create a new one. + reaper, err := r.newReaper(ctx, sessionID, provider) + if err != nil { + return nil, fmt.Errorf("new reaper: %w", err) + } + + return reaper, nil + } + + // A reaper container exists re-use it. + reaper, err := r.fromContainer(ctx, sessionID, provider, container) if err != nil { - return nil, err + return nil, fmt.Errorf("from container %q: %w", container.ID[:8], err) } - Logger.Printf("⏳ Waiting for Reaper port to be ready") + return reaper, nil +} - var containerJson *types.ContainerJSON +// fromContainer constructs a Reaper from an already running reaper DockerContainer. +func (r *reaperSpawner) fromContainer(ctx context.Context, sessionID string, provider ReaperProvider, dockerContainer *DockerContainer) (*Reaper, error) { + Logger.Printf("⏳ Waiting for Reaper port %s to be ready", reaperPort) - if containerJson, err = reaperContainer.Inspect(ctx); err != nil { - return nil, fmt.Errorf("failed to inspect reaper container %s: %w", reaperContainer.ID[:8], err) + err := wait.ForListeningPort(reaperPort). + WithPollInterval(100*time.Millisecond). + SkipInternalCheck(). + WaitUntilReady(ctx, dockerContainer) + if err != nil { + return nil, fmt.Errorf("wait for port %s: %w", reaperPort, err) } - if containerJson != nil && containerJson.NetworkSettings != nil { - for port := range containerJson.NetworkSettings.Ports { - err := wait.ForListeningPort(port). - WithPollInterval(100*time.Millisecond). - WaitUntilReady(ctx, reaperContainer) - if err != nil { - return nil, fmt.Errorf("failed waiting for reaper container %s port %s/%s to be ready: %w", - reaperContainer.ID[:8], port.Proto(), port.Port(), err) - } - } + endpoint, err := dockerContainer.PortEndpoint(ctx, reaperPort, "") + if err != nil { + return nil, fmt.Errorf("port endpoint: %w", err) } + Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", dockerContainer.ID[:8]) + return &Reaper{ Provider: provider, SessionID: sessionID, Endpoint: endpoint, - container: reaperContainer, + container: dockerContainer, }, nil } -// newReaper creates a Reaper with a sessionID to identify containers and a -// provider to use. Do not call this directly, use reuseOrCreateReaper instead. -func newReaper(ctx context.Context, sessionID string, provider ReaperProvider) (*Reaper, error) { +// newReaper creates a connected Reaper with a sessionID to identify containers +// and a provider to use. +// +// Do not call this directly, use reuseOrCreateReaper instead. +func (r *reaperSpawner) newReaper(ctx context.Context, sessionID string, provider ReaperProvider) (reaper *Reaper, err error) { //nolint:nonamedreturns // Needed for deferred error check. dockerHostMount := core.MustExtractDockerSocket(ctx) - reaper := &Reaper{ - Provider: provider, - SessionID: sessionID, - } - - listeningPort := nat.Port("8080/tcp") - tcConfig := provider.Config().Config - req := ContainerRequest{ Image: config.ReaperDefaultImage, - ExposedPorts: []string{string(listeningPort)}, + ExposedPorts: []string{string(reaperPort)}, Labels: core.DefaultLabels(sessionID), Privileged: tcConfig.RyukPrivileged, - WaitingFor: wait.ForListeningPort(listeningPort), + WaitingFor: wait.ForListeningPort(reaperPort), Name: reaperContainerNameFromSessionID(sessionID), HostConfigModifier: func(hc *container.HostConfig) { hc.AutoRemove = true @@ -278,123 +392,158 @@ func newReaper(ctx context.Context, sessionID string, provider ReaperProvider) ( } c, err := provider.RunContainer(ctx, req) - if err != nil { - // We need to check whether the error is caused by a container with the same name - // already existing due to race conditions. We manually match the error message - // as we do not have any error types to check against. - if createContainerFailDueToNameConflictRegex.MatchString(err.Error()) { - // Manually retrieve the already running reaper container. However, we need to - // use retries here as there are two possible race conditions that might lead to - // errors: In most cases, there is a small delay between container creation and - // actually being visible in list-requests. This means that creation might fail - // due to name conflicts, but when we list containers with this name, we do not - // get any results. In another case, the container might have simply died in the - // meantime and therefore cannot be found. - const timeout = 5 * time.Second - const cooldown = 100 * time.Millisecond - start := time.Now() - var reaperContainer *DockerContainer - for time.Since(start) < timeout { - reaperContainer, err = lookUpReaperContainer(ctx, sessionID) - if err == nil && reaperContainer != nil { - break - } - select { - case <-ctx.Done(): - case <-time.After(cooldown): - } - } - if err != nil { - return nil, fmt.Errorf("look up reaper container due to name conflict failed: %w", err) - } - // If the reaper container was not found, it is most likely to have died in - // between as we can exclude any client errors because of the previous error - // check. Because the reaper should only die if it performed clean-ups, we can - // fail here as the reaper timeout needs to be increased, anyway. - if reaperContainer == nil { - return nil, fmt.Errorf("look up reaper container returned nil although creation failed due to name conflict") - } - Logger.Printf("🔥 Reaper obtained from Docker for this test session %s", reaperContainer.ID) - reaper, err := reuseReaperContainer(ctx, sessionID, provider, reaperContainer) - if err != nil { - return nil, err - } - return reaper, nil + defer func() { + if err != nil { + err = errors.Join(err, TerminateContainer(c)) } - return nil, err + }() + if err != nil { + return nil, fmt.Errorf("run container: %w", err) } - reaper.container = c - endpoint, err := c.PortEndpoint(ctx, "8080", "") + endpoint, err := c.PortEndpoint(ctx, reaperPort, "") if err != nil { - return nil, err + return nil, fmt.Errorf("port endpoint: %w", err) } - reaper.Endpoint = endpoint - return reaper, nil + return &Reaper{ + Provider: provider, + SessionID: sessionID, + Endpoint: endpoint, + container: c, + }, nil } // Reaper is used to start a sidecar container that cleans up resources type Reaper struct { - Provider ReaperProvider - SessionID string - Endpoint string - container Container + Provider ReaperProvider + SessionID string + Endpoint string + container Container + mtx sync.Mutex // Protects termSignal. + termSignal chan bool } -// Connect runs a goroutine which can be terminated by sending true into the returned channel +// Connect connects to the reaper container and sends the labels to it +// so that it can clean up the containers with the same labels. +// +// It returns a channel that can be closed to terminate the connection. +// Returns an error if config.RyukDisabled is true. func (r *Reaper) Connect() (chan bool, error) { - conn, err := net.DialTimeout("tcp", r.Endpoint, 10*time.Second) - if err != nil { - return nil, fmt.Errorf("%w: Connecting to Ryuk on %s failed", err, r.Endpoint) + if config.Read().RyukDisabled { + return nil, errReaperDisabled } - terminationSignal := make(chan bool) - go func(conn net.Conn) { - sock := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) - defer conn.Close() + if termSignal := r.useTermSignal(); termSignal != nil { + return termSignal, nil + } - labelFilters := []string{} - for l, v := range core.DefaultLabels(r.SessionID) { - labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v)) - } + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() - retryLimit := 3 - for retryLimit > 0 { - retryLimit-- + return r.connect(ctx) +} - if _, err := sock.WriteString(strings.Join(labelFilters, "&")); err != nil { - continue - } +// close signals the connection to close if needed. +// Safe for concurrent calls. +func (r *Reaper) close() { + r.mtx.Lock() + defer r.mtx.Unlock() - if _, err := sock.WriteString("\n"); err != nil { - continue - } + if r.termSignal != nil { + r.termSignal <- true + r.termSignal = nil + } +} - if err := sock.Flush(); err != nil { - continue - } +// setOrSignal sets the reapers termSignal field if nil +// otherwise consumes by sending true to it. +// Safe for concurrent calls. +func (r *Reaper) setOrSignal(termSignal chan bool) { + r.mtx.Lock() + defer r.mtx.Unlock() + + if r.termSignal != nil { + // Already have an existing connection, close the new one. + termSignal <- true + return + } - resp, err := sock.ReadString('\n') - if err != nil { - continue - } + // First or new unused termSignal, assign for caller to reuse. + r.termSignal = termSignal +} - if resp == "ACK\n" { - break - } - } +// useTermSignal if termSignal is not nil returns it +// and sets it to nil, otherwise returns nil. +// +// Safe for concurrent calls. +func (r *Reaper) useTermSignal() chan bool { + r.mtx.Lock() + defer r.mtx.Unlock() + + if r.termSignal == nil { + return nil + } + + // Use existing connection. + term := r.termSignal + r.termSignal = nil + + return term +} +// connect connects to the reaper container and sends the labels to it +// so that it can clean up the containers with the same labels. +// +// It returns a channel that can be sent true to terminate the connection. +// Returns an error if config.RyukDisabled is true. +func (r *Reaper) connect(ctx context.Context) (chan bool, error) { + var d net.Dialer + conn, err := d.DialContext(ctx, "tcp", r.Endpoint) + if err != nil { + return nil, fmt.Errorf("dial reaper %s: %w", r.Endpoint, err) + } + + terminationSignal := make(chan bool) + go func() { + defer conn.Close() + if err := r.handshake(conn); err != nil { + Logger.Printf("Reaper handshake failed: %s", err) + } <-terminationSignal - }(conn) + }() return terminationSignal, nil } +// handshake sends the labels to the reaper container and reads the ACK. +func (r *Reaper) handshake(conn net.Conn) error { + labels := core.DefaultLabels(r.SessionID) + labelFilters := make([]string, 0, len(labels)) + for l, v := range labels { + labelFilters = append(labelFilters, fmt.Sprintf("label=%s=%s", l, v)) + } + + filters := []byte(strings.Join(labelFilters, "&") + "\n") + buf := make([]byte, 4) + if _, err := conn.Write(filters); err != nil { + return fmt.Errorf("writing filters: %w", err) + } + + n, err := io.ReadFull(conn, buf) + if err != nil { + return fmt.Errorf("read ack: %w", err) + } + + if !bytes.Equal(reaperAck, buf[:n]) { + // We have received the ACK so all done. + return fmt.Errorf("unexpected reaper response: %s", buf[:n]) + } + + return nil +} + // Labels returns the container labels to use so that this Reaper cleans them up // Deprecated: internally replaced by core.DefaultLabels(sessionID) func (r *Reaper) Labels() map[string]string { - return map[string]string{ - core.LabelLang: "go", - core.LabelSessionID: r.SessionID, - } + return GenericLabels() } diff --git a/reaper_test.go b/reaper_test.go index f421c2686d4..321cfaf6819 100644 --- a/reaper_test.go +++ b/reaper_test.go @@ -4,14 +4,15 @@ import ( "context" "errors" "os" + "strconv" "sync" "testing" "time" "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/network" + "github.com/docker/docker/errdefs" "github.com/docker/go-connections/nat" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/internal/config" @@ -23,48 +24,29 @@ import ( const testSessionID = "this-is-a-different-session-id" type mockReaperProvider struct { - req ContainerRequest - hostConfig *container.HostConfig - enpointSettings map[string]*network.EndpointSettings - config TestcontainersConfig - initialReaper *Reaper - initialReaperOnce sync.Once - t *testing.T + req ContainerRequest + hostConfig *container.HostConfig + endpointSettings map[string]*network.EndpointSettings + config TestcontainersConfig } -func newMockReaperProvider(t *testing.T) *mockReaperProvider { +func newMockReaperProvider(cfg config.Config) *mockReaperProvider { m := &mockReaperProvider{ config: TestcontainersConfig{ - Config: config.Config{}, + Config: cfg, }, - t: t, - initialReaper: reaperInstance, - //nolint:govet - initialReaperOnce: reaperOnce, } - // explicitly reset the reaperInstance to nil to start from a fresh state - reaperInstance = nil - reaperOnce = sync.Once{} - return m } var errExpected = errors.New("expected") -func (m *mockReaperProvider) RestoreReaperState() { - m.t.Cleanup(func() { - reaperInstance = m.initialReaper - //nolint:govet - reaperOnce = m.initialReaperOnce - }) -} - func (m *mockReaperProvider) RunContainer(ctx context.Context, req ContainerRequest) (Container, error) { m.req = req m.hostConfig = &container.HostConfig{} - m.enpointSettings = map[string]*network.EndpointSettings{} + m.endpointSettings = map[string]*network.EndpointSettings{} if req.HostConfigModifier == nil { req.HostConfigModifier = defaultHostConfigModifier(req) @@ -72,7 +54,7 @@ func (m *mockReaperProvider) RunContainer(ctx context.Context, req ContainerRequ req.HostConfigModifier(m.hostConfig) if req.EnpointSettingsModifier != nil { - req.EnpointSettingsModifier(m.enpointSettings) + req.EnpointSettingsModifier(m.endpointSettings) } // we're only interested in the request, so instead of mocking the Docker client @@ -85,7 +67,7 @@ func (m *mockReaperProvider) Config() TestcontainersConfig { } // createContainerRequest creates the expected request and allows for customization -func createContainerRequest(customize func(ContainerRequest) ContainerRequest) ContainerRequest { +func createContainerRequest(customize ...func(*ContainerRequest)) ContainerRequest { req := ContainerRequest{ Image: config.ReaperDefaultImage, ExposedPorts: []string{"8080/tcp"}, @@ -103,20 +85,24 @@ func createContainerRequest(customize func(ContainerRequest) ContainerRequest) C req.Labels[core.LabelReaper] = "true" req.Labels[core.LabelRyuk] = "true" - if customize == nil { - return req + for _, customize := range customize { + customize(&req) } - return customize(req) + return req } -func TestContainerStartsWithoutTheReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if !tcConfig.RyukDisabled { - t.Skip("Ryuk is enabled, skipping test") - } +// reaperDisable disables / enables the reaper for the duration of the test. +func reaperDisable(t *testing.T, disabled bool) { + t.Helper() + + config.Reset() + t.Setenv("TESTCONTAINERS_RYUK_DISABLED", strconv.FormatBool(disabled)) + t.Cleanup(config.Reset) +} +func testContainerStart(t *testing.T) { + t.Helper() ctx := context.Background() ctr, err := GenericContainer(ctx, GenericContainerRequest{ @@ -131,59 +117,55 @@ func TestContainerStartsWithoutTheReaper(t *testing.T) { }) CleanupContainer(t, ctr) require.NoError(t, err) +} - sessionID := core.SessionID() +// testReaperRunning validates that a reaper is running. +func testReaperRunning(t *testing.T) { + t.Helper() - reaperContainer, err := lookUpReaperContainer(ctx, sessionID) - if err != nil { - t.Fatal(err, "expected reaper container not found.") - } - if reaperContainer != nil { - t.Fatal("expected zero reaper running.") - } + ctx := context.Background() + sessionID := core.SessionID() + reaperContainer, err := spawner.lookupContainer(ctx, sessionID) + require.NoError(t, err) + require.NotNil(t, reaperContainer) } -func TestContainerStartsWithTheReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } +func TestContainer(t *testing.T) { + reaperDisable(t, false) - ctx := context.Background() + t.Run("start/reaper-enabled", func(t *testing.T) { + testContainerStart(t) + testReaperRunning(t) + }) - c, err := GenericContainer(ctx, GenericContainerRequest{ - ProviderType: providerType, - ContainerRequest: ContainerRequest{ - Image: nginxAlpineImage, - ExposedPorts: []string{ - nginxDefaultPort, - }, - }, - Started: true, + t.Run("stop/reaper-enabled", func(t *testing.T) { + testContainerStop(t) + testReaperRunning(t) }) - CleanupContainer(t, c) - if err != nil { - t.Fatal(err) - } - sessionID := core.SessionID() + t.Run("terminate/reaper-enabled", func(t *testing.T) { + testContainerTerminate(t) + testReaperRunning(t) + }) - reaperContainer, err := lookUpReaperContainer(ctx, sessionID) - if err != nil { - t.Fatal(err, "expected reaper container running.") - } - if reaperContainer == nil { - t.Fatal("expected one reaper to be running.") - } + reaperDisable(t, true) + + t.Run("start/reaper-disabled", func(t *testing.T) { + testContainerStart(t) + }) + + t.Run("stop/reaper-disabled", func(t *testing.T) { + testContainerStop(t) + }) + + t.Run("terminate/reaper-disabled", func(t *testing.T) { + testContainerTerminate(t) + }) } -func TestContainerStopWithReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } +// testContainerStop tests stopping a container. +func testContainerStop(t *testing.T) { + t.Helper() ctx := context.Background() @@ -201,37 +183,21 @@ func TestContainerStopWithReaper(t *testing.T) { require.NoError(t, err) state, err := nginxA.State(ctx) - if err != nil { - t.Fatal(err) - } - if state.Running != true { - t.Fatal("The container shoud be in running state") - } + require.NoError(t, err) + require.True(t, state.Running) + stopTimeout := 10 * time.Second err = nginxA.Stop(ctx, &stopTimeout) - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) state, err = nginxA.State(ctx) - if err != nil { - t.Fatal(err) - } - if state.Running != false { - t.Fatal("The container shoud not be running") - } - if state.Status != "exited" { - t.Fatal("The container shoud be in exited state") - } + require.NoError(t, err) + require.False(t, state.Running) + require.Equal(t, "exited", state.Status) } -func TestContainerTerminationWithReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } - +// testContainerTerminate tests terminating a container. +func testContainerTerminate(t *testing.T) { ctx := context.Background() nginxA, err := GenericContainer(ctx, GenericContainerRequest{ @@ -258,324 +224,274 @@ func TestContainerTerminationWithReaper(t *testing.T) { require.Error(t, err) } -func TestContainerTerminationWithoutReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if !tcConfig.RyukDisabled { - t.Skip("Ryuk is enabled, skipping test") - } +func Test_NewReaper(t *testing.T) { + reaperDisable(t, false) ctx := context.Background() - nginxA, err := GenericContainer(ctx, GenericContainerRequest{ - ProviderType: providerType, - ContainerRequest: ContainerRequest{ - Image: nginxAlpineImage, - ExposedPorts: []string{ - nginxDefaultPort, + t.Run("non-privileged", func(t *testing.T) { + testNewReaper(ctx, t, + config.Config{ + RyukConnectionTimeout: time.Minute, + RyukReconnectionTimeout: 10 * time.Second, }, - }, - Started: true, + createContainerRequest(), + ) }) - CleanupContainer(t, nginxA) - if err != nil { - t.Fatal(err) - } - - state, err := nginxA.State(ctx) - if err != nil { - t.Fatal(err) - } - if state.Running != true { - t.Fatal("The container shoud be in running state") - } - err = nginxA.Terminate(ctx) - if err != nil { - t.Fatal(err) - } - - _, err = nginxA.State(ctx) - if err == nil { - t.Fatal("expected error from container inspect.") - } -} - -func Test_NewReaper(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } - - type cases struct { - name string - req ContainerRequest - config TestcontainersConfig - ctx context.Context - env map[string]string - } - tests := []cases{ - { - name: "non-privileged", - req: createContainerRequest(nil), - config: TestcontainersConfig{Config: config.Config{ - RyukConnectionTimeout: time.Minute, - RyukReconnectionTimeout: 10 * time.Second, - }}, - }, - { - name: "privileged", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { - req.Privileged = true - return req - }), - config: TestcontainersConfig{Config: config.Config{ + t.Run("privileged", func(t *testing.T) { + testNewReaper(ctx, t, + config.Config{ RyukPrivileged: true, RyukConnectionTimeout: time.Minute, RyukReconnectionTimeout: 10 * time.Second, - }}, - }, - { - name: "configured non-default timeouts", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { - req.Env = map[string]string{ - "RYUK_CONNECTION_TIMEOUT": "1m0s", - "RYUK_RECONNECTION_TIMEOUT": "10m0s", - } - return req - }), - config: TestcontainersConfig{Config: config.Config{ + }, + createContainerRequest(), + ) + }) + + t.Run("custom-timeouts", func(t *testing.T) { + testNewReaper(ctx, t, + config.Config{ RyukPrivileged: true, - RyukConnectionTimeout: time.Minute, - RyukReconnectionTimeout: 10 * time.Minute, - }}, - }, - { - name: "configured verbose mode", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { + RyukConnectionTimeout: 2 * time.Minute, + RyukReconnectionTimeout: 20 * time.Second, + }, + createContainerRequest(func(req *ContainerRequest) { req.Env = map[string]string{ - "RYUK_VERBOSE": "true", + "RYUK_CONNECTION_TIMEOUT": "2m0s", + "RYUK_RECONNECTION_TIMEOUT": "20s", } - return req }), - config: TestcontainersConfig{Config: config.Config{ + ) + }) + + t.Run("verbose", func(t *testing.T) { + testNewReaper(ctx, t, + config.Config{ RyukPrivileged: true, RyukVerbose: true, - }}, - }, - { - name: "docker-host in context", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { - req.HostConfigModifier = func(hostConfig *container.HostConfig) { - hostConfig.Binds = []string{core.MustExtractDockerSocket(context.Background()) + ":/var/run/docker.sock"} + }, + createContainerRequest(func(req *ContainerRequest) { + req.Env = map[string]string{ + "RYUK_VERBOSE": "true", } - return req }), - config: TestcontainersConfig{Config: config.Config{ + ) + }) + + t.Run("docker-host", func(t *testing.T) { + testNewReaper(context.WithValue(ctx, core.DockerHostContextKey, core.DockerSocketPathWithSchema), t, + config.Config{ RyukConnectionTimeout: time.Minute, RyukReconnectionTimeout: 10 * time.Second, - }}, - ctx: context.WithValue(context.TODO(), core.DockerHostContextKey, core.DockerSocketPathWithSchema), - }, - { - name: "Reaper including custom Hub prefix", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { - req.Image = config.ReaperDefaultImage - req.Privileged = true - return req + }, + createContainerRequest(func(req *ContainerRequest) { + req.HostConfigModifier = func(hostConfig *container.HostConfig) { + hostConfig.Binds = []string{core.MustExtractDockerSocket(ctx) + ":/var/run/docker.sock"} + } }), - config: TestcontainersConfig{Config: config.Config{ + ) + }) + + t.Run("hub-prefix", func(t *testing.T) { + testNewReaper(context.WithValue(ctx, core.DockerHostContextKey, core.DockerSocketPathWithSchema), t, + config.Config{ HubImageNamePrefix: "registry.mycompany.com/mirror", RyukPrivileged: true, RyukConnectionTimeout: time.Minute, RyukReconnectionTimeout: 10 * time.Second, - }}, - }, - { - name: "Reaper including custom Hub prefix as env var", - req: createContainerRequest(func(req ContainerRequest) ContainerRequest { + }, + createContainerRequest(func(req *ContainerRequest) { req.Image = config.ReaperDefaultImage req.Privileged = true - return req }), - config: TestcontainersConfig{Config: config.Config{ + ) + }) + + t.Run("hub-prefix-env", func(t *testing.T) { + config.Reset() + t.Cleanup(config.Reset) + + t.Setenv("TESTCONTAINERS_HUB_IMAGE_NAME_PREFIX", "registry.mycompany.com/mirror") + testNewReaper(context.WithValue(ctx, core.DockerHostContextKey, core.DockerSocketPathWithSchema), t, + config.Config{ RyukPrivileged: true, RyukConnectionTimeout: time.Minute, RyukReconnectionTimeout: 10 * time.Second, - }}, - env: map[string]string{ - "TESTCONTAINERS_HUB_IMAGE_NAME_PREFIX": "registry.mycompany.com/mirror", }, - }, - } + createContainerRequest(func(req *ContainerRequest) { + req.Image = config.ReaperDefaultImage + req.Privileged = true + }), + ) + }) +} - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if test.env != nil { - config.Reset() // reset the config using the internal method to avoid the sync.Once - for k, v := range test.env { - t.Setenv(k, v) - } - } +func testNewReaper(ctx context.Context, t *testing.T, cfg config.Config, req ContainerRequest) { + t.Helper() - if prefix := os.Getenv("TESTCONTAINERS_HUB_IMAGE_NAME_PREFIX"); prefix != "" { - test.config.Config.HubImageNamePrefix = prefix - } - - provider := newMockReaperProvider(t) - provider.config = test.config - t.Cleanup(provider.RestoreReaperState) + if prefix := os.Getenv("TESTCONTAINERS_HUB_IMAGE_NAME_PREFIX"); prefix != "" { + cfg.HubImageNamePrefix = prefix + } - if test.ctx == nil { - test.ctx = context.TODO() - } + provider := newMockReaperProvider(cfg) - _, err := reuseOrCreateReaper(test.ctx, testSessionID, provider) - // we should have errored out see mockReaperProvider.RunContainer - require.EqualError(t, err, "expected") + // We need a new reaperSpawner for each test case to avoid reusing + // an existing reaper instance. + spawner := &reaperSpawner{} + reaper, err := spawner.reaper(ctx, testSessionID, provider) + cleanupReaper(t, reaper, spawner) + // We should have errored out see mockReaperProvider.RunContainer. + require.ErrorIs(t, err, errExpected) - assert.Equal(t, test.req.Image, provider.req.Image, "expected image doesn't match the submitted request") - assert.Equal(t, test.req.ExposedPorts, provider.req.ExposedPorts, "expected exposed ports don't match the submitted request") - assert.Equal(t, test.req.Labels, provider.req.Labels, "expected labels don't match the submitted request") - assert.Equal(t, test.req.Mounts, provider.req.Mounts, "expected mounts don't match the submitted request") - assert.Equal(t, test.req.WaitingFor, provider.req.WaitingFor, "expected waitingFor don't match the submitted request") - assert.Equal(t, test.req.Env, provider.req.Env, "expected env doesn't match the submitted request") + require.Equal(t, req.Image, provider.req.Image, "expected image doesn't match the submitted request") + require.Equal(t, req.ExposedPorts, provider.req.ExposedPorts, "expected exposed ports don't match the submitted request") + require.Equal(t, req.Labels, provider.req.Labels, "expected labels don't match the submitted request") + require.Equal(t, req.Mounts, provider.req.Mounts, "expected mounts don't match the submitted request") + require.Equal(t, req.WaitingFor, provider.req.WaitingFor, "expected waitingFor don't match the submitted request") + require.Equal(t, req.Env, provider.req.Env, "expected env doesn't match the submitted request") - // checks for reaper's preCreationCallback fields - assert.Equal(t, container.NetworkMode(Bridge), provider.hostConfig.NetworkMode, "expected networkMode doesn't match the submitted request") - assert.True(t, provider.hostConfig.AutoRemove, "expected networkMode doesn't match the submitted request") - }) - } + // checks for reaper's preCreationCallback fields + require.Equal(t, container.NetworkMode(Bridge), provider.hostConfig.NetworkMode, "expected networkMode doesn't match the submitted request") + require.True(t, provider.hostConfig.AutoRemove, "expected networkMode doesn't match the submitted request") } func Test_ReaperReusedIfHealthy(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } - - testProvider := newMockReaperProvider(t) - t.Cleanup(testProvider.RestoreReaperState) + reaperDisable(t, false) SkipIfProviderIsNotHealthy(t) ctx := context.Background() // As other integration tests run with the (shared) Reaper as well, re-use the instance to not interrupt other tests - wasReaperRunning := reaperInstance != nil + if spawner.instance != nil { + t.Cleanup(func() { + require.NoError(t, spawner.cleanup()) + }) + } + + provider, err := ProviderDocker.GetProvider() + require.NoError(t, err) - provider, _ := ProviderDocker.GetProvider() - reaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - reaperReused, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaperReused, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "reusing the Reaper should not error") - // assert that the internal state of both reaper instances is the same - assert.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") - assert.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") - assert.Equal(t, reaper.Provider, reaperReused.Provider, "expecting the same container provider") - assert.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") - assert.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") - - terminate, err := reaper.Connect() - defer func(term chan bool) { - term <- true - }(terminate) - require.NoError(t, err, "connecting to Reaper should be successful") - if !wasReaperRunning { - CleanupContainer(t, reaper.container) - } + // Ensure the internal state of both reaper instances is the same + require.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") + require.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") + require.Equal(t, reaper.Provider, reaperReused.Provider, "expecting the same container provider") + require.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") + require.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") + + termSignal, err := reaper.Connect() + cleanupTermSignal(t, termSignal) + require.NoError(t, err, "connecting to Reaper should be successful") } func Test_RecreateReaperIfTerminated(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } - - mockProvider := newMockReaperProvider(t) - t.Cleanup(mockProvider.RestoreReaperState) + reaperDisable(t, false) SkipIfProviderIsNotHealthy(t) - provider, _ := ProviderDocker.GetProvider() + provider, err := ProviderDocker.GetProvider() + require.NoError(t, err) + ctx := context.Background() - reaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - terminate, err := reaper.Connect() - require.NoError(t, err, "connecting to Reaper should be successful") - terminate <- true + termSignal, err := reaper.Connect() + if termSignal != nil { + termSignal <- true + } + require.NoError(t, err) - // Wait for ryuk's default timeout (10s) + 1s to allow for a graceful shutdown/cleanup of the container. - time.Sleep(11 * time.Second) + // Wait for up to ryuk's default reconnect timeout + 1s to allow for a graceful shutdown/cleanup of the container. + timeout := time.NewTimer(time.Second * 11) + t.Cleanup(func() { + timeout.Stop() + }) + for { + state, err := reaper.container.State(ctx) + if err != nil { + if errdefs.IsNotFound(err) { + break + } + require.NoError(t, err) + } + + if !state.Running { + break + } + + select { + case <-timeout.C: + t.Fatal("reaper container should have been terminated") + default: + } - recreatedReaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + time.Sleep(time.Millisecond * 100) + } + + recreatedReaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, recreatedReaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - assert.NotEqual(t, reaper.container.GetContainerID(), recreatedReaper.container.GetContainerID(), "expected different container ID") + require.NotEqual(t, reaper.container.GetContainerID(), recreatedReaper.container.GetContainerID(), "expected different container ID") - terminate, err = recreatedReaper.Connect() - defer func(term chan bool) { - term <- true - }(terminate) + recreatedTermSignal, err := recreatedReaper.Connect() + cleanupTermSignal(t, recreatedTermSignal) require.NoError(t, err, "connecting to Reaper should be successful") - CleanupContainer(t, recreatedReaper.container) } func TestReaper_reuseItFromOtherTestProgramUsingDocker(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } + reaperDisable(t, false) - mockProvider := &mockReaperProvider{ - initialReaper: reaperInstance, - //nolint:govet - initialReaperOnce: reaperOnce, - t: t, - } - t.Cleanup(mockProvider.RestoreReaperState) - - // explicitly set the reaperInstance to nil to simulate another test program in the same session accessing the same reaper - reaperInstance = nil - reaperOnce = sync.Once{} + // Explicitly set the reaper instance to nil to simulate another test + // program in the same session accessing the same reaper. + spawner.instance = nil SkipIfProviderIsNotHealthy(t) ctx := context.Background() - // As other integration tests run with the (shared) Reaper as well, re-use the instance to not interrupt other tests - wasReaperRunning := reaperInstance != nil + // As other integration tests run with the (shared) Reaper as well, + // re-use the instance to not interrupt other tests. + if spawner.instance != nil { + t.Cleanup(func() { + require.NoError(t, spawner.cleanup()) + }) + } - provider, _ := ProviderDocker.GetProvider() - reaper, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + provider, err := ProviderDocker.GetProvider() + require.NoError(t, err) + + reaper, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "creating the Reaper should not error") - // explicitly reset the reaperInstance to nil to simulate another test program in the same session accessing the same reaper - reaperInstance = nil - reaperOnce = sync.Once{} + // Explicitly reset the reaper instance to nil to simulate another test + // program in the same session accessing the same reaper. + spawner.instance = nil - reaperReused, err := reuseOrCreateReaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + reaperReused, err := spawner.reaper(context.WithValue(ctx, core.DockerHostContextKey, provider.(*DockerProvider).host), testSessionID, provider) + cleanupReaper(t, reaper, spawner) require.NoError(t, err, "reusing the Reaper should not error") - // assert that the internal state of both reaper instances is the same - assert.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") - assert.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") - assert.Equal(t, reaper.Provider, reaperReused.Provider, "expecting the same container provider") - assert.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") - assert.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") - - terminate, err := reaper.Connect() - defer func(term chan bool) { - term <- true - }(terminate) - require.NoError(t, err, "connecting to Reaper should be successful") - if !wasReaperRunning { - CleanupContainer(t, reaper.container) - } + // Ensure that the internal state of both reaper instances is the same. + require.Equal(t, reaper.SessionID, reaperReused.SessionID, "expecting the same SessionID") + require.Equal(t, reaper.Endpoint, reaperReused.Endpoint, "expecting the same reaper endpoint") + require.Equal(t, reaper.Provider, reaperReused.Provider, "expecting the same container provider") + require.Equal(t, reaper.container.GetContainerID(), reaperReused.container.GetContainerID(), "expecting the same container ID") + require.Equal(t, reaper.container.SessionID(), reaperReused.container.SessionID(), "expecting the same session ID") + + termSignal, err := reaper.Connect() + cleanupTermSignal(t, termSignal) + require.NoError(t, err, "connecting to Reaper should be successful") } // TestReaper_ReuseRunning tests whether reusing the reaper if using @@ -586,15 +502,11 @@ func TestReaper_reuseItFromOtherTestProgramUsingDocker(t *testing.T) { // already running for the same session id by returning its container instance // instead. func TestReaper_ReuseRunning(t *testing.T) { - config.Reset() // reset the config using the internal method to avoid the sync.Once - tcConfig := config.Read() - if tcConfig.RyukDisabled { - t.Skip("Ryuk is disabled, skipping test") - } + reaperDisable(t, false) const concurrency = 64 - timeout, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + timeout, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() sessionID := SessionID() @@ -605,27 +517,54 @@ func TestReaper_ReuseRunning(t *testing.T) { obtainedReaperContainerIDs := make([]string, concurrency) var wg sync.WaitGroup for i := 0; i < concurrency; i++ { - i := i wg.Add(1) - go func() { + go func(i int) { defer wg.Done() - reaperContainer, err := lookUpReaperContainer(timeout, sessionID) - if err == nil && reaperContainer != nil { - // Found. - obtainedReaperContainerIDs[i] = reaperContainer.GetContainerID() - return - } - // Not found -> create. - createdReaper, err := newReaper(timeout, sessionID, dockerProvider) - require.NoError(t, err, "new reaper should not fail") - obtainedReaperContainerIDs[i] = createdReaper.container.GetContainerID() - }() + spawner := &reaperSpawner{} + reaper, err := spawner.reaper(timeout, sessionID, dockerProvider) + cleanupReaper(t, reaper, spawner) + require.NoError(t, err) + + obtainedReaperContainerIDs[i] = reaper.container.GetContainerID() + }(i) } wg.Wait() // Assure that all calls returned the same container. firstContainerID := obtainedReaperContainerIDs[0] for i, containerID := range obtainedReaperContainerIDs { - assert.Equal(t, firstContainerID, containerID, "call %d should have returned same container id", i) + require.Equal(t, firstContainerID, containerID, "call %d should have returned same container id", i) + } +} + +func TestSpawnerBackoff(t *testing.T) { + b := spawner.backoff() + for i := 0; i < 100; i++ { + require.LessOrEqual(t, b.NextBackOff(), time.Millisecond*250, "backoff should not exceed max interval") + } +} + +// cleanupReaper schedules reaper for cleanup if it's not nil. +func cleanupReaper(t *testing.T, reaper *Reaper, spawner *reaperSpawner) { + t.Helper() + + if reaper == nil { + return } + + t.Cleanup(func() { + reaper.close() + require.NoError(t, spawner.cleanup()) + }) +} + +// cleanupTermSignal ensures that termSignal +func cleanupTermSignal(t *testing.T, termSignal chan bool) { + t.Helper() + + t.Cleanup(func() { + if termSignal != nil { + termSignal <- true + } + }) } diff --git a/testing.go b/testing.go index 41391519deb..cafd4fe920c 100644 --- a/testing.go +++ b/testing.go @@ -68,11 +68,11 @@ func (lc *StdoutLogConsumer) Accept(l Log) { // container is stopped when the function ends. // // before any error check. If container is nil, its a no-op. -func CleanupContainer(tb testing.TB, container Container, options ...TerminateOption) { +func CleanupContainer(tb testing.TB, ctr Container, options ...TerminateOption) { tb.Helper() tb.Cleanup(func() { - noErrorOrIgnored(tb, TerminateContainer(container, options...)) + noErrorOrIgnored(tb, TerminateContainer(ctr, options...)) }) }