Skip to content

Commit

Permalink
change exp to delay in redis broker
Browse files Browse the repository at this point in the history
and add error tag when failed save job
  • Loading branch information
agungdwiprasetyo committed May 25, 2023
1 parent af2f529 commit b0a15fd
Show file tree
Hide file tree
Showing 4 changed files with 8 additions and 7 deletions.
8 changes: 4 additions & 4 deletions broker/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,16 @@ func (r *RedisBroker) PublishMessage(ctx context.Context, args *candishared.Publ
if err := args.Validate(); err != nil {
return err
}
if args.Expired <= 0 {
return errors.New("expired cannot empty")
if args.Delay <= 0 {
return errors.New("delay cannot empty")
}

trace.SetTag("topic", args.Topic)
if args.Key != "" {
trace.SetTag("key", args.Key)
}
trace.Log("header", args.Header)
trace.Log("expired", args.Expired.String())
trace.Log("delay", args.Delay.String())
trace.Log("message", args.Message)

conn := r.pool.Get()
Expand All @@ -137,7 +137,7 @@ func (r *RedisBroker) PublishMessage(ctx context.Context, args *candishared.Publ
if _, err := conn.Do("SET", string(redisMessage), "ok"); err != nil {
return err
}
_, err = conn.Do("EXPIRE", string(redisMessage), int(args.Expired.Seconds()))
_, err = conn.Do("EXPIRE", string(redisMessage), int(args.Delay.Seconds()))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion candishared/publisher_argument.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type PublisherArgument struct {
Header map[string]interface{}
ContentType string
Message []byte
Expired time.Duration
Delay time.Duration

// Deprecated : use Message
Data interface{}
Expand Down
4 changes: 2 additions & 2 deletions cmd/candi/template_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ MONGODB_HOST_READ=mongodb://user:pass@localhost:27017/{{.ServiceName}}?authSourc
SQL_DB_READ_DSN={{ if .SQLDeps }}{{.SQLDriver}}://` +
"{{if eq .SQLDriver \"postgres\"}}user:pass@localhost:5432/db_name?sslmode=disable&TimeZone=Asia/Jakarta{{else if eq .SQLDriver \"mysql\"}}" +
"root:pass@tcp(127.0.0.1:3306)/db_name{{else if eq .SQLDriver \"sqlite3\"}}database.db{{end}}" +
"root:pass@tcp(127.0.0.1:3306)/db_name?charset=utf8&parseTime=True{{else if eq .SQLDriver \"sqlite3\"}}database.db{{end}}" +
`{{ end }}
SQL_DB_WRITE_DSN={{ if .SQLDeps }}{{.SQLDriver}}://` +
"{{if eq .SQLDriver \"postgres\"}}user:pass@localhost:5432/db_name?sslmode=disable&TimeZone=Asia/Jakarta{{else if eq .SQLDriver \"mysql\"}}" +
"root:pass@tcp(127.0.0.1:3306)/db_name{{else if eq .SQLDriver \"sqlite3\"}}database.db{{end}}" +
"root:pass@tcp(127.0.0.1:3306)/db_name?charset=utf8&parseTime=True{{else if eq .SQLDriver \"sqlite3\"}}database.db{{end}}" +
`{{ end }}
{{if .ArangoDeps}}
ARANGODB_HOST_WRITE=http://user:pass@localhost:8529/{{.ServiceName}}
Expand Down
1 change: 1 addition & 0 deletions codebase/app/task_queue_worker/job_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
newJob.direct = req.direct

if err := engine.opt.persistent.SaveJob(ctx, &newJob); err != nil {
trace.SetError(err)
logger.LogE(fmt.Sprintf("Cannot save job, error: %s", err.Error()))
newJob.ID = ""
err = engine.opt.secondaryPersistent.SaveJob(ctx, &newJob)
Expand Down

0 comments on commit b0a15fd

Please sign in to comment.