Skip to content

Commit c908b4f

Browse files
committed
small adjustments
1 parent e2719cd commit c908b4f

File tree

10 files changed

+201
-35
lines changed

10 files changed

+201
-35
lines changed

patterns/fan-in-out/efficient/main.go

+64
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
package main
2+
3+
import (
4+
"concurrency/patterns/fanin"
5+
"fmt"
6+
"math/rand"
7+
"runtime"
8+
"time"
9+
10+
"concurrency/patterns/generators"
11+
)
12+
13+
var (
14+
take = generators.Take
15+
toInt = generators.ToInt
16+
repeatFn = generators.RepeatFn
17+
)
18+
19+
func main() {
20+
done := make(chan struct{})
21+
defer close(done)
22+
random := func() interface{} {
23+
return rand.Intn(50000000)
24+
}
25+
start := time.Now()
26+
randIntStream := toInt(done, repeatFn(done, random))
27+
numFinders := runtime.NumCPU()
28+
finders := make([]<-chan int, numFinders)
29+
for i := 0; i < numFinders; i++ {
30+
finders[i] = toInt(done, primeFinder(done, randIntStream))
31+
}
32+
33+
fmt.Println("primes:")
34+
for prime := range take(done, fanin.FanIn(done, finders...), 10) {
35+
fmt.Println("prime:", prime)
36+
}
37+
fmt.Printf("search took: %v", time.Since(start))
38+
}
39+
40+
func primeFinder(done <-chan struct{}, intStream <-chan int) <-chan interface{} {
41+
primeStream := make(chan interface{})
42+
go func() {
43+
defer close(primeStream)
44+
for integer := range intStream {
45+
integer -= 1
46+
prime := true
47+
for divisor := integer - 1; divisor > 1; divisor-- {
48+
if integer%divisor == 0 {
49+
prime = false
50+
break
51+
}
52+
}
53+
54+
if prime {
55+
select {
56+
case <-done:
57+
return
58+
case primeStream <- integer:
59+
}
60+
}
61+
}
62+
}()
63+
return primeStream
64+
}
+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"math/rand"
6+
"time"
7+
8+
"concurrency/patterns/generators"
9+
)
10+
11+
var (
12+
take = generators.Take
13+
toInt = generators.ToInt
14+
repeatFn = generators.RepeatFn
15+
)
16+
17+
func main() {
18+
done := make(chan struct{})
19+
defer close(done)
20+
random := func() interface{} {
21+
return rand.Intn(50000000)
22+
}
23+
start := time.Now()
24+
25+
randIntStream := toInt(done, repeatFn(done, random))
26+
fmt.Println("primes:")
27+
for prime := range take(done, primeFinder(done, randIntStream), 10) {
28+
fmt.Println("prime:", prime)
29+
}
30+
fmt.Printf("search took: %v", time.Since(start))
31+
}
32+
33+
func primeFinder(done <-chan struct{}, intStream <-chan int) <-chan interface{} {
34+
primeStream := make(chan interface{})
35+
go func() {
36+
defer close(primeStream)
37+
for integer := range intStream {
38+
integer -= 1
39+
prime := true
40+
for divisor := integer - 1; divisor > 1; divisor-- {
41+
if integer%divisor == 0 {
42+
prime = false
43+
break
44+
}
45+
}
46+
47+
if prime {
48+
select {
49+
case <-done:
50+
return
51+
case primeStream <- integer:
52+
}
53+
}
54+
}
55+
}()
56+
return primeStream
57+
}

patterns/fan-in-out/main.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,15 @@ var (
1515
)
1616

