From 03ab33289c8cab09a8e11ee2feb6c5338022c2dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Mi=C5=82osz=20Sm=C3=B3=C5=82ka?= Date: Sat, 7 Sep 2019 14:38:20 +0200 Subject: [PATCH] Update simple-app example (#120) --- .../1-your-first-app}/.validate_example.yml | 0 .../1-your-first-app}/README.md | 2 +- .../1-your-first-app}/docker-compose.yml | 0 .../1-your-first-app}/go.mod | 0 .../1-your-first-app}/go.sum | 0 .../1-your-first-app}/main.go | 0 .../.validate_example_publishing.yml | 0 .../.validate_example_subscribing.yml | 0 _examples/basic/2-realtime-feed/README.md | 34 ++++++ .../2-realtime-feed/consumer}/go.mod | 0 .../2-realtime-feed/consumer}/go.sum | 0 .../2-realtime-feed/consumer}/main.go | 58 +++++----- .../2-realtime-feed}/docker-compose.yml | 14 +-- .../2-realtime-feed/producer}/go.mod | 2 +- .../2-realtime-feed/producer}/go.sum | 2 + .../basic/2-realtime-feed/producer/main.go | 106 ++++++++++++++++++ _examples/simple-app/README.md | 17 --- _examples/simple-app/publishing-app/main.go | 103 ----------------- 18 files changed, 182 insertions(+), 156 deletions(-) rename _examples/{your-first-app => basic/1-your-first-app}/.validate_example.yml (100%) rename _examples/{your-first-app => basic/1-your-first-app}/README.md (96%) rename _examples/{your-first-app => basic/1-your-first-app}/docker-compose.yml (100%) rename _examples/{your-first-app => basic/1-your-first-app}/go.mod (100%) rename _examples/{your-first-app => basic/1-your-first-app}/go.sum (100%) rename _examples/{your-first-app => basic/1-your-first-app}/main.go (100%) rename _examples/{simple-app => basic/2-realtime-feed}/.validate_example_publishing.yml (100%) rename _examples/{simple-app => basic/2-realtime-feed}/.validate_example_subscribing.yml (100%) create mode 100644 _examples/basic/2-realtime-feed/README.md rename _examples/{simple-app/subscribing-app => basic/2-realtime-feed/consumer}/go.mod (100%) rename _examples/{simple-app/subscribing-app => basic/2-realtime-feed/consumer}/go.sum (100%) rename _examples/{simple-app/subscribing-app => basic/2-realtime-feed/consumer}/main.go (67%) rename _examples/{simple-app => basic/2-realtime-feed}/docker-compose.yml (83%) rename _examples/{simple-app/publishing-app => basic/2-realtime-feed/producer}/go.mod (71%) rename _examples/{simple-app/publishing-app => basic/2-realtime-feed/producer}/go.sum (98%) create mode 100644 _examples/basic/2-realtime-feed/producer/main.go delete mode 100644 _examples/simple-app/README.md delete mode 100644 _examples/simple-app/publishing-app/main.go diff --git a/_examples/your-first-app/.validate_example.yml b/_examples/basic/1-your-first-app/.validate_example.yml similarity index 100% rename from _examples/your-first-app/.validate_example.yml rename to _examples/basic/1-your-first-app/.validate_example.yml diff --git a/_examples/your-first-app/README.md b/_examples/basic/1-your-first-app/README.md similarity index 96% rename from _examples/your-first-app/README.md rename to _examples/basic/1-your-first-app/README.md index c71ca02d3..770f5b5e8 100644 --- a/_examples/your-first-app/README.md +++ b/_examples/basic/1-your-first-app/README.md @@ -16,7 +16,7 @@ To understand the background and internals, see [getting started guide](https:// ## Requirements -To run this example you will need Docker and docker-compose installed. See installation guide at https://docs.docker.com/compose/install/ +To run this example you will need Docker and docker-compose installed. See the [installation guide](https://docs.docker.com/compose/install/). ## Running diff --git a/_examples/your-first-app/docker-compose.yml b/_examples/basic/1-your-first-app/docker-compose.yml similarity index 100% rename from _examples/your-first-app/docker-compose.yml rename to _examples/basic/1-your-first-app/docker-compose.yml diff --git a/_examples/your-first-app/go.mod b/_examples/basic/1-your-first-app/go.mod similarity index 100% rename from _examples/your-first-app/go.mod rename to _examples/basic/1-your-first-app/go.mod diff --git a/_examples/your-first-app/go.sum b/_examples/basic/1-your-first-app/go.sum similarity index 100% rename from _examples/your-first-app/go.sum rename to _examples/basic/1-your-first-app/go.sum diff --git a/_examples/your-first-app/main.go b/_examples/basic/1-your-first-app/main.go similarity index 100% rename from _examples/your-first-app/main.go rename to _examples/basic/1-your-first-app/main.go diff --git a/_examples/simple-app/.validate_example_publishing.yml b/_examples/basic/2-realtime-feed/.validate_example_publishing.yml similarity index 100% rename from _examples/simple-app/.validate_example_publishing.yml rename to _examples/basic/2-realtime-feed/.validate_example_publishing.yml diff --git a/_examples/simple-app/.validate_example_subscribing.yml b/_examples/basic/2-realtime-feed/.validate_example_subscribing.yml similarity index 100% rename from _examples/simple-app/.validate_example_subscribing.yml rename to _examples/basic/2-realtime-feed/.validate_example_subscribing.yml diff --git a/_examples/basic/2-realtime-feed/README.md b/_examples/basic/2-realtime-feed/README.md new file mode 100644 index 000000000..68db9b0d3 --- /dev/null +++ b/_examples/basic/2-realtime-feed/README.md @@ -0,0 +1,34 @@ +# Realtime Feed + +This example features a very busy blogging platform, with thousands of messages showing up on your feed. + +There are two separate applications (microservices) integrating over a Kafka topic. The [`producer`](producer/) generates +thousands of "posts" and publishes them to the topic. The [`consumer`](consumer/) subscribes to this topic and +displays each post on the standard output. + +The consumer has a throttling middleware enabled, so you have a chance to actually read the posts. + +To understand the background and internals, see [getting started guide](https://watermill.io/docs/getting-started/). + +## Requirements + +To run this example you will need Docker and docker-compose installed. See the [installation guide](https://docs.docker.com/compose/install/). + +## Running + +```bash +docker-compose up +``` + +You should see the live feed of posts on the standard output. + +## Exercises + +1. Peek into the posts counter published on `posts_count` topic. + +``` +docker-compose exec kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic posts_count +``` + +2. Add a persistent storage for incoming posts in the consumer service, instead of displaying them. + Consider using the [SQL Publisher](https://github.com/ThreeDotsLabs/watermill-sql). diff --git a/_examples/simple-app/subscribing-app/go.mod b/_examples/basic/2-realtime-feed/consumer/go.mod similarity index 100% rename from _examples/simple-app/subscribing-app/go.mod rename to _examples/basic/2-realtime-feed/consumer/go.mod diff --git a/_examples/simple-app/subscribing-app/go.sum b/_examples/basic/2-realtime-feed/consumer/go.sum similarity index 100% rename from _examples/simple-app/subscribing-app/go.sum rename to _examples/basic/2-realtime-feed/consumer/go.sum diff --git a/_examples/simple-app/subscribing-app/main.go b/_examples/basic/2-realtime-feed/consumer/main.go similarity index 67% rename from _examples/simple-app/subscribing-app/main.go rename to _examples/basic/2-realtime-feed/consumer/main.go index 7bdc16c91..e693f1c63 100644 --- a/_examples/simple-app/subscribing-app/main.go +++ b/_examples/basic/2-realtime-feed/consumer/main.go @@ -22,23 +22,23 @@ var ( ) func main() { - logger := watermill.NewStdLogger(true, true) + logger := watermill.NewStdLogger(false, false) + logger.Info("Starting the consumer", nil) + pub, err := kafka.NewPublisher(brokers, marshaler, nil, logger) if err != nil { panic(err) } - r, err := message.NewRouter( - message.RouterConfig{}, - logger, - ) + r, err := message.NewRouter(message.RouterConfig{}, logger) if err != nil { panic(err) } - retryMiddleware := middleware.Retry{} - retryMiddleware.MaxRetries = 1 - retryMiddleware.InitialInterval = time.Millisecond * 10 + retryMiddleware := middleware.Retry{ + MaxRetries: 1, + InitialInterval: time.Millisecond * 10, + } poisonQueue, err := middleware.PoisonQueue(pub, "poison_queue") if err != nil { @@ -46,48 +46,49 @@ func main() { } r.AddMiddleware( - // limiting processed messages to 10 per second - middleware.NewThrottle(100, time.Second).Middleware, + // Recoverer middleware recovers panic from handlers and middlewares + middleware.Recoverer, + + // Limit incoming messages to 10 per second + middleware.NewThrottle(10, time.Second).Middleware, - // retry middleware retries message processing if error occurred in handler + // If the retries limit is exceeded (see retryMiddleware below), the message is sent + // to the poison queue (published to poison_queue topic) poisonQueue, - // if retries limit was exceeded, message is sent to poison queue (poison_queue topic) + // Retry middleware retries message processing if an error occurred in the handler retryMiddleware.Middleware, - // recovered recovers panic from handlers - middleware.Recoverer, - - // correlation ID middleware adds to every produced message correlation id of consumed message, - // useful for debugging + // Correlation ID middleware adds the correlation ID of the consumed message to each produced message. + // It's useful for debugging. middleware.CorrelationID, - // simulating error or panic from handler + // Simulate errors or panics from handler middleware.RandomFail(0.01), middleware.RandomPanic(0.01), ) - // close router when SIGTERM is sent + // Close the router when a SIGTERM is sent r.AddPlugin(plugin.SignalsHandler) - // handler which just counts added posts + // Handler that counts consumed posts r.AddHandler( "posts_counter", "posts_published", - createSubscriber("posts_counter_v2", logger), + createSubscriber("posts_counter", logger), "posts_count", pub, PostsCounter{memoryCountStorage{new(int64)}}.Count, ) - // handler which generates "feed" from events post + // Handler that generates "feed" from consumed posts // - // this implementation just prints it to stdout, - // but production ready implementation would save posts to some persistent storage + // This implementation just prints the posts on stdout, + // but production ready implementation would save posts to some persistent storage. r.AddNoPublisherHandler( "feed_generator", "posts_published", - createSubscriber("feed_generator_v2", logger), + createSubscriber("feed_generator", logger), FeedGenerator{printFeedStorage{}}.UpdateFeed, ) @@ -139,7 +140,8 @@ type PostsCounter struct { } func (p PostsCounter) Count(msg *message.Message) ([]*message.Message, error) { - // in production use when implementing counter we probably want to make some kind of deduplication here + // When implementing counter for production use, you'd probably need to add some kind of deduplication here, + // unless the used Pub/Sub supports exactly-once delivery. newCount, err := p.countStorage.CountAdd() if err != nil { @@ -155,7 +157,9 @@ func (p PostsCounter) Count(msg *message.Message) ([]*message.Message, error) { return []*message.Message{message.NewMessage(watermill.NewUUID(), b)}, nil } -// intentionally not importing type from app1, because we don't need all data and we want to avoid coupling +// postAdded might look similar to the postAdded type from producer. +// It's intentionally not imported here. We avoid coupling the services at the cost of duplication. +// We don't need all of it's data either (content is not displayed on the feed). type postAdded struct { OccurredOn time.Time `json:"occurred_on"` Author string `json:"author"` diff --git a/_examples/simple-app/docker-compose.yml b/_examples/basic/2-realtime-feed/docker-compose.yml similarity index 83% rename from _examples/simple-app/docker-compose.yml rename to _examples/basic/2-realtime-feed/docker-compose.yml index 219e45991..b6a44945a 100644 --- a/_examples/simple-app/docker-compose.yml +++ b/_examples/basic/2-realtime-feed/docker-compose.yml @@ -1,25 +1,25 @@ version: '3' services: - publishing: - image: golang:1.11 + producer: + image: golang:1.12 restart: unless-stopped depends_on: - kafka volumes: - .:/app - $GOPATH/pkg/mod:/go/pkg/mod - working_dir: /app/publishing-app/ + working_dir: /app/producer/ command: go run main.go - subscribing: - image: golang:1.11 + consumer: + image: golang:1.12 restart: unless-stopped depends_on: - - kafka + - kafka volumes: - .:/app - $GOPATH/pkg/mod:/go/pkg/mod - working_dir: /app/subscribing-app/ + working_dir: /app/consumer/ command: go run main.go zookeeper: diff --git a/_examples/simple-app/publishing-app/go.mod b/_examples/basic/2-realtime-feed/producer/go.mod similarity index 71% rename from _examples/simple-app/publishing-app/go.mod rename to _examples/basic/2-realtime-feed/producer/go.mod index 754e46cef..34c8e0124 100644 --- a/_examples/simple-app/publishing-app/go.mod +++ b/_examples/basic/2-realtime-feed/producer/go.mod @@ -3,5 +3,5 @@ module main.go require ( github.com/ThreeDotsLabs/watermill v1.0.0-rc.2 github.com/ThreeDotsLabs/watermill-kafka v1.0.1 - github.com/renstrom/shortuuid v3.0.0+incompatible + github.com/brianvoe/gofakeit v3.18.0+incompatible ) diff --git a/_examples/simple-app/publishing-app/go.sum b/_examples/basic/2-realtime-feed/producer/go.sum similarity index 98% rename from _examples/simple-app/publishing-app/go.sum rename to _examples/basic/2-realtime-feed/producer/go.sum index de4127b97..f5b591feb 100644 --- a/_examples/simple-app/publishing-app/go.sum +++ b/_examples/basic/2-realtime-feed/producer/go.sum @@ -13,6 +13,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= +github.com/brianvoe/gofakeit v3.18.0+incompatible h1:wDOmHc9DLG4nRjUVVaxA+CEglKOW72Y5+4WNxUIkjM8= +github.com/brianvoe/gofakeit v3.18.0+incompatible/go.mod h1:kfwdRA90vvNhPutZWfH7WPaDzUjz+CZFqG+rPkOjGOc= github.com/cenkalti/backoff/v3 v3.0.0 h1:ske+9nBpD9qZsTBoF41nW5L+AIuFBKMeze18XQ3eG1c= github.com/cenkalti/backoff/v3 v3.0.0/go.mod h1:cIeZDE3IrqwwJl6VUwCN6trj1oXrTS4rc0ij+ULvLYs= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/_examples/basic/2-realtime-feed/producer/main.go b/_examples/basic/2-realtime-feed/producer/main.go new file mode 100644 index 000000000..6af54baf4 --- /dev/null +++ b/_examples/basic/2-realtime-feed/producer/main.go @@ -0,0 +1,106 @@ +package main + +import ( + "encoding/json" + "fmt" + "math/rand" + "os" + "os/signal" + "sync" + "time" + + "github.com/brianvoe/gofakeit" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka" + "github.com/ThreeDotsLabs/watermill/message" + "github.com/ThreeDotsLabs/watermill/message/router/middleware" +) + +var ( + brokers = []string{"kafka:9092"} + + messagesPerSecond = 100 + numWorkers = 20 +) + +func main() { + logger := watermill.NewStdLogger(false, false) + logger.Info("Starting the producer", watermill.LogFields{}) + + rand.Seed(time.Now().Unix()) + + publisher, err := kafka.NewPublisher(brokers, kafka.DefaultMarshaler{}, nil, logger) + if err != nil { + panic(err) + } + defer publisher.Close() + + closeCh := make(chan struct{}) + workersGroup := &sync.WaitGroup{} + workersGroup.Add(numWorkers) + + for i := 0; i < numWorkers; i++ { + go worker(publisher, workersGroup, closeCh) + } + + // wait for SIGINT + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt) + <-c + + // signal for the workers to stop publishing + close(closeCh) + + // Waiting for all messages to be published + workersGroup.Wait() + + logger.Info("All messages published", nil) +} + +// worker publishes messages until closeCh is closed. +func worker(publisher message.Publisher, wg *sync.WaitGroup, closeCh chan struct{}) { + ticker := time.NewTicker(time.Duration(int(time.Second) / messagesPerSecond)) + + for { + select { + case <-closeCh: + ticker.Stop() + wg.Done() + return + + case <-ticker.C: + } + + msgPayload := postAdded{ + OccurredOn: time.Now(), + Author: gofakeit.Username(), + Title: gofakeit.Sentence(rand.Intn(5) + 1), + Content: gofakeit.Sentence(rand.Intn(10) + 5), + } + + payload, err := json.Marshal(msgPayload) + if err != nil { + panic(err) + } + + msg := message.NewMessage(watermill.NewUUID(), payload) + + // Use a middleware to set the correlation ID, it's useful for debugging + middleware.SetCorrelationID(watermill.NewShortUUID(), msg) + err = publisher.Publish("posts_published", msg) + if err != nil { + fmt.Println("cannot publish message:", err) + continue + } + } +} + +type postAdded struct { + OccurredOn time.Time `json:"occurred_on"` + + Author string `json:"author"` + Title string `json:"title"` + + Content string `json:"content"` +} diff --git a/_examples/simple-app/README.md b/_examples/simple-app/README.md deleted file mode 100644 index 034462dbd..000000000 --- a/_examples/simple-app/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Simple app - -This app has separated publisher - [`publishing-app`](publishing-app/) and [`subscribing-app`](subscribing-app/). - -Before checking the examples, it is recommended to read [getting started guide](https://watermill.io/docs/getting-started/). - -Warning: Subscribing app has throttling middleware enabled. - -## Requirements - -To run this example you will need Docker and docker-compose installed. See installation guide at https://docs.docker.com/compose/install/ - -## Running - -```bash -docker-compose up -``` diff --git a/_examples/simple-app/publishing-app/main.go b/_examples/simple-app/publishing-app/main.go deleted file mode 100644 index 15fb8e48f..000000000 --- a/_examples/simple-app/publishing-app/main.go +++ /dev/null @@ -1,103 +0,0 @@ -package main - -import ( - "encoding/json" - "log" - "math/rand" - "time" - - "github.com/renstrom/shortuuid" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill-kafka/pkg/kafka" - "github.com/ThreeDotsLabs/watermill/message" - "github.com/ThreeDotsLabs/watermill/message/router/middleware" -) - -var ( - brokers = []string{"kafka:9092"} -) - -func main() { - log.Println("Starting publishing app") - logger := watermill.NewStdLogger(true, true) - - publisher, err := kafka.NewPublisher(brokers, kafka.DefaultMarshaler{}, nil, logger) - if err != nil { - panic(err) - } - defer publisher.Close() - - messagesToAdd := 10000 - workers := 25 - - msgAdded := make(chan struct{}) - allMessagesAdded := make(chan struct{}) - - go func() { - for range msgAdded { - messagesToAdd-- - - if messagesToAdd%1000 == 0 { - log.Println("left ", messagesToAdd) - } - if messagesToAdd == 0 { - allMessagesAdded <- struct{}{} - } - } - }() - - for num := 0; num < workers; num++ { - go func() { - var msgPayload postAdded - var msg *message.Message - - for messagesToAdd > 0 { - msgPayload.OccurredOn = time.Now() - msgPayload.Author = randString(10) - msgPayload.Title = randString(15) - msgPayload.Content = randString(30) - - payload, err := json.Marshal(msgPayload) - if err != nil { - panic(err) - } - - msg = message.NewMessage(watermill.NewUUID(), payload) - - // using function from middleware to set correlation id, useful for debugging - middleware.SetCorrelationID(shortuuid.New(), msg) - - err = publisher.Publish("posts_published", msg) - if err != nil { - log.Println("cannot publish message:", err) - continue - } - msgAdded <- struct{}{} - } - }() - } - - // waiting to all being produced - <-allMessagesAdded -} - -type postAdded struct { - OccurredOn time.Time `json:"occurred_on"` - - Author string `json:"author"` - Title string `json:"title"` - - Content string `json:"content"` -} - -var letters = []rune("abcdefghijklmnopqrstuvwxyz") - -// randString generates random string of len n -func randString(n int) string { - b := make([]rune, n) - for i := range b { - b[i] = letters[rand.Intn(len(letters))] - } - return string(b) -}