Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions config/queue-autoscale.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
| Each value can be a ProfileContract class OR an array of partial overrides
| that merges with sla_defaults.
|
| Optional 'resources' key declares per-queue CPU/memory estimates for
| capacity calculations. These override the global limits when measured
| data is not yet available (cold start). Once the autoscaler has enough
| measured samples, measured values take precedence automatically.
|
| 'slow' => [
| 'resources' => [
| 'cpu_cores' => 0.5, // CPU cores per worker (default: limits.worker_cpu_core_estimate)
| 'memory_mb' => 2048, // Memory MB per worker (default: limits.worker_memory_mb_estimate)
| ],
| ],
|
*/
'queues' => [
// 'payments' => CriticalProfile::class,
Expand Down
3 changes: 2 additions & 1 deletion docs/advanced-usage/custom-strategies.md
Original file line number Diff line number Diff line change
Expand Up @@ -669,7 +669,8 @@ use Cbox\LaravelQueueAutoscale\Scaling\ScalingEngine;
it('integrates with scaling engine', function () {
$strategy = new SimpleRateBasedStrategy();
$capacity = app(\Cbox\LaravelQueueAutoscale\Scaling\Calculators\CapacityCalculator::class);
$engine = new ScalingEngine($strategy, $capacity);
$resolver = app(\Cbox\LaravelQueueAutoscale\Scaling\ResourceEstimateResolver::class);
$engine = new ScalingEngine($strategy, $capacity, $resolver);

$metrics = (object) [
'processingRate' => 10.0,
Expand Down
32 changes: 32 additions & 0 deletions src/Configuration/AutoscaleConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,38 @@ public static function configuredQueues(): array
return $result;
}

/**
* Get per-queue resource overrides.
*
* Returns the 'resources' sub-array from the queue's config entry,
* or an empty array if not configured. Valid keys: 'cpu_cores', 'memory_mb'.
* Unknown keys are preserved for forward compatibility.
*
* @return array<string, float|int>
*/
public static function queueResources(string $queue): array
{
$queueConfig = config("queue-autoscale.queues.{$queue}");

if (! is_array($queueConfig)) {
return [];
}

$resources = $queueConfig['resources'] ?? null;

if (! is_array($resources)) {
return [];
}

/** @var array<string, float|int> $filtered */
$filtered = array_filter(
$resources,
static fn (mixed $value): bool => is_int($value) || is_float($value),
);

return $filtered;
}

private static function intConfig(string $key, int $default): int
{
return self::intValue(config($key, $default), $default);
Expand Down
2 changes: 2 additions & 0 deletions src/LaravelQueueAutoscaleServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
use Cbox\LaravelQueueAutoscale\Scaling\Calculators\LinearRegressionForecaster;
use Cbox\LaravelQueueAutoscale\Scaling\Calculators\LittlesLawCalculator;
use Cbox\LaravelQueueAutoscale\Scaling\Forecasting\Policies\ModerateForecastPolicy;
use Cbox\LaravelQueueAutoscale\Scaling\ResourceEstimateResolver;
use Cbox\LaravelQueueAutoscale\Scaling\ScalingEngine;
use Cbox\LaravelQueueAutoscale\Support\ManagerProcessLock;
use Cbox\LaravelQueueAutoscale\Support\RestartSignal;
Expand Down Expand Up @@ -67,6 +68,7 @@ public function register(): void
$this->app->singleton(LittlesLawCalculator::class);
$this->app->singleton(BacklogDrainCalculator::class);
$this->app->singleton(CapacityCalculator::class);
$this->app->singleton(ResourceEstimateResolver::class);
$this->app->singleton(ArrivalRateEstimator::class);

// Register v2 contracts with their default implementations
Expand Down
103 changes: 78 additions & 25 deletions src/Manager/AutoscaleManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
use Cbox\LaravelQueueAutoscale\Output\DataTransferObjects\WorkerStatus;
use Cbox\LaravelQueueAutoscale\Policies\PolicyExecutor;
use Cbox\LaravelQueueAutoscale\Scaling\Calculators\CapacityCalculator;
use Cbox\LaravelQueueAutoscale\Scaling\DTOs\ResourceEstimate;
use Cbox\LaravelQueueAutoscale\Scaling\ResourceEstimateResolver;
use Cbox\LaravelQueueAutoscale\Scaling\ScalingDecision;
use Cbox\LaravelQueueAutoscale\Scaling\ScalingEngine;
use Cbox\LaravelQueueAutoscale\Support\RestartSignal;
Expand Down Expand Up @@ -105,6 +107,7 @@ public function __construct(
private readonly RestartSignal $restartSignal,
private readonly ClusterStore $clusterStore,
private readonly CapacityCalculator $capacity,
private readonly ResourceEstimateResolver $resolver,
) {
$this->pool = new WorkerPool;
$this->outputBuffer = new WorkerOutputBuffer;
Expand Down Expand Up @@ -251,7 +254,10 @@ private function runLoop(): void

private function runClusterCycle(): void
{
$capacity = $this->capacity->calculateMaxWorkers($this->pool->totalCount());
$capacity = $this->capacity->calculateMaxWorkers(
$this->pool->totalCount(),
ResourceEstimate::globalDefault(),
);
$capacityDetails = $capacity->details;
$cpuDetails = is_array($capacityDetails['cpu_details'] ?? null) ? $capacityDetails['cpu_details'] : [];
$memoryDetails = is_array($capacityDetails['memory_details'] ?? null) ? $capacityDetails['memory_details'] : [];
Expand Down Expand Up @@ -315,7 +321,7 @@ private function evaluateAndPublishClusterRecommendations(): void
{
app(CalculateQueueMetricsAction::class)->executeForAllQueues();

$this->updateMeasuredWorkerCpuEstimate();
$this->updateMeasuredResourceEstimates();

$allQueues = QueueMetrics::getAllQueuesWithMetrics();

Expand Down Expand Up @@ -928,7 +934,7 @@ private function evaluateAndScale(): void
// Recalculate metrics first to ensure throughput uses current sliding window
app(CalculateQueueMetricsAction::class)->executeForAllQueues();

$this->updateMeasuredWorkerCpuEstimate();
$this->updateMeasuredResourceEstimates();

// Get ALL queues with metrics from laravel-queue-metrics
// Returns: ['redis:default' => [...metrics array...], ...]
Expand Down Expand Up @@ -1066,51 +1072,98 @@ private function announceExclusion(string $connection, string $queue): void
}

/**
* Minimum measured CPU core estimate to prevent runaway capacity calculations.
* A worker using less than 1% of a core during processing is implausible;
* clamping here avoids division producing thousands of workers from noise.
*/
private const MIN_MEASURED_CPU_CORE_ESTIMATE = 0.01;

/**
* Compute measured CPU core estimate from actual job processing metrics.
* Compute per-queue measured CPU and memory estimates from actual job metrics.
*
* Uses the ratio cpuTimeMs / durationMs across all job classes to determine
* how much CPU each worker actually uses during processing. This pessimistic
* estimate (based on active processing, not idle time) ensures capacity
* planning accounts for worst-case worker CPU load.
* Walks QueueMetrics::getAllJobsWithMetrics() and groups results by
* connection:queue. For each queue, calculates weighted average CPU cores
* (cpuTimeMs / durationMs) and weighted average memory. Results are pushed
* into the ResourceEstimateResolver for use by ScalingEngine.
*/
private function updateMeasuredWorkerCpuEstimate(): void
private function updateMeasuredResourceEstimates(): void
{
try {
$allJobs = QueueMetrics::getAllJobsWithMetrics();
} catch (\Throwable) {
return;
}

$totalWeightedCpuCores = 0.0;
$totalProcessed = 0;
/** @var array<string, array{cpu_weighted: float, mem_weighted: float, cpu_processed: int, mem_processed: int}> $perQueue */
$perQueue = [];

foreach ($allJobs as $jobData) {
$connection = is_string($jobData['connection'] ?? null) ? $jobData['connection'] : 'default';
$queue = is_string($jobData['queue'] ?? null) ? $jobData['queue'] : 'default';
$key = "{$connection}:{$queue}";

$cpu = is_array($jobData['cpu'] ?? null) ? $jobData['cpu'] : [];
$duration = is_array($jobData['duration'] ?? null) ? $jobData['duration'] : [];
$memory = is_array($jobData['memory'] ?? null) ? $jobData['memory'] : [];
$execution = is_array($jobData['execution'] ?? null) ? $jobData['execution'] : [];

$cpuAvgMs = is_numeric($cpu['avg'] ?? null) ? (float) $cpu['avg'] : 0.0;
$durationAvgMs = is_numeric($duration['avg'] ?? null) ? (float) $duration['avg'] : 0.0;
$memAvgMb = is_numeric($memory['avg'] ?? null) ? (float) $memory['avg'] : 0.0;
$processed = is_numeric($execution['total_processed'] ?? null) ? (int) $execution['total_processed'] : 0;

if ($durationAvgMs > 0 && $cpuAvgMs > 0 && $processed > 0) {
if ($processed <= 0) {
continue;
}

if (! isset($perQueue[$key])) {
$perQueue[$key] = ['cpu_weighted' => 0.0, 'mem_weighted' => 0.0, 'cpu_processed' => 0, 'mem_processed' => 0];
}

if ($durationAvgMs > 0 && $cpuAvgMs > 0) {
$coresPerWorker = $cpuAvgMs / $durationAvgMs;
$totalWeightedCpuCores += $coresPerWorker * $processed;
$totalProcessed += $processed;
$perQueue[$key]['cpu_weighted'] += $coresPerWorker * $processed;
$perQueue[$key]['cpu_processed'] += $processed;
}

if ($memAvgMb > 0) {
$perQueue[$key]['mem_weighted'] += $memAvgMb * $processed;
$perQueue[$key]['mem_processed'] += $processed;
}
}

if ($totalProcessed > 0) {
$measuredEstimate = max($totalWeightedCpuCores / $totalProcessed, self::MIN_MEASURED_CPU_CORE_ESTIMATE);
$this->capacity->setMeasuredWorkerCpuCoreEstimate($measuredEstimate);
$this->verbose(sprintf(' Measured worker CPU: %.3f cores/worker (from %d jobs)', $measuredEstimate, $totalProcessed), 'debug');
foreach ($perQueue as $key => $data) {
[$connection, $queue] = explode(':', $key, 2);

$hasCpu = $data['cpu_processed'] > 0;
$hasMemory = $data['mem_processed'] > 0;

if ($hasCpu && $hasMemory) {
$this->resolver->setMeasured(
$connection,
$queue,
$data['cpu_weighted'] / $data['cpu_processed'],
$data['mem_weighted'] / $data['mem_processed'],
$data['cpu_processed'],
$data['mem_processed'],
);
} elseif ($hasCpu) {
$this->resolver->setMeasuredCpu(
$connection,
$queue,
$data['cpu_weighted'] / $data['cpu_processed'],
$data['cpu_processed'],
);
} elseif ($hasMemory) {
$this->resolver->setMeasuredMemory(
$connection,
$queue,
$data['mem_weighted'] / $data['mem_processed'],
$data['mem_processed'],
);
}

$this->verbose(sprintf(
' Measured resources [%s]: cpu=%.3f cores (%d samples), mem=%.1f MB (%d samples)',
$key,
$hasCpu ? $data['cpu_weighted'] / $data['cpu_processed'] : 0.0,
$data['cpu_processed'],
$hasMemory ? $data['mem_weighted'] / $data['mem_processed'] : 0.0,
$data['mem_processed'],
), 'debug');
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/Policies/NoScaleDownPolicy.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Cbox\LaravelQueueAutoscale\Contracts\ScalingPolicy;
use Cbox\LaravelQueueAutoscale\Scaling\Calculators\CapacityCalculator;
use Cbox\LaravelQueueAutoscale\Scaling\DTOs\ResourceEstimate;
use Cbox\LaravelQueueAutoscale\Scaling\ScalingDecision;

/**
Expand Down Expand Up @@ -43,7 +44,10 @@ public function beforeScaling(ScalingDecision $decision): ?ScalingDecision

// Check if scale-down is forced by resource constraints
// If current workers exceed system capacity, we must scale down for stability
$capacityResult = $this->capacity->calculateMaxWorkers();
$capacityResult = $this->capacity->calculateMaxWorkers(
$decision->currentWorkers,
ResourceEstimate::globalDefault(),
);
if ($decision->currentWorkers > $capacityResult->finalMaxWorkers) {
// Let resource-constrained scale-down proceed to maintain system stability
return null;
Expand Down
31 changes: 6 additions & 25 deletions src/Scaling/Calculators/CapacityCalculator.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Cbox\LaravelQueueAutoscale\Configuration\AutoscaleConfiguration;
use Cbox\LaravelQueueAutoscale\Scaling\DTOs\CapacityCalculationResult;
use Cbox\LaravelQueueAutoscale\Scaling\DTOs\ResourceEstimate;
use Cbox\SystemMetrics\SystemMetrics;

class CapacityCalculator
Expand All @@ -25,30 +26,12 @@ class CapacityCalculator

private ?float $cacheTimestamp = null;

/**
* Measured CPU core usage per worker derived from actual job processing metrics.
* When set, takes precedence over the config-based estimate.
*/
private ?float $measuredWorkerCpuCoreEstimate = null;

/**
* How long cached metrics remain valid (seconds).
* Should be shorter than the evaluation interval to ensure fresh data each tick.
*/
private const CACHE_TTL_SECONDS = 4.0;

/**
* Set measured CPU core estimate from actual job processing data.
*
* This value represents the average fraction of a CPU core each worker
* consumes during job processing (e.g. 0.15 = 15% of one core).
* When set, this takes precedence over the config-based worker_cpu_core_estimate.
*/
public function setMeasuredWorkerCpuCoreEstimate(?float $estimate): void
{
$this->measuredWorkerCpuCoreEstimate = $estimate;
}

/**
* Calculate maximum workers with detailed capacity breakdown
*
Expand All @@ -61,7 +44,7 @@ public function setMeasuredWorkerCpuCoreEstimate(?float $estimate): void
* @param int $currentWorkers Total workers currently running across all queues (for accurate capacity math)
* @return CapacityCalculationResult Detailed capacity analysis with system-wide max workers
*/
public function calculateMaxWorkers(int $currentWorkers = 0): CapacityCalculationResult
public function calculateMaxWorkers(int $currentWorkers, ResourceEstimate $estimate): CapacityCalculationResult
{
// Refresh system metrics if cache is stale or empty
if (! $this->isCacheValid()) {
Expand Down Expand Up @@ -92,11 +75,8 @@ public function calculateMaxWorkers(int $currentWorkers = 0): CapacityCalculatio
$reserveCores = AutoscaleConfiguration::reserveCpuCores();
$usableCores = max($this->cachedAvailableCores - $reserveCores, 0);

$workerCpuCoreEstimate = max(
$this->measuredWorkerCpuCoreEstimate ?? AutoscaleConfiguration::workerCpuCoreEstimate(),
0.01
);
$cpuEstimateSource = $this->measuredWorkerCpuCoreEstimate !== null ? 'measured' : 'config';
$workerCpuCoreEstimate = max($estimate->cpuCoresPerWorker, 0.01);
$cpuEstimateSource = $estimate->cpuSource->value;

$availableCoreEquivalents = $usableCores * ($availableCpuPercent / 100);
$additionalWorkersByCpu = (int) floor($availableCoreEquivalents / $workerCpuCoreEstimate);
Expand All @@ -107,7 +87,7 @@ public function calculateMaxWorkers(int $currentWorkers = 0): CapacityCalculatio
$currentMemoryPercent = $this->cachedMemoryPercent ?? 50.0;

$availableMemoryPercent = max($maxMemoryPercent - $currentMemoryPercent, 0);
$workerMemoryMb = AutoscaleConfiguration::workerMemoryMbEstimate();
$workerMemoryMb = max($estimate->memoryMbPerWorker, 1.0);
$totalMemoryMb = $this->cachedTotalMemoryMb ?? 4096.0;

// Calculate additional workers we can add based on available memory
Expand Down Expand Up @@ -155,6 +135,7 @@ public function calculateMaxWorkers(int $currentWorkers = 0): CapacityCalculatio
'available_memory_percent' => $availableMemoryPercent,
'total_memory_mb' => $totalMemoryMb,
'worker_memory_mb' => $workerMemoryMb,
'memory_estimate_source' => $estimate->memorySource->value,
],
];

Expand Down
12 changes: 12 additions & 0 deletions src/Scaling/DTOs/EstimateSource.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
<?php

declare(strict_types=1);

namespace Cbox\LaravelQueueAutoscale\Scaling\DTOs;

enum EstimateSource: string
{
case Measured = 'measured';
case Config = 'config';
case Default = 'default';
}
Loading