From 8b1a05b7b0fcc459b82a5671908f44e4bfd53299 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Wed, 18 Dec 2024 10:05:13 -0500 Subject: [PATCH 1/9] feat: scheduling extensions --- pkg/scheduling/v2/extension.go | 70 +++++++++++++++++++++++++++++ pkg/scheduling/v2/pool.go | 11 ++++- pkg/scheduling/v2/scheduler.go | 47 ++++++++++++++++++- pkg/scheduling/v2/tenant_manager.go | 4 +- 4 files changed, 127 insertions(+), 5 deletions(-) create mode 100644 pkg/scheduling/v2/extension.go diff --git a/pkg/scheduling/v2/extension.go b/pkg/scheduling/v2/extension.go new file mode 100644 index 000000000..8fc0c3261 --- /dev/null +++ b/pkg/scheduling/v2/extension.go @@ -0,0 +1,70 @@ +package v2 + +import ( + "sync" + + "golang.org/x/sync/errgroup" + + "github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc" +) + +type PostScheduleInput struct { + Workers map[string]*WorkerCp + + ActionsToSlots map[string][]*SlotCp +} + +type WorkerCp struct { + WorkerId string + Labels []*dbsqlc.ListManyWorkerLabelsRow +} + +type SlotCp struct { + WorkerId string + Used bool +} + +type SchedulerExtension interface { + PostSchedule(tenantId string, input *PostScheduleInput) + Cleanup() error +} + +type Extensions struct { + mu sync.RWMutex + exts []SchedulerExtension +} + +func (e *Extensions) Add(ext SchedulerExtension) { + e.mu.Lock() + defer e.mu.Unlock() + + if e.exts == nil { + e.exts = make([]SchedulerExtension, 0) + } + + e.exts = append(e.exts, ext) +} + +func (e *Extensions) PostSchedule(tenantId string, input *PostScheduleInput) { + e.mu.RLock() + defer e.mu.RUnlock() + + for _, ext := range e.exts { + f := ext.PostSchedule + go f(tenantId, input) + } +} + +func (e *Extensions) Cleanup() error { + e.mu.RLock() + defer e.mu.RUnlock() + + eg := errgroup.Group{} + + for _, ext := range e.exts { + f := ext.Cleanup + eg.Go(f) + } + + return eg.Wait() +} diff --git a/pkg/scheduling/v2/pool.go b/pkg/scheduling/v2/pool.go index 93def2b74..2a52f4bf4 100644 --- a/pkg/scheduling/v2/pool.go +++ b/pkg/scheduling/v2/pool.go @@ -21,6 +21,8 @@ type sharedConfig struct { // SchedulingPool is responsible for managing a pool of tenantManagers. type SchedulingPool struct { + Extensions *Extensions + tenants sync.Map setMu mutex @@ -33,6 +35,7 @@ func NewSchedulingPool(repo repository.SchedulerRepository, l *zerolog.Logger, s resultsCh := make(chan *QueueResults, 1000) s := &SchedulingPool{ + Extensions: &Extensions{}, cf: &sharedConfig{ repo: repo, l: l, @@ -62,6 +65,12 @@ func (p *SchedulingPool) cleanup() { }) p.cleanupTenants(toCleanup) + + err := p.Extensions.Cleanup() + + if err != nil { + p.cf.l.Error().Err(err).Msg("failed to cleanup extensions") + } } func (p *SchedulingPool) SetTenants(tenants []*dbsqlc.Tenant) { @@ -148,7 +157,7 @@ func (p *SchedulingPool) getTenantManager(tenantId string, storeIfNotFound bool) if !ok { if storeIfNotFound { - tm = newTenantManager(p.cf, tenantId, p.resultsCh) + tm = newTenantManager(p.cf, tenantId, p.resultsCh, p.Extensions) p.tenants.Store(tenantId, tm) } else { return nil diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index 37486f4bb..72dc9ec02 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -39,10 +39,11 @@ type Scheduler struct { unackedSlots map[int]*slot unackedMu mutex - rl *rateLimiter + rl *rateLimiter + exts *Extensions } -func newScheduler(cf *sharedConfig, tenantId pgtype.UUID, rl *rateLimiter) *Scheduler { +func newScheduler(cf *sharedConfig, tenantId pgtype.UUID, rl *rateLimiter, exts *Extensions) *Scheduler { l := cf.l.With().Str("tenant_id", sqlchelpers.UUIDToStr(tenantId)).Logger() return &Scheduler{ @@ -57,6 +58,7 @@ func newScheduler(cf *sharedConfig, tenantId pgtype.UUID, rl *rateLimiter) *Sche workersMu: newMu(cf.l), assignedCountMu: newMu(cf.l), unackedMu: newMu(cf.l), + exts: exts, } } @@ -752,6 +754,10 @@ func (s *Scheduler) tryAssign( span.End() close(resultsCh) + extInput := s.getExtensionInput() + + s.exts.PostSchedule(sqlchelpers.UUIDToStr(s.tenantId), extInput) + if sinceStart := time.Since(startTotal); sinceStart > 100*time.Millisecond { s.l.Warn().Dur("duration", sinceStart).Msgf("assigning queue items took longer than 100ms") } @@ -760,6 +766,43 @@ func (s *Scheduler) tryAssign( return resultsCh } +func (s *Scheduler) getExtensionInput() *PostScheduleInput { + workers := s.getWorkers() + + res := &PostScheduleInput{ + Workers: make(map[string]*WorkerCp), + } + + for workerId, worker := range workers { + res.Workers[workerId] = &WorkerCp{ + WorkerId: workerId, + Labels: worker.Labels, + } + } + + s.actionsMu.RLock() + defer s.actionsMu.RUnlock() + + actionsToSlots := make(map[string][]*SlotCp) + + for actionId, action := range s.actions { + slots := make([]*SlotCp, 0, len(action.slots)) + + for _, slot := range action.slots { + slots = append(slots, &SlotCp{ + WorkerId: slot.getWorkerId(), + Used: slot.used, + }) + } + + actionsToSlots[actionId] = slots + } + + res.ActionsToSlots = actionsToSlots + + return res +} + func isTimedOut(qi *dbsqlc.QueueItem) bool { // if the current time is after the scheduleTimeoutAt, then mark this as timed out now := time.Now().UTC().UTC() diff --git a/pkg/scheduling/v2/tenant_manager.go b/pkg/scheduling/v2/tenant_manager.go index c404902f7..4d32d8bb5 100644 --- a/pkg/scheduling/v2/tenant_manager.go +++ b/pkg/scheduling/v2/tenant_manager.go @@ -34,11 +34,11 @@ type tenantManager struct { cleanup func() } -func newTenantManager(cf *sharedConfig, tenantId string, resultsCh chan *QueueResults) *tenantManager { +func newTenantManager(cf *sharedConfig, tenantId string, resultsCh chan *QueueResults, exts *Extensions) *tenantManager { tenantIdUUID := sqlchelpers.UUIDFromStr(tenantId) rl := newRateLimiter(cf, tenantIdUUID) - s := newScheduler(cf, tenantIdUUID, rl) + s := newScheduler(cf, tenantIdUUID, rl, exts) leaseManager, workersCh, queuesCh := newLeaseManager(cf, tenantIdUUID) t := &tenantManager{ From b29c45de4c54c1ca421861d7d60ff6d3103a7883 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Thu, 19 Dec 2024 11:29:38 -0500 Subject: [PATCH 2/9] add maxRuns to worker objects --- pkg/repository/prisma/scheduler_lease.go | 5 +++-- pkg/repository/scheduler.go | 5 +++-- pkg/scheduling/v2/extension.go | 1 + pkg/scheduling/v2/scheduler.go | 1 + 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/repository/prisma/scheduler_lease.go b/pkg/repository/prisma/scheduler_lease.go index 13c5183f7..a7ab0267e 100644 --- a/pkg/repository/prisma/scheduler_lease.go +++ b/pkg/repository/prisma/scheduler_lease.go @@ -146,8 +146,9 @@ func (d *leaseRepository) ListActiveWorkers(ctx context.Context, tenantId pgtype for _, worker := range activeWorkers { wId := sqlchelpers.UUIDToStr(worker.ID) res = append(res, &repository.ListActiveWorkersResult{ - ID: worker.ID, - Labels: workerIdsToLabels[wId], + ID: worker.ID, + MaxRuns: int(worker.MaxRuns), + Labels: workerIdsToLabels[wId], }) } diff --git a/pkg/repository/scheduler.go b/pkg/repository/scheduler.go index 5202dfe63..dd2a6485f 100644 --- a/pkg/repository/scheduler.go +++ b/pkg/repository/scheduler.go @@ -16,8 +16,9 @@ type SchedulerRepository interface { } type ListActiveWorkersResult struct { - ID pgtype.UUID - Labels []*dbsqlc.ListManyWorkerLabelsRow + ID pgtype.UUID + MaxRuns int + Labels []*dbsqlc.ListManyWorkerLabelsRow } type LeaseRepository interface { diff --git a/pkg/scheduling/v2/extension.go b/pkg/scheduling/v2/extension.go index 8fc0c3261..b36ff47d4 100644 --- a/pkg/scheduling/v2/extension.go +++ b/pkg/scheduling/v2/extension.go @@ -16,6 +16,7 @@ type PostScheduleInput struct { type WorkerCp struct { WorkerId string + MaxRuns int Labels []*dbsqlc.ListManyWorkerLabelsRow } diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index 72dc9ec02..6daee8811 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -776,6 +776,7 @@ func (s *Scheduler) getExtensionInput() *PostScheduleInput { for workerId, worker := range workers { res.Workers[workerId] = &WorkerCp{ WorkerId: workerId, + MaxRuns: worker.MaxRuns, Labels: worker.Labels, } } From 4dfdf195f10e0196ba9b5a96a11e005e829cdcb1 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Thu, 19 Dec 2024 12:45:36 -0500 Subject: [PATCH 3/9] don't double count slots --- pkg/scheduling/v2/extension.go | 2 +- pkg/scheduling/v2/scheduler.go | 21 ++++++++++++--------- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/pkg/scheduling/v2/extension.go b/pkg/scheduling/v2/extension.go index b36ff47d4..b3a4056a2 100644 --- a/pkg/scheduling/v2/extension.go +++ b/pkg/scheduling/v2/extension.go @@ -11,7 +11,7 @@ import ( type PostScheduleInput struct { Workers map[string]*WorkerCp - ActionsToSlots map[string][]*SlotCp + Slots []*SlotCp } type WorkerCp struct { diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index 6daee8811..b6222dc5d 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -771,6 +771,7 @@ func (s *Scheduler) getExtensionInput() *PostScheduleInput { res := &PostScheduleInput{ Workers: make(map[string]*WorkerCp), + Slots: make([]*SlotCp, 0), } for workerId, worker := range workers { @@ -784,22 +785,24 @@ func (s *Scheduler) getExtensionInput() *PostScheduleInput { s.actionsMu.RLock() defer s.actionsMu.RUnlock() - actionsToSlots := make(map[string][]*SlotCp) - - for actionId, action := range s.actions { - slots := make([]*SlotCp, 0, len(action.slots)) + uniqueSlots := make(map[*slot]*SlotCp) + for _, action := range s.actions { for _, slot := range action.slots { - slots = append(slots, &SlotCp{ + if _, ok := uniqueSlots[slot]; ok { + continue + } + + uniqueSlots[slot] = &SlotCp{ WorkerId: slot.getWorkerId(), Used: slot.used, - }) + } } - - actionsToSlots[actionId] = slots } - res.ActionsToSlots = actionsToSlots + for _, slot := range uniqueSlots { + res.Slots = append(res.Slots, slot) + } return res } From 2d079edb508e77624a21284df531f5b6d9912a69 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 20 Dec 2024 10:53:29 -0500 Subject: [PATCH 4/9] add unassigned and actions to slots to unassigned qi --- pkg/scheduling/v2/extension.go | 4 ++++ pkg/scheduling/v2/scheduler.go | 31 ++++++++++++++++++++++++++----- 2 files changed, 30 insertions(+), 5 deletions(-) diff --git a/pkg/scheduling/v2/extension.go b/pkg/scheduling/v2/extension.go index b3a4056a2..797559179 100644 --- a/pkg/scheduling/v2/extension.go +++ b/pkg/scheduling/v2/extension.go @@ -12,6 +12,10 @@ type PostScheduleInput struct { Workers map[string]*WorkerCp Slots []*SlotCp + + Unassigned []*dbsqlc.QueueItem + + ActionsToSlots map[string][]*SlotCp } type WorkerCp struct { diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index b6222dc5d..6725393a2 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -667,6 +667,9 @@ func (s *Scheduler) tryAssign( wg := sync.WaitGroup{} startTotal := time.Now() + extensionResultsCh := make(chan *assignResults, len(actionIdToQueueItems)) + defer close(extensionResultsCh) + // process each action id in parallel for actionId, qis := range actionIdToQueueItems { wg.Add(1) @@ -735,12 +738,16 @@ func (s *Scheduler) tryAssign( s.l.Warn().Dur("duration", sinceStart).Msgf("processing batch of %d queue items took longer than 100ms", len(batchQis)) } - resultsCh <- &assignResults{ + r := &assignResults{ assigned: batchAssigned, rateLimited: batchRateLimited, unassigned: batchUnassigned, } + extensionResultsCh <- r + + resultsCh <- r + return nil }) @@ -754,7 +761,7 @@ func (s *Scheduler) tryAssign( span.End() close(resultsCh) - extInput := s.getExtensionInput() + extInput := s.getExtensionInput(extensionResultsCh) s.exts.PostSchedule(sqlchelpers.UUIDToStr(s.tenantId), extInput) @@ -766,12 +773,19 @@ func (s *Scheduler) tryAssign( return resultsCh } -func (s *Scheduler) getExtensionInput() *PostScheduleInput { +func (s *Scheduler) getExtensionInput(ch <-chan *assignResults) *PostScheduleInput { + unassigned := make([]*dbsqlc.QueueItem, 0) + + for res := range ch { + unassigned = append(unassigned, res.unassigned...) + } + workers := s.getWorkers() res := &PostScheduleInput{ - Workers: make(map[string]*WorkerCp), - Slots: make([]*SlotCp, 0), + Workers: make(map[string]*WorkerCp), + Slots: make([]*SlotCp, 0), + Unassigned: unassigned, } for workerId, worker := range workers { @@ -786,8 +800,11 @@ func (s *Scheduler) getExtensionInput() *PostScheduleInput { defer s.actionsMu.RUnlock() uniqueSlots := make(map[*slot]*SlotCp) + actionsToSlots := make(map[string][]*SlotCp) for _, action := range s.actions { + actionsToSlots[action.actionId] = make([]*SlotCp, 0, len(action.slots)) + for _, slot := range action.slots { if _, ok := uniqueSlots[slot]; ok { continue @@ -797,6 +814,8 @@ func (s *Scheduler) getExtensionInput() *PostScheduleInput { WorkerId: slot.getWorkerId(), Used: slot.used, } + + actionsToSlots[action.actionId] = append(actionsToSlots[action.actionId], uniqueSlots[slot]) } } @@ -804,6 +823,8 @@ func (s *Scheduler) getExtensionInput() *PostScheduleInput { res.Slots = append(res.Slots, slot) } + res.ActionsToSlots = actionsToSlots + return res } From 712319e89799b7c8096e5153f3b07980b556d4b4 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 20 Dec 2024 11:41:15 -0500 Subject: [PATCH 5/9] fix: proper use of extensions channel --- pkg/scheduling/v2/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index 6725393a2..561cd4bd4 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -668,7 +668,6 @@ func (s *Scheduler) tryAssign( startTotal := time.Now() extensionResultsCh := make(chan *assignResults, len(actionIdToQueueItems)) - defer close(extensionResultsCh) // process each action id in parallel for actionId, qis := range actionIdToQueueItems { @@ -760,6 +759,7 @@ func (s *Scheduler) tryAssign( wg.Wait() span.End() close(resultsCh) + close(extensionResultsCh) extInput := s.getExtensionInput(extensionResultsCh) From bf9b4820129c6a1d137af163a588a51a24f94707 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 20 Dec 2024 12:00:55 -0500 Subject: [PATCH 6/9] convert ext results ch to slice --- pkg/scheduling/v2/scheduler.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index 561cd4bd4..9638d7ab0 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -667,7 +667,8 @@ func (s *Scheduler) tryAssign( wg := sync.WaitGroup{} startTotal := time.Now() - extensionResultsCh := make(chan *assignResults, len(actionIdToQueueItems)) + extensionResults := make([]*assignResults, 0) + extensionResultsMu := sync.Mutex{} // process each action id in parallel for actionId, qis := range actionIdToQueueItems { @@ -743,7 +744,9 @@ func (s *Scheduler) tryAssign( unassigned: batchUnassigned, } - extensionResultsCh <- r + extensionResultsMu.Lock() + extensionResults = append(extensionResults, r) + extensionResultsMu.Unlock() resultsCh <- r @@ -759,9 +762,8 @@ func (s *Scheduler) tryAssign( wg.Wait() span.End() close(resultsCh) - close(extensionResultsCh) - extInput := s.getExtensionInput(extensionResultsCh) + extInput := s.getExtensionInput(extensionResults) s.exts.PostSchedule(sqlchelpers.UUIDToStr(s.tenantId), extInput) @@ -773,10 +775,10 @@ func (s *Scheduler) tryAssign( return resultsCh } -func (s *Scheduler) getExtensionInput(ch <-chan *assignResults) *PostScheduleInput { +func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInput { unassigned := make([]*dbsqlc.QueueItem, 0) - for res := range ch { + for _, res := range results { unassigned = append(unassigned, res.unassigned...) } From fdf961e51427a551f5ebef6e4c9789a22d66abc3 Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 20 Dec 2024 12:44:16 -0500 Subject: [PATCH 7/9] fix: race conditions with read locks --- pkg/scheduling/v2/scheduler.go | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index 9638d7ab0..7401d5526 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -798,13 +798,27 @@ func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInp } } + // NOTE: these locks are important because we must acquire locks in the same order as the replenish and tryAssignBatch + // functions. we always acquire actionsMu first and then the specific action's lock. + actionKeys := make([]string, 0, len(s.actions)) + s.actionsMu.RLock() - defer s.actionsMu.RUnlock() + + for actionId := range s.actions { + actionKeys = append(actionKeys, actionId) + } + + s.actionsMu.RUnlock() uniqueSlots := make(map[*slot]*SlotCp) actionsToSlots := make(map[string][]*SlotCp) - for _, action := range s.actions { + for _, actionId := range actionKeys { + s.actionsMu.RLock() + action := s.actions[actionId] + s.actionsMu.RUnlock() + + action.mu.RLock() actionsToSlots[action.actionId] = make([]*SlotCp, 0, len(action.slots)) for _, slot := range action.slots { @@ -819,6 +833,7 @@ func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInp actionsToSlots[action.actionId] = append(actionsToSlots[action.actionId], uniqueSlots[slot]) } + action.mu.RUnlock() } for _, slot := range uniqueSlots { From e8265baaffd7e97f549d21e7dc301d6b3aecf39f Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 20 Dec 2024 15:39:01 -0500 Subject: [PATCH 8/9] add ability to set tenants on the extension --- pkg/scheduling/v2/extension.go | 11 +++++++++++ pkg/scheduling/v2/pool.go | 2 ++ 2 files changed, 13 insertions(+) diff --git a/pkg/scheduling/v2/extension.go b/pkg/scheduling/v2/extension.go index 797559179..9bb087eea 100644 --- a/pkg/scheduling/v2/extension.go +++ b/pkg/scheduling/v2/extension.go @@ -30,6 +30,7 @@ type SlotCp struct { } type SchedulerExtension interface { + SetTenants(tenants []*dbsqlc.Tenant) PostSchedule(tenantId string, input *PostScheduleInput) Cleanup() error } @@ -73,3 +74,13 @@ func (e *Extensions) Cleanup() error { return eg.Wait() } + +func (e *Extensions) SetTenants(tenants []*dbsqlc.Tenant) { + e.mu.RLock() + defer e.mu.RUnlock() + + for _, ext := range e.exts { + f := ext.SetTenants + go f(tenants) + } +} diff --git a/pkg/scheduling/v2/pool.go b/pkg/scheduling/v2/pool.go index 2a52f4bf4..75202f0dd 100644 --- a/pkg/scheduling/v2/pool.go +++ b/pkg/scheduling/v2/pool.go @@ -112,6 +112,8 @@ func (p *SchedulingPool) SetTenants(tenants []*dbsqlc.Tenant) { // any cleaned up tenants in the map p.cleanupTenants(toCleanup) }() + + go p.Extensions.SetTenants(tenants) } func (p *SchedulingPool) cleanupTenants(toCleanup []*tenantManager) { From 8c55ff5f73b811bff9922c9f27f595c9c916f23d Mon Sep 17 00:00:00 2001 From: Alexander Belanger Date: Fri, 20 Dec 2024 16:04:05 -0500 Subject: [PATCH 9/9] fix: panic on scheduler --- pkg/scheduling/v2/scheduler.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/pkg/scheduling/v2/scheduler.go b/pkg/scheduling/v2/scheduler.go index 7401d5526..325c2f885 100644 --- a/pkg/scheduling/v2/scheduler.go +++ b/pkg/scheduling/v2/scheduler.go @@ -815,9 +815,13 @@ func (s *Scheduler) getExtensionInput(results []*assignResults) *PostScheduleInp for _, actionId := range actionKeys { s.actionsMu.RLock() - action := s.actions[actionId] + action, ok := s.actions[actionId] s.actionsMu.RUnlock() + if !ok || action == nil { + continue + } + action.mu.RLock() actionsToSlots[action.actionId] = make([]*SlotCp, 0, len(action.slots))