From 80df77f8dd6745ac615923ba7c0f9c5993c62b02 Mon Sep 17 00:00:00 2001 From: chaz6chez Date: Mon, 3 Jun 2024 17:55:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96rqueue=E6=B6=88=E8=B4=B9?= =?UTF-8?q?=E7=9A=84xgroup=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Builders/Traits/MessageQueueMethod.php | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/src/Builders/Traits/MessageQueueMethod.php b/src/Builders/Traits/MessageQueueMethod.php index 69af4cb..c08699e 100644 --- a/src/Builders/Traits/MessageQueueMethod.php +++ b/src/Builders/Traits/MessageQueueMethod.php @@ -15,6 +15,8 @@ trait MessageQueueMethod protected array $claimStartTags = []; + protected bool $_init = false; + /** * @param string|null $queueName * @return array @@ -392,9 +394,12 @@ public function consume(Worker $worker, bool $del = true): bool $consumerName = "$groupName-$worker->id"; // create group $queueStreams = []; - foreach ($queues as $queueName) { - $client->xGroup('CREATE', $queueName, $groupName,'0', true); - $queueStreams[$queueName] = '>'; + if (!$this->_init) { + foreach ($queues as $queueName) { + $client->xGroup('CREATE', $queueName, $groupName,'0', true); + $queueStreams[$queueName] = '>'; + } + $this->_init = true; } // group read if($res = $client->xReadGroup( @@ -473,6 +478,7 @@ public function consume(Worker $worker, bool $del = true): bool } catch (RedisException $exception) { Log::channel('plugin.workbunny.webman-rqueue.debug')?->debug($exception->getMessage(), $exception->getTrace()); $this->getLogger()?->debug($exception->getMessage(), $exception->getTrace()); + $this->_init = false; throw new WebmanRqueueException($exception->getMessage(), $exception->getCode(), $exception); } }