Skip to content

Commit cfb9bfc

Browse files
author
Vladimir Stepanov
committed
implement parallel container runner
1 parent 82b7734 commit cfb9bfc

File tree

1 file changed

+105
-0
lines changed

1 file changed

+105
-0
lines changed

generic.go

+105
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,27 @@ package testcontainers
33
import (
44
"context"
55
"fmt"
6+
"sync"
67
)
78

9+
const (
10+
MaxGenericWorkers = 8
11+
)
12+
13+
type GenericParallelErrors struct {
14+
Errors []GenericContainerRequestError
15+
}
16+
17+
func (gpe GenericParallelErrors) Error() string {
18+
return fmt.Sprintf("%v", gpe.Errors)
19+
}
20+
21+
// GenericContainerRequestError represents error from parallel request
22+
type GenericContainerRequestError struct {
23+
Request GenericContainerRequest
24+
Error error
25+
}
26+
827
// GenericContainerRequest represents parameters to a generic container
928
type GenericContainerRequest struct {
1029
ContainerRequest // embedded request for provider
@@ -33,6 +52,92 @@ func GenericNetwork(ctx context.Context, req GenericNetworkRequest) (Network, er
3352
return network, nil
3453
}
3554

55+
func genericContainerRunner(
56+
ctx context.Context,
57+
requests <-chan GenericContainerRequest,
58+
errors chan<- GenericContainerRequestError,
59+
containers chan<- Container,
60+
wg *sync.WaitGroup) {
61+
for req := range requests {
62+
c, err := GenericContainer(ctx, req)
63+
if err != nil {
64+
errors <- GenericContainerRequestError{
65+
Request: req,
66+
Error: err,
67+
}
68+
continue
69+
}
70+
containers <- c
71+
}
72+
wg.Done()
73+
}
74+
75+
// GenericParallelContainers creates a generic containers with parameters in parallel mode
76+
func GenericParallelContainers(ctx context.Context, reqs []GenericContainerRequest) ([]Container, error) {
77+
tasksChanSize := MaxGenericWorkers
78+
if tasksChanSize > len(reqs) {
79+
tasksChanSize = len(reqs)
80+
}
81+
82+
tasksChan := make(chan GenericContainerRequest, tasksChanSize)
83+
errsChan := make(chan GenericContainerRequestError)
84+
resChan := make(chan Container)
85+
waitRes := make(chan struct{})
86+
87+
containers := make([]Container, 0)
88+
errors := make([]GenericContainerRequestError, 0)
89+
90+
wg := sync.WaitGroup{}
91+
wg.Add(tasksChanSize)
92+
93+
// run workers
94+
for i := 0; i < tasksChanSize; i++ {
95+
go genericContainerRunner(ctx, tasksChan, errsChan, resChan, &wg)
96+
}
97+
98+
go func() {
99+
for {
100+
select {
101+
case c, ok := <-resChan:
102+
if !ok {
103+
resChan = nil
104+
} else {
105+
containers = append(containers, c)
106+
}
107+
case e, ok := <-errsChan:
108+
if !ok {
109+
errsChan = nil
110+
} else {
111+
errors = append(errors, e)
112+
}
113+
}
114+
115+
if resChan == nil && errsChan == nil {
116+
waitRes <- struct{}{}
117+
break
118+
}
119+
}
120+
121+
}()
122+
123+
for _, req := range reqs {
124+
tasksChan <- req
125+
}
126+
close(tasksChan)
127+
wg.Wait()
128+
close(resChan)
129+
close(errsChan)
130+
131+
<-waitRes
132+
133+
if len(errors) == 0 {
134+
return containers, GenericParallelErrors{Errors: errors}
135+
}
136+
137+
return containers, nil
138+
139+
}
140+
36141
// GenericContainer creates a generic container with parameters
37142
func GenericContainer(ctx context.Context, req GenericContainerRequest) (Container, error) {
38143
logging := req.Logger

0 commit comments

Comments
 (0)