Skip to content

Commit 4231d95

Browse files
authored
Fix flawed handling of 404 messages (#121)
* Handle 404 Messages and proceed to process messages instead of waiting for a timeout * cs fix * add tests * Fix flawed handling of 404 messages Initial fix was flawed due to the fact that it dropped entirely the 404 message received from the server. This is a serious breaking change and shouldn't have had happened. The initial issue lies inside the fetchAll logic and it's wait logic and there the 404 handling should be. * more tests
1 parent e32c578 commit 4231d95

5 files changed

Lines changed: 57 additions & 39 deletions

File tree

src/Client.php

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -189,9 +189,6 @@ public function process(null|int|float $timeout = 0, bool $reply = true): mixed
189189
return $result;
190190
}
191191
if (array_key_exists($message->sid, $this->handlers)) {
192-
if (isset($message->payload->headers['Status-Code']) && $message->payload->headers['Status-Code'] === '404') {
193-
return null;
194-
}
195192
return $this->processMsg($this->handlers[$message->sid], $message, $reply);
196193
}
197194
if ($this->skipInvalidMessages) {

src/Queue.php

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class Queue
1313
private array $queue = [];
1414
private float $timeout;
1515
private ?Publish $launcher = null;
16+
private ?Msg $lastMessage = null;
1617

1718
public function __construct(
1819
public readonly Client $client,
@@ -51,6 +52,13 @@ public function fetchAll(int $limit = 0): array
5152
// stop when clients got message for another handler or there are no more messages
5253
break;
5354
}
55+
56+
if (
57+
$this->lastMessage?->payload->isEmpty()
58+
&& $this->lastMessage->payload->getHeader('Status-Code') === '404'
59+
) {
60+
break;
61+
}
5462
}
5563

5664
$result = [];
@@ -64,7 +72,7 @@ public function fetchAll(int $limit = 0): array
6472

6573
public function handle(Msg $message)
6674
{
67-
$this->queue[] = $message;
75+
$this->queue[] = $this->lastMessage = $message;
6876
}
6977

7078
public function next(float $timeout = 0): Msg

tests/Functional/ClientTest.php

Lines changed: 0 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -175,32 +175,4 @@ public function testCloseClosesSocket(): void
175175
// Assert that the socket is closed and set to null
176176
self::assertNull($property->getValue($connection));
177177
}
178-
179-
public function testProcessNotFound()
180-
{
181-
$client = $this->createClient([]);
182-
$sid = null;
183-
184-
/** @var Connection $connectionMock */
185-
$connectionMock = $this->createMock(Connection::class);
186-
187-
$connectionMock
188-
->method('sendMessage')
189-
->willReturnCallback(function (Subscribe $subscribe) use (&$sid) {
190-
$sid = $subscribe->sid;
191-
});
192-
$client->connection = $connectionMock;
193-
194-
$client->subscribe('hello.request', fn ($name) => "Hello, " . $name);
195-
196-
$message = Factory::create('HMSG handler.' . $sid . ' ' . $sid . ' 28 28');
197-
$message->parse('NATS/1.0 404 No Messages');
198-
199-
$connectionMock
200-
->method('getMessage')
201-
->willReturn($message);
202-
203-
$result = $client->process();
204-
$this->assertNull($result);
205-
}
206178
}

tests/Functional/StreamTest.php

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use Basis\Nats\Consumer\AckPolicy;
88
use Basis\Nats\Consumer\Configuration;
99
use Basis\Nats\Consumer\Consumer;
10+
use Basis\Nats\Consumer\DeliverPolicy;
1011
use Basis\Nats\Consumer\ReplayPolicy;
1112
use Basis\Nats\Message\Payload;
1213
use Basis\Nats\Stream\ConsumerLimits;
@@ -16,6 +17,8 @@
1617

1718
class StreamTest extends FunctionalTestCase
1819
{
20+
private const CONSUMER_BATCH_SIZE = 50;
21+
1922
private mixed $called;
2023

2124
private bool $empty;
@@ -625,4 +628,44 @@ private function assertWrongNumPending(Consumer $consumer, ?int $expected = null
625628
}
626629
}
627630
}
631+
632+
public function testFetchLessThanBatch()
633+
{
634+
$client = $this->createClient(['timeout' => 10])->setDelay(0);
635+
$stream = $client->getApi()->getStream('test_fetch_no_wait');
636+
$stream
637+
->getConfiguration()
638+
->setRetentionPolicy(RetentionPolicy::INTEREST)
639+
->setStorageBackend(StorageBackend::MEMORY)
640+
->setSubjects(['test']);
641+
642+
$stream->create();
643+
644+
$consumer = $stream->getConsumer('fetch_no_waiter');
645+
$consumer->getConfiguration()
646+
->setSubjectFilter('test')
647+
->setDeliverPolicy(DeliverPolicy::NEW);
648+
$consumer->create();
649+
$consumer
650+
->setBatching(self::CONSUMER_BATCH_SIZE)
651+
->setExpires(0);
652+
653+
foreach (range(1, 10) as $n) {
654+
$stream->publish('test', 'Hello, NATS JetStream '.$n.'!');
655+
}
656+
657+
$fetching = microtime(true);
658+
// fetch more than available messages to test no-wait behavior
659+
$messages = $consumer->getQueue()->fetchAll($consumer->getBatching());
660+
$fetching = microtime(true) - $fetching;
661+
662+
$this->logger?->info('fetched with no-wait', [
663+
'length' => count($messages),
664+
'time' => $fetching,
665+
]);
666+
667+
// 10 messages were published + 1 404 message to signal the empty stream
668+
$this->assertCount(11, $messages);
669+
$this->assertEquals('404', end($messages)->payload->getHeader('Status-Code'), 'Last message should be 404');
670+
}
628671
}

tests/Performance/PerformanceTest.php

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -102,25 +102,23 @@ public function testFetchNoWait()
102102
->setBatching(self::CONSUMER_BATCH_SIZE)
103103
->setExpires(0);
104104

105-
106-
107105
foreach (range(1, 10) as $n) {
108-
$stream->put('test', 'Hello, NATS JetStream '.date(DATE_RFC3339_EXTENDED).'!');
106+
$stream->publish('test', 'Hello, NATS JetStream '.$n.'!');
109107
}
110108

111109
$fetching = microtime(true);
112110
// fetch more than available messages to test no-wait behavior
113111
$messages = $consumer->getQueue()->fetchAll($consumer->getBatching());
114112
$fetching = microtime(true) - $fetching;
115113

116-
$messages = array_filter($messages, static fn ($message) => !$message->payload->isEmpty());
117-
118114
$this->logger?->info('fetched with no-wait', [
119115
'length' => count($messages),
120116
'time' => $fetching,
121117
]);
122118

123-
$this->assertCount(10, $messages);
124-
$this->assertLessThan(1, $fetching, 'Fetching with no-wait should be fast enough');
119+
// 10 messages were published + 1 404 message to signal the empty stream
120+
$this->assertCount(11, $messages);
121+
$this->assertEquals('404', end($messages)->payload->getHeader('Status-Code'), 'Last message should be 404');
122+
$this->assertLessThan(1, (int)$fetching, 'Fetching with no-wait should be faster than the timeout');
125123
}
126124
}

0 commit comments

Comments
 (0)