Skip to content

Commit

Permalink
Update to add explicit Register{PROVIDER}Schemes methods (#5)
Browse files Browse the repository at this point in the history
* update vendor deps

* add RegisterGoCloudPublisherSchemes

* snapshot: add RegisterFooPublishers/Subscribers methods

---------

Co-authored-by: sfomuseumbot <sfomuseumbot@localhost>
  • Loading branch information
thisisaaronland and sfomuseumbot authored Oct 11, 2023
1 parent aa71b25 commit 35a856f
Show file tree
Hide file tree
Showing 463 changed files with 53,847 additions and 11,079 deletions.
56 changes: 28 additions & 28 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,46 +1,46 @@
module github.com/sfomuseum/go-pubsub

go 1.18
go 1.21

require (
github.com/aaronland/go-aws-session v0.1.0
github.com/aaronland/go-aws-session v0.2.1
github.com/aaronland/go-roster v1.0.0
github.com/aws/aws-sdk-go v1.44.264
github.com/aws/aws-sdk-go v1.45.24
github.com/go-redis/redis/v8 v8.11.5
gocloud.dev v0.29.0
gocloud.dev v0.34.0
)

require (
github.com/aaronland/go-string v1.0.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.17.4 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.12 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.12 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.12.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.28 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.22 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.29 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.22 // indirect
github.com/aws/aws-sdk-go-v2/service/sns v1.20.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.12.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.14.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.18.3 // indirect
github.com/aws/smithy-go v1.13.5 // indirect
github.com/aws/aws-sdk-go-v2 v1.20.0 // indirect
github.com/aws/aws-sdk-go-v2/config v1.18.32 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.31 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.13.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.37 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.31 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.3.38 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.31 // indirect
github.com/aws/aws-sdk-go-v2/service/sns v1.21.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sqs v1.24.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.13.1 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.15.1 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.21.1 // indirect
github.com/aws/smithy-go v1.14.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/wire v0.5.0 // indirect
github.com/googleapis/gax-go/v2 v2.7.0 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/net v0.6.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
golang.org/x/net v0.13.0 // indirect
golang.org/x/sync v0.3.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/api v0.110.0 // indirect
google.golang.org/genproto v0.0.0-20230209215440-0dfe4f8abfcc // indirect
google.golang.org/grpc v1.53.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
google.golang.org/api v0.134.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230731193218-e0aa005b6bdf // indirect
google.golang.org/grpc v1.57.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
)
2,716 changes: 78 additions & 2,638 deletions go.sum

Large diffs are not rendered by default.

27 changes: 22 additions & 5 deletions publisher/gocloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,34 @@ type GoCloudPublisher struct {
func init() {

ctx := context.Background()
err := RegisterGoCloudPublishers(ctx)

if err != nil {
panic(err)
}
}

// RegisterGoCloudPublishers will explicitly register all the schemes associated with the `GoCloudPublisher` interface.
func RegisterGoCloudPublishers(ctx context.Context) error {

to_register := []string{
"awssqs-creds",
}

RegisterPublisher(ctx, "awssqs-creds", NewGoCloudPublisher)

for _, scheme := range pubsub.DefaultURLMux().TopicSchemes() {
to_register = append(to_register, scheme)
}

for _, scheme := range to_register {

err := RegisterPublisher(ctx, scheme, NewGoCloudPublisher)

if err != nil {
panic(err)
return fmt.Errorf("Failed to register blob writer for '%s', %w", scheme, err)
}
}

return nil
}

func NewGoCloudPublisher(ctx context.Context, uri string) (Publisher, error) {
Expand All @@ -50,7 +67,7 @@ func NewGoCloudPublisher(ctx context.Context, uri string) (Publisher, error) {
region := q.Get("region")
credentials := q.Get("credentials")
queue_url := q.Get("queue-url")

cfg, err := aa_session.NewConfigWithCredentialsAndRegion(credentials, region)

if err != nil {
Expand All @@ -64,7 +81,7 @@ func NewGoCloudPublisher(ctx context.Context, uri string) (Publisher, error) {
}

// https://gocloud.dev/howto/pubsub/publish/#sqs-ctor

topic = awssnssqs.OpenSQSTopic(ctx, sess, queue_url, nil)

default:
Expand Down
6 changes: 5 additions & 1 deletion publisher/null.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ type NullPublisher struct {

func init() {
ctx := context.Background()
RegisterPublisher(ctx, "null", NewNullPublisher)
RegisterNullPublishers(ctx)
}

func RegisterNullPublishers(ctx context.Context) error {
return RegisterPublisher(ctx, "null", NewNullPublisher)
}

func NewNullPublisher(ctx context.Context, uri string) (Publisher, error) {
Expand Down
31 changes: 29 additions & 2 deletions publisher/publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,21 @@ import (
"net/url"
"sort"
"strings"
"sync"

"github.com/aaronland/go-roster"
)

// In principle this could also be done with a sync.OnceFunc call but that will
// require that everyone uses Go 1.21 (whose package import changes broke everything)
// which is literally days old as I write this. So maybe a few releases after 1.21.
//
// Also, _not_ using a sync.OnceFunc means we can call RegisterSchemes multiple times
// if and when multiple gomail-sender instances register themselves.

var register_mu = new(sync.RWMutex)
var register_map = map[string]bool{}

type Publisher interface {
Publish(context.Context, string) error
Close() error
Expand Down Expand Up @@ -44,10 +55,26 @@ func RegisterPublisher(ctx context.Context, scheme string, f PublisherInitialize
return err
}

return publishers.Register(ctx, scheme, f)
register_mu.Lock()
defer register_mu.Unlock()

_, exists := register_map[scheme]

if exists {
return nil
}

err = publishers.Register(ctx, scheme, f)

if err != nil {
return err
}

register_map[scheme] = true
return nil
}

func Schemes() []string {
func PublisherSchemes() []string {

ctx := context.Background()
schemes := []string{}
Expand Down
6 changes: 5 additions & 1 deletion publisher/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ type RedisPublisher struct {

func init() {
ctx := context.Background()
RegisterPublisher(ctx, "redis", NewRedisPublisher)
RegisterRedisPublishers(ctx)
}

func RegisterRedisPublishers(ctx context.Context) error {
return RegisterPublisher(ctx, "redis", NewRedisPublisher)
}

func NewRedisPublisher(ctx context.Context, uri string) (Publisher, error) {
Expand Down
6 changes: 5 additions & 1 deletion publisher/stdout.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ type StdoutPublisher struct {

func init() {
ctx := context.Background()
RegisterPublisher(ctx, "stdout", NewStdoutPublisher)
RegisterStdoutPublishers(ctx)
}

func RegisterStdoutPublishers(ctx context.Context) error {
return RegisterPublisher(ctx, "stdout", NewStdoutPublisher)
}

func NewStdoutPublisher(ctx context.Context, uri string) (Publisher, error) {
Expand Down
7 changes: 7 additions & 0 deletions subscriber/gocloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package subscriber

import (
"context"

"gocloud.dev/pubsub"
)

Expand All @@ -15,7 +16,11 @@ type GoCloudSubscriber struct {
func init() {

ctx := context.Background()
RegisterGoCloudSubscribers(ctx)
}

func RegisterGoCloudSubscribers(ctx context.Context) error {

for _, scheme := range pubsub.DefaultURLMux().SubscriptionSchemes() {

err := RegisterSubscriber(ctx, scheme, NewGoCloudSubscriber)
Expand All @@ -24,6 +29,8 @@ func init() {
panic(err)
}
}

return nil
}

func NewGoCloudSubscriber(ctx context.Context, uri string) (Subscriber, error) {
Expand Down
6 changes: 5 additions & 1 deletion subscriber/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ type RedisSubscriber struct {

func init() {
ctx := context.Background()
RegisterSubscriber(ctx, "redis", NewRedisSubscriber)
RegisterRedisSubscribers(ctx)
}

func RegisterRedisSubscribers(ctx context.Context) error {
return RegisterSubscriber(ctx, "redis", NewRedisSubscriber)
}

func NewRedisSubscriber(ctx context.Context, uri string) (Subscriber, error) {
Expand Down
34 changes: 31 additions & 3 deletions subscriber/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,24 @@ package subscriber
import (
"context"
"fmt"
"github.com/aaronland/go-roster"
"net/url"
"sort"
"strings"
"sync"

"github.com/aaronland/go-roster"
)

// In principle this could also be done with a sync.OnceFunc call but that will
// require that everyone uses Go 1.21 (whose package import changes broke everything)
// which is literally days old as I write this. So maybe a few releases after 1.21.
//
// Also, _not_ using a sync.OnceFunc means we can call RegisterSchemes multiple times
// if and when multiple gomail-sender instances register themselves.

var register_mu = new(sync.RWMutex)
var register_map = map[string]bool{}

type Subscriber interface {
Listen(context.Context, chan string) error
Close() error
Expand Down Expand Up @@ -42,10 +54,26 @@ func RegisterSubscriber(ctx context.Context, scheme string, f SubscriberInitiali
return err
}

return subscribers.Register(ctx, scheme, f)
register_mu.Lock()
defer register_mu.Unlock()

_, exists := register_map[scheme]

if exists {
return nil
}

err = subscribers.Register(ctx, scheme, f)

if err != nil {
return err
}

register_map[scheme] = true
return nil
}

func Schemes() []string {
func SubscriberSchemes() []string {

ctx := context.Background()
schemes := []string{}
Expand Down
6 changes: 5 additions & 1 deletion subscriber/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ type TickerSubscriber struct {

func init() {
ctx := context.Background()
RegisterSubscriber(ctx, "ticker", NewTickerSubscriber)
RegisterTickerSubscribers(ctx)
}

func RegisterTickerSubscribers(ctx context.Context) error {
return RegisterSubscriber(ctx, "ticker", NewTickerSubscriber)
}

func NewTickerSubscriber(ctx context.Context, uri string) (Subscriber, error) {
Expand Down
1 change: 1 addition & 0 deletions vendor/github.com/aaronland/go-aws-session/README.md

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 28 additions & 3 deletions vendor/github.com/aaronland/go-aws-session/config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 35a856f

Please sign in to comment.