Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
jenky committed Sep 21, 2023
1 parent 23a685d commit a0b9b78
Show file tree
Hide file tree
Showing 20 changed files with 247 additions and 245 deletions.
6 changes: 2 additions & 4 deletions src/Client/AsyncClientInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,13 @@

namespace Jenky\Atlas\Pool\Client;

use Jenky\Atlas\Pool\Concurrency\Driver;
use Psr\Http\Client\ClientInterface;

interface AsyncClientInterface extends ClientInterface
{
public const DRIVER_PSL = 1;
public const DRIVER_REACT = 2;

/**
* Get the underlying async driver type.
*/
public function driver(): int;
public function driver(): ?Driver;
}
9 changes: 5 additions & 4 deletions src/Client/AsyncClientTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,22 @@
namespace Jenky\Atlas\Pool\Client;

use Jenky\Atlas\Pool\Concurrency\Deferrable;
use Jenky\Atlas\Pool\Concurrency\Driver;
use Jenky\Atlas\Pool\Concurrency\PslDeferred;
use Jenky\Atlas\Pool\Concurrency\ReactDeferred;

trait AsyncClientTrait
{
abstract private function getDeferrable(): Deferrable;

public function driver(): int
public function driver(): ?Driver
{
$deferrable = $this->getDeferrable();

return match (true) {
$deferrable instanceof PslDeferred => AsyncClientInterface::DRIVER_PSL,
$deferrable instanceof ReactDeferred => AsyncClientInterface::DRIVER_REACT,
default => 0,
$deferrable instanceof PslDeferred => Driver::PSL,
$deferrable instanceof ReactDeferred => Driver::REACT,
default => null,
};
}
}
13 changes: 9 additions & 4 deletions src/Client/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
namespace Jenky\Atlas\Pool\Client;

use GuzzleHttp\ClientInterface as GuzzleClientInterface;
use Jenky\Atlas\Pool\Concurrency\Driver;
use Jenky\Atlas\Pool\Concurrency\DriverDiscovery;
use Jenky\Atlas\Pool\Concurrency\PslDeferred;
use Jenky\Atlas\Pool\Concurrency\ReactDeferred;
use Jenky\Atlas\Pool\Exception\UnsupportedClientException;
use Jenky\Atlas\Pool\Exception\UnsupportedFeatureException;
use Jenky\Atlas\Pool\Util;
use Psr\Http\Client\ClientInterface;
use Symfony\Component\HttpClient\Psr18Client;
use Symfony\Contracts\HttpClient\HttpClientInterface;
Expand All @@ -24,7 +25,9 @@ final class Factory
*/
public static function createAsyncClient(ClientInterface $client): AsyncClientInterface
{
if (Util::isPslInstalled()) {
$driver = DriverDiscovery::find();

if ($driver === Driver::PSL) {
if ($client instanceof GuzzleClientInterface) {
return new GuzzleClient(new PslDeferred(), $client);
}
Expand All @@ -39,7 +42,7 @@ public static function createAsyncClient(ClientInterface $client): AsyncClientIn
));
}

if (Util::isReactInstalled()) {
if ($driver === Driver::REACT) {
if ($client instanceof GuzzleClientInterface) {
return new GuzzleClient(new ReactDeferred(), $client);
}
Expand All @@ -58,7 +61,9 @@ public static function createAsyncClient(ClientInterface $client): AsyncClientIn
));
}

throw new UnsupportedFeatureException('You cannot use the concurrent request pool feature as the required packages are not installed.');
// @codeCoverageIgnoreStart
throw new UnsupportedFeatureException('You cannot use the concurrent request pool feature as the required packages are not installed.'); // @phpstan-ignore-line
// @codeCoverageIgnoreEnd
}

private static function getUnderlyingSymfonyHttpClient(Psr18Client $client): ?HttpClientInterface
Expand Down
2 changes: 1 addition & 1 deletion src/Client/GuzzleClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public function sendRequest(RequestInterface $request): ResponseInterface
RequestOptions::HTTP_ERRORS => false,
]);

