Skip to content

Commit

Permalink
redis subscriber: use broker config
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed May 22, 2023
1 parent 122ef48 commit efbb3d0
Show file tree
Hide file tree
Showing 7 changed files with 239 additions and 58 deletions.
164 changes: 164 additions & 0 deletions broker/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
package broker

import (
"context"
"encoding/json"
"errors"

"github.com/golangid/candi/candihelper"
"github.com/golangid/candi/candishared"
"github.com/golangid/candi/codebase/factory/types"
"github.com/golangid/candi/codebase/interfaces"
"github.com/golangid/candi/logger"
"github.com/golangid/candi/tracer"
"github.com/gomodule/redigo/redis"
"github.com/google/uuid"
)

// RedisOptionFunc func type
type RedisOptionFunc func(*RedisBroker)

// RedisSetConfigCommands set config commands
func RedisSetConfigCommands(commands ...string) RedisOptionFunc {
return func(r *RedisBroker) {
r.configCommands = commands
}
}

// RedisSetSubscribeChannels set channels
func RedisSetSubscribeChannels(channels ...string) RedisOptionFunc {
return func(r *RedisBroker) {
r.subscribeChannels = channels
}
}

type RedisBroker struct {
pool *redis.Pool
configCommands []string
subscribeChannels []string
}

// NewRedisBroker setup redis for publish message
func NewRedisBroker(pool *redis.Pool, opts ...RedisOptionFunc) *RedisBroker {
r := &RedisBroker{
pool: pool,
// default config
configCommands: []string{"SET", "notify-keyspace-events", "Ex"},
subscribeChannels: []string{"__keyevent@*__:expired"},
}

for _, opt := range opts {
opt(r)
}

return r
}

// GetConfiguration method, return redis pubsub connection
func (r *RedisBroker) GetConfiguration() interface{} {
conn := r.pool.Get()

var commands []interface{}
for _, cmd := range r.configCommands {
commands = append(commands, cmd)
}
conn.Do("CONFIG", commands...)

psc := &redis.PubSubConn{Conn: conn}
commands = []interface{}{}
for _, cmd := range r.subscribeChannels {
commands = append(commands, cmd)
}
psc.PSubscribe(commands...)
return psc
}

// GetPublisher method
func (r *RedisBroker) GetPublisher() interfaces.Publisher {
return r
}

// GetName method
func (r *RedisBroker) GetName() types.Worker {
return types.RedisSubscriber
}

// Health method
func (r *RedisBroker) Health() map[string]error {
mErr := make(map[string]error)

ping := r.pool.Get()
_, err := ping.Do("PING")
ping.Close()
mErr[string(types.RedisSubscriber)] = err

return mErr
}

// Disconnect method
func (r *RedisBroker) Disconnect(ctx context.Context) error {
deferFunc := logger.LogWithDefer("redis: closing pool...")
defer deferFunc()

return r.pool.Close()
}

// PublishMessage method
func (r *RedisBroker) PublishMessage(ctx context.Context, args *candishared.PublisherArgument) (err error) {
trace := tracer.StartTrace(ctx, "redis_broker:publish_message")
defer func() {
trace.SetError(err)
trace.Finish()
}()

if err := args.Validate(); err != nil {
return err
}
if args.Expired <= 0 {
return errors.New("expired cannot empty")
}

trace.SetTag("topic", args.Topic)
if args.Key != "" {
trace.SetTag("key", args.Key)
}
trace.Log("header", args.Header)
trace.Log("expired", args.Expired.String())
trace.Log("message", args.Message)

conn := r.pool.Get()
defer conn.Close()

eventID := uuid.NewString()
trace.SetTag("event_id", eventID)
redisMessage, _ := json.Marshal(RedisMessage{
EventID: eventID, HandlerName: args.Topic, Message: string(args.Message),
})
if _, err := conn.Do("SET", string(redisMessage), "ok"); err != nil {
return err
}
_, err = conn.Do("EXPIRE", string(redisMessage), int(args.Expired.Seconds()))
return err
}

// RedisMessage messaging model for redis subscriber key
type RedisMessage struct {
HandlerName string `json:"h"`
Message string `json:"message"`
EventID string `json:"id,omitempty"`
}

// GenerateKeyDeleteRedisPubSubMessage delete redis key pubsub message pattern
func GenerateKeyDeleteRedisPubSubMessage(topic string, message interface{}) string {
b, _ := json.Marshal(RedisMessage{
HandlerName: topic, Message: string(candihelper.ToBytes(message)),
})
b[len(b)-1] = '*'
return string(b)
}

// ParseRedisPubSubKeyTopic parse key to redis message
func ParseRedisPubSubKeyTopic(key []byte) (redisMessage RedisMessage) {
json.Unmarshal(key, &redisMessage)
return redisMessage
}
22 changes: 20 additions & 2 deletions candishared/publisher_argument.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,31 @@
package candishared

import (
"errors"
"time"
)

