-
Notifications
You must be signed in to change notification settings - Fork 0
/
multi.go
75 lines (51 loc) · 1.28 KB
/
multi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package messenger
import (
"context"
"fmt"
)
// MultiDeliveryAgent groups several DeliveryAgent values; its DeliverMessage
// method sends a single message through all of them.
type MultiDeliveryAgent struct {
// Embedded DeliveryAgent. NewMultiDeliveryAgent does not assign it, so it
// stays nil unless set elsewhere.
// NOTE(review): presumably embedded so this type satisfies any other
// DeliveryAgent methods — confirm; calling such a method on a nil embed
// would panic.
DeliveryAgent
// agents holds the underlying agents that DeliverMessage fans out to.
agents []DeliveryAgent
}
// NewMultiDeliveryAgentWithURIs builds one DeliveryAgent per entry in
// agent_uris by calling NewDeliveryAgent, then wraps them all with
// NewMultiDeliveryAgent. Construction stops at the first URI that fails and
// the wrapped error (which names that URI) is returned.
func NewMultiDeliveryAgentWithURIs(ctx context.Context, agent_uris ...string) (DeliveryAgent, error) {

	children := make([]DeliveryAgent, 0, len(agent_uris))

	for _, u := range agent_uris {

		child, err := NewDeliveryAgent(ctx, u)

		if err != nil {
			return nil, fmt.Errorf("Failed to create agent for %s, %w", u, err)
		}

		children = append(children, child)
	}

	return NewMultiDeliveryAgent(ctx, children...)
}
// NewMultiDeliveryAgent returns a *MultiDeliveryAgent (as a DeliveryAgent)
// that delivers through the supplied agents. The ctx argument is not used
// here and the returned error is always nil.
func NewMultiDeliveryAgent(ctx context.Context, agents ...DeliveryAgent) (DeliveryAgent, error) {

	multi := new(MultiDeliveryAgent)
	multi.agents = agents

	return multi, nil
}
// DeliverMessage delivers msg through every configured agent concurrently,
// using one goroutine per agent.
//
// It returns nil once every agent has reported success, or the first error
// received, wrapped with the concrete type of the agent that failed. The
// context passed to the agents is derived from ctx and is cancelled when this
// method returns, so agents that observe their context can stop early after
// a sibling has failed.
func (a *MultiDeliveryAgent) DeliverMessage(ctx context.Context, msg *Message) error {

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Buffered to the number of agents: every goroutine sends exactly one
	// result, so no send can ever block — even after this method has already
	// returned on an earlier error. With unbuffered channels those goroutines
	// would block forever on their send and leak.
	results := make(chan error, len(a.agents))

	for _, agent := range a.agents {

		go func(agent DeliveryAgent) {

			if err := agent.DeliverMessage(ctx, msg); err != nil {
				results <- fmt.Errorf("Failed to deliver message for %T agent, %w", agent, err)
				return
			}

			results <- nil
		}(agent)
	}

	// Collect one result per agent, bailing out on the first failure.
	for range a.agents {
		if err := <-results; err != nil {
			return err
		}
	}

	return nil
}