From b3b12af58538b1255a8296bffbe6afe16c0d4892 Mon Sep 17 00:00:00 2001 From: brandonkelly Date: Sat, 7 Oct 2023 12:33:06 +0200 Subject: [PATCH] Queue job profiling --- src/config/app.php | 2 +- src/controllers/QueueController.php | 8 +- src/db/Table.php | 2 + src/migrations/Install.php | 6 + .../m231006_071504_job_profiling.php | 36 ++++ src/queue/BaseBatchedJob.php | 13 +- src/queue/ProfilableJobInterface.php | 27 +++ src/queue/Queue.php | 191 ++++++++++++------ src/queue/jobs/ResaveElements.php | 1 + 9 files changed, 221 insertions(+), 65 deletions(-) create mode 100644 src/migrations/m231006_071504_job_profiling.php create mode 100644 src/queue/ProfilableJobInterface.php diff --git a/src/config/app.php b/src/config/app.php index 41516e522eb..d187090065a 100644 --- a/src/config/app.php +++ b/src/config/app.php @@ -4,7 +4,7 @@ 'id' => 'CraftCMS', 'name' => 'Craft CMS', 'version' => '4.5.6.1', - 'schemaVersion' => '4.5.3.0', + 'schemaVersion' => '4.6.0.0', 'minVersionRequired' => '3.7.11', 'basePath' => dirname(__DIR__), // Defines the @app alias 'runtimePath' => '@storage/runtime', // Defines the @runtime alias diff --git a/src/controllers/QueueController.php b/src/controllers/QueueController.php index 6595d608415..974123016b8 100644 --- a/src/controllers/QueueController.php +++ b/src/controllers/QueueController.php @@ -9,6 +9,7 @@ use Craft; use craft\helpers\App; +use craft\helpers\DateTimeHelper; use craft\helpers\Json; use craft\queue\QueueInterface; use craft\web\Controller; @@ -174,7 +175,12 @@ public function actionGetJobInfo(): Response return $this->asJson([ 'total' => $this->queue->getTotalJobs(), - 'jobs' => $this->queue->getJobInfo($limit), + 'jobs' => array_map(function(array $info) { + if (isset($info['averageDuration'])) { + $info['averageDurationLabel'] = DateTimeHelper::humanDuration($info['averageDuration']); + } + return $info; + }, $this->queue->getJobInfo($limit)), ]); } diff --git a/src/db/Table.php b/src/db/Table.php index 3202637cc6b..aac97b0f92d 100644 --- a/src/db/Table.php +++ b/src/db/Table.php @@ -74,6 +74,8 @@ abstract class Table /** @since 3.4.0 */ public const PROJECTCONFIG = '{{%projectconfig}}'; public const QUEUE = '{{%queue}}'; + /** @since 4.6.0 */ + public const QUEUEPROFILES = '{{%queueprofiles}}'; public const RELATIONS = '{{%relations}}'; public const SECTIONS = '{{%sections}}'; public const SECTIONS_SITES = '{{%sections_sites}}'; diff --git a/src/migrations/Install.php b/src/migrations/Install.php index ebe50b3f05d..06457077ea9 100644 --- a/src/migrations/Install.php +++ b/src/migrations/Install.php @@ -557,6 +557,12 @@ public function createTables(): void 'dateFailed' => $this->dateTime(), 'error' => $this->text(), ]); + $this->createTable(Table::QUEUEPROFILES, [ + 'key' => $this->string(), + 'dateExecuted' => $this->dateTime()->notNull(), + 'duration' => $this->integer()->notNull(), + 'PRIMARY KEY([[key]], [[dateExecuted]])', + ]); $this->createTable(Table::RELATIONS, [ 'id' => $this->primaryKey(), 'fieldId' => $this->integer()->notNull(), diff --git a/src/migrations/m231006_071504_job_profiling.php b/src/migrations/m231006_071504_job_profiling.php new file mode 100644 index 00000000000..84821eea3bb --- /dev/null +++ b/src/migrations/m231006_071504_job_profiling.php @@ -0,0 +1,36 @@ +createTable(Table::QUEUEPROFILES, [ + 'key' => $this->string(), + 'dateExecuted' => $this->dateTime()->notNull(), + 'duration' => $this->integer()->notNull(), + 'PRIMARY KEY([[key]], [[dateExecuted]])', + ]); + + return true; + } + + /** + * @inheritdoc + */ + public function safeDown(): bool + { + echo "m231006_071504_job_profiling cannot be reverted.\n"; + return false; + } +} diff --git a/src/queue/BaseBatchedJob.php b/src/queue/BaseBatchedJob.php index 80bbb80175b..6b215a20c75 100644 --- a/src/queue/BaseBatchedJob.php +++ b/src/queue/BaseBatchedJob.php @@ -9,6 +9,7 @@ use Craft; use craft\base\Batchable; +use craft\helpers\ArrayHelper; use craft\helpers\ConfigHelper; use craft\helpers\Queue as QueueHelper; use craft\i18n\Translation; @@ -31,7 +32,7 @@ * @author Pixel & Tonic, Inc. * @since 4.4.0 */ -abstract class BaseBatchedJob extends BaseJob +abstract class BaseBatchedJob extends BaseJob implements ProfilableJobInterface { /** * @var int The number of items that should be processed in a single batch @@ -172,4 +173,14 @@ final public function getDescription(): ?string 'total' => $totalBatches, ]); } + + /** + * @inheritdoc + */ + public function getProfileAttributes(): array + { + $arr = ArrayHelper::toArray($this); + unset($arr['batchIndex'], $arr['itemOffset'], $arr['priority'], $arr['ttr']); + return $arr; + } } diff --git a/src/queue/ProfilableJobInterface.php b/src/queue/ProfilableJobInterface.php new file mode 100644 index 00000000000..bde1b18d5e3 --- /dev/null +++ b/src/queue/ProfilableJobInterface.php @@ -0,0 +1,27 @@ + + * @since 4.6.0 + */ +interface ProfilableJobInterface +{ + /** + * Returns the job attributes that should be used to semi-uniquely identify this job, based on its configuration, + * so that its profile results can be compared with recent similarly-configured jobs. + * + * @return array + */ + public function getProfileAttributes(): array; +} diff --git a/src/queue/Queue.php b/src/queue/Queue.php index de2eb932869..b19b29ec86f 100644 --- a/src/queue/Queue.php +++ b/src/queue/Queue.php @@ -12,6 +12,7 @@ use craft\db\Table; use craft\helpers\App; use craft\helpers\ArrayHelper; +use craft\helpers\DateTimeHelper; use craft\helpers\Db; use craft\helpers\Json; use craft\helpers\Queue as QueueHelper; @@ -20,6 +21,7 @@ use craft\i18n\Translation; use craft\queue\jobs\Proxy; use DateTime; +use Throwable; use yii\base\Exception; use yii\base\InvalidArgumentException; use yii\base\InvalidConfigException; @@ -143,12 +145,22 @@ public function init(): void $this->proxyQueue = Instance::ensure($this->proxyQueue, BaseQueue::class); } - $this->on(self::EVENT_BEFORE_EXEC, function(ExecEvent $e) { - $this->_executingJobId = $e->id; + $time = null; + + $this->on(self::EVENT_BEFORE_EXEC, function(ExecEvent $event) use (&$time) { + $this->_executingJobId = $event->id; + $time = microtime(true); }); - $this->on(self::EVENT_AFTER_EXEC, function(ExecEvent $e) { + $this->on(self::EVENT_AFTER_EXEC, function(ExecEvent $event) use (&$time) { $this->_executingJobId = null; + + // save the profile info + Db::upsert(Table::QUEUEPROFILES, [ + 'key' => $this->_profileKey($event->job), + 'dateExecuted' => Db::prepareDateForDb(DateTimeHelper::now()), + 'duration' => round(microtime(true) - $time), + ]); }); } @@ -173,6 +185,43 @@ public function run(bool $repeat = false, int $timeout = 0): ?int }); } + /** + * Returns the average duration for the given job. + * + * @param string $id The job ID + * @return float|null The average job duration, or null if not knwn + * @since 4.6.0 + */ + public function getAverageDuration(string $id): ?float + { + $result = $this->_job($id); + + if (!$result) { + throw new InvalidArgumentException("Invalid job ID: $id"); + } + + $job = $this->serializer->unserialize($result['job']); + $key = $this->_profileKey($job); + + // delete all but the last 5 rows + $time = (new Query()) + ->select('dateExecuted') + ->from(Table::QUEUEPROFILES) + ->where(['key' => $key]) + ->orderBy(['dateExecuted' => SORT_DESC]) + ->offset(5) + ->scalar(); + + if ($time) { + Db::delete(Table::QUEUEPROFILES, ['<=', 'dateExecuted', $time]); + } + + return (new Query()) + ->from(Table::QUEUEPROFILES) + ->where(['key' => $key]) + ->average('duration') ?: null; + } + /** * Executes a single job. * @@ -181,24 +230,24 @@ public function run(bool $repeat = false, int $timeout = 0): ?int */ public function executeJob(?string $id = null): bool { - $payload = $this->reserve($id); + $result = $this->reserve($id); - if (!$payload) { + if (!$result) { return false; } - if ($this->handleMessage($payload['id'], $payload['job'], $payload['ttr'], $payload['attempt'])) { - $this->release($payload['id']); + if ($this->handleMessage($result['id'], $result['job'], $result['ttr'], $result['attempt'])) { + $this->release($result['id']); if ($this->hasEventHandlers(self::EVENT_AFTER_EXEC_AND_RELEASE)) { // Can't just capture the exec event from handleMessage() // because it was probably created in a subprocess - [$job, $error] = $this->unserializeMessage($payload['job']); + [$job, $error] = $this->unserializeMessage($result['job']); $this->trigger(self::EVENT_AFTER_EXEC_AND_RELEASE, new ExecEvent([ - 'id' => $payload['id'], + 'id' => $result['id'], 'job' => $job, - 'ttr' => $payload['ttr'], - 'attempt' => $payload['attempt'], + 'ttr' => $result['ttr'], + 'attempt' => $result['attempt'], 'error' => $error, ])); } @@ -233,15 +282,13 @@ public function isFailed(string $id): bool */ public function status($id): int { - $payload = $this->db->usePrimary(function() use ($id) { - return $this->_createJobQuery() - ->select(['fail', 'timeUpdated']) - // No need to use andWhere() here since we're fetching by ID - ->where(['id' => $id]) - ->one($this->db); - }); + $result = $this->_job($id); - return $this->_status($payload); + if (!$result) { + return self::STATUS_DONE; + } + + return $this->_status($result); } /** @@ -473,19 +520,14 @@ public function getTotalFailed(): int */ public function getJobDetails(string $id): array { - $result = $this->db->usePrimary(function() use ($id) { - return (new Query()) - ->from($this->tableName) - ->where(['id' => $id]) - ->one($this->db); - }); + $result = $this->_job($id); - if ($result === false) { + if (!$result) { throw new InvalidArgumentException("Invalid job ID: $id"); } $formatter = Craft::$app->getFormatter(); - $job = $this->serializer->unserialize($this->_jobData($result['job'])); + $job = $this->serializer->unserialize($result['job']); return ArrayHelper::filterEmptyStringsFromArray([ 'delay' => max(0, $result['timePushed'] + $result['delay'] - time()), @@ -500,6 +542,7 @@ public function getJobDetails(string $id): array 'Pushed at' => $result['timePushed'] ? $formatter->asDatetime($result['timePushed']) : '', 'Updated at' => $result['timeUpdated'] ? $formatter->asDatetime($result['timeUpdated']) : '', 'Failed at' => $result['dateFailed'] ? $formatter->asDatetime($result['dateFailed']) : '', +// 'Average duration' => $result['averageDuration'] ? ]); } @@ -706,48 +749,56 @@ private function componentId(): string */ protected function reserve(?string $id = null): ?array { - $payload = null; + $result = null; - $this->_lock(function() use (&$payload, $id) { + $this->_lock(function() use (&$result, $id) { // Move expired messages into waiting list $this->_moveExpired(); // Reserve one message - /** @var array|null $payload */ - $payload = $this->db->usePrimary(function() use ($id) { - $query = $this->_createJobQuery() - ->andWhere(['fail' => false, 'timeUpdated' => null]) - ->andWhere('[[timePushed]] + [[delay]] <= :time', ['time' => time()]) - ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC]) - ->limit(1); - - if ($id) { - $query->andWhere(['id' => $id]); - } - - return $query->one($this->db) ?: null; - }); + $result = $this->_job($id); - if (is_array($payload)) { - $payload['dateReserved'] = new DateTime(); - $payload['timeUpdated'] = $payload['dateReserved']->getTimestamp(); - $payload['attempt'] = (int)$payload['attempt'] + 1; + if (is_array($result)) { + $result['dateReserved'] = new DateTime(); + $result['timeUpdated'] = $result['dateReserved']->getTimestamp(); + $result['attempt'] = (int)$result['attempt'] + 1; Db::update($this->tableName, [ - 'dateReserved' => Db::prepareDateForDb($payload['dateReserved']), - 'timeUpdated' => $payload['timeUpdated'], - 'attempt' => $payload['attempt'], + 'dateReserved' => Db::prepareDateForDb($result['dateReserved']), + 'timeUpdated' => $result['timeUpdated'], + 'attempt' => $result['attempt'], ], [ - 'id' => $payload['id'], + 'id' => $result['id'], ], [], false, $this->db); } }); + return $result; + } + + private function _job(?string $id): ?array + { + // Reserve one message + /** @var array|null $result */ + $result = $this->db->usePrimary(function() use ($id) { + $query = $this->_createJobQuery() + ->andWhere(['fail' => false, 'timeUpdated' => null]) + ->andWhere('[[timePushed]] + [[delay]] <= :time', ['time' => time()]) + ->orderBy(['priority' => SORT_ASC, 'id' => SORT_ASC]) + ->limit(1); + + if ($id) { + $query->andWhere(['id' => $id]); + } + + return $query->one($this->db) ?: null; + }); + // pgsql - if (is_array($payload)) { - $payload['job'] = $this->_jobData($payload['job']); + if (is_array($result)) { + $result['job'] = $this->_jobData($result['job']); } - return $payload; + return $result; } /** @@ -772,6 +823,26 @@ private function _jobData(mixed $job): string return $job; } + private function _profileKey(object $job): string + { + $class = $key = get_class($job); + + if ($job instanceof ProfilableJobInterface) { + try { + $attributes = $job->getProfileAttributes(); + if ($job instanceof BaseJob) { + unset($attributes['description']); + } + $key .= sprintf(':%s', md5(Json::encode($attributes))); + } catch (Throwable $e) { + Craft::warning(sprintf("Couldn't build a profile key for queue job %s: %s", $class, $e->getMessage()), __METHOD__); + Craft::$app->getErrorHandler()->logException($e); + } + } + + return $key; + } + /** * Moves expired messages into waiting list. * @@ -885,20 +956,16 @@ private function _createFailedJobQuery(): Query /** * Returns a job's status. * - * @param array|false $payload + * @param array $result * @return int */ - private function _status(array|false $payload): int + private function _status(array $result): int { - if (!$payload) { - return self::STATUS_DONE; - } - - if ($payload['fail']) { + if ($result['fail']) { return self::STATUS_FAILED; } - if (!$payload['timeUpdated']) { + if (!$result['timeUpdated']) { return self::STATUS_WAITING; } diff --git a/src/queue/jobs/ResaveElements.php b/src/queue/jobs/ResaveElements.php index 96653659fd5..4ff6342efab 100644 --- a/src/queue/jobs/ResaveElements.php +++ b/src/queue/jobs/ResaveElements.php @@ -91,6 +91,7 @@ protected function loadData(): Batchable */ protected function processItem(mixed $item): void { +// sleep(1); // Make sure the element was queried with its content /** @var ElementInterface $item */ if ($item::hasContent() && $item->contentId === null) {