diff --git a/pubsub/gcp/config.go b/pubsub/gcp/config.go index 3e742812a..08f6e2c6f 100644 --- a/pubsub/gcp/config.go +++ b/pubsub/gcp/config.go @@ -1,6 +1,9 @@ package gcp -import "github.com/kelseyhightower/envconfig" +import ( + gpubsub "cloud.google.com/go/pubsub" + "github.com/kelseyhightower/envconfig" +) // Config holds common credentials and config values for // working with GCP PubSub. @@ -9,6 +12,15 @@ type Config struct { // For publishing Topic string `envconfig:"GCP_PUBSUB_TOPIC"` + + // Batch settings for GCP publisher + // See: https://godoc.org/cloud.google.com/go/pubsub#PublishSettings + // Notes: + // This config will not allow you to set zero values for PublishSettings. + // Applications using these settings should be aware that Publish requests + // will block until the lowest of the thresholds in PublishSettings is met. + PublishSettings gpubsub.PublishSettings + // For subscribing Subscription string `envconfig:"GCP_PUBSUB_SUBSCRIPTION"` } diff --git a/pubsub/gcp/gcp.go b/pubsub/gcp/gcp.go index 61a07de2d..43e85b47a 100644 --- a/pubsub/gcp/gcp.go +++ b/pubsub/gcp/gcp.go @@ -142,9 +142,26 @@ func NewPublisher(ctx context.Context, cfg Config, opts ...option.ClientOption) if err != nil { return nil, err } - + t := c.Topic(cfg.Topic) + // Update PublishSettings from cfg.PublishSettings + // but never set thresholds to 0. + if cfg.PublishSettings.DelayThreshold > 0 { + t.PublishSettings.DelayThreshold = cfg.PublishSettings.DelayThreshold + } + if cfg.PublishSettings.CountThreshold > 0 { + t.PublishSettings.CountThreshold = cfg.PublishSettings.CountThreshold + } + if cfg.PublishSettings.ByteThreshold > 0 { + t.PublishSettings.ByteThreshold = cfg.PublishSettings.ByteThreshold + } + if cfg.PublishSettings.NumGoroutines > 0 { + t.PublishSettings.NumGoroutines = cfg.PublishSettings.NumGoroutines + } + if cfg.PublishSettings.Timeout > 0 { + t.PublishSettings.Timeout = cfg.PublishSettings.Timeout + } return &publisher{ - topic: c.Topic(cfg.Topic), + topic: t, }, nil }