Skip to content

Commit

Permalink
Retries and max attempts
Browse files Browse the repository at this point in the history
  • Loading branch information
timkelty committed Nov 28, 2023
1 parent e073ee7 commit a404109
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
13 changes: 12 additions & 1 deletion src/runtime/event/CliHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ class CliHandler implements Handler
public ?Process $process = null;
protected string $scriptPath = '/var/task/craft';
protected ?float $totalRunningTime = null;
public int $maxAttempts = 10;
public int $attempts = 0;

/**
* @inheritDoc
Expand Down Expand Up @@ -50,8 +52,8 @@ public function handle(mixed $event, Context $context, $throw = false): array

echo "Command succeeded after {$this->getTotalRunningTime()} seconds: $command\n";
} catch (\Throwable $e) {
$this->attempts++;
echo "Command failed after {$this->getTotalRunningTime()} seconds: $command\n";

echo "Exception while handling CLI event:\n";
echo "{$e->getMessage()}\n";
echo "{$e->getTraceAsString()}\n";
Expand Down Expand Up @@ -85,8 +87,17 @@ public function getTotalRunningTime(): float
return max(0, microtime(true) - $this->process->getStartTime());
}

/**
 * Returns how many more attempts may be made before `$maxAttempts` is reached.
 *
 * The result is clamped at zero. `$attempts` is a public counter, so it can
 * end up greater than `$maxAttempts`; a raw subtraction would then yield a
 * negative number, which PHP treats as truthy. Callers that test the result
 * as a boolean (e.g. `!$this->getRemainingAttempts()`) would wrongly conclude
 * that attempts are still available.
 *
 * @return int Number of attempts left; never negative.
 */
public function getRemainingAttempts(): int
{
    return max(0, $this->maxAttempts - $this->attempts);
}

public function shouldRetry(): bool
{
if (!$this->getRemainingAttempts()) {
return false;
}

$diff = Runtime::MAX_EXECUTION_SECONDS - $this->getTotalRunningTime();

return $diff > static::MAX_EXECUTION_BUFFER_SECONDS;
Expand Down
35 changes: 19 additions & 16 deletions src/runtime/event/SqsHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
use Bref\Event\Sqs\SqsEvent;
use Bref\Event\Sqs\SqsRecord;
use Illuminate\Support\Collection;
use RuntimeException;
use Symfony\Component\Process\Exception\ProcessTimedOutException;
use Symfony\Component\Process\Exception\RuntimeException;

class SqsHandler extends \Bref\Event\Sqs\SqsHandler
{
Expand All @@ -31,33 +31,36 @@ public function handleSqs(SqsEvent $event, Context $context): void
$jobId = $body->jobId ?? null;

if (!$jobId) {
throw new RuntimeException('The SQS message does not contain a job ID.');
throw new \Exception('The SQS message does not contain a job ID.');
}

echo "Executing job: #$jobId";

$cliHandler->handle([
'command' => "cloud/queue/exec {$jobId}",
], $context, true);
} catch (ProcessTimedOutException $e) {
if (!$cliHandler->shouldRetry()) {
echo "Job #$jobId ran for {$cliHandler->getTotalRunningTime()} seconds and will not be retried:\n";
} catch (RuntimeException $e) {
if ($e instanceof ProcessTimedOutException && !$cliHandler->shouldRetry()) {
echo "Job #$jobId will not be retried:\n";
echo "Attempts: {$cliHandler->attempts}\n";
echo "Message: #{$record->getMessageId()}\n";
echo "Running Time: {$cliHandler->getTotalRunningTime()} seconds\n";

$failMessage = $cliHandler->getRemainingAttempts()
? 'Job exceeded maximum running time: 15 minutes'
: "Job exceeded maximum attempts: {$cliHandler->maxAttempts}";

$failMessage = 'Job execution exceeded 15 minutes';
$cliHandler->handle([
'command' => "cloud/queue/fail {$jobId} --message={$failMessage}",
], $context, true);
try {
(new CliHandler())->handle([
'command' => "cloud/queue/fail {$jobId} --message={$failMessage}",
], $context, true);
} catch (\Throwable $e) {
$this->markAsFailed($record);
}

return;
}

// triggers SQS retry
$this->markAsFailed($record);
} catch (\Throwable $e) {
echo "Exception while processing SQS message:\n";
echo "{$e->getMessage()}\n";
echo "{$e->getTraceAsString()}\n";
return;
}
});
}
Expand Down

0 comments on commit a404109

Please sign in to comment.