From b4ee0b66bb29ae8690e7460dc5451febebfd13d8 Mon Sep 17 00:00:00 2001 From: dmitry krokhin Date: Mon, 12 Feb 2024 14:36:56 +0300 Subject: [PATCH] client reconnect on consumer timeout fix --- src/Client.php | 4 ++-- src/Consumer/Consumer.php | 3 ++- tests/Functional/StreamTest.php | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Client.php b/src/Client.php index 8f12c75..1096d7f 100644 --- a/src/Client.php +++ b/src/Client.php @@ -229,7 +229,7 @@ public function setTimeout(float $value): self /** * @throws Throwable */ - public function process(null|int|float $timeout = 0, bool $reply = true) + public function process(null|int|float $timeout = 0, bool $reply = true, bool $checkTimeout = true) { $this->lastDataReadFailureAt = null; $max = microtime(true) + $timeout; @@ -238,7 +238,7 @@ public function process(null|int|float $timeout = 0, bool $reply = true) $iteration = 0; while (true) { try { - $line = $this->readLine(1024, "\r\n"); + $line = $this->readLine(1024, "\r\n", $checkTimeout); if ($line && ($this->ping || trim($line) != 'PONG')) { break; diff --git a/src/Consumer/Consumer.php b/src/Consumer/Consumer.php index dc56eb9..3e182fd 100644 --- a/src/Consumer/Consumer.php +++ b/src/Consumer/Consumer.php @@ -143,7 +143,8 @@ public function handle(Closure $handler, Closure $emptyHandler = null, bool $ack foreach (range(1, $this->batch) as $_) { $runtime->empty = true; // expires request means that we should receive answer from stream - $this->client->process($this->expires ? PHP_INT_MAX : null, $ack); + // consumer timeout can be more that client connection timeout + $this->client->process($this->expires ? PHP_INT_MAX : null, $ack, false); if ($runtime->empty) { if ($emptyHandler) { diff --git a/tests/Functional/StreamTest.php b/tests/Functional/StreamTest.php index d1c7afe..323c7c3 100644 --- a/tests/Functional/StreamTest.php +++ b/tests/Functional/StreamTest.php @@ -116,7 +116,7 @@ public function testNoMessages() $this->called = false; $this->empty = false; - $stream = $this->getClient()->getApi()->getStream('no_messages'); + $stream = $this->createClient(['reconnect' => false])->getApi()->getStream('no_messages'); $stream->getConfiguration()->setSubjects(['cucumber']); $stream->create(); @@ -126,6 +126,7 @@ public function testNoMessages() $consumer->create() ->setDelay(0) ->setIterations(1) + ->setExpires(1) ->handle(function ($response) { $this->called = $response; }, function () {