-
Notifications
You must be signed in to change notification settings - Fork 415
/
Copy pathmain.go
135 lines (111 loc) · 4.07 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Sources for https://watermill.io/docs/getting-started/
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
"github.com/ThreeDotsLabs/watermill/message/router/plugin"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
)
// logger is the shared Watermill logger handed to the router, the retry
// middleware and the Pub/Sub in this example. It is the stock standard-library
// backed adapter; real applications will usually provide their own
// `watermill.LoggerAdapter` implementation instead.
var logger = watermill.NewStdLogger(false, false)
// main builds a Watermill router, attaches router-level and handler-level
// middleware, registers one publishing handler and two print-only handlers on
// a GoChannel Pub/Sub, and then blocks in Run. Any setup or run error panics.
func main() {
	const (
		incomingTopic = "incoming_messages_topic"
		outgoingTopic = "outgoing_messages_topic"
	)

	r, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// The SignalsHandler plugin gracefully shuts the Router down when SIGTERM
	// is received. Closing the router directly with `Close()` works as well.
	r.AddPlugin(plugin.SignalsHandler)

	// Retry re-invokes the handler function when it returns an error. Once
	// MaxRetries is exhausted the message is Nacked and it is up to the
	// Pub/Sub to resend it.
	retry := middleware.Retry{
		MaxRetries:      3,
		InitialInterval: 100 * time.Millisecond,
		Logger:          logger,
	}

	// Router-level middleware is executed for every message sent to the router.
	r.AddMiddleware(
		// Copies the correlation id from the incoming message's metadata to
		// the produced messages.
		middleware.CorrelationID,
		retry.Middleware,
		// Recoverer handles panics from handlers; here it passes them on as
		// errors to the Retry middleware.
		middleware.Recoverer,
	)

	// The gochannel Pub/Sub keeps the example simple; any other Pub/Sub
	// implementation can be dropped in and will work the same way.
	bus := gochannel.NewGoChannel(gochannel.Config{}, logger)

	// Produce incoming messages in the background.
	go publishMessages(bus)

	// AddHandler returns a handler value that can be used to attach
	// handler-level middleware or to stop the handler.
	structH := r.AddHandler(
		"struct_handler", // handler name, must be unique
		incomingTopic,    // topic from which we will read events
		bus,
		outgoingTopic, // topic to which we will publish events
		bus,
		structHandler{}.Handler,
	)

	// Handler-level middleware only runs for this specific handler; it is
	// written the same way as router-level middleware.
	logHandlerCall := func(next message.HandlerFunc) message.HandlerFunc {
		return func(msg *message.Message) ([]*message.Message, error) {
			log.Println("executing handler specific middleware for ", msg.UUID)
			return next(msg)
		}
	}
	structH.AddMiddleware(logHandlerCall)

	// Debug only: print every message received on the incoming topic.
	r.AddNoPublisherHandler(
		"print_incoming_messages",
		incomingTopic,
		bus,
		printMessages,
	)

	// Debug only: print every event sent to the outgoing topic.
	r.AddNoPublisherHandler(
		"print_outgoing_messages",
		outgoingTopic,
		bus,
		printMessages,
	)

	// All handlers are registered, so start the Router. Run blocks for as
	// long as the router is running.
	if runErr := r.Run(context.Background()); runErr != nil {
		panic(runErr)
	}
}
// publishMessages publishes a "Hello, world!" message to
// `incoming_messages_topic` once per second, forever. Each message gets a
// fresh UUID and a fresh correlation id, and the send is logged before
// publishing. It never returns; a publish error panics.
func publishMessages(publisher message.Publisher) {
	const topic = "incoming_messages_topic"

	for {
		msgID := watermill.NewUUID()
		msg := message.NewMessage(msgID, []byte("Hello, world!"))

		correlationID := watermill.NewUUID()
		middleware.SetCorrelationID(correlationID, msg)

		log.Printf("sending message %s, correlation id: %s\n", msg.UUID, middleware.MessageCorrelationID(msg))

		err := publisher.Publish(topic, msg)
		if err != nil {
			panic(err)
		}

		// Pace the producer: one message per second.
		time.Sleep(time.Second)
	}
}
// printMessages writes the UUID, payload (as text) and metadata of msg to
// standard output. It always returns nil.
func printMessages(msg *message.Message) error {
	id := msg.UUID
	body := string(msg.Payload)
	meta := msg.Metadata

	fmt.Printf("\n> Received message: %s\n> %s\n> metadata: %v\n\n", id, body, meta)

	return nil
}
// structHandler is a struct-based message handler. It currently has no fields;
// dependencies needed by Handler can be added here.
type structHandler struct{}
// Handler logs the UUID of the received message and returns exactly one newly
// created message (fresh UUID, fixed payload) together with a nil error.
func (structHandler) Handler(msg *message.Message) ([]*message.Message, error) {
	log.Println("structHandler received message", msg.UUID)

	// Build the outgoing message separately instead of reusing the
	// parameter, so the incoming message stays referable.
	produced := message.NewMessage(watermill.NewUUID(), []byte("message produced by structHandler"))

	return []*message.Message{produced}, nil
}