Update simple-app example (#120)
m110 authored Sep 7, 2019
1 parent 37c68a6 commit 03ab332
Showing 18 changed files with 182 additions and 156 deletions.
@@ -16,7 +16,7 @@ To understand the background and internals, see [getting started guide](https://

## Requirements

To run this example you will need Docker and docker-compose installed. See installation guide at https://docs.docker.com/compose/install/
To run this example you will need Docker and docker-compose installed. See the [installation guide](https://docs.docker.com/compose/install/).

## Running

File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
34 changes: 34 additions & 0 deletions _examples/basic/2-realtime-feed/README.md
@@ -0,0 +1,34 @@
# Realtime Feed

This example features a very busy blogging platform, with thousands of messages showing up on your feed.

There are two separate applications (microservices) integrating over a Kafka topic. The [`producer`](producer/) generates
thousands of "posts" and publishes them to the topic. The [`consumer`](consumer/) subscribes to this topic and
displays each post on the standard output.

The consumer has a throttling middleware enabled, so you have a chance to actually read the posts.
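
Roughly, the throttle is wired into the consumer's Watermill router like this. This is a trimmed sketch only: handlers, the Kafka subscriber, and running the router are omitted, and the full middleware stack is in the consumer's `main.go`.

```go
package main

import (
	"time"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
	"github.com/ThreeDotsLabs/watermill/message/router/middleware"
)

func main() {
	logger := watermill.NewStdLogger(false, false)

	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		panic(err)
	}

	// Throttle processing to 10 messages per second, so the feed stays readable.
	// Handlers, the Kafka subscriber, and router.Run are omitted in this sketch.
	router.AddMiddleware(middleware.NewThrottle(10, time.Second).Middleware)
}
```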

To understand the background and internals, see the [getting started guide](https://watermill.io/docs/getting-started/).

## Requirements

To run this example you will need Docker and docker-compose installed. See the [installation guide](https://docs.docker.com/compose/install/).

## Running

```bash
docker-compose up
```

You should see the live feed of posts on the standard output.

## Exercises

1. Peek into the posts counter published on the `posts_count` topic.

```bash
docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic posts_count
```

2. Add persistent storage for incoming posts in the consumer service, instead of displaying them.
Consider using the [SQL Publisher](https://github.com/ThreeDotsLabs/watermill-sql).
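
One possible starting point for exercise 2 is sketched below. It assumes a handler signature of `func(*message.Message) error`, a MySQL driver, a placeholder DSN, and a hypothetical `posts` table, and writes each post with plain `database/sql` instead of the SQL Publisher linked above.

```go
package main

import (
	"database/sql"
	"encoding/json"
	"time"

	_ "github.com/go-sql-driver/mysql"

	"github.com/ThreeDotsLabs/watermill/message"
)

// storedPost mirrors the consumer's postAdded type.
type storedPost struct {
	OccurredOn time.Time `json:"occurred_on"`
	Author     string    `json:"author"`
	Title      string    `json:"title"`
}

// postsStorage persists incoming posts instead of printing them.
type postsStorage struct {
	db *sql.DB
}

// SavePost unmarshals a post and inserts it into the (assumed) posts table.
// Register it with the router in place of the feed generator's UpdateFeed.
func (s postsStorage) SavePost(msg *message.Message) error {
	var post storedPost
	if err := json.Unmarshal(msg.Payload, &post); err != nil {
		return err
	}

	_, err := s.db.Exec(
		"INSERT INTO posts (occurred_on, author, title) VALUES (?, ?, ?)",
		post.OccurredOn, post.Author, post.Title,
	)
	return err
}

func main() {
	// The DSN is a placeholder; point it at the MySQL service you add to docker-compose.
	db, err := sql.Open("mysql", "root:secret@tcp(mysql:3306)/watermill?parseTime=true")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	storage := postsStorage{db: db}
	_ = storage // wire storage.SavePost into the router's no-publisher handler
}
```
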
File renamed without changes.
File renamed without changes.
@@ -22,72 +22,73 @@ var (
)

func main() {
logger := watermill.NewStdLogger(true, true)
logger := watermill.NewStdLogger(false, false)
logger.Info("Starting the consumer", nil)

pub, err := kafka.NewPublisher(brokers, marshaler, nil, logger)
if err != nil {
panic(err)
}

r, err := message.NewRouter(
message.RouterConfig{},
logger,
)
r, err := message.NewRouter(message.RouterConfig{}, logger)
if err != nil {
panic(err)
}

retryMiddleware := middleware.Retry{}
retryMiddleware.MaxRetries = 1
retryMiddleware.InitialInterval = time.Millisecond * 10
retryMiddleware := middleware.Retry{
MaxRetries: 1,
InitialInterval: time.Millisecond * 10,
}

poisonQueue, err := middleware.PoisonQueue(pub, "poison_queue")
if err != nil {
panic(err)
}

r.AddMiddleware(
// limiting processed messages to 10 per second
middleware.NewThrottle(100, time.Second).Middleware,
// Recoverer middleware recovers panic from handlers and middlewares
middleware.Recoverer,

// Limit incoming messages to 10 per second
middleware.NewThrottle(10, time.Second).Middleware,

// retry middleware retries message processing if error occurred in handler
// If the retries limit is exceeded (see retryMiddleware below), the message is sent
// to the poison queue (published to poison_queue topic)
poisonQueue,

// if retries limit was exceeded, message is sent to poison queue (poison_queue topic)
// Retry middleware retries message processing if an error occurred in the handler
retryMiddleware.Middleware,

// recovered recovers panic from handlers
middleware.Recoverer,

// correlation ID middleware adds to every produced message correlation id of consumed message,
// useful for debugging
// Correlation ID middleware adds the correlation ID of the consumed message to each produced message.
// It's useful for debugging.
middleware.CorrelationID,

// simulating error or panic from handler
// Simulate errors or panics from handler
middleware.RandomFail(0.01),
middleware.RandomPanic(0.01),
)

// close router when SIGTERM is sent
// Close the router when a SIGTERM is sent
r.AddPlugin(plugin.SignalsHandler)

// handler which just counts added posts
// Handler that counts consumed posts
r.AddHandler(
"posts_counter",
"posts_published",
createSubscriber("posts_counter_v2", logger),
createSubscriber("posts_counter", logger),
"posts_count",
pub,
PostsCounter{memoryCountStorage{new(int64)}}.Count,
)

// handler which generates "feed" from events post
// Handler that generates "feed" from consumed posts
//
// this implementation just prints it to stdout,
// but production ready implementation would save posts to some persistent storage
// This implementation just prints the posts on stdout,
// but a production-ready implementation would save posts to some persistent storage.
r.AddNoPublisherHandler(
"feed_generator",
"posts_published",
createSubscriber("feed_generator_v2", logger),
createSubscriber("feed_generator", logger),
FeedGenerator{printFeedStorage{}}.UpdateFeed,
)

@@ -139,7 +140,8 @@ type PostsCounter struct {
}

func (p PostsCounter) Count(msg *message.Message) ([]*message.Message, error) {
// in production use when implementing counter we probably want to make some kind of deduplication here
// When implementing a counter for production use, you'd probably need to add some kind of deduplication here,
// unless the Pub/Sub used supports exactly-once delivery.

newCount, err := p.countStorage.CountAdd()
if err != nil {
@@ -155,7 +157,9 @@ func (p PostsCounter) Count(msg *message.Message) ([]*message.Message, error) {
return []*message.Message{message.NewMessage(watermill.NewUUID(), b)}, nil
}

// intentionally not importing type from app1, because we don't need all data and we want to avoid coupling
// postAdded might look similar to the postAdded type from the producer.
// It's intentionally not imported here. We avoid coupling the services at the cost of duplication.
// We don't need all of its data either (content is not displayed on the feed).
type postAdded struct {
OccurredOn time.Time `json:"occurred_on"`
Author string `json:"author"`
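
For illustration of the deduplication mentioned in `PostsCounter.Count` above, a naive in-memory guard keyed on the message UUID could look like the sketch below. It is not part of this commit: it grows without bound and forgets everything on restart, so production code would need persistent, bounded storage.

```go
package main

import (
	"fmt"
	"sync"
)

// seenMessages is a naive in-memory deduplication guard keyed by message UUID.
type seenMessages struct {
	mu    sync.Mutex
	uuids map[string]struct{}
}

// firstTime reports whether uuid has not been seen before, recording it as seen.
func (s *seenMessages) firstTime(uuid string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()

	if _, ok := s.uuids[uuid]; ok {
		return false
	}
	s.uuids[uuid] = struct{}{}
	return true
}

func main() {
	dedup := &seenMessages{uuids: map[string]struct{}{}}
	fmt.Println(dedup.firstTime("abc")) // true: first delivery, count it
	fmt.Println(dedup.firstTime("abc")) // false: redelivery, skip it
}
```
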
@@ -1,25 +1,25 @@
version: '3'
services:
publishing:
image: golang:1.11
producer:
image: golang:1.12
restart: unless-stopped
depends_on:
- kafka
volumes:
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir: /app/publishing-app/
working_dir: /app/producer/
command: go run main.go

subscribing:
image: golang:1.11
consumer:
image: golang:1.12
restart: unless-stopped
depends_on:
- kafka
- kafka
volumes:
- .:/app
- $GOPATH/pkg/mod:/go/pkg/mod
working_dir: /app/subscribing-app/
working_dir: /app/consumer/
command: go run main.go

zookeeper:
@@ -3,5 +3,5 @@ module main.go
require (
github.com/ThreeDotsLabs/watermill v1.0.0-rc.2
github.com/ThreeDotsLabs/watermill-kafka v1.0.1
github.com/renstrom/shortuuid v3.0.0+incompatible
github.com/brianvoe/gofakeit v3.18.0+incompatible
)
@@ -13,6 +13,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/brianvoe/gofakeit v3.18.0+incompatible h1:wDOmHc9DLG4nRjUVVaxA+CEglKOW72Y5+4WNxUIkjM8=
github.com/brianvoe/gofakeit v3.18.0+incompatible/go.mod h1:kfwdRA90vvNhPutZWfH7WPaDzUjz+CZFqG+rPkOjGOc=
github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c=
github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
106 changes: 106 additions & 0 deletions _examples/basic/2-realtime-feed/producer/main.go
@@ -0,0 +1,106 @@
package main

import (
"encoding/json"
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"time"

"github.com/brianvoe/gofakeit"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/router/middleware"
)

var (
brokers = []string{"kafka:9092"}

messagesPerSecond = 100
numWorkers = 20
)

func main() {
logger := watermill.NewStdLogger(false, false)
logger.Info("Starting the producer", watermill.LogFields{})

rand.Seed(time.Now().Unix())

publisher, err := kafka.NewPublisher(brokers, kafka.DefaultMarshaler{}, nil, logger)
if err != nil {
panic(err)
}
defer publisher.Close()

closeCh := make(chan struct{})
workersGroup := &sync.WaitGroup{}
workersGroup.Add(numWorkers)

for i := 0; i < numWorkers; i++ {
go worker(publisher, workersGroup, closeCh)
}

// wait for SIGINT
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
<-c

// signal for the workers to stop publishing
close(closeCh)

// Waiting for all messages to be published
workersGroup.Wait()

logger.Info("All messages published", nil)
}

// worker publishes messages until closeCh is closed.
func worker(publisher message.Publisher, wg *sync.WaitGroup, closeCh chan struct{}) {
ticker := time.NewTicker(time.Duration(int(time.Second) / messagesPerSecond))

for {
select {
case <-closeCh:
ticker.Stop()
wg.Done()
return

case <-ticker.C:
}

msgPayload := postAdded{
OccurredOn: time.Now(),
Author: gofakeit.Username(),
Title: gofakeit.Sentence(rand.Intn(5) + 1),
Content: gofakeit.Sentence(rand.Intn(10) + 5),
}

payload, err := json.Marshal(msgPayload)
if err != nil {
panic(err)
}

msg := message.NewMessage(watermill.NewUUID(), payload)

// Use a middleware to set the correlation ID; it's useful for debugging
middleware.SetCorrelationID(watermill.NewShortUUID(), msg)
err = publisher.Publish("posts_published", msg)
if err != nil {
fmt.Println("cannot publish message:", err)
continue
}
}
}

type postAdded struct {
OccurredOn time.Time `json:"occurred_on"`

Author string `json:"author"`
Title string `json:"title"`

Content string `json:"content"`
}
17 changes: 0 additions & 17 deletions _examples/simple-app/README.md

This file was deleted.
