From 6eb1142ac2330ffbd200c29294c9d6e313fb8068 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Sat, 16 Sep 2023 23:21:52 +0400 Subject: [PATCH 1/7] Use `StreamWorkerInterface::withStreamMode()` for stream output if worker supports it --- composer.json | 4 +++- src/HttpWorker.php | 11 ++++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/composer.json b/composer.json index aa4a4d1..79a4047 100644 --- a/composer.json +++ b/composer.json @@ -42,13 +42,15 @@ "psr/http-factory": "^1.0.1", "psr/http-message": "^1.0.1 || ^2.0", "spiral/roadrunner": "^2023.1", - "spiral/roadrunner-worker": "^3.0" + "spiral/roadrunner-worker": "dev-ping-pong-stream as 3.1.0" }, "require-dev": { + "buggregator/trap": "^1.0", "jetbrains/phpstorm-attributes": "^1.0", "nyholm/psr7": "^1.3", "phpunit/phpunit": "^10.0", "symfony/process": "^6.2", + "symfony/var-dumper": "^6.3", "vimeo/psalm": "^5.9" }, "autoload": { diff --git a/src/HttpWorker.php b/src/HttpWorker.php index 4a5294d..e92fd82 100644 --- a/src/HttpWorker.php +++ b/src/HttpWorker.php @@ -8,6 +8,7 @@ use Spiral\RoadRunner\Http\Exception\StreamStoppedException; use Spiral\RoadRunner\Message\Command\StreamStop; use Spiral\RoadRunner\Payload; +use Spiral\RoadRunner\StreamWorkerInterface; use Spiral\RoadRunner\WorkerInterface; /** @@ -86,18 +87,22 @@ private function respondStream(int $status, Generator $body, array $headers = [] 'headers' => $headers ?: (object)[], ], \JSON_THROW_ON_ERROR); + $worker = $this->worker instanceof StreamWorkerInterface + ? $this->worker->withStreamMode() + : $this->worker; + do { if (!$body->valid()) { $content = (string)$body->getReturn(); - $this->worker->respond(new Payload($content, $head, true)); + $worker->respond(new Payload($content, $head, true)); break; } $content = (string)$body->current(); - if ($this->worker->getPayload(StreamStop::class) !== null) { + if ($worker->getPayload(StreamStop::class) !== null) { $body->throw(new StreamStoppedException()); return; } - $this->worker->respond(new Payload($content, $head, false)); + $worker->respond(new Payload($content, $head, false)); $body->next(); $head = null; } while (true); From 398ff1786cebc8c7ac7c3117aa87fcc3a744c410 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Tue, 3 Oct 2023 17:18:59 +0400 Subject: [PATCH 2/7] HttpWorker::respond : add EOS parameter --- src/HttpWorker.php | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/HttpWorker.php b/src/HttpWorker.php index e92fd82..a634c2a 100644 --- a/src/HttpWorker.php +++ b/src/HttpWorker.php @@ -65,10 +65,10 @@ public function waitRequest(): ?Request /** * @throws \JsonException */ - public function respond(int $status, string|Generator $body, array $headers = []): void + public function respond(int $status, string|Generator $body, array $headers = [], bool $endOfStream = true): void { if ($body instanceof Generator) { - $this->respondStream($status, $body, $headers); + $this->respondStream($status, $body, $headers, $endOfStream); return; } @@ -77,10 +77,10 @@ public function respond(int $status, string|Generator $body, array $headers = [] 'headers' => $headers ?: (object)[], ], \JSON_THROW_ON_ERROR); - $this->worker->respond(new Payload($body, $head)); + $this->worker->respond(new Payload($body, $head, $endOfStream)); } - private function respondStream(int $status, Generator $body, array $headers = []): void + private function respondStream(int $status, Generator $body, array $headers = [], bool $endOfStream = true): void { $head = \json_encode([ 'status' => $status, @@ -94,7 +94,10 @@ private function respondStream(int $status, Generator $body, array $headers = [] do { if (!$body->valid()) { $content = (string)$body->getReturn(); - $worker->respond(new Payload($content, $head, true)); + if ($endOfStream === false && $content === '') { + return; + } + $worker->respond(new Payload($content, $head, $endOfStream)); break; } $content = (string)$body->current(); From 0e7c7c9928f911095e179197f6876b1fb77b197d Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Tue, 3 Oct 2023 23:25:05 +0400 Subject: [PATCH 3/7] Set the spiral/roadrunner-worker dependency version to ^3.1.0 --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index 79a4047..d8781ea 100644 --- a/composer.json +++ b/composer.json @@ -42,7 +42,7 @@ "psr/http-factory": "^1.0.1", "psr/http-message": "^1.0.1 || ^2.0", "spiral/roadrunner": "^2023.1", - "spiral/roadrunner-worker": "dev-ping-pong-stream as 3.1.0" + "spiral/roadrunner-worker": "^3.1.0" }, "require-dev": { "buggregator/trap": "^1.0", From 61bd6b82890011b78b2f767fbe6509abdca89730 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Wed, 4 Oct 2023 00:09:19 +0400 Subject: [PATCH 4/7] Catch exceptions from streaming generators and send stream stop flag; send stop flag if RR has stopped the stream; add comments --- src/HttpWorker.php | 18 +++++++++++++++++- src/HttpWorkerInterface.php | 3 +-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/HttpWorker.php b/src/HttpWorker.php index a634c2a..03be65e 100644 --- a/src/HttpWorker.php +++ b/src/HttpWorker.php @@ -93,21 +93,37 @@ private function respondStream(int $status, Generator $body, array $headers = [] do { if (!$body->valid()) { + // End of generator $content = (string)$body->getReturn(); if ($endOfStream === false && $content === '') { + // We don't need to send an empty frame if the stream is not ended return; } $worker->respond(new Payload($content, $head, $endOfStream)); break; } + $content = (string)$body->current(); if ($worker->getPayload(StreamStop::class) !== null) { $body->throw(new StreamStoppedException()); + + // RoadRunner is waiting for a Stream Stop Frame to confirm that the stream is closed + // and the worker doesn't hang + $worker->respond(new Payload('')); return; } + + // Send a chunk of data $worker->respond(new Payload($content, $head, false)); - $body->next(); $head = null; + + try { + $body->next(); + } catch (\Throwable) { + // Stop the stream if an exception is thrown from the generator + $worker->respond(new Payload('')); + return; + } } while (true); } diff --git a/src/HttpWorkerInterface.php b/src/HttpWorkerInterface.php index 9198e7b..c163193 100644 --- a/src/HttpWorkerInterface.php +++ b/src/HttpWorkerInterface.php @@ -25,8 +25,7 @@ public function waitRequest(): ?Request; * @param Generator|string $body Body of response. * If the body is a generator, then each yielded value will be sent as a separated stream chunk. * Returned value will be sent as a last stream package. - * Note: Stream response is experimental feature and isn't supported by RoadRunner yet. - * But you can try to use RoadRunner 2.9-alpha to test it. + * Note: Stream response is supported by RoadRunner since version 2023.3 * @param HeadersList|array $headers An associative array of the message's headers. Each key MUST be a header name, * and each value MUST be an array of strings for that header. */ From 48e6c7c23c6d263e4ede240f6ad071feee166227 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Wed, 4 Oct 2023 00:29:06 +0400 Subject: [PATCH 5/7] Update tests --- tests/Feature/StreamResponseTest.php | 32 +++++++++++++++++++++++++--- 1 file changed, 29 insertions(+), 3 deletions(-) diff --git a/tests/Feature/StreamResponseTest.php b/tests/Feature/StreamResponseTest.php index 13a0450..5589dba 100644 --- a/tests/Feature/StreamResponseTest.php +++ b/tests/Feature/StreamResponseTest.php @@ -4,10 +4,12 @@ namespace Spiral\RoadRunner\Tests\Http\Feature; +use Exception; use PHPUnit\Framework\TestCase; use Spiral\Goridge\SocketRelay; use Spiral\RoadRunner\Http\Exception\StreamStoppedException; use Spiral\RoadRunner\Http\HttpWorker; +use Spiral\RoadRunner\Message\Command\GetProcessId; use Spiral\RoadRunner\Payload; use Spiral\RoadRunner\Tests\Http\Server\Command\BaseCommand; use Spiral\RoadRunner\Tests\Http\Server\Command\StreamStop; @@ -94,10 +96,31 @@ public function testStopStreamResponse(): void self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer())); } + public function testExceptionInGenerator(): void + { + $httpWorker = $this->makeHttpWorker(); + + // Flush buffer + ServerRunner::getBuffer(); + + $httpWorker->respond( + 200, + (function () { + yield 'Hel'; + yield 'lo,'; + throw new Exception('test'); + })(), + ); + + + \usleep(100_000); + self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer())); + } + /** * StopStream should be ignored if stream is already ended. * Commented because doesn't pass in CI - * todo: check after RoadRunner Stream Response release + */ public function testStopStreamAfterStreamEnd(): void { $httpWorker = $this->makeHttpWorker(); @@ -116,11 +139,14 @@ public function testStopStreamAfterStreamEnd(): void $this->assertFalse($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class)); $this->sendCommand(new StreamStop()); \usleep(200_000); - self::assertSame(\implode("\n", ['Hello', 'World!']), \trim(ServerRunner::getBuffer())); + $this->assertSame(\implode("\n", ['Hello', 'World!']), \trim(ServerRunner::getBuffer())); $this->assertTrue($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class)); + + $this->getWorker()->getPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class); + $this->getWorker()->getPayload(GetProcessId::class); + $this->assertFalse($this->getWorker()->hasPayload()); } - */ private function getRelay(): SocketRelay { From 8f31c6a622dde2245848875422360bb2a23f6ba0 Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Wed, 4 Oct 2023 12:54:55 +0400 Subject: [PATCH 6/7] Throw an exception if user sends 1xx response with body --- src/HttpWorker.php | 6 +++++- tests/Feature/StreamResponseTest.php | 15 +++++++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/src/HttpWorker.php b/src/HttpWorker.php index 03be65e..92887f7 100644 --- a/src/HttpWorker.php +++ b/src/HttpWorker.php @@ -65,8 +65,12 @@ public function waitRequest(): ?Request /** * @throws \JsonException */ - public function respond(int $status, string|Generator $body, array $headers = [], bool $endOfStream = true): void + public function respond(int $status, string|Generator $body = '', array $headers = [], bool $endOfStream = true): void { + if ($status < 200 && $status >= 100 && $body !== '') { + throw new \InvalidArgumentException('Unable to send a body with informational status code.'); + } + if ($body instanceof Generator) { $this->respondStream($status, $body, $headers, $endOfStream); return; diff --git a/tests/Feature/StreamResponseTest.php b/tests/Feature/StreamResponseTest.php index 5589dba..9da68b8 100644 --- a/tests/Feature/StreamResponseTest.php +++ b/tests/Feature/StreamResponseTest.php @@ -96,6 +96,21 @@ public function testStopStreamResponse(): void self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer())); } + public function testSend1xxWithBody(): void + { + $httpWorker = $this->makeHttpWorker(); + + $this->expectExceptionMessage('Unable to send a body with informational status code'); + + $httpWorker->respond( + 103, + (function () { + yield 'Hel'; + yield 'lo,'; + })(), + ); + } + public function testExceptionInGenerator(): void { $httpWorker = $this->makeHttpWorker(); From f785dec0ec420aa9fcde99551d28ff2b9e46a15f Mon Sep 17 00:00:00 2001 From: roxblnfk Date: Wed, 4 Oct 2023 13:37:31 +0400 Subject: [PATCH 7/7] Set spiral/roadrunner version to ^2023.3 --- composer.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/composer.json b/composer.json index d8781ea..e5e9eb5 100644 --- a/composer.json +++ b/composer.json @@ -41,7 +41,7 @@ "ext-json": "*", "psr/http-factory": "^1.0.1", "psr/http-message": "^1.0.1 || ^2.0", - "spiral/roadrunner": "^2023.1", + "spiral/roadrunner": "^2023.3", "spiral/roadrunner-worker": "^3.1.0" }, "require-dev": {