Skip to content

Commit

Permalink
Rewrite new version
Browse files Browse the repository at this point in the history
  • Loading branch information
jenky committed Sep 20, 2023
1 parent baa1731 commit 23a685d
Show file tree
Hide file tree
Showing 34 changed files with 720 additions and 569 deletions.
6 changes: 3 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
],
"require": {
"php": "^8.1",
"jenky/atlas": "^0.5",
"jenky/concurrency": "^1.0"
"jenky/atlas": "^0.5"
},
"require-dev": {
"azjezz/psl": "^2.7",
Expand All @@ -34,7 +33,8 @@
"phpunit/phpunit": "^9.0",
"react/async": "^4.1",
"react/http": "^1.9",
"symfony/http-client": "^6.3"
"symfony/http-client": "^6.3",
"symfony/var-dumper": "^6.3"
},
"autoload": {
"psr-4": {
Expand Down
18 changes: 18 additions & 0 deletions src/Client/AsyncClientInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Client;

use Psr\Http\Client\ClientInterface;

interface AsyncClientInterface extends ClientInterface
{
public const DRIVER_PSL = 1;
public const DRIVER_REACT = 2;

/**
* Get the underlying async driver type.
*/
public function driver(): int;
}
25 changes: 25 additions & 0 deletions src/Client/AsyncClientTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Client;

use Jenky\Atlas\Pool\Concurrency\Deferrable;
use Jenky\Atlas\Pool\Concurrency\PslDeferred;
use Jenky\Atlas\Pool\Concurrency\ReactDeferred;

trait AsyncClientTrait
{
abstract private function getDeferrable(): Deferrable;

public function driver(): int
{
$deferrable = $this->getDeferrable();

return match (true) {
$deferrable instanceof PslDeferred => AsyncClientInterface::DRIVER_PSL,
$deferrable instanceof ReactDeferred => AsyncClientInterface::DRIVER_REACT,
default => 0,
};
}
}
77 changes: 77 additions & 0 deletions src/Client/Factory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Client;

use GuzzleHttp\ClientInterface as GuzzleClientInterface;
use Jenky\Atlas\Pool\Concurrency\PslDeferred;
use Jenky\Atlas\Pool\Concurrency\ReactDeferred;
use Jenky\Atlas\Pool\Exception\UnsupportedClientException;
use Jenky\Atlas\Pool\Exception\UnsupportedFeatureException;
use Jenky\Atlas\Pool\Util;
use Psr\Http\Client\ClientInterface;
use Symfony\Component\HttpClient\Psr18Client;
use Symfony\Contracts\HttpClient\HttpClientInterface;

final class Factory
{
/**
* Create new async version of the given client.
*
* @throws \Jenky\Atlas\Pool\Exception\UnsupportedClientException
* @throws \Jenky\Atlas\Pool\Exception\UnsupportedFeatureException
*/
public static function createAsyncClient(ClientInterface $client): AsyncClientInterface
{
if (Util::isPslInstalled()) {
if ($client instanceof GuzzleClientInterface) {
return new GuzzleClient(new PslDeferred(), $client);
}

if ($client instanceof Psr18Client) {
return new SymfonyClient(new PslDeferred(), self::getUnderlyingSymfonyHttpClient($client));
}

throw new UnsupportedClientException(\sprintf(
'The client %s is not supported. The PSL Pool only supports "guzzlehttp/guzzle" and "symfony/http-client".',
\get_debug_type($client)
));
}

if (Util::isReactInstalled()) {
if ($client instanceof GuzzleClientInterface) {
return new GuzzleClient(new ReactDeferred(), $client);
}

if ($client instanceof Psr18Client) {
return new SymfonyClient(new ReactDeferred(), self::getUnderlyingSymfonyHttpClient($client));
}

if (\class_exists(Browser::class)) {
return new ReactClient();
}

throw new UnsupportedClientException(\sprintf(
'The concurrent requests feature cannot be used as the client %s is not supported. To utilize this feature, please install package "react/http".',
\get_debug_type($client)
));
}

throw new UnsupportedFeatureException('You cannot use the concurrent request pool feature as the required packages are not installed.');
}

private static function getUnderlyingSymfonyHttpClient(Psr18Client $client): ?HttpClientInterface
{
try {
$reflectionProperty = new \ReflectionProperty($client, 'client');
$reflectionProperty->setAccessible(true);

return $reflectionProperty->getValue($client);
// @codeCoverageIgnoreStart
} catch (\Throwable) {
return null;
}
// @codeCoverageIgnoreEnd
}
}
20 changes: 12 additions & 8 deletions src/Psl/GuzzleClient.php → src/Client/GuzzleClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,23 @@

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Psl;
namespace Jenky\Atlas\Pool\Client;

use GuzzleHttp\Client;
use GuzzleHttp\ClientInterface;
use GuzzleHttp\RequestOptions;
use Psl\Async;
use Jenky\Atlas\Pool\Concurrency\Deferrable;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;

final class GuzzleClient implements AsyncClientInterface
{
use AsyncClientTrait;

private ClientInterface $client;

public function __construct(
private readonly Deferrable $deferred,
?ClientInterface $client = null,
) {
$this->client = $client ?? new Client();
Expand All @@ -28,15 +31,16 @@ public function sendRequest(RequestInterface $request): ResponseInterface
RequestOptions::HTTP_ERRORS => false,
]);

$defer = new Async\Deferred();

Async\Scheduler::defer(static function () use ($defer, $promise) {
return $this->deferred->defer(static function (callable $resolve, callable $reject) use ($promise) {
$promise->then(
fn (ResponseInterface $response) => $defer->complete($response),
fn (\Throwable $e) => $defer->error($e)
fn (ResponseInterface $response) => $resolve($response),
fn (\Throwable $e) => $reject($e)
)->wait();
});
}

return $defer->getAwaitable()->await();
private function getDeferrable(): Deferrable
{
return $this->deferred;
}
}
9 changes: 7 additions & 2 deletions src/React/Client.php → src/Client/ReactClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@

declare(strict_types=1);

namespace Jenky\Atlas\Pool\React;
namespace Jenky\Atlas\Pool\Client;

use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Async;
use React\Http\Browser;

final class Client implements AsyncClientInterface
final class ReactClient implements AsyncClientInterface
{
private Browser $browser;

Expand All @@ -31,4 +31,9 @@ public function sendRequest(RequestInterface $request): ResponseInterface
)
);
}

