Skip to content

Commit

Permalink
Update examples
Browse files Browse the repository at this point in the history
  • Loading branch information
m110 committed Oct 29, 2024
1 parent fe1eecd commit acc88ed
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 41 deletions.
7 changes: 1 addition & 6 deletions _examples/real-world-examples/delayed-messages/go.sum
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
github.com/Rican7/retry v0.3.1 h1:scY4IbO8swckzoA/11HgBwaZRJEyY9vaNJshcdhp1Mc=
github.com/Rican7/retry v0.3.1/go.mod h1:CxSDrhAyXmTMeEuRAnArMu1FHu48vtfjLREWqVl7Vw0=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948 h1:b8qRHpWtlO94x6dVzSulrO2znSQqz8iYsxUyrdTixHo=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 h1:K62uIAKOkCHTXtAwY+Nj95vyLR0y25UMBsbf/FuWCeQ=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2 h1:FY6tsBcbhbJpKDOssU4bfybstqY0hQHwiZmVq9qyILQ=
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2/go.mod h1:69++855LyB+ckYDe60PiJLBcUrpckfDE2WwyzuVJRCk=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602 h1:CKdW3wb3+C36mMa44DF53KUyM5L6mGOjI3hikBOlAl4=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93 h1:KeRk2EG5AtdxfpjqIVPigZqscMvIcy0E2h8k7y38OAE=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1 h1:uYfnh1EoqXrzHu+bX/TboRyv4ou+EFcmkC1MABeQ0lI=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1/go.mod h1:ttA/lhzSh0YyDkosq1Cgc7IYz6Arrba0jIWfdnON0WA=
github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo0tgAW4=
Expand Down
13 changes: 6 additions & 7 deletions _examples/real-world-examples/delayed-messages/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
stdSQL "database/sql"
"fmt"
"strings"
"time"

