Skip to content

Commit

Permalink
Update: Suppress unnecessary exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
cclilshy committed Oct 31, 2024
1 parent e8290a9 commit 14620d5
Show file tree
Hide file tree
Showing 3 changed files with 128 additions and 138 deletions.
255 changes: 119 additions & 136 deletions src/Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,24 +34,21 @@

namespace Ripple\Http;

use Closure;
use Co\IO;
use GuzzleHttp\Psr7\MultipartStream;
use GuzzleHttp\Psr7\Response;
use InvalidArgumentException;
use Psr\Http\Message\RequestInterface;
use Ripple\Coroutine;
use Ripple\Http\Client\Connection;
use Ripple\Http\Client\ConnectionPool;
use Ripple\Promise;
use Ripple\Socket\SocketStream;
use Ripple\Socket\Tunnel\Http;
use Ripple\Socket\Tunnel\Socks5;
use Ripple\Stream\Exception\ConnectionException;
use Throwable;

use function Co\cancel;
use function Co\delay;
use function Co\repeat;
use function Co\getSuspension;
use function fclose;
use function fopen;
use function getenv;
Expand Down Expand Up @@ -90,163 +87,146 @@ public function __construct(private readonly array $config = [])
*/
public function request(RequestInterface $request, array $option = []): Response
{
return \Co\promise(function (Closure $resolve, Closure $reject, Promise $promise) use ($request, $option) {
$uri = $request->getUri();
$method = $request->getMethod();
$scheme = $uri->getScheme();
$host = $uri->getHost();
$uri = $request->getUri();
$method = $request->getMethod();
$scheme = $uri->getScheme();
$host = $uri->getHost();

if (!$port = $uri->getPort()) {
$port = $scheme === 'https' ? 443 : 80;
}
if (!$port = $uri->getPort()) {
$port = $scheme === 'https' ? 443 : 80;
}

if (!$path = $uri->getPath()) {
$path = '/';
}
if (!$path = $uri->getPath()) {
$path = '/';
}

if ($query = $uri->getQuery()) {
$query = "?{$query}";
} else {
$query = '';
}
if ($query = $uri->getQuery()) {
$query = "?{$query}";
} else {
$query = '';
}

if (!isset($option['proxy'])) {
if ($scheme === 'http' && $httpProxy = getenv('http_proxy')) {
$option['proxy'] = $httpProxy;
} elseif ($scheme === 'https' && $httpsProxy = getenv('https_proxy')) {
$option['proxy'] = $httpsProxy;
}
if (!isset($option['proxy'])) {
if ($scheme === 'http' && $httpProxy = getenv('http_proxy')) {
$option['proxy'] = $httpProxy;
} elseif ($scheme === 'https' && $httpsProxy = getenv('https_proxy')) {
$option['proxy'] = $httpsProxy;
}
}

$connection = $this->pullConnection(
$host,
$port,
$scheme === 'https',
$option['timeout'] ?? 0,
$option['proxy'] ?? null
);
$connection = $this->pullConnection(
$host,
$port,
$scheme === 'https',
$option['timeout'] ?? 0,
$option['proxy'] ?? null
);

$writeHandler = fn (string|false $content) => $connection->stream->write($content);
$tickHandler = fn (string|false $content) => $connection->tick($content);
$write = fn (string|false $content) => $connection->stream->write($content);
$tick = fn (string|false $content) => $connection->tick($content);

if ($captureWrite = $option['capture_write'] ?? null) {
$writeHandler = fn (string|false $content) => $captureWrite($content, $writeHandler);
}
if ($captureWrite = $option['capture_write'] ?? null) {
$write = fn (string|false $content) => $captureWrite($content, $write);
}

if ($captureRead = $option['capture_read'] ?? null) {
$tickHandler = fn (string|false $content) => $captureRead($content, $tickHandler);
}
if ($captureRead = $option['capture_read'] ?? null) {
$tick = fn (string|false $content) => $captureRead($content, $tick);
}

$suspension = getSuspension();
$header = "{$method} {$path}{$query} HTTP/1.1\r\n";
foreach ($request->getHeaders() as $name => $values) {
$header .= "{$name}: " . implode(', ', $values) . "\r\n";
}

$header = "{$method} {$path}{$query} HTTP/1.1\r\n";
foreach ($request->getHeaders() as $name => $values) {
$header .= "{$name}: " . implode(', ', $values) . "\r\n";
$write($header);
if ($bodyStream = $request->getBody()) {
if (!$request->getHeader('Content-Length')) {
$size = $bodyStream->getSize();
$size > 0 && $write("Content-Length: {$bodyStream->getSize()}\r\n");
}

$writeHandler($header);
if ($bodyStream = $request->getBody()) {
if (!$request->getHeader('Content-Length')) {
$size = $bodyStream->getSize();
$size > 0 && $writeHandler("Content-Length: {$bodyStream->getSize()}\r\n");
if ($bodyStream->getMetadata('uri') === 'php://temp') {
$write("\r\n");
if ($bodyContent = $bodyStream->getContents()) {
$write($bodyContent);
}

if ($bodyStream->getMetadata('uri') === 'php://temp') {
$writeHandler("\r\n");
if ($bodyContent = $bodyStream->getContents()) {
$writeHandler($bodyContent);
}
} elseif ($bodyStream instanceof MultipartStream) {
if (!$request->getHeader('Content-Type')) {
$writeHandler("Content-Type: multipart/form-data; boundary={$bodyStream->getBoundary()}\r\n");
} elseif ($bodyStream instanceof MultipartStream) {
if (!$request->getHeader('Content-Type')) {
$write("Content-Type: multipart/form-data; boundary={$bodyStream->getBoundary()}\r\n");
}
$write("\r\n");
try {
while (!$bodyStream->eof()) {
$write($bodyStream->read(8192));
}
$writeHandler("\r\n");
repeat(static function (Closure $cancel) use ($connection, $bodyStream, $resolve, $reject, $writeHandler) {
try {
$content = '';
while ($buffer = $bodyStream->read(8192)) {
$content .= $buffer;
}

if ($content) {
$writeHandler($content);
} else {
$cancel();
$bodyStream->close();
}
} catch (Throwable) {
$cancel();
$bodyStream->close();
$reject(new InvalidArgumentException('Invalid body stream'));
}
}, 0.1);
} else {
throw new InvalidArgumentException('Invalid body stream');
} catch (Throwable) {
$bodyStream->close();
$connection->stream->close();
throw new ConnectionException('Invalid body stream');
}
} else {
$writeHandler("\r\n");
throw new ConnectionException('Invalid body stream');
}
} else {
$write("\r\n");
}

if ($timeout = $option['timeout'] ?? null) {
$delayEventId = delay(static function () use ($connection, $reject) {
$connection->stream->close();
$reject(new ConnectionException('Request timeout', ConnectionException::CONNECTION_TIMEOUT));
}, $timeout);
/*** Parse response process*/
if ($timeout = $option['timeout'] ?? null) {
$timeoutOID = delay(static function () use ($connection, $suspension) {
Coroutine::throw(
$suspension,
new ConnectionException('Request timeout', ConnectionException::CONNECTION_TIMEOUT)
);
}, $timeout);
}

$promise->finally(static function () use ($delayEventId) {
cancel($delayEventId);
});
}
if ($sink = $option['sink'] ?? null) {
$connection->setOutput(fopen($sink, 'wb'));
}

if ($sink = $option['sink'] ?? null) {
$connection->setOutput($sinkFile = fopen($sink, 'wb'));
$promise->finally(static function () use ($sinkFile) {
if (is_resource($sinkFile)) {
fclose($sinkFile);
while (1) {
try {
$connection->stream->waitForReadable();
$content = $connection->stream->readContinuously(8192);
if ($content === '') {
if (!$connection->stream->eof()) {
continue;
}
});
}

/*** Parse response process*/
$connection->stream->onReadable(function (SocketStream $socketStream, Closure $cancel) use (
$host,
$port,
$connection,
$scheme,
$resolve,
$reject,
$tickHandler
) {
try {
$content = $socketStream->readContinuously(8192);
$response = $tick(false);
} else {
$response = $tick($content);
}

if ($content === '') {
if (!$socketStream->eof()) {
return;
}
$response = $tickHandler(false);
if ($response) {
$k = implode(', ', $response->getHeader('Connection'));
if (str_contains(strtolower($k), 'keep-alive') && $this->pool) {
/*** Push into connection pool*/
$this->pushConnection(
$connection,
ConnectionPool::generateConnectionKey($host, $port)
);
$connection->stream->cancelReadable();
} else {
$response = $tickHandler($content);
$connection->stream->close();
}
return ($response);
}
} catch (Throwable $exception) {
$connection->stream->close();
throw $exception;
} finally {
if (isset($timeoutOID)) {
cancel($timeoutOID);
}

if ($response) {
$k = implode(', ', $response->getHeader('Connection'));
if (str_contains(strtolower($k), 'keep-alive') && $this->pool) {
/*** Push into connection pool*/
$this->pushConnection(
$connection,
ConnectionPool::generateConnectionKey($host, $port)
);
$cancel();
} else {
$socketStream->close();
}
$resolve($response);
}
} catch (Throwable $exception) {
$socketStream->close();
$reject($exception);
if ($sink && is_resource($sink)) {
fclose($sink);
}
});
})->await();
}
}

}

/**
Expand All @@ -262,6 +242,9 @@ public function request(RequestInterface $request, array $option = []): Response
*/
private function pullConnection(string $host, int $port, bool $ssl, int $timeout = 0, string|null $tunnel = null): Connection
{
if ($tunnel && in_array($host, ['127.0.0.1', 'localhost', '::1'], true)) {
$tunnel = null;
}
if ($this->pool) {
$connection = $this->connectionPool->pullConnection($host, $port, $ssl, $timeout, $tunnel);
} else {
Expand Down
4 changes: 3 additions & 1 deletion src/Client/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ private function reset(): void
public function tick(string|false $content): ResponseInterface|null
{
if ($content === false) {
if (isset($this->headers['CONTENT-LENGTH'])) {
if (!$this->headers) {
throw new RuntimeException('Response header is required');
} elseif (isset($this->headers['CONTENT-LENGTH'])) {
throw new RuntimeException('Response content length is required');
} elseif ($this->chunk) {
throw new RuntimeException('Response chunked is required');
Expand Down
7 changes: 6 additions & 1 deletion tests/HttpTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,11 @@ private function httpClient(): void
'https://business.oceanengine.com/login',
'https://www.laruence.com/',
'https://www.php.net/',
'https://www.abc.net/',
'https://www.491e5d73fbeb64e8e7d66b25cb3d1823.net/',
'http://www.491e5d73fbeb64e8e7d66b25cb3d1823.net/',
];


$list = [];
foreach ($urls as $url) {
$list[] = async(function () use ($url) {
Expand All @@ -218,6 +220,9 @@ private function httpClient(): void
} catch (Throwable $exception) {
return [$url, $exception];
}
})->except(function () {
\var_dump(1);
die;
});
}

Expand Down

0 comments on commit 14620d5

Please sign in to comment.