Skip to content

Commit

Permalink
Refactor symfony http client (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
jenky committed Sep 7, 2023
1 parent 57e4e2f commit e3de934
Show file tree
Hide file tree
Showing 4 changed files with 137 additions and 180 deletions.
11 changes: 11 additions & 0 deletions src/Exception/RequestException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Exception;

use Jenky\Atlas\Exception\RequestAwareException;

class RequestException extends RequestAwareException
{
}
100 changes: 10 additions & 90 deletions src/Psl/SymfonyClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,19 @@
namespace Jenky\Atlas\Pool\Psl;

use Http\Discovery\Psr17FactoryDiscovery;
use Jenky\Atlas\Exception\NetworkException;
use Jenky\Atlas\Exception\RequestException;
use Jenky\Atlas\Pool\SymfonyClientTrait;
use Psl\Async;
use Psl\Async\Awaitable;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Symfony\Component\HttpClient\HttpClient;
use Symfony\Component\HttpClient\Response\StreamableInterface;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface as SymfonyResponseInterface;
use Symfony\Contracts\Service\ResetInterface;

final class SymfonyClient implements AsyncClientInterface, ResetInterface
{
use SymfonyClientTrait;

private HttpClientInterface $client;

private ResponseFactoryInterface $responseFactory;
Expand All @@ -39,92 +34,17 @@ public function __construct(
$this->streamFactory = $streamFactory ?? Psr17FactoryDiscovery::findStreamFactory();
}

public function sendRequest(RequestInterface $request): ResponseInterface
{
try {
$body = $request->getBody();

if ($body->isSeekable()) {
$body->seek(0);
}

$options = [
'headers' => $request->getHeaders(),
'body' => $body->getContents(),
];

if ('1.0' === $request->getProtocolVersion()) {
$options['http_version'] = '1.0';
}

return $this->createResponse(
$this->client->request($request->getMethod(), (string) $request->getUri(), $options)
)->await();
// @codeCoverageIgnoreStart
} catch (TransportExceptionInterface $e) {
if ($e instanceof \InvalidArgumentException) {
throw new RequestException($e->getMessage(), $request, null, $e);
}

throw new NetworkException($e->getMessage(), $request, $e);
}
// @codeCoverageIgnoreEnd
}

private function createResponse(SymfonyResponseInterface $response): Awaitable
{
$defer = new Async\Deferred();

Async\Scheduler::defer(function () use ($defer, $response) {
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());

foreach ($response->getHeaders(false) as $name => $values) {
foreach ($values as $value) {
try {
$psrResponse = $psrResponse->withAddedHeader($name, $value);
// @codeCoverageIgnoreStart
} catch (\InvalidArgumentException) {
// ignore invalid header
}
// @codeCoverageIgnoreEnd
}
}

$body = $response instanceof StreamableInterface ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client);
$body = $this->streamFactory->createStreamFromResource($body);

if ($body->isSeekable()) {
try {
$body->seek(0);
} catch (\Throwable $e) {
// $defer->reject($e);
}
}

$defer->complete($psrResponse->withBody($body));
});

return $defer->getAwaitable();
}

/**
* @codeCoverageIgnore
* @param \Closure(): ResponseInterface $response
*/
public function withOptions(array $options): static
private function createResponse(\Closure $response): mixed
{
$clone = clone $this;
$clone->client = $clone->client->withOptions($options);
$defer = new Async\Deferred();

return $clone;
}
Async\Scheduler::defer(static function () use ($defer, $response) {
$defer->complete($response());
});

/**
* @codeCoverageIgnore
*/
public function reset(): void
{
if ($this->client instanceof ResetInterface) {
$this->client->reset();
}
return $defer->getAwaitable()->await();
}
}
100 changes: 10 additions & 90 deletions src/React/SymfonyClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,27 +5,22 @@
namespace Jenky\Atlas\Pool\React;

use Http\Discovery\Psr17FactoryDiscovery;
use Jenky\Atlas\Exception\NetworkException;
use Jenky\Atlas\Exception\RequestException;
use Psr\Http\Message\RequestInterface;
use Jenky\Atlas\Pool\SymfonyClientTrait;
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use React\Async;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Promise\Deferred;
use React\Promise\PromiseInterface;
use Symfony\Component\HttpClient\HttpClient;
use Symfony\Component\HttpClient\Response\StreamableInterface;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface as SymfonyResponseInterface;
use Symfony\Contracts\Service\ResetInterface;

