diff --git a/.editorconfig b/.editorconfig
new file mode 100644
index 0000000..a998937
--- /dev/null
+++ b/.editorconfig
@@ -0,0 +1,15 @@
+root = true
+
+[*]
+charset = utf-8
+end_of_line = lf
+insert_final_newline = true
+indent_style = space
+indent_size = 4
+trim_trailing_whitespace = true
+
+[*.{yml, yaml, sh, conf, neon*}]
+indent_size = 2
+
+[*.md]
+trim_trailing_whitespace = false
diff --git a/.gitattributes b/.gitattributes
new file mode 100644
index 0000000..bc4b7e4
--- /dev/null
+++ b/.gitattributes
@@ -0,0 +1,10 @@
+* text=auto
+
+/.github export-ignore
+/tests export-ignore
+/.* export-ignore
+/phpunit.xml* export-ignore
+/phpstan.* export-ignore
+/psalm.* export-ignore
+/infection.* export-ignore
+/codecov.* export-ignore
diff --git a/.github/CODE_OF_CONDUCT.md b/.github/CODE_OF_CONDUCT.md
new file mode 100644
index 0000000..5d589cb
--- /dev/null
+++ b/.github/CODE_OF_CONDUCT.md
@@ -0,0 +1,12 @@
+# Code of Conduct
+
+The Spiral code of conduct is derived from the Ruby code of conduct.
+Any violations of the code of conduct may be reported by email `wolfy-j[at]spiralscout.com`.
+
+- Participants will be tolerant of opposing views.
+
+- Participants must ensure that their language and actions are free of personal attacks and disparaging personal remarks.
+
+- When interpreting the words and actions of others, participants should always assume good intentions.
+
+- Behavior which can be reasonably considered harassment will not be tolerated.
diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md
new file mode 100644
index 0000000..90e5310
--- /dev/null
+++ b/.github/CONTRIBUTING.md
@@ -0,0 +1,6 @@
+# Contributing
+
+Feel free to contribute to the development of the Framework or its components.
+
+For more information on contributing rules you can find on the documentation
+page at https://spiral.dev/docs/about-contributing
diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml
new file mode 100644
index 0000000..38798a2
--- /dev/null
+++ b/.github/FUNDING.yml
@@ -0,0 +1,3 @@
+# These are supported funding model platforms
+
+github: roadrunner-server
diff --git a/.github/ISSUE_TEMPLATE/1_Bug_report.md b/.github/ISSUE_TEMPLATE/1_Bug_report.md
new file mode 100644
index 0000000..5ea3ac3
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/1_Bug_report.md
@@ -0,0 +1,23 @@
+---
+name: 🐛 Bug Report
+about: Report errors and problems
+labels: Bug
+
+---
+### Description
+
+
+
+### How To Reproduce
+
+
+
+### Additional Info
+
+| Q | A
+|------------------| ---
+| Package Version | x.y.z
+| PHP version | x.y.z
+| Operating system | Linux
+
+
diff --git a/.github/ISSUE_TEMPLATE/2_Feature_request.md b/.github/ISSUE_TEMPLATE/2_Feature_request.md
new file mode 100644
index 0000000..4a14c3d
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/2_Feature_request.md
@@ -0,0 +1,14 @@
+---
+name: 🚀 Feature Request
+about: RFC and ideas for new features and improvements
+labels: Feature
+
+---
+## Description
+
+
+
+## Example
+
+
diff --git a/.github/ISSUE_TEMPLATE/3_Support_question.md b/.github/ISSUE_TEMPLATE/3_Support_question.md
new file mode 100644
index 0000000..7d0ecb4
--- /dev/null
+++ b/.github/ISSUE_TEMPLATE/3_Support_question.md
@@ -0,0 +1,15 @@
+---
+name: ❓ Question
+about: Use if you have problems and don't know how to formulate them
+labels: Question
+
+---
+
+
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md
new file mode 100644
index 0000000..8c0643a
--- /dev/null
+++ b/.github/PULL_REQUEST_TEMPLATE.md
@@ -0,0 +1,10 @@
+| Q | A
+| ------------- | ---
+| Bugfix? | ✔️/❌
+| Breaks BC? | ✔️/❌
+| New feature? | ✔️/❌
+| Issues | #...
+| Docs PR | spiral/docs#...
+
+
diff --git a/.github/SECURITY.md b/.github/SECURITY.md
new file mode 100644
index 0000000..259a7a0
--- /dev/null
+++ b/.github/SECURITY.md
@@ -0,0 +1,11 @@
+Security Policy
+===============
+
+If you found something which shouldn't be there or a bug which opens a security
+hole, please let me know immediately by email `wolfy-j[at]spiralscout.com`.
+
+DO NOT PUBLISH SECURITY REPORTS PUBLICLY.
+
+The full [Security Policy][1] is described in the official documentation.
+
+ [1]: https://spiral.dev/docs/about-contributing#critical-security-issues
diff --git a/.github/workflows/phpunit.yml b/.github/workflows/phpunit.yml
new file mode 100644
index 0000000..504edfd
--- /dev/null
+++ b/.github/workflows/phpunit.yml
@@ -0,0 +1,19 @@
+on:
+ pull_request: null
+ push:
+ branches:
+ - master
+ - '*.*'
+
+name: phpunit
+
+jobs:
+ phpunit:
+ uses: spiral/gh-actions/.github/workflows/phpunit.yml@master
+ with:
+ os: >-
+ ['ubuntu-latest']
+ php: >-
+ ['8.1', '8.2']
+ stability: >-
+ ['prefer-lowest', 'prefer-stable']
diff --git a/.github/workflows/psalm.yml b/.github/workflows/psalm.yml
new file mode 100644
index 0000000..02a83d3
--- /dev/null
+++ b/.github/workflows/psalm.yml
@@ -0,0 +1,17 @@
+on:
+ pull_request: null
+ push:
+ branches:
+ - master
+ - '*.*'
+
+name: static analysis
+
+jobs:
+ psalm:
+ uses: spiral/gh-actions/.github/workflows/psalm.yml@master
+ with:
+ os: >-
+ ['ubuntu-latest']
+ php: >-
+ ['8.1']
diff --git a/.gitignore b/.gitignore
index ce65a4b..4059830 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,20 @@
-# Dependency directories (remove the comment below to include it)
-# vendor/
-.idea
-vendor
+# Composer lock file
composer.lock
+
+# IDEs
+/.idea
+/.vscode
+
+# Vendors
+/vendor
+**/vendor
+
+# Temp dirs & trash
+/tests/server*
+clover*
+cover*
+.DS_Store
+*.cache
+
+.phpunit.cache/
+.phpunit.result.cache
diff --git a/composer.json b/composer.json
index ab294bd..6402581 100644
--- a/composer.json
+++ b/composer.json
@@ -5,42 +5,71 @@
"license": "MIT",
"authors": [
{
- "name": "Anton Titov / Wolfy-J",
- "email": "wolfy.jd@gmail.com"
+ "name": "Anton Titov (wolfy-j)",
+ "email": "wolfy-j@spiralscout.com"
+ },
+ {
+ "name": "Valery Piashchynski",
+ "homepage": "https://github.com/rustatian"
+ },
+ {
+ "name": "Aleksei Gagarin (roxblnfk)",
+ "homepage": "https://github.com/roxblnfk"
+ },
+ {
+ "name": "Pavel Buchnev (butschster)",
+ "email": "pavel.buchnev@spiralscout.com"
+ },
+ {
+ "name": "Maksim Smakouz (msmakouz)",
+ "email": "maksim.smakouz@spiralscout.com"
},
{
"name": "RoadRunner Community",
- "homepage": "https://github.com/spiral/roadrunner/graphs/contributors"
+ "homepage": "https://github.com/roadrunner-server/roadrunner/graphs/contributors"
}
],
+ "homepage": "https://spiral.dev/",
+ "support": {
+ "docs": "https://roadrunner.dev/docs",
+ "issues": "https://github.com/roadrunner-server/roadrunner/issues",
+ "forum": "https://forum.roadrunner.dev/",
+ "chat": "https://discord.gg/V6EK4he"
+ },
"require": {
- "php": ">=7.4",
+ "php": ">=8.1",
"ext-json": "*",
- "spiral/roadrunner-worker": "^2.2.0",
"psr/http-factory": "^1.0.1",
- "psr/http-message": "^1.0.1"
+ "psr/http-message": "^1.0.1",
+ "spiral/roadrunner": "^2023.1",
+ "spiral/roadrunner-worker": "^3.0"
+ },
+ "require-dev": {
+ "jetbrains/phpstorm-attributes": "^1.0",
+ "nyholm/psr7": "^1.3",
+ "phpunit/phpunit": "^10.0",
+ "symfony/process": "^6.2",
+ "vimeo/psalm": "^5.9"
},
"autoload": {
"psr-4": {
"Spiral\\RoadRunner\\Http\\": "src"
}
},
- "require-dev": {
- "nyholm/psr7": "^1.3",
- "phpstan/phpstan": "~0.12",
- "phpunit/phpunit": "~8.0",
- "jetbrains/phpstorm-attributes": "^1.0",
- "vimeo/psalm": "^4.22",
- "symfony/var-dumper": "^5.1"
+ "autoload-dev": {
+ "psr-4": {
+ "Spiral\\RoadRunner\\Tests\\Http\\": "tests"
+ }
},
+ "funding": [
+ {
+ "type": "github",
+ "url": "https://github.com/sponsors/roadrunner-server"
+ }
+ ],
"scripts": {
"analyze": "psalm"
},
- "extra": {
- "branch-alias": {
- "dev-master": "2.2.x-dev"
- }
- },
"suggest": {
"spiral/roadrunner-cli": "Provides RoadRunner installation and management CLI tools"
},
diff --git a/phpunit.xml b/phpunit.xml
new file mode 100644
index 0000000..c12c7b2
--- /dev/null
+++ b/phpunit.xml
@@ -0,0 +1,22 @@
+
+
+
+
+ tests
+
+
+
+
+ src
+
+
+
diff --git a/psalm.xml b/psalm.xml
index 6716a38..817a397 100644
--- a/psalm.xml
+++ b/psalm.xml
@@ -1,10 +1,12 @@
diff --git a/src/HttpWorker.php b/src/HttpWorker.php
index 097f602..d07b0ea 100644
--- a/src/HttpWorker.php
+++ b/src/HttpWorker.php
@@ -1,20 +1,13 @@
worker = $worker;
+ public function __construct(
+ private readonly WorkerInterface $worker,
+ ) {
}
- /**
- * @return WorkerInterface
- */
public function getWorker(): WorkerInterface
{
return $this->worker;
}
/**
- * {@inheritDoc}
* @throws \JsonException
*/
public function waitRequest(): ?Request
@@ -80,12 +61,16 @@ public function waitRequest(): ?Request
}
/**
- * {@inheritDoc}
* @throws \JsonException
*/
- public function respond(int $status, string $body, array $headers = []): void
+ public function respond(int $status, string|Generator $body, array $headers = []): void
{
- $head = (string)\json_encode([
+ if ($body instanceof Generator) {
+ $this->respondStream($status, $body, $headers);
+ return;
+ }
+
+ $head = \json_encode([
'status' => $status,
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);
@@ -93,16 +78,9 @@ public function respond(int $status, string $body, array $headers = []): void
$this->worker->respond(new Payload($body, $head));
}
- /**
- * Respond data using Streamed Output
- *
- * @param Generator $body Body generator.
- * Each yielded value will be sent as a separated stream chunk.
- * Returned value will be sent as a last stream package.
- */
- public function respondStream(int $status, Generator $body, array $headers = []): void
+ private function respondStream(int $status, Generator $body, array $headers = []): void
{
- $head = (string)\json_encode([
+ $head = \json_encode([
'status' => $status,
'headers' => $headers ?: (object)[],
], \JSON_THROW_ON_ERROR);
@@ -114,6 +92,10 @@ public function respondStream(int $status, Generator $body, array $headers = [])
break;
}
$content = (string)$body->current();
+ if ($this->worker->getPayload(StreamStop::class) !== null) {
+ $body->throw(new \RuntimeException('Stream has been stopped by the client.'));
+ return;
+ }
$this->worker->respond(new Payload($content, $head, false));
$body->next();
$head = null;
@@ -121,50 +103,33 @@ public function respondStream(int $status, Generator $body, array $headers = [])
}
/**
- * @param string $body
* @param RequestContext $context
- * @return Request
- *
- * @psalm-suppress InaccessibleProperty
*/
private function createRequest(string $body, array $context): Request
{
- $request = new Request();
- $request->body = $body;
-
- $this->hydrateRequest($request, $context);
-
- return $request;
- }
-
- /**
- * @param Request $request
- * @param RequestContext $context
- *
- * @psalm-suppress InaccessibleProperty
- * @psalm-suppress MixedPropertyTypeCoercion
- */
- private function hydrateRequest(Request $request, array $context): void
- {
- $request->remoteAddr = $context['remoteAddr'];
- $request->protocol = $context['protocol'];
- $request->method = $context['method'];
- $request->uri = $context['uri'];
- \parse_str($context['rawQuery'], $request->query);
-
- $request->attributes = (array)($context['attributes'] ?? []);
- $request->headers = $this->filterHeaders((array)($context['headers'] ?? []));
- $request->cookies = (array)($context['cookies'] ?? []);
- $request->uploads = (array)($context['uploads'] ?? []);
- $request->parsed = (bool)$context['parsed'];
-
- $request->attributes[Request::PARSED_BODY_ATTRIBUTE_NAME] = $request->parsed;
+ \parse_str($context['rawQuery'], $query);
+ return new Request(
+ remoteAddr: $context['remoteAddr'],
+ protocol: $context['protocol'],
+ method: $context['method'],
+ uri: $context['uri'],
+ headers: $this->filterHeaders((array)($context['headers'] ?? [])),
+ cookies: (array)($context['cookies'] ?? []),
+ uploads: (array)($context['uploads'] ?? []),
+ attributes: [
+ Request::PARSED_BODY_ATTRIBUTE_NAME => $context['parsed'],
+ ] + (array)($context['attributes'] ?? []),
+ query: $query,
+ body: $body,
+ parsed: $context['parsed'],
+ );
}
/**
* Remove all non-string and empty-string keys
*
- * @return array
+ * @param array> $headers
+ * @return HeadersList
*/
private function filterHeaders(array $headers): array
{
@@ -175,7 +140,8 @@ private function filterHeaders(array $headers): array
unset($headers[$key]);
}
}
- /** @var array $headers */
+
+ /** @var HeadersList $headers */
return $headers;
}
}
diff --git a/src/HttpWorkerInterface.php b/src/HttpWorkerInterface.php
index 20f58b4..9198e7b 100644
--- a/src/HttpWorkerInterface.php
+++ b/src/HttpWorkerInterface.php
@@ -1,17 +1,12 @@
|string $body Body of response.
+ * If the body is a generator, then each yielded value will be sent as a separated stream chunk.
+ * Returned value will be sent as a last stream package.
+ * Note: Stream response is experimental feature and isn't supported by RoadRunner yet.
+ * But you can try to use RoadRunner 2.9-alpha to test it.
* @param HeadersList|array $headers An associative array of the message's headers. Each key MUST be a header name,
* and each value MUST be an array of strings for that header.
*/
- public function respond(int $status, string $body, array $headers = []): void;
+ public function respond(int $status, string|Generator $body, array $headers = []): void;
}
diff --git a/src/PSR7Worker.php b/src/PSR7Worker.php
index cbeabaa..8df0338 100644
--- a/src/PSR7Worker.php
+++ b/src/PSR7Worker.php
@@ -1,12 +1,5 @@
httpWorker = new HttpWorker($worker);
- $this->requestFactory = $requestFactory;
- $this->streamFactory = $streamFactory;
- $this->uploadsFactory = $uploadsFactory;
$this->originalServer = $_SERVER;
}
- /**
- * @return WorkerInterface
- */
public function getWorker(): WorkerInterface
{
return $this->httpWorker->getWorker();
}
/**
- * @return ServerRequestInterface|null
* @throws \JsonException
*/
public function waitRequest(): ?ServerRequestInterface
@@ -112,23 +72,17 @@ public function waitRequest(): ?ServerRequestInterface
/**
* Send response to the application server.
*
- * @param ResponseInterface $response
* @throws \JsonException
*/
public function respond(ResponseInterface $response): void
{
- if ($this->chunkSize > 0) {
- $this->httpWorker->respondStream(
- $response->getStatusCode(),
- $this->streamToGenerator($response->getBody()),
- $response->getHeaders()
- );
- } else {
- $this->httpWorker->respond(
- $response->getStatusCode(),
- (string)$response->getBody(),
- $response->getHeaders());
- }
+ $this->httpWorker->respond(
+ $response->getStatusCode(),
+ $this->chunkSize > 0
+ ? $this->streamToGenerator($response->getBody())
+ : (string)$response->getBody(),
+ $response->getHeaders()
+ );
}
/**
@@ -162,7 +116,6 @@ private function streamToGenerator(StreamInterface $stream): Generator
* Returns altered copy of _SERVER variable. Sets ip-address,
* request-time and other values.
*
- * @param Request $request
* @return non-empty-array
*/
protected function configureServer(Request $request): array
@@ -177,7 +130,7 @@ protected function configureServer(Request $request): array
$server['HTTP_USER_AGENT'] = '';
foreach ($request->headers as $key => $value) {
- $key = \strtoupper(\str_replace('-', '_', (string)$key));
+ $key = \strtoupper(\str_replace('-', '_', $key));
if (\in_array($key, ['CONTENT_TYPE', 'CONTENT_LENGTH'])) {
$server[$key] = \implode(', ', $value);
} else {
@@ -188,26 +141,17 @@ protected function configureServer(Request $request): array
return $server;
}
- /**
- * @return int
- */
protected function timeInt(): int
{
return \time();
}
- /**
- * @return float
- */
protected function timeFloat(): float
{
return \microtime(true);
}
/**
- * @param Request $httpRequest
- * @param array $server
- * @return ServerRequestInterface
* @throws \JsonException
*/
protected function mapRequest(Request $httpRequest, array $server): ServerRequestInterface
@@ -282,9 +226,6 @@ protected function wrapUploads(array $files): array
/**
* Normalize HTTP protocol version to valid values
- *
- * @param string $version
- * @return string
*/
private static function fetchProtocolVersion(string $version): string
{
diff --git a/src/PSR7WorkerInterface.php b/src/PSR7WorkerInterface.php
index 137c4cd..becbfb2 100644
--- a/src/PSR7WorkerInterface.php
+++ b/src/PSR7WorkerInterface.php
@@ -1,12 +1,5 @@
,
+ * tmpName: non-empty-string,
+ * size: int<0, max>,
* mime: string
* }
*
- * @psalm-type HeadersList = array>
+ * @psalm-type HeadersList = array>
* @psalm-type AttributesList = array
- * @psalm-type QueryArgumentsList = array
+ * @psalm-type QueryArgumentsList = array
* @psalm-type CookiesList = array
* @psalm-type UploadedFilesList = array
+ *
+ * @psalm-immutable
*/
#[Immutable]
final class Request
@@ -36,70 +31,33 @@ final class Request
public const PARSED_BODY_ATTRIBUTE_NAME = 'rr_parsed_body';
/**
- * @var string
- */
- public string $remoteAddr = '127.0.0.1';
-
- /**
- * @var string
- */
- public string $protocol = 'HTTP/1.0';
-
- /**
- * @var string
- */
- public string $method = 'GET';
-
- /**
- * @var string
+ * @param HeadersList $headers
+ * @param CookiesList $cookies
+ * @param UploadedFilesList $uploads
+ * @param AttributesList $attributes
+ * @param QueryArgumentsList $query
*/
- public string $uri = 'http://localhost';
-
- /**
- * @var HeadersList
- */
- public array $headers = [];
-
- /**
- * @var CookiesList
- */
- public array $cookies = [];
-
- /**
- * @var UploadedFilesList
- */
- public array $uploads = [];
-
- /**
- * @var AttributesList
- */
- public array $attributes = [];
-
- /**
- * @var QueryArgumentsList
- */
- public array $query = [];
-
- /**
- * @var string
- */
- public string $body = '';
-
- /**
- * @var bool
- */
- public bool $parsed = false;
+ public function __construct(
+ public readonly string $remoteAddr = '127.0.0.1',
+ public readonly string $protocol = 'HTTP/1.0',
+ public readonly string $method = 'GET',
+ public readonly string $uri = 'http://localhost',
+ public readonly array $headers = [],
+ public readonly array $cookies = [],
+ public readonly array $uploads = [],
+ public readonly array $attributes = [],
+ public readonly array $query = [],
+ public readonly string $body = '',
+ public readonly bool $parsed = false,
+ ) {
+ }
- /**
- * @return string
- */
public function getRemoteAddr(): string
{
return (string)($this->attributes['ipAddress'] ?? $this->remoteAddr);
}
/**
- * @return array|null
* @throws \JsonException
*/
public function getParsedBody(): ?array
diff --git a/tests/Feature/StreamResponseTest.php b/tests/Feature/StreamResponseTest.php
new file mode 100644
index 0000000..ca8c370
--- /dev/null
+++ b/tests/Feature/StreamResponseTest.php
@@ -0,0 +1,142 @@
+relay, $this->worker);
+ ServerRunner::stop();
+ parent::tearDown();
+ }
+
+ /**
+ * Regular case
+ */
+ public function testRegularCase(): void
+ {
+ $worker = $this->getWorker();
+ $worker->respond(new Payload('Hello, World!'));
+
+ \usleep(100_000);
+ self::assertSame('Hello, World!', \trim(ServerRunner::getBuffer()));
+ }
+
+ /**
+ * Test stream response with multiple frames
+ */
+ public function testStreamResponseWithMultipleFrames(): void
+ {
+ $httpWorker = $this->makeHttpWorker();
+
+ $chunks = ['Hel', 'lo,', ' Wo', 'rld', '!'];
+ ServerRunner::getBuffer();
+ $httpWorker->respond(
+ 200,
+ (function () use ($chunks) {
+ yield from $chunks;
+ })(),
+ );
+
+ \usleep(100_000);
+ self::assertSame(\implode("\n", $chunks), \trim(ServerRunner::getBuffer()));
+ }
+
+ public function testStopStreamResponse(): void
+ {
+ $httpWorker = $this->makeHttpWorker();
+
+ // Flush buffer
+ ServerRunner::getBuffer();
+
+ $httpWorker->respond(
+ 200,
+ (function () {
+ yield 'Hel';
+ yield 'lo,';
+ $this->sendCommand(new StreamStop());
+ try {
+ yield ' Wo';
+ } catch (\Throwable $e) {
+ return;
+ }
+ yield 'rld';
+ yield '!';
+ })(),
+ );
+
+
+ \usleep(100_000);
+ self::assertSame(\implode("\n", ['Hel', 'lo,']), \trim(ServerRunner::getBuffer()));
+ }
+
+ /**
+ * StopStream should be ignored if stream is already ended.
+ */
+ public function testStopStreamAfterStreamEnd(): void
+ {
+ $httpWorker = $this->makeHttpWorker();
+
+ // Flush buffer
+ ServerRunner::getBuffer();
+
+ $httpWorker->respond(
+ 200,
+ (function () {
+ yield 'Hello';
+ yield 'World!';
+ })(),
+ );
+
+ $this->assertFalse($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class));
+ $this->sendCommand(new StreamStop());
+ \usleep(200_000);
+ self::assertSame(\implode("\n", ['Hello', 'World!']), \trim(ServerRunner::getBuffer()));
+ $this->assertTrue($this->getWorker()->hasPayload(\Spiral\RoadRunner\Message\Command\StreamStop::class));
+ $this->assertFalse($this->getWorker()->hasPayload());
+ }
+
+ private function getRelay(): SocketRelay
+ {
+ return $this->relay ??= SocketRelay::create($this->serverAddress);
+ }
+
+ private function getWorker(): Worker
+ {
+ return $this->worker ??= new Worker(relay: $this->getRelay(), interceptSideEffects: false);
+ }
+
+ private function makeHttpWorker(): HttpWorker
+ {
+ return new HttpWorker($this->getWorker());
+ }
+
+ private function sendCommand(BaseCommand $command)
+ {
+ $this->getRelay()->send($command->getRequestFrame());
+ \usleep(500_000);
+ }
+}
diff --git a/tests/Server/Client.php b/tests/Server/Client.php
new file mode 100644
index 0000000..e5dc0c7
--- /dev/null
+++ b/tests/Server/Client.php
@@ -0,0 +1,173 @@
+socket = $socket;
+ \socket_set_nonblock($this->socket);
+ }
+
+ public function __destruct()
+ {
+ \socket_close($this->socket);
+ }
+
+ public static function init(\Socket $socket): self
+ {
+ return new self($socket);
+ }
+
+ public function process(): void
+ {
+ $this->onInit();
+
+ do {
+ $read = [$this->socket];
+ $write = [$this->socket];
+ $except = [$this->socket];
+ if (\socket_select($read, $write, $except, 0, 0) === false) {
+ throw new \RuntimeException('Socket select failed.');
+ }
+
+ if ($read !== []) {
+ $this->readMessage();
+ }
+
+ if ($write !== [] && $this->writeQueue !== []) {
+ $this->writeQueue();
+ }
+
+ Fiber::suspend();
+ } while (true);
+ }
+
+ private function onInit()
+ {
+ $this->writeQueue[] = Frame::packFrame(new Frame('{"pid":true}', [], Frame::CONTROL));
+ }
+
+ private function onFrame(Frame $frame): void
+ {
+ $command = $this->getCommand($frame);
+
+ if ($command === null) {
+ echo \substr($frame->payload, $frame->options[0]) . "\n";
+ return;
+ }
+
+ $this->onCommand($command);
+ }
+
+ private function writeQueue(): void
+ {
+ foreach ($this->writeQueue as $data) {
+ \socket_write($this->socket, $data);
+ }
+ socket_set_nonblock($this->socket);
+
+ $this->writeQueue = [];
+ }
+
+ /**
+ * @see \Spiral\Goridge\SocketRelay::waitFrame()
+ */
+ private function readMessage(): void
+ {
+ $header = $this->readNBytes(12);
+
+ $parts = Frame::readHeader($header);
+ // total payload length
+ $length = $parts[1] * 4 + $parts[2];
+
+ if ($length >= 8 * 1024 * 1024) {
+ throw new \RuntimeException('Frame payload is too large.');
+ }
+ $payload = $this->readNBytes($length);
+
+ $frame = Frame::initFrame($parts, $payload);
+
+ $this->onFrame($frame);
+ }
+
+ /**
+ * @param positive-int $bytes
+ *
+ * @return non-empty-string
+ */
+ private function readNBytes(int $bytes, bool $canBeLess = false): string
+ {
+ while (($left = $bytes - \strlen($this->readBuffer)) > 0) {
+ $data = @\socket_read($this->socket, $left, \PHP_BINARY_READ);
+ if ($data === false) {
+ $errNo = \socket_last_error($this->socket);
+ throw new \RuntimeException('Socket read failed [' . $errNo . ']: ' . \socket_strerror($errNo));
+ }
+
+ if ($canBeLess) {
+ return $data;
+ }
+
+ if ($data === '') {
+ Fiber::suspend();
+ continue;
+ }
+
+ $this->readBuffer .= $data;
+ }
+
+ $result = \substr($this->readBuffer, 0, $bytes);
+ $this->readBuffer = \substr($this->readBuffer, $bytes);
+
+ return $result;
+ }
+
+ private function getCommand(Frame $frame): ?BaseCommand
+ {
+ $payload = $frame->payload;
+ try {
+ $data = \json_decode($payload, true, 3, \JSON_THROW_ON_ERROR);
+ } catch (\JsonException) {
+ return null;
+ }
+
+ return match (false) {
+ \is_array($data),
+ \array_key_exists(BaseCommand::COMMAND_KEY, $data),
+ \is_string($data[BaseCommand::COMMAND_KEY]),
+ \class_exists($data[BaseCommand::COMMAND_KEY]),
+ \is_a($data[BaseCommand::COMMAND_KEY], BaseCommand::class, true) => null,
+ default => new ($data[BaseCommand::COMMAND_KEY])(),
+ };
+ }
+
+ private function onCommand(BaseCommand $command): void
+ {
+ switch ($command::class) {
+ case StreamStop::class:
+ $this->writeQueue[] = $command->getResponse();
+ break;
+ }
+ }
+}
\ No newline at end of file
diff --git a/tests/Server/Command/BaseCommand.php b/tests/Server/Command/BaseCommand.php
new file mode 100644
index 0000000..f36e7fd
--- /dev/null
+++ b/tests/Server/Command/BaseCommand.php
@@ -0,0 +1,29 @@
+frame = new Frame(\json_encode([self::COMMAND_KEY => static::class]));
+ }
+
+ public function getRequestFrame(): Frame
+ {
+ return $this->frame;
+ }
+
+ public function getResponse(): string
+ {
+ return Frame::packFrame($this->getResponseFrame());
+ }
+
+ public abstract function getResponseFrame(): Frame;
+}
\ No newline at end of file
diff --git a/tests/Server/Command/StreamStop.php b/tests/Server/Command/StreamStop.php
new file mode 100644
index 0000000..34a3e56
--- /dev/null
+++ b/tests/Server/Command/StreamStop.php
@@ -0,0 +1,18 @@
+byte10 |= Frame::BYTE10_STOP;
+
+ return $frame;
+ }
+}
diff --git a/tests/Server/Server.php b/tests/Server/Server.php
new file mode 100644
index 0000000..2e1f865
--- /dev/null
+++ b/tests/Server/Server.php
@@ -0,0 +1,69 @@
+socket = \socket_create_listen($port);
+ if ($this->socket === false) {
+ throw new \RuntimeException('Socket create failed.');
+ }
+ \socket_set_nonblock($this->socket);
+
+ echo "Server started\n";
+ }
+
+ public function __destruct()
+ {
+ \socket_close($this->socket);
+ }
+
+ public static function init(int $port = 6002): self
+ {
+ return new self($port);
+ }
+
+ public function process(): void
+ {
+ $client = \socket_accept($this->socket);
+ if ($client !== false) {
+ $key = \array_key_last($this->clients) + 1;
+ try {
+ $this->clients[$key] = Client::init($client);
+ $this->fibers[$key] = new Fiber($this->clients[$key]->process(...));
+ } catch (\Throwable) {
+ unset($this->clients[$key], $this->fibers[$key]);
+ }
+ }
+
+ foreach ($this->fibers as $key => $fiber) {
+ try {
+ $fiber->isStarted() ? $fiber->resume() : $fiber->start();
+
+ if ($fiber->isTerminated()) {
+ throw new RuntimeException('Client terminated.');
+ }
+ } catch (\Throwable) {
+ unset($this->clients[$key], $this->fibers[$key]);
+ }
+ }
+ }
+}
diff --git a/tests/Server/ServerRunner.php b/tests/Server/ServerRunner.php
new file mode 100644
index 0000000..0ba49f8
--- /dev/null
+++ b/tests/Server/ServerRunner.php
@@ -0,0 +1,61 @@
+setTimeout($timeout);
+ self::$process->start(static function (string $type, string $output) use (&$run) {
+ if (!$run && $type === Process::OUT && \str_contains($output, 'Server started')) {
+ $run = true;
+ }
+ if ($type === Process::OUT) {
+ self::$output .= $output;
+ }
+ // echo $output;
+ });
+
+ if (!self::$process->isRunning()) {
+ throw new RuntimeException('Error starting Server: ' . self::$process->getErrorOutput());
+ }
+
+ // wait for roadrunner to start
+ $ticks = $timeout * 10;
+ while (!$run && $ticks > 0) {
+ self::$process->getStatus();
+ \usleep(100000);
+ --$ticks;
+ }
+
+ if (!$run) {
+ throw new RuntimeException('Error starting Server: timeout');
+ }
+ }
+
+ public static function stop(): void
+ {
+ self::$process?->stop(0, 0);
+ }
+
+ public static function getBuffer(): string
+ {
+ self::$process->getStatus();
+ $result = self::$output;
+ self::$output = '';
+ return $result;
+ }
+}
diff --git a/tests/Server/run_server.php b/tests/Server/run_server.php
new file mode 100644
index 0000000..81d2652
--- /dev/null
+++ b/tests/Server/run_server.php
@@ -0,0 +1,14 @@
+process();
+ \usleep(5_000);
+}
diff --git a/tests/Unit/StreamResponseTest.php b/tests/Unit/StreamResponseTest.php
new file mode 100644
index 0000000..e491129
--- /dev/null
+++ b/tests/Unit/StreamResponseTest.php
@@ -0,0 +1,91 @@
+relay, $this->worker);
+ parent::tearDown();
+ }
+
+ /**
+ * Regular case
+ */
+ public function testRegularCase(): void
+ {
+ $worker = $this->getWorker();
+ $this->getRelay()
+ ->addFrame(status: 200, body: 'Hello, World!', headers: ['Content-Type' => 'text/plain'], stream: true);
+
+ self::assertTrue($worker->hasPayload());
+ self::assertInstanceOf(Payload::class, $payload = $worker->waitPayload());
+ self::assertSame('Hello, World!', $payload->body);
+ }
+
+ /**
+ * Test stream response with multiple frames
+ */
+ public function testStreamResponseWithMultipleFrames(): void
+ {
+ $httpWorker = $this->makeHttpWorker();
+
+ $httpWorker->respond(200, (function () {
+ yield 'Hel';
+ yield 'lo,';
+ yield ' Wo';
+ yield 'rld';
+ yield '!';
+ })());
+
+ self::assertFalse($this->worker->hasPayload());
+ self::assertSame('Hello, World!', $this->getRelay()->getReceivedBody());
+ }
+
+ public function testStopStreamResponse(): void
+ {
+ $httpWorker = $this->makeHttpWorker();
+
+ $httpWorker->respond(200, (function () {
+ yield 'Hel';
+ yield 'lo,';
+ $this->getRelay()->addStopStreamFrame();
+ try {
+ yield ' Wo';
+ } catch (\Throwable $e) {
+ return;
+ }
+ yield 'rld';
+ yield '!';
+ })());
+
+ self::assertSame('Hello,', $this->getRelay()->getReceivedBody());
+ }
+
+ private function getRelay(): TestRelay
+ {
+ return $this->relay ??= new TestRelay();
+ }
+
+ private function getWorker(): Worker
+ {
+ return $this->worker ??= new Worker($this->getRelay(), false);
+ }
+
+ private function makeHttpWorker(): HttpWorker
+ {
+ return new HttpWorker($this->getWorker());
+ }
+}
diff --git a/tests/Unit/Stub/TestRelay.php b/tests/Unit/Stub/TestRelay.php
new file mode 100644
index 0000000..4b2cd34
--- /dev/null
+++ b/tests/Unit/Stub/TestRelay.php
@@ -0,0 +1,75 @@
+frames = [...$this->frames, ...\array_values($frames)];
+ return $this;
+ }
+
+ public function addFrame(
+ int $status = 200,
+ string $body = '',
+ array $headers = [],
+ bool $stream = false,
+ bool $stopStream = false,
+ ): self {
+ $head = (string)\json_encode([
+ 'status' => $status,
+ 'headers' => $headers,
+ ], \JSON_THROW_ON_ERROR);
+ $frame = new Frame($head .$body, [\strlen($head)]);
+ $frame->byte10 |= $stream ? Frame::BYTE10_STREAM : 0;
+ $frame->byte10 |= $stopStream ? Frame::BYTE10_STOP : 0;
+ return $this->addFrames($frame);
+ }
+
+ public function addStopStreamFrame(): self
+ {
+ return $this->addFrame(stopStream: true);
+ }
+
+ public function getReceived(): array
+ {
+ return $this->received;
+ }
+
+ public function getReceivedBody(): string
+ {
+ return \implode('', \array_map(static fn (Frame $frame)
+ => \substr($frame->payload, $frame->options[0]), $this->received));
+ }
+
+ public function waitFrame(): Frame
+ {
+ if ($this->frames === []) {
+ throw new \RuntimeException('There are no frames to return.');
+ }
+
+ return \array_shift($this->frames);
+ }
+
+ public function send(Frame $frame): void
+ {
+ $this->received[] = $frame;
+ }
+
+ public function hasFrame(): bool
+ {
+ return $this->frames !== [];
+ }
+}