Skip to content

Commit a0cc561

Browse files
authored
Support for streamed output (#10)
1 parent 2d76b77 commit a0cc561

File tree

4 files changed

+70
-10
lines changed

4 files changed

+70
-10
lines changed

composer.json

+3-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"require": {
1717
"php": ">=7.4",
1818
"ext-json": "*",
19-
"spiral/roadrunner-worker": "^2.0",
19+
"spiral/roadrunner-worker": "^2.2.0",
2020
"psr/http-factory": "^1.0.1",
2121
"psr/http-message": "^1.0.1"
2222
},
@@ -30,15 +30,15 @@
3030
"phpstan/phpstan": "~0.12",
3131
"phpunit/phpunit": "~8.0",
3232
"jetbrains/phpstorm-attributes": "^1.0",
33-
"vimeo/psalm": "^4.4",
33+
"vimeo/psalm": "^4.22",
3434
"symfony/var-dumper": "^5.1"
3535
},
3636
"scripts": {
3737
"analyze": "psalm"
3838
},
3939
"extra": {
4040
"branch-alias": {
41-
"dev-master": "2.1.x-dev"
41+
"dev-master": "2.2.x-dev"
4242
}
4343
},
4444
"suggest": {

src/HttpWorker.php

+33-4
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,10 @@
1111

1212
namespace Spiral\RoadRunner\Http;
1313

14+
use Generator;
1415
use Spiral\RoadRunner\Payload;
1516
use Spiral\RoadRunner\WorkerInterface;
17+
use Stringable;
1618

1719
/**
1820
* @psalm-import-type HeadersList from Request
@@ -83,12 +85,39 @@ public function waitRequest(): ?Request
8385
*/
8486
public function respond(int $status, string $body, array $headers = []): void
8587
{
86-
$headers = (string)\json_encode([
88+
$head = (string)\json_encode([
8789
'status' => $status,
8890
'headers' => $headers ?: (object)[],
8991
], \JSON_THROW_ON_ERROR);
9092

91-
$this->worker->respond(new Payload($body, $headers));
93+
$this->worker->respond(new Payload($body, $head));
94+
}
95+
96+
/**
97+
* Respond data using Streamed Output
98+
*
99+
* @param Generator<mixed, scalar|Stringable, mixed, Stringable|scalar|null> $body Body generator.
100+
* Each yielded value will be sent as a separated stream chunk.
101+
* Returned value will be sent as a last stream package.
102+
*/
103+
public function respondStream(int $status, Generator $body, array $headers = []): void
104+
{
105+
$head = (string)\json_encode([
106+
'status' => $status,
107+
'headers' => $headers ?: (object)[],
108+
], \JSON_THROW_ON_ERROR);
109+
110+
do {
111+
if (!$body->valid()) {
112+
$content = (string)$body->getReturn();
113+
$this->worker->respond(new Payload($content, $head, true));
114+
break;
115+
}
116+
$content = (string)$body->current();
117+
$this->worker->respond(new Payload($content, $head, false));
118+
$body->next();
119+
$head = null;
120+
} while (true);
92121
}
93122

94123
/**
@@ -131,7 +160,7 @@ private function hydrateRequest(Request $request, array $context): void
131160
}
132161

133162
/**
134-
* @param array<mixed, mixed> $headers
163+
* Remove all non-string and empty-string keys
135164
*
136165
* @return array<string, mixed>
137166
*/
@@ -144,7 +173,7 @@ private function filterHeaders(array $headers): array
144173
unset($headers[$key]);
145174
}
146175
}
147-
176+
/** @var array<string, mixed> $headers */
148177
return $headers;
149178
}
150179
}

src/PSR7Worker.php

+34-2
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,16 @@
1111

1212
namespace Spiral\RoadRunner\Http;
1313

14+
use Generator;
1415
use Psr\Http\Message\ResponseInterface;
1516
use Psr\Http\Message\ServerRequestFactoryInterface;
1617
use Psr\Http\Message\ServerRequestInterface;
1718
use Psr\Http\Message\StreamFactoryInterface;
19+
use Psr\Http\Message\StreamInterface;
1820
use Psr\Http\Message\UploadedFileFactoryInterface;
1921
use Psr\Http\Message\UploadedFileInterface;
2022
use Spiral\RoadRunner\WorkerInterface;
23+
use Stringable;
2124

2225
/**
2326
* Manages PSR-7 request and response.
@@ -27,6 +30,8 @@
2730
*/
2831
class PSR7Worker implements PSR7WorkerInterface
2932
{
33+
public int $chunk_size = 8 * 1024;
34+
3035
/**
3136
* @var HttpWorker
3237
*/
@@ -108,13 +113,40 @@ public function waitRequest(): ?ServerRequestInterface
108113
*/
109114
public function respond(ResponseInterface $response): void
110115
{
111-
$this->httpWorker->respond(
116+
$this->httpWorker->respondStream(
112117
$response->getStatusCode(),
113-
(string)$response->getBody(),
118+
$this->streamToGenerator($response->getBody()),
114119
$response->getHeaders()
115120
);
116121
}
117122

123+
/**
124+
* @return Generator<mixed, scalar|Stringable, mixed, Stringable|scalar|null> Compatible
125+
* with {@see \Spiral\RoadRunner\Http\HttpWorker::respondStream()}.
126+
*/
127+
private function streamToGenerator(StreamInterface $stream): Generator
128+
{
129+
$stream->rewind();
130+
$size = $stream->getSize();
131+
if ($size !== null && $size < $this->chunk_size) {
132+
return (string)$stream;
133+
}
134+
$sum = 0;
135+
while (!$stream->eof()) {
136+
if ($size === null) {
137+
$chunk = $stream->read($this->chunk_size);
138+
} else {
139+
$left = $size - $sum;
140+
$chunk = $stream->read(\min($this->chunk_size, $left));
141+
if ($left <= $this->chunk_size && \strlen($chunk) === $left) {
142+
return $chunk;
143+
}
144+
}
145+
$sum += \strlen($chunk);
146+
yield $chunk;
147+
}
148+
}
149+
118150
/**
119151
* Returns altered copy of _SERVER variable. Sets ip-address,
120152
* request-time and other values.

src/Request.php

-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@ final class Request
3636
/**
3737
* @var string
3838
*/
39-
4039
public string $remoteAddr = '127.0.0.1';
4140

4241
/**

0 commit comments

Comments
 (0)