From a56ff0cb96d4893c063e4d5546ae2dc37c506c41 Mon Sep 17 00:00:00 2001 From: Hamed Kamrava Date: Thu, 20 Oct 2022 15:59:53 +0300 Subject: [PATCH 1/3] change fair signal pattern --- src/Repositories/RedisKeys.php | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/Repositories/RedisKeys.php b/src/Repositories/RedisKeys.php index f88e5d1..607ac59 100644 --- a/src/Repositories/RedisKeys.php +++ b/src/Repositories/RedisKeys.php @@ -25,16 +25,17 @@ private function queueKey($queue, $prefix = '') ); } - private function fairSignalKey($queue) + private function fairSignalKey($queue, $partition = '*') { $signal_key_prefix_for_horizon = config('fair-queue.signal_key_prefix_for_horizon'); $horizon_prefix = config('horizon.prefix'); return sprintf( - '%s%s%s:[0-9]*', + '%s%s%s:%s:[0-9]*', $horizon_prefix, $signal_key_prefix_for_horizon, - $queue + $queue, + $partition, ); } From efb1c77fbd5d12e55b62648dba1d4c9f71554f42 Mon Sep 17 00:00:00 2001 From: Hamed Kamrava Date: Thu, 20 Oct 2022 19:38:20 +0300 Subject: [PATCH 2/3] better comments --- src/Interfaces/RepositoryInterface.php | 2 +- src/Repositories/RedisKeys.php | 10 +++++++++- src/Repositories/RedisRepository.php | 22 +++++++++++++++++++--- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/src/Interfaces/RepositoryInterface.php b/src/Interfaces/RepositoryInterface.php index 2630ac9..909497a 100644 --- a/src/Interfaces/RepositoryInterface.php +++ b/src/Interfaces/RepositoryInterface.php @@ -65,7 +65,7 @@ public function recoverStuckJobs(); public function getConnection(); - public function countFairSignals($queue); + public function countFairSignals($queue, $partition); public function countAllJobs($queue); diff --git a/src/Repositories/RedisKeys.php b/src/Repositories/RedisKeys.php index 607ac59..15bf243 100644 --- a/src/Repositories/RedisKeys.php +++ b/src/Repositories/RedisKeys.php @@ -25,7 +25,15 @@ private function queueKey($queue, $prefix = '') ); } - private function fairSignalKey($queue, $partition = '*') + /** + * Get Fair Signal Redis Key Name + * + * @param string $queue + * @param string $partition + * + * @return string + */ + private function fairSignalKey($queue, $partition) { $signal_key_prefix_for_horizon = config('fair-queue.signal_key_prefix_for_horizon'); $horizon_prefix = config('horizon.prefix'); diff --git a/src/Repositories/RedisRepository.php b/src/Repositories/RedisRepository.php index 4f473ad..7b13c54 100644 --- a/src/Repositories/RedisRepository.php +++ b/src/Repositories/RedisRepository.php @@ -343,16 +343,32 @@ public function recoverPartitionLost($queue, $partition, $age = 300) return $count; } - public function countFairSignals($queue) + + /** + * Count Fair Signals + * + * @param string $queue + * @param string $partition + * + * @return int + */ + public function countFairSignals($queue, $partition) { $signals_redis = $this->getSignalsConnection(); - $pattern = $this->fairSignalKey($queue); + $pattern = $this->fairSignalKey($queue, $partition); return $signals_redis->eval(<<<"LUA" return #redis.pcall('keys', '{$pattern}') LUA, 0); } + /** + * Count All Jobs + * + * @param string $queue + * + * @return int + */ public function countAllJobs($queue) { $redis = $this->getConnection(); @@ -375,7 +391,7 @@ public function recoverStuckJobs() foreach ($queues as $queue) { $jobs_count = $this->countAllJobs($queue); - $signals_count = $this->countFairSignals($queue); + $signals_count = $this->countFairSignals($queue, '*'); if($jobs_count > $signals_count) { $queue_size = $jobs_count - $signals_count; From fec04b1d8feac964cb635e3347a2ee350688ec7a Mon Sep 17 00:00:00 2001 From: Hamed Kamrava Date: Thu, 20 Oct 2022 21:21:47 +0300 Subject: [PATCH 3/3] add more docs --- src/Repositories/RedisRepository.php | 325 ++++++++++++++++++++++++++- 1 file changed, 316 insertions(+), 9 deletions(-) diff --git a/src/Repositories/RedisRepository.php b/src/Repositories/RedisRepository.php index 7b13c54..2bef050 100644 --- a/src/Repositories/RedisRepository.php +++ b/src/Repositories/RedisRepository.php @@ -11,26 +11,53 @@ class RedisRepository implements RepositoryInterface { use RedisKeys; + /** + * Get all partitions + * + * @param string $queue + * + * @return array + */ public function partitions($queue) { return $this->partitionsPrivate($queue); } + /** + * Get all queues + * + * @return array + */ public function queues() { return $this->queuesPrivate('queueListPattern', 'extractQueueNameFromPartitionKey'); } + /** + * Get all queues with their partitions + * + * @return array + */ public function queuesWithPartitions() { return $this->queuesWithPartitionsPrivate('queues', 'partitions'); } + /** + * Get all jobs + * + * @return array + */ public function jobs($queue, $partition) { return $this->jobsPrivate($queue, $partition, 'partitionKey'); } + /** + * Get job Class name and the payload + * + * @return array + */ public function job($queue, $partition, $index) { $job = $this->jobPrivate($queue, $partition, $index, 'partitionKey'); @@ -41,6 +68,13 @@ public function job($queue, $partition, $index) ]; } + /** + * Get partitions With number of jobs + * + * @param string $queue + * + * @return array + */ public function partitionsWithCount($queue) { return $this->partitionsWithCountPrivate( @@ -53,11 +87,25 @@ public function partitionsWithCount($queue) ); } + /** + * Get total jobs of the given queues + * + * @param array $queues + * + * @return int + */ public function totalJobsCount($queues) { return $this->totalJobsCountPrivate($queues, 'partitions', 'partitionKey'); } + /** + * Get Total Jobs + * + * @param array $queues + * + * @return int + */ public function processedJobsInPastMinutes($queues, $minutes) { $redis = $this->getConnection(); @@ -72,6 +120,14 @@ public function processedJobsInPastMinutes($queues, $minutes) return $total; } + /** + * Get number of processed jobs of a queue in past minutes + * + * @param string $queue + * @param int $minutes + * + * @return int + */ public function queueProcessedJobsInPastMinutes($queue, $minutes) { $redis = $this->getConnection(); @@ -86,6 +142,15 @@ public function queueProcessedJobsInPastMinutes($queue, $minutes) return $total; } + /** + * Get number of processed jobs of a partition in past minutes + * + * @param string $queue + * @param string $partition + * @param int $minutes + * + * @return int + */ public function partitionProcessedJobsInPastMinutes($queue, $partition, $minutes) { $redis = $this->getConnection(); @@ -94,6 +159,13 @@ public function partitionProcessedJobsInPastMinutes($queue, $partition, $minutes return $redis->zcard($partition_key); } + /** + * Get list of failed partitions of a queue + * + * @param string $queue + * + * @return array + */ public function failedPartitions($queue) { return $this->partitionsPrivate( @@ -103,16 +175,33 @@ public function failedPartitions($queue) ); } + /** + * Get list of failed queues + * + * @return array + */ public function failedQueues() { return $this->queuesPrivate('failedQueueListPattern', 'extractQueueNameFromFailedPartitionKey'); } + /** + * Get list of failed queues + * + * @return array + */ public function failedQueuesWithPartitions() { return $this->queuesWithPartitionsPrivate('failedQueues', 'failedPartitions'); } + /** + * Get list of failed partitions with number of jobs + * + * @param string $queue + * + * @return array + */ public function failedPartitionsWithCount($queue) { return $this->partitionsWithCountPrivate( @@ -125,11 +214,28 @@ public function failedPartitionsWithCount($queue) ); } + /** + * Get list of failed job of a partition + * + * @param string $queue + * @param string $partition + * + * @return array + */ public function failedJobs($queue, $partition) { return $this->jobsPrivate($queue, $partition, 'failedPartitionKey'); } + /** + * Get failed job Class name and Payload based on its index + * + * @param string $queue + * @param string $partition + * @param int $index + * + * @return array + */ public function failedJob($queue, $partition, $index) { $job = $this->jobPrivate($queue, $partition, $index, 'failedPartitionKey'); @@ -140,11 +246,26 @@ public function failedJob($queue, $partition, $index) ]; } + /** + * Get number of failed jobs of given queues + * + * @param array $queues + * + * @return int + */ public function totalFailedJobsCount($queues) { return $this->totalJobsCountPrivate($queues, 'failedPartitions', 'failedPartitionKey'); } + /** + * Push a job into given queue and partition at the tail of list + * + * @param string $queue + * @param string $partition + * + * @return void + */ public function push($queue, $partition, $job) { $redis = $this->getConnection(); @@ -154,6 +275,14 @@ public function push($queue, $partition, $job) $redis->rpush($partitionKey, $job); } + /** + * Push a job into given queue and partition at the head of list + * + * @param string $queue + * @param string $partition + * + * @return void + */ public function lPush($queue, $partition, $job) { $redis = $this->getConnection(); @@ -163,6 +292,14 @@ public function lPush($queue, $partition, $job) $redis->lpush($partitionKey, $job); } + /** + * Push a failed job into given queue and partition at the tail of list + * + * @param string $queue + * @param string $partition + * + * @return void + */ public function pushFailed($queue, $partition, $job) { $redis = $this->getConnection(); @@ -172,29 +309,66 @@ public function pushFailed($queue, $partition, $job) $redis->rpush($partitionKey, $job); } + /** + * Returns and removes the first element of the list + * + * @param string $queue + * @param string $partition + * + * @return array|null + */ public function pop($queue, $partition) { return $this->popPrivate($queue, $partition, 'partitionKey'); } + /** + * Returns and removes the first element of the failed jobs list + * + * @param string $queue + * @param string $partition + * + * @return array|null + */ public function popFailed($queue, $partition) { return $this->popPrivate($queue, $partition, 'failedPartitionKey'); } + /** + * Sets a expect of acknowledge + * + * @param string $connection + * @param string $queue + * @param string $partition + * @param string $jobUuid + * @param string $job + * + * @return void + */ public function expectAcknowledge($connection, $queue, $partition, $jobUuid, $job) { $redis = $this->getConnection(); - $key = $this->inProgressJobKey($connection, $queue, $partition, $jobUuid); + $key = $this->inProgressJobKey($connection, $queue, $partition, $jobUuid); $sampleSignalKey = $this->queueSampleSignalKey($queue); $redis->mset([ - $key => $job, + $key => $job, $sampleSignalKey => serialize([$connection, $queue]) ]); } + /** + * After acknowledge receive lets remove it from the list + * + * @param string $connection + * @param string $queue + * @param string $partition + * @param string $jobUuid + * + * @return void + */ public function acknowledge($connection, $queue, $partition, $jobUuid) { $redis = $this->getConnection(); @@ -204,6 +378,14 @@ public function acknowledge($connection, $queue, $partition, $jobUuid) $redis->del($key); } + /** + * Retry failed jobs of given queues and partitions + * + * @param array $queues + * @param array $queue_partitions + * + * @return int + */ public function retryFailedJobs(array $queues = [], array $queue_partitions = []) { $count = 0; @@ -234,6 +416,14 @@ public function retryFailedJobs(array $queues = [], array $queue_partitions = [] return $count; } + /** + * Retry failed jobs of a given partition + * + * @param string $queue + * @param string $partition + * + * @return int + */ public function retryPartitionFailedJobs($queue, $partition) { $count = 0; @@ -251,6 +441,14 @@ public function retryPartitionFailedJobs($queue, $partition) return $count; } + /** + * Purge failed jobs of given queues and partitions + * + * @param array $queues + * @param array $queue_partitions + * + * @return void + */ public function purgeFailedJobs(array $queues = [], array $queue_partitions = []) { $redis = $this->getConnection(); @@ -272,6 +470,13 @@ public function purgeFailedJobs(array $queues = [], array $queue_partitions = [] } } + /** + * Recovers lost jobs since given seconds ago + * + * @param int $age + * + * @return int + */ public function recoverLost($age = 300) { $redis = $this->getConnection(); @@ -308,6 +513,15 @@ public function recoverLost($age = 300) return $count; } + /** + * Recovers lost jobs for the given partition since given seconds ago + * + * @param string $queue + * @param string $partition + * @param int $age + * + * @return int + */ public function recoverPartitionLost($queue, $partition, $age = 300) { $redis = $this->getConnection(); @@ -343,9 +557,8 @@ public function recoverPartitionLost($queue, $partition, $age = 300) return $count; } - /** - * Count Fair Signals + * Count Fair Signals of the give queue and partition * * @param string $queue * @param string $partition @@ -363,7 +576,7 @@ public function countFairSignals($queue, $partition) } /** - * Count All Jobs + * Count all jobs of the given queue * * @param string $queue * @@ -383,6 +596,13 @@ public function countAllJobs($queue) return $count; } + /** + * Count all jobs of the given queue + * + * @param string $queue + * + * @return int + */ public function recoverStuckJobs() { $queues = $this->queues(); @@ -403,8 +623,15 @@ public function recoverStuckJobs() return $count; } - /** + /** + * Generates fake signal + * + * @param string $queue + * @param int $count + * * @throws SampleNotFoundException + + * @return int */ public function generateFakeSignals($queue, $count) { @@ -427,6 +654,15 @@ public function generateFakeSignals($queue, $count) } } + /** + * Gets name of all queues + * + * @param string $queueListPatternResolver + * @param string $extractQueueNameFromPartitionKeyResolver + * + * + * @return array + */ private function queuesPrivate( $queueListPatternResolver = 'queueListPattern', $extractQueueNameFromPartitionKeyResolver = 'extractQueueNameFromPartitionKey' @@ -442,6 +678,14 @@ private function queuesPrivate( return array_values(array_unique($queues)); } + /** + * Get all queues with their partitions + * + * @param string $queuesResolver + * @param string $partitionsResolver + * + * @return array + */ private function queuesWithPartitionsPrivate( $queuesResolver = 'queues', $partitionsResolver = 'partitions' @@ -465,6 +709,15 @@ private function queuesWithPartitionsPrivate( return $queues; } + /** + * Get all partitions + * + * @param string $queue + * @param string $queuePartitionListPatternResolver + * @param string $extractorResolver + * + * @return array + */ private function partitionsPrivate( $queue, $queuePartitionListPatternResolver = 'queuePartitionListPattern', @@ -481,13 +734,20 @@ private function partitionsPrivate( return array_values($partitions); } + /** + * Get partitions With number of jobs + * + * @param string $queue + * @param string $queuePartitionListPatternResolver + * @param string $extractPartitionNameFromPartitionKeyResolver + * + * @return array + */ private function partitionsWithCountPrivate( $queue, $queuePartitionListPatternResolver = 'queuePartitionListPattern', $extractPartitionNameFromPartitionKeyResolver = 'extractPartitionNameFromPartitionKey', - $partitionKeyResolver = 'partitionKey', - $partitionPerSecKeyResolver = 'partitionPerSecKey', - $includePartitionPerSecKeyColumn = true + $partitionKeyResolver = 'partitionKey' ) { $redis = $this->getConnection(); @@ -522,6 +782,15 @@ private function partitionsWithCountPrivate( return $partitions; } + /** + * Get all jobs + * + * @param string $queue + * @param string $partition + * @param string $partitionKeyResolver + * + * @return array + */ private function jobsPrivate($queue, $partition, $partitionKeyResolver = 'partitionKey') { $redis = $this->getConnection(); @@ -555,6 +824,16 @@ private function jobsPrivate($queue, $partition, $partitionKeyResolver = 'partit ]; } + /** + * Get job + * + * @param string $queue + * @param string $partition + * @param string $index + * @param string $partitionKeyResolver + * + * @return array + */ private function jobPrivate($queue, $partition, $index, $partitionKeyResolver = 'partitionKey') { $redis = $this->getConnection(); @@ -566,6 +845,15 @@ private function jobPrivate($queue, $partition, $index, $partitionKeyResolver = return $jobs ? $jobs[0] : null; } + /** + * Get total jobs of the given queues + * + * @param array $queues + * @param string $partitionsResolver + * @param string $partitionKeyResolver + * + * @return int + */ private function totalJobsCountPrivate( $queues, $partitionsResolver = 'partitions', @@ -584,6 +872,15 @@ private function totalJobsCountPrivate( return $jobsCount; } + /** + * Returns and removes the first element of the list + * + * @param string $queue + * @param string $partition + * @param string $partitionKeyResolver + * + * @return string|null + */ private function popPrivate($queue, $partition, $partitionKeyResolver = 'partitionKey') { $redis = $this->getConnection(); @@ -611,12 +908,22 @@ private function popPrivate($queue, $partition, $partitionKeyResolver = 'partiti return $redis->lpop($partitionKey); } + /** + * Returns Redis Connection + * + * @return \Illuminate\Redis\Connections\Connection + */ public function getConnection() { $database = config('fair-queue.database'); return Redis::connection($database); } + /** + * Returns Redis Fair Signal Connection + * + * @return \Illuminate\Redis\Connections\Connection + */ public function getSignalsConnection() { $database = config('fair-queue.signals_database');