final class SymfonyClient implements AsyncClientInterface, ResetInterface
{
use SymfonyClientTrait;

private HttpClientInterface $client;

private ResponseFactoryInterface $responseFactory;
Expand All @@ -46,92 +41,17 @@ public function __construct(
$this->loop = $loop ?? Loop::get();
}

public function sendRequest(RequestInterface $request): ResponseInterface
{
try {
$body = $request->getBody();

if ($body->isSeekable()) {
$body->seek(0);
}

$options = [
'headers' => $request->getHeaders(),
'body' => $body->getContents(),
];

if ('1.0' === $request->getProtocolVersion()) {
$options['http_version'] = '1.0';
}

return Async\await($this->createResponse(
$this->client->request($request->getMethod(), (string) $request->getUri(), $options)
));
// @codeCoverageIgnoreStart
} catch (TransportExceptionInterface $e) {
if ($e instanceof \InvalidArgumentException) {
throw new RequestException($e->getMessage(), $request, null, $e);
}

throw new NetworkException($e->getMessage(), $request, $e);
}
// @codeCoverageIgnoreEnd
}

private function createResponse(SymfonyResponseInterface $response): PromiseInterface
{
$defer = new Deferred();

$this->loop->futureTick(function () use ($defer, $response) {
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());

foreach ($response->getHeaders(false) as $name => $values) {
foreach ($values as $value) {
try {
$psrResponse = $psrResponse->withAddedHeader($name, $value);
// @codeCoverageIgnoreStart
} catch (\InvalidArgumentException) {
// ignore invalid header
}
// @codeCoverageIgnoreEnd
}
}

$body = $response instanceof StreamableInterface ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client);
$body = $this->streamFactory->createStreamFromResource($body);

if ($body->isSeekable()) {
try {
$body->seek(0);
} catch (\Throwable $e) {
// $defer->reject($e);
}
}

$defer->resolve($psrResponse->withBody($body));
});

return $defer->promise();
}

/**
* @codeCoverageIgnore
* @param \Closure(): ResponseInterface $response
*/
public function withOptions(array $options): static
private function createResponse(\Closure $response): mixed
{
$clone = clone $this;
$clone->client = $clone->client->withOptions($options);
$defer = new Deferred();

return $clone;
}
$this->loop->futureTick(static function () use ($defer, $response) {
$defer->resolve($response());
});

/**
* @codeCoverageIgnore
*/
public function reset(): void
{
if ($this->client instanceof ResetInterface) {
$this->client->reset();
}
return Async\await($defer->promise());
}
}
106 changes: 106 additions & 0 deletions src/SymfonyClientTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool;

use Jenky\Atlas\Exception\NetworkException;
use Jenky\Atlas\Pool\Exception\RequestException;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use Symfony\Component\HttpClient\Response\StreamableInterface;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\ResponseInterface as SymfonyResponseInterface;
use Symfony\Contracts\Service\ResetInterface;

trait SymfonyClientTrait
{
/**
* @param \Closure(): ResponseInterface $response
*/
abstract private function createResponse(\Closure $response): mixed;

public function sendRequest(RequestInterface $request): ResponseInterface
{
try {
$body = $request->getBody();

if ($body->isSeekable()) {
$body->seek(0);
}

$options = [
'headers' => $request->getHeaders(),
'body' => $body->getContents(),
];

if ('1.0' === $request->getProtocolVersion()) {
$options['http_version'] = '1.0';
}

$response = $this->client->request($request->getMethod(), (string) $request->getUri(), $options);

return $this->createResponse(fn () => $this->convertToPsrResponse($response));
// @codeCoverageIgnoreStart
} catch (TransportExceptionInterface $e) {
if ($e instanceof \InvalidArgumentException) {
throw new RequestException($e->getMessage(), $request, null, $e);
}

throw new NetworkException($e->getMessage(), $request, $e);
}
// @codeCoverageIgnoreEnd
}

private function convertToPsrResponse(SymfonyResponseInterface $response): ResponseInterface
{
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());

foreach ($response->getHeaders(false) as $name => $values) {
foreach ($values as $value) {
try {
$psrResponse = $psrResponse->withAddedHeader($name, $value);
// @codeCoverageIgnoreStart
} catch (\InvalidArgumentException) {
// ignore invalid header
}
// @codeCoverageIgnoreEnd
}
}

$body = $response instanceof StreamableInterface ? $response->toStream(false) : StreamWrapper::createResource($response, $this->client);
$body = $this->streamFactory->createStreamFromResource($body);

if ($body->isSeekable()) {
try {
$body->seek(0);
} catch (\Throwable) {
//
}
}

return $psrResponse->withBody($body);
}

/**
* @codeCoverageIgnore
*/
public function withOptions(array $options): static
{
$clone = clone $this;
$clone->client = $clone->client->withOptions($options);

return $clone;
}

/**
* @codeCoverageIgnore
*/
public function reset(): void
{
if ($this->client instanceof ResetInterface) {
$this->client->reset();
}
}
}

0 comments on commit e3de934

Please sign in to comment.