diff --git a/src/Connection.php b/src/Connection.php index 425d567..348b201 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -28,6 +28,7 @@ class Connection private float $pingAt = 0; private float $pongAt = 0; private float $prolongateTill = 0; + private int $paketSize = 1024; private ?Authenticator $authenticator; private Configuration $config; @@ -128,28 +129,32 @@ public function sendMessage(Message $message) $line = $message->render() . "\r\n"; $length = strlen($line); + $total = 0; $this->logger?->debug('send ' . $line); - while (strlen($line)) { + while ($total < $length) { try { - $written = @fwrite($this->socket, $line, 1024); + $written = @fwrite($this->socket, substr($line, $total, $this->paketSize)); if ($written === false) { throw new LogicException('Error sending data'); } if ($written === 0) { throw new LogicException('Broken pipe or closed connection'); } - if ($length == $written) { + $total += $written; + + if ($length == $total) { break; } - $line = substr($line, $written); } catch (Throwable $e) { $this->processException($e); $line = $message->render() . "\r\n"; } } + unset($line); + if ($message instanceof Publish) { if (strpos($message->subject, '$JS.API.CONSUMER.MSG.NEXT.') === 0) { $prolongate = $message->payload->expires / 1_000_000_000; @@ -291,6 +296,11 @@ private function processException(Throwable $e) } } + public function setPacketSize(int $size): void + { + $this->paketSize = $size; + } + public function close(): void { if ($this->socket) { diff --git a/tests/Performance/PerformanceTest.php b/tests/Performance/PerformanceTest.php index 74f6d1f..d10f986 100644 --- a/tests/Performance/PerformanceTest.php +++ b/tests/Performance/PerformanceTest.php @@ -10,6 +10,8 @@ class PerformanceTest extends FunctionalTestCase { private int $limit = 500_000; private int $counter = 0; + private int $bigMessageIterationLimit = 1000; + private int $payloadSize = 1024 * 450; // 900kb public function testPerformance() { @@ -49,4 +51,29 @@ public function testPerformance() // at least 5000rps should be enough for test $this->assertGreaterThan(5000, $this->limit / $processing); } + + + public function testPerformanceWithBigMessages() + { + $client = $this->createClient()->setTimeout(0.1)->setDelay(0); + $client->connection->setLogger(null); + + $bigPayload = bin2hex(random_bytes($this->payloadSize)); + $this->logger?->info('start big message performance test with size: '. strlen($bigPayload) / 1024 .'kb'); + + $publishing = microtime(true); + for ($i = 0; $i < $this->bigMessageIterationLimit; $i++) { + $client->publish('hello', $bigPayload); + } + $publishing = microtime(true) - $publishing; + + $this->logger?->info('publishing', [ + 'rps' => floor($this->bigMessageIterationLimit / $publishing), + 'length' => $this->bigMessageIterationLimit, + 'time' => $publishing, + ]); + + // at least 50rps should be enough for test + $this->assertGreaterThan(500, $this->bigMessageIterationLimit / $publishing); + } }