Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions src/Manager/AutoscaleManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,14 @@ final class AutoscaleManager
/** @var list<string> */
private array $lastObservedManagerIds = [];

/**
* Cached per-workload host distributions from the previous cycle.
* Used to prevent thrashing when the cluster-wide target is unchanged.
*
* @var array<string, array<string, int>> workloadKey => [managerId => assignedWorkers]
*/
private array $previousDistributions = [];

public function __construct(
private readonly ScalingEngine $engine,
private readonly WorkerSpawner $spawner,
Expand Down Expand Up @@ -544,6 +552,12 @@ private function evaluateAndPublishClusterRecommendations(): void
}
}

// Prune cached distributions for workloads no longer present
$this->previousDistributions = array_intersect_key(
$this->previousDistributions,
$adjustedTargets,
);

$issuedAt = $this->currentTimestamp();

foreach ($managerIds as $managerId) {
Expand Down Expand Up @@ -674,9 +688,53 @@ private function distributeClusterTarget(
}

if ($targetWorkers <= 0 || $activeManagers === []) {
$this->previousDistributions[$workloadKey] = $targets;

return $targets;
}

// Reuse previous distribution when target is unchanged and all
// cached assignments still fit within each host's remaining capacity.
// This prevents worker pool thrashing caused by sort-order instability
// when reported current counts fluctuate between heartbeats.
$cached = $this->previousDistributions[$workloadKey] ?? null;

if ($cached !== null && array_sum($cached) === $targetWorkers) {
$activeManagerIds = array_map(
static fn (ClusterManagerState $state): string => $state->managerId,
$activeManagers,
);
sort($activeManagerIds);

$cachedManagerIds = array_keys($cached);
sort($cachedManagerIds);

if ($activeManagerIds === $cachedManagerIds) {
$maxWorkersMap = [];
foreach ($activeManagers as $state) {
$maxWorkersMap[$state->managerId] = $state->maxWorkers;
}

$feasible = true;
foreach ($cached as $managerId => $cachedCount) {
$available = $maxWorkersMap[$managerId] - ($assignedTotals[$managerId] ?? 0);
if ($cachedCount > $available) {
$feasible = false;
break;
}
}

if ($feasible) {
foreach ($cached as $managerId => $cachedCount) {
$targets[$managerId] = $cachedCount;
$assignedTotals[$managerId] += $cachedCount;
}

return $targets;
}
}
}

[$type, $connection, $name] = explode(':', $workloadKey, 3);
$currentCounts = [];

Expand Down Expand Up @@ -734,6 +792,8 @@ private function distributeClusterTarget(
$remaining--;
}

$this->previousDistributions[$workloadKey] = $targets;

return $targets;
}

Expand Down
118 changes: 118 additions & 0 deletions tests/Unit/Cluster/ClusterDistributeTargetTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -111,3 +111,121 @@ function invokeDistributeClusterTarget(array $managers, string $workloadKey, int
expect($result['a'])->toBe(2)
->and($result['b'])->toBe(1);
});

it('produces stable assignments across 30 cycles with frozen cluster state', function () {
// First cycle: establish initial distribution
$managers = [
makeManagerState('large', maxWorkers: 15, queueWorkers: ['redis:fast' => 14]),
makeManagerState('small', maxWorkers: 8, queueWorkers: ['redis:fast' => 4]),
];

$assignedTotals = ['large' => 0, 'small' => 0];
$firstResult = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 18, $assignedTotals);

// Run 30 additional cycles with identical target but fluctuating reported counts
// (simulating heartbeat jitter that caused the original thrashing bug)
for ($cycle = 0; $cycle < 30; $cycle++) {
// Simulate fluctuating reported counts — workers appear to shift between hosts
$largeReported = $firstResult['large'] + ($cycle % 3 === 0 ? -1 : ($cycle % 3 === 1 ? 1 : 0));
$smallReported = $firstResult['small'] + ($cycle % 3 === 0 ? 1 : ($cycle % 3 === 1 ? -1 : 0));

$managers = [
makeManagerState('large', maxWorkers: 15, queueWorkers: ['redis:fast' => $largeReported]),
makeManagerState('small', maxWorkers: 8, queueWorkers: ['redis:fast' => $smallReported]),
];

$assignedTotals = ['large' => 0, 'small' => 0];
$result = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 18, $assignedTotals);

expect($result)->toBe($firstResult, "Cycle {$cycle}: assignments shifted despite stable target");
}
});

it('recomputes distribution when target changes', function () {
// Cycle 1: target = 10
$managers = [
makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => 5]),
makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => 5]),
];

$assignedTotals = ['a' => 0, 'b' => 0];
$result1 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);
expect(array_sum($result1))->toBe(10);

// Cycle 2: target increases to 14 — must recompute, not reuse cached
$managers = [
makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['a']]),
makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['b']]),
];

$assignedTotals = ['a' => 0, 'b' => 0];
$result2 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 14, $assignedTotals);
expect(array_sum($result2))->toBe(14);
});

it('recomputes distribution when a manager joins', function () {
// Cycle 1: two managers
$managers = [
makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => 5]),
makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => 5]),
];

$assignedTotals = ['a' => 0, 'b' => 0];
$result1 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);

// Cycle 2: third manager joins — must recompute
$managers = [
makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['a']]),
makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['b']]),
makeManagerState('c', maxWorkers: 10, queueWorkers: []),
];

$assignedTotals = ['a' => 0, 'b' => 0, 'c' => 0];
$result2 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);
expect(array_sum($result2))->toBe(10)
->and(array_key_exists('c', $result2))->toBeTrue();
});

it('recomputes distribution when a manager leaves', function () {
// Cycle 1: three managers
$managers = [
makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => 4]),
makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => 3]),
makeManagerState('c', maxWorkers: 10, queueWorkers: ['redis:fast' => 3]),
];

$assignedTotals = ['a' => 0, 'b' => 0, 'c' => 0];
$result1 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);

// Cycle 2: manager c leaves — must recompute and redistribute its share
$managers = [
makeManagerState('a', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['a']]),
makeManagerState('b', maxWorkers: 10, queueWorkers: ['redis:fast' => $result1['b']]),
];

$assignedTotals = ['a' => 0, 'b' => 0];
$result2 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 10, $assignedTotals);
expect(array_sum($result2))->toBe(10)
->and(array_key_exists('c', $result2))->toBeFalse();
});

it('recomputes distribution when cached assignments exceed host capacity', function () {
// Cycle 1: two workloads, first gets distributed
$managers = [
makeManagerState('a', maxWorkers: 6, queueWorkers: ['redis:fast' => 4]),
makeManagerState('b', maxWorkers: 6, queueWorkers: ['redis:fast' => 2]),
];

$assignedTotals = ['a' => 0, 'b' => 0];
$result1 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 6, $assignedTotals);

// Cycle 2: another workload consumed capacity first, so cached fast
// assignments may no longer fit
$assignedTotals = ['a' => 4, 'b' => 4]; // other workloads already took 4 on each host
$result2 = invokeDistributeClusterTarget($managers, 'queue:redis:fast', 6, $assignedTotals);

// Must respect host capacity: a can take max 2 more, b can take max 2 more
expect($result2['a'])->toBeLessThanOrEqual(2)
->and($result2['b'])->toBeLessThanOrEqual(2)
->and(array_sum($result2))->toBeLessThanOrEqual(4);
});