1717
func main() {
18+
done := make(chan struct{})
19+
defer close(done)
20+
1821
odd := intGen(20)
19-
c1, c2, c3 := fanOut(odd), fanOut(odd), fanOut(odd)
20-
for v := range fanIn(c1, c2, c3) {
22+
c1 := fanOut(done, odd)
23+
c2 := fanOut(done, odd)
24+
c3 := fanOut(done, odd)
25+
26+
for v := range fanIn(done, c1, c2, c3) {
2127
fmt.Println("value:", v)
2228
}
2329
}

patterns/fanin/cmd/main.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,14 @@ import (
88
)
99

1010
func main() {
11+
done := make(chan struct{})
12+
defer close(done)
13+
1114
odd := generator.OddIntGen(5)
1215
even := generator.EvenIntGen(5)
1316
hex := generator.HexIntGen(5)
14-
out := fanin.FanIn(odd, even, hex)
17+
out := fanin.FanIn(done, odd, even, hex)
18+
1519
for n := range out {
1620
fmt.Println("fanned number:", n)
1721
}

patterns/fanin/fanin.go

+16-8
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,33 @@
11
package fanin
22

3+
import (
4+
"sync"
5+
)
6+
37
// FanIn reads from multiple channels and writes into 1 final channel
48
// The FAN-IN aka Multiplexing pattern states that a function receives multiple channels as inputs
59
// It reads each input and sends all the values into 1 final output channel
6-
func FanIn(inputs ...<-chan int) <-chan int {
10+
func FanIn(done chan struct{}, inputs ...<-chan int) <-chan int {
711
out := make(chan int)
8-
done := make(chan struct{})
12+
var wg sync.WaitGroup
13+
wg.Add(len(inputs))
14+
915
for _, in := range inputs {
1016
go func(numbers <-chan int) {
17+
defer wg.Done()
1118
for n := range numbers {
12-
out <- n
19+
select {
20+
case <-done:
21+
return
22+
case out <- n:
23+
}
1324
}
14-
done <- struct{}{}
1525
}(in)
1626
}
1727
go func() {
18-
for i := 0; i < len(inputs); i++ {
19-
<-done
20-
}
21-
close(done)
28+
wg.Wait()
2229
close(out)
2330
}()
31+
2432
return out
2533
}

patterns/fanout/cmd/main.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,15 @@ import (
88
)
99

1010
func main() {
11+
done := make(chan struct{})
12+
defer close(done)
13+
1114
odd := generator.OddIntGen(10)
12-
c1 := fanout.FanOut(odd)
13-
c2 := fanout.FanOut(odd)
14-
c3 := fanout.FanOut(odd)
15-
c4 := fanout.FanOut(odd)
15+
c1 := fanout.FanOut(done, odd)
16+
c2 := fanout.FanOut(done, odd)
17+
c3 := fanout.FanOut(done, odd)
18+
c4 := fanout.FanOut(done, odd)
19+
1620
display(c1, "c1")
1721
display(c2, "c2")
1822
display(c3, "c3")

patterns/fanout/fanout.go

+6-2
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,15 @@ package fanout
44
// The FAN-OUT pattern states that multiple invocation of this function
55
// will generate multiple go routines to read from the same channel till the input channel is closed
66
// This allows for better work distribution
7-
func FanOut(input <-chan int) <-chan int {
7+
func FanOut(done chan struct{}, input <-chan int) <-chan int {
88
out := make(chan int)
99
go func() {
1010
for v := range input {
11-
out <- v
11+
select {
12+
case <-done:
13+
return
14+
case out <- v:
15+
}
1216
}
1317
close(out)
1418
}()

patterns/pipeline/cmd/main.go

+8-5
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,23 @@ var (
1414
)
1515

1616
func main() {
17+
done := make(chan struct{})
18+
defer close(done)
19+
1720
// 1, 2, 3
18-
nums := gen(1, 2, 3)
21+
nums := gen(done, 1, 2, 3)
1922
// 2, 3, 4
20-
incremented := inc(nums)
23+
incremented := inc(done, nums)
2124
// 4, 9, 16
22-
squared := sq(incremented)
25+
squared := sq(done, incremented)
2326
// 3, 8, 15
24-
res := dec(squared)
27+
res := dec(done, squared)
2528
for n := range res {
2629
fmt.Println(n)
2730
}
2831

2932
fmt.Println("the same exact result using nested calls")
30-
for n := range dec(sq(inc(gen(1, 2, 3)))) {
33+
for n := range dec(done, sq(done, inc(done, gen(done, 1, 2, 3)))) {
3134
fmt.Println(n)
3235
}
3336

patterns/pipeline/pipeline.go

+25-9
Original file line numberDiff line numberDiff line change
@@ -44,49 +44,65 @@ func (p *IntPipeline) Square() *IntPipeline {
4444
return p
4545
}
4646

47-
func (p *IntPipeline) Result() chan int {
47+
func (p *IntPipeline) Result() <-chan int {
4848
close(p.out)
4949
return p.out
5050
}
5151

52-
func Gen(vs ...int) chan int {
52+
func Gen(done chan struct{}, vs ...int) <-chan int {
5353
out := make(chan int)
5454
go func() {
5555
for _, n := range vs {
56-
out <- n
56+
select {
57+
case <-done:
58+
return
59+
case out <- n:
60+
}
5761
}
5862
close(out)
5963
}()
6064
return out
6165
}
6266

63-
func Inc(in <-chan int) chan int {
67+
func Inc(done chan struct{}, in <-chan int) <-chan int {
6468
out := make(chan int)
6569
go func() {
6670
for i := range in {
67-
out <- i + 1
71+
select {
72+
case <-done:
73+
return
74+
case out <- i + 1:
75+
}
6876
}
6977
close(out)
7078
}()
7179
return out
7280
}
7381

74-
func Dec(in <-chan int) chan int {
82+
func Dec(done chan struct{}, in <-chan int) <-chan int {
7583
out := make(chan int)
7684
go func() {
7785
for i := range in {
78-
out <- i - 1
86+
select {
87+
case <-done:
88+
return
89+
case out <- i - 1:
90+
}
7991
}
8092
close(out)
8193
}()
8294
return out
8395
}
8496

85-
func Sq(in <-chan int) chan int {
97+
func Sq(done chan struct{}, in <-chan int) <-chan int {
8698
out := make(chan int)
8799
go func() {
88100
for i := range in {
89-
out <- i * i
101+
select {
102+
case <-done:
103+
return
104+
case out <- i * i:
105+
}
90106
}
91107
close(out)
92108
}()

patterns/tee-channel/main.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,16 @@ func main() {
1616
done := make(chan struct{})
1717
defer close(done)
1818

19-
nums := pipeline.Gen(1, 2, 3, 4)
19+
nums := pipeline.Gen(done, 1, 2, 3, 4)
2020
out := tee(done, nums, 3)
2121

22-
for v := range pipeline.Inc(out[0]) {
22+
for v := range pipeline.Inc(done, out[0]) {
2323
fmt.Println("incremented:", v)
2424
}
25-
for v := range pipeline.Dec(out[1]) {
25+
for v := range pipeline.Dec(done, out[1]) {
2626
fmt.Println("decremented:", v)
2727
}
28-
for v := range pipeline.Sq(out[2]) {
28+
for v := range pipeline.Sq(done, out[2]) {
2929
fmt.Println("squared:", v)
3030
}
3131
}

0 commit comments

Comments
 (0)