Skip to content

Commit

Permalink
ported updates from version 1.x
Browse files Browse the repository at this point in the history
  • Loading branch information
arusinowski committed Dec 9, 2024
1 parent 29a1dcc commit 97adf6a
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 6 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ Set up the QueueMonitor configuration in your `config/app_local.php`:
```php
// ...
'QueueMonitor' => [
// With this setting you can enable or disable the queue monitoring, the queue
// monitoring is enabled by default
'disable' => false,

// mailer config, the default is `default` mailer, you can ommit
// this setting if you use default value
'mailerConfig' => 'myCustomMailer',
Expand Down
13 changes: 13 additions & 0 deletions src/Command/NotifyCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,18 @@
use Cake\Console\ConsoleOptionParser;
use Cake\Core\Configure;
use Cake\Log\LogTrait;
use CakeDC\QueueMonitor\Core\DisableTrait;
use CakeDC\QueueMonitor\Service\QueueMonitoringService;
use Exception;
use Psr\Log\LogLevel;
use function Cake\I18n\__;

/**
* QueueMonitoringNotify command.
*/
final class NotifyCommand extends Command
{
use DisableTrait;
use LogTrait;

private const DEFAULT_LONG_JOB_IN_MINUTES = 30;
Expand All @@ -37,6 +40,7 @@ final class NotifyCommand extends Command
public function __construct(
private readonly QueueMonitoringService $queueMonitoringService
) {
parent::__construct();
}

/**
Expand All @@ -61,6 +65,15 @@ public function buildOptionParser(ConsoleOptionParser $parser): ConsoleOptionPar
*/
public function execute(Arguments $args, ConsoleIo $io)
{
if ($this->isDisabled()) {
$this->log(
'Notification were not sent because Queue Monitor is disabled.',
LogLevel::WARNING
);

return self::CODE_SUCCESS;
}

try {
$this->queueMonitoringService->notifyAboutLongRunningJobs(
(int)Configure::read(
Expand Down
23 changes: 18 additions & 5 deletions src/Command/PurgeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use Cake\Console\ConsoleOptionParser;
use Cake\Core\Configure;
use Cake\Log\LogTrait;
use CakeDC\QueueMonitor\Core\DisableTrait;
use CakeDC\QueueMonitor\Service\QueueMonitoringService;
use Exception;
use Psr\Log\LogLevel;
Expand All @@ -28,6 +29,7 @@
*/
final class PurgeCommand extends Command
{
use DisableTrait;
use LogTrait;

private const DEFAULT_PURGE_DAYS_OLD = 30;
Expand All @@ -38,6 +40,7 @@ final class PurgeCommand extends Command
public function __construct(
private readonly QueueMonitoringService $queueMonitoringService
) {
parent::__construct();
}

/**
Expand All @@ -62,18 +65,28 @@ public function buildOptionParser(ConsoleOptionParser $parser): ConsoleOptionPar
*/
public function execute(Arguments $args, ConsoleIo $io)
{
if ($this->isDisabled()) {
$this->log(
'Logs were not purged because Queue Monitor is disabled.',
LogLevel::WARNING
);

return self::CODE_SUCCESS;
}
$purgeLogsOlderThanDays = (int)Configure::read(
'QueueMonitor.purgeLogsOlderThanDays',
self::DEFAULT_PURGE_DAYS_OLD
);

$purgeToDate = $this->queueMonitoringService->getPurgeToDate(
(int)Configure::read(
'QueueMonitor.purgeLogsOlderThanDays',
self::DEFAULT_PURGE_DAYS_OLD
)
$purgeLogsOlderThanDays
);
$this->log(
"Purging queue logs older than {$purgeToDate->toDateTimeString()} UTC",
LogLevel::INFO
);
try {
$rowCount = $this->queueMonitoringService->purgeLogs(self::DEFAULT_PURGE_DAYS_OLD);
$rowCount = $this->queueMonitoringService->purgeLogs($purgeLogsOlderThanDays);
$this->log(
"Purged $rowCount queue messages older than {$purgeToDate->toDateTimeString()} UTC",
LogLevel::INFO
Expand Down
20 changes: 20 additions & 0 deletions src/Core/DisableTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php
declare(strict_types=1);

namespace CakeDC\QueueMonitor\Core;

use Cake\Core\Configure;

/**
* Disable trait
*/
trait DisableTrait
{
/**
* Check if queue monitoring is disabled by configuration
*/
protected function isDisabled(): bool
{
return (bool)Configure::read('QueueMonitor.disabled', false);
}
}
20 changes: 19 additions & 1 deletion src/Listener/QueueMonitorListener.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,20 @@
use Cake\Event\EventInterface;
use Cake\Event\EventListenerInterface;
use Cake\I18n\DateTime;
use Cake\Log\Log;
use Cake\Log\LogTrait;
use Cake\ORM\Locator\LocatorAwareTrait;
use Cake\ORM\Table;
use Cake\Queue\Job\Message;
use Cake\Utility\Hash;
use CakeDC\QueueMonitor\Core\DisableTrait;
use CakeDC\QueueMonitor\Exception\QueueMonitorException;
use CakeDC\QueueMonitor\Model\Status\MessageEvent;
use CakeDC\QueueMonitor\Model\Table\LogsTable;
use Exception;
use Interop\Queue\Message as QueueMessage;
use Throwable;
use function Cake\I18n\__;

/**
* QueueMonitorListener
Expand All @@ -34,6 +37,7 @@
*/
final class QueueMonitorListener implements EventListenerInterface
{
use DisableTrait;
use LocatorAwareTrait;
use LogTrait;

Expand Down Expand Up @@ -73,6 +77,10 @@ public function implementedEvents(): array
*/
public function handleException(EventInterface $event, ?Message $message, ?Throwable $exception = null): void
{
if ($this->isDisabled()) {
return;
}

try {
$message = $this->validateQueueMessage($message);

Expand Down Expand Up @@ -103,6 +111,10 @@ public function handleException(EventInterface $event, ?Message $message, ?Throw
*/
public function handleMessageEvent(EventInterface $event, ?Message $message): void
{
if ($this->isDisabled()) {
return;
}

try {
$message = $this->validateQueueMessage($message);

Expand All @@ -122,6 +134,10 @@ public function handleMessageEvent(EventInterface $event, ?Message $message): vo
*/
public function handleSeen(EventInterface $event, ?QueueMessage $queueMessage): void
{
if ($this->isDisabled()) {
return;
}

try {
$queueMessage = $this->validateInteropQueueMessage($queueMessage);
$messageBody = json_decode($queueMessage->getBody(), true);
Expand Down Expand Up @@ -174,7 +190,9 @@ private function storeEvent(
'properties' => $queueMessage->getProperties(),
]);

$this->QueueMonitoringLogs->saveOrFail($queueMonitoringLog);
if (!$this->QueueMonitoringLogs->save($queueMonitoringLog)) {
Log::warning(__('Unable to save queue monitoring log into database'));
}
}

/**
Expand Down
17 changes: 17 additions & 0 deletions src/Model/Status/MessageEvent.php
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ public function getEventAsInt(): int
};
}

/**
* Get Event enum from int value
*/
public static function getEventFromInt(?int $eventInt): ?self
{
return match ($eventInt) {
1 => self::Seen,
2 => self::Invalid,
3 => self::Start,
4 => self::Exception,
5 => self::Success,
6 => self::Reject,
7 => self::Failure,
default => null,
};
}

/**
* Get as options
*/
Expand Down

0 comments on commit 97adf6a

Please sign in to comment.