// PublisherArgument declare publisher argument
type PublisherArgument struct {
// Topic or queue name
Topic string
Key string
Header map[string]interface{}
ContentType string
Message []byte
Expired time.Duration

// Deprecated : use Message
Data interface{}
Message []byte
Data interface{}
}

func (p *PublisherArgument) Validate() error {
if len(p.Topic) == 0 {
return errors.New("topic cannot empty")
}
if len(p.Message) == 0 {
return errors.New("message cannot empty")
}

return nil
}
11 changes: 7 additions & 4 deletions cmd/candi/template_configs.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,19 @@ func LoadServiceConfigs(baseCfg *config.Config) (deps dependency.Dependency) {
tracer.InitOpenTracing(baseCfg.ServiceName)
baseCfg.LoadFunc(func(ctx context.Context) []interfaces.Closer {
brokerDeps := broker.InitBrokers(
{{if not .KafkaHandler}}// {{ end }}broker.NewKafkaBroker(),
{{if not .RabbitMQHandler}}// {{ end }}broker.NewRabbitMQBroker(),
)
{{if not .RedisDeps}}// {{end}}redisDeps := database.InitRedis()
{{if not .SQLDeps}}// {{end}}sqlDeps := database.InitSQLDatabase()
{{if not .MongoDeps}}// {{end}}mongoDeps := database.InitMongoDB(ctx)
locker := {{if not .RedisDeps}}&candiutils.NoopLocker{}{{else}}candiutils.NewRedisLocker(redisDeps.WritePool()){{end}}` +
"{{if .ArangoDeps}}\n arangoDeps := arango.InitArangoDB(ctx, sharedEnv.DbArangoReadHost, sharedEnv.DbArangoWriteHost){{end}}" + `
` + "{{ if .IsMonorepo }}\n sdk.SetGlobalSDK(\n // init service client sdk\n )\n{{end}}" + `
brokerDeps := broker.InitBrokers(
{{if not .KafkaHandler}}// {{ end }}broker.NewKafkaBroker(),
{{if not .RabbitMQHandler}}// {{ end }}broker.NewRabbitMQBroker(),
{{if not .RedisSubsHandler}}// {{ end }}broker.NewRedisBroker(redisDeps.WritePool()),
)
// inject all service dependencies
// See all option in dependency package
deps = dependency.InitDependency(
Expand Down
2 changes: 1 addition & 1 deletion cmd/candi/template_delivery_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (h *RedisHandler) handle{{upper (camel .ModuleName)}}(eventContext *candish
trace, _ := tracer.StartTraceWithContext(eventContext.Context(), "{{upper (camel .ModuleName)}}DeliveryRedis:Handle{{upper (camel .ModuleName)}}")
defer trace.Finish()
fmt.Printf("redis subs: execute handler %s with message %s", eventContext.HandlerRoute(), eventContext.Message())
fmt.Printf("redis subs: execute handler %s with message %s\n", eventContext.HandlerRoute(), eventContext.Message())
// exec usecase
// h.uc.SomethingUsecase()
Expand Down
8 changes: 6 additions & 2 deletions cmd/candi/template_usecase.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ type {{camel .ModuleName}}UsecaseImpl struct {
{{if not .SQLDeps}}// {{end}}repoSQL repository.RepoSQL
{{if not .MongoDeps}}// {{end}}repoMongo repository.RepoMongo{{if .ArangoDeps}}
repoArango repository.RepoArango{{end}}
kafkaPub interfaces.Publisher{{if .RabbitMQHandler}}
kafkaPub interfaces.Publisher{{if .RedisSubsHandler}}
redisPub interfaces.Publisher{{ end }}{{if .RabbitMQHandler}}
rabbitmqPub interfaces.Publisher{{ end }}
}
Expand All @@ -124,7 +125,10 @@ func New{{upper (camel .ModuleName)}}Usecase(deps dependency.Dependency) ({{uppe
}
if kafkaBroker := deps.GetBroker(types.Kafka); kafkaBroker != nil {
uc.kafkaPub = kafkaBroker.GetPublisher()
}{{if .RabbitMQHandler}}
}{{if .RedisSubsHandler}}
if redisBroker := deps.GetBroker(types.RedisSubscriber); redisBroker != nil {
uc.redisPub = redisBroker.GetPublisher()
}{{ end }}{{if .RabbitMQHandler}}
if rabbitmqBroker := deps.GetBroker(types.RabbitMQ); rabbitmqBroker != nil {
uc.rabbitmqPub = rabbitmqBroker.GetPublisher()
}{{ end }}
Expand Down
2 changes: 2 additions & 0 deletions codebase/app/redis_worker/key_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"github.com/google/uuid"
)

// Deprecated, move to broker package

// RedisMessage model for redis subscriber key
type RedisMessage struct {
HandlerName string `json:"h"`
Expand Down
Loading

0 comments on commit efbb3d0

Please sign in to comment.