Skip to content

Commit

Permalink
Update docs (#73)
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak authored May 14, 2019
1 parent acb918e commit 5befc85
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 24 deletions.
15 changes: 6 additions & 9 deletions RELEASE-PROCEDURE.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
# Release procedure

1. [ ] - generate clean go.mod: `make generate_gomod`
2. [ ] - commit && push to master
3. [ ] - update and validate examples: `make validate_examples`
4. [ ] - update missing documentation
5. [ ] - commit && push to master
6. [ ] - add breaking changes to `UPGRADE-[new-version].md`
7. [ ] - commit && push to master
8. [ ] - wait for `master` CI build
9. [ ] - [add release in GitHub](https://github.com/ThreeDotsLabs/watermill/releases)
1. Generate clean go.mod: `make generate_gomod`
2. Update and validate examples: `make validate_examples`
3. Update missing documentation
4. Check snippets in documentation (sometimes `first_line_contains` or `last_line_contains` can change position and load too much)
5. Add breaking changes to `UPGRADE-[new-version].md`
6. [Add release in GitHub](https://github.com/ThreeDotsLabs/watermill/releases)
50 changes: 46 additions & 4 deletions UPGRADE-0.4.md
Original file line number Diff line number Diff line change
@@ -1,23 +1,65 @@
# UPGRADE FROM 0.3.x to 0.4

# `watermill/components/cqrs`
## `watermill/components/cqrs`

### `CommandHandler.HandlerName` and `EventHandler.HandlerName` was added to the interface.

If you are using metrics component, you may want to keep backward capability with handler names. In other case you can implement your own method of generating handler name.
If you are using metrics component, you may want to keep backward capability with handler names. In other cases, you can implement your own method of generating handler name.

Keeping backward capability for **event handlers**:

```
func (h CommandHandler) HandlerName() string {
return fmt.Sprintf("command_processor-%s", h)
return fmt.Sprintf("command_processor-%s", h)
}
```

Keeping backward capability for **command handlers**:

```
func (h EventHandler) HandlerName() string {
return fmt.Sprintf("event_processor-%s", ObjectName(h))
return fmt.Sprintf("event_processor-%s", ObjectName(h))
}
```

### Added `CommandsSubscriberConstructor` and `EventsSubscriberConstructor`

From now on, `CommandsSubscriberConstructor` and `EventsSubscriberConstructor` are passed to constructors in CQRS component.

They allow creating customized subscribers for every handler. For usage examples please check [_examples/cqrs-protobuf](_examples/cqrs-protobuf).


### Added context to `CommandHandler.Handle`, `CommandBus.Send`, `EventHandler.Handle` and `EventBus.Send`

Added missing context, which is passed to Publish function and handlers.

### Other

- `NewCommandProcessor` and `NewEventProcessor` now return an error instead of panic
- `DuplicateCommandHandlerError` is returned instead of panic when two handlers are handling the same command
- `CommandProcessor.routerHandlerFunc` and `EventProcessor.routerHandlerFunc` are now private
- using `GenerateCommandsTopic` and `GenerateEventsTopic` functions instead of constant topic to allow more flexibility


## `watermill/message/infrastructure/amqp`

### `Config.QueueBindConfig.RoutingKey` was replaced with `GenerateRoutingKey`

For backward compatibility, when using the constant value you should use a function:


```
func(topic string) string {
return "routing_key"
}
```


## `message/router/middleware`

- `PoisonQueue` is now `PoisonQueue(pub message.Publisher, topic string) (message.HandlerMiddleware, error)`, not a struct


## `message/router.go`

- From now on, when all handlers are stopped, the router will also stop (`TestRouter_stop_when_all_handlers_stopped` test)
3 changes: 2 additions & 1 deletion docs/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ if [[ ! -d themes/kube ]]; then
mkdir -p themes/kube && pushd themes/kube
git init
git remote add origin https://github.com/jeblister/kube
git fetch --depth 1 origin 0e5397b788dce3f428aeced1cd30aa309927a2c5
git fetch --depth 1 origin bda578df413e441fb24e4f5f751d2b15b7efb53a
git checkout FETCH_HEAD
popd
fi
Expand Down Expand Up @@ -36,6 +36,7 @@ else
"message/infrastructure/amqp/subscriber.go"
"message/infrastructure/amqp/config.go"
"message/infrastructure/amqp/marshaler.go"
"message/infrastructure/amqp/topology_builder.go"
"message/infrastructure/io/publisher.go"
"message/infrastructure/io/subscriber.go"
"message/infrastructure/io/marshal.go"
Expand Down
10 changes: 8 additions & 2 deletions docs/content/docs/pub-sub-implementations.md
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ Example:
{{% load-snippet-partial file="content/docs/getting-started/nats-streaming/main.go" first_line_contains="subscriber, err :=" last_line_contains="panic(err)" padding_after="1" %}}
{{% /render-md %}}

You can also use `NewStreamingSubscriberWithStanConn` and `NewStreamingPublisherWithStanConn` to use a custom `stan.Conn` created by `NewStanConnection`.

#### Publishing

{{% render-md %}}
Expand Down Expand Up @@ -394,7 +396,6 @@ For detailed configuration description, please check [message/infrastructure/amq

TLS config can be passed to `Config.TLSConfig`.


##### Connecting

{{% render-md %}}
Expand Down Expand Up @@ -436,9 +437,14 @@ AMQP doesn't provide mechanism like Kafka's "consumer groups". You can still ach
{{% load-snippet-partial file="content/docs/snippets/amqp-consumer-groups/main.go" first_line_contains="func createSubscriber(" last_line_contains="go process(\"subscriber_2\", messages2)" %}}
{{% /render-md %}}


In this example both `pubSub1` and `pubSub2` will receive some messages independently.

#### AMQP `TopologyBuilder`

{{% render-md %}}
{{% load-snippet-partial file="content/src-link/message/infrastructure/amqp/topology_builder.go" first_line_contains="// TopologyBuilder" last_line_contains="}" padding_after="0" %}}
{{% /render-md %}}

### io.Writer/io.Reader

This is an experimental Pub/Sub implementation that leverages the [standard library's](https://golang.org/pkg/io/) `io.Writer` and `io.Reader` interfaces as sources of Publisher and Subscriber, respectively.
Expand Down
8 changes: 4 additions & 4 deletions message/infrastructure/amqp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func NewDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenerator
PrefetchCount: 1,
},
},
TopologyBuilder:&DefaultTopologyBuilder{},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}

Expand Down Expand Up @@ -103,7 +103,7 @@ func NewNonDurablePubSubConfig(amqpURI string, generateQueueName QueueNameGenera
PrefetchCount: 1,
},
},
TopologyBuilder:&DefaultTopologyBuilder{},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}