return $this->deferred->defer(static function (callable $resolve, callable $reject) use ($promise) {
return $this->deferred->defer(static function (\Closure $resolve, \Closure $reject) use ($promise) {
$promise->then(
fn (ResponseInterface $response) => $resolve($response),
fn (\Throwable $e) => $reject($e)
Expand Down
5 changes: 3 additions & 2 deletions src/Client/ReactClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Jenky\Atlas\Pool\Client;

use Jenky\Atlas\Pool\Concurrency\Driver;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;
use React\Async;
Expand Down Expand Up @@ -32,8 +33,8 @@ public function sendRequest(RequestInterface $request): ResponseInterface
);
}

public function driver(): int
public function driver(): ?Driver
{
return self::DRIVER_REACT;
return Driver::REACT;
}
}
2 changes: 1 addition & 1 deletion src/Client/SymfonyClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public function sendRequest(RequestInterface $request): ResponseInterface

private function createResponse(SymfonyResponseInterface $response): mixed
{
return $this->deferred->defer(function (callable $resolve, callable $reject) use ($response) {
return $this->deferred->defer(function (\Closure $resolve, \Closure $reject) use ($response) {
try {
$resolve($this->convertToPsrResponse($response));
} catch (\Throwable $e) {
Expand Down
2 changes: 1 addition & 1 deletion src/Concurrency/Deferrable.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ interface Deferrable
/**
* @template T
*
* @param callable(\Closure(T), \Closure(\Throwable)) $callback
* @param callable(\Closure(T): void, \Closure(\Throwable): void): void $callback
* @return T
*/
public function defer(callable $callback): mixed;
Expand Down
11 changes: 11 additions & 0 deletions src/Concurrency/Driver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Concurrency;

enum Driver
{
case PSL;
case REACT;
}
63 changes: 63 additions & 0 deletions src/Concurrency/DriverDiscovery.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool\Concurrency;

use Clue\React\Mq\Queue;
use Psl\Async\Awaitable;

final class DriverDiscovery
{
private static ?Driver $cached = null;

private static ?Driver $prefer = null;

/**
* Find the appropriate async driver based on the installed package.
*
* @throws \RuntimeException
*/
public static function find(bool $cacheResult = true): Driver
{
if (self::$prefer !== null) {
return self::$prefer;
}

if ($cacheResult && self::$cached !== null) {
return self::$cached;
}

if (self::isPslInstalled()) {
$driver = Driver::PSL;
} elseif (self::isReactInstalled()) {
$driver = Driver::REACT;
} else {
throw new \RuntimeException('Unable to find async driver.');
}

if ($cacheResult) {
self::$cached = $driver;
}

return $driver;
}

/**
* Set the preferred async driver.
*/
public static function prefer(Driver $driver): void
{
self::$prefer = $driver;
}

public static function isReactInstalled(): bool
{
return \function_exists('React\\Async\\async') && \class_exists(Queue::class);
}

public static function isPslInstalled(): bool
{
return \class_exists(Awaitable::class);
}
}
55 changes: 55 additions & 0 deletions src/ConnectorPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
<?php

declare(strict_types=1);

namespace Jenky\Atlas\Pool;

use Jenky\Atlas\Contracts\ConnectorInterface;
use Jenky\Atlas\Pool\Client\AsyncClientInterface;
use Jenky\Atlas\Pool\Exception\InvalidPoolRequestException;
use Jenky\Atlas\Pool\Exception\UnsupportedClientException;
use Jenky\Atlas\Request;
use Jenky\Atlas\Response;

/**
* @implements PoolInterface<Request|callable(ConnectorInterface): Response, Response>
*/
final class ConnectorPool implements PoolInterface
{
use PoolTrait;

private AsyncClientInterface $client;

public function __construct(private readonly ConnectorInterface $connector)
{
$client = $connector->client();

if (! $client instanceof AsyncClientInterface) {
// @codeCoverageIgnoreStart
throw new UnsupportedClientException(\sprintf(
'The client %s is not supported. Please swap the underlying client to supported one.',
\get_debug_type($client)
));
// @codeCoverageIgnoreEnd
}

$this->client = $client;
}

public function send(iterable $requests): array
{
$promises = static function (ConnectorInterface $connector, iterable $requests) {
foreach ($requests as $key => $request) {
if ($request instanceof Request) {
yield $key => static fn (): Response => $connector->send($request);
} elseif (\is_callable($request)) {
yield $key => static fn (): Response => $request($connector);
} else {
throw new InvalidPoolRequestException(Request::class, Response::class);
}
}
};

return $this->getRunner($this->client)->run($promises($this->connector, $requests));
}
}
70 changes: 12 additions & 58 deletions src/Pool.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,84 +4,38 @@

namespace Jenky\Atlas\Pool;

use Jenky\Atlas\Contracts\ConnectorInterface;
use Jenky\Atlas\Pool\Client\AsyncClientInterface;
use Jenky\Atlas\Pool\Client\Factory;
use Jenky\Atlas\Pool\Concurrency\PslConcurrency;
use Jenky\Atlas\Pool\Concurrency\ReactConcurrency;
use Jenky\Atlas\Pool\Concurrency\Runner;
use Jenky\Atlas\Pool\Exception\InvalidPoolRequestException;
use Jenky\Atlas\Pool\Exception\UnsupportedClientException;
use Jenky\Atlas\Request;
use Jenky\Atlas\Response;
use Psr\Http\Message\RequestInterface;
use Psr\Http\Message\ResponseInterface;

/**
* @implements PoolInterface<Request|callable(ConnectorInterface): Response, Response>
* @implements PoolInterface<RequestInterface|callable(AsyncClientInterface): ResponseInterface, ResponseInterface>
*/
final class Pool implements PoolInterface
{
use PoolTrait;

private AsyncClientInterface $client;

public function __construct(private readonly ConnectorInterface $connector)
public function __construct(private readonly AsyncClientInterface $client)
{
$client = $connector->client();

if (! $client instanceof AsyncClientInterface) {
// @codeCoverageIgnoreStart
throw new UnsupportedClientException(\sprintf(
'The client %s is not supported. Please swap the underlying client to supported one.',
\get_debug_type($client)
));
// @codeCoverageIgnoreEnd
}

$this->client = $client;
}

public function send(iterable $requests): array
{
$promises = static function (ConnectorInterface $connector, iterable $requests) {
$promises = static function (AsyncClientInterface $client, iterable $requests) {
foreach ($requests as $key => $request) {
if ($request instanceof Request) {
yield $key => static fn (): Response => $connector->send($request);
if ($request instanceof RequestInterface) {
yield $key => static fn (): ResponseInterface => $client->sendRequest($request);
} elseif (\is_callable($request)) {
yield $key => static fn (): Response => $request($connector);
yield $key => static fn (): ResponseInterface => $request($client);
} else {
throw new InvalidPoolRequestException(Request::class, Response::class);
throw new InvalidPoolRequestException(RequestInterface::class, ResponseInterface::class);
}
}
};

return $this->getRunner($this->client)->run($promises($this->connector, $requests));
}

private function getRunner(AsyncClientInterface $client): Runner
{
$driver = $client->driver();

return match (true) {
$driver === AsyncClientInterface::DRIVER_PSL => new PslConcurrency($this->concurrency),
$driver === AsyncClientInterface::DRIVER_REACT => new ReactConcurrency($this->concurrency),
default => throw new \RuntimeException('You cannot use the concurrent request pool feature as the required packages are not installed.'),
};
}

public static function create(ConnectorInterface $connector): self
{
$client = $connector->client();

if (! $client instanceof AsyncClientInterface) {
if (! \method_exists($connector, 'withClient')) {
// @codeCoverageIgnoreStart
throw new \LogicException('Unable to swap the underlying client of connector '.\get_debug_type($connector));
// @codeCoverageIgnoreEnd
}

$connector = $connector->withClient(Factory::createAsyncClient($client));
}

return new self($connector);
return $this->getRunner($this->client)->run(
$promises($this->client, $requests)
);
}
}
Loading

0 comments on commit a0b9b78

Please sign in to comment.