From 2130248446a129cb57d2a781f38e2057a41fadea Mon Sep 17 00:00:00 2001 From: cclilshy Date: Sat, 2 Nov 2024 22:01:33 +0800 Subject: [PATCH] Update copyright and capture device --- composer.json | 1 + src/Client.php | 231 ++++--------------- src/Client/Capture.php | 12 +- src/Client/Capture/ServerSentEvents.php | 43 +--- src/Client/Connection.php | 255 +++++++++++++++++---- src/Client/ConnectionPool.php | 36 +-- src/Enum/Method.php | 36 +-- src/Enum/Status.php | 36 +-- src/Guzzle.php | 36 +-- src/Guzzle/RippleHandler.php | 36 +-- src/Server.php | 36 +-- src/Server/Chunk.php | 36 +-- src/Server/Connection.php | 36 +-- src/Server/Exception/Exception.php | 36 +-- src/Server/Exception/FormatException.php | 36 +-- src/Server/Exception/TransferException.php | 36 +-- src/Server/Request.php | 36 +-- src/Server/Response.php | 36 +-- src/Server/Upload/MultipartHandler.php | 36 +-- 19 files changed, 369 insertions(+), 677 deletions(-) diff --git a/composer.json b/composer.json index 9db0a74..daf3021 100644 --- a/composer.json +++ b/composer.json @@ -13,6 +13,7 @@ } }, "require": { + "php": ">=8.1", "ext-sockets": "*", "guzzlehttp/guzzle": "*", "symfony/http-foundation": "*" diff --git a/src/Client.php b/src/Client.php index 47d742d..b484ccb 100644 --- a/src/Client.php +++ b/src/Client.php @@ -1,44 +1,21 @@ config['pool'] ?? 'off'; $this->pool = in_array($pool, [true, 1, 'on'], true); - if ($this->pool) { $this->connectionPool = new ConnectionPool(); } @@ -84,12 +54,10 @@ public function __construct(private readonly array $config = []) * * @return Response * @throws \Ripple\Stream\Exception\ConnectionException - * @throws \Ripple\Stream\Exception\RuntimeException */ public function request(RequestInterface $request, array $option = []): Response { $uri = $request->getUri(); - $method = $request->getMethod(); $scheme = $uri->getScheme(); $host = $uri->getHost(); @@ -97,16 +65,6 @@ public function request(RequestInterface $request, array $option = []): Response $port = $scheme === 'https' ? 443 : 80; } - if (!$path = $uri->getPath()) { - $path = '/'; - } - - if ($query = $uri->getQuery()) { - $query = "?{$query}"; - } else { - $query = ''; - } - if (!isset($option['proxy'])) { if ($scheme === 'http' && $httpProxy = getenv('http_proxy')) { $option['proxy'] = $httpProxy; @@ -115,133 +73,40 @@ public function request(RequestInterface $request, array $option = []): Response } } - $connection = $this->pullConnection( - $host, - $port, - $scheme === 'https', - $option['timeout'] ?? 0, - $option['proxy'] ?? null - ); - - $write = fn (string|false $content) => $connection->stream->write($content); - $tick = fn (string|false $content) => $connection->tick($content); - - if ($captureWrite = $option['capture_write'] ?? null) { - $write = fn (string|false $content) => $captureWrite($content, $write); - } - - if ($captureRead = $option['capture_read'] ?? null) { - $tick = fn (string|false $content) => $captureRead($content, $tick); - } - - $suspension = getSuspension(); - $header = "{$method} {$path}{$query} HTTP/1.1\r\n"; - foreach ($request->getHeaders() as $name => $values) { - $header .= "{$name}: " . implode(', ', $values) . "\r\n"; - } - - $write($header); - if ($bodyStream = $request->getBody()) { - if (!$request->getHeader('Content-Length')) { - $size = $bodyStream->getSize(); - $size > 0 && $write("Content-Length: {$bodyStream->getSize()}\r\n"); - } - - if ($bodyStream->getMetadata('uri') === 'php://temp') { - $write("\r\n"); - if ($bodyContent = $bodyStream->getContents()) { - $write($bodyContent); - } - } elseif ($bodyStream instanceof MultipartStream) { - if (!$request->getHeader('Content-Type')) { - $write("Content-Type: multipart/form-data; boundary={$bodyStream->getBoundary()}\r\n"); - } - $write("\r\n"); - try { - while (!$bodyStream->eof()) { - $write($bodyStream->read(8192)); - } - } catch (Throwable) { - $bodyStream->close(); - $connection->stream->close(); - throw new ConnectionException('Invalid body stream'); - } - } else { - throw new ConnectionException('Invalid body stream'); + $capture = $option['capture'] ?? null; + + try { + $connection = $this->pullConnection( + $host, + $port, + $scheme === 'https', + $option['timeout'] ?? 0, + $option['proxy'] ?? null + ); + } catch (Throwable $exception) { + if ($capture instanceof Capture) { + $capture->onFail($exception); } - } else { - $write("\r\n"); + throw $exception; } - /*** Parse response process*/ - if ($timeout = $option['timeout'] ?? null) { - $timeoutOID = delay(static function () use ($connection, $suspension) { - Coroutine::throw( - $suspension, - new ConnectionException('Request timeout', ConnectionException::CONNECTION_TIMEOUT) - ); - }, $timeout); - } - - if ($sink = $option['sink'] ?? null) { - $connection->setOutput(fopen($sink, 'wb')); - } - - while (1) { - try { - $connection->stream->waitForReadable(); - } catch (Throwable $e) { - if (isset($timeoutOID)) { - cancel($timeoutOID); - } - - if ($sink && is_resource($sink)) { - fclose($sink); - } - - $connection->stream->close(); - throw new ConnectionException( - 'Connection closed by peer', - ConnectionException::CONNECTION_CLOSED, - null, - $connection->stream, - true - ); - } - - $content = $connection->stream->readContinuously(8192); - if ($content === '') { - if (!$connection->stream->eof()) { - continue; - } - $response = $tick(false); - } else { - $response = $tick($content); - } - - if ($response) { - $k = implode(', ', $response->getHeader('Connection')); - if (str_contains(strtolower($k), 'keep-alive') && $this->pool) { - /*** Push into connection pool*/ - $this->pushConnection( - $connection, - ConnectionPool::generateConnectionKey($host, $port) - ); - $connection->stream->cancelReadable(); - } else { - $connection->stream->close(); - } - - if (isset($timeoutOID)) { - cancel($timeoutOID); - } - - if ($sink && is_resource($sink)) { - fclose($sink); - } - return $response; + try { + $response = $connection->request($request, $option); + } catch (Throwable $exception) { + if ($capture instanceof Capture) { + $capture->onError($exception); } + throw $exception; + } + $keepAlive = implode(', ', $response->getHeader('Connection')); + if (str_contains(strtolower($keepAlive), 'keep-alive') && $this->pool) { + /*** Push into connection pool*/ + $this->connectionPool?->pushConnection($connection, ConnectionPool::generateConnectionKey($host, $port)); + $connection->stream->cancelReadable(); + } else { + $connection->stream->close(); } + return $response; } /** @@ -259,6 +124,7 @@ private function pullConnection(string $host, int $port, bool $ssl, int $timeout if ($tunnel && in_array($host, ['127.0.0.1', 'localhost', '::1'], true)) { $tunnel = null; } + if ($this->pool) { $connection = $this->connectionPool->pullConnection($host, $port, $ssl, $timeout, $tunnel); } else { @@ -308,25 +174,12 @@ private function pullConnection(string $host, int $port, bool $ssl, int $timeout return $connection; } - /** - * @param Connection $connection - * @param string $key - * - * @return void - */ - private function pushConnection(Connection $connection, string $key): void - { - if ($this->pool) { - $this->connectionPool->pushConnection($connection, $key); - } - } - /** * @Author cclilshy * @Date 2024/8/31 14:32 - * @return ConnectionPool + * @return ConnectionPool|null */ - public function getConnectionPool(): ConnectionPool + public function getConnectionPool(): ConnectionPool|null { return $this->connectionPool; } diff --git a/src/Client/Capture.php b/src/Client/Capture.php index c4a8083..0c39239 100644 --- a/src/Client/Capture.php +++ b/src/Client/Capture.php @@ -1,4 +1,14 @@ waitGroup = new WaitGroup(); $this->reset(); } @@ -130,6 +125,7 @@ private function reset(): void $this->chunk = false; $this->chunkLength = 0; $this->chunkStep = 0; + $this->capture = null; } /** @@ -141,18 +137,36 @@ private function reset(): void public function tick(string|false $content): ResponseInterface|null { if ($content === false) { - if (!$this->headers) { - throw new RuntimeException('Response header is required'); - } elseif (isset($this->headers['CONTENT-LENGTH'])) { - throw new RuntimeException('Response content length is required'); - } elseif ($this->chunk) { - throw new RuntimeException('Response chunked is required'); - } else { - $this->step = 2; - } + return $this->tickClose(); } - $this->buffer .= $content; + return $this->process(); + } + + /** + * @return ResponseInterface|null + * @throws \Ripple\Stream\Exception\RuntimeException + */ + public function tickClose(): ResponseInterface|null + { + if (!$this->headers) { + throw new RuntimeException('Response header is required'); + } elseif (isset($this->headers['CONTENT-LENGTH'])) { + throw new RuntimeException('Response content length is required'); + } elseif ($this->chunk) { + throw new RuntimeException('Response chunked is required'); + } else { + $this->step = 2; + } + return $this->process(); + } + + /** + * @return \Psr\Http\Message\ResponseInterface|null + * @throws \Ripple\Stream\Exception\RuntimeException + */ + public function process(): ResponseInterface|null + { if ($this->step === 0) { if ($headerEnd = strpos($this->buffer, "\r\n\r\n")) { $buffer = $this->freeBuffer(); @@ -172,9 +186,7 @@ public function tick(string|false $content): ResponseInterface|null $this->statusCode = intval($base[1]); $this->statusMessage = $base[2]; - /** - * Parse header - */ + /*** Parse header*/ while ($line = strtok("\r\n")) { $lineParam = explode(': ', $line, 2); if (count($lineParam) >= 2) { @@ -197,6 +209,8 @@ public function tick(string|false $content): ResponseInterface|null } } } + + $this->capture?->processHeader($this->headers); } } @@ -245,21 +259,17 @@ public function tick(string|false $content): ResponseInterface|null $this->chunkStep = 0; } } while ($this->step !== 2); - $this->buffer = $buffer; } if ($this->step === 2) { - $response = new Response( + return new Response( $this->statusCode, $this->headers, $this->content, $this->versionString, $this->statusMessage, ); - $this->reset(); - - return $response; } return null; } @@ -289,6 +299,8 @@ private function output(string $content): void } else { $this->content .= $content; } + + $this->capture?->processContent($content); } /** @@ -300,4 +312,151 @@ public function setOutput(mixed $resource): void { $this->output = $resource; } + + /** + * @param \Psr\Http\Message\RequestInterface $request + * @param array $option + * + * @return \GuzzleHttp\Psr7\Response + * @throws \Ripple\Stream\Exception\ConnectionException + * @throws \Ripple\Stream\Exception\RuntimeException + */ + public function request(RequestInterface $request, array $option = []): Response + { + $this->waitGroup->wait(); + $this->waitGroup->add(); + try { + return $this->queue($request, $option); + } finally { + $this->reset(); + $this->waitGroup->done(); + } + } + + /** + * @param \Psr\Http\Message\RequestInterface $request + * @param array $option + * + * @return \GuzzleHttp\Psr7\Response + * @throws \Ripple\Stream\Exception\ConnectionException + * @throws \Ripple\Stream\Exception\RuntimeException + */ + private function queue(RequestInterface $request, array $option = []): Response + { + $uri = $request->getUri(); + $method = $request->getMethod(); + + if (!$path = $uri->getPath()) { + $path = '/'; + } + + if ($query = $uri->getQuery()) { + $query = "?{$query}"; + } else { + $query = ''; + } + + $this->capture = $capture = $option['capture'] ?? null; + if (!$capture instanceof Capture) { + $capture = null; + } + + $suspension = getSuspension(); + $header = "{$method} {$path}{$query} HTTP/1.1\r\n"; + foreach ($request->getHeaders() as $name => $values) { + $header .= "{$name}: " . implode(', ', $values) . "\r\n"; + } + + $this->stream->write($header); + if ($bodyStream = $request->getBody()) { + if (!$request->getHeader('Content-Length')) { + $size = $bodyStream->getSize(); + $size > 0 && $this->stream->write("Content-Length: {$bodyStream->getSize()}\r\n"); + } + + if ($bodyStream->getMetadata('uri') === 'php://temp') { + $this->stream->write("\r\n"); + if ($bodyContent = $bodyStream->getContents()) { + $this->stream->write($bodyContent); + } + } elseif ($bodyStream instanceof MultipartStream) { + if (!$request->getHeader('Content-Type')) { + $this->stream->write("Content-Type: multipart/form-data; boundary={$bodyStream->getBoundary()}\r\n"); + } + $this->stream->write("\r\n"); + try { + while (!$bodyStream->eof()) { + $this->stream->write($bodyStream->read(8192)); + } + } catch (Throwable) { + $bodyStream->close(); + $this->stream->close(); + throw new ConnectionException('Invalid body stream'); + } + } else { + throw new ConnectionException('Invalid body stream'); + } + } else { + $this->stream->write("\r\n"); + } + + /*** Parse response process*/ + if ($timeout = $option['timeout'] ?? null) { + $timeoutOID = delay(static function () use ($suspension) { + Coroutine::throw( + $suspension, + new ConnectionException('Request timeout', ConnectionException::CONNECTION_TIMEOUT) + ); + }, $timeout); + } + + if ($sink = $option['sink'] ?? null) { + $this->setOutput(fopen($sink, 'wb')); + } + + while (1) { + try { + $this->stream->waitForReadable(); + } catch (Throwable $e) { + if (isset($timeoutOID)) { + cancel($timeoutOID); + } + + if ($sink && is_resource($sink)) { + fclose($sink); + } + + $this->stream->close(); + throw new ConnectionException( + 'Connection closed by peer', + ConnectionException::CONNECTION_CLOSED, + null, + $this->stream, + true + ); + } + + $content = $this->stream->readContinuously(8192); + if ($content === '') { + if (!$this->stream->eof()) { + continue; + } + $response = $this->tickClose(); + } else { + $response = $this->tick($content); + } + if ($response) { + if (isset($timeoutOID)) { + cancel($timeoutOID); + } + + if ($sink && is_resource($sink)) { + fclose($sink); + } + + $this->capture?->onComplete($response); + return $response; + } + } + } } diff --git a/src/Client/ConnectionPool.php b/src/Client/ConnectionPool.php index 2a515c1..1246a55 100644 --- a/src/Client/ConnectionPool.php +++ b/src/Client/ConnectionPool.php @@ -1,35 +1,13 @@