diff --git a/RELEASE-PROCEDURE.md b/RELEASE-PROCEDURE.md index 0a42c8b91..95a580bf9 100644 --- a/RELEASE-PROCEDURE.md +++ b/RELEASE-PROCEDURE.md @@ -1,11 +1,8 @@ # Release procedure -1. [ ] - generate clean go.mod: `make generate_gomod` -2. [ ] - commit && push to master -3. [ ] - update and validate examples: `make validate_examples` -4. [ ] - update missing documentation -5. [ ] - commit && push to master -6. [ ] - add breaking changes to `UPGRADE-[new-version].md` -7. [ ] - commit && push to master -8. [ ] - wait for `master` CI build -9. [ ] - [add release in GitHub](https://github.com/ThreeDotsLabs/watermill/releases) +1. Generate clean go.mod: `make generate_gomod` +2. Update and validate examples: `make validate_examples` +3. Update missing documentation +4. Check snippets in documentation (sometimes `first_line_contains` or `last_line_contains` can change position and load too much) +5. Add breaking changes to `UPGRADE-[new-version].md` +6. [Add release in GitHub](https://github.com/ThreeDotsLabs/watermill/releases) diff --git a/UPGRADE-0.4.md b/UPGRADE-0.4.md index 6d8a02823..d38f738d7 100644 --- a/UPGRADE-0.4.md +++ b/UPGRADE-0.4.md @@ -1,16 +1,16 @@ # UPGRADE FROM 0.3.x to 0.4 -# `watermill/components/cqrs` +## `watermill/components/cqrs` ### `CommandHandler.HandlerName` and `EventHandler.HandlerName` was added to the interface. -If you are using metrics component, you may want to keep backward capability with handler names. In other case you can implement your own method of generating handler name. +If you are using metrics component, you may want to keep backward capability with handler names. In other cases, you can implement your own method of generating handler name. Keeping backward capability for **event handlers**: ``` func (h CommandHandler) HandlerName() string { - return fmt.Sprintf("command_processor-%s", h) + return fmt.Sprintf("command_processor-%s", h) } ``` @@ -18,6 +18,48 @@ Keeping backward capability for **command handlers**: ``` func (h EventHandler) HandlerName() string { - return fmt.Sprintf("event_processor-%s", ObjectName(h)) + return fmt.Sprintf("event_processor-%s", ObjectName(h)) } ``` + +### Added `CommandsSubscriberConstructor` and `EventsSubscriberConstructor` + +From now on, `CommandsSubscriberConstructor` and `EventsSubscriberConstructor` are passed to constructors in CQRS component. + +They allow creating customized subscribers for every handler. For usage examples please check [_examples/cqrs-protobuf](_examples/cqrs-protobuf). + + +### Added context to `CommandHandler.Handle`, `CommandBus.Send`, `EventHandler.Handle` and `EventBus.Send` + +Added missing context, which is passed to Publish function and handlers. + +### Other + +- `NewCommandProcessor` and `NewEventProcessor` now return an error instead of panic +- `DuplicateCommandHandlerError` is returned instead of panic when two handlers are handling the same command +- `CommandProcessor.routerHandlerFunc` and `EventProcessor.routerHandlerFunc` are now private +- using `GenerateCommandsTopic` and `GenerateEventsTopic` functions instead of constant topic to allow more flexibility + + +## `watermill/message/infrastructure/amqp` + +### `Config.QueueBindConfig.RoutingKey` was replaced with `GenerateRoutingKey` + +For backward compatibility, when using the constant value you should use a function: + + +``` +func(topic string) string { + return "routing_key" +} +``` + + +## `message/router/middleware` + +- `PoisonQueue` is now `PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error)`, not a struct + + +## `message/router.go` + +- From now on, when all handlers are stopped, the router will also stop (`TestRouter_stop_when_all_handlers_stopped` test) diff --git a/docs/build.sh b/docs/build.sh index 3f8fe9a15..efe05e57d 100755 --- a/docs/build.sh +++ b/docs/build.sh @@ -4,7 +4,7 @@ if [[ ! -d themes/kube ]]; then mkdir -p themes/kube && pushd themes/kube git init git remote add origin https://github.com/jeblister/kube - git fetch --depth 1 origin 0e5397b788dce3f428aeced1cd30aa309927a2c5 + git fetch --depth 1 origin bda578df413e441fb24e4f5f751d2b15b7efb53a git checkout FETCH_HEAD popd fi @@ -36,6 +36,7 @@ else "message/infrastructure/amqp/subscriber.go" "message/infrastructure/amqp/config.go" "message/infrastructure/amqp/marshaler.go" + "message/infrastructure/amqp/topology_builder.go" "message/infrastructure/io/publisher.go" "message/infrastructure/io/subscriber.go" "message/infrastructure/io/marshal.go" diff --git a/docs/content/docs/pub-sub-implementations.md b/docs/content/docs/pub-sub-implementations.md index 24d87693e..7191b1620 100644 --- a/docs/content/docs/pub-sub-implementations.md +++ b/docs/content/docs/pub-sub-implementations.md @@ -334,6 +334,8 @@ Example: {{% load-snippet-partial file="content/docs/getting-started/nats-streaming/main.go" first_line_contains="subscriber, err :=" last_line_contains="panic(err)" padding_after="1" %}} {{% /render-md %}} +You can also use `NewStreamingSubscriberWithStanConn` and `NewStreamingPublisherWithStanConn` to use a custom `stan.Conn` created by `NewStanConnection`. + #### Publishing {{% render-md %}} @@ -394,7 +396,6 @@ For detailed configuration description, please check [message/infrastructure/amq TLS config can be passed to `Config.TLSConfig`. - ##### Connecting {{% render-md %}} @@ -436,9 +437,14 @@ AMQP doesn't provide mechanism like Kafka's "consumer groups". You can still ach {{% load-snippet-partial file="content/docs/snippets/amqp-consumer-groups/main.go" first_line_contains="func createSubscriber(" last_line_contains="go process(\"subscriber_2\", messages2)" %}} {{% /render-md %}} - In this example both `pubSub1` and `pubSub2` will receive some messages independently. +#### AMQP `TopologyBuilder` + +{{% render-md %}} +{{% load-snippet-partial file="content/src-link/message/infrastructure/amqp/topology_builder.go" first_line_contains="// TopologyBuilder" last_line_contains="}" padding_after="0" %}} +{{% /render-md %}} + ### io.Writer/io.Reader This is an experimental Pub/Sub implementation that leverages the [standard library's](https://golang.org/pkg/io/) `io.Writer` and `io.Reader` interfaces as sources of Publisher and Subscriber, respectively. diff --git a/message/infrastructure/amqp/config.go b/message/infrastructure/amqp/config.go index cc131b5c2..7a4168603 100644 --- a/message/infrastructure/amqp/config.go +++ b/message/infrastructure/amqp/config.go @@ -57,7 +57,7 @@ func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator PrefetchCount: 1, }, }, - TopologyBuilder:&DefaultTopologyBuilder{}, + TopologyBuilder: &DefaultTopologyBuilder{}, } } @@ -103,7 +103,7 @@ func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenera PrefetchCount: 1, }, }, - TopologyBuilder:&DefaultTopologyBuilder{}, + TopologyBuilder: &DefaultTopologyBuilder{}, } } @@ -149,7 +149,7 @@ func NewDurableQueueConfig(amqpURI string) Config { PrefetchCount: 1, }, }, - TopologyBuilder:&DefaultTopologyBuilder{}, + TopologyBuilder: &DefaultTopologyBuilder{}, } } @@ -193,7 +193,7 @@ func NewNonDurableQueueConfig(amqpURI string) Config { PrefetchCount: 1, }, }, - TopologyBuilder:&DefaultTopologyBuilder{}, + TopologyBuilder: &DefaultTopologyBuilder{}, } } diff --git a/message/infrastructure/amqp/subscriber.go b/message/infrastructure/amqp/subscriber.go index 7c2ecec74..72e681c42 100644 --- a/message/infrastructure/amqp/subscriber.go +++ b/message/infrastructure/amqp/subscriber.go @@ -15,7 +15,7 @@ import ( type Subscriber struct { *connectionWrapper - config Config + config Config } func NewSubscriber(config Config, logger watermill.LoggerAdapter) (*Subscriber, error) { diff --git a/message/infrastructure/amqp/topology_builder.go b/message/infrastructure/amqp/topology_builder.go index 999bf30cf..536387b2c 100644 --- a/message/infrastructure/amqp/topology_builder.go +++ b/message/infrastructure/amqp/topology_builder.go @@ -6,13 +6,20 @@ import ( "github.com/streadway/amqp" ) +// TopologyBuilder is responsible for declaring exchange, queues and queues binding. +// +// Default TopologyBuilder is DefaultTopologyBuilder. +// If you need custom built topology, you should implement your own TopologyBuilder and pass it to the amqp.Config: +// +// config := NewDurablePubSubConfig() +// config.TopologyBuilder = MyProCustomBuilder{} +// type TopologyBuilder interface { BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error } -type DefaultTopologyBuilder struct { -} +type DefaultTopologyBuilder struct{} func (builder DefaultTopologyBuilder) ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error { return channel.ExchangeDeclare( @@ -26,7 +33,7 @@ func (builder DefaultTopologyBuilder) ExchangeDeclare(channel *amqp.Channel, exc ) } -func (builder *DefaultTopologyBuilder) BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error { +func (builder *DefaultTopologyBuilder) BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error { if _, err := channel.QueueDeclare( queueName, config.Queue.Durable, diff --git a/message/router.go b/message/router.go index 0fb43a383..5a9fdb0e9 100644 --- a/message/router.go +++ b/message/router.go @@ -239,6 +239,8 @@ func (r *Router) AddNoPublisherHandler( // When all handlers have stopped (for example, because subscriptions were closed), the router will also stop. // // To stop Run() you should call Close() on the router. +// +// When all handlers are stopped (for example: because of closed connection), Run() will be also stopped. func (r *Router) Run() (err error) { if r.isRunning { return errors.New("router is already running")