Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,6 @@ public function process(null|int|float $timeout = 0, bool $reply = true): mixed
return $result;
}
if (array_key_exists($message->sid, $this->handlers)) {
if (isset($message->payload->headers['Status-Code']) && $message->payload->headers['Status-Code'] === '404') {
return null;
}
return $this->processMsg($this->handlers[$message->sid], $message, $reply);
}
if ($this->skipInvalidMessages) {
Expand Down
10 changes: 9 additions & 1 deletion src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class Queue
private array $queue = [];
private float $timeout;
private ?Publish $launcher = null;
private ?Msg $lastMessage = null;

public function __construct(
public readonly Client $client,
Expand Down Expand Up @@ -51,6 +52,13 @@ public function fetchAll(int $limit = 0): array
// stop when clients got message for another handler or there are no more messages
break;
}

if (
$this->lastMessage?->payload->isEmpty()
&& $this->lastMessage->payload->getHeader('Status-Code') === '404'
) {
break;
}
}

$result = [];
Expand All @@ -64,7 +72,7 @@ public function fetchAll(int $limit = 0): array

public function handle(Msg $message)
{
$this->queue[] = $message;
$this->queue[] = $this->lastMessage = $message;
}

public function next(float $timeout = 0): Msg
Expand Down
28 changes: 0 additions & 28 deletions tests/Functional/ClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -175,32 +175,4 @@ public function testCloseClosesSocket(): void
// Assert that the socket is closed and set to null
self::assertNull($property->getValue($connection));
}

public function testProcessNotFound()
{
$client = $this->createClient([]);
$sid = null;

/** @var Connection $connectionMock */
$connectionMock = $this->createMock(Connection::class);

$connectionMock
->method('sendMessage')
->willReturnCallback(function (Subscribe $subscribe) use (&$sid) {
$sid = $subscribe->sid;
});
$client->connection = $connectionMock;

$client->subscribe('hello.request', fn ($name) => "Hello, " . $name);

$message = Factory::create('HMSG handler.' . $sid . ' ' . $sid . ' 28 28');
$message->parse('NATS/1.0 404 No Messages');

$connectionMock
->method('getMessage')
->willReturn($message);

$result = $client->process();
$this->assertNull($result);
}
}
43 changes: 43 additions & 0 deletions tests/Functional/StreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Basis\Nats\Consumer\AckPolicy;
use Basis\Nats\Consumer\Configuration;
use Basis\Nats\Consumer\Consumer;
use Basis\Nats\Consumer\DeliverPolicy;
use Basis\Nats\Consumer\ReplayPolicy;
use Basis\Nats\Message\Payload;
use Basis\Nats\Stream\ConsumerLimits;
Expand All @@ -16,6 +17,8 @@

class StreamTest extends FunctionalTestCase
{
private const CONSUMER_BATCH_SIZE = 50;

private mixed $called;

private bool $empty;
Expand Down Expand Up @@ -625,4 +628,44 @@ private function assertWrongNumPending(Consumer $consumer, ?int $expected = null
}
}
}

public function testFetchLessThanBatch()
{
$client = $this->createClient(['timeout' => 10])->setDelay(0);
$stream = $client->getApi()->getStream('test_fetch_no_wait');
$stream
->getConfiguration()
->setRetentionPolicy(RetentionPolicy::INTEREST)
->setStorageBackend(StorageBackend::MEMORY)
->setSubjects(['test']);

$stream->create();

$consumer = $stream->getConsumer('fetch_no_waiter');
$consumer->getConfiguration()
->setSubjectFilter('test')
->setDeliverPolicy(DeliverPolicy::NEW);
$consumer->create();
$consumer
->setBatching(self::CONSUMER_BATCH_SIZE)
->setExpires(0);

foreach (range(1, 10) as $n) {
$stream->publish('test', 'Hello, NATS JetStream '.$n.'!');
}

$fetching = microtime(true);
// fetch more than available messages to test no-wait behavior
$messages = $consumer->getQueue()->fetchAll($consumer->getBatching());
$fetching = microtime(true) - $fetching;

$this->logger?->info('fetched with no-wait', [
'length' => count($messages),
'time' => $fetching,
]);

// 10 messages were published + 1 404 message to signal the empty stream
$this->assertCount(11, $messages);
$this->assertEquals('404', end($messages)->payload->getHeader('Status-Code'), 'Last message should be 404');
}
}
12 changes: 5 additions & 7 deletions tests/Performance/PerformanceTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -102,25 +102,23 @@ public function testFetchNoWait()
->setBatching(self::CONSUMER_BATCH_SIZE)
->setExpires(0);



foreach (range(1, 10) as $n) {
$stream->put('test', 'Hello, NATS JetStream '.date(DATE_RFC3339_EXTENDED).'!');
$stream->publish('test', 'Hello, NATS JetStream '.$n.'!');
}

$fetching = microtime(true);
// fetch more than available messages to test no-wait behavior
$messages = $consumer->getQueue()->fetchAll($consumer->getBatching());
$fetching = microtime(true) - $fetching;

$messages = array_filter($messages, static fn ($message) => !$message->payload->isEmpty());

$this->logger?->info('fetched with no-wait', [
'length' => count($messages),
'time' => $fetching,
]);

$this->assertCount(10, $messages);
$this->assertLessThan(1, $fetching, 'Fetching with no-wait should be fast enough');
// 10 messages were published + 1 404 message to signal the empty stream
$this->assertCount(11, $messages);
$this->assertEquals('404', end($messages)->payload->getHeader('Status-Code'), 'Last message should be 404');
$this->assertLessThan(1, (int)$fetching, 'Fetching with no-wait should be faster than the timeout');
}
}
Loading