Expand Down Expand Up @@ -149,7 +149,7 @@ func NewDurableQueueConfig(amqpURI string) Config {
PrefetchCount: 1,
},
},
TopologyBuilder:&DefaultTopologyBuilder{},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}

Expand Down Expand Up @@ -193,7 +193,7 @@ func NewNonDurableQueueConfig(amqpURI string) Config {
PrefetchCount: 1,
},
},
TopologyBuilder:&DefaultTopologyBuilder{},
TopologyBuilder: &DefaultTopologyBuilder{},
}
}

Expand Down
2 changes: 1 addition & 1 deletion message/infrastructure/amqp/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type Subscriber struct {
*connectionWrapper

config Config
config Config
}

func NewSubscriber(config Config, logger watermill.LoggerAdapter) (*Subscriber, error) {
Expand Down
13 changes: 10 additions & 3 deletions message/infrastructure/amqp/topology_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,20 @@ import (
"github.com/streadway/amqp"
)

// TopologyBuilder is responsible for declaring exchange, queues and queues binding.
//
// Default TopologyBuilder is DefaultTopologyBuilder.
// If you need custom built topology, you should implement your own TopologyBuilder and pass it to the amqp.Config:
//
// config := NewDurablePubSubConfig()
// config.TopologyBuilder = MyProCustomBuilder{}
//
type TopologyBuilder interface {
BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error
ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error
}

type DefaultTopologyBuilder struct {
}
type DefaultTopologyBuilder struct{}

func (builder DefaultTopologyBuilder) ExchangeDeclare(channel *amqp.Channel, exchangeName string, config Config) error {
return channel.ExchangeDeclare(
Expand All @@ -26,7 +33,7 @@ func (builder DefaultTopologyBuilder) ExchangeDeclare(channel *amqp.Channel, exc
)
}

func (builder *DefaultTopologyBuilder) BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error {
func (builder *DefaultTopologyBuilder) BuildTopology(channel *amqp.Channel, queueName string, exchangeName string, config Config, logger watermill.LoggerAdapter) error {
if _, err := channel.QueueDeclare(
queueName,
config.Queue.Durable,
Expand Down
2 changes: 2 additions & 0 deletions message/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,8 @@ func (r *Router) AddNoPublisherHandler(
// When all handlers have stopped (for example, because subscriptions were closed), the router will also stop.
//
// To stop Run() you should call Close() on the router.
//
// When all handlers are stopped (for example: because of closed connection), Run() will be also stopped.
func (r *Router) Run() (err error) {
if r.isRunning {
return errors.New("router is already running")
Expand Down

0 comments on commit 5befc85

Please sign in to comment.