Skip to content

Commit

Permalink
Merge pull request #18: better streaming response supporting
Browse files Browse the repository at this point in the history
Add ability to send separated stream frames
  • Loading branch information
roxblnfk authored Oct 4, 2023
2 parents 30d89df + f785dec commit c6aef2a
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 15 deletions.
6 changes: 4 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@
"ext-json": "*",
"psr/http-factory": "^1.0.1",
"psr/http-message": "^1.0.1 || ^2.0",
"spiral/roadrunner": "^2023.1",
"spiral/roadrunner-worker": "^3.0"
"spiral/roadrunner": "^2023.3",
"spiral/roadrunner-worker": "^3.1.0"
},
"require-dev": {
"buggregator/trap": "^1.0",
"jetbrains/phpstorm-attributes": "^1.0",
"nyholm/psr7": "^1.3",
"phpunit/phpunit": "^10.0",
"symfony/process": "^6.2",
"symfony/var-dumper": "^6.3",
"vimeo/psalm": "^5.9"
},
"autoload": {
Expand Down
44 changes: 36 additions & 8 deletions src/HttpWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Spiral\RoadRunner\Http\Exception\StreamStoppedException;
use Spiral\RoadRunner\Message\Command\StreamStop;
use Spiral\RoadRunner\Payload;
use Spiral\RoadRunner\StreamWorkerInterface;
use Spiral\RoadRunner\WorkerInterface;

/**
Expand Down Expand Up @@ -64,10 +65,14 @@ public function waitRequest(): ?Request
/**
* @throws \JsonException
*/
public function respond(int $status, string|Generator $body, array $headers = []): void
public function respond(int $status, string|Generator $body = '', array $headers = [], bool $endOfStream = true): void
{
if ($status < 200 && $status >= 100 && $body !== '') {
throw new \InvalidArgumentException('Unable to send a body with informational status code.');
}

if ($body instanceof Generator) {
$this->respondStream($status, $body, $headers);
$this->respondStream($status, $body, $headers, $endOfStream);
return;
}

Expand All @@ -76,30 +81,53 @@ public function respond(int $status, string|Generator $body, array $headers = []
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);

$this->worker->respond(new Payload($body, $head));
$this->worker->respond(new Payload($body, $head, $endOfStream));
}

private function respondStream(int $status, Generator $body, array $headers = []): void
private function respondStream(int $status, Generator $body, array $headers = [], bool $endOfStream = true): void
{
$head = \json_encode([
'status' => $status,
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);

$worker = $this->worker instanceof StreamWorkerInterface
? $this->worker->withStreamMode()
: $this->worker;

do {
if (!$body->valid()) {
// End of generator
$content = (string)$body->getReturn();
$this->worker->respond(new Payload($content, $head, true));
if ($endOfStream === false && $content === '') {
// We don't need to send an empty frame if the stream is not ended
return;
}
$worker->respond(new Payload($content, $head, $endOfStream));
break;
}

$content = (string)$body->current();
if ($this->worker->getPayload(StreamStop::class) !== null) {
if ($worker->getPayload(StreamStop::class) !== null) {
$body->throw(new StreamStoppedException());

// RoadRunner is waiting for a Stream Stop Frame to confirm that the stream is closed
// and the worker doesn't hang
$worker->respond(new Payload(''));
return;
}
$this->worker->respond(new Payload($content, $head, false));
$body->next();

// Send a chunk of data
$worker->respond(new Payload($content, $head, false));
$head = null;

try {
$body->next();
} catch (\Throwable) {
// Stop the stream if an exception is thrown from the generator
$worker->respond(new Payload(''));
return;
}
} while (true);
}

Expand Down
3 changes: 1 addition & 2 deletions src/HttpWorkerInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,7 @@ public function waitRequest(): ?Request;
* @param Generator<mixed, scalar|Stringable, mixed, Stringable|scalar|null>|string $body Body of response.
* If the body is a generator, then each yielded value will be sent as a separated stream chunk.
* Returned value will be sent as a last stream package.
* Note: Stream response is experimental feature and isn't supported by RoadRunner yet.
* But you can try to use RoadRunner 2.9-alpha to test it.
* Note: Stream response is supported by RoadRunner since version 2023.3
* @param HeadersList|array $headers An associative array of the message's headers. Each key MUST be a header name,
* and each value MUST be an array of strings for that header.
*/
Expand Down
47 changes: 44 additions & 3 deletions tests/Feature/StreamResponseTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

namespace Spiral\RoadRunner\Tests\Http\Feature;

use Exception;
use PHPUnit\Framework\TestCase;
use Spiral\Goridge\SocketRelay;
use Spiral\RoadRunner\Http\Exception\StreamStoppedException;
use Spiral\RoadRunner\Http\HttpWorker;
use Spiral\RoadRunner\Message\Command\GetProcessId;
use Spiral\RoadRunner\Payload;
use Spiral\RoadRunner\Tests\Http\Server\Command\BaseCommand;
use Spiral\RoadRunner\Tests\Http\Server\Command\StreamStop;
Expand Down Expand Up @@ -94,10 +96,46 @@ public function testStopStreamResponse(): void
self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer()));
}

public function testSend1xxWithBody(): void
{
$httpWorker = $this->makeHttpWorker();

$this->expectExceptionMessage('Unable to send a body with informational status code');

$httpWorker->respond(
103,
(function () {
yield 'Hel';
yield 'lo,';
})(),
);
}

public function testExceptionInGenerator(): void
{
$httpWorker = $this->makeHttpWorker();

// Flush buffer
ServerRunner::getBuffer();

$httpWorker->respond(
200,
(function () {
yield 'Hel';
yield 'lo,';
throw new Exception('test');
})(),
);


\usleep(100_000);
self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer()));
}

/**
* StopStream should be ignored if stream is already ended.
* Commented because doesn't pass in CI
* todo: check after RoadRunner Stream Response release
*/
public function testStopStreamAfterStreamEnd(): void
{
$httpWorker = $this->makeHttpWorker();
Expand All @@ -116,11 +154,14 @@ public function testStopStreamAfterStreamEnd(): void
$this->assertFalse($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class));
$this->sendCommand(new StreamStop());
\usleep(200_000);
self::assertSame(\implode("\n", ['Hello', 'World!']), \trim(ServerRunner::getBuffer()));
$this->assertSame(\implode("\n", ['Hello', 'World!']), \trim(ServerRunner::getBuffer()));
$this->assertTrue($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class));

$this->getWorker()->getPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class);
$this->getWorker()->getPayload(GetProcessId::class);

$this->assertFalse($this->getWorker()->hasPayload());
}
*/

private function getRelay(): SocketRelay
{
Expand Down

0 comments on commit c6aef2a

Please sign in to comment.