Skip to content

Commit

Permalink
move parallel implementation into single file
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladimir Stepanov committed Jun 13, 2022
1 parent e9cc2ac commit a4edf72
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 105 deletions.
105 changes: 0 additions & 105 deletions generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,8 @@ package testcontainers
import (
"context"
"fmt"
"sync"
)

const (
MaxGenericWorkers = 8
)

type GenericParallelErrors struct {
Errors []GenericContainerRequestError
}

func (gpe GenericParallelErrors) Error() string {
return fmt.Sprintf("%v", gpe.Errors)
}

// GenericContainerRequestError represents error from parallel request
type GenericContainerRequestError struct {
Request GenericContainerRequest
Error error
}

// GenericContainerRequest represents parameters to a generic container
type GenericContainerRequest struct {
ContainerRequest // embedded request for provider
Expand Down Expand Up @@ -52,92 +33,6 @@ func GenericNetwork(ctx context.Context, req GenericNetworkRequest) (Network, er
return network, nil
}

func genericContainerRunner(
ctx context.Context,
requests <-chan GenericContainerRequest,
errors chan<- GenericContainerRequestError,
containers chan<- Container,
wg *sync.WaitGroup) {
for req := range requests {
c, err := GenericContainer(ctx, req)
if err != nil {
errors <- GenericContainerRequestError{
Request: req,
Error: err,
}
continue
}
containers <- c
}
wg.Done()
}

// GenericParallelContainers creates a generic containers with parameters in parallel mode
func GenericParallelContainers(ctx context.Context, reqs []GenericContainerRequest) ([]Container, error) {
tasksChanSize := MaxGenericWorkers
if tasksChanSize > len(reqs) {
tasksChanSize = len(reqs)
}

tasksChan := make(chan GenericContainerRequest, tasksChanSize)
errsChan := make(chan GenericContainerRequestError)
resChan := make(chan Container)
waitRes := make(chan struct{})

containers := make([]Container, 0)
errors := make([]GenericContainerRequestError, 0)

wg := sync.WaitGroup{}
wg.Add(tasksChanSize)

// run workers
for i := 0; i < tasksChanSize; i++ {
go genericContainerRunner(ctx, tasksChan, errsChan, resChan, &wg)
}

go func() {
for {
select {
case c, ok := <-resChan:
if !ok {
resChan = nil
} else {
containers = append(containers, c)
}
case e, ok := <-errsChan:
if !ok {
errsChan = nil
} else {
errors = append(errors, e)
}
}

if resChan == nil && errsChan == nil {
waitRes <- struct{}{}
break
}
}

}()

for _, req := range reqs {
tasksChan <- req
}
close(tasksChan)
wg.Wait()
close(resChan)
close(errsChan)

<-waitRes

if len(errors) == 0 {
return containers, GenericParallelErrors{Errors: errors}
}

return containers, nil

}

// GenericContainer creates a generic container with parameters
func GenericContainer(ctx context.Context, req GenericContainerRequest) (Container, error) {
logging := req.Logger
Expand Down
111 changes: 111 additions & 0 deletions parallel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package testcontainers

import (
"context"
"fmt"
"sync"
)

const (
MaxGenericWorkers = 8
)

type GenericParallelErrors struct {
Errors []GenericContainerRequestError
}

func (gpe GenericParallelErrors) Error() string {
return fmt.Sprintf("%v", gpe.Errors)
}

// GenericContainerRequestError represents error from parallel request
type GenericContainerRequestError struct {
Request GenericContainerRequest
Error error
}

func genericContainerRunner(
ctx context.Context,
requests <-chan GenericContainerRequest,
errors chan<- GenericContainerRequestError,
containers chan<- Container,
wg *sync.WaitGroup) {
for req := range requests {
c, err := GenericContainer(ctx, req)
if err != nil {
errors <- GenericContainerRequestError{
Request: req,
Error: err,
}
continue
}
containers <- c
}
wg.Done()
}

// GenericParallelContainers creates a generic containers with parameters in parallel mode
func GenericParallelContainers(ctx context.Context, reqs []GenericContainerRequest) ([]Container, error) {
tasksChanSize := MaxGenericWorkers
if tasksChanSize > len(reqs) {
tasksChanSize = len(reqs)
}

tasksChan := make(chan GenericContainerRequest, tasksChanSize)
errsChan := make(chan GenericContainerRequestError)
resChan := make(chan Container)
waitRes := make(chan struct{})

containers := make([]Container, 0)
errors := make([]GenericContainerRequestError, 0)

wg := sync.WaitGroup{}
wg.Add(tasksChanSize)

// run workers
for i := 0; i < tasksChanSize; i++ {
go genericContainerRunner(ctx, tasksChan, errsChan, resChan, &wg)
}

go func() {
for {
select {
case c, ok := <-resChan:
if !ok {
resChan = nil
} else {
containers = append(containers, c)
}
case e, ok := <-errsChan:
if !ok {
errsChan = nil
} else {
errors = append(errors, e)
}
}

if resChan == nil && errsChan == nil {
waitRes <- struct{}{}
break
}
}

}()

for _, req := range reqs {
tasksChan <- req
}
close(tasksChan)
wg.Wait()
close(resChan)
close(errsChan)

<-waitRes

if len(errors) == 0 {
return containers, GenericParallelErrors{Errors: errors}
}

return containers, nil

}
File renamed without changes.

0 comments on commit a4edf72

Please sign in to comment.