diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3053605fa..c593abbc8 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -2,7 +2,7 @@ ## How can I help? -We are always happy to help you in contributing to Watermill. If you have any ideas, please let us know on our [Slack channel](https://github.com/ThreeDotsLabs/watermill#support). +We are always happy to help you in contributing to Watermill. If you have any ideas, please let us know on our [Discord server](https://watermill.io/support/). There are multiple ways in which you can help us. @@ -25,7 +25,7 @@ When adding a new Pub/Sub implementation, you should start with this guide: [htt ### New ideas If you have any idea that is not covered in the issues list, please post a new issue describing it. -It's recommended to discuss your idea on Slack/GitHub before creating production-ready implementation - in some situations, it may save a lot of your time before implementing something that can be simplified or done more easily. :) +It's recommended to discuss your idea on [Discord](https://discord.gg/QV6VFg4YQE)/GitHub before creating production-ready implementation - in some situations, it may save a lot of your time before implementing something that can be simplified or done more easily. :) In general, it's helpful to discuss a Proof of Concept to align with the idea. diff --git a/README.md b/README.md index bfd8ad506..91761779f 100644 --- a/README.md +++ b/README.md @@ -140,7 +140,7 @@ Here's the short version for message size of 16 bytes. If you didn't find the answer to your question in [the documentation](https://watermill.io/), feel free to ask us directly! -Please join us on the `#watermill` channel on the [Gophers slack](https://gophers.slack.com/): You can get an invite [here](https://gophersinvite.herokuapp.com/). +Please join us on the `#watermill` channel on the [Three Dots Labs Discord](https://discord.gg/QV6VFg4YQE). Every bit of feedback is very welcome and appreciated. Please submit it using [the survey](https://www.surveymonkey.com/r/WZXD392). diff --git a/docs/build.sh b/docs/build.sh index 0501b5f74..524ec2e18 100755 --- a/docs/build.sh +++ b/docs/build.sh @@ -72,6 +72,7 @@ cloneOrPull "https://github.com/ThreeDotsLabs/watermill-io.git" content/src-link cloneOrPull "https://github.com/ThreeDotsLabs/watermill-kafka.git" content/src-link/watermill-kafka cloneOrPull "https://github.com/ThreeDotsLabs/watermill-nats.git" content/src-link/watermill-nats cloneOrPull "https://github.com/ThreeDotsLabs/watermill-sql.git" content/src-link/watermill-sql +cloneOrPull "https://github.com/ThreeDotsLabs/watermill-firestore.git" content/src-link/watermill-firestore python3 ./extract_middleware_godocs.py > content/src-link/middleware-defs.md diff --git a/docs/config.toml b/docs/config.toml index 58c4cd5ee..7a5bb94f0 100644 --- a/docs/config.toml +++ b/docs/config.toml @@ -38,3 +38,7 @@ googleAnalytics = "UA-128588911-2" name = "Support" weight = 0 url = "/support/" + +[markup] +defaultMarkdownHandler = 'blackfriday' + diff --git a/docs/content/pubsubs/firestore.md b/docs/content/pubsubs/firestore.md new file mode 100644 index 000000000..7365f7bec --- /dev/null +++ b/docs/content/pubsubs/firestore.md @@ -0,0 +1,84 @@ ++++ +title = "Firestore Pub/Sub" +description = "A scalable document database from Google" +date = 2021-07-29T15:30:00+02:00 +bref = "A scalable document database from Google" +weight = 0 +type = "docs" +toc = false ++++ + +### Firestore Pub/Sub + +Cloud Firestore is a cloud-hosted, NoSQL database from Google. + +This Pub/Sub comes with two publishers. To publish messages in a transaction +use the `TransactionalPublisher`. If you do not want to publish messages in +transaction use the normal `Publisher`. + +Using Firestore as a Pub/Sub instead of using a dedicated Pub/Sub system can be +useful to publish messages in transaction while at the same time saving other +data in Firestore. Thanks to that the data and the messages can be consistently +persisted. If the messages and the data weren't being published transactionally +you could end up in situations where messages were emitted even though the data +wasn't saved or messages weren't emitted even though the data was saved. After +transactionally publishing messages in Firestore you can then subscribe to them +and relay them to a different Pub/Sub system. + +Godoc: + +Firestore documentation: + +#### Characteristics + +| Feature | Implements | Note | +| ------- | ---------- | ---- | +| ConsumerGroups | no | | +| ExactlyOnceDelivery | no | | +| GuaranteedOrder | no | | +| Persistent | yes | | + +#### Configuration + +##### Publisher configuration + +{{% render-md %}} +{{% load-snippet-partial file="src-link/watermill-firestore/pkg/firestore/publisher.go" first_line_contains="type PublisherConfig struct {" last_line_equals="}" %}} +{{% /render-md %}} + +##### Subscriber configuration + +{{% render-md %}} +{{% load-snippet-partial file="src-link/watermill-firestore/pkg/firestore/subscriber.go" first_line_contains="type SubscriberConfig struct {" last_line_equals="}" %}} +{{% /render-md %}} + +##### Subscription name + +To receive messages published to a topic, you must create a subscription to +that topic. Only messages published to the topic after the subscription is +created will be received by the subscribers. + +A topic can have multiple subscriptions, but a given subscription belongs to a +single topic. + +In Watermill, the subscription is created automatically during calling +`Subscribe()`. Subscription name is generated by function passed to +`SubscriberConfig.GenerateSubscriptionName`. By default, it is just the topic +name with a suffix `_sub` appended to it. + +If you want to consume messages from a topic with multiple subscribers +processing the incoming messages in a different way, you should use a custom +function to generate unique subscription names for each subscriber. + +#### Marshaler + +Watermill's messages cannot be stored directly in Firestore. The marshaler is +responsible for converting them to a type which can be stored by Firestore. +The default implementation should be enough for most applications so it is +unlikely that you need to implement your own marshaler. + +{{% render-md %}} +{{% load-snippet-partial file="src-link/watermill-firestore/pkg/firestore/marshaler.go" first_line_contains="// Marshaler" last_line_equals="}" padding_after="0" %}} +{{% /render-md %}} + + diff --git a/docs/content/support.md b/docs/content/support.md index 47bcb1a65..b39c11278 100644 --- a/docs/content/support.md +++ b/docs/content/support.md @@ -5,7 +5,7 @@ description = "" ### Community Support -Please join us on the `#watermill` channel on the [Gophers slack](https://gophers.slack.com/). You can get the invite [here](https://gophersinvite.herokuapp.com/). +Please join us on the `#watermill` channel on the [Three Dots Labs discord](https://discord.gg/QV6VFg4YQE). ### Professional Support diff --git a/docs/layouts/shortcodes/load-snippet-partial.html b/docs/layouts/shortcodes/load-snippet-partial.html index 36a05613a..365f4d966 100644 --- a/docs/layouts/shortcodes/load-snippet-partial.html +++ b/docs/layouts/shortcodes/load-snippet-partial.html @@ -3,6 +3,7 @@ {{ $first_line_contains := (.Get "first_line_contains") }} {{ $last_line_contains := (.Get "last_line_contains") }} +{{ $last_line_equals := (.Get "last_line_equals") }} {{ $show_line := false }} @@ -51,6 +52,10 @@ {{ $last_line_found = true }} {{ end }} + {{ if and ($first_line_found) (eq $elem_val $last_line_equals) (ne $last_line_equals "") }} + {{ $last_line_found = true }} + {{ end }} + {{ if and $last_line_found $show_line }} {{ if gt $padding_after 0 }} {{ $padding_after = sub $padding_after 1}} @@ -72,4 +77,4 @@ {{if and (not $last_line_found) (ne $last_line_contains "") }} {{ errorf "`last_line_contains` %s not found in %s snippet" $last_line_contains $file }} -{{end}} \ No newline at end of file +{{end}} diff --git a/message/router.go b/message/router.go index dd6fe3423..bcdd97acc 100644 --- a/message/router.go +++ b/message/router.go @@ -279,12 +279,12 @@ func (r *Router) AddNoPublisherHandler( subscribeTopic string, subscriber Subscriber, handlerFunc NoPublishHandlerFunc, -) { +) *Handler { handlerFuncAdapter := func(msg *Message) ([]*Message, error) { return nil, handlerFunc(msg) } - r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFuncAdapter) + return r.AddHandler(handlerName, subscribeTopic, subscriber, "", disabledPublisher{}, handlerFuncAdapter) } // Run runs all plugins and handlers and starts subscribing to provided topics. diff --git a/netlify.toml b/netlify.toml index 942ceb468..8ee29659f 100644 --- a/netlify.toml +++ b/netlify.toml @@ -4,14 +4,14 @@ publish = "docs/public/" [context.production.environment] - HUGO_VERSION = "0.53" + HUGO_VERSION = "0.87.0" HUGO_ENV = "production" HUGO_ENABLEGITINFO = "true" [context.deploy-preview.environment] - HUGO_VERSION = "0.53" + HUGO_VERSION = "0.87.0" HUGO_ENABLEGITINFO = "true" [context.branch-deploy.environment] - HUGO_VERSION = "0.53" + HUGO_VERSION = "0.87.0" HUGO_ENABLEGITINFO = "true" diff --git a/tools/mill/README.md b/tools/mill/README.md index 0ebbbe073..2c7c9f39c 100644 --- a/tools/mill/README.md +++ b/tools/mill/README.md @@ -88,3 +88,13 @@ mill googlecloud subscription rm ``` Additional flags are available for `subscription add` to regulate the newly created subscription's settings. + +#### Listing subscriptions + +You can use `mill` to list existings subscriptions: + +```bash +mill googlecloud subscription ls [-t topic] +``` + +The topic is optional. If omitted, all topics will be listed with their subscriptions. diff --git a/tools/mill/cmd/googlecloud.go b/tools/mill/cmd/googlecloud.go index 541024c67..7fce5e16a 100644 --- a/tools/mill/cmd/googlecloud.go +++ b/tools/mill/cmd/googlecloud.go @@ -2,17 +2,20 @@ package cmd import ( "context" + "fmt" "os" "strings" "time" "cloud.google.com/go/pubsub" + "github.com/ThreeDotsLabs/watermill" + "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud" + "github.com/ThreeDotsLabs/watermill/tools/mill/cmd/internal" "github.com/pkg/errors" "github.com/spf13/cobra" "github.com/spf13/viper" - - "github.com/ThreeDotsLabs/watermill" - "github.com/ThreeDotsLabs/watermill-googlecloud/pkg/googlecloud" + "google.golang.org/api/iterator" + "gopkg.in/yaml.v2" ) var googleCloudTempSubscriptionID string @@ -30,7 +33,6 @@ For the configuration of consuming/producing of the messages, check the help of } logger.Debug("Using Google Cloud Pub/Sub", nil) - if cmd.Use == "consume" { subName := viper.GetString("googlecloud.consume.subscription") if subName == "" { @@ -94,7 +96,7 @@ var googleCloudSubscriptionAddCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) (err error) { subID := args[0] - topic := viper.GetString("googlecloud..topic") + topic := viper.GetString("googlecloud.subscription.add.topic") ackDeadline := viper.GetDuration("googlecloud.subscription.add.ackDeadline") retainAcked := viper.GetBool("googlecloud.subscription.add.retainAcked") retentionDuration := viper.GetDuration("googlecloud.subscription.add.retentionDuration") @@ -160,6 +162,26 @@ var googleCloudSubscriptionRmCmd = &cobra.Command{ }, } +var googleCloudSubscriptionLsCmd = &cobra.Command{ + Use: "ls", + Short: "List subscriptions in Google Cloud Pub/Sub. Topic may be provided optionally to filter subscriptions by topic.", + Args: cobra.NoArgs, + RunE: func(cmd *cobra.Command, args []string) (err error) { + topic := viper.GetString("googlecloud.subscription.ls.topic") + verbose := viper.GetBool("googlecloud.subscription.ls.verbose") + + logger := logger + if topic != "" { + logger = logger.With(watermill.LogFields{ + "topic": topic, + }) + } + logger.Info("Listing all subscriptions", nil) + + return listSubscriptions(topic, logger, verbose) + }, +} + func generateTempSubscription() (id string, err error) { defer func() { if err == nil { @@ -256,6 +278,120 @@ func removeSubscription(id string) error { return sub.Delete(ctx) } +func listSubscriptions(topic string, adapter watermill.LoggerAdapter, verbose bool) error { + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + defer cancel() + + client, err := pubsub.NewClient(ctx, projectID()) + if err != nil { + return errors.Wrap(err, "could not create pubsub client") + } + + if topic != "" { + topic := client.Topic(topic) + return listSubscriptionsForTopic(ctx, client, topic, logger, verbose) + } + + it := client.Topics(ctx) + noTopics := true + for { + topic, err := it.Next() + if err == iterator.Done { + if noTopics { + logger.Info("No topics in project", nil) + } + return nil + } + if err != nil { + return errors.Wrap(err, "could not retrieve next subscription") + } + + noTopics = false + err = listSubscriptionsForTopic(ctx, client, topic, logger, verbose) + if err != nil { + return errors.Wrap(err, "error listing subscriptions for topic") + } + } + + return nil +} + +func listSubscriptionsForTopic(ctx context.Context, client *pubsub.Client, topic *pubsub.Topic, logger watermill.LoggerAdapter, verbose bool) error { + noSubs := true + exists, err := topic.Exists(ctx) + if err != nil { + return errors.Wrap(err, "could not check if topic exists") + } + if !exists { + logger.Info("Topic does not exist", watermill.LogFields{"topic": topic.String()}) + return nil + } + + it := topic.Subscriptions(ctx) + for { + sub, err := it.Next() + if err == iterator.Done { + if noSubs { + logger.Info("No subscriptions for the topic", watermill.LogFields{"topic": topic.String()}) + } + return nil + } + if err != nil { + return errors.Wrap(err, "could not retrieve next subscription") + } + + if noSubs { + noSubs = false + fmt.Printf("Topic %s:\n", topic.String()) + } + name := sub.String() + config, err := sub.Config(ctx) + if err != nil { + return errors.Wrapf(err, "could not retrieve subscription config for subscription '%s'", name) + } + + err = printSubscriptionInfo(name, config) + if err != nil { + return errors.Wrapf(err, "error printing subscription '%s'", name) + } + } +} + +func printSubscriptionInfo(name string, config pubsub.SubscriptionConfig) error { + b, err := yaml.Marshal(subscriptionConfig{ + Name: name, + PushConfig: subscriptionConfigPushConfig{ + Endpoint: config.PushConfig.Endpoint, + Attributes: config.PushConfig.Attributes, + }, + AckDeadline: config.AckDeadline, + RetainAckedMessages: config.RetainAckedMessages, + RetentionDuration: config.RetentionDuration, + Labels: config.Labels, + }) + if err != nil { + return err + } + + fmt.Printf(internal.Indent(string(b), " ")) + return nil +} + +// subscriptionConfig provides a marshallable form to pubsub.SubscriptionConfig +type subscriptionConfig struct { + Name string + PushConfig subscriptionConfigPushConfig `yaml:"push_config"` + AckDeadline time.Duration `yaml:"ack_deadline"` + RetainAckedMessages bool `yaml:"retain_acked_messages"` + RetentionDuration time.Duration `yaml:"retention_duration"` + Labels map[string]string `yaml:"labels"` +} + +type subscriptionConfigPushConfig struct { + Endpoint string + Attributes map[string]string +} + func projectID() string { projectID := viper.GetString("googlecloud.projectID") if projectID == "" { @@ -272,10 +408,10 @@ func init() { "topic", "t", "", - "The topic to produce messages to (produce), consume message from (consume) or the topic for the newly created subscription (subscription.add)", + "The topic to produce messages to (produce), consume message from (consume), list from (ls) or the topic for the newly created subscription (subscription.add)", ) - ensure(googleCloudCmd.MarkPersistentFlagRequired("topic")) ensure(viper.BindPFlag("googlecloud.topic", googleCloudCmd.PersistentFlags().Lookup("topic"))) + ensure(googleCloudCmd.MarkPersistentFlagRequired("topic")) consumeCmd := addConsumeCmd(googleCloudCmd, "googlecloud.topic") addProduceCmd(googleCloudCmd, "googlecloud.topic") @@ -295,7 +431,6 @@ func init() { googleCloudCmd.AddCommand(googleCloudSubscriptionCmd) googleCloudSubscriptionCmd.AddCommand(googleCloudSubscriptionAddCmd) - googleCloudSubscriptionCmd.AddCommand(googleCloudSubscriptionRmCmd) googleCloudSubscriptionAddCmd.Flags().StringP("topic", "t", "", "The topic for the new subscription (required)") ensure(googleCloudSubscriptionAddCmd.MarkFlagRequired("topic")) @@ -330,4 +465,23 @@ func init() { "The set of labels for the subscription. Format: '--labels key1=value1,key2=value2,...'", ) ensure(viper.BindPFlag("googlecloud.subscription.add.labels", googleCloudSubscriptionAddCmd.Flags().Lookup("labels"))) + + googleCloudSubscriptionCmd.AddCommand(googleCloudSubscriptionRmCmd) + + googleCloudSubscriptionCmd.AddCommand(googleCloudSubscriptionLsCmd) + googleCloudSubscriptionLsCmd.Flags().StringP( + "topic", + "t", + "", + "The topic for the new subscription (optional, will list subscriptions for all topics if omitted)", + ) + ensure(viper.BindPFlag("googlecloud.subscription.ls.topic", googleCloudSubscriptionLsCmd.Flags().Lookup("topic"))) + + googleCloudSubscriptionLsCmd.Flags().BoolP( + "verbose", + "v", + false, + "will print more information, including the subscription config", + ) + ensure(viper.BindPFlag("googlecloud.subscription.ls.verbose", googleCloudSubscriptionLsCmd.Flags().Lookup("verbose"))) } diff --git a/tools/mill/cmd/internal/indent.go b/tools/mill/cmd/internal/indent.go new file mode 100644 index 000000000..3b1842e11 --- /dev/null +++ b/tools/mill/cmd/internal/indent.go @@ -0,0 +1,18 @@ +package internal + +import "strings" + +func Indent(s, prefix string) string { + endsWithNewline := strings.HasSuffix(s, "\n") + split := strings.Split(s, "\n") + + for i, ss := range split { + split[i] = prefix + ss + } + joined := strings.Join(split, "\n") + if endsWithNewline { + joined += "\n" + } + + return joined +}