From a0407bfedbd09375d7f0aac45dc8db12ab71ed8d Mon Sep 17 00:00:00 2001 From: cclilshy Date: Wed, 13 Nov 2024 22:38:50 +0800 Subject: [PATCH] Pre-release: updated copyrighted content --- .php-cs-fixer.php | 1 + src/Client.php | 24 +- src/Client/Capture/ServerSentEvents.php | 28 +-- src/Client/Connection.php | 322 ++++++++++++------------ src/Client/ConnectionPool.php | 27 +- src/Guzzle.php | 30 +-- src/Guzzle/RippleHandler.php | 2 +- src/Server.php | 15 +- src/Server/Connection.php | 8 +- src/Server/Request.php | 34 +-- src/Server/Response.php | 28 +-- 11 files changed, 257 insertions(+), 262 deletions(-) diff --git a/.php-cs-fixer.php b/.php-cs-fixer.php index 9a68f87..73520a5 100644 --- a/.php-cs-fixer.php +++ b/.php-cs-fixer.php @@ -1,4 +1,5 @@ getSocketStream(); - $ssl && IO::Socket()->enableSSL($tunnelSocket, $timeout); + $tunnelSocket = Socks5::connect("tcp://{$parse['host']}:{$parse['port']}", $payload)->getSocket(); + $ssl && $tunnelSocket->enableSSL(); $connection = new Connection($tunnelSocket); break; case 'http': - $tunnelSocket = Http::connect("tcp://{$parse['host']}:{$parse['port']}", $payload)->getSocketStream(); - $ssl && IO::Socket()->enableSSL($tunnelSocket, $timeout); + $tunnelSocket = Http::connect("tcp://{$parse['host']}:{$parse['port']}", $payload)->getSocket(); + $ssl && $tunnelSocket->enableSSL(); $connection = new Connection($tunnelSocket); break; case 'https': - $tunnel = IO::Socket()->connectWithSSL("tcp://{$parse['host']}:{$parse['port']}", $timeout); - $tunnelSocket = Http::connect($tunnel, $payload)->getSocketStream(); - $ssl && IO::Socket()->enableSSL($tunnelSocket, $timeout); + $tunnel = Socket::connectWithSSL("tcp://{$parse['host']}:{$parse['port']}", $timeout); + $tunnelSocket = Http::connect($tunnel, $payload)->getSocket(); + $ssl && $tunnelSocket->enableSSL(); $connection = new Connection($tunnelSocket); break; default: @@ -165,8 +165,8 @@ private function pullConnection(string $host, int $port, bool $ssl, int $timeout } } else { $connection = $ssl - ? new Connection(IO::Socket()->connectWithSSL("ssl://{$host}:{$port}", $timeout)) - : new Connection(IO::Socket()->connect("tcp://{$host}:{$port}", $timeout)); + ? new Connection(Socket::connectWithSSL("ssl://{$host}:{$port}", $timeout)) + : new Connection(Socket::connect("tcp://{$host}:{$port}", $timeout)); } } diff --git a/src/Client/Capture/ServerSentEvents.php b/src/Client/Capture/ServerSentEvents.php index c1b8487..1a2ea67 100644 --- a/src/Client/Capture/ServerSentEvents.php +++ b/src/Client/Capture/ServerSentEvents.php @@ -16,7 +16,7 @@ use Exception; use GuzzleHttp\Psr7\Response; use Iterator; -use Ripple\Coroutine; +use Ripple\Coroutine\Coroutine; use Ripple\Http\Client\Capture; use Throwable; @@ -35,9 +35,14 @@ */ class ServerSentEvents extends Capture { + /*** @var \Closure|null */ + public Closure|null $onEvent = null; + /*** @var \Closure|null */ + public Closure|null $onComplete = null; + /*** @var array */ + protected array $iterators = []; /*** @var string */ private string $status = 'pending'; - /*** @var string */ private string $buffer = ''; @@ -138,15 +143,6 @@ public function processContent(string $content): void } } - /*** @return string */ - public function getStatus(): string - { - return $this->status; - } - - /*** @var array */ - protected array $iterators = []; - /** * @return iterable */ @@ -238,9 +234,9 @@ public function rewind(): void }; } - /*** @var \Closure|null */ - public Closure|null $onEvent = null; - - /*** @var \Closure|null */ - public Closure|null $onComplete = null; + /*** @return string */ + public function getStatus(): string + { + return $this->status; + } } diff --git a/src/Client/Connection.php b/src/Client/Connection.php index 396c97a..ece39ff 100644 --- a/src/Client/Connection.php +++ b/src/Client/Connection.php @@ -16,9 +16,9 @@ use GuzzleHttp\Psr7\Response; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; -use Ripple\Coroutine; +use Ripple\Coroutine\Coroutine; use Ripple\Coroutine\WaitGroup; -use Ripple\Socket\SocketStream; +use Ripple\Socket; use Ripple\Stream\Exception\ConnectionException; use Ripple\Stream\Exception\RuntimeException; use Throwable; @@ -93,9 +93,9 @@ class Connection private WaitGroup $waitGroup; /** - * @param SocketStream $stream + * @param Socket $stream */ - public function __construct(public SocketStream $stream) + public function __construct(public Socket $stream) { $this->waitGroup = new WaitGroup(); $this->reset(); @@ -128,6 +128,163 @@ private function reset(): void $this->capture = null; } + /** + * @param \Psr\Http\Message\RequestInterface $request + * @param array $option + * + * @return \GuzzleHttp\Psr7\Response + * @throws \Ripple\Stream\Exception\ConnectionException + * @throws \Ripple\Stream\Exception\RuntimeException + */ + public function request(RequestInterface $request, array $option = []): Response + { + $this->waitGroup->wait(); + $this->waitGroup->add(); + try { + return $this->queue($request, $option); + } finally { + $this->reset(); + $this->waitGroup->done(); + } + } + + /** + * @param \Psr\Http\Message\RequestInterface $request + * @param array $option + * + * @return \GuzzleHttp\Psr7\Response + * @throws \Ripple\Stream\Exception\ConnectionException + * @throws \Ripple\Stream\Exception\RuntimeException + */ + private function queue(RequestInterface $request, array $option = []): Response + { + $uri = $request->getUri(); + $method = $request->getMethod(); + + if (!$path = $uri->getPath()) { + $path = '/'; + } + + if ($query = $uri->getQuery()) { + $query = "?{$query}"; + } else { + $query = ''; + } + + $this->capture = $capture = $option['capture'] ?? null; + if (!$capture instanceof Capture) { + $capture = null; + } + + $suspension = getSuspension(); + $header = "{$method} {$path}{$query} HTTP/1.1\r\n"; + foreach ($request->getHeaders() as $name => $values) { + $header .= "{$name}: " . implode(', ', $values) . "\r\n"; + } + + $this->stream->write($header); + if ($bodyStream = $request->getBody()) { + if (!$request->getHeader('Content-Length')) { + $size = $bodyStream->getSize(); + $size > 0 && $this->stream->write("Content-Length: {$bodyStream->getSize()}\r\n"); + } + + if ($bodyStream->getMetadata('uri') === 'php://temp') { + $this->stream->write("\r\n"); + if ($bodyContent = $bodyStream->getContents()) { + $this->stream->write($bodyContent); + } + } elseif ($bodyStream instanceof MultipartStream) { + if (!$request->getHeader('Content-Type')) { + $this->stream->write("Content-Type: multipart/form-data; boundary={$bodyStream->getBoundary()}\r\n"); + } + $this->stream->write("\r\n"); + try { + while (!$bodyStream->eof()) { + $this->stream->write($bodyStream->read(8192)); + } + } catch (Throwable) { + $bodyStream->close(); + $this->stream->close(); + throw new ConnectionException('Invalid body stream'); + } + } else { + throw new ConnectionException('Invalid body stream'); + } + } else { + $this->stream->write("\r\n"); + } + + /*** Parse response process*/ + if ($timeout = $option['timeout'] ?? null) { + $timeoutOID = delay(static function () use ($suspension) { + Coroutine::throw( + $suspension, + new ConnectionException('Request timeout', ConnectionException::CONNECTION_TIMEOUT) + ); + }, $timeout); + } + + if ($sink = $option['sink'] ?? null) { + $this->setOutput(fopen($sink, 'wb')); + } + + while (1) { + try { + $this->stream->waitForReadable(); + } catch (Throwable $e) { + if (isset($timeoutOID)) { + cancel($timeoutOID); + } + + if ($sink && is_resource($sink)) { + fclose($sink); + } + + $this->stream->close(); + throw new ConnectionException( + 'Connection closed by peer', + ConnectionException::CONNECTION_CLOSED, + null, + $this->stream, + true + ); + } + + $content = $this->stream->readContinuously(8192); + if ($content === '') { + if (!$this->stream->eof()) { + continue; + } + $response = $this->tickClose(); + } else { + $response = $this->tick($content); + } + if ($response) { + if (isset($timeoutOID)) { + cancel($timeoutOID); + } + + if ($sink && is_resource($sink)) { + fclose($sink); + } + + $this->capture?->onComplete($response); + return $response; + } + } + } + + /** + * @param mixed $resource + * + * @return void + */ + public function setOutput(mixed $resource): void + { + $this->output = $resource; + } + /** * @param string|false $content * @@ -302,161 +459,4 @@ private function output(string $content): void $this->capture?->processContent($content); } - - /** - * @param mixed $resource - * - * @return void - */ - public function setOutput(mixed $resource): void - { - $this->output = $resource; - } - - /** - * @param \Psr\Http\Message\RequestInterface $request - * @param array $option - * - * @return \GuzzleHttp\Psr7\Response - * @throws \Ripple\Stream\Exception\ConnectionException - * @throws \Ripple\Stream\Exception\RuntimeException - */ - public function request(RequestInterface $request, array $option = []): Response - { - $this->waitGroup->wait(); - $this->waitGroup->add(); - try { - return $this->queue($request, $option); - } finally { - $this->reset(); - $this->waitGroup->done(); - } - } - - /** - * @param \Psr\Http\Message\RequestInterface $request - * @param array $option - * - * @return \GuzzleHttp\Psr7\Response - * @throws \Ripple\Stream\Exception\ConnectionException - * @throws \Ripple\Stream\Exception\RuntimeException - */ - private function queue(RequestInterface $request, array $option = []): Response - { - $uri = $request->getUri(); - $method = $request->getMethod(); - - if (!$path = $uri->getPath()) { - $path = '/'; - } - - if ($query = $uri->getQuery()) { - $query = "?{$query}"; - } else { - $query = ''; - } - - $this->capture = $capture = $option['capture'] ?? null; - if (!$capture instanceof Capture) { - $capture = null; - } - - $suspension = getSuspension(); - $header = "{$method} {$path}{$query} HTTP/1.1\r\n"; - foreach ($request->getHeaders() as $name => $values) { - $header .= "{$name}: " . implode(', ', $values) . "\r\n"; - } - - $this->stream->write($header); - if ($bodyStream = $request->getBody()) { - if (!$request->getHeader('Content-Length')) { - $size = $bodyStream->getSize(); - $size > 0 && $this->stream->write("Content-Length: {$bodyStream->getSize()}\r\n"); - } - - if ($bodyStream->getMetadata('uri') === 'php://temp') { - $this->stream->write("\r\n"); - if ($bodyContent = $bodyStream->getContents()) { - $this->stream->write($bodyContent); - } - } elseif ($bodyStream instanceof MultipartStream) { - if (!$request->getHeader('Content-Type')) { - $this->stream->write("Content-Type: multipart/form-data; boundary={$bodyStream->getBoundary()}\r\n"); - } - $this->stream->write("\r\n"); - try { - while (!$bodyStream->eof()) { - $this->stream->write($bodyStream->read(8192)); - } - } catch (Throwable) { - $bodyStream->close(); - $this->stream->close(); - throw new ConnectionException('Invalid body stream'); - } - } else { - throw new ConnectionException('Invalid body stream'); - } - } else { - $this->stream->write("\r\n"); - } - - /*** Parse response process*/ - if ($timeout = $option['timeout'] ?? null) { - $timeoutOID = delay(static function () use ($suspension) { - Coroutine::throw( - $suspension, - new ConnectionException('Request timeout', ConnectionException::CONNECTION_TIMEOUT) - ); - }, $timeout); - } - - if ($sink = $option['sink'] ?? null) { - $this->setOutput(fopen($sink, 'wb')); - } - - while (1) { - try { - $this->stream->waitForReadable(); - } catch (Throwable $e) { - if (isset($timeoutOID)) { - cancel($timeoutOID); - } - - if ($sink && is_resource($sink)) { - fclose($sink); - } - - $this->stream->close(); - throw new ConnectionException( - 'Connection closed by peer', - ConnectionException::CONNECTION_CLOSED, - null, - $this->stream, - true - ); - } - - $content = $this->stream->readContinuously(8192); - if ($content === '') { - if (!$this->stream->eof()) { - continue; - } - $response = $this->tickClose(); - } else { - $response = $this->tick($content); - } - if ($response) { - if (isset($timeoutOID)) { - cancel($timeoutOID); - } - - if ($sink && is_resource($sink)) { - fclose($sink); - } - - $this->capture?->onComplete($response); - return $response; - } - } - } } diff --git a/src/Client/ConnectionPool.php b/src/Client/ConnectionPool.php index 1246a55..0cb0ba6 100644 --- a/src/Client/ConnectionPool.php +++ b/src/Client/ConnectionPool.php @@ -12,11 +12,10 @@ namespace Ripple\Http\Client; -use Co\IO; -use Ripple\Socket\SocketStream; -use Ripple\Socket\Tunnel\Http; -use Ripple\Socket\Tunnel\Socks5; +use Ripple\Socket; use Ripple\Stream\Exception\ConnectionException; +use Ripple\Tunnel\Http; +use Ripple\Tunnel\Socks5; use Throwable; use function array_pop; @@ -163,13 +162,13 @@ private function createConnection(string $host, int $port, bool $ssl, int|float $payload['username'] = $parse['user']; $payload['password'] = $parse['pass']; } - $proxySocketStream = $this->createProxySocketStream($parse, $payload); - $ssl && IO::Socket()->enableSSL($proxySocketStream, $timeout); - return new Connection($proxySocketStream); + $proxySocket = $this->createProxySocket($parse, $payload); + $ssl && $proxySocket->enableSSL(); + return new Connection($proxySocket); } - $stream = IO::Socket()->connect("tcp://{$host}:{$port}", $timeout); - $ssl && IO::Socket()->enableSSL($stream, $timeout); + $stream = Socket::connect("tcp://{$host}:{$port}", $timeout); + $ssl && $stream->enableSSL(); return new Connection($stream); } @@ -180,14 +179,14 @@ private function createConnection(string $host, int $port, bool $ssl, int|float * @param array $parse * @param array $payload * - * @return SocketStream + * @return Socket * @throws ConnectionException */ - private function createProxySocketStream(array $parse, array $payload): SocketStream + private function createProxySocket(array $parse, array $payload): Socket { return match ($parse['scheme']) { - 'socks', 'socks5' => Socks5::connect("tcp://{$parse['host']}:{$parse['port']}", $payload)->getSocketStream(), - 'http', 'https' => Http::connect("tcp://{$parse['host']}:{$parse['port']}", $payload)->getSocketStream(), + 'socks', 'socks5' => Socks5::connect("tcp://{$parse['host']}:{$parse['port']}", $payload)->getSocket(), + 'http', 'https' => Http::connect("tcp://{$parse['host']}:{$parse['port']}", $payload)->getSocket(), default => throw new ConnectionException('Unsupported proxy protocol', ConnectionException::CONNECTION_ERROR), }; } @@ -209,7 +208,7 @@ public function pushConnection(Connection $connection, string $key): void unset($this->listenEventMap[$streamId]); } $this->idleConnections[$key][$streamId] = $connection; - $this->listenEventMap[$streamId] = $connection->stream->onReadable(function (SocketStream $stream) use ($key, $connection) { + $this->listenEventMap[$streamId] = $connection->stream->onReadable(function (Socket $stream) use ($key, $connection) { try { if ($stream->read(1) === '' && $stream->eof()) { throw new ConnectionException('Connection closed by peer', ConnectionException::CONNECTION_CLOSED); diff --git a/src/Guzzle.php b/src/Guzzle.php index 31c9df9..add5284 100644 --- a/src/Guzzle.php +++ b/src/Guzzle.php @@ -41,6 +41,16 @@ protected function __construct() $this->rippleHandler = new RippleHandler($httpClient); } + /** + * @param array $config + * + * @return \GuzzleHttp\Client + */ + public static function newClient(array $config = []): Client + { + return new Client(array_merge(['handler' => self::getInstance()->getHandler()], $config)); + } + /*** @return Guzzle */ public static function getInstance(): Guzzle { @@ -50,16 +60,6 @@ public static function getInstance(): Guzzle return self::$instance; } - /** - * @Author cclilshy - * @Date 2024/8/31 14:28 - * @return RippleHandler - */ - public function getHandler(): RippleHandler - { - return $this->rippleHandler; - } - /** * @return \Ripple\Http\Client */ @@ -69,12 +69,12 @@ public function getHttpClient(): \Ripple\Http\Client } /** - * @param array $config - * - * @return \GuzzleHttp\Client + * @Author cclilshy + * @Date 2024/8/31 14:28 + * @return RippleHandler */ - public static function newClient(array $config = []): Client + public function getHandler(): RippleHandler { - return new Client(array_merge(['handler' => self::getInstance()->getHandler()], $config)); + return $this->rippleHandler; } } diff --git a/src/Guzzle/RippleHandler.php b/src/Guzzle/RippleHandler.php index f6d4adf..ee787b3 100644 --- a/src/Guzzle/RippleHandler.php +++ b/src/Guzzle/RippleHandler.php @@ -22,7 +22,7 @@ class RippleHandler { - public function __construct(private Client $httpClient) + public function __construct(private readonly Client $httpClient) { } diff --git a/src/Server.php b/src/Server.php index 1ee9601..55cfb8a 100644 --- a/src/Server.php +++ b/src/Server.php @@ -13,13 +13,12 @@ namespace Ripple\Http; use Closure; -use Co\IO; use InvalidArgumentException; use Ripple\Http\Enum\Status; use Ripple\Http\Server\Connection; use Ripple\Http\Server\Exception\FormatException; use Ripple\Http\Server\Request; -use Ripple\Socket\SocketStream; +use Ripple\Socket; use Ripple\Stream\Exception\ConnectionException; use Ripple\Utils\Output; use Throwable; @@ -46,8 +45,8 @@ class Server */ public Closure $onRequest; - /*** @var SocketStream */ - private SocketStream $server; + /*** @var \Ripple\Socket */ + private Socket $server; /** * @param string $address @@ -74,7 +73,7 @@ public function __construct(string $address, mixed $context = null) }; $server = match ($scheme) { - 'http', 'https' => IO::Socket()->server("tcp://{$host}:{$port}", $context), + 'http', 'https' => Socket::server("tcp://{$host}:{$port}", $context), default => throw new InvalidArgumentException('Address format error') }; @@ -92,7 +91,7 @@ public function __construct(string $address, mixed $context = null) */ public function listen(): void { - $this->server->onReadable(function (SocketStream $stream) { + $this->server->onReadable(function (Socket $stream) { if (!$client = $stream->accept()) { return; } @@ -123,11 +122,11 @@ public function listen(): void } /** - * @param SocketStream $stream + * @param \Ripple\Socket $stream * * @return void */ - private function listenSocket(SocketStream $stream): void + private function listenSocket(Socket $stream): void { $connection = new Connection($stream); $connection->listen(function (array $requestInfo) use ($stream) { diff --git a/src/Server/Connection.php b/src/Server/Connection.php index e5077a8..d8a9be7 100644 --- a/src/Server/Connection.php +++ b/src/Server/Connection.php @@ -15,7 +15,7 @@ use Closure; use Ripple\Http\Server\Exception\FormatException; use Ripple\Http\Server\Upload\MultipartHandler; -use Ripple\Socket\SocketStream; +use Ripple\Socket; use Ripple\Stream\Exception\RuntimeException; use Ripple\Utils\Output; use Throwable; @@ -82,9 +82,9 @@ class Connection private int $contentLength; /** - * @param SocketStream $stream + * @param Socket $stream */ - public function __construct(private readonly SocketStream $stream) + public function __construct(private readonly Socket $stream) { $this->reset(); } @@ -120,7 +120,7 @@ public function listen(Closure $builder): void } }); - $this->stream->onReadable(function (SocketStream $stream) use ($builder) { + $this->stream->onReadable(function (Socket $stream) use ($builder) { try { $content = $stream->read(8192); } catch (Throwable) { diff --git a/src/Server/Request.php b/src/Server/Request.php index 2082953..c3a2ac8 100644 --- a/src/Server/Request.php +++ b/src/Server/Request.php @@ -12,7 +12,7 @@ namespace Ripple\Http\Server; -use Ripple\Socket\SocketStream; +use Ripple\Socket; use function array_merge; use function is_string; @@ -30,30 +30,30 @@ class Request protected Response $response; /** - * @param SocketStream $stream - * @param array $GET - * @param array $POST - * @param array $COOKIE - * @param array $FILES - * @param array $SERVER - * @param mixed|null $CONTENT + * @param Socket $stream + * @param array $GET + * @param array $POST + * @param array $COOKIE + * @param array $FILES + * @param array $SERVER + * @param mixed|null $CONTENT */ public function __construct( - public readonly SocketStream $stream, - public readonly array $GET = [], - public readonly array $POST = [], - public readonly array $COOKIE = [], - public readonly array $FILES = [], - public readonly array $SERVER = [], - public readonly mixed $CONTENT = null, + public readonly Socket $stream, + public readonly array $GET = [], + public readonly array $POST = [], + public readonly array $COOKIE = [], + public readonly array $FILES = [], + public readonly array $SERVER = [], + public readonly mixed $CONTENT = null, ) { $this->REQUEST = array_merge($this->GET, $this->POST); } /** - * @return SocketStream + * @return Socket */ - public function getStream(): SocketStream + public function getStream(): Socket { return $this->stream; } diff --git a/src/Server/Response.php b/src/Server/Response.php index 8be4706..6f60fb9 100644 --- a/src/Server/Response.php +++ b/src/Server/Response.php @@ -14,7 +14,7 @@ use Closure; use Generator; -use Ripple\Socket\SocketStream; +use Ripple\Socket; use Ripple\Stream; use Ripple\Stream\Exception\ConnectionException; use Throwable; @@ -52,9 +52,9 @@ class Response protected string $statusText = 'OK'; /** - * @param SocketStream $stream + * @param Socket $stream */ - public function __construct(private readonly SocketStream $stream) + public function __construct(private readonly Socket $stream) { } @@ -199,9 +199,9 @@ public function setStatusCode(int $statusCode): static /** * @Author cclilshy * @Date 2024/9/1 14:12 - * @return SocketStream + * @return Socket */ - public function getStream(): SocketStream + public function getStream(): Socket { return $this->stream; } @@ -300,27 +300,27 @@ public function withHeaders(array $headers): static } /** - * @param string $name - * @param string $value + * @param array $cookies * * @return $this */ - public function withCookie(string $name, string $value): static + public function withCookies(array $cookies): static { - $this->cookies[$name] = $value; + foreach ($cookies as $name => $value) { + $this->withCookie($name, $value); + } return $this; } /** - * @param array $cookies + * @param string $name + * @param string $value * * @return $this */ - public function withCookies(array $cookies): static + public function withCookie(string $name, string $value): static { - foreach ($cookies as $name => $value) { - $this->withCookie($name, $value); - } + $this->cookies[$name] = $value; return $this; }