diff --git a/composer.json b/composer.json index aa4a4d1..e5e9eb5 100644 --- a/composer.json +++ b/composer.json @@ -41,14 +41,16 @@ "ext-json": "*", "psr/http-factory": "^1.0.1", "psr/http-message": "^1.0.1 || ^2.0", - "spiral/roadrunner": "^2023.1", - "spiral/roadrunner-worker": "^3.0" + "spiral/roadrunner": "^2023.3", + "spiral/roadrunner-worker": "^3.1.0" }, "require-dev": { + "buggregator/trap": "^1.0", "jetbrains/phpstorm-attributes": "^1.0", "nyholm/psr7": "^1.3", "phpunit/phpunit": "^10.0", "symfony/process": "^6.2", + "symfony/var-dumper": "^6.3", "vimeo/psalm": "^5.9" }, "autoload": { diff --git a/src/HttpWorker.php b/src/HttpWorker.php index 4a5294d..92887f7 100644 --- a/src/HttpWorker.php +++ b/src/HttpWorker.php @@ -8,6 +8,7 @@ use Spiral\RoadRunner\Http\Exception\StreamStoppedException; use Spiral\RoadRunner\Message\Command\StreamStop; use Spiral\RoadRunner\Payload; +use Spiral\RoadRunner\StreamWorkerInterface; use Spiral\RoadRunner\WorkerInterface; /** @@ -64,10 +65,14 @@ public function waitRequest(): ?Request /** * @throws \JsonException */ - public function respond(int $status, string|Generator $body, array $headers = []): void + public function respond(int $status, string|Generator $body = '', array $headers = [], bool $endOfStream = true): void { + if ($status < 200 && $status >= 100 && $body !== '') { + throw new \InvalidArgumentException('Unable to send a body with informational status code.'); + } + if ($body instanceof Generator) { - $this->respondStream($status, $body, $headers); + $this->respondStream($status, $body, $headers, $endOfStream); return; } @@ -76,30 +81,53 @@ public function respond(int $status, string|Generator $body, array $headers = [] 'headers' => $headers ?: (object)[], ], \JSON_THROW_ON_ERROR); - $this->worker->respond(new Payload($body, $head)); + $this->worker->respond(new Payload($body, $head, $endOfStream)); } - private function respondStream(int $status, Generator $body, array $headers = []): void + private function respondStream(int $status, Generator $body, array $headers = [], bool $endOfStream = true): void { $head = \json_encode([ 'status' => $status, 'headers' => $headers ?: (object)[], ], \JSON_THROW_ON_ERROR); + $worker = $this->worker instanceof StreamWorkerInterface + ? $this->worker->withStreamMode() + : $this->worker; + do { if (!$body->valid()) { + // End of generator $content = (string)$body->getReturn(); - $this->worker->respond(new Payload($content, $head, true)); + if ($endOfStream === false && $content === '') { + // We don't need to send an empty frame if the stream is not ended + return; + } + $worker->respond(new Payload($content, $head, $endOfStream)); break; } + $content = (string)$body->current(); - if ($this->worker->getPayload(StreamStop::class) !== null) { + if ($worker->getPayload(StreamStop::class) !== null) { $body->throw(new StreamStoppedException()); + + // RoadRunner is waiting for a Stream Stop Frame to confirm that the stream is closed + // and the worker doesn't hang + $worker->respond(new Payload('')); return; } - $this->worker->respond(new Payload($content, $head, false)); - $body->next(); + + // Send a chunk of data + $worker->respond(new Payload($content, $head, false)); $head = null; + + try { + $body->next(); + } catch (\Throwable) { + // Stop the stream if an exception is thrown from the generator + $worker->respond(new Payload('')); + return; + } } while (true); } diff --git a/src/HttpWorkerInterface.php b/src/HttpWorkerInterface.php index 9198e7b..c163193 100644 --- a/src/HttpWorkerInterface.php +++ b/src/HttpWorkerInterface.php @@ -25,8 +25,7 @@ public function waitRequest(): ?Request; * @param Generator|string $body Body of response. * If the body is a generator, then each yielded value will be sent as a separated stream chunk. * Returned value will be sent as a last stream package. - * Note: Stream response is experimental feature and isn't supported by RoadRunner yet. - * But you can try to use RoadRunner 2.9-alpha to test it. + * Note: Stream response is supported by RoadRunner since version 2023.3 * @param HeadersList|array $headers An associative array of the message's headers. Each key MUST be a header name, * and each value MUST be an array of strings for that header. */ diff --git a/tests/Feature/StreamResponseTest.php b/tests/Feature/StreamResponseTest.php index 13a0450..9da68b8 100644 --- a/tests/Feature/StreamResponseTest.php +++ b/tests/Feature/StreamResponseTest.php @@ -4,10 +4,12 @@ namespace Spiral\RoadRunner\Tests\Http\Feature; +use Exception; use PHPUnit\Framework\TestCase; use Spiral\Goridge\SocketRelay; use Spiral\RoadRunner\Http\Exception\StreamStoppedException; use Spiral\RoadRunner\Http\HttpWorker; +use Spiral\RoadRunner\Message\Command\GetProcessId; use Spiral\RoadRunner\Payload; use Spiral\RoadRunner\Tests\Http\Server\Command\BaseCommand; use Spiral\RoadRunner\Tests\Http\Server\Command\StreamStop; @@ -94,10 +96,46 @@ public function testStopStreamResponse(): void self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer())); } + public function testSend1xxWithBody(): void + { + $httpWorker = $this->makeHttpWorker(); + + $this->expectExceptionMessage('Unable to send a body with informational status code'); + + $httpWorker->respond( + 103, + (function () { + yield 'Hel'; + yield 'lo,'; + })(), + ); + } + + public function testExceptionInGenerator(): void + { + $httpWorker = $this->makeHttpWorker(); + + // Flush buffer + ServerRunner::getBuffer(); + + $httpWorker->respond( + 200, + (function () { + yield 'Hel'; + yield 'lo,'; + throw new Exception('test'); + })(), + ); + + + \usleep(100_000); + self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer())); + } + /** * StopStream should be ignored if stream is already ended. * Commented because doesn't pass in CI - * todo: check after RoadRunner Stream Response release + */ public function testStopStreamAfterStreamEnd(): void { $httpWorker = $this->makeHttpWorker(); @@ -116,11 +154,14 @@ public function testStopStreamAfterStreamEnd(): void $this->assertFalse($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class)); $this->sendCommand(new StreamStop()); \usleep(200_000); - self::assertSame(\implode("\n", ['Hello', 'World!']), \trim(ServerRunner::getBuffer())); + $this->assertSame(\implode("\n", ['Hello', 'World!']), \trim(ServerRunner::getBuffer())); $this->assertTrue($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class)); + + $this->getWorker()->getPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class); + $this->getWorker()->getPayload(GetProcessId::class); + $this->assertFalse($this->getWorker()->hasPayload()); } - */ private function getRelay(): SocketRelay {