Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions config/queue-autoscale.php
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@
'max_cpu_percent' => 85,
'max_memory_percent' => 85,
'worker_memory_mb_estimate' => 128,
'worker_cpu_core_estimate' => 1.0,
'reserve_cpu_cores' => 1,
'worker_cpu_core_estimate' => 0.2,
'reserve_cpu_cores' => 0.2,
],

/*
Expand Down
8 changes: 5 additions & 3 deletions docs/algorithms/ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -531,10 +531,12 @@ $cpuUsage = SystemMetrics::cpuUsage(1.0)->usagePercentage(); // e.g., 60%

$availableCpuPercent = max($maxCpuPercent - $cpuUsage, 0); // 30%

$reserveCores = config('queue-autoscale.resource_limits.reserve_cpu_cores'); // 0.5
$usableCores = max($limits->availableCpuCores() - $reserveCores, 1);
$reserveCores = config('queue-autoscale.limits.reserve_cpu_cores'); // 0.2
$usableCores = max($limits->availableCpuCores() - $reserveCores, 0);

$maxWorkersByCpu = floor($usableCores * ($availableCpuPercent / 100));
$workerCpuEstimate = config('queue-autoscale.limits.worker_cpu_core_estimate'); // 0.2
$availableCoreEquivalents = $usableCores * ($availableCpuPercent / 100);
$maxWorkersByCpu = floor($availableCoreEquivalents / $workerCpuEstimate);
```

### Memory Constraints
Expand Down
3 changes: 2 additions & 1 deletion docs/algorithms/resource-constraints.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ Prevent resource exhaustion:
'max_cpu_percent' => 85, // Skip spawning when host CPU ≥ this
'max_memory_percent' => 85, // Same for memory
'worker_memory_mb_estimate' => 128, // Used in the per-worker memory ceiling
'reserve_cpu_cores' => 1, // Cores reserved for OS / other services
'worker_cpu_core_estimate' => 0.2, // Baseline CPU cores per worker (fallback)
'reserve_cpu_cores' => 0.2, // Cores reserved for OS / other services
],
```

Expand Down
3 changes: 2 additions & 1 deletion docs/basic-usage/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,8 @@ Caps are under the top-level `limits` key:
'max_cpu_percent' => 85, // Skip spawning when host CPU ≥ this
'max_memory_percent' => 85, // Skip spawning when host memory ≥ this
'worker_memory_mb_estimate' => 128, // Assumed memory footprint per worker
'reserve_cpu_cores' => 1, // Cores reserved for the OS/other services
'worker_cpu_core_estimate' => 0.2, // Baseline CPU cores per worker (fallback)
'reserve_cpu_cores' => 0.2, // Cores reserved for the OS/other services
],
```

Expand Down
5 changes: 3 additions & 2 deletions docs/basic-usage/performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ The global `limits` section protects the host from runaway spawning:
'max_cpu_percent' => 85, // Skip spawning at or above this
'max_memory_percent' => 85, // Same for memory
'worker_memory_mb_estimate' => 128, // Used to derive a per-worker ceiling
'reserve_cpu_cores' => 1, // Cores kept for OS/other services
'worker_cpu_core_estimate' => 0.2, // Baseline CPU cores per worker (fallback)
'reserve_cpu_cores' => 0.2, // Cores kept for OS/other services
],
```

Expand All @@ -215,7 +216,7 @@ $maxByMemory = floor(
$systemMemoryMb * ($limits['max_memory_percent'] / 100) / $limits['worker_memory_mb_estimate']
);

$maxByCpu = ($cpuCores - $limits['reserve_cpu_cores']) * 2;
$maxByCpu = floor(($cpuCores - $limits['reserve_cpu_cores']) / $limits['worker_cpu_core_estimate']);

$hostCeiling = min($maxByMemory, $maxByCpu);
```
Expand Down
4 changes: 2 additions & 2 deletions src/Cluster/ClusterManagerState.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public function __construct(
public float $cpuPercent,
public float $cpuCores,
public float $cpuUsableCores,
public int $cpuReservedCores,
public float $cpuReservedCores,
public float $memoryPercent,
public float $memoryTotalMb,
public float $memoryUsedMb,
Expand Down Expand Up @@ -103,7 +103,7 @@ public static function fromArray(array $payload): self
cpuPercent: is_numeric($payload['cpu_percent'] ?? null) ? (float) $payload['cpu_percent'] : 0.0,
cpuCores: is_numeric($payload['cpu_cores'] ?? null) ? (float) $payload['cpu_cores'] : 0.0,
cpuUsableCores: is_numeric($payload['cpu_usable_cores'] ?? null) ? (float) $payload['cpu_usable_cores'] : 0.0,
cpuReservedCores: is_numeric($payload['cpu_reserved_cores'] ?? null) ? (int) $payload['cpu_reserved_cores'] : 0,
cpuReservedCores: is_numeric($payload['cpu_reserved_cores'] ?? null) ? (float) $payload['cpu_reserved_cores'] : 0.0,
memoryPercent: is_numeric($payload['memory_percent'] ?? null) ? (float) $payload['memory_percent'] : 0.0,
memoryTotalMb: is_numeric($payload['memory_total_mb'] ?? null) ? (float) $payload['memory_total_mb'] : 0.0,
memoryUsedMb: is_numeric($payload['memory_used_mb'] ?? null) ? (float) $payload['memory_used_mb'] : 0.0,
Expand Down
6 changes: 3 additions & 3 deletions src/Configuration/AutoscaleConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,12 @@ public static function workerMemoryMbEstimate(): int

public static function workerCpuCoreEstimate(): float
{
return self::floatConfig('queue-autoscale.limits.worker_cpu_core_estimate', 1.0);
return self::floatConfig('queue-autoscale.limits.worker_cpu_core_estimate', 0.2);
}

public static function reserveCpuCores(): int
public static function reserveCpuCores(): float
{
return self::intConfig('queue-autoscale.limits.reserve_cpu_cores', 1);
return self::floatConfig('queue-autoscale.limits.reserve_cpu_cores', 0.2);
}

public static function workerTimeoutSeconds(): int
Expand Down
63 changes: 55 additions & 8 deletions src/Manager/AutoscaleManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ private function runClusterCycle(): void
cpuPercent: $this->clusterFloat($cpuDetails['current_cpu_percent'] ?? 0.0),
cpuCores: is_numeric($cpuDetails['total_cores'] ?? null) ? (float) $cpuDetails['total_cores'] : 0.0,
cpuUsableCores: is_numeric($cpuDetails['usable_cores'] ?? null) ? (float) $cpuDetails['usable_cores'] : 0.0,
cpuReservedCores: is_numeric($cpuDetails['reserve_cores'] ?? null) ? (int) $cpuDetails['reserve_cores'] : 0,
cpuReservedCores: is_numeric($cpuDetails['reserve_cores'] ?? null) ? (float) $cpuDetails['reserve_cores'] : 0.0,
memoryPercent: $this->clusterFloat($memoryDetails['current_memory_percent'] ?? 0.0),
memoryTotalMb: $memoryTotalMb,
memoryUsedMb: $memoryUsedMb,
Expand Down Expand Up @@ -530,10 +530,12 @@ private function clusterTargetWorkers(
return $config->workers->pinnedCount();
}

$decision = $this->engine->evaluate($metrics, $config, $currentWorkers, $clusterTotalWorkers);
$decision = $this->policies->beforeScaling($decision);

return $decision->targetWorkers;
// Use demand-only evaluation: strategy + config bounds, no system
// capacity constraint. The leader must see actual demand so it can
// recommend the right host count and distribute work across all
// managers. Per-host capacity enforcement happens during distribution
// (distributeClusterTarget respects each manager's maxWorkers).
return $this->engine->evaluateDemand($metrics, $config);
}

/**
Expand Down Expand Up @@ -590,19 +592,32 @@ private function distributeClusterTarget(

$remaining = $targetWorkers;

// Phase 1: Preserve existing workers on their current managers,
// but cap at each manager's reported capacity.
foreach ($preserveOrder as $state) {
if ($remaining <= 0) {
break;
}

$keep = min($currentCounts[$state->managerId], $remaining);
$hostCapacity = max($state->maxWorkers - ($assignedTotals[$state->managerId] ?? 0), 0);
$keep = min($currentCounts[$state->managerId], $remaining, $hostCapacity);
$targets[$state->managerId] = $keep;
$assignedTotals[$state->managerId] += $keep;
$remaining -= $keep;
}

// Phase 2: Distribute remaining workers to least-loaded managers
// that still have capacity.
while ($remaining > 0) {
$candidates = $activeManagers;
$candidates = array_values(array_filter(
$activeManagers,
fn (ClusterManagerState $state): bool => $assignedTotals[$state->managerId] < $state->maxWorkers,
));

if ($candidates === []) {
break;
}

usort(
$candidates,
fn (ClusterManagerState $a, ClusterManagerState $b): int => ($assignedTotals[$a->managerId] <=> $assignedTotals[$b->managerId])
Expand Down Expand Up @@ -712,7 +727,7 @@ private function buildClusterSummary(array $activeManagers, array $workloads): a
$requiredWorkers = array_sum(array_map(static fn (array $workload): int => (int) $workload['target_workers'], $workloads));
$totalWorkers = array_sum(array_map(static fn (ClusterManagerState $state): int => $state->totalWorkers, $activeManagers));
$recommendedHosts = $this->recommendedHostCount($activeManagers, $requiredWorkers);
$signal = $this->clusterScaleSignal($currentHosts, $recommendedHosts, $requiredWorkers, $totalWorkerCapacity);
$signal = $this->clusterScaleSignal($currentHosts, $recommendedHosts, $requiredWorkers, $totalWorkerCapacity, $totalWorkers, $workloads);
$generatedAt = now();
$generatedAtMs = $this->currentTimestamp();
$leaderLeaseTtlSeconds = AutoscaleConfiguration::clusterLeaderLeaseSeconds();
Expand Down Expand Up @@ -806,13 +821,16 @@ private function recommendedHostCount(array $activeManagers, int $requiredWorker
}

/**
* @param array<int, array<string, mixed>> $workloads
* @return array<string, int|string>
*/
private function clusterScaleSignal(
int $currentHosts,
int $recommendedHosts,
int $requiredWorkers,
int $totalWorkerCapacity,
int $totalWorkers,
array $workloads,
): array {
if ($requiredWorkers > $totalWorkerCapacity) {
return [
Expand All @@ -824,6 +842,35 @@ private function clusterScaleSignal(
}

if ($recommendedHosts < $currentHosts) {
// Do not recommend scale-down when the cluster is under pressure.
$utilizationPercent = $totalWorkerCapacity > 0
? ($totalWorkers / $totalWorkerCapacity) * 100
: 0.0;

$hasScaleUpPressure = false;
foreach ($workloads as $workload) {
$target = is_numeric($workload['target_workers'] ?? null) ? (int) $workload['target_workers'] : 0;
$current = is_numeric($workload['current_workers'] ?? null) ? (int) $workload['current_workers'] : 0;
$pending = is_numeric($workload['pending'] ?? null) ? (int) $workload['pending'] : 0;

if ($target > $current || $pending > 0) {
$hasScaleUpPressure = true;

break;
}
}

if ($utilizationPercent >= 80.0 || $hasScaleUpPressure) {
return [
'action' => 'hold',
'reason' => $utilizationPercent >= 80.0
? sprintf('high utilization (%.0f%%) prevents scale-down', $utilizationPercent)
: 'pending workload prevents scale-down',
'current_hosts' => $currentHosts,
'recommended_hosts' => $currentHosts,
];
}

return [
'action' => 'scale_down',
'reason' => 'required workers fit on fewer hosts',
Expand Down
2 changes: 1 addition & 1 deletion src/Scaling/Calculators/CapacityCalculator.php
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public function calculateMaxWorkers(int $currentWorkers = 0): CapacityCalculatio

$availableCpuPercent = max($maxCpuPercent - $currentCpuPercent, 0);
$reserveCores = AutoscaleConfiguration::reserveCpuCores();
$usableCores = max($this->cachedAvailableCores - $reserveCores, 1);
$usableCores = max($this->cachedAvailableCores - $reserveCores, 0);

$workerCpuCoreEstimate = max(
$this->measuredWorkerCpuCoreEstimate ?? AutoscaleConfiguration::workerCpuCoreEstimate(),
Expand Down
38 changes: 33 additions & 5 deletions src/Scaling/ScalingEngine.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,18 @@ public function __construct(
) {}

/**
* Evaluate scaling decision for a queue
* Evaluate scaling decision for a single-host queue.
*
* Uses LOCAL system capacity (CPU/memory on this host) to constrain the
* strategy recommendation. Both $totalPoolWorkers and the capacity result
* must be from the SAME host — passing cluster-wide pool counts against
* local capacity produces incorrect results. For cluster-wide demand
* calculations use evaluateDemand() instead.
*
* @param QueueMetricsData $metrics Queue metrics from laravel-queue-metrics
* @param QueueConfiguration $config Queue SLA configuration
* @param int $currentWorkers Current worker count for this queue
* @param int $totalPoolWorkers Total workers across all queues (for accurate capacity sharing)
* @param int $currentWorkers Current worker count for this queue on this host
* @param int $totalPoolWorkers Total workers across all queues on this host
* @return ScalingDecision Scaling decision with target workers
*/
public function evaluate(
Expand All @@ -36,9 +42,11 @@ public function evaluate(
$strategyRecommendation = $this->strategy->calculateTargetWorkers($metrics, $config);
$targetWorkers = $strategyRecommendation;

// 2. Get system capacity using total pool workers for accurate measurement.
// This ensures capacity is calculated against ALL workers, not just this queue's,
// 2. Get LOCAL system capacity using total pool workers for accurate measurement.
// This ensures capacity is calculated against ALL workers on THIS host,
// preventing each queue from assuming it has all remaining system capacity.
// Note: both $totalPoolWorkers and calculateMaxWorkers() must be local-host
// scoped. In cluster mode, use evaluateDemand() instead.
$effectiveTotalWorkers = max($totalPoolWorkers, $currentWorkers);
$capacityResult = $this->capacity->calculateMaxWorkers($effectiveTotalWorkers);

Expand Down Expand Up @@ -88,6 +96,26 @@ public function evaluate(
);
}

/**
 * Compute the demand-driven worker target used for cluster-wide scaling.
 *
 * Asks the strategy for its recommendation and clamps it to the queue's
 * configured bounds (workers.min / workers.max). System capacity is NOT
 * consulted here: in cluster mode the leader needs to see the actual demand
 * so it can recommend the right host count, and per-host capacity is
 * enforced later, during distribution and at execution time on each manager.
 *
 * The upper bound is applied last, so it takes precedence if workers.min
 * is configured above workers.max.
 *
 * @param QueueMetricsData $metrics Queue metrics handed to the strategy
 * @param QueueConfiguration $config Queue configuration supplying the worker bounds
 * @return int Strategy recommendation clamped to the configured bounds
 */
public function evaluateDemand(
    QueueMetricsData $metrics,
    QueueConfiguration $config,
): int {
    $recommended = $this->strategy->calculateTargetWorkers($metrics, $config);

    // Raise to the configured floor first, then cap at the configured ceiling.
    return min(
        max($recommended, $config->workers->min),
        $config->workers->max,
    );
}

/**
* Determine which constraint is the final limiting factor
*/
Expand Down
32 changes: 25 additions & 7 deletions tests/Unit/CapacityCalculatorTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -131,40 +131,58 @@
});

it('allows more workers with lower worker_cpu_core_estimate', function () {
// Eliminate environment dependencies: no reserve, allow full CPU.
config()->set('queue-autoscale.limits.max_cpu_percent', 100);
config()->set('queue-autoscale.limits.reserve_cpu_cores', 0);
config()->set('queue-autoscale.limits.worker_cpu_core_estimate', 1.0);
$calculator = new CapacityCalculator;
$highEstimate = $calculator->calculateMaxWorkers();

// Reuse cached metrics — only the config value changes.
config()->set('queue-autoscale.limits.worker_cpu_core_estimate', 0.2);
$calculator->invalidateCache();
$lowEstimate = $calculator->calculateMaxWorkers();

expect($lowEstimate->maxWorkersByCpu)->toBeGreaterThan($highEstimate->maxWorkersByCpu);
// On CI runners where system-metrics reports 0 cores (no cgroup limit),
// both estimates yield 0 workers — verify estimate is applied instead.
if ($highEstimate->maxWorkersByCpu > 0) {
expect($lowEstimate->maxWorkersByCpu)->toBeGreaterThan($highEstimate->maxWorkersByCpu);
} else {
expect($lowEstimate->details['cpu_details']['worker_cpu_core_estimate'])->toBe(0.2)
->and($highEstimate->details['cpu_details']['worker_cpu_core_estimate'])->toBe(1.0);
}
});

it('uses default worker_cpu_core_estimate of 1.0 when not configured', function () {
it('uses default worker_cpu_core_estimate of 0.2 when not configured', function () {
config()->offsetUnset('queue-autoscale.limits.worker_cpu_core_estimate');
$calculator = new CapacityCalculator;

$result = $calculator->calculateMaxWorkers();

expect($result->details['cpu_details']['worker_cpu_core_estimate'])->toBe(1.0)
expect($result->details['cpu_details']['worker_cpu_core_estimate'])->toBe(0.2)
->and($result->details['cpu_details']['cpu_estimate_source'])->toBe('config');
});

it('uses measured CPU estimate when set, overriding config', function () {
// Eliminate environment dependencies: no reserve, allow full CPU.
config()->set('queue-autoscale.limits.max_cpu_percent', 100);
config()->set('queue-autoscale.limits.reserve_cpu_cores', 0);
config()->set('queue-autoscale.limits.worker_cpu_core_estimate', 1.0);
$calculator = new CapacityCalculator;

$configResult = $calculator->calculateMaxWorkers();

// Reuse cached metrics — only the estimate changes.
$calculator->setMeasuredWorkerCpuCoreEstimate(0.1);
$calculator->invalidateCache();
$measuredResult = $calculator->calculateMaxWorkers();

expect($measuredResult->maxWorkersByCpu)->toBeGreaterThan($configResult->maxWorkersByCpu)
->and($measuredResult->details['cpu_details']['worker_cpu_core_estimate'])->toBe(0.1)
// Estimate is correctly applied regardless of available capacity.
expect($measuredResult->details['cpu_details']['worker_cpu_core_estimate'])->toBe(0.1)
->and($measuredResult->details['cpu_details']['cpu_estimate_source'])->toBe('measured');

// When system has detectable CPU cores, lower estimate yields more workers.
if ($configResult->maxWorkersByCpu > 0) {
expect($measuredResult->maxWorkersByCpu)->toBeGreaterThan($configResult->maxWorkersByCpu);
}
});

it('falls back to config when measured estimate is cleared', function () {
Expand Down
8 changes: 4 additions & 4 deletions tests/Unit/Cluster/ClusterDataTransferObjectsTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
cpuPercent: 42.5,
cpuCores: 8.0,
cpuUsableCores: 7.0,
cpuReservedCores: 1,
cpuReservedCores: 1.0,
memoryPercent: 63.1,
memoryTotalMb: 8192.0,
memoryUsedMb: 5169.2,
Expand All @@ -37,7 +37,7 @@
->and($decoded->capacityLimiter)->toBe('memory')
->and($decoded->cpuCores)->toBe(8.0)
->and($decoded->cpuUsableCores)->toBe(7.0)
->and($decoded->cpuReservedCores)->toBe(1)
->and($decoded->cpuReservedCores)->toBe(1.0)
->and($decoded->memoryTotalMb)->toBe(8192.0)
->and($decoded->memoryUsedMb)->toBe(5169.2)
->and($decoded->memoryFreeMb)->toBe(3022.8)
Expand All @@ -62,7 +62,7 @@
cpuPercent: 25.0,
cpuCores: $cpuCores,
cpuUsableCores: $usableCores,
cpuReservedCores: 0,
cpuReservedCores: 0.0,
memoryPercent: 40.0,
memoryTotalMb: 512.0,
memoryUsedMb: 204.8,
Expand All @@ -80,7 +80,7 @@
->and($decoded->cpuCores)->toBeFloat()
->and($decoded->cpuUsableCores)->toBe($usableCores)
->and($decoded->cpuUsableCores)->toBeFloat()
->and($decoded->cpuReservedCores)->toBe(0);
->and($decoded->cpuReservedCores)->toBe(0.0);
})->with([0.2, 0.5, 1.5, 2.0, 4.0]);

it('resolves recommendation targets for queues and groups', function () {
Expand Down
Loading