Skip to content

Commit

Permalink
task queue worker: improve stop job & summary
Browse files Browse the repository at this point in the history
  • Loading branch information
agungdwiprasetyo committed Oct 7, 2022
1 parent a35f2b2 commit 8a4d349
Show file tree
Hide file tree
Showing 14 changed files with 135 additions and 100 deletions.
3 changes: 0 additions & 3 deletions cmd/candi/project_generator_add_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"bytes"
"encoding/json"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -187,11 +186,9 @@ func scopeAddHandler(flagParam *flagParameter, cfg serviceConfig, serverHandlers
},
}

configJSON, _ := json.Marshal(cfg)
root := FileStructure{
Skip: true, Childs: []FileStructure{
apiStructure, internalServiceStructure,
{Source: string(configJSON), FileName: "candi.json"},
},
}

Expand Down
4 changes: 2 additions & 2 deletions cmd/candi/template_etc.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,8 +448,8 @@ mocks: check
test: check
@echo "\x1b[32;1m>>> running unit test and calculate coverage for service $(service)\x1b[0m"
@if [ -f services/$(service)/coverage.txt ]; then rm services/$(service)/coverage.txt; fi;
@go test -race ./services/$(service)/... -cover -coverprofile=services/$(service)/coverage.txt -covermode=atomic \
-coverpkg=$$(go list ./services/$(service)/... | grep -v -e mocks -e codebase | tr '\n' ',')
@go test ./services/$(service)/... -cover -coverprofile=services/$(service)/coverage.txt -covermode=count \
-coverpkg=$$(go list ./services/$(service)/... | grep -v mocks | tr '\n' ',')
@go tool cover -func=services/$(service)/coverage.txt
sonar: check
Expand Down
3 changes: 3 additions & 0 deletions codebase/app/task_queue_worker/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func (c *configurationUsecase) setConfiguration(cfg *Configuration) error {
}

taskIndex := engine.runningWorkerIndexTask[len(engine.workerChannels)-1]
if taskIndex == nil {
return errors.New("Missing task for worker")
}
if cfg.IsActive {
taskIndex.activeInterval = time.NewTicker(interval)
engine.workerChannels[len(engine.workerChannels)-1].Chan = reflect.ValueOf(taskIndex.activeInterval.C)
Expand Down
51 changes: 23 additions & 28 deletions codebase/app/task_queue_worker/graphql_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,36 +166,31 @@ func (r *rootResolver) StopAllJob(ctx context.Context, input struct {
input.TaskName, strings.Join(r.engine.tasks, ", "))
}

go func(ctx context.Context) {

r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, true)

r.engine.stopAllJobInTask(input.TaskName)
r.engine.opt.queue.Clear(ctx, input.TaskName)

incrQuery := map[string]int64{}
affectedStatus := []string{string(statusQueueing), string(statusRetrying)}
for _, status := range affectedStatus {
countMatchedFilter, countAffected, err := r.engine.opt.persistent.UpdateJob(ctx,
&Filter{
TaskName: input.TaskName, Status: &status,
},
map[string]interface{}{
"status": statusStopped,
},
)
if err != nil {
continue
}
incrQuery[strings.ToLower(status)] -= countMatchedFilter
incrQuery[strings.ToLower(string(statusStopped))] += countAffected
r.engine.subscriber.broadcastWhenChangeAllJob(r.engine.ctx, input.TaskName, true)
r.engine.opt.queue.Clear(ctx, input.TaskName)
go r.engine.stopAllJobInTask(input.TaskName)

incrQuery := map[string]int64{}
affectedStatus := []string{string(statusQueueing), string(statusRetrying)}
for _, status := range affectedStatus {
countMatchedFilter, countAffected, err := r.engine.opt.persistent.UpdateJob(ctx,
&Filter{
TaskName: input.TaskName, Status: &status,
},
map[string]interface{}{
"status": statusStopped,
},
)
if err != nil {
continue
}
incrQuery[strings.ToLower(status)] -= countMatchedFilter
incrQuery[strings.ToLower(string(statusStopped))] += countAffected
}

r.engine.subscriber.broadcastWhenChangeAllJob(ctx, input.TaskName, false)
r.engine.opt.persistent.Summary().IncrementSummary(ctx, input.TaskName, incrQuery)
r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)

}(r.engine.ctx)
r.engine.subscriber.broadcastWhenChangeAllJob(r.engine.ctx, input.TaskName, false)
r.engine.opt.persistent.Summary().IncrementSummary(ctx, input.TaskName, incrQuery)
r.engine.subscriber.broadcastAllToSubscribers(r.engine.ctx)

