diff --git a/src/Manager/AutoscaleManager.php b/src/Manager/AutoscaleManager.php
index 1dc446f..2ae9379 100644
--- a/src/Manager/AutoscaleManager.php
+++ b/src/Manager/AutoscaleManager.php
@@ -99,6 +99,14 @@ final class AutoscaleManager
     /** @var list<string> */
     private array $lastObservedManagerIds = [];
 
+    /**
+     * Cached per-workload host distributions from the previous cycle.
+     * Used to prevent thrashing when the cluster-wide target is unchanged.
+     *
+     * @var array<string, array<string, int>> workloadKey => [managerId => assignedWorkers]
+     */
+    private array $previousDistributions = [];
+
     public function __construct(
         private readonly ScalingEngine $engine,
         private readonly WorkerSpawner $spawner,
@@ -544,6 +552,12 @@ private function evaluateAndPublishClusterRecommendations(): void
             }
         }
 
+        // Prune cached distributions for workloads no longer present
+        $this->previousDistributions = array_intersect_key(
+            $this->previousDistributions,
+            $adjustedTargets,
+        );
+
         $issuedAt = $this->currentTimestamp();
 
         foreach ($managerIds as $managerId) {
@@ -674,9 +688,53 @@ private function distributeClusterTarget(
         }
 
         if ($targetWorkers <= 0 || $activeManagers === []) {
+            $this->previousDistributions[$workloadKey] = $targets;
+
             return $targets;
         }
 
+        // Reuse the previous distribution when the target is unchanged, the set of
+        // active managers is identical, and every cached assignment still fits in
+        // the host's remaining capacity. This avoids worker pool thrashing caused by
+        // sort-order instability when reported counts fluctuate between heartbeats.
+        $cached = $this->previousDistributions[$workloadKey] ?? null;
+
+        if ($cached !== null && array_sum($cached) === $targetWorkers) {
+            $activeManagerIds = array_map(
+                static fn (ClusterManagerState $state): string => $state->managerId,
+                $activeManagers,
+            );
+            sort($activeManagerIds);
+
+            $cachedManagerIds = array_keys($cached);
+            sort($cachedManagerIds);
+
+            if ($activeManagerIds === $cachedManagerIds) {
+                $maxWorkersMap = [];
+                foreach ($activeManagers as $state) {
+                    $maxWorkersMap[$state->managerId] = $state->maxWorkers;
+                }
+
+                $feasible = true;
+                foreach ($cached as $managerId => $cachedCount) {
+                    $available = $maxWorkersMap[$managerId] - ($assignedTotals[$managerId] ?? 0);
+                    if ($cachedCount > $available) {
+                        $feasible = false;
+                        break;
+                    }
+                }
+
+                if ($feasible) {
+                    foreach ($cached as $managerId => $cachedCount) {
+                        $targets[$managerId] = $cachedCount;
+                        $assignedTotals[$managerId] = ($assignedTotals[$managerId] ?? 0) + $cachedCount;
+                    }
+
+                    return $targets;
+                }
+            }
+        }
+
         [$type, $connection, $name] = explode(':', $workloadKey, 3);
 
         $currentCounts = [];
@@ -734,6 +792,8 @@ private function distributeClusterTarget(
             $remaining--;
         }
 
+        $this->previousDistributions[$workloadKey] = $targets;
+
         return $targets;
     }
 
diff --git a/tests/Unit/Cluster/ClusterDistributeTargetTest.php b/tests/Unit/Cluster/ClusterDistributeTargetTest.php
index 959ff73..4b91e66 100644
--- a/tests/Unit/Cluster/ClusterDistributeTargetTest.php
+++ b/tests/Unit/Cluster/ClusterDistributeTargetTest.php
@@ -111,3 +111,121 @@ function invokeDistributeClusterTarget(array $managers, string $workloadKey, int
     expect($result['a'])->toBe(2)
         ->and($result['b'])->toBe(1);
 });
