diff --git a/_examples/pubsubs/aws-sns/.validate_example.yml b/_examples/pubsubs/aws-sns/.validate_example.yml new file mode 100644 index 000000000..3688c1f21 --- /dev/null +++ b/_examples/pubsubs/aws-sns/.validate_example.yml @@ -0,0 +1,4 @@ +validation_cmd: "docker compose up" +teardown_cmd: "docker compose down" +timeout: 120 +expected_output: "A received message: [0-9a-f\\-]+, payload: Hello, world!" diff --git a/_examples/pubsubs/aws-sns/docker-compose.yml b/_examples/pubsubs/aws-sns/docker-compose.yml new file mode 100644 index 000000000..fa75e2c2e --- /dev/null +++ b/_examples/pubsubs/aws-sns/docker-compose.yml @@ -0,0 +1,24 @@ +services: + server: + image: golang:1.23 + restart: unless-stopped + volumes: + - .:/app + - $GOPATH/pkg/mod:/go/pkg/mod + working_dir: /app + command: go run main.go + + localstack: + image: localstack/localstack:latest + environment: + - SERVICES=sqs,sns + - AWS_DEFAULT_REGION=us-east-1 + - EDGE_PORT=4566 + ports: + - "4566-4597:4566-4597" + healthcheck: + test: awslocal sqs list-queues && awslocal sns list-topics + interval: 5s + timeout: 5s + retries: 5 + start_period: 30s diff --git a/_examples/pubsubs/aws-sns/go.mod b/_examples/pubsubs/aws-sns/go.mod new file mode 100644 index 000000000..f6245a5e0 --- /dev/null +++ b/_examples/pubsubs/aws-sns/go.mod @@ -0,0 +1,23 @@ +module main + +require ( + github.com/ThreeDotsLabs/watermill v1.3.7 + github.com/ThreeDotsLabs/watermill-aws v1.0.0-rc.2 + github.com/aws/aws-sdk-go-v2 v1.30.4 + github.com/aws/aws-sdk-go-v2/service/sns v1.31.4 + github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 + github.com/aws/smithy-go v1.20.4 + github.com/samber/lo v1.47.0 +) + +require ( + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/oklog/ulid v1.3.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + golang.org/x/text v0.16.0 // indirect +) + +go 1.21 diff --git a/_examples/pubsubs/aws-sns/go.sum b/_examples/pubsubs/aws-sns/go.sum new file mode 100644 index 000000000..5d1d2ffc4 --- /dev/null +++ b/_examples/pubsubs/aws-sns/go.sum @@ -0,0 +1,59 @@ +github.com/ThreeDotsLabs/watermill v1.3.7 h1:NV0PSTmuACVEOV4dMxRnmGXrmbz8U83LENOvpHekN7o= +github.com/ThreeDotsLabs/watermill v1.3.7/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= +github.com/ThreeDotsLabs/watermill-aws v1.0.0-rc.2 h1:5eyROGg3sIB4eZGEkDjbjE/TXgqCVpR1yTGAWp9zxbQ= +github.com/ThreeDotsLabs/watermill-aws v1.0.0-rc.2/go.mod h1:TTht9EHVmVF5iR5joom6VymqgkwcuxsRQfLBc5ITy28= +github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= +github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/config v1.27.28 h1:OTxWGW/91C61QlneCtnD62NLb4W616/NM1jA8LhJqbg= +github.com/aws/aws-sdk-go-v2/config v1.27.28/go.mod h1:uzVRVtJSU5EFv6Fu82AoVFKozJi2ZCY6WRCXj06rbvs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.28 h1:m8+AHY/ND8CMHJnPoH7PJIRakWGa4gbfbxuY9TGTUXM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.28/go.mod h1:6TF7dSc78ehD1SL6KpRIPKMA1GyyWflIkjqg+qmf4+c= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 h1:yjwoSyDZF8Jth+mUk5lSPJCkMC0lMy6FaCD51jm6ayE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12/go.mod h1:fuR57fAgMk7ot3WcNQfb6rSEn+SUffl7ri+aa8uKysI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 h1:tJ5RnkHCiSH0jyd6gROjlJtNwov0eGYNz8s8nFcR0jQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18/go.mod h1:++NHzT+nAF7ZPrHPsA+ENvsXkOO8wEu+C6RXltAG4/c= +github.com/aws/aws-sdk-go-v2/service/sns v1.31.4 h1:Bwb1nTBy6jrLJgSlI+jLt27rjyS1Kg030X5yWPnTecI= +github.com/aws/aws-sdk-go-v2/service/sns v1.31.4/go.mod h1:wDacBq+NshhM8KhdysbM4wRFxVyghyj7AAI+l8+o9f0= +github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 h1:FXPO72iKC5YmYNEANltl763bUj8A6qT20wx8Jwvxlsw= +github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 h1:zCsFCKvbj25i7p1u94imVoO447I/sFv8qq+lGJhRN0c= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.5/go.mod h1:ZeDX1SnKsVlejeuz41GiajjZpRSWR7/42q/EyA/QEiM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 h1:SKvPgvdvmiTWoi0GAJ7AsJfOz3ngVkD/ERbs5pUnHNI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5/go.mod h1:20sz31hv/WsPa3HhU3hfrIet2kxM4Pe0r20eBZ20Tac= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.4 h1:iAckBT2OeEK/kBDyN/jDtpEExhjeeA/Im2q4X0rJZT8= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.4/go.mod h1:vmSqFK+BVIwVpDAGZB3CoCXHzurt4qBE8lf+I/kRTh0= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/_examples/pubsubs/aws-sns/main.go b/_examples/pubsubs/aws-sns/main.go new file mode 100644 index 000000000..e66e116a7 --- /dev/null +++ b/_examples/pubsubs/aws-sns/main.go @@ -0,0 +1,133 @@ +// Sources for https://watermill.io/docs/getting-started/ +package main + +import ( + "context" + "fmt" + "log" + "net/url" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + amazonsns "github.com/aws/aws-sdk-go-v2/service/sns" + amazonsqs "github.com/aws/aws-sdk-go-v2/service/sqs" + transport "github.com/aws/smithy-go/endpoints" + "github.com/samber/lo" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-aws/sns" + "github.com/ThreeDotsLabs/watermill-aws/sqs" + "github.com/ThreeDotsLabs/watermill/message" +) + +func main() { + logger := watermill.NewStdLogger(false, false) + + snsOpts := []func(*amazonsns.Options){ + amazonsns.WithEndpointResolverV2(sns.OverrideEndpointResolver{ + Endpoint: transport.Endpoint{ + URI: *lo.Must(url.Parse("http://localstack:4566")), + }, + }), + } + + sqsOpts := []func(*amazonsqs.Options){ + amazonsqs.WithEndpointResolverV2(sqs.OverrideEndpointResolver{ + Endpoint: transport.Endpoint{ + URI: *lo.Must(url.Parse("http://localstack:4566")), + }, + }), + } + + topicResolver, err := sns.NewGenerateArnTopicResolver("000000000000", "us-east-1") + if err != nil { + panic(err) + } + + newSubscriber := func(name string) (message.Subscriber, error) { + subscriberConfig := sns.SubscriberConfig{ + AWSConfig: aws.Config{ + Credentials: aws.AnonymousCredentials{}, + }, + OptFns: snsOpts, + TopicResolver: topicResolver, + GenerateSqsQueueName: func(ctx context.Context, snsTopic sns.TopicArn) (string, error) { + topic, err := sns.ExtractTopicNameFromTopicArn(snsTopic) + if err != nil { + return "", err + } + + return fmt.Sprintf("%v-%v", topic, name), nil + }, + } + + sqsSubscriberConfig := sqs.SubscriberConfig{ + AWSConfig: aws.Config{ + Credentials: aws.AnonymousCredentials{}, + }, + OptFns: sqsOpts, + } + + return sns.NewSubscriber(subscriberConfig, sqsSubscriberConfig, logger) + } + + subA, err := newSubscriber("subA") + if err != nil { + panic(err) + } + + subB, err := newSubscriber("subB") + if err != nil { + panic(err) + } + + messagesA, err := subA.Subscribe(context.Background(), "example-topic") + if err != nil { + panic(err) + } + + messagesB, err := subB.Subscribe(context.Background(), "example-topic") + if err != nil { + panic(err) + } + + go process("A", messagesA) + go process("B", messagesB) + + publisherConfig := sns.PublisherConfig{ + AWSConfig: aws.Config{ + Credentials: aws.AnonymousCredentials{}, + }, + OptFns: snsOpts, + TopicResolver: topicResolver, + } + + publisher, err := sns.NewPublisher(publisherConfig, logger) + if err != nil { + panic(err) + } + + publishMessages(publisher) +} + +func publishMessages(publisher message.Publisher) { + for { + msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!")) + + if err := publisher.Publish("example-topic", msg); err != nil { + panic(err) + } + + time.Sleep(time.Second) + } +} + +func process(prefix string, messages <-chan *message.Message) { + for msg := range messages { + log.Printf("%v received message: %s, payload: %s", prefix, msg.UUID, string(msg.Payload)) + + // we need to Acknowledge that we received and processed the message, + // otherwise, it will be resent over and over again. + msg.Ack() + } +} diff --git a/_examples/pubsubs/aws-sqs/.validate_example.yml b/_examples/pubsubs/aws-sqs/.validate_example.yml new file mode 100644 index 000000000..b60d0b097 --- /dev/null +++ b/_examples/pubsubs/aws-sqs/.validate_example.yml @@ -0,0 +1,4 @@ +validation_cmd: "docker compose up" +teardown_cmd: "docker compose down" +timeout: 120 +expected_output: "received message: [0-9a-f\\-]+, payload: Hello, world!" diff --git a/_examples/pubsubs/aws-sqs/docker-compose.yml b/_examples/pubsubs/aws-sqs/docker-compose.yml new file mode 100644 index 000000000..fa75e2c2e --- /dev/null +++ b/_examples/pubsubs/aws-sqs/docker-compose.yml @@ -0,0 +1,24 @@ +services: + server: + image: golang:1.23 + restart: unless-stopped + volumes: + - .:/app + - $GOPATH/pkg/mod:/go/pkg/mod + working_dir: /app + command: go run main.go + + localstack: + image: localstack/localstack:latest + environment: + - SERVICES=sqs,sns + - AWS_DEFAULT_REGION=us-east-1 + - EDGE_PORT=4566 + ports: + - "4566-4597:4566-4597" + healthcheck: + test: awslocal sqs list-queues && awslocal sns list-topics + interval: 5s + timeout: 5s + retries: 5 + start_period: 30s diff --git a/_examples/pubsubs/aws-sqs/go.mod b/_examples/pubsubs/aws-sqs/go.mod new file mode 100644 index 000000000..282fe5acf --- /dev/null +++ b/_examples/pubsubs/aws-sqs/go.mod @@ -0,0 +1,22 @@ +module main + +require ( + github.com/ThreeDotsLabs/watermill v1.3.7 + github.com/ThreeDotsLabs/watermill-aws v1.0.0-rc.2 + github.com/aws/aws-sdk-go-v2 v1.30.4 + github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 + github.com/aws/smithy-go v1.20.4 + github.com/samber/lo v1.47.0 +) + +require ( + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/lithammer/shortuuid/v3 v3.0.7 // indirect + github.com/oklog/ulid v1.3.1 // indirect + github.com/pkg/errors v0.9.1 // indirect + golang.org/x/text v0.16.0 // indirect +) + +go 1.21 diff --git a/_examples/pubsubs/aws-sqs/go.sum b/_examples/pubsubs/aws-sqs/go.sum new file mode 100644 index 000000000..d04189996 --- /dev/null +++ b/_examples/pubsubs/aws-sqs/go.sum @@ -0,0 +1,57 @@ +github.com/ThreeDotsLabs/watermill v1.3.7 h1:NV0PSTmuACVEOV4dMxRnmGXrmbz8U83LENOvpHekN7o= +github.com/ThreeDotsLabs/watermill v1.3.7/go.mod h1:lBnrLbxOjeMRgcJbv+UiZr8Ylz8RkJ4m6i/VN/Nk+to= +github.com/ThreeDotsLabs/watermill-aws v1.0.0-rc.2 h1:5eyROGg3sIB4eZGEkDjbjE/TXgqCVpR1yTGAWp9zxbQ= +github.com/ThreeDotsLabs/watermill-aws v1.0.0-rc.2/go.mod h1:TTht9EHVmVF5iR5joom6VymqgkwcuxsRQfLBc5ITy28= +github.com/aws/aws-sdk-go-v2 v1.30.4 h1:frhcagrVNrzmT95RJImMHgabt99vkXGslubDaDagTk8= +github.com/aws/aws-sdk-go-v2 v1.30.4/go.mod h1:CT+ZPWXbYrci8chcARI3OmI/qgd+f6WtuLOoaIA8PR0= +github.com/aws/aws-sdk-go-v2/config v1.27.28 h1:OTxWGW/91C61QlneCtnD62NLb4W616/NM1jA8LhJqbg= +github.com/aws/aws-sdk-go-v2/config v1.27.28/go.mod h1:uzVRVtJSU5EFv6Fu82AoVFKozJi2ZCY6WRCXj06rbvs= +github.com/aws/aws-sdk-go-v2/credentials v1.17.28 h1:m8+AHY/ND8CMHJnPoH7PJIRakWGa4gbfbxuY9TGTUXM= +github.com/aws/aws-sdk-go-v2/credentials v1.17.28/go.mod h1:6TF7dSc78ehD1SL6KpRIPKMA1GyyWflIkjqg+qmf4+c= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12 h1:yjwoSyDZF8Jth+mUk5lSPJCkMC0lMy6FaCD51jm6ayE= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.12/go.mod h1:fuR57fAgMk7ot3WcNQfb6rSEn+SUffl7ri+aa8uKysI= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16 h1:TNyt/+X43KJ9IJJMjKfa3bNTiZbUP7DeCxfbTROESwY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.16/go.mod h1:2DwJF39FlNAUiX5pAc0UNeiz16lK2t7IaFcm0LFHEgc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16 h1:jYfy8UPmd+6kJW5YhY0L1/KftReOGxI/4NtVSTh9O/I= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.16/go.mod h1:7ZfEPZxkW42Afq4uQB8H2E2e6ebh6mXTueEpYzjCzcs= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4 h1:KypMCbLPPHEmf9DgMGw51jMj77VfGPAN2Kv4cfhlfgI= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.4/go.mod h1:Vz1JQXliGcQktFTN/LN6uGppAIRoLBR2bMvIMP0gOjc= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18 h1:tJ5RnkHCiSH0jyd6gROjlJtNwov0eGYNz8s8nFcR0jQ= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.18/go.mod h1:++NHzT+nAF7ZPrHPsA+ENvsXkOO8wEu+C6RXltAG4/c= +github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4 h1:FXPO72iKC5YmYNEANltl763bUj8A6qT20wx8Jwvxlsw= +github.com/aws/aws-sdk-go-v2/service/sqs v1.34.4/go.mod h1:7idt3XszF6sE9WPS1GqZRiDJOxw4oPtlRBXodWnCGjU= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.5 h1:zCsFCKvbj25i7p1u94imVoO447I/sFv8qq+lGJhRN0c= +github.com/aws/aws-sdk-go-v2/service/sso v1.22.5/go.mod h1:ZeDX1SnKsVlejeuz41GiajjZpRSWR7/42q/EyA/QEiM= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5 h1:SKvPgvdvmiTWoi0GAJ7AsJfOz3ngVkD/ERbs5pUnHNI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.26.5/go.mod h1:20sz31hv/WsPa3HhU3hfrIet2kxM4Pe0r20eBZ20Tac= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.4 h1:iAckBT2OeEK/kBDyN/jDtpEExhjeeA/Im2q4X0rJZT8= +github.com/aws/aws-sdk-go-v2/service/sts v1.30.4/go.mod h1:vmSqFK+BVIwVpDAGZB3CoCXHzurt4qBE8lf+I/kRTh0= +github.com/aws/smithy-go v1.20.4 h1:2HK1zBdPgRbjFOHlfeQZfpC4r72MOb9bZkiFwggKO+4= +github.com/aws/smithy-go v1.20.4/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= +github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/lithammer/shortuuid/v3 v3.0.7 h1:trX0KTHy4Pbwo/6ia8fscyHoGA+mf1jWbPJVuvyJQQ8= +github.com/lithammer/shortuuid/v3 v3.0.7/go.mod h1:vMk8ke37EmiewwolSO1NLW8vP4ZaKlRuDIi8tWWmAts= +github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= +github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/samber/lo v1.47.0 h1:z7RynLwP5nbyRscyvcD043DWYoOcYRv3mV8lBeqOCLc= +github.com/samber/lo v1.47.0/go.mod h1:RmDH9Ct32Qy3gduHQuKJ3gW1fMHAnE/fAzQuf6He5cU= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/_examples/pubsubs/aws-sqs/main.go b/_examples/pubsubs/aws-sqs/main.go new file mode 100644 index 000000000..b3c96848b --- /dev/null +++ b/_examples/pubsubs/aws-sqs/main.go @@ -0,0 +1,85 @@ +// Sources for https://watermill.io/docs/getting-started/ +package main + +import ( + "context" + "log" + "net/url" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + amazonsqs "github.com/aws/aws-sdk-go-v2/service/sqs" + transport "github.com/aws/smithy-go/endpoints" + "github.com/samber/lo" + + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-aws/sqs" + "github.com/ThreeDotsLabs/watermill/message" +) + +func main() { + logger := watermill.NewStdLogger(false, false) + + sqsOpts := []func(*amazonsqs.Options){ + amazonsqs.WithEndpointResolverV2(sqs.OverrideEndpointResolver{ + Endpoint: transport.Endpoint{ + URI: *lo.Must(url.Parse("http://localstack:4566")), + }, + }), + } + + subscriberConfig := sqs.SubscriberConfig{ + AWSConfig: aws.Config{ + Credentials: aws.AnonymousCredentials{}, + }, + OptFns: sqsOpts, + } + + subscriber, err := sqs.NewSubscriber(subscriberConfig, logger) + if err != nil { + panic(err) + } + + messages, err := subscriber.Subscribe(context.Background(), "example-topic") + if err != nil { + panic(err) + } + + go process(messages) + + publisherConfig := sqs.PublisherConfig{ + AWSConfig: aws.Config{ + Credentials: aws.AnonymousCredentials{}, + }, + OptFns: sqsOpts, + } + + publisher, err := sqs.NewPublisher(publisherConfig, logger) + if err != nil { + panic(err) + } + + publishMessages(publisher) +} + +func publishMessages(publisher message.Publisher) { + for { + msg := message.NewMessage(watermill.NewUUID(), []byte("Hello, world!")) + + if err := publisher.Publish("example-topic", msg); err != nil { + panic(err) + } + + time.Sleep(time.Second) + } +} + +func process(messages <-chan *message.Message) { + for msg := range messages { + log.Printf("received message: %s, payload: %s", msg.UUID, string(msg.Payload)) + + // we need to Acknowledge that we received and processed the message, + // otherwise, it will be resent over and over again. + msg.Ack() + } +} diff --git a/docs/content/docs/getting-started.md b/docs/content/docs/getting-started.md index 4c0765937..abf46ab1c 100644 --- a/docs/content/docs/getting-started.md +++ b/docs/content/docs/getting-started.md @@ -198,6 +198,42 @@ A more detailed explanation of how it is working (and how to add live code reloa {{% load-snippet-partial file="src-link/_examples/pubsubs/sql/main.go" first_line_contains="func process" %}} {{< /tab >}} +{{< tab "AWS SQS" "aws-sqs" >}} + +
+Running in Docker + +{{% load-snippet file="src-link/_examples/pubsubs/aws-sqs/docker-compose.yml" type="yaml" %}} + +The source should go to `main.go`. + +To run, execute `docker-compose up`. + +A more detailed explanation of how it is working (and how to add live code reload) can be found in [*Go Docker dev environment* article](https://threedots.tech/post/go-docker-dev-environment-with-go-modules-and-live-code-reloading/). +
+ +{{% load-snippet-partial file="src-link/_examples/pubsubs/aws-sqs/main.go" first_line_contains="package main" last_line_contains="process(messages)" %}} +{{% load-snippet-partial file="src-link/_examples/pubsubs/aws-sqs/main.go" first_line_contains="func process" %}} +{{< /tab >}} + +{{< tab "AWS SNS" "aws-sns" >}} + +
+Running in Docker + +{{% load-snippet file="src-link/_examples/pubsubs/aws-sns/docker-compose.yml" type="yaml" %}} + +The source should go to `main.go`. + +To run, execute `docker-compose up`. + +A more detailed explanation of how it is working (and how to add live code reload) can be found in [*Go Docker dev environment* article](https://threedots.tech/post/go-docker-dev-environment-with-go-modules-and-live-code-reloading/). +
+ +{{% load-snippet-partial file="src-link/_examples/pubsubs/aws-sns/main.go" first_line_contains="package main" last_line_contains="go process(" padding_after="1" %}} +{{% load-snippet-partial file="src-link/_examples/pubsubs/aws-sns/main.go" first_line_contains="func process" %}} +{{< /tab >}} + {{< /tabs >}} ### Creating Messages @@ -249,6 +285,14 @@ if err != nil { {{% load-snippet-partial file="src-link/_examples/pubsubs/sql/main.go" first_line_contains="message.NewMessage" last_line_contains="publisher.Publish" padding_after="2" %}} {{< /tab >}} +{{< tab "AWS SQS" "aws-sqs" >}} +{{% load-snippet-partial file="src-link/_examples/pubsubs/aws-sqs/main.go" first_line_contains="message.NewMessage" last_line_contains="publisher.Publish" padding_after="2" %}} +{{< /tab >}} + +{{< tab "AWS SNS" "aws-sns" >}} +{{% load-snippet-partial file="src-link/_examples/pubsubs/aws-sns/main.go" first_line_contains="message.NewMessage" last_line_contains="publisher.Publish" padding_after="2" %}} +{{< /tab >}} + {{< /tabs >}} ## Router diff --git a/docs/hugo_stats.json b/docs/hugo_stats.json index cb541b1e1..c201774b2 100644 --- a/docs/hugo_stats.json +++ b/docs/hugo_stats.json @@ -433,6 +433,10 @@ "tabs-getting-started-4-tab", "tabs-getting-started-5", "tabs-getting-started-5-tab", + "tabs-getting-started-6", + "tabs-getting-started-6-tab", + "tabs-getting-started-7", + "tabs-getting-started-7-tab", "tabs-publishing-0", "tabs-publishing-0-tab", "tabs-publishing-1", @@ -445,6 +449,10 @@ "tabs-publishing-4-tab", "tabs-publishing-5", "tabs-publishing-5-tab", + "tabs-publishing-6", + "tabs-publishing-6-tab", + "tabs-publishing-7", + "tabs-publishing-7-tab", "testing", "the-pubsub-interface", "throttle",