return "Success stop all job in task " + input.TaskName, nil
}
Expand Down
21 changes: 16 additions & 5 deletions codebase/app/task_queue_worker/job_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/golangid/candi/candihelper"
"github.com/golangid/candi/candiutils"
"github.com/golangid/candi/logger"
"github.com/golangid/candi/tracer"
)

Expand Down Expand Up @@ -93,8 +94,7 @@ func AddJob(ctx context.Context, req *AddJobRequest) (jobID string, err error) {
strings.ToLower(newJob.Status): 1,
})
engine.subscriber.broadcastAllToSubscribers(context.Background())
n := engine.opt.queue.PushJob(ctx, &newJob)
if n <= 1 {
if n := engine.opt.queue.PushJob(ctx, &newJob); n <= 1 {
engine.registerJobToWorker(&newJob, task.workerIndex)
engine.doRefreshWorker()
}
Expand Down Expand Up @@ -175,6 +175,7 @@ func RetryJob(ctx context.Context, jobID string) error {

job, err := engine.opt.persistent.FindJobByID(ctx, jobID, nil)
if err != nil {
logger.LogE(err.Error())
return err
}

Expand All @@ -184,15 +185,24 @@ func RetryJob(ctx context.Context, jobID string) error {
if (job.Status == string(statusFailure)) || (job.Retries >= job.MaxRetry) {
job.Retries = 0
}
matched, affected, _ := engine.opt.persistent.UpdateJob(ctx, &Filter{JobID: &job.ID}, map[string]interface{}{
matched, affected, err := engine.opt.persistent.UpdateJob(ctx, &Filter{JobID: &job.ID}, map[string]interface{}{
"status": job.Status, "interval": job.Interval, "retries": job.Retries,
})
if err != nil {
logger.LogE(err.Error())
return err
}
engine.opt.persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{
statusBefore: -matched,
job.Status: affected,
})

task := engine.registeredTask[job.TaskName]
task, ok := engine.registeredTask[job.TaskName]
if !ok {
err := errors.New("Task not found")
logger.LogE(err.Error())
return err
}
engine.opt.queue.PushJob(ctx, &job)
engine.subscriber.broadcastAllToSubscribers(ctx)
engine.registerJobToWorker(&job, task.workerIndex)
Expand All @@ -218,7 +228,7 @@ func StopJob(ctx context.Context, jobID string) error {

statusBefore := job.Status
if job.Status == string(statusRetrying) {
engine.stopAllJobInTask(job.TaskName)
go engine.stopAllJobInTask(job.TaskName)
}

job.Status = string(statusStopped)
Expand All @@ -227,6 +237,7 @@ func StopJob(ctx context.Context, jobID string) error {
map[string]interface{}{"status": job.Status},
)
if err != nil {
logger.LogE(err.Error())
return err
}
engine.opt.persistent.Summary().IncrementSummary(ctx, job.TaskName, map[string]int64{
Expand Down
2 changes: 2 additions & 0 deletions codebase/app/task_queue_worker/persistent_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type (
MaxRetry int `bson:"max_retry" json:"max_retry"`
Interval string `bson:"interval" json:"interval"`
CreatedAt time.Time `bson:"created_at" json:"created_at"`
UpdatedAt time.Time `bson:"updated_at" json:"updated_at"`
FinishedAt time.Time `bson:"finished_at" json:"finished_at"`
Status string `bson:"status" json:"status"`
Error string `bson:"error" json:"error"`
Expand Down Expand Up @@ -141,6 +142,7 @@ func (job *Job) toMap() map[string]interface{} {
"retries": job.Retries,
"max_retry": job.MaxRetry,
"interval": job.Interval,
"updated_at": job.UpdatedAt,
"finished_at": job.FinishedAt,
"status": job.Status,
"error": job.Error,
Expand Down
14 changes: 10 additions & 4 deletions codebase/app/task_queue_worker/persistent_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ func (s *MongoPersistent) AggregateAllTaskJob(ctx context.Context, filter *Filte
func (s *MongoPersistent) SaveJob(ctx context.Context, job *Job, retryHistories ...RetryHistory) (err error) {
tracer.Log(ctx, "persistent.mongo:save_job", job.ID)

job.UpdatedAt = time.Now()
if job.ID == "" {
job.ID = uuid.New().String()
job.CreatedAt = time.Now()
Expand Down Expand Up @@ -354,6 +355,7 @@ func (s *MongoPersistent) SaveJob(ctx context.Context, job *Job, retryHistories

func (s *MongoPersistent) UpdateJob(ctx context.Context, filter *Filter, updated map[string]interface{}, retryHistories ...RetryHistory) (matchedCount, affectedRow int64, err error) {

updated["updated_at"] = time.Now()
updateQuery := bson.M{
"$set": bson.M(updated),
}
Expand Down Expand Up @@ -472,7 +474,6 @@ func (s *MongoPersistent) toBsonFilter(f *Filter) bson.M {
func (s *MongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (result []TaskSummary) {

query := bson.M{}

if filter.TaskName != "" {
query["task_name"] = filter.TaskName
} else if len(filter.TaskNameList) > 0 {
Expand All @@ -481,7 +482,12 @@ func (s *MongoPersistent) FindAllSummary(ctx context.Context, filter *Filter) (r
}
}

cur, err := s.db.Collection(jobSummaryModelName).Find(s.ctx, query)
findOptions := &options.FindOptions{}
findOptions.SetSort(bson.M{
"task_name": 1,
})

cur, err := s.db.Collection(jobSummaryModelName).Find(s.ctx, query, findOptions)
if err != nil {
logger.LogE(err.Error())
return
Expand Down Expand Up @@ -566,8 +572,8 @@ func (s *MongoPersistent) UpdateSummary(ctx context.Context, taskName string, up
}
}

func (s *MongoPersistent) DeleteAllSummary(ctx context.Context) {
_, err := s.db.Collection(jobSummaryModelName).DeleteMany(ctx, bson.M{})
func (s *MongoPersistent) DeleteAllSummary(ctx context.Context, filter *Filter) {
_, err := s.db.Collection(jobSummaryModelName).DeleteMany(ctx, s.toBsonFilter(filter))
if err != nil {
logger.LogE(err.Error())
return
Expand Down
21 changes: 13 additions & 8 deletions codebase/app/task_queue_worker/persistent_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ func (s *SQLPersistent) UpdateJob(ctx context.Context, filter *Filter, updated m

s.db.QueryRow(`SELECT COUNT(*) FROM ` + jobModelName + ` ` + where).Scan(&matchedCount)
var setFields []string
updated["updated_at"] = time.Now()
for field, value := range updated {
if t, ok := value.(time.Time); ok {
value = t.Format(time.RFC3339)
Expand Down Expand Up @@ -405,7 +406,7 @@ func (s *SQLPersistent) FindAllSummary(ctx context.Context, filter *Filter) (res
}
where = " WHERE id IN (" + strings.Join(taskNameList, ",") + ")"
}
query := `SELECT id, success, queueing, retrying, failure, stopped, is_loading FROM ` + jobSummaryModelName + where
query := `SELECT id, success, queueing, retrying, failure, stopped, is_loading FROM ` + jobSummaryModelName + where + " ORDER BY id ASC"
rows, err := s.db.Query(query)
if err != nil {
return
Expand Down Expand Up @@ -434,13 +435,10 @@ func (s *SQLPersistent) FindAllSummary(ctx context.Context, filter *Filter) (res
return
}
func (s *SQLPersistent) FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary) {
err := s.db.QueryRow(`SELECT id, success, queueing, retrying, failure, stopped, is_loading
s.db.QueryRow(`SELECT id, success, queueing, retrying, failure, stopped, is_loading
FROM `+jobSummaryModelName+` WHERE id='`+s.queryReplacer.Replace(taskName)+`'`).
Scan(&result.TaskName, &result.Success, &result.Queueing, &result.Retrying,
&result.Failure, &result.Stopped, &result.IsLoading)
if err != nil {
logger.LogE(err.Error())
}
result.ID = result.TaskName
return
}
Expand Down Expand Up @@ -502,8 +500,12 @@ func (s *SQLPersistent) IncrementSummary(ctx context.Context, taskName string, i
}
return
}
func (s *SQLPersistent) DeleteAllSummary(ctx context.Context) {
_, err := s.db.Exec(`DELETE FROM ` + jobSummaryModelName)
func (s *SQLPersistent) DeleteAllSummary(ctx context.Context, filter *Filter) {
var where string
if len(filter.ExcludeTaskNameList) > 0 {
where = "WHERE id NOT IN " + s.toMultiParamQuery(filter.ExcludeTaskNameList)
}
_, err := s.db.Exec(`DELETE FROM ` + jobSummaryModelName + ` ` + where)
if err != nil {
logger.LogE(err.Error())
return
Expand All @@ -515,7 +517,10 @@ func (s *SQLPersistent) Type() string {
if s.versionFunc == "sqlite_version()" {
version = "SQLite " + version
}
return "SQL Persistent, version: " + version
if version != "" {
version = ", version: " + version
}
return "SQL Persistent" + version
}

func (s *SQLPersistent) toQueryFilter(f *Filter) (where string, err error) {
Expand Down
48 changes: 15 additions & 33 deletions codebase/app/task_queue_worker/subscribers.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,25 +151,16 @@ func (s *subscriber) broadcastAllToSubscribers(ctx context.Context) {
func (s *subscriber) broadcastTaskList(ctx context.Context) {

var taskRes TaskListResolver
taskRes.Data = make([]TaskResolver, len(engine.tasks))
mapper := make(map[string]int, len(engine.tasks))
for i, task := range engine.tasks {
taskRes.Data[i].Name = task
taskRes.Data[i].ModuleName = engine.registeredTask[task].moduleName
mapper[task] = i
}

taskRes.Data = make([]TaskResolver, 0)
for _, summary := range s.opt.persistent.Summary().FindAllSummary(ctx, &Filter{}) {
if idx, ok := mapper[summary.TaskName]; ok {
res := TaskResolver{
Name: summary.TaskName,
ModuleName: engine.registeredTask[summary.TaskName].moduleName,
TotalJobs: summary.CountTotalJob(),
}
res.Detail = summary.ToSummaryDetail()
res.IsLoading = summary.IsLoading
taskRes.Data[idx] = res
res := TaskResolver{
Name: summary.TaskName,
ModuleName: engine.registeredTask[summary.TaskName].moduleName,
TotalJobs: summary.CountTotalJob(),
}
res.Detail = summary.ToSummaryDetail()
res.IsLoading = summary.IsLoading
taskRes.Data = append(taskRes.Data, res)
}

sort.Slice(taskRes.Data, func(i, j int) bool {
Expand Down Expand Up @@ -245,24 +236,15 @@ func (s *subscriber) broadcastWhenChangeAllJob(ctx context.Context, taskName str
})

var taskRes TaskListResolver
taskRes.Data = make([]TaskResolver, len(engine.tasks))
mapper := make(map[string]int, len(engine.tasks))
for i, task := range engine.tasks {
taskRes.Data[i].Name = task
taskRes.Data[i].ModuleName = engine.registeredTask[task].moduleName
mapper[task] = i
}

taskRes.Data = make([]TaskResolver, 0)
for _, summary := range s.opt.persistent.Summary().FindAllSummary(ctx, &Filter{}) {
if idx, ok := mapper[summary.TaskName]; ok {
res := TaskResolver{
Name: summary.TaskName, ModuleName: engine.registeredTask[summary.TaskName].moduleName,
TotalJobs: summary.CountTotalJob(),
}
res.Detail = summary.ToSummaryDetail()
res.IsLoading = summary.IsLoading
taskRes.Data[idx] = res
res := TaskResolver{
Name: summary.TaskName, ModuleName: engine.registeredTask[summary.TaskName].moduleName,
TotalJobs: summary.CountTotalJob(),
}
res.Detail = summary.ToSummaryDetail()
res.IsLoading = summary.IsLoading
taskRes.Data = append(taskRes.Data, res)
}

sort.Slice(taskRes.Data, func(i, j int) bool {
Expand Down
4 changes: 2 additions & 2 deletions codebase/app/task_queue_worker/summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type (
FindDetailSummary(ctx context.Context, taskName string) (result TaskSummary)
UpdateSummary(ctx context.Context, taskName string, updated map[string]interface{})
IncrementSummary(ctx context.Context, taskName string, incr map[string]int64)
DeleteAllSummary(ctx context.Context)
DeleteAllSummary(ctx context.Context, filter *Filter)
}
)

Expand Down Expand Up @@ -127,4 +127,4 @@ func (i *inMemSummary) IncrementSummary(ctx context.Context, taskName string, in
i.values[taskName] = summary
return
}
func (i *inMemSummary) DeleteAllSummary(ctx context.Context) {}
func (i *inMemSummary) DeleteAllSummary(ctx context.Context, filter *Filter) {}
Loading

0 comments on commit 8a4d349

Please sign in to comment.