-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbuilder.go
96 lines (81 loc) · 2.04 KB
/
builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package stream
import (
"context"
"github.com/go-park/stream/support/collections"
"github.com/go-park/stream/support/function"
"github.com/go-park/stream/support/routine"
"golang.org/x/exp/constraints"
)
func Builder[T any]() builder[T] {
return builder[T]{}
}
type builder[T any] struct {
iter collections.Iterator[T]
reusable bool
parallel bool
}
func (b builder[T]) Source(t ...T) builder[T] {
b.iter = collections.IterableSlice(t...)
return b
}
func (b builder[T]) iterator(iter collections.Iterator[T]) builder[T] {
b.iter = iter
return b
}
func (b builder[T]) Parallel() builder[T] {
b.parallel = true
return b
}
func (b builder[T]) Simple() Stream[T] {
return b.buildSimple()
}
func (b builder[T]) Build() Stream[T] {
return b.buildFast()
}
func (b builder[T]) buildSimple() SimplePipline[T] {
target := make(chan T)
ctx, cancelFn := context.WithCancel(context.Background())
routine.Run(func() {
defer cancelFn()
defer close(target)
for b.iter.HasNext() {
select {
case <-ctx.Done():
return
default:
target <- b.iter.Next()
}
}
})
return SimplePipline[T]{upstream: target, cancel: cancelFn, parallel: b.parallel}
}
func (b builder[T]) buildFast() Stream[T] {
_, cancelFn := context.WithCancel(context.Background())
return &FastPipline[T]{source: b.iter, opWrapper: defultOpWrapper[T], cancel: cancelFn}
}
func defultOpWrapper[T any](down function.Consumer[T]) function.Consumer[T] {
return func(t T) {
down.Accept(t)
}
}
func FromMap[M ~map[K]V, K comparable, V any](m M) Stream[collections.Entry[K, V]] {
return From(collections.GetEntrySet(m)...)
}
func From[T any](list ...T) Stream[T] {
return Builder[T]().Source(list...).Build()
}
func Range[T constraints.Integer](start, end T) Stream[T] {
var iter collections.Iterator[T] = collections.Iterable(
func() (func() bool, function.Supplier[T]) {
hasNext := func() bool {
return start <= end
}
next := func() T {
cur := start
start++
return cur
}
return hasNext, next
})
return Builder[T]().iterator(iter).Build()
}