+
+{{ end }}
+{{ define "footer"}} {{ partial "footer" .}} {{end}}
diff --git a/docs/layouts/partials/footer.html b/docs/layouts/partials/footer.html
new file mode 100644
index 000000000..80ae1b269
--- /dev/null
+++ b/docs/layouts/partials/footer.html
@@ -0,0 +1,14 @@
+
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/collapse-box.html b/docs/layouts/shortcodes/collapse-box.html
new file mode 100644
index 000000000..ed7e6898e
--- /dev/null
+++ b/docs/layouts/shortcodes/collapse-box.html
@@ -0,0 +1,3 @@
+
+ {{ .Inner }}
+
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/collapse-toggle.html b/docs/layouts/shortcodes/collapse-toggle.html
new file mode 100644
index 000000000..d541b5490
--- /dev/null
+++ b/docs/layouts/shortcodes/collapse-toggle.html
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/collapse.html b/docs/layouts/shortcodes/collapse.html
new file mode 100644
index 000000000..95d838c70
--- /dev/null
+++ b/docs/layouts/shortcodes/collapse.html
@@ -0,0 +1,3 @@
+
+ {{ .Inner }}
+
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/contact-email.html b/docs/layouts/shortcodes/contact-email.html
new file mode 100644
index 000000000..7e80368d6
--- /dev/null
+++ b/docs/layouts/shortcodes/contact-email.html
@@ -0,0 +1,3 @@
+{{/* hack: we cannot use variables directly on the markdown, only shortcodes */}}
+
+{{ $.Page.Site.Params.Email }}
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/highlight.html b/docs/layouts/shortcodes/highlight.html
new file mode 100644
index 000000000..8df83db38
--- /dev/null
+++ b/docs/layouts/shortcodes/highlight.html
@@ -0,0 +1 @@
+{{ highlight .Inner (.Get "language" | default "go") "" }}
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/load-snippet-partial.html b/docs/layouts/shortcodes/load-snippet-partial.html
new file mode 100644
index 000000000..8fa952c88
--- /dev/null
+++ b/docs/layouts/shortcodes/load-snippet-partial.html
@@ -0,0 +1,67 @@
+{{ $file := (.Get "file") }}
+{{ $content := readFile $file }}
+
+{{ $first_line_contains := (.Get "first_line_contains") }}
+{{ $last_line_contains := (.Get "last_line_contains") }}
+
+{{ $show_line := false }}
+
+{{/*if true, first or last line was not found*/}}
+{{ $first_line_found := false}}
+{{ $last_line_found := false}}
+
+{{ $padding_after := (.Get "padding_after" | default "0" | int) }}
+
+{{ $first_line_num := 0 }}
+{{ $last_line_num := 0 }}
+
+{{ $linkFile := $file }}
+{{ if in $file "content/src-link/" }}
+ {{ $linkFile = replace $linkFile "content/src-link/" "" }}
+{{ end }}
+
+{{ $lines := slice }}
+
+{{ range $elem_key, $elem_val := split $content "\n" }}
+ {{ $line_num := (add $elem_key 1) }}
+
+ {{ if and (not $first_line_found) (in $elem_val $first_line_contains) }}
+ {{ if ne $elem_key 0 }}
+ {{ $lines = $lines | append "// ..." }}
+ {{ end }}
+
+ {{ $show_line = true }}
+ {{ $first_line_found = true}}
+ {{ $first_line_num = $line_num }}
+ {{ end }}
+
+ {{ if $show_line }}
+ {{ $lines = $lines | append $elem_val }}
+ {{ end }}
+
+ {{ if and ($first_line_found) (in $elem_val $last_line_contains) (ne $last_line_contains "") }}
+ {{ $last_line_found = true }}
+ {{ end }}
+
+ {{ if and $last_line_found $show_line }}
+ {{ if gt $padding_after 0 }}
+ {{ $padding_after = sub $padding_after 1}}
+ {{ else }}
+ {{ $lines = $lines | append "// ..." }}
+ {{ $show_line = false }}
+ {{ $last_line_num = $line_num }}
+ {{ end }}
+ {{ end }}
+{{ end }}
+
+Full source: [{{ $linkFile }}](https://github.com/ThreeDotsLabs/watermill/tree/master/{{ $linkFile }}{{ if ne $first_line_num 0 }}#L{{ $first_line_num }}{{ end }})
+
+{{ highlight (delimit $lines "\n") (.Get "type" | default "go") "" }}
+
+{{if not $first_line_found }}
+ {{ errorf "`first_line_contains` %s not found in %s snippet" $first_line_contains $file }}
+{{end}}
+
+{{if and (not $last_line_found) (ne $last_line_contains "") }}
+ {{ errorf "`last_line_contains` %s not found in %s snippet" $last_line_contains $file }}
+{{end}}
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/load-snippet.html b/docs/layouts/shortcodes/load-snippet.html
new file mode 100644
index 000000000..032f980c6
--- /dev/null
+++ b/docs/layouts/shortcodes/load-snippet.html
@@ -0,0 +1,26 @@
+{{ $file := (.Get "file") }}
+{{ $content := readFile $file }}
+
+{{ $start_line := (.Get "start_line") | default "0" }}
+{{ $end_line := (.Get "end_line") | default "0" }}
+
+{{ $has_start_line := (ne $start_line "0") }}
+{{ $has_end_line := (ne $end_line "0") }}
+
+{{ $lines := slice }}
+
+{{ $linkFile := $file }}
+{{ if in $file "content/src-link/" }}
+ {{ $linkFile = replace $linkFile "content/src-link/" "" }}
+{{ end }}
+
+Full source: [{{ $linkFile }}](https://github.com/ThreeDotsLabs/watermill/tree/master/{{ $linkFile }})
+
+{{ range $elem_key, $elem_val := split $content "\n" }}
+ {{if and (or (not $has_start_line) (ge (add $elem_key 1) ($start_line | int))) (or (not $has_end_line) (le (add $elem_key 1) ($end_line | int)))}}
+ {{ $lines = $lines | append $elem_val }}
+ {{ end }}
+{{ end }}
+
+
+{{ highlight (delimit $lines "\n") (.Get "type" | default "go") "" }}
diff --git a/docs/layouts/shortcodes/message.html b/docs/layouts/shortcodes/message.html
new file mode 100644
index 000000000..16fc58452
--- /dev/null
+++ b/docs/layouts/shortcodes/message.html
@@ -0,0 +1,4 @@
+
+ {{ if ne (.Get "title") "" }}
{{ .Get "title" }}
{{ end }}
+ {{ .Inner }}
+
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/render-md.html b/docs/layouts/shortcodes/render-md.html
new file mode 100644
index 000000000..825c3f939
--- /dev/null
+++ b/docs/layouts/shortcodes/render-md.html
@@ -0,0 +1,11 @@
+{{/*
+hugo pls, why i just cannot render shortcode as markdown,
+used for:
+
+{{% render-md %}}
+{{% load-snippet-partial (...) %}}
+{{% /render-md %}}
+
+bacause it is rendered as raw text by default
+*/}}
+{{.Inner}}
\ No newline at end of file
diff --git a/docs/layouts/shortcodes/tabs-tab.html b/docs/layouts/shortcodes/tabs-tab.html
new file mode 100644
index 000000000..00885f920
--- /dev/null
+++ b/docs/layouts/shortcodes/tabs-tab.html
@@ -0,0 +1,3 @@
+
+ {{ .Inner }}
+
diff --git a/docs/layouts/shortcodes/tabs.html b/docs/layouts/shortcodes/tabs.html
new file mode 100644
index 000000000..657af6448
--- /dev/null
+++ b/docs/layouts/shortcodes/tabs.html
@@ -0,0 +1,13 @@
+
+
+ {{.Inner}}
+
\ No newline at end of file
diff --git a/docs/static/css/custom.css b/docs/static/css/custom.css
new file mode 100644
index 000000000..b9dd3a09c
--- /dev/null
+++ b/docs/static/css/custom.css
@@ -0,0 +1,24 @@
+pre, pre code {
+ background: #f8f8f8 !important;
+}
+
+#kube-features .row:first-child {
+ border-bottom: none;
+}
+
+.message > p {
+ margin-bottom: 0px;
+}
+
+/* fix for stupid css */
+.my-collapse div {
+ border: none;
+ padding: 0;
+ margin-bottom: 0;
+}
+
+.my-collapse > div.collapse-box {
+ border: 1px solid rgba(0, 0, 0, 0.1);
+ padding: 24px 32px 1px;
+ margin-bottom: 1px;
+}
\ No newline at end of file
diff --git a/docs/static/img/watermill-router.svg b/docs/static/img/watermill-router.svg
new file mode 100644
index 000000000..5b174c939
--- /dev/null
+++ b/docs/static/img/watermill-router.svg
@@ -0,0 +1 @@
+
\ No newline at end of file
diff --git a/log.go b/log.go
index 2630d9919..74ed4db2e 100644
--- a/log.go
+++ b/log.go
@@ -45,7 +45,7 @@ type StdLoggerAdapter struct {
func NewStdLogger(debug, trace bool) LoggerAdapter {
l := log.New(os.Stderr, "[watermill] ", log.LstdFlags|log.Lmicroseconds|log.Lshortfile)
- a := &StdLoggerAdapter{InfoLogger: l}
+ a := &StdLoggerAdapter{InfoLogger: l, ErrorLogger: l}
if debug {
a.DebugLogger = l
@@ -58,7 +58,7 @@ func NewStdLogger(debug, trace bool) LoggerAdapter {
}
func (l *StdLoggerAdapter) Error(msg string, err error, fields LogFields) {
- l.log(l.TraceLogger, "ERROR", msg, fields.Add(LogFields{"err": err}))
+ l.log(l.ErrorLogger, "ERROR", msg, fields.Add(LogFields{"err": err}))
}
func (l *StdLoggerAdapter) Info(msg string, fields LogFields) {
diff --git a/message/infrastructure/gochannel/pubsub.go b/message/infrastructure/gochannel/pubsub.go
index 8b778dbfb..14ab2d7a8 100644
--- a/message/infrastructure/gochannel/pubsub.go
+++ b/message/infrastructure/gochannel/pubsub.go
@@ -16,7 +16,12 @@ type subscriber struct {
outputChannel chan *message.Message
}
-type goChannel struct {
+// GoChannel is the simplest Pub/Sub implementation.
+// It is based on Golang's channels which are sent within the process.
+//
+// GoChannel has no global state,
+// that means that you need to use the same instance for Publishing and Subscribing!
+type GoChannel struct {
sendTimeout time.Duration
buffer int64
@@ -29,7 +34,7 @@ type goChannel struct {
}
func NewGoChannel(buffer int64, logger watermill.LoggerAdapter, sendTimeout time.Duration) message.PubSub {
- return &goChannel{
+ return &GoChannel{
sendTimeout: sendTimeout,
buffer: buffer,
@@ -39,7 +44,11 @@ func NewGoChannel(buffer int64, logger watermill.LoggerAdapter, sendTimeout time
}
}
-func (g *goChannel) Publish(topic string, messages ...*message.Message) error {
+// Publish in GoChannel is blocking until all consumers consume and acknowledge the message.
+// Sending message to one subscriber has timeout equal to GoChannel.sendTimeout configured via constructor.
+//
+// Messages are not persisted. If there are no subscribers and message is produced it will be gone.
+func (g *GoChannel) Publish(topic string, messages ...*message.Message) error {
for _, msg := range messages {
if err := g.sendMessage(topic, msg); err != nil {
return err
@@ -49,7 +58,7 @@ func (g *goChannel) Publish(topic string, messages ...*message.Message) error {
return nil
}
-func (g *goChannel) sendMessage(topic string, message *message.Message) error {
+func (g *GoChannel) sendMessage(topic string, message *message.Message) error {
messageLogFields := watermill.LogFields{
"message_uuid": message.UUID,
}
@@ -92,7 +101,11 @@ func (g *goChannel) sendMessage(topic string, message *message.Message) error {
return nil
}
-func (g *goChannel) Subscribe(topic string) (chan *message.Message, error) {
+// Subscribe returns channel to which all published messages are sent.
+// Messages are not persisted. If there are no subscribers and message is produced it will be gone.
+//
+// There are no consumer groups support etc. Every consumer will receive every produced message.
+func (g *GoChannel) Subscribe(topic string) (chan *message.Message, error) {
g.subscribersLock.Lock()
defer g.subscribersLock.Unlock()
@@ -109,7 +122,7 @@ func (g *goChannel) Subscribe(topic string) (chan *message.Message, error) {
return s.outputChannel, nil
}
-func (g *goChannel) Close() error {
+func (g *GoChannel) Close() error {
g.subscribersLock.Lock()
defer g.subscribersLock.Unlock()
diff --git a/message/infrastructure/http/subscriber.go b/message/infrastructure/http/subscriber.go
index fe74ff5fa..10128a6a0 100644
--- a/message/infrastructure/http/subscriber.go
+++ b/message/infrastructure/http/subscriber.go
@@ -11,6 +11,7 @@ import (
type UnmarshalMessageFunc func(topic string, request *http.Request) (*message.Message, error)
+// Subscriber can subscribe to HTTP requests and create Watermill's messages based on them.
type Subscriber struct {
router chi.Router
server *http.Server
@@ -24,6 +25,13 @@ type Subscriber struct {
closed bool
}
+// NewSubscriber creates new Subscriber.
+//
+// addr is TCP address to listen on
+//
+// unmarshalMessageFunc is function which converts HTTP request to Watermill's message.
+//
+// logger is Watermill's logger.
func NewSubscriber(addr string, unmarshalMessageFunc UnmarshalMessageFunc, logger watermill.LoggerAdapter) (*Subscriber, error) {
r := chi.NewRouter()
s := &http.Server{Addr: addr, Handler: r}
@@ -39,17 +47,23 @@ func NewSubscriber(addr string, unmarshalMessageFunc UnmarshalMessageFunc, logge
}, nil
}
-func (s *Subscriber) Subscribe(topic string) (chan *message.Message, error) {
+// Subscribe adds HTTP handler which will listen in provided url for messages.
+//
+// Subscribe needs to be called before `StartHTTPServer`.
+//
+// When request is sent, it will wait for the `Ack`. When Ack is received 200 HTTP status wil be sent.
+// When Nack is sent, 500 HTTP status will be sent.
+func (s *Subscriber) Subscribe(url string) (chan *message.Message, error) {
messages := make(chan *message.Message)
s.outputChannelsLock.Lock()
s.outputChannels = append(s.outputChannels, messages)
s.outputChannelsLock.Unlock()
- baseLogFields := watermill.LogFields{"topic": topic}
+ baseLogFields := watermill.LogFields{"url": url}
- s.router.Post(topic, func(w http.ResponseWriter, r *http.Request) {
- msg, err := s.unmarshalMessageFunc(topic, r)
+ s.router.Post(url, func(w http.ResponseWriter, r *http.Request) {
+ msg, err := s.unmarshalMessageFunc(url, r)
if err != nil {
s.logger.Info("Cannot unmarshal message", baseLogFields.Add(watermill.LogFields{"err": err}))
w.WriteHeader(http.StatusBadRequest)
diff --git a/message/infrastructure/kafka/marshaler.go b/message/infrastructure/kafka/marshaler.go
index 8b87c598b..9eb08ec7a 100644
--- a/message/infrastructure/kafka/marshaler.go
+++ b/message/infrastructure/kafka/marshaler.go
@@ -6,10 +6,14 @@ import (
"github.com/pkg/errors"
)
+const UUIDHeaderKey = "_watermill_message_uuid"
+
+// Marshaler marshals Watermill's message to Kafka message.
type Marshaler interface {
Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error)
}
+// Unmarshaler unmarshals Kafka's message to Watermill's message.
type Unmarshaler interface {
Unmarshal(*confluentKafka.Message) (*message.Message, error)
}
@@ -19,8 +23,6 @@ type MarshalerUnmarshaler interface {
Unmarshaler
}
-const UUIDHeaderKey = "_watermill_message_uuid"
-
type DefaultMarshaler struct{}
func (DefaultMarshaler) Marshal(topic string, msg *message.Message) (*confluentKafka.Message, error) {
diff --git a/message/infrastructure/kafka/publisher.go b/message/infrastructure/kafka/publisher.go
index 47af1bc62..5bea9554f 100644
--- a/message/infrastructure/kafka/publisher.go
+++ b/message/infrastructure/kafka/publisher.go
@@ -21,7 +21,7 @@ func NewPublisher(brokers []string, marshaler Marshaler, kafkaConfigOverwrite ka
"bootstrap.servers": strings.Join(brokers, ","),
"queue.buffering.max.messages": 10000000,
"queue.buffering.max.kbytes": 2097151,
- "debug": ",",
+ "debug": ",",
}
if err := mergeConfluentConfigs(config, kafkaConfigOverwrite); err != nil {
@@ -40,6 +40,10 @@ func NewCustomPublisher(producer *kafka.Producer, marshaler Marshaler) (message.
return &confluentPublisher{producer, marshaler, false}, nil
}
+// Publish publishes message to Kafka.
+//
+// Publish is blocking and wait for ack from Kafka.
+// When one of messages delivery fails - function is interrupted.
func (p confluentPublisher) Publish(topic string, msgs ...*message.Message) error {
if p.closed {
return errors.New("publisher closed")
diff --git a/message/infrastructure/kafka/subscriber.go b/message/infrastructure/kafka/subscriber.go
index 0fe0791e7..45f9a3c0a 100644
--- a/message/infrastructure/kafka/subscriber.go
+++ b/message/infrastructure/kafka/subscriber.go
@@ -29,15 +29,25 @@ type confluentSubscriber struct {
}
type SubscriberConfig struct {
+ // Kafka brokers list.
Brokers []string
- ConsumerGroup string
+ // Kafka consumer group.
+ ConsumerGroup string
+ // When we want to consume without consumer group, you should set it to true.
+ // In practice you will receive all messages sent to the topic.
NoConsumerGroup bool
+ // Action to take when there is no initial offset in offset store or the desired offset is out of range.
+ // Available options: smallest, earliest, beginning, largest, latest, end, error.
AutoOffsetReset string
+ // How much consumers should be spawned.
+ // Every consumer will receive messages for their own partition so messages order will be preserved.
ConsumersCount int
+ // Passing librdkafka options.
+ // Available options: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
KafkaConfigOverwrite kafka.ConfigMap
}
@@ -130,6 +140,9 @@ func DefaultConfluentConsumerConstructor(config SubscriberConfig) (*kafka.Consum
return kafka.NewConsumer(kafkaConfig)
}
+// Subscribe subscribers for messages in Kafka.
+//
+// They are multiple subscribers spawned
func (s *confluentSubscriber) Subscribe(topic string) (chan *message.Message, error) {
if s.closed {
return nil, errors.New("subscriber closed")
diff --git a/message/message.go b/message/message.go
index f924a5d23..8b087e6dd 100644
--- a/message/message.go
+++ b/message/message.go
@@ -20,16 +20,28 @@ var (
type Payload []byte
type Message struct {
- UUID string // todo - change to []byte?, change to type
-
+ // UUID is an unique identifier of message.
+ //
+ // It is only used by Watermill for debugging.
+ // UUID can be empty.
+ UUID string
+
+ // Metadata contains the message metadata.
+ //
+ // Can be used to store data which doesn't require unmarshaling entire payload.
+ // It is something similar to HTTP request's headers.
Metadata Metadata
+ // Payload is message's payload.
Payload Payload
- ack chan struct{}
- noAck chan struct{}
- ackMutex sync.Mutex
- ackSent ackType
+ // ack is closed, when acknowledge is received.
+ ack chan struct{}
+ // noACk is closed, when negative acknowledge is received.
+ noAck chan struct{}
+
+ ackMutex sync.Mutex
+ ackSentType ackType
}
func NewMessage(uuid string, payload Payload) *Message {
@@ -50,18 +62,23 @@ const (
nack
)
+// Ack sends message's acknowledgement.
+//
+// Ack is not blocking.
+// Ack is idempotent.
+// Error is returned, if Nack is already sent.
func (m *Message) Ack() error {
m.ackMutex.Lock()
defer m.ackMutex.Unlock()
- if m.ackSent == nack {
+ if m.ackSentType == nack {
return ErrAlreadyNacked
}
- if m.ackSent != noAckSent {
+ if m.ackSentType != noAckSent {
return nil
}
- m.ackSent = ack
+ m.ackSentType = ack
if m.noAck == nil {
m.ack = closedchan
} else {
@@ -71,18 +88,23 @@ func (m *Message) Ack() error {
return nil
}
+// Nack sends message's negative acknowledgement.
+//
+// Nack is not blocking.
+// Nack is idempotent.
+// Error is returned, if Ack is already sent.
func (m *Message) Nack() error {
m.ackMutex.Lock()
defer m.ackMutex.Unlock()
- if m.ackSent == ack {
+ if m.ackSentType == ack {
return ErrAlreadyAcked
}
- if m.ackSent != noAckSent {
+ if m.ackSentType != noAckSent {
return nil
}
- m.ackSent = nack
+ m.ackSentType = nack
if m.noAck == nil {
m.noAck = closedchan
@@ -93,10 +115,28 @@ func (m *Message) Nack() error {
return nil
}
+// Acked returns channel which is closed when acknowledgement is sent.
+//
+// Usage:
+// select {
+// case <-message.Acked():
+// // ack received
+// case <-message.Nacked():
+// // nack received
+// }
func (m *Message) Acked() <-chan struct{} {
return m.ack
}
+// Nacked returns channel which is closed when negative acknowledgement is sent.
+//
+// Usage:
+// select {
+// case <-message.Acked():
+// // ack received
+// case <-message.Nacked():
+// // nack received
+// }
func (m *Message) Nacked() <-chan struct{} {
return m.noAck
}
diff --git a/message/publisher.go b/message/publisher.go
index bf3f40e25..c1f553628 100644
--- a/message/publisher.go
+++ b/message/publisher.go
@@ -1,10 +1,18 @@
package message
type publisher interface {
+ // Publish publishes provided messages to given topic.
+ //
+ // Publish can be synchronous or asynchronous - it depends of implementation.
+ //
+ // Most publishers implementations doesn't support atomic publishing of messages.
+ // That means, that when publishing one of messages failed next messages will be not published.
Publish(topic string, messages ...*Message) error
}
type Publisher interface {
publisher
+
+ // Close should flush unsent messages, if publisher is async.
Close() error
}
diff --git a/message/pubsub.go b/message/pubsub.go
index 17881a346..e9d87c6dc 100644
--- a/message/pubsub.go
+++ b/message/pubsub.go
@@ -9,6 +9,10 @@ type PubSub interface {
Close() error
}
+func NewPubSub(publisher Publisher, subscriber Subscriber) PubSub {
+ return pubSub{publisher, subscriber}
+}
+
type pubSub struct {
Publisher
Subscriber
@@ -32,7 +36,3 @@ func (p pubSub) Close() error {
return errors.New(errMsg)
}
-
-func NewPubSub(publisher Publisher, subscriber Subscriber) PubSub {
- return pubSub{publisher, subscriber}
-}
diff --git a/message/router.go b/message/router.go
index 20fa0b4a1..7670ba0c6 100644
--- a/message/router.go
+++ b/message/router.go
@@ -10,13 +10,40 @@ import (
"github.com/pkg/errors"
)
+// HandlerFunc is function called when message is received.
+//
+// msg.Ack() is called automatically when HandlerFunc doesn't return error.
+// When HandlerFunc returns error, msg.Nack() is called.
+// When msg.Ack() was called in handler and HandlerFunc returns error,
+// msg.Nack() will be not sent because Ack was already sent.
+//
+// HandlerFunc's are executed parallel when multiple messages was received
+// (because msg.Ack() was sent in HandlerFunc or Subscriber supports multiple consumers).
type HandlerFunc func(msg *Message) ([]*Message, error)
+// HandlerMiddleware allows us to write something like decorators to HandlerFunc.
+// It can execute something before handler (for example: modify consumed message)
+// or after (modify produced messages, ack/nack on consumed message, handle errors, logging, etc.).
+//
+// It can be attached to the router by using `AddMiddleware` method.
+//
+// Example:
+// func ExampleMiddleware(h message.HandlerFunc) message.HandlerFunc {
+// return func(message *message.Message) ([]*message.Message, error) {
+// fmt.Println("executed before handler")
+// producedMessages, err := h(message)
+// fmt.Println("executed after handler")
+//
+// return producedMessages, err
+// }
+// }
type HandlerMiddleware func(h HandlerFunc) HandlerFunc
+// RouterPlugin is function which is executed on Router start.
type RouterPlugin func(*Router) error
type RouterConfig struct {
+ // CloseTimeout determines how long router should work for handlers when closing.
CloseTimeout time.Duration
}
@@ -94,6 +121,20 @@ func (r *Router) AddPlugin(p ...RouterPlugin) {
r.plugins = append(r.plugins, p...)
}
+// AddHandler adds a new handler.
+//
+// handlerName must be unique. For now, it is used only for debugging.
+//
+// subscribeTopic is a topic from which handler will receive messages.
+//
+// publishTopic is a topic to which router will produce messages retuened by handlerFunc.
+// When handler needs to publish to multiple topics,
+// it is recommended to just inject Publisher to Handler or implement middleware
+// which will catch messages and publish to topic based on metadata for example.
+//
+// pubSub is PubSub from which messages will be consumed and to which created messages will be published.
+// If you have separated Publisher and Subscriber object,
+// you can create PubSub object by calling message.NewPubSub(publisher, subscriber).
func (r *Router) AddHandler(
handlerName string,
subscribeTopic string,
@@ -110,6 +151,15 @@ func (r *Router) AddHandler(
return nil
}
+// AddNoPublisherHandler adds a new handler.
+// This handler cannot return messages.
+// When message is returned it will occur an error and Nack will be sent.
+//
+// handlerName must be unique. For now, it is used only for debugging.
+//
+// subscribeTopic is a topic from which handler will receive messages.
+//
+// subscriber is Subscriber from which messages will be consumed.
func (r *Router) AddNoPublisherHandler(
handlerName string,
subscribeTopic string,
@@ -138,6 +188,10 @@ func (r *Router) AddNoPublisherHandler(
return nil
}
+// Run runs all plugins and handlers and starts subscribing to provided topics.
+// This call is blocking until router is running.
+//
+// To stop Run() you should call Close() on the router.
func (r *Router) Run() (err error) {
if r.isRunning {
return errors.New("router is already running")
@@ -205,7 +259,10 @@ func (r *Router) Run() (err error) {
// Running is closed when router is running.
// In other words: you can wait till router is running using
-// <- r.Running()
+// fmt.Println("Starting router")
+// go r.Run()
+// <- r.Running()
+// fmt.Println("Router is running")
func (r *Router) Running() chan struct{} {
return r.running
}
diff --git a/message/subscriber.go b/message/subscriber.go
index b2324f5db..b3183f0b4 100644
--- a/message/subscriber.go
+++ b/message/subscriber.go
@@ -1,10 +1,17 @@
package message
type subscriber interface {
+ // Subscribe returns output channel with messages from provided topic.
+ // Channel is closed, when Close() was called to the subscriber.
+ //
+ // To receive next message, `Ack()` must be called on the received message.
+ // If message processing was failed and message should be redelivered `Nack()` should be called.
Subscribe(topic string) (chan *Message, error)
}
type Subscriber interface {
subscriber
+
+ // Close closes all subscriptions with their output channels and flush offsets etc. when needed.
Close() error
}
diff --git a/netlify.toml b/netlify.toml
new file mode 100644
index 000000000..61315ea79
--- /dev/null
+++ b/netlify.toml
@@ -0,0 +1,17 @@
+[build]
+ command = "hugo version && ./build.sh"
+ base = "docs/"
+ publish = "docs/public/"
+
+[context.production.environment]
+ HUGO_VERSION = "0.52"
+ HUGO_ENV = "production"
+ HUGO_ENABLEGITINFO = "true"
+
+[context.deploy-preview.environment]
+ HUGO_VERSION = "0.52"
+ HUGO_ENABLEGITINFO = "true"
+
+[context.branch-deploy.environment]
+ HUGO_VERSION = "0.52"
+ HUGO_ENABLEGITINFO = "true"