Skip to content

Commit cd7fc02

Browse files
committed
feat: Go and GoWait
Go performs a concurrent Effect by spinning up a goroutine for each item. GoWait is like Go but it additionally accepts a sync.WaitGroup to which it reports once the effect has completed.
1 parent 6e6a3de commit cd7fc02

File tree

3 files changed

+71
-2
lines changed

3 files changed

+71
-2
lines changed

go.go

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package fungi
2+
3+
import "sync"
4+
5+
// Go performs a concurrent [Effect] by spinning up a goroutine for each item.
6+
func Go[T any](effect func(T)) StreamIdentity[T] {
7+
return Effect(func(item T) { go effect(item) })
8+
}
9+
10+
// GoWait is like [Go] but it additionally accepts a [sync.WaitGroup] to which
11+
// it reports once the effect has completed.
12+
func GoWait[T any](effect func(T), wg *sync.WaitGroup) StreamIdentity[T] {
13+
return Effect(func(item T) {
14+
wg.Add(1)
15+
go func() { defer wg.Done(); effect(item) }()
16+
})
17+
}

go_test.go

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package fungi
2+
3+
import (
4+
"sync"
5+
"testing"
6+
7+
"github.com/stretchr/testify/require"
8+
)
9+
10+
func TestGo(t *testing.T) {
11+
var wg sync.WaitGroup
12+
var mutex sync.Mutex
13+
14+
total := 0
15+
slice := []int{1, 2, 3, 4, 5}
16+
stream := SliceStream(slice)
17+
18+
wg.Add(5)
19+
count := Go(func(i int) {
20+
defer wg.Done()
21+
mutex.Lock()
22+
defer mutex.Unlock()
23+
total += i
24+
})
25+
26+
consume := Loop(Nop[int])
27+
require.NoError(t, consume(count(stream)))
28+
wg.Wait()
29+
require.Equal(t, 15, total)
30+
}
31+
32+
func TestGoWait(t *testing.T) {
33+
var wg sync.WaitGroup
34+
var mutex sync.Mutex
35+
36+
total := 0
37+
slice := []int{1, 2, 3, 4, 5}
38+
stream := SliceStream(slice)
39+
40+
count := GoWait(func(i int) {
41+
mutex.Lock()
42+
defer mutex.Unlock()
43+
total += i
44+
}, &wg)
45+
46+
consume := Loop(Nop[int])
47+
require.NoError(t, consume(count(stream)))
48+
wg.Wait()
49+
require.Equal(t, 15, total)
50+
}

unique.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ package fungi
33
// Unique returns a stream of unique items. The incoming stream must be a stream
44
// of comparable items.
55
func Unique[T comparable](items Stream[T]) Stream[T] {
6-
return UniqueBy(identity[T])(items)
6+
return UniqueBy(Identity[T])(items)
77
}
88

99
// UniqueBy accepts a stream of any items and returns a stream of unique items.
@@ -35,4 +35,6 @@ func UniqueBy[T any, K comparable](id func(T) K) StreamIdentity[T] {
3535
return StreamIdentity[T](unique)
3636
}
3737

38-
func identity[T any](item T) T { return item }
38+
func Identity[T any](item T) T { return item }
39+
40+
func Nop[T any](_ T) {}

0 commit comments

Comments
 (0)