-
Notifications
You must be signed in to change notification settings - Fork 415
/
Copy pathmain.go
146 lines (121 loc) · 3.66 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package main
import (
"context"
"errors"
"flag"
"math"
"math/rand"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/components/metrics"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
var (
metricsAddr = flag.String("metrics", ":8081", "The address that will expose /metrics for Prometheus")
handlerDelay = flag.Float64("delay", 0, "The stdev of normal distribution of delay in handler (in seconds), to simulate load")
logger = watermill.NewStdLogger(true, true)
random = rand.New(rand.NewSource(time.Now().Unix()))
)
func delay() {
seconds := *handlerDelay
if seconds == 0 {
return
}
delay := math.Abs(random.NormFloat64() * seconds)
time.Sleep(time.Duration(float64(time.Second) * delay))
}
// handler publishes 0-4 messages with a random delay.
func handler(msg *message.Message) ([]*message.Message, error) {
delay()
numOutgoing := random.Intn(4)
outgoing := make([]*message.Message, numOutgoing)
for i := 0; i < numOutgoing; i++ {
outgoing[i] = msg.Copy()
}
return outgoing, nil
}
// consumeMessages consumes the messages exiting the handler.
func consumeMessages(subscriber message.Subscriber) {
messages, err := subscriber.Subscribe(context.Background(), "pub_topic")
if err != nil {
panic(err)
}
for msg := range messages {
msg.Ack()
}
}
// produceMessages produces the incoming messages in delays of 50-100 milliseconds.
func produceMessages(routerClosed chan struct{}, publisher message.Publisher) {
for {
select {
case <-routerClosed:
return
default:
// go on
}
time.Sleep(50*time.Millisecond + time.Duration(random.Intn(50))*time.Millisecond)
msg := message.NewMessage(watermill.NewUUID(), []byte{})
_ = publisher.Publish("sub_topic", msg)
}
}
func main() {
flag.Parse()
pubSub := gochannel.NewGoChannel(gochannel.Config{}, logger)
router, err := message.NewRouter(
message.RouterConfig{},
logger,
)
if err != nil {
panic(err)
}
prometheusRegistry, closeMetricsServer := metrics.CreateRegistryAndServeHTTP(*metricsAddr)
defer closeMetricsServer()
// we leave the namespace and subsystem empty
metricsBuilder := metrics.NewPrometheusMetricsBuilder(prometheusRegistry, "", "")
metricsBuilder.AddPrometheusRouterMetrics(router)
router.AddMiddleware(
middleware.Recoverer,
middleware.RandomFail(0.1),
middleware.RandomPanic(0.1),
)
router.AddPlugin(plugin.SignalsHandler)
router.AddHandler(
"metrics-example",
"sub_topic",
pubSub,
"pub_topic",
pubSub,
handler,
)
pub := randomFailPublisherDecorator{pubSub, 0.1}
// The handler's publisher and subscriber will be decorated by `AddPrometheusRouterMetrics`.
// We are using the same pub/sub to generate messages incoming to the handler
// and consume the outgoing messages.
// They will have `handler_name=<no handler>` label in Prometheus.
subWithMetrics, err := metricsBuilder.DecorateSubscriber(pubSub)
if err != nil {
panic(err)
}
pubWithMetrics, err := metricsBuilder.DecoratePublisher(pub)
if err != nil {
panic(err)
}
routerClosed := make(chan struct{})
go produceMessages(routerClosed, pubWithMetrics)
go consumeMessages(subWithMetrics)
_ = router.Run(context.Background())
close(routerClosed)
}
type randomFailPublisherDecorator struct {
message.Publisher
failProbability float64
}
func (r randomFailPublisherDecorator) Publish(topic string, messages ...*message.Message) error {
if random.Float64() < r.failProbability {
return errors.New("random publishing failure")
}
return r.Publisher.Publish(topic, messages...)
}