+
+it('produces stable assignments across 30 cycles with frozen cluster state', function () {
+    // First cycle: establish initial distribution
+    $managers = [
+        makeManagerState('large', maxWorkers: 15, queueWorkers: ['redis:fast' => 14]),
+        makeManagerState('small', maxWorkers: 8, queueWorkers: ['redis:fast' => 4]),
+    ];
+
+    $assignedTotals = ['large' => 0, 'small' => 0];
+    $firstResult = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 18, $assignedTotals);
+
+    // Run 30 additional cycles with identical target but fluctuating reported counts
+    // (simulating heartbeat jitter that caused the original thrashing bug)
+    for ($cycle = 0; $cycle < 30; $cycle++) {
+        // Simulate fluctuating reported counts — workers appear to shift between hosts
+        $largeReported = $firstResult['large'] + ($cycle % 3 === 0 ? -1 : ($cycle % 3 === 1 ? 1 : 0));
+        $smallReported = $firstResult['small'] + ($cycle % 3 === 0 ? 1 : ($cycle % 3 === 1 ? -1 : 0));
+
+        $managers = [
+            makeManagerState('large', maxWorkers: 15, queueWorkers: ['redis:fast' => $largeReported]),
+            makeManagerState('small', maxWorkers: 8, queueWorkers: ['redis:fast' => $smallReported]),
+        ];
+
+        $assignedTotals = ['large' => 0, 'small' => 0];
+        $result = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 18, $assignedTotals);
+
+        expect($result)->toBe($firstResult, "Cycle {$cycle}: assignments shifted despite stable target");
+    }
+});
+
+it('recomputes distribution when target changes', function () {
+    // Cycle 1: target = 10
+    $managers = [
+        makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => 5]),
+        makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => 5]),
+    ];
+
+    $assignedTotals = ['a' => 0, 'b' => 0];
+    $result1 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);
+    expect(array_sum($result1))->toBe(10);
+
+    // Cycle 2: target increases to 14 — must recompute, not reuse cached
+    $managers = [
+        makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['a']]),
+        makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['b']]),
+    ];
+
+    $assignedTotals = ['a' => 0, 'b' => 0];
+    $result2 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 14, $assignedTotals);
+    expect(array_sum($result2))->toBe(14);
+});
+
+it('recomputes distribution when a manager joins', function () {
+    // Cycle 1: two managers
+    $managers = [
+        makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => 5]),
+        makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => 5]),
+    ];
+
+    $assignedTotals = ['a' => 0, 'b' => 0];
+    $result1 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);
+
+    // Cycle 2: third manager joins — must recompute
+    $managers = [
+        makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['a']]),
+        makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['b']]),
+        makeManagerState('c', maxWorkers: 10, queueWorkers: []),
+    ];
+
+    $assignedTotals = ['a' => 0, 'b' => 0, 'c' => 0];
+    $result2 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);
+    expect(array_sum($result2))->toBe(10)
+        ->and(array_key_exists('c', $result2))->toBeTrue();
+});
+
+it('recomputes distribution when a manager leaves', function () {
+    // Cycle 1: three managers
+    $managers = [
+        makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => 4]),
+        makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => 3]),
+        makeManagerState('c', maxWorkers: 10, queueWorkers: ['redis:fast' => 3]),
+    ];
+
+    $assignedTotals = ['a' => 0, 'b' => 0, 'c' => 0];
+    $result1 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);
+
+    // Cycle 2: manager c leaves — must recompute and redistribute its share
+    $managers = [
+        makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['a']]),
+        makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['b']]),
+    ];
+
+    $assignedTotals = ['a' => 0, 'b' => 0];
+    $result2 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);
+    expect(array_sum($result2))->toBe(10)
+        ->and(array_key_exists('c', $result2))->toBeFalse();
+});
+
+it('recomputes distribution when cached assignments exceed host capacity', function () {
+    // Cycle 1: two workloads, first gets distributed
+    $managers = [
+        makeManagerState('a', maxWorkers: 6, queueWorkers: ['redis:fast' => 4]),
+        makeManagerState('b', maxWorkers: 6, queueWorkers: ['redis:fast' => 2]),
+    ];
+
+    $assignedTotals = ['a' => 0, 'b' => 0];
+    $result1 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 6, $assignedTotals);
+
+    // Cycle 2: another workload consumed capacity first, so cached fast
+    // assignments may no longer fit
+    $assignedTotals = ['a' => 4, 'b' => 4]; // other workloads already took 4 on each host
+    $result2 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 6, $assignedTotals);
+
+    // Must respect host capacity: a can take max 2 more, b can take max 2 more
+    expect($result2['a'])->toBeLessThanOrEqual(2)
+        ->and($result2['b'])->toBeLessThanOrEqual(2)
+        ->and(array_sum($result2))->toBeLessThanOrEqual(4);
+});