public function driver(): int
{
return self::DRIVER_REACT;
}
}
52 changes: 44 additions & 8 deletions src/SymfonyClientTrait.php → src/Client/SymfonyClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,44 @@

declare(strict_types=1);

namespace Jenky\Atlas\Pool;
namespace Jenky\Atlas\Pool\Client;

use Http\Discovery\Psr17FactoryDiscovery;
use Jenky\Atlas\Exception\NetworkException;
use Jenky\Atlas\Pool\Exception\RequestException;
use Jenky\Atlas\Exception\RequestException;
use Jenky\Atlas\Pool\Concurrency\Deferrable;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\StreamFactoryInterface;
use Symfony\Component\HttpClient\HttpClient;
use Symfony\Component\HttpClient\Response\StreamableInterface;
use Symfony\Component\HttpClient\Response\StreamWrapper;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface as SymfonyResponseInterface;
use Symfony\Contracts\Service\ResetInterface;

trait SymfonyClientTrait
final class SymfonyClient implements AsyncClientInterface, ResetInterface
{
/**
* @param \Closure(): ResponseInterface $response
*/
abstract private function createResponse(\Closure $response): mixed;
use AsyncClientTrait;

private HttpClientInterface $client;

private ResponseFactoryInterface $responseFactory;

private StreamFactoryInterface $streamFactory;

public function __construct(
private readonly Deferrable $deferred,
?HttpClientInterface $client = null,
?ResponseFactoryInterface $responseFactory = null,
?StreamFactoryInterface $streamFactory = null,
) {
$this->client = $client ?? HttpClient::create();
$this->responseFactory = $responseFactory ?? Psr17FactoryDiscovery::findResponseFactory();
$this->streamFactory = $streamFactory ?? Psr17FactoryDiscovery::findStreamFactory();
}

public function sendRequest(RequestInterface $request): ResponseInterface
{
Expand All @@ -41,7 +61,7 @@ public function sendRequest(RequestInterface $request): ResponseInterface

$response = $this->client->request($request->getMethod(), (string) $request->getUri(), $options);

return $this->createResponse(fn () => $this->convertToPsrResponse($response));
return $this->createResponse($response);
// @codeCoverageIgnoreStart
} catch (TransportExceptionInterface $e) {
if ($e instanceof \InvalidArgumentException) {
Expand All @@ -53,6 +73,17 @@ public function sendRequest(RequestInterface $request): ResponseInterface
// @codeCoverageIgnoreEnd
}

private function createResponse(SymfonyResponseInterface $response): mixed
{
return $this->deferred->defer(function (callable $resolve, callable $reject) use ($response) {
try {
$resolve($this->convertToPsrResponse($response));
} catch (\Throwable $e) {
$reject($e);
}
});
}

private function convertToPsrResponse(SymfonyResponseInterface $response): ResponseInterface
{
$psrResponse = $this->responseFactory->createResponse($response->getStatusCode());
Expand Down Expand Up @@ -83,6 +114,11 @@ private function convertToPsrResponse(SymfonyResponseInterface $response): Respo
return $psrResponse->withBody($body);
}

private function getDeferrable(): Deferrable
{
return $this->deferred;
}

/**
* @codeCoverageIgnore
*/
Expand Down
16 changes: 16 additions & 0 deletions src/Concurrency/Deferrable.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Concurrency;

interface Deferrable
{
/**
* @template T
*
* @param callable(\Closure(T), \Closure(\Throwable)) $callback
* @return T
*/
public function defer(callable $callback): mixed;
}
41 changes: 41 additions & 0 deletions src/Concurrency/PslConcurrency.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Concurrency;

use Psl\Async;

final class PslConcurrency implements Runner
{
private readonly Async\Semaphore $semaphore;

/**
* @param int<1, max> $limit
*/
public function __construct(int $limit = 10, ?\Closure $operation = null)
{
if ($limit < 1) {
throw new \ValueError('Argument #1 ($limit) must be positive, got '.$limit);
}

$this->semaphore = new Async\Semaphore(
$limit, $operation ?? static fn ($value) => $value
);
}

public function run(iterable $tasks): array
{
$promises = static function (iterable $tasks, Async\Semaphore $semaphore) {
foreach ($tasks as $key => $task) {
if (! \is_callable($task)) {
continue;
}

yield $key => static fn () => $semaphore->waitFor($task());
}
};

return Async\concurrently($promises($tasks, $this->semaphore)); //@phpstan-ignore-line
}
}
23 changes: 23 additions & 0 deletions src/Concurrency/PslDeferred.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Concurrency;

use Psl\Async;

final class PslDeferred implements Deferrable
{
public function defer(callable $callback): mixed
{
$defer = new Async\Deferred();

Async\Scheduler::defer(static function () use ($defer, $callback) {
$resolve = static fn (mixed $value) => $defer->complete($value);
$reject = static fn (\Throwable $e) => $defer->error($e);
$callback($resolve, $reject);
});

return $defer->getAwaitable()->await();
}
}
Loading

0 comments on commit 23a685d

Please sign in to comment.