-
Notifications
You must be signed in to change notification settings - Fork 0
/
lib.go
175 lines (152 loc) · 4.81 KB
/
lib.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
package pbatch
import (
	"errors"
	"strings"
	"sync"
)
// errorHandler selects how Run reacts to errors returned by the process
// function. The type is unexported on purpose: callers must use one of the
// two constants below rather than raw booleans.
type errorHandler bool

const (
	// STOP_ON_ERROR makes Run abort on the first error and return it.
	// NOTE(review): Go convention is MixedCaps (e.g. StopOnError), not
	// ALL_CAPS, but these names are exported API and are kept for
	// backward compatibility.
	STOP_ON_ERROR errorHandler = true
	// CONTINUE_ON_ERROR makes Run process every item and aggregate all
	// errors into a single BatchError.
	CONTINUE_ON_ERROR errorHandler = false
)
// BatchError is a custom error type that aggregates multiple errors
// produced while processing a batch of items.
type BatchError struct {
	// Errors holds the individual errors in the order they were collected.
	Errors []error
}

// Error implements the error interface by joining every aggregated error
// message with "; " after a "batch error: " prefix.
func (e BatchError) Error() string {
	// Pre-size the slice: the final length is known up front, so this
	// avoids the repeated growth copies of append on a nil slice.
	msgs := make([]string, len(e.Errors))
	for i, err := range e.Errors {
		msgs[i] = err.Error()
	}
	return "batch error: " + strings.Join(msgs, "; ")
}
// IsBatchError reports whether err is (or wraps) a BatchError.
//
// It uses errors.As instead of a direct type assertion so that a BatchError
// wrapped with fmt.Errorf("...: %w", err) is still recognized; the previous
// assertion-based check missed wrapped errors.
func IsBatchError(err error) bool {
	var batchErr BatchError
	return errors.As(err, &batchErr)
}
// UnwrapBatchError unwraps a BatchError and returns the list of errors.
//
// Parameters:
//   - err: the error to unwrap
//
// Returns:
//   - nil if err is nil
//   - the aggregated errors if err is (or wraps) a BatchError
//   - a one-element slice containing err itself otherwise
func UnwrapBatchError(err error) []error {
	if err == nil {
		return nil
	}
	// errors.As also finds a BatchError wrapped via %w, keeping this
	// function consistent with IsBatchError; a bare type assertion would
	// miss wrapped errors.
	var batchErr BatchError
	if errors.As(err, &batchErr) {
		return batchErr.Errors
	}
	return []error{err}
}
// Run runs the process function on each item in the items slice in parallel.
// Only batchSize items are processed at a time.
//
// Parameters:
//   - items: the slice of items to process
//   - batchSize: the maximum number of items processed concurrently; values
//     below 1 are clamped to 1 (previously a negative value panicked in
//     make(chan) and 0 deadlocked on the first semaphore acquire)
//   - handleErrorStrategy: whether to stop processing on the first error or
//     continue processing. Use STOP_ON_ERROR or CONTINUE_ON_ERROR
//   - process: the function to run on each item
//
// Returns:
//   - a slice of results from the process function; with CONTINUE_ON_ERROR,
//     the slot of a failed item keeps the zero value of R
//   - the first error if handleErrorStrategy is STOP_ON_ERROR (results are
//     nil in that case), or a BatchError aggregating all errors if
//     handleErrorStrategy is CONTINUE_ON_ERROR
func Run[T any, R any](items []T, batchSize int, handleErrorStrategy errorHandler, process func(T) (R, error)) ([]R, error) {
	// Guard against a non-positive batch size: make(chan struct{}, -1)
	// panics, and a zero-capacity semaphore would block forever below.
	if batchSize < 1 {
		batchSize = 1
	}
	// Semaphore channel bounding the number of concurrent goroutines.
	semaphore := make(chan struct{}, batchSize)
	// Wait group to wait for all goroutines to finish.
	var wg sync.WaitGroup
	// results is index-addressed so each goroutine writes a distinct slot.
	results := make([]R, len(items))
	// Buffered to len(items) so error sends can never block a goroutine.
	errChan := make(chan error, len(items))
	// mu guards allErrors; it is also held around the results write even
	// though indices are distinct, to keep the memory ordering explicit.
	var mu sync.Mutex
	var allErrors []error
	for i, item := range items {
		// Acquire a semaphore slot; blocks once batchSize items are in flight.
		semaphore <- struct{}{}
		wg.Add(1)
		// Start a goroutine for processing the item.
		go func(i int, item T) {
			defer wg.Done()
			defer func() { <-semaphore }() // Release semaphore slot when done
			result, err := process(item)
			if err != nil {
				if handleErrorStrategy == STOP_ON_ERROR {
					// Non-blocking send: only the first error matters.
					select {
					case errChan <- err:
					default:
					}
					return
				}
				// CONTINUE_ON_ERROR: collect every error.
				mu.Lock()
				allErrors = append(allErrors, err)
				mu.Unlock()
				return
			}
			// Store the result safely.
			mu.Lock()
			results[i] = result
			mu.Unlock()
		}(i, item)
		// Fail fast between submissions when stopping on the first error.
		if handleErrorStrategy == STOP_ON_ERROR {
			select {
			case err := <-errChan:
				// Wait for in-flight goroutines before returning early.
				wg.Wait()
				return nil, err
			default:
			}
		}
	}
	// Wait for all goroutines to finish.
	wg.Wait()
	// A failure in one of the final goroutines is only visible here, after
	// the submission loop has already ended.
	if handleErrorStrategy == STOP_ON_ERROR {
		select {
		case err := <-errChan:
			return nil, err
		default:
		}
	}
	// CONTINUE_ON_ERROR with at least one failure: return the partial
	// results alongside the aggregated BatchError.
	if handleErrorStrategy == CONTINUE_ON_ERROR && len(allErrors) > 0 {
		return results, aggregateErrors(allErrors)
	}
	return results, nil
}
// Process runs the process function on each item in the items slice in parallel.
// It is a wrapper around Run that discards the results.
// It should be used when you only care about processing the items and not the results.
// Only batchSize items are processed at a time.
//
// Process always uses STOP_ON_ERROR: the first failure aborts the batch.
// (The previous doc comment described a handleErrorStrategy parameter that
// this function does not accept.)
//
// Parameters:
//   - items: the slice of items to process
//   - batchSize: the number of items to process at a time
//   - process: the function to run on each item
//
// Returns:
//   - the first error encountered, or nil if every item succeeded
func Process[T any](items []T, batchSize int, process func(T) error) error {
	// struct{} is a zero-size placeholder result; Run's results are discarded.
	_, err := Run(items, batchSize, STOP_ON_ERROR, func(item T) (struct{}, error) {
		return struct{}{}, process(item)
	})
	return err
}
// aggregateErrors folds a slice of errors into a single BatchError.
// An empty slice yields nil, so the result can be returned directly as a
// function's error value without an extra nil check at the call site.
func aggregateErrors(errs []error) error {
	if len(errs) > 0 {
		return BatchError{Errors: errs}
	}
	return nil
}