From f5bff3c5985080d723ef069ecefaa2dfc1514b5c Mon Sep 17 00:00:00 2001 From: Dimitar Ilkov Date: Mon, 2 Feb 2026 17:49:42 +0200 Subject: [PATCH 1/5] Handle 404 Messages and proceed to process messages instead of waiting for a timeout --- src/Client.php | 3 +++ src/Queue.php | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Client.php b/src/Client.php index c9cec30..d87d8a8 100644 --- a/src/Client.php +++ b/src/Client.php @@ -189,6 +189,9 @@ public function process(null|int|float $timeout = 0, bool $reply = true): mixed return $result; } if (array_key_exists($message->sid, $this->handlers)) { + if(isset($message->payload->headers['Status-Code']) && $message->payload->headers['Status-Code'] === '404') { + return null; + } return $this->processMsg($this->handlers[$message->sid], $message, $reply); } if ($this->skipInvalidMessages) { diff --git a/src/Queue.php b/src/Queue.php index 4fed362..4d947c4 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -48,7 +48,7 @@ public function fetchAll(int $limit = 0): array } if ($this->client->process($processingTimeout) !== $this) { - // stop when clients got message for another handler + // stop when clients got message for another handler or there are no more messages break; } } From 9552b46e14ce297459378cf6cc8c494e3cdf6c1a Mon Sep 17 00:00:00 2001 From: Dimitar Ilkov Date: Mon, 2 Feb 2026 18:03:59 +0200 Subject: [PATCH 2/5] cs fix --- src/Client.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Client.php b/src/Client.php index d87d8a8..1bce802 100644 --- a/src/Client.php +++ b/src/Client.php @@ -189,7 +189,7 @@ public function process(null|int|float $timeout = 0, bool $reply = true): mixed return $result; } if (array_key_exists($message->sid, $this->handlers)) { - if(isset($message->payload->headers['Status-Code']) && $message->payload->headers['Status-Code'] === '404') { + if (isset($message->payload->headers['Status-Code']) && $message->payload->headers['Status-Code'] === '404') { return null; } return $this->processMsg($this->handlers[$message->sid], $message, $reply); From de2d3dc25f831f9a632c2c62ebd617be87dab57a Mon Sep 17 00:00:00 2001 From: Dimitar Ilkov Date: Tue, 3 Feb 2026 22:50:48 +0200 Subject: [PATCH 3/5] add tests --- tests/Functional/ClientTest.php | 30 ++++++++++++++++ tests/Performance/PerformanceTest.php | 49 ++++++++++++++++++++++++++- 2 files changed, 78 insertions(+), 1 deletion(-) diff --git a/tests/Functional/ClientTest.php b/tests/Functional/ClientTest.php index 8385cec..e648e8b 100644 --- a/tests/Functional/ClientTest.php +++ b/tests/Functional/ClientTest.php @@ -5,6 +5,8 @@ namespace Tests\Functional; use Basis\Nats\Connection; +use Basis\Nats\Message\Factory; +use Basis\Nats\Message\Subscribe; use Monolog\Handler\StreamHandler; use Monolog\Logger; use ReflectionProperty; @@ -173,4 +175,32 @@ public function testCloseClosesSocket(): void // Assert that the socket is closed and set to null self::assertNull($property->getValue($connection)); } + + public function testProcessNotFound() + { + $client = $this->createClient([]); + $sid = null; + + /** @var Connection $connectionMock */ + $connectionMock = $this->createMock(Connection::class); + + $connectionMock + ->method('sendMessage') + ->willReturnCallback(function (Subscribe $subscribe) use (&$sid) { + $sid = $subscribe->sid; + }); + $client->connection = $connectionMock; + + $client->subscribe('hello.request', fn ($name) => "Hello, " . $name); + + $message = Factory::create('HMSG handler.' . $sid . ' ' . $sid . ' 28 28'); + $message->parse('NATS/1.0 404 No Messages'); + + $connectionMock + ->method('getMessage') + ->willReturn($message); + + $result = $client->process(); + $this->assertNull($result); + } } diff --git a/tests/Performance/PerformanceTest.php b/tests/Performance/PerformanceTest.php index f86f47b..a1c5c25 100644 --- a/tests/Performance/PerformanceTest.php +++ b/tests/Performance/PerformanceTest.php @@ -4,10 +4,14 @@ namespace Tests\Performance; +use Basis\Nats\Consumer\DeliverPolicy; +use Basis\Nats\Stream\RetentionPolicy; +use Basis\Nats\Stream\StorageBackend; use Tests\FunctionalTestCase; class PerformanceTest extends FunctionalTestCase { + private const CONSUMER_BATCH_SIZE = 50; private int $limit = 500_000; private int $counter = 0; private int $bigMessageIterationLimit = 1000; @@ -74,6 +78,49 @@ public function testPerformanceWithBigMessages() ]); // at least 50rps should be enough for test - $this->assertGreaterThan(50, $this->bigMessageIterationLimit / $publishing); + $this->assertGreaterThan(self::CONSUMER_BATCH_SIZE, $this->bigMessageIterationLimit / $publishing); + } + + public function testFetchNoWait() + { + $client = $this->createClient(['timeout' => 10])->setDelay(0); + $stream = $client->getApi()->getStream('test_fetch_no_wait'); + $stream + ->getConfiguration() + ->setRetentionPolicy(RetentionPolicy::INTEREST) + ->setStorageBackend(StorageBackend::MEMORY) + ->setSubjects(['test']); + + $stream->create(); + + $consumer = $stream->getConsumer('fetch_no_waiter'); + $consumer->getConfiguration() + ->setSubjectFilter('test') + ->setDeliverPolicy(DeliverPolicy::NEW); + $consumer->create(); + $consumer + ->setBatching(self::CONSUMER_BATCH_SIZE) + ->setExpires(0); + + + + foreach (range(1, 10) as $n) { + $stream->put('test', 'Hello, NATS JetStream '.date(DATE_RFC3339_EXTENDED).'!'); + } + + $fetching = microtime(true); + // fetch more than available messages to test no-wait behavior + $messages = $consumer->getQueue()->fetchAll($consumer->getBatching()); + $fetching = microtime(true) - $fetching; + + $messages = array_filter($messages, static fn ($message) => !$message->payload->isEmpty()); + + $this->logger?->info('fetched with no-wait', [ + 'length' => count($messages), + 'time' => $fetching, + ]); + + $this->assertCount(10, $messages); + $this->assertLessThan(1, $fetching, 'Fetching with no-wait should be fast enough'); } } From 96a5940f5db29e593696cc7f416ec8932aa6a9f0 Mon Sep 17 00:00:00 2001 From: Dimitar Ilkov Date: Wed, 4 Feb 2026 11:30:10 +0200 Subject: [PATCH 4/5] Fix flawed handling of 404 messages Initial fix was flawed due to the fact that it dropped entirely the 404 message received from the server. This is a serious breaking change and shouldn't have had happened. The initial issue lies inside the fetchAll logic and it's wait logic and there the 404 handling should be. --- src/Client.php | 3 --- src/Queue.php | 10 +++++++++- tests/Functional/ClientTest.php | 28 --------------------------- tests/Performance/PerformanceTest.php | 12 +++++------- 4 files changed, 14 insertions(+), 39 deletions(-) diff --git a/src/Client.php b/src/Client.php index 1bce802..c9cec30 100644 --- a/src/Client.php +++ b/src/Client.php @@ -189,9 +189,6 @@ public function process(null|int|float $timeout = 0, bool $reply = true): mixed return $result; } if (array_key_exists($message->sid, $this->handlers)) { - if (isset($message->payload->headers['Status-Code']) && $message->payload->headers['Status-Code'] === '404') { - return null; - } return $this->processMsg($this->handlers[$message->sid], $message, $reply); } if ($this->skipInvalidMessages) { diff --git a/src/Queue.php b/src/Queue.php index 4d947c4..02cda91 100644 --- a/src/Queue.php +++ b/src/Queue.php @@ -13,6 +13,7 @@ class Queue private array $queue = []; private float $timeout; private ?Publish $launcher = null; + private ?Msg $lastMessage = null; public function __construct( public readonly Client $client, @@ -51,6 +52,13 @@ public function fetchAll(int $limit = 0): array // stop when clients got message for another handler or there are no more messages break; } + + if ( + $this->lastMessage?->payload->isEmpty() + && $this->lastMessage->payload->getHeader('Status-Code') === '404' + ) { + break; + } } $result = []; @@ -64,7 +72,7 @@ public function fetchAll(int $limit = 0): array public function handle(Msg $message) { - $this->queue[] = $message; + $this->queue[] = $this->lastMessage = $message; } public function next(float $timeout = 0): Msg diff --git a/tests/Functional/ClientTest.php b/tests/Functional/ClientTest.php index e648e8b..c494398 100644 --- a/tests/Functional/ClientTest.php +++ b/tests/Functional/ClientTest.php @@ -175,32 +175,4 @@ public function testCloseClosesSocket(): void // Assert that the socket is closed and set to null self::assertNull($property->getValue($connection)); } - - public function testProcessNotFound() - { - $client = $this->createClient([]); - $sid = null; - - /** @var Connection $connectionMock */ - $connectionMock = $this->createMock(Connection::class); - - $connectionMock - ->method('sendMessage') - ->willReturnCallback(function (Subscribe $subscribe) use (&$sid) { - $sid = $subscribe->sid; - }); - $client->connection = $connectionMock; - - $client->subscribe('hello.request', fn ($name) => "Hello, " . $name); - - $message = Factory::create('HMSG handler.' . $sid . ' ' . $sid . ' 28 28'); - $message->parse('NATS/1.0 404 No Messages'); - - $connectionMock - ->method('getMessage') - ->willReturn($message); - - $result = $client->process(); - $this->assertNull($result); - } } diff --git a/tests/Performance/PerformanceTest.php b/tests/Performance/PerformanceTest.php index a1c5c25..c5d5135 100644 --- a/tests/Performance/PerformanceTest.php +++ b/tests/Performance/PerformanceTest.php @@ -102,10 +102,8 @@ public function testFetchNoWait() ->setBatching(self::CONSUMER_BATCH_SIZE) ->setExpires(0); - - foreach (range(1, 10) as $n) { - $stream->put('test', 'Hello, NATS JetStream '.date(DATE_RFC3339_EXTENDED).'!'); + $stream->publish('test', 'Hello, NATS JetStream '.$n.'!'); } $fetching = microtime(true); @@ -113,14 +111,14 @@ public function testFetchNoWait() $messages = $consumer->getQueue()->fetchAll($consumer->getBatching()); $fetching = microtime(true) - $fetching; - $messages = array_filter($messages, static fn ($message) => !$message->payload->isEmpty()); - $this->logger?->info('fetched with no-wait', [ 'length' => count($messages), 'time' => $fetching, ]); - $this->assertCount(10, $messages); - $this->assertLessThan(1, $fetching, 'Fetching with no-wait should be fast enough'); + // 10 messages were published + 1 404 message to signal the empty stream + $this->assertCount(11, $messages); + $this->assertEquals('404', end($messages)->payload->getHeader('Status-Code'), 'Last message should be 404'); + $this->assertLessThan(1, (int)$fetching, 'Fetching with no-wait should be faster than the timeout'); } } From 744502b7392dd46c343bc34d6048a0bcfb55b467 Mon Sep 17 00:00:00 2001 From: Dimitar Ilkov Date: Wed, 4 Feb 2026 14:36:45 +0200 Subject: [PATCH 5/5] more tests --- tests/Functional/StreamTest.php | 43 +++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/Functional/StreamTest.php b/tests/Functional/StreamTest.php index 8a55cf3..ac9548e 100644 --- a/tests/Functional/StreamTest.php +++ b/tests/Functional/StreamTest.php @@ -7,6 +7,7 @@ use Basis\Nats\Consumer\AckPolicy; use Basis\Nats\Consumer\Configuration; use Basis\Nats\Consumer\Consumer; +use Basis\Nats\Consumer\DeliverPolicy; use Basis\Nats\Consumer\ReplayPolicy; use Basis\Nats\Message\Payload; use Basis\Nats\Stream\ConsumerLimits; @@ -16,6 +17,8 @@ class StreamTest extends FunctionalTestCase { + private const CONSUMER_BATCH_SIZE = 50; + private mixed $called; private bool $empty; @@ -625,4 +628,44 @@ private function assertWrongNumPending(Consumer $consumer, ?int $expected = null } } } + + public function testFetchLessThanBatch() + { + $client = $this->createClient(['timeout' => 10])->setDelay(0); + $stream = $client->getApi()->getStream('test_fetch_no_wait'); + $stream + ->getConfiguration() + ->setRetentionPolicy(RetentionPolicy::INTEREST) + ->setStorageBackend(StorageBackend::MEMORY) + ->setSubjects(['test']); + + $stream->create(); + + $consumer = $stream->getConsumer('fetch_no_waiter'); + $consumer->getConfiguration() + ->setSubjectFilter('test') + ->setDeliverPolicy(DeliverPolicy::NEW); + $consumer->create(); + $consumer + ->setBatching(self::CONSUMER_BATCH_SIZE) + ->setExpires(0); + + foreach (range(1, 10) as $n) { + $stream->publish('test', 'Hello, NATS JetStream '.$n.'!'); + } + + $fetching = microtime(true); + // fetch more than available messages to test no-wait behavior + $messages = $consumer->getQueue()->fetchAll($consumer->getBatching()); + $fetching = microtime(true) - $fetching; + + $this->logger?->info('fetched with no-wait', [ + 'length' => count($messages), + 'time' => $fetching, + ]); + + // 10 messages were published + 1 404 message to signal the empty stream + $this->assertCount(11, $messages); + $this->assertEquals('404', end($messages)->payload->getHeader('Status-Code'), 'Last message should be 404'); + } }