Skip to content

Commit

Permalink
client reconnect on consumer timeout fix
Browse files Browse the repository at this point in the history
  • Loading branch information
nekufa committed Feb 12, 2024
1 parent 8ea9329 commit b4ee0b6
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 4 deletions.
4 changes: 2 additions & 2 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public function setTimeout(float $value): self
/**
* @throws Throwable
*/
public function process(null|int|float $timeout = 0, bool $reply = true)
public function process(null|int|float $timeout = 0, bool $reply = true, bool $checkTimeout = true)
{
$this->lastDataReadFailureAt = null;
$max = microtime(true) + $timeout;
Expand All @@ -238,7 +238,7 @@ public function process(null|int|float $timeout = 0, bool $reply = true)
$iteration = 0;
while (true) {
try {
$line = $this->readLine(1024, "\r\n");
$line = $this->readLine(1024, "\r\n", $checkTimeout);

if ($line && ($this->ping || trim($line) != 'PONG')) {
break;
Expand Down
3 changes: 2 additions & 1 deletion src/Consumer/Consumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public function handle(Closure $handler, Closure $emptyHandler = null, bool $ack
foreach (range(1, $this->batch) as $_) {
$runtime->empty = true;
// expires request means that we should receive answer from stream
$this->client->process($this->expires ? PHP_INT_MAX : null, $ack);
// consumer timeout can be more that client connection timeout
$this->client->process($this->expires ? PHP_INT_MAX : null, $ack, false);

if ($runtime->empty) {
if ($emptyHandler) {
Expand Down
3 changes: 2 additions & 1 deletion tests/Functional/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public function testNoMessages()
$this->called = false;
$this->empty = false;

$stream = $this->getClient()->getApi()->getStream('no_messages');
$stream = $this->createClient(['reconnect' => false])->getApi()->getStream('no_messages');
$stream->getConfiguration()->setSubjects(['cucumber']);
$stream->create();

Expand All @@ -126,6 +126,7 @@ public function testNoMessages()
$consumer->create()
->setDelay(0)
->setIterations(1)
->setExpires(1)
->handle(function ($response) {
$this->called = $response;
}, function () {
Expand Down

0 comments on commit b4ee0b6

Please sign in to comment.