From 14620d5301f8f41f96d3c554e69bf3300d9932dc Mon Sep 17 00:00:00 2001 From: cclilshy Date: Fri, 1 Nov 2024 03:11:47 +0800 Subject: [PATCH] Update: Suppress unnecessary exceptions --- src/Client.php | 255 ++++++++++++++++++-------------------- src/Client/Connection.php | 4 +- tests/HttpTest.php | 7 +- 3 files changed, 128 insertions(+), 138 deletions(-) diff --git a/src/Client.php b/src/Client.php index f51f2d8..7cb9967 100644 --- a/src/Client.php +++ b/src/Client.php @@ -34,16 +34,13 @@ namespace Ripple\Http; -use Closure; use Co\IO; use GuzzleHttp\Psr7\MultipartStream; use GuzzleHttp\Psr7\Response; -use InvalidArgumentException; use Psr\Http\Message\RequestInterface; +use Ripple\Coroutine; use Ripple\Http\Client\Connection; use Ripple\Http\Client\ConnectionPool; -use Ripple\Promise; -use Ripple\Socket\SocketStream; use Ripple\Socket\Tunnel\Http; use Ripple\Socket\Tunnel\Socks5; use Ripple\Stream\Exception\ConnectionException; @@ -51,7 +48,7 @@ use function Co\cancel; use function Co\delay; -use function Co\repeat; +use function Co\getSuspension; use function fclose; use function fopen; use function getenv; @@ -90,163 +87,146 @@ public function __construct(private readonly array $config = []) */ public function request(RequestInterface $request, array $option = []): Response { - return \Co\promise(function (Closure $resolve, Closure $reject, Promise $promise) use ($request, $option) { - $uri = $request->getUri(); - $method = $request->getMethod(); - $scheme = $uri->getScheme(); - $host = $uri->getHost(); + $uri = $request->getUri(); + $method = $request->getMethod(); + $scheme = $uri->getScheme(); + $host = $uri->getHost(); - if (!$port = $uri->getPort()) { - $port = $scheme === 'https' ? 443 : 80; - } + if (!$port = $uri->getPort()) { + $port = $scheme === 'https' ? 443 : 80; + } - if (!$path = $uri->getPath()) { - $path = '/'; - } + if (!$path = $uri->getPath()) { + $path = '/'; + } - if ($query = $uri->getQuery()) { - $query = "?{$query}"; - } else { - $query = ''; - } + if ($query = $uri->getQuery()) { + $query = "?{$query}"; + } else { + $query = ''; + } - if (!isset($option['proxy'])) { - if ($scheme === 'http' && $httpProxy = getenv('http_proxy')) { - $option['proxy'] = $httpProxy; - } elseif ($scheme === 'https' && $httpsProxy = getenv('https_proxy')) { - $option['proxy'] = $httpsProxy; - } + if (!isset($option['proxy'])) { + if ($scheme === 'http' && $httpProxy = getenv('http_proxy')) { + $option['proxy'] = $httpProxy; + } elseif ($scheme === 'https' && $httpsProxy = getenv('https_proxy')) { + $option['proxy'] = $httpsProxy; } + } - $connection = $this->pullConnection( - $host, - $port, - $scheme === 'https', - $option['timeout'] ?? 0, - $option['proxy'] ?? null - ); + $connection = $this->pullConnection( + $host, + $port, + $scheme === 'https', + $option['timeout'] ?? 0, + $option['proxy'] ?? null + ); - $writeHandler = fn (string|false $content) => $connection->stream->write($content); - $tickHandler = fn (string|false $content) => $connection->tick($content); + $write = fn (string|false $content) => $connection->stream->write($content); + $tick = fn (string|false $content) => $connection->tick($content); - if ($captureWrite = $option['capture_write'] ?? null) { - $writeHandler = fn (string|false $content) => $captureWrite($content, $writeHandler); - } + if ($captureWrite = $option['capture_write'] ?? null) { + $write = fn (string|false $content) => $captureWrite($content, $write); + } - if ($captureRead = $option['capture_read'] ?? null) { - $tickHandler = fn (string|false $content) => $captureRead($content, $tickHandler); - } + if ($captureRead = $option['capture_read'] ?? null) { + $tick = fn (string|false $content) => $captureRead($content, $tick); + } + + $suspension = getSuspension(); + $header = "{$method} {$path}{$query} HTTP/1.1\r\n"; + foreach ($request->getHeaders() as $name => $values) { + $header .= "{$name}: " . implode(', ', $values) . "\r\n"; + } - $header = "{$method} {$path}{$query} HTTP/1.1\r\n"; - foreach ($request->getHeaders() as $name => $values) { - $header .= "{$name}: " . implode(', ', $values) . "\r\n"; + $write($header); + if ($bodyStream = $request->getBody()) { + if (!$request->getHeader('Content-Length')) { + $size = $bodyStream->getSize(); + $size > 0 && $write("Content-Length: {$bodyStream->getSize()}\r\n"); } - $writeHandler($header); - if ($bodyStream = $request->getBody()) { - if (!$request->getHeader('Content-Length')) { - $size = $bodyStream->getSize(); - $size > 0 && $writeHandler("Content-Length: {$bodyStream->getSize()}\r\n"); + if ($bodyStream->getMetadata('uri') === 'php://temp') { + $write("\r\n"); + if ($bodyContent = $bodyStream->getContents()) { + $write($bodyContent); } - - if ($bodyStream->getMetadata('uri') === 'php://temp') { - $writeHandler("\r\n"); - if ($bodyContent = $bodyStream->getContents()) { - $writeHandler($bodyContent); - } - } elseif ($bodyStream instanceof MultipartStream) { - if (!$request->getHeader('Content-Type')) { - $writeHandler("Content-Type: multipart/form-data; boundary={$bodyStream->getBoundary()}\r\n"); + } elseif ($bodyStream instanceof MultipartStream) { + if (!$request->getHeader('Content-Type')) { + $write("Content-Type: multipart/form-data; boundary={$bodyStream->getBoundary()}\r\n"); + } + $write("\r\n"); + try { + while (!$bodyStream->eof()) { + $write($bodyStream->read(8192)); } - $writeHandler("\r\n"); - repeat(static function (Closure $cancel) use ($connection, $bodyStream, $resolve, $reject, $writeHandler) { - try { - $content = ''; - while ($buffer = $bodyStream->read(8192)) { - $content .= $buffer; - } - - if ($content) { - $writeHandler($content); - } else { - $cancel(); - $bodyStream->close(); - } - } catch (Throwable) { - $cancel(); - $bodyStream->close(); - $reject(new InvalidArgumentException('Invalid body stream')); - } - }, 0.1); - } else { - throw new InvalidArgumentException('Invalid body stream'); + } catch (Throwable) { + $bodyStream->close(); + $connection->stream->close(); + throw new ConnectionException('Invalid body stream'); } } else { - $writeHandler("\r\n"); + throw new ConnectionException('Invalid body stream'); } + } else { + $write("\r\n"); + } - if ($timeout = $option['timeout'] ?? null) { - $delayEventId = delay(static function () use ($connection, $reject) { - $connection->stream->close(); - $reject(new ConnectionException('Request timeout', ConnectionException::CONNECTION_TIMEOUT)); - }, $timeout); + /*** Parse response process*/ + if ($timeout = $option['timeout'] ?? null) { + $timeoutOID = delay(static function () use ($connection, $suspension) { + Coroutine::throw( + $suspension, + new ConnectionException('Request timeout', ConnectionException::CONNECTION_TIMEOUT) + ); + }, $timeout); + } - $promise->finally(static function () use ($delayEventId) { - cancel($delayEventId); - }); - } + if ($sink = $option['sink'] ?? null) { + $connection->setOutput(fopen($sink, 'wb')); + } - if ($sink = $option['sink'] ?? null) { - $connection->setOutput($sinkFile = fopen($sink, 'wb')); - $promise->finally(static function () use ($sinkFile) { - if (is_resource($sinkFile)) { - fclose($sinkFile); + while (1) { + try { + $connection->stream->waitForReadable(); + $content = $connection->stream->readContinuously(8192); + if ($content === '') { + if (!$connection->stream->eof()) { + continue; } - }); - } - - /*** Parse response process*/ - $connection->stream->onReadable(function (SocketStream $socketStream, Closure $cancel) use ( - $host, - $port, - $connection, - $scheme, - $resolve, - $reject, - $tickHandler - ) { - try { - $content = $socketStream->readContinuously(8192); + $response = $tick(false); + } else { + $response = $tick($content); + } - if ($content === '') { - if (!$socketStream->eof()) { - return; - } - $response = $tickHandler(false); + if ($response) { + $k = implode(', ', $response->getHeader('Connection')); + if (str_contains(strtolower($k), 'keep-alive') && $this->pool) { + /*** Push into connection pool*/ + $this->pushConnection( + $connection, + ConnectionPool::generateConnectionKey($host, $port) + ); + $connection->stream->cancelReadable(); } else { - $response = $tickHandler($content); + $connection->stream->close(); } + return ($response); + } + } catch (Throwable $exception) { + $connection->stream->close(); + throw $exception; + } finally { + if (isset($timeoutOID)) { + cancel($timeoutOID); + } - if ($response) { - $k = implode(', ', $response->getHeader('Connection')); - if (str_contains(strtolower($k), 'keep-alive') && $this->pool) { - /*** Push into connection pool*/ - $this->pushConnection( - $connection, - ConnectionPool::generateConnectionKey($host, $port) - ); - $cancel(); - } else { - $socketStream->close(); - } - $resolve($response); - } - } catch (Throwable $exception) { - $socketStream->close(); - $reject($exception); + if ($sink && is_resource($sink)) { + fclose($sink); } - }); - })->await(); + } + } + } /** @@ -262,6 +242,9 @@ public function request(RequestInterface $request, array $option = []): Response */ private function pullConnection(string $host, int $port, bool $ssl, int $timeout = 0, string|null $tunnel = null): Connection { + if ($tunnel && in_array($host, ['127.0.0.1', 'localhost', '::1'], true)) { + $tunnel = null; + } if ($this->pool) { $connection = $this->connectionPool->pullConnection($host, $port, $ssl, $timeout, $tunnel); } else { diff --git a/src/Client/Connection.php b/src/Client/Connection.php index 3165ef5..8fffe4e 100644 --- a/src/Client/Connection.php +++ b/src/Client/Connection.php @@ -141,7 +141,9 @@ private function reset(): void public function tick(string|false $content): ResponseInterface|null { if ($content === false) { - if (isset($this->headers['CONTENT-LENGTH'])) { + if (!$this->headers) { + throw new RuntimeException('Response header is required'); + } elseif (isset($this->headers['CONTENT-LENGTH'])) { throw new RuntimeException('Response content length is required'); } elseif ($this->chunk) { throw new RuntimeException('Response chunked is required'); diff --git a/tests/HttpTest.php b/tests/HttpTest.php index 5b6f5fc..b778dba 100644 --- a/tests/HttpTest.php +++ b/tests/HttpTest.php @@ -207,9 +207,11 @@ private function httpClient(): void 'https://business.oceanengine.com/login', 'https://www.laruence.com/', 'https://www.php.net/', + 'https://www.abc.net/', + 'https://www.491e5d73fbeb64e8e7d66b25cb3d1823.net/', + 'http://www.491e5d73fbeb64e8e7d66b25cb3d1823.net/', ]; - $list = []; foreach ($urls as $url) { $list[] = async(function () use ($url) { @@ -218,6 +220,9 @@ private function httpClient(): void } catch (Throwable $exception) { return [$url, $exception]; } + })->except(function () { + \var_dump(1); + die; }); }