"github.com/brianvoe/gofakeit/v6"
Expand Down Expand Up @@ -115,7 +116,7 @@ func main() {
cqrs.NewEventHandler(
"OnOrderPlacedHandler",
func(ctx context.Context, event *OrderPlaced) error {
fmt.Printf("Received order placed from %v\n", event.Customer.Name)
fmt.Printf("💰 Received order from %v <%v>\n", event.Customer.Name, event.Customer.Email)

cmd := SendFeedbackForm{
To: event.Customer.Email,
Expand All @@ -142,10 +143,7 @@ func main() {
cqrs.NewCommandHandler(
"OnSendFeedbackForm",
func(ctx context.Context, cmd *SendFeedbackForm) error {
msg := fmt.Sprintf("Hello %s! It's been a while since you placed your order, how did you like it? Let us know!", cmd.Name)

fmt.Println("Sending feedback form to:", cmd.To)
fmt.Println("\tMessage:", msg)
fmt.Printf("📧 Sending feedback form to %v <%v>\n", cmd.Name, cmd.To)

// In a real world scenario, we would send an email to the customer here

Expand Down Expand Up @@ -188,11 +186,12 @@ func main() {
<-router.Running()

for {
name := gofakeit.FirstName()
e := OrderPlaced{
OrderID: uuid.NewString(),
Customer: Customer{
Name: gofakeit.FirstName(),
Email: gofakeit.Email(),
Name: name,
Email: fmt.Sprintf("%v@%v", strings.ToLower(name), gofakeit.DomainName()),
},
}

Expand Down
7 changes: 1 addition & 6 deletions _examples/real-world-examples/delayed-requeue/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,10 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
github.com/Rican7/retry v0.3.1 h1:scY4IbO8swckzoA/11HgBwaZRJEyY9vaNJshcdhp1Mc=
github.com/Rican7/retry v0.3.1/go.mod h1:CxSDrhAyXmTMeEuRAnArMu1FHu48vtfjLREWqVl7Vw0=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948 h1:b8qRHpWtlO94x6dVzSulrO2znSQqz8iYsxUyrdTixHo=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.1.0.20241024100330-cb068b72e948/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2 h1:K62uIAKOkCHTXtAwY+Nj95vyLR0y25UMBsbf/FuWCeQ=
github.com/ThreeDotsLabs/watermill v1.4.0-rc.2/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to=
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2 h1:FY6tsBcbhbJpKDOssU4bfybstqY0hQHwiZmVq9qyILQ=
github.com/ThreeDotsLabs/watermill-redisstream v1.4.2/go.mod h1:69++855LyB+ckYDe60PiJLBcUrpckfDE2WwyzuVJRCk=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602 h1:CKdW3wb3+C36mMa44DF53KUyM5L6mGOjI3hikBOlAl4=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024101952-75257d7d0602/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93 h1:KeRk2EG5AtdxfpjqIVPigZqscMvIcy0E2h8k7y38OAE=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-20241024102321-584a6f7dab93/go.mod h1:GMWcpauehgI40EeoKPxLnXBWjT7oOm7dJfzk5uU4IOc=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1 h1:uYfnh1EoqXrzHu+bX/TboRyv4ou+EFcmkC1MABeQ0lI=
github.com/ThreeDotsLabs/watermill-sql/v4 v4.0.0-rc.1/go.mod h1:ttA/lhzSh0YyDkosq1Cgc7IYz6Arrba0jIWfdnON0WA=
github.com/brianvoe/gofakeit/v6 v6.28.0 h1:Xib46XXuQfmlLS2EXRuJpqcw8St6qSZz75OUo0tgAW4=
Expand Down
37 changes: 18 additions & 19 deletions _examples/real-world-examples/delayed-requeue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"math/rand"
"time"

"github.com/ThreeDotsLabs/watermill/message/router/middleware"

"github.com/brianvoe/gofakeit/v6"
_ "github.com/lib/pq"
"github.com/redis/go-redis/v9"
Expand All @@ -15,8 +17,6 @@ import (
"github.com/ThreeDotsLabs/watermill-redisstream/pkg/redisstream"
"github.com/ThreeDotsLabs/watermill-sql/v4/pkg/sql"
"github.com/ThreeDotsLabs/watermill/components/cqrs"
"github.com/ThreeDotsLabs/watermill/components/delay"
"github.com/ThreeDotsLabs/watermill/components/requeuer"
"github.com/ThreeDotsLabs/watermill/message"
)

Expand All @@ -40,7 +40,12 @@ func main() {
delayedRequeuer, err := sql.NewPostgreSQLDelayedRequeuer(sql.DelayedRequeuerConfig{
DB: db,
Publisher: redisPublisher,
Logger: logger,
DelayOnError: &middleware.DelayOnError{
InitialInterval: 10 * time.Second,
MaxInterval: 3 * time.Minute,
Multiplier: 2,
},
Logger: logger,
})
if err != nil {
panic(err)
Expand Down Expand Up @@ -85,23 +90,13 @@ func main() {
cqrs.NewEventHandler(
"OnOrderPlacedHandler",
func(ctx context.Context, event *OrderPlaced) error {
fmt.Println("Received order placed:", event.OrderID)

msg := cqrs.OriginalMessageFromCtx(ctx)
retries := msg.Metadata.Get(requeuer.RetriesKey)
delayedUntil := msg.Metadata.Get(delay.DelayedUntilKey)
delayedFor := msg.Metadata.Get(delay.DelayedForKey)

if retries != "" {
fmt.Println("\tRetries:", retries)
fmt.Println("\tDelayed until:", delayedUntil)
fmt.Println("\tDelayed for:", delayedFor)
}

if event.OrderID == "" {
fmt.Println("ERROR: Received order placed without order_id")
return fmt.Errorf("empty order_id")
}

fmt.Println("Received order placed:", event.OrderID)

return nil
},
),
Expand All @@ -126,20 +121,24 @@ func main() {

<-router.Running()

i := 0

for {
e := newFakeOrderPlaced()

chance := rand.Intn(10)
if chance < 2 {
i++

if i == 10 {
e.OrderID = ""
i = 0
}

err = eventBus.Publish(context.Background(), e)
if err != nil {
panic(err)
}

time.Sleep(1 * time.Second)
time.Sleep(2 * time.Second)
}
}

Expand Down
10 changes: 7 additions & 3 deletions docs/content/advanced/requeuing-after-error.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ draft = false
bref = "How to requeue a message after it fails to process"
+++

When a message fails to process (a nack is sent), it usually blocks other messages on the same topic.
If you don't care about the message order, you can requeue the failed message back to the tail of the queue,
and let other messages process.
When a message fails to process (a nack is sent), it usually blocks other messages on the same topic (within the same consumer group or partition).

Depending on your setup, it may be useful to requeue the failed message back to the tail of the queue.

Consider this if:
* You don't care about the order of messages.
* Your system isn't resilient to blocked messages.

## Requeuer

Expand Down

0 comments on commit acc88ed

Please sign in to comment.