Skip to content

Commit

Permalink
Merge pull request #50 from aloware/fix/partitions-list
Browse files Browse the repository at this point in the history
Fix Partitions List
  • Loading branch information
SohrabZ authored Apr 14, 2023
2 parents 5020b68 + a055b7e commit ca71dd7
Showing 1 changed file with 16 additions and 17 deletions.
33 changes: 16 additions & 17 deletions src/Repositories/RedisRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public function __construct()
*/
public function partitions($queue)
{
return $this->partitionsPrivate($queue);
return $this->getPartitionNames($queue);
}

/**
Expand Down Expand Up @@ -211,9 +211,7 @@ public function failedPartitionsWithCount($queue)
$queue,
'failedPartitionListPattern',
'extractPartitionNameFromFailedPartitionKey',
'failedPartitionKey',
'failedPartitionPerSecKey',
false
'failedPartitionKey'
);
}

Expand Down Expand Up @@ -766,6 +764,20 @@ private function partitionsPrivate(
return array_values($partitions);
}

/**
* Get partition names of a queue
*
* @param string $queue
*
* @return string
*/
public function getPartitionNames($queue)
{
$listKeyName = $this->queuePartitionsListKeyName($queue);

return $this->redis->smembers($listKeyName);
}

/**
* Get random partition name of a queue
*
Expand Down Expand Up @@ -924,23 +936,10 @@ private function popPrivate($queue, $partition, $partitionKeyResolver = 'partiti
$partitionKey = $this->$partitionKeyResolver($queue, $partition);

$processedKey = $this->partitionProcessedCountJobKey($queue, $partition);
$partitionPerSecKey = $this->partitionPerSecKey($queue, $partition);

$this->redis->incr($processedKey);
$this->redis->expire($processedKey, 3);

$now = time();
list ($lastAccess, $lastPersec) = explode(',', $this->redis->get($partitionPerSecKey) ?? ($now - 1) . ',0');

if ($now - $lastAccess >= 1) {
$persec = max($this->redis->get($processedKey) ?? 0, 0);

$data = $now . ',' . max($persec, $persec - $lastPersec);
$this->redis->set($partitionPerSecKey, $data, 'EX', 3);

$this->redis->decrBy($processedKey, $persec);
}

$result = $this->redis->multi()
->lpop($partitionKey)
->exists($partitionKey)
Expand Down

0 comments on commit ca71dd7

Please sign in to comment.