Skip to content

Commit

Permalink
优化rqueue消费的xgroup逻辑
Browse files Browse the repository at this point in the history
  • Loading branch information
chaz6chez committed Jun 3, 2024
1 parent 313a464 commit 80df77f
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions src/Builders/Traits/MessageQueueMethod.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ trait MessageQueueMethod

protected array $claimStartTags = [];

protected bool $_init = false;

/**
* @param string|null $queueName
* @return array
Expand Down Expand Up @@ -392,9 +394,12 @@ public function consume(Worker $worker, bool $del = true): bool
$consumerName = "$groupName-$worker->id";
// create group
$queueStreams = [];
foreach ($queues as $queueName) {
$client->xGroup('CREATE', $queueName, $groupName,'0', true);
$queueStreams[$queueName] = '>';
if (!$this->_init) {
foreach ($queues as $queueName) {
$client->xGroup('CREATE', $queueName, $groupName,'0', true);
$queueStreams[$queueName] = '>';
}
$this->_init = true;
}
// group read
if($res = $client->xReadGroup(
Expand Down Expand Up @@ -473,6 +478,7 @@ public function consume(Worker $worker, bool $del = true): bool
} catch (RedisException $exception) {
Log::channel('plugin.workbunny.webman-rqueue.debug')?->debug($exception->getMessage(), $exception->getTrace());
$this->getLogger()?->debug($exception->getMessage(), $exception->getTrace());
$this->_init = false;
throw new WebmanRqueueException($exception->getMessage(), $exception->getCode(), $exception);
}
}
Expand Down

0 comments on commit 80df77f

Please sign in to comment.