diff --git a/generic.go b/generic.go index d5712deb97..fbfd8947c2 100644 --- a/generic.go +++ b/generic.go @@ -3,27 +3,8 @@ package testcontainers import ( "context" "fmt" - "sync" ) -const ( - MaxGenericWorkers = 8 -) - -type GenericParallelErrors struct { - Errors []GenericContainerRequestError -} - -func (gpe GenericParallelErrors) Error() string { - return fmt.Sprintf("%v", gpe.Errors) -} - -// GenericContainerRequestError represents error from parallel request -type GenericContainerRequestError struct { - Request GenericContainerRequest - Error error -} - // GenericContainerRequest represents parameters to a generic container type GenericContainerRequest struct { ContainerRequest // embedded request for provider @@ -52,92 +33,6 @@ func GenericNetwork(ctx context.Context, req GenericNetworkRequest) (Network, er return network, nil } -func genericContainerRunner( - ctx context.Context, - requests <-chan GenericContainerRequest, - errors chan<- GenericContainerRequestError, - containers chan<- Container, - wg *sync.WaitGroup) { - for req := range requests { - c, err := GenericContainer(ctx, req) - if err != nil { - errors <- GenericContainerRequestError{ - Request: req, - Error: err, - } - continue - } - containers <- c - } - wg.Done() -} - -// GenericParallelContainers creates a generic containers with parameters in parallel mode -func GenericParallelContainers(ctx context.Context, reqs []GenericContainerRequest) ([]Container, error) { - tasksChanSize := MaxGenericWorkers - if tasksChanSize > len(reqs) { - tasksChanSize = len(reqs) - } - - tasksChan := make(chan GenericContainerRequest, tasksChanSize) - errsChan := make(chan GenericContainerRequestError) - resChan := make(chan Container) - waitRes := make(chan struct{}) - - containers := make([]Container, 0) - errors := make([]GenericContainerRequestError, 0) - - wg := sync.WaitGroup{} - wg.Add(tasksChanSize) - - // run workers - for i := 0; i < tasksChanSize; i++ { - go genericContainerRunner(ctx, tasksChan, errsChan, resChan, &wg) - } - - go func() { - for { - select { - case c, ok := <-resChan: - if !ok { - resChan = nil - } else { - containers = append(containers, c) - } - case e, ok := <-errsChan: - if !ok { - errsChan = nil - } else { - errors = append(errors, e) - } - } - - if resChan == nil && errsChan == nil { - waitRes <- struct{}{} - break - } - } - - }() - - for _, req := range reqs { - tasksChan <- req - } - close(tasksChan) - wg.Wait() - close(resChan) - close(errsChan) - - <-waitRes - - if len(errors) == 0 { - return containers, GenericParallelErrors{Errors: errors} - } - - return containers, nil - -} - // GenericContainer creates a generic container with parameters func GenericContainer(ctx context.Context, req GenericContainerRequest) (Container, error) { logging := req.Logger diff --git a/parallel.go b/parallel.go new file mode 100644 index 0000000000..5aa7dc0a7c --- /dev/null +++ b/parallel.go @@ -0,0 +1,111 @@ +package testcontainers + +import ( + "context" + "fmt" + "sync" +) + +const ( + MaxGenericWorkers = 8 +) + +type GenericParallelErrors struct { + Errors []GenericContainerRequestError +} + +func (gpe GenericParallelErrors) Error() string { + return fmt.Sprintf("%v", gpe.Errors) +} + +// GenericContainerRequestError represents error from parallel request +type GenericContainerRequestError struct { + Request GenericContainerRequest + Error error +} + +func genericContainerRunner( + ctx context.Context, + requests <-chan GenericContainerRequest, + errors chan<- GenericContainerRequestError, + containers chan<- Container, + wg *sync.WaitGroup) { + for req := range requests { + c, err := GenericContainer(ctx, req) + if err != nil { + errors <- GenericContainerRequestError{ + Request: req, + Error: err, + } + continue + } + containers <- c + } + wg.Done() +} + +// GenericParallelContainers creates a generic containers with parameters in parallel mode +func GenericParallelContainers(ctx context.Context, reqs []GenericContainerRequest) ([]Container, error) { + tasksChanSize := MaxGenericWorkers + if tasksChanSize > len(reqs) { + tasksChanSize = len(reqs) + } + + tasksChan := make(chan GenericContainerRequest, tasksChanSize) + errsChan := make(chan GenericContainerRequestError) + resChan := make(chan Container) + waitRes := make(chan struct{}) + + containers := make([]Container, 0) + errors := make([]GenericContainerRequestError, 0) + + wg := sync.WaitGroup{} + wg.Add(tasksChanSize) + + // run workers + for i := 0; i < tasksChanSize; i++ { + go genericContainerRunner(ctx, tasksChan, errsChan, resChan, &wg) + } + + go func() { + for { + select { + case c, ok := <-resChan: + if !ok { + resChan = nil + } else { + containers = append(containers, c) + } + case e, ok := <-errsChan: + if !ok { + errsChan = nil + } else { + errors = append(errors, e) + } + } + + if resChan == nil && errsChan == nil { + waitRes <- struct{}{} + break + } + } + + }() + + for _, req := range reqs { + tasksChan <- req + } + close(tasksChan) + wg.Wait() + close(resChan) + close(errsChan) + + <-waitRes + + if len(errors) == 0 { + return containers, GenericParallelErrors{Errors: errors} + } + + return containers, nil + +} diff --git a/generic_test.go b/parallel_test.go similarity index 100% rename from generic_test.go rename to parallel_test.go