Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/pilot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ Examples:
// Create dispatcher if store available
if gwStore != nil {
gwDispatcher = executor.NewDispatcher(gwStore, gwRunner, nil)
if dispErr := gwDispatcher.Start(); dispErr != nil {
if dispErr := gwDispatcher.Start(context.Background()); dispErr != nil {
logging.WithComponent("start").Warn("Failed to start dispatcher for gateway polling", slog.Any("error", dispErr))
gwDispatcher = nil
}
Expand Down Expand Up @@ -1793,7 +1793,7 @@ func runPollingMode(cfg *config.Config, projectPath string, replace, dashboardMo
var dispatcher *executor.Dispatcher
if store != nil {
dispatcher = executor.NewDispatcher(store, runner, nil)
if err := dispatcher.Start(); err != nil {
if err := dispatcher.Start(ctx); err != nil {
logging.WithComponent("start").Warn("Failed to start dispatcher", slog.Any("error", err))
dispatcher = nil
} else {
Expand Down
135 changes: 112 additions & 23 deletions internal/executor/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,61 @@ import (

// DispatcherConfig configures the task dispatcher behavior.
type DispatcherConfig struct {
// StaleTaskDuration is how long a "running" task can be stale before reset.
// Used on startup to detect crashed workers.
// StaleTaskDuration is a backwards-compat alias for StaleRunningThreshold.
// Deprecated: use StaleRunningThreshold instead.
StaleTaskDuration time.Duration

// StaleRunningThreshold is how long a "running" task can remain before
// it is considered orphaned and marked failed. Default: 30 minutes.
StaleRunningThreshold time.Duration

// StaleQueuedThreshold is how long a "queued" task can remain without
// being picked up before it is considered stuck and marked failed.
// Default: 5 minutes.
StaleQueuedThreshold time.Duration

// StaleRecoveryInterval is how often the periodic stale-recovery loop
// runs. Default: 5 minutes.
StaleRecoveryInterval time.Duration
}

// DefaultDispatcherConfig returns default dispatcher settings.
func DefaultDispatcherConfig() *DispatcherConfig {
return &DispatcherConfig{
StaleTaskDuration: 30 * time.Minute,
StaleRunningThreshold: 30 * time.Minute,
StaleQueuedThreshold: 5 * time.Minute,
StaleRecoveryInterval: 5 * time.Minute,
}
}

// effectiveRunningThreshold returns StaleRunningThreshold, falling back
// to StaleTaskDuration for backwards compatibility.
func (c *DispatcherConfig) effectiveRunningThreshold() time.Duration {
if c.StaleRunningThreshold > 0 {
return c.StaleRunningThreshold
}
if c.StaleTaskDuration > 0 {
return c.StaleTaskDuration
}
return 30 * time.Minute
}

// effectiveQueuedThreshold returns StaleQueuedThreshold with a default.
func (c *DispatcherConfig) effectiveQueuedThreshold() time.Duration {
if c.StaleQueuedThreshold > 0 {
return c.StaleQueuedThreshold
}
return 5 * time.Minute
}

// effectiveRecoveryInterval returns StaleRecoveryInterval with a default.
func (c *DispatcherConfig) effectiveRecoveryInterval() time.Duration {
if c.StaleRecoveryInterval > 0 {
return c.StaleRecoveryInterval
}
return 5 * time.Minute
}

// Dispatcher manages task queuing and per-project workers.
// It ensures that tasks for the same project are executed serially
// while allowing parallel execution across different projects.
Expand Down Expand Up @@ -72,18 +115,46 @@ func (d *Dispatcher) SetDecomposer(decomposer *TaskDecomposer) {
d.decomposer = decomposer
}

// Start initializes the dispatcher and recovers from any stale tasks.
func (d *Dispatcher) Start() error {
// Start brings the dispatcher online: it runs one synchronous stale-task
// recovery pass, then launches the periodic recovery loop in the background.
// The supplied ctx bounds the lifetime of that loop. Always returns nil.
func (d *Dispatcher) Start(ctx context.Context) error {
	d.log.Info("Starting dispatcher")

	// Synchronous pass so startup begins from a clean slate before any
	// new work is queued.
	d.recoverStaleTasks()

	// Track the background loop on the WaitGroup (Add must precede the go
	// statement) so Stop can wait for its clean exit.
	d.wg.Add(1)
	go d.runStaleRecoveryLoop(ctx)

	return nil
}

// runStaleRecoveryLoop invokes recoverStaleTasks on a fixed period until
// either the caller-supplied ctx or the dispatcher's own context is
// cancelled. It must be registered on d.wg before being launched.
func (d *Dispatcher) runStaleRecoveryLoop(ctx context.Context) {
	defer d.wg.Done()

	every := d.config.effectiveRecoveryInterval()
	ticker := time.NewTicker(every)
	defer ticker.Stop()

	d.log.Info("Stale recovery loop started", slog.Duration("interval", every))

	for {
		select {
		case <-ticker.C:
			d.recoverStaleTasks()
		case <-ctx.Done():
			d.log.Debug("Stale recovery loop stopped (context cancelled)")
			return
		case <-d.ctx.Done():
			d.log.Debug("Stale recovery loop stopped (dispatcher stopped)")
			return
		}
	}
}

// Stop gracefully stops all workers and the dispatcher.
func (d *Dispatcher) Stop() {
d.log.Info("Stopping dispatcher")
Expand All @@ -101,31 +172,49 @@ func (d *Dispatcher) Stop() {
d.log.Info("Dispatcher stopped")
}

// recoverStaleTasks resets tasks that were left in "running" state
// from a previous crashed session.
func (d *Dispatcher) recoverStaleTasks() error {
stale, err := d.store.GetStaleRunningExecutions(d.config.StaleTaskDuration)
// recoverStaleTasks marks orphaned running and queued tasks as failed.
// Re-queuing without a worker just recreates the orphan, so we fail them.
func (d *Dispatcher) recoverStaleTasks() int {
var resetCount int

// Recover stale running tasks (crashed workers).
staleRunning, err := d.store.GetStaleRunningExecutions(d.config.effectiveRunningThreshold())
if err != nil {
return err
d.log.Warn("Failed to fetch stale running executions", slog.Any("error", err))
}

for _, exec := range stale {
d.log.Warn("Recovering stale task",
for _, exec := range staleRunning {
d.log.Warn("Marking stale running task as failed",
slog.String("execution_id", exec.ID),
slog.String("task_id", exec.TaskID),
slog.Time("created_at", exec.CreatedAt),
)
// Reset to queued so it will be picked up again
if err := d.store.UpdateExecutionStatus(exec.ID, "queued", "recovered from stale running state"); err != nil {
d.log.Error("Failed to reset stale task", slog.String("id", exec.ID), slog.Any("error", err))
if err := d.store.UpdateExecutionStatus(exec.ID, "failed", "stale running task recovered (orphaned worker)"); err != nil {
d.log.Error("Failed to mark stale running task", slog.String("id", exec.ID), slog.Any("error", err))
} else {
resetCount++
}
}

if len(stale) > 0 {
d.log.Info("Recovered stale tasks", slog.Int("count", len(stale)))
// Recover stale queued tasks (stuck in queue with no worker).
staleQueued, err := d.store.GetStaleQueuedExecutions(d.config.effectiveQueuedThreshold())
if err != nil {
d.log.Warn("Failed to fetch stale queued executions", slog.Any("error", err))
}
for _, exec := range staleQueued {
d.log.Warn("Marking stale queued task as failed",
slog.String("execution_id", exec.ID),
slog.String("task_id", exec.TaskID),
slog.Time("created_at", exec.CreatedAt),
)
if err := d.store.UpdateExecutionStatus(exec.ID, "failed", "stale queued task recovered (no worker picked up)"); err != nil {
d.log.Error("Failed to mark stale queued task", slog.String("id", exec.ID), slog.Any("error", err))
} else {
resetCount++
}
}

return nil
d.log.Info("stale recovery complete, reset N tasks", slog.Int("count", resetCount))
return resetCount
}

// QueueTask adds a task to the execution queue and returns the execution ID.
Expand Down
Loading
Loading