From e3de93407ac248113f237d42eea89c397a1adeeb Mon Sep 17 00:00:00 2001 From: Lynh Date: Thu, 7 Sep 2023 14:22:12 +0700 Subject: [PATCH] Refactor symfony http client (#5) --- src/Exception/RequestException.php | 11 +++ src/Psl/SymfonyClient.php | 100 +++------------------------ src/React/SymfonyClient.php | 100 +++------------------------ src/SymfonyClientTrait.php | 106 +++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 180 deletions(-) create mode 100644 src/Exception/RequestException.php create mode 100644 src/SymfonyClientTrait.php diff --git a/src/Exception/RequestException.php b/src/Exception/RequestException.php new file mode 100644 index 0000000..209e360 --- /dev/null +++ b/src/Exception/RequestException.php @@ -0,0 +1,11 @@ +streamFactory = $streamFactory ?? Psr17FactoryDiscovery::findStreamFactory(); } - public function sendRequest(RequestInterface $request): ResponseInterface - { - try { - $body = $request->getBody(); - - if ($body->isSeekable()) { - $body->seek(0); - } - - $options = [ - 'headers' => $request->getHeaders(), - 'body' => $body->getContents(), - ]; - - if ('1.0' === $request->getProtocolVersion()) { - $options['http_version'] = '1.0'; - } - - return $this->createResponse( - $this->client->request($request->getMethod(), (string) $request->getUri(), $options) - )->await(); - // @codeCoverageIgnoreStart - } catch (TransportExceptionInterface $e) { - if ($e instanceof \InvalidArgumentException) { - throw new RequestException($e->getMessage(), $request, null, $e); - } - - throw new NetworkException($e->getMessage(), $request, $e); - } - // @codeCoverageIgnoreEnd - } - - private function createResponse(SymfonyResponseInterface $response): Awaitable - { - $defer = new Async\Deferred(); - - Async\Scheduler::defer(function () use ($defer, $response) { - $psrResponse = $this->responseFactory->createResponse($response->getStatusCode()); - - foreach ($response->getHeaders(false) as $name => $values) { - foreach ($values as $value) { - try { - $psrResponse = $psrResponse->withAddedHeader($name, $value); - // @codeCoverageIgnoreStart - } catch (\InvalidArgumentException) { - // ignore invalid header - } - // @codeCoverageIgnoreEnd - } - } - - $body = $response instanceof StreamableInterface ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client); - $body = $this->streamFactory->createStreamFromResource($body); - - if ($body->isSeekable()) { - try { - $body->seek(0); - } catch (\Throwable $e) { - // $defer->reject($e); - } - } - - $defer->complete($psrResponse->withBody($body)); - }); - - return $defer->getAwaitable(); - } - /** - * @codeCoverageIgnore + * @param \Closure(): ResponseInterface $response */ - public function withOptions(array $options): static + private function createResponse(\Closure $response): mixed { - $clone = clone $this; - $clone->client = $clone->client->withOptions($options); + $defer = new Async\Deferred(); - return $clone; - } + Async\Scheduler::defer(static function () use ($defer, $response) { + $defer->complete($response()); + }); - /** - * @codeCoverageIgnore - */ - public function reset(): void - { - if ($this->client instanceof ResetInterface) { - $this->client->reset(); - } + return $defer->getAwaitable()->await(); } } diff --git a/src/React/SymfonyClient.php b/src/React/SymfonyClient.php index d822c04..709126d 100644 --- a/src/React/SymfonyClient.php +++ b/src/React/SymfonyClient.php @@ -5,9 +5,7 @@ namespace Jenky\Atlas\Pool\React; use Http\Discovery\Psr17FactoryDiscovery; -use Jenky\Atlas\Exception\NetworkException; -use Jenky\Atlas\Exception\RequestException; -use Psr\Http\Message\RequestInterface; +use Jenky\Atlas\Pool\SymfonyClientTrait; use Psr\Http\Message\ResponseFactoryInterface; use Psr\Http\Message\ResponseInterface; use Psr\Http\Message\StreamFactoryInterface; @@ -15,17 +13,14 @@ use React\EventLoop\Loop; use React\EventLoop\LoopInterface; use React\Promise\Deferred; -use React\Promise\PromiseInterface; use Symfony\Component\HttpClient\HttpClient; -use Symfony\Component\HttpClient\Response\StreamableInterface; -use Symfony\Component\HttpClient\Response\StreamWrapper; -use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; use Symfony\Contracts\HttpClient\HttpClientInterface; -use Symfony\Contracts\HttpClient\ResponseInterface as SymfonyResponseInterface; use Symfony\Contracts\Service\ResetInterface; final class SymfonyClient implements AsyncClientInterface, ResetInterface { + use SymfonyClientTrait; + private HttpClientInterface $client; private ResponseFactoryInterface $responseFactory; @@ -46,92 +41,17 @@ public function __construct( $this->loop = $loop ?? Loop::get(); } - public function sendRequest(RequestInterface $request): ResponseInterface - { - try { - $body = $request->getBody(); - - if ($body->isSeekable()) { - $body->seek(0); - } - - $options = [ - 'headers' => $request->getHeaders(), - 'body' => $body->getContents(), - ]; - - if ('1.0' === $request->getProtocolVersion()) { - $options['http_version'] = '1.0'; - } - - return Async\await($this->createResponse( - $this->client->request($request->getMethod(), (string) $request->getUri(), $options) - )); - // @codeCoverageIgnoreStart - } catch (TransportExceptionInterface $e) { - if ($e instanceof \InvalidArgumentException) { - throw new RequestException($e->getMessage(), $request, null, $e); - } - - throw new NetworkException($e->getMessage(), $request, $e); - } - // @codeCoverageIgnoreEnd - } - - private function createResponse(SymfonyResponseInterface $response): PromiseInterface - { - $defer = new Deferred(); - - $this->loop->futureTick(function () use ($defer, $response) { - $psrResponse = $this->responseFactory->createResponse($response->getStatusCode()); - - foreach ($response->getHeaders(false) as $name => $values) { - foreach ($values as $value) { - try { - $psrResponse = $psrResponse->withAddedHeader($name, $value); - // @codeCoverageIgnoreStart - } catch (\InvalidArgumentException) { - // ignore invalid header - } - // @codeCoverageIgnoreEnd - } - } - - $body = $response instanceof StreamableInterface ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client); - $body = $this->streamFactory->createStreamFromResource($body); - - if ($body->isSeekable()) { - try { - $body->seek(0); - } catch (\Throwable $e) { - // $defer->reject($e); - } - } - - $defer->resolve($psrResponse->withBody($body)); - }); - - return $defer->promise(); - } - /** - * @codeCoverageIgnore + * @param \Closure(): ResponseInterface $response */ - public function withOptions(array $options): static + private function createResponse(\Closure $response): mixed { - $clone = clone $this; - $clone->client = $clone->client->withOptions($options); + $defer = new Deferred(); - return $clone; - } + $this->loop->futureTick(static function () use ($defer, $response) { + $defer->resolve($response()); + }); - /** - * @codeCoverageIgnore - */ - public function reset(): void - { - if ($this->client instanceof ResetInterface) { - $this->client->reset(); - } + return Async\await($defer->promise()); } } diff --git a/src/SymfonyClientTrait.php b/src/SymfonyClientTrait.php new file mode 100644 index 0000000..e3f9a99 --- /dev/null +++ b/src/SymfonyClientTrait.php @@ -0,0 +1,106 @@ +getBody(); + + if ($body->isSeekable()) { + $body->seek(0); + } + + $options = [ + 'headers' => $request->getHeaders(), + 'body' => $body->getContents(), + ]; + + if ('1.0' === $request->getProtocolVersion()) { + $options['http_version'] = '1.0'; + } + + $response = $this->client->request($request->getMethod(), (string) $request->getUri(), $options); + + return $this->createResponse(fn () => $this->convertToPsrResponse($response)); + // @codeCoverageIgnoreStart + } catch (TransportExceptionInterface $e) { + if ($e instanceof \InvalidArgumentException) { + throw new RequestException($e->getMessage(), $request, null, $e); + } + + throw new NetworkException($e->getMessage(), $request, $e); + } + // @codeCoverageIgnoreEnd + } + + private function convertToPsrResponse(SymfonyResponseInterface $response): ResponseInterface + { + $psrResponse = $this->responseFactory->createResponse($response->getStatusCode()); + + foreach ($response->getHeaders(false) as $name => $values) { + foreach ($values as $value) { + try { + $psrResponse = $psrResponse->withAddedHeader($name, $value); + // @codeCoverageIgnoreStart + } catch (\InvalidArgumentException) { + // ignore invalid header + } + // @codeCoverageIgnoreEnd + } + } + + $body = $response instanceof StreamableInterface ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client); + $body = $this->streamFactory->createStreamFromResource($body); + + if ($body->isSeekable()) { + try { + $body->seek(0); + } catch (\Throwable) { + // + } + } + + return $psrResponse->withBody($body); + } + + /** + * @codeCoverageIgnore + */ + public function withOptions(array $options): static + { + $clone = clone $this; + $clone->client = $clone->client->withOptions($options); + + return $clone; + } + + /** + * @codeCoverageIgnore + */ + public function reset(): void + { + if ($this->client instanceof ResetInterface) { + $this->client->reset(); + } + } +}