From f16ed73033ab683adda13c83cedac7ed0451d64a Mon Sep 17 00:00:00 2001 From: Lynh Date: Tue, 26 Sep 2023 15:31:43 +0700 Subject: [PATCH] Rewrite and moving to org (#6) * Rewrite new version * Refactor * Rename * Tweak * Rename property * Update composer.json * Rename package * Update readme * Update composer.json * Update readme * Use static closure * Delay for async client * Use real address to test ReactClient * Minor changes * Applies CS fixes * Add callback operation * Driver discovery test * Update readme * Update driver * Ignore coverage * Rename * Add docblock * Update readme * Rename * Change generic type * Update readme * Improve delay * Delayable * Update readme --------- Co-authored-by: jenky --- README.md | 178 ++++++++++++++++-- composer.json | 26 +-- src/Client/AsyncClientFactory.php | 102 ++++++++++ src/Client/AsyncClientInterface.php | 16 ++ src/Client/AsyncClientTrait.php | 26 +++ src/Client/DelayTrait.php | 33 ++++ src/Client/Delayable.php | 16 ++ src/{React => Client}/GuzzleClient.php | 34 ++-- .../Client.php => Client/ReactClient.php} | 20 +- .../SymfonyClient.php} | 57 +++++- src/ClientPool.php | 41 ++++ src/Concurrency/Deferrable.php | 20 ++ src/Concurrency/Driver.php | 11 ++ src/Concurrency/DriverDiscovery.php | 78 ++++++++ src/Concurrency/PslDeferred.php | 24 +++ src/Concurrency/PslWorker.php | 41 ++++ src/Concurrency/ReactDeferred.php | 34 ++++ src/Concurrency/ReactWorker.php | 42 +++++ src/Concurrency/Worker.php | 18 ++ src/ConnectorPool.php | 61 ++++++ src/Exception/InvalidPoolRequestException.php | 19 ++ src/Exception/RequestException.php | 11 -- src/Exception/UnsupportedClientException.php | 2 +- src/Exception/UnsupportedFeatureException.php | 2 +- src/Pool.php | 27 +++ src/PoolFactory.php | 171 +++-------------- src/PoolTrait.php | 26 ++- src/Psl/AsyncClientInterface.php | 11 -- src/Psl/GuzzleClient.php | 42 ----- src/Psl/Pool.php | 52 ----- src/Psl/SymfonyClient.php | 50 ----- src/React/AsyncClientInterface.php | 11 -- src/React/Pool.php | 53 ------ src/React/SymfonyClient.php | 57 ------ tests/AkamaiTileRequest.php | 2 +- tests/DelayTest.php | 124 ++++++++++++ tests/DummyRequest.php | 2 +- tests/PoolTest.php | 120 ++++++------ tests/PslPoolTest.php | 27 +-- tests/ReactPoolTest.php | 27 +-- tests/TestCase.php | 34 ++-- tests/TestRequestTrait.php | 24 +++ 42 files changed, 1169 insertions(+), 603 deletions(-) create mode 100644 src/Client/AsyncClientFactory.php create mode 100644 src/Client/AsyncClientInterface.php create mode 100644 src/Client/AsyncClientTrait.php create mode 100644 src/Client/DelayTrait.php create mode 100644 src/Client/Delayable.php rename src/{React => Client}/GuzzleClient.php (52%) rename src/{React/Client.php => Client/ReactClient.php} (66%) rename src/{SymfonyClientTrait.php => Client/SymfonyClient.php} (62%) create mode 100644 src/ClientPool.php create mode 100644 src/Concurrency/Deferrable.php create mode 100644 src/Concurrency/Driver.php create mode 100644 src/Concurrency/DriverDiscovery.php create mode 100644 src/Concurrency/PslDeferred.php create mode 100644 src/Concurrency/PslWorker.php create mode 100644 src/Concurrency/ReactDeferred.php create mode 100644 src/Concurrency/ReactWorker.php create mode 100644 src/Concurrency/Worker.php create mode 100644 src/ConnectorPool.php create mode 100644 src/Exception/InvalidPoolRequestException.php delete mode 100644 src/Exception/RequestException.php create mode 100644 src/Pool.php delete mode 100644 src/Psl/AsyncClientInterface.php delete mode 100644 src/Psl/GuzzleClient.php delete mode 100644 src/Psl/Pool.php delete mode 100644 src/Psl/SymfonyClient.php delete mode 100644 src/React/AsyncClientInterface.php delete mode 100644 src/React/Pool.php delete mode 100644 src/React/SymfonyClient.php create mode 100644 tests/DelayTest.php create mode 100644 tests/TestRequestTrait.php diff --git a/README.md b/README.md index 2cd6e69..2c15580 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,5 @@ -# Atlas Pool +# Peak [![Latest Version on Packagist][ico-version]][link-packagist] [![Github Actions][ico-gh-actions]][link-gh-actions] @@ -7,19 +7,163 @@ [![Total Downloads][ico-downloads]][link-downloads] [![Software License][ico-license]](LICENSE.md) -A powerful tool that allows you to send multiple HTTP requests simultaneously. +A simple and efficient solution for concurrently sending HTTP requests using PSR-18 client implementations. + +Peak is a library that enables concurrent request sending using a request pool. It leverages the event loop of [ReactPHP](https://github.com/reactphp) or [PSL](https://github.com/azjezz/psl) to handle and manage the requests concurrently. + +## Requirements + +- PHP 8.1 or higher. +- A package that supports non-block I/O using Fibers under the hood (now refer as **driver**). ## Installation You can install the package via composer: ```bash -composer require jenky/atlas-pool +composer require fansipan/peak +``` + +Additionally, depending on your choice of driver, these packages may also need to be installed. + +### PSL + +```bash +composer require azjezz/psl +``` + +### ReactPHP + +```bash +composer require clue/mq-react react/async ``` ## Usage -See the [documentation](https://jenky.github.io/atlas) for detailed installation and usage instructions. +### Create Request Pool + +Typical applications would use the `PoolFactory` class to create a pool. + +```php +use Fansipan\Peak\PoolFactory; + +/** @var \Psr\Http\Client\ClientInterface $client */ +$pool = PoolFactory::createForClient($client); +``` + +It will attempt to create async version of the client using `AsyncClientFactory`. The supported clients are [Guzzle](https://github.com/guzzle/guzzle) and [Symfony HTTPClient](https://github.com/symfony/http-client) ([`Psr18Client`](https://symfony.com/doc/current/http_client.html#psr-18-and-psr-17)) except for [ReactPHP driver](#reactphp). + +> You can use any PSR-18 client implementations with ReactPHP driver. If an unsupported client is used, it will be replaced with the [`Browser`](https://github.com/reactphp/http#browser) HTTP client. + +The `Fansipan\Peak\PoolFactory` provides a configured request pool based on the installed packages, which is suitable for most cases. However, if desired, you can specify a particular implementation if it is available on your platform and/or in your application. + +First, you need to create your desired driver: + +```php +use Fansipan\Peak\Concurrency\PslDeferred; +use Fansipan\Peak\Concurrency\ReactDeferred; + +// PSL +$defer = new PslDeferred(); + +// ReactPHP +$defer = new ReactDeferred(); +``` + +Then create an asynchronous client, which is essentially a decorator for the PSR-18 client: + +```php +use Fansipan\Peak\Client\GuzzleClient; +use Fansipan\Peak\Client\SymfonyClient; +use Fansipan\Peak\ClientPool; + +// Guzzle + +$asyncClient = new GuzzleClient($defer); +// or using existing Guzzle client +/** @var \GuzzleHttp\ClientInterface $client */ +$asyncClient = new GuzzleClient($defer, $client); + +// Symfony HTTP Client + +$asyncClient = new SymfonyClient($defer); +// or using existing Symfony client +/** @var \Symfony\Contracts\HttpClient\HttpClientInterface $client */ +$asyncClient = new SymfonyClient($defer, $client); + + +$pool = new ClientPool($asyncClient); +``` + +### Sending Requests + +The `send` method accepts an iterator of PSR-7 requests or closures/invokable class which receive an `Psr\Http\Client\ClientInterface` instance. + +```php +use Psr\Http\Client\ClientInterface; +use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; + +// Using array +$responses = $pool->send([ + $psr7Request, + fn (ClientInterface $client): ResponseInterface => $client->sendRequest($psr7Request), +]); + +var_dump($responses[0]); +var_dump($responses[1]); + +// Using generator when you have an indeterminate amount of requests you wish to send +$requests = static function (int $total) { + for ($i = 0; $i < $total; $i++) { + yield $psr7Request; + } +} +$responses = $pool->send($requests(100)); +``` + +### Retrieving Responses + +As you can see from the example above, each response instance can be accessed using an index. However, the response order is not guaranteed. If you wish, you can assign names to the requests to easily track the specific requests that have been sent. This allows you to access the corresponding responses by their assigned names. + +```php +use Psr\Http\Client\ClientInterface; +use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; + +$responses = $pool->send([ + 'first' => $psr7Request, + 'second' => fn (ClientInterface $client): ResponseInterface => $client->sendRequest($psr7Request), +]); + +// Or using generator + +$requests = function (): \Generator { + yield 'first' => $psr7Request; + yield 'second' => fn (ClientInterface $client): ResponseInterface => $client->sendRequest($psr7Request); +}; + +$responses = $pool->send($requests()); + +var_dump($responses['first']); +var_dump($responses['second']); +``` + +### Concurrency Limit + +Sending an excessive number of requests may either take up all resources on your side or it may even get you banned by the remote side if it sees an unreasonable number of requests from your side. + +As a consequence, it's usually recommended to limit concurrency on the sending side to a reasonable value. It's common to use a rather small limit, as doing more than a dozen of things at once may easily overwhelm the receiving side. + +You can use `concurrent` method to set the maximum number of requests to send concurrently. The default value is `25`. + +```php +$response = $pool + ->concurrent(10) // Process up to 10 requests concurrently + ->send($requests); +``` + +Additional requests that exceed the concurrency limit will automatically be enqueued until one of the pending requests completes. ## Testing @@ -37,7 +181,7 @@ Please see [CONTRIBUTING](CONTRIBUTING.md) and [CODE_OF_CONDUCT](CODE_OF_CONDUCT ## Security -If you discover any security related issues, please email jenky.w0w@gmail.com instead of using the issue tracker. +If you discover any security related issues, please email contact@lynh.me instead of using the issue tracker. ## Credits @@ -48,20 +192,14 @@ If you discover any security related issues, please email jenky.w0w@gmail.com in The MIT License (MIT). Please see [License File](LICENSE.md) for more information. -[ico-version]: https://img.shields.io/packagist/v/jenky/atlas-pool.svg?style=for-the-badge +[ico-version]: https://img.shields.io/packagist/v/fansipan/peak.svg?style=for-the-badge [ico-license]: https://img.shields.io/badge/license-MIT-brightgreen.svg?style=for-the-badge -[ico-travis]: https://img.shields.io/travis/jenky/atlas-pool/master.svg?style=for-the-badge -[ico-scrutinizer]: https://img.shields.io/scrutinizer/coverage/g/jenky/atlas-pool.svg?style=for-the-badge -[ico-code-quality]: https://img.shields.io/scrutinizer/g/jenky/atlas-pool.svg?style=for-the-badge -[ico-gh-actions]: https://img.shields.io/github/actions/workflow/status/jenky/atlas-pool/testing.yml?branch=main&label=actions&logo=github&style=for-the-badge -[ico-codecov]: https://img.shields.io/codecov/c/github/jenky/atlas-pool?logo=codecov&style=for-the-badge -[ico-downloads]: https://img.shields.io/packagist/dt/jenky/atlas-pool.svg?style=for-the-badge - -[link-packagist]: https://packagist.org/packages/jenky/atlas-pool -[link-travis]: https://travis-ci.org/jenky/atlas-pool -[link-scrutinizer]: https://scrutinizer-ci.com/g/jenky/atlas-pool/code-structure -[link-code-quality]: https://scrutinizer-ci.com/g/jenky/atlas-pool -[link-gh-actions]: https://github.com/jenky/atlas-pool -[link-codecov]: https://codecov.io/gh/jenky/atlas-pool -[link-downloads]: https://packagist.org/packages/jenky/atlas-pool +[ico-gh-actions]: https://img.shields.io/github/actions/workflow/status/phanxipang/peak/testing.yml?branch=main&label=actions&logo=github&style=for-the-badge +[ico-codecov]: https://img.shields.io/codecov/c/github/phanxipang/peak?logo=codecov&style=for-the-badge +[ico-downloads]: https://img.shields.io/packagist/dt/fansipan/peak.svg?style=for-the-badge + +[link-packagist]: https://packagist.org/packages/phanxipang/peak +[link-gh-actions]: https://github.com/phanxipang/peak +[link-codecov]: https://codecov.io/gh/phanxipang/peak +[link-downloads]: https://packagist.org/packages/fansipan/peak diff --git a/composer.json b/composer.json index c096858..0b57dce 100644 --- a/composer.json +++ b/composer.json @@ -1,16 +1,18 @@ { - "name": "jenky/atlas-pool", - "description": "Send concurrent requests for Atlas", + "name": "fansipan/peak", + "description": "A simple and efficient solution for concurrently sending HTTP requests using PSR-18 client implementations.", "keywords": [ - "jenky", - "atlas", + "concurrently", "pool", + "http", + "request", + "response", "concurrent-requests", "parallel-requests", "async", "await" ], - "homepage": "https://github.com/jenky/atlas-pool", + "homepage": "https://github.com/phanxipang/peak", "license": "MIT", "authors": [ { @@ -21,29 +23,31 @@ ], "require": { "php": "^8.1", - "jenky/atlas": "^0.5", - "jenky/concurrency": "^1.0" + "psr/http-client": "^1.0", + "psr/http-factory": "^1.0" }, "require-dev": { "azjezz/psl": "^2.7", "clue/mq-react": "^1.6", + "fansipan/mock-client": "^1.0", "friendsofphp/php-cs-fixer": "^3.15", "guzzlehttp/guzzle": "^7.5", - "jenky/atlas-mock-client": "^1.0", + "jenky/atlas": "^0.5", "phpstan/phpstan": "^1.10", "phpunit/phpunit": "^9.0", "react/async": "^4.1", "react/http": "^1.9", - "symfony/http-client": "^6.3" + "symfony/http-client": "^6.3", + "symfony/var-dumper": "^6.3" }, "autoload": { "psr-4": { - "Jenky\\Atlas\\Pool\\": "src" + "Fansipan\\Peak\\": "src" } }, "autoload-dev": { "psr-4": { - "Jenky\\Atlas\\Pool\\Tests\\": "tests" + "Fansipan\\Peak\\Tests\\": "tests" } }, "scripts": { diff --git a/src/Client/AsyncClientFactory.php b/src/Client/AsyncClientFactory.php new file mode 100644 index 0000000..88d4bb7 --- /dev/null +++ b/src/Client/AsyncClientFactory.php @@ -0,0 +1,102 @@ +setAccessible(true); + + return $reflectionProperty->getValue($client); + // @codeCoverageIgnoreStart + } catch (\Throwable) { + return null; + } + // @codeCoverageIgnoreEnd + } +} diff --git a/src/Client/AsyncClientInterface.php b/src/Client/AsyncClientInterface.php new file mode 100644 index 0000000..455532e --- /dev/null +++ b/src/Client/AsyncClientInterface.php @@ -0,0 +1,16 @@ +getDeferrable(); + + return match (true) { + $deferrable instanceof PslDeferred => Driver::PSL, + $deferrable instanceof ReactDeferred => Driver::REACT, + default => null, + }; + } +} diff --git a/src/Client/DelayTrait.php b/src/Client/DelayTrait.php new file mode 100644 index 0000000..bcf5373 --- /dev/null +++ b/src/Client/DelayTrait.php @@ -0,0 +1,33 @@ + + */ + private int $delay = 0; + + public function delay(int $milliseconds): void + { + if ($milliseconds < 0) { + throw new \ValueError('Delay must be positive, got '.$milliseconds); + } + + $this->delay = $milliseconds; + } + + private function getDelayAsSeconds(): float + { + if ($this->delay <= 0) { + return 0; + } + + return $this->delay / 1000; + } +} diff --git a/src/Client/Delayable.php b/src/Client/Delayable.php new file mode 100644 index 0000000..f0f29a6 --- /dev/null +++ b/src/Client/Delayable.php @@ -0,0 +1,16 @@ + $milliseconds + */ + public function delay(int $milliseconds): void; +} diff --git a/src/React/GuzzleClient.php b/src/Client/GuzzleClient.php similarity index 52% rename from src/React/GuzzleClient.php rename to src/Client/GuzzleClient.php index bec0a29..73b8555 100644 --- a/src/React/GuzzleClient.php +++ b/src/Client/GuzzleClient.php @@ -2,30 +2,27 @@ declare(strict_types=1); -namespace Jenky\Atlas\Pool\React; +namespace Fansipan\Peak\Client; +use Fansipan\Peak\Concurrency\Deferrable; use GuzzleHttp\Client; use GuzzleHttp\ClientInterface; use GuzzleHttp\RequestOptions; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; -use React\Async; -use React\EventLoop\Loop; -use React\EventLoop\LoopInterface; -use React\Promise\Deferred; -final class GuzzleClient implements AsyncClientInterface +final class GuzzleClient implements AsyncClientInterface, Delayable { - private ClientInterface $client; + use AsyncClientTrait; + use DelayTrait; - private LoopInterface $loop; + private ClientInterface $client; public function __construct( + private readonly Deferrable $deferred, ?ClientInterface $client = null, - ?LoopInterface $loop = null ) { $this->client = $client ?? new Client(); - $this->loop = $loop ?? Loop::get(); } public function sendRequest(RequestInterface $request): ResponseInterface @@ -35,15 +32,20 @@ public function sendRequest(RequestInterface $request): ResponseInterface RequestOptions::HTTP_ERRORS => false, ]); - $defer = new Deferred(); + $delay = $this->getDelayAsSeconds(); + + $this->delay = 0; - $this->loop->futureTick(static function () use ($defer, $promise) { + return $this->deferred->defer(static function (\Closure $resolve, \Closure $reject) use ($promise) { $promise->then( - fn (ResponseInterface $response) => $defer->resolve($response), - fn (\Throwable $e) => $defer->reject($e) + static fn (ResponseInterface $response) => $resolve($response), + static fn (\Throwable $e) => $reject($e) )->wait(); - }); + }, $delay); + } - return Async\await($defer->promise()); + private function getDeferrable(): Deferrable + { + return $this->deferred; } } diff --git a/src/React/Client.php b/src/Client/ReactClient.php similarity index 66% rename from src/React/Client.php rename to src/Client/ReactClient.php index 21b1cc5..b66cd15 100644 --- a/src/React/Client.php +++ b/src/Client/ReactClient.php @@ -2,15 +2,18 @@ declare(strict_types=1); -namespace Jenky\Atlas\Pool\React; +namespace Fansipan\Peak\Client; +use Fansipan\Peak\Concurrency\Driver; use Psr\Http\Message\RequestInterface; use Psr\Http\Message\ResponseInterface; use React\Async; use React\Http\Browser; -final class Client implements AsyncClientInterface +final class ReactClient implements AsyncClientInterface, Delayable { + use DelayTrait; + private Browser $browser; public function __construct(?Browser $browser = null) @@ -22,6 +25,14 @@ public function __construct(?Browser $browser = null) public function sendRequest(RequestInterface $request): ResponseInterface { + $delay = $this->getDelayAsSeconds(); + + if ($delay > 0) { + Async\delay($delay); + } + + $this->delay = 0; + return Async\await( $this->browser->request( $request->getMethod(), @@ -31,4 +42,9 @@ public function sendRequest(RequestInterface $request): ResponseInterface ) ); } + + public function driver(): ?Driver + { + return Driver::REACT; + } } diff --git a/src/SymfonyClientTrait.php b/src/Client/SymfonyClient.php similarity index 62% rename from src/SymfonyClientTrait.php rename to src/Client/SymfonyClient.php index e3f9a99..4baf961 100644 --- a/src/SymfonyClientTrait.php +++ b/src/Client/SymfonyClient.php @@ -2,24 +2,45 @@ declare(strict_types=1); -namespace Jenky\Atlas\Pool; +namespace Fansipan\Peak\Client; +use Fansipan\Peak\Concurrency\Deferrable; +use Http\Discovery\Psr17FactoryDiscovery; use Jenky\Atlas\Exception\NetworkException; -use Jenky\Atlas\Pool\Exception\RequestException; +use Jenky\Atlas\Exception\RequestException; use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseFactoryInterface; use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\StreamFactoryInterface; +use Symfony\Component\HttpClient\HttpClient; use Symfony\Component\HttpClient\Response\StreamableInterface; use Symfony\Component\HttpClient\Response\StreamWrapper; use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface; +use Symfony\Contracts\HttpClient\HttpClientInterface; use Symfony\Contracts\HttpClient\ResponseInterface as SymfonyResponseInterface; use Symfony\Contracts\Service\ResetInterface; -trait SymfonyClientTrait +final class SymfonyClient implements AsyncClientInterface, ResetInterface, Delayable { - /** - * @param \Closure(): ResponseInterface $response - */ - abstract private function createResponse(\Closure $response): mixed; + use AsyncClientTrait; + use DelayTrait; + + private HttpClientInterface $client; + + private ResponseFactoryInterface $responseFactory; + + private StreamFactoryInterface $streamFactory; + + public function __construct( + private readonly Deferrable $deferred, + ?HttpClientInterface $client = null, + ?ResponseFactoryInterface $responseFactory = null, + ?StreamFactoryInterface $streamFactory = null, + ) { + $this->client = $client ?? HttpClient::create(); + $this->responseFactory = $responseFactory ?? Psr17FactoryDiscovery::findResponseFactory(); + $this->streamFactory = $streamFactory ?? Psr17FactoryDiscovery::findStreamFactory(); + } public function sendRequest(RequestInterface $request): ResponseInterface { @@ -41,7 +62,7 @@ public function sendRequest(RequestInterface $request): ResponseInterface $response = $this->client->request($request->getMethod(), (string) $request->getUri(), $options); - return $this->createResponse(fn () => $this->convertToPsrResponse($response)); + return $this->createResponse($response); // @codeCoverageIgnoreStart } catch (TransportExceptionInterface $e) { if ($e instanceof \InvalidArgumentException) { @@ -53,6 +74,21 @@ public function sendRequest(RequestInterface $request): ResponseInterface // @codeCoverageIgnoreEnd } + private function createResponse(SymfonyResponseInterface $response): mixed + { + $delay = $this->getDelayAsSeconds(); + + $this->delay = 0; + + return $this->deferred->defer(function (\Closure $resolve, \Closure $reject) use ($response) { + try { + $resolve($this->convertToPsrResponse($response)); + } catch (\Throwable $e) { + $reject($e); + } + }, $delay); + } + private function convertToPsrResponse(SymfonyResponseInterface $response): ResponseInterface { $psrResponse = $this->responseFactory->createResponse($response->getStatusCode()); @@ -83,6 +119,11 @@ private function convertToPsrResponse(SymfonyResponseInterface $response): Respo return $psrResponse->withBody($body); } + private function getDeferrable(): Deferrable + { + return $this->deferred; + } + /** * @codeCoverageIgnore */ diff --git a/src/ClientPool.php b/src/ClientPool.php new file mode 100644 index 0000000..43de1bd --- /dev/null +++ b/src/ClientPool.php @@ -0,0 +1,41 @@ + + */ +final class ClientPool implements Pool +{ + use PoolTrait; + + public function __construct(private readonly AsyncClientInterface $client) + { + } + + public function send(iterable $requests): array + { + $promises = static function (AsyncClientInterface $client, iterable $requests) { + foreach ($requests as $key => $request) { + if ($request instanceof RequestInterface) { + yield $key => static fn (): ResponseInterface => $client->sendRequest($request); + } elseif (\is_callable($request)) { + yield $key => static fn (): ResponseInterface => $request($client); + } else { + throw new InvalidPoolRequestException(RequestInterface::class, ResponseInterface::class); + } + } + }; + + return $this->createWorker($this->client)->run( + $promises($this->client, $requests) + ); + } +} diff --git a/src/Concurrency/Deferrable.php b/src/Concurrency/Deferrable.php new file mode 100644 index 0000000..4ace343 --- /dev/null +++ b/src/Concurrency/Deferrable.php @@ -0,0 +1,20 @@ + $delay + * @return T + */ + public function defer(callable $callback, float $delay = 0): mixed; +} diff --git a/src/Concurrency/Driver.php b/src/Concurrency/Driver.php new file mode 100644 index 0000000..ee56d65 --- /dev/null +++ b/src/Concurrency/Driver.php @@ -0,0 +1,11 @@ + self::isPslInstalled(), + Driver::REACT => self::isReactInstalled(), + }; + + if (! $check) { + // @codeCoverageIgnoreStart + throw new \InvalidArgumentException(\sprintf( + 'You cannot use the driver %s as required packages are not installed. Try running "composer require %s"', + $driver->name, + $driver->value + )); + // @codeCoverageIgnoreEnd + } + + self::$preferred = $driver; + } + + public static function isReactInstalled(): bool + { + return \function_exists('React\\Async\\async') && \class_exists(Queue::class); + } + + public static function isPslInstalled(): bool + { + return \class_exists(Awaitable::class); + } +} diff --git a/src/Concurrency/PslDeferred.php b/src/Concurrency/PslDeferred.php new file mode 100644 index 0000000..534bb49 --- /dev/null +++ b/src/Concurrency/PslDeferred.php @@ -0,0 +1,24 @@ + $defer->complete($value); + $reject = static fn (\Throwable $e) => $defer->error($e); + + Async\Scheduler::delay($delay, static fn () => $callback($resolve, $reject)); + }); + + return $defer->getAwaitable()->await(); + } +} diff --git a/src/Concurrency/PslWorker.php b/src/Concurrency/PslWorker.php new file mode 100644 index 0000000..363bcdf --- /dev/null +++ b/src/Concurrency/PslWorker.php @@ -0,0 +1,41 @@ + $limit + */ + public function __construct(int $limit = 10) + { + if ($limit < 1) { + throw new \ValueError('Argument #1 ($limit) must be positive, got '.$limit); + } + + $this->semaphore = new Async\Semaphore( + $limit, static fn ($value) => $value + ); + } + + public function run(iterable $tasks): array + { + $promises = static function (iterable $tasks, Async\Semaphore $semaphore) { + foreach ($tasks as $key => $task) { + if (! \is_callable($task)) { + continue; + } + + yield $key => static fn () => $semaphore->waitFor($task()); + } + }; + + return Async\concurrently($promises($tasks, $this->semaphore)); //@phpstan-ignore-line + } +} diff --git a/src/Concurrency/ReactDeferred.php b/src/Concurrency/ReactDeferred.php new file mode 100644 index 0000000..32ff2fe --- /dev/null +++ b/src/Concurrency/ReactDeferred.php @@ -0,0 +1,34 @@ +loop = $loop ?: Loop::get(); + } + + public function defer(callable $callback, float $delay = 0): mixed + { + $defer = new Deferred(); + + $this->loop->futureTick(function () use ($defer, $callback, $delay) { + $resolve = static fn (mixed $value) => $defer->resolve($value); + $reject = static fn (\Throwable $e) => $defer->reject($e); + + $this->loop->addTimer($delay, static fn () => $callback($resolve, $reject)); + }); + + return Async\await($defer->promise()); + } +} diff --git a/src/Concurrency/ReactWorker.php b/src/Concurrency/ReactWorker.php new file mode 100644 index 0000000..93667e2 --- /dev/null +++ b/src/Concurrency/ReactWorker.php @@ -0,0 +1,42 @@ + $limit + */ + public function __construct(int $limit = 10) + { + if ($limit < 1) { + throw new \ValueError('Argument #1 ($limit) must be positive, got '.$limit); + } + + $this->queue = new Queue( + $limit, null, static fn (\Closure $cb) => Async\async($cb)() + ); + } + + public function run(iterable $tasks): array + { + $promises = static function (iterable $tasks, Queue $queue) { + foreach ($tasks as $key => $task) { + if (! \is_callable($task)) { + continue; + } + + yield $key => static fn () => $queue($task); + } + }; + + return Async\await(Async\parallel($promises($tasks, $this->queue))); + } +} diff --git a/src/Concurrency/Worker.php b/src/Concurrency/Worker.php new file mode 100644 index 0000000..9310cdf --- /dev/null +++ b/src/Concurrency/Worker.php @@ -0,0 +1,18 @@ + $tasks + * @return array + */ + public function run(iterable $tasks): array; +} diff --git a/src/ConnectorPool.php b/src/ConnectorPool.php new file mode 100644 index 0000000..32de5d4 --- /dev/null +++ b/src/ConnectorPool.php @@ -0,0 +1,61 @@ + + */ +final class ConnectorPool implements Pool +{ + use PoolTrait; + + private AsyncClientInterface $client; + + public function __construct(private readonly ConnectorInterface $connector) + { + $client = $connector->client(); + + if (! $client instanceof AsyncClientInterface) { + // @codeCoverageIgnoreStart + throw new UnsupportedClientException(\sprintf( + 'The client %s is not supported. Please swap the underlying client to supported one.', + \get_debug_type($client) + )); + // @codeCoverageIgnoreEnd + } + + $this->client = $client; + } + + public function send(iterable $requests): array + { + $promises = static function (ConnectorInterface $connector, iterable $requests) { + foreach ($requests as $key => $request) { + if ($request instanceof Request) { + yield $key => static fn (): Response => $connector->send($request); + } elseif (\is_callable($request)) { + yield $key => static fn (): Response => $request($connector); + } else { + throw new InvalidPoolRequestException(Request::class, Response::class); + } + } + }; + + return $this->createWorker($this->client)->run($promises($this->connector, $requests)); + } +} diff --git a/src/Exception/InvalidPoolRequestException.php b/src/Exception/InvalidPoolRequestException.php new file mode 100644 index 0000000..2511f94 --- /dev/null +++ b/src/Exception/InvalidPoolRequestException.php @@ -0,0 +1,19 @@ + $requests + * @return array + */ + public function send(iterable $requests): array; +} diff --git a/src/PoolFactory.php b/src/PoolFactory.php index da7b055..1bb5e5b 100644 --- a/src/PoolFactory.php +++ b/src/PoolFactory.php @@ -2,173 +2,46 @@ declare(strict_types=1); -namespace Jenky\Atlas\Pool; +namespace Fansipan\Peak; -use Clue\React\Mq\Queue; -use GuzzleHttp\ClientInterface; +use Fansipan\Peak\Client\AsyncClientFactory; +use Fansipan\Peak\Client\AsyncClientInterface; use Jenky\Atlas\Contracts\ConnectorInterface; -use Jenky\Atlas\Pool\Exception\UnsupportedClientException; -use Jenky\Atlas\Pool\Exception\UnsupportedFeatureException; -use Jenky\Concurrency\PoolInterface; -use Psl\Async\Awaitable; -use React\Http\Browser; -use Symfony\Component\HttpClient\Psr18Client; -use Symfony\Contracts\HttpClient\HttpClientInterface; +use Psr\Http\Client\ClientInterface; -final class PoolFactory +class PoolFactory { /** - * @var array - */ - private static array $candidates = []; - - public function __construct() - { - if (! empty(self::$candidates)) { - return; - } - - self::$candidates[] = fn (ConnectorInterface $connector) => $this->createPoolByClientType($connector); - - if ($this->isPslInstalled()) { - self::$candidates[] = fn (ConnectorInterface $connector) => $this->createPslPool($connector); - } - - if ($this->isReactInstalled()) { - self::$candidates[] = fn (ConnectorInterface $connector) => $this->createReactPool($connector); - } - } - - /** - * Create a new pool instance for given connector. + * Create a new pool for the given client. * - * @throws UnsupportedClientException - * @throws UnsupportedFeatureException + * @throws \Fansipan\Peak\Exception\UnsupportedClientException + * @throws \Fansipan\Peak\Exception\UnsupportedFeatureException */ - public static function create(ConnectorInterface $connector): PoolInterface + public static function createFromClient(ClientInterface $client): Pool { - return (new self())->createPool($connector); + return new ClientPool(AsyncClientFactory::create($client)); } /** - * Create a new pool instance for given connector. + * Create a new pool for the given connector. * - * @throws UnsupportedClientException - * @throws UnsupportedFeatureException + * @throws \Fansipan\Peak\Exception\UnsupportedClientException + * @throws \Fansipan\Peak\Exception\UnsupportedFeatureException */ - public function createPool(ConnectorInterface $connector): PoolInterface - { - foreach (self::$candidates as $callback) { - try { - return $callback($connector); - } catch (\Throwable $e) { - if ($e instanceof UnsupportedClientException) { - throw $e; - } - - continue; - } - } - - throw new UnsupportedFeatureException('You cannot use the pool feature as the required packages are not installed.'); - } - - /** - * @throws \LogicException - */ - private function assertConnector(ConnectorInterface $connector): void - { - if (! \method_exists($connector, 'withClient')) { - // @codeCoverageIgnoreStart - throw new \LogicException('Unable to swap the underlying client of connector '.get_debug_type($connector)); - // @codeCoverageIgnoreEnd - } - } - - private function getUnderlyingSymfonyHttpClient(Psr18Client $client): ?HttpClientInterface - { - try { - $reflectionProperty = new \ReflectionProperty($client, 'client'); - $reflectionProperty->setAccessible(true); - - return $reflectionProperty->getValue($client); - // @codeCoverageIgnoreStart - } catch (\Throwable) { - return null; - } - // @codeCoverageIgnoreEnd - } - - private function createPoolByClientType(ConnectorInterface $connector): PoolInterface - { - $client = $connector->client(); - - return match (true) { - $this->isReactInstalled() && $client instanceof React\AsyncClientInterface => $this->createReactPool($connector), - $this->isPslInstalled() && $client instanceof Psl\AsyncClientInterface => $this->createPslPool($connector), - default => throw new \Exception('Unsupported client. Swap client and retry') - }; - } - - private function isReactInstalled(): bool - { - return \function_exists('React\\Async\\async') && \class_exists(Queue::class); - } - - private function isPslInstalled(): bool - { - return \class_exists(Awaitable::class); - } - - private function createReactPool(ConnectorInterface $connector): React\Pool + public static function createFromConnector(ConnectorInterface $connector): Pool { $client = $connector->client(); - if ($client instanceof React\AsyncClientInterface) { - return new React\Pool($connector); - } - - $this->assertConnector($connector); - - if ($client instanceof Psr18Client) { - $newClient = new React\SymfonyClient($this->getUnderlyingSymfonyHttpClient($client)); - } elseif ($client instanceof ClientInterface) { - $newClient = new React\GuzzleClient($client); - } elseif (\class_exists(Browser::class)) { - $newClient = new React\Client(); - } else { - // @codeCoverageIgnoreStart - throw new UnsupportedClientException(\sprintf( - 'The concurrent requests feature cannot be used as the client %s is not supported. To utilize this feature, please install package "react/http".', - \get_debug_type($client) - )); - // @codeCoverageIgnoreEnd - } - - return new React\Pool($connector->withClient($newClient)); //@phpstan-ignore-line - } - - private function createPslPool(ConnectorInterface $connector): Psl\Pool - { - $client = $connector->client(); - - if ($client instanceof Psl\AsyncClientInterface) { - return new Psl\Pool($connector); - } - - $this->assertConnector($connector); + if (! $client instanceof AsyncClientInterface) { + if (! \method_exists($connector, 'withClient')) { + // @codeCoverageIgnoreStart + throw new \LogicException('Unable to swap the underlying client of connector '.\get_debug_type($connector)); + // @codeCoverageIgnoreEnd + } - if ($client instanceof Psr18Client) { - $newClient = new Psl\SymfonyClient($this->getUnderlyingSymfonyHttpClient($client)); - } elseif ($client instanceof ClientInterface) { - $newClient = new Psl\GuzzleClient($client); - } else { - throw new UnsupportedClientException(\sprintf( - 'The client %s is not supported. The PSL Pool only supports "guzzlehttp/guzzle" and "symfony/http-client".', - \get_debug_type($client) - )); + $connector = $connector->withClient(AsyncClientFactory::create($client)); } - return new Psl\Pool($connector->withClient($newClient)); //@phpstan-ignore-line + return new ConnectorPool($connector); } } diff --git a/src/PoolTrait.php b/src/PoolTrait.php index 81bb340..a0239fb 100644 --- a/src/PoolTrait.php +++ b/src/PoolTrait.php @@ -2,23 +2,28 @@ declare(strict_types=1); -namespace Jenky\Atlas\Pool; +namespace Fansipan\Peak; -use Jenky\Concurrency\PoolInterface; +use Fansipan\Peak\Client\AsyncClientInterface; +use Fansipan\Peak\Concurrency\Driver; +use Fansipan\Peak\Concurrency\PslWorker; +use Fansipan\Peak\Concurrency\ReactWorker; +use Fansipan\Peak\Concurrency\Worker; +use Fansipan\Peak\Exception\UnsupportedFeatureException; trait PoolTrait { /** - * @var positive-int + * @var int<1, max> */ private int $concurrency = 25; /** - * @param positive-int $concurrency + * @param int<1, max> $concurrency * * @throws \ValueError */ - public function concurrent(int $concurrency): PoolInterface + public function concurrent(int $concurrency): Pool { if ($concurrency < 1) { throw new \ValueError('Argument #1 ($concurrency) must be positive, got '.$concurrency); @@ -29,4 +34,15 @@ public function concurrent(int $concurrency): PoolInterface return $clone; } + + private function createWorker(AsyncClientInterface $client): Worker + { + $driver = $client->driver(); + + return match (true) { + $driver === Driver::PSL => new PslWorker($this->concurrency), + $driver === Driver::REACT => new ReactWorker($this->concurrency), + default => throw new UnsupportedFeatureException('You cannot use the concurrent request pool feature as the required packages are not installed.'), + }; + } } diff --git a/src/Psl/AsyncClientInterface.php b/src/Psl/AsyncClientInterface.php deleted file mode 100644 index bd4a203..0000000 --- a/src/Psl/AsyncClientInterface.php +++ /dev/null @@ -1,11 +0,0 @@ -client = $client ?? new Client(); - } - - public function sendRequest(RequestInterface $request): ResponseInterface - { - $promise = $this->client->sendAsync($request, [ - RequestOptions::ALLOW_REDIRECTS => false, - RequestOptions::HTTP_ERRORS => false, - ]); - - $defer = new Async\Deferred(); - - Async\Scheduler::defer(static function () use ($defer, $promise) { - $promise->then( - fn (ResponseInterface $response) => $defer->complete($response), - fn (\Throwable $e) => $defer->error($e) - )->wait(); - }); - - return $defer->getAwaitable()->await(); - } -} diff --git a/src/Psl/Pool.php b/src/Psl/Pool.php deleted file mode 100644 index 2c53509..0000000 --- a/src/Psl/Pool.php +++ /dev/null @@ -1,52 +0,0 @@ - - */ -final class Pool implements PoolInterface -{ - use PoolTrait; - - public function __construct(private ConnectorInterface $connector) - { - if (! $connector->client() instanceof AsyncClientInterface) { - // @codeCoverageIgnoreStart - throw new UnsupportedClientException(\sprintf( - 'The client %s is not supported. Please swap the underlying client to supported one.', - \get_debug_type($connector->client()) - )); - // @codeCoverageIgnoreEnd - } - } - - public function send(iterable $requests): array - { - $semaphore = new Async\Semaphore($this->concurrency, static fn (Response $response) => $response); - - $promises = static function (ConnectorInterface $connector) use ($requests, $semaphore) { - foreach ($requests as $key => $request) { - if ($request instanceof Request) { - yield $key => static fn (): Response => $semaphore->waitFor($connector->send($request)); - } elseif (\is_callable($request)) { - yield $key => static fn (): Response => $semaphore->waitFor($request($connector)); - } else { - throw new \InvalidArgumentException('Each value of the iterator must be a Jenky\Atlas\Request or a \Closure that returns a Jenky\Atlas\Response object.'); - } - } - }; - - return Async\concurrently($promises($this->connector)); //@phpstan-ignore-line - } -} diff --git a/src/Psl/SymfonyClient.php b/src/Psl/SymfonyClient.php deleted file mode 100644 index 5f3fb46..0000000 --- a/src/Psl/SymfonyClient.php +++ /dev/null @@ -1,50 +0,0 @@ -client = $client ?? HttpClient::create(); - $this->responseFactory = $responseFactory ?? Psr17FactoryDiscovery::findResponseFactory(); - $this->streamFactory = $streamFactory ?? Psr17FactoryDiscovery::findStreamFactory(); - } - - /** - * @param \Closure(): ResponseInterface $response - */ - private function createResponse(\Closure $response): mixed - { - $defer = new Async\Deferred(); - - Async\Scheduler::defer(static function () use ($defer, $response) { - $defer->complete($response()); - }); - - return $defer->getAwaitable()->await(); - } -} diff --git a/src/React/AsyncClientInterface.php b/src/React/AsyncClientInterface.php deleted file mode 100644 index eb27b45..0000000 --- a/src/React/AsyncClientInterface.php +++ /dev/null @@ -1,11 +0,0 @@ - - */ -final class Pool implements PoolInterface -{ - use PoolTrait; - - public function __construct(private ConnectorInterface $connector) - { - if (! $connector->client() instanceof AsyncClientInterface) { - // @codeCoverageIgnoreStart - throw new UnsupportedClientException(\sprintf( - 'The client %s is not supported. Please swap the underlying client to supported one.', - \get_debug_type($connector->client()) - )); - // @codeCoverageIgnoreEnd - } - } - - public function send(iterable $requests): array - { - $queue = new Queue($this->concurrency, null, static fn (\Closure $cb) => Async\async($cb)()); - - $promises = static function (ConnectorInterface $connector) use ($requests, $queue) { - foreach ($requests as $key => $request) { - if ($request instanceof Request) { - yield $key => static fn () => $queue(static fn (): Response => $connector->send($request)); - } elseif (\is_callable($request)) { - yield $key => static fn () => $queue(static fn (): Response => $request($connector)); - } else { - throw new \InvalidArgumentException('Each value of the iterator must be a Jenky\Atlas\Request or a \Closure that returns a Jenky\Atlas\Response object.'); - } - } - }; - - return Async\await(Async\parallel($promises($this->connector))); - } -} diff --git a/src/React/SymfonyClient.php b/src/React/SymfonyClient.php deleted file mode 100644 index 709126d..0000000 --- a/src/React/SymfonyClient.php +++ /dev/null @@ -1,57 +0,0 @@ -client = $client ?? HttpClient::create(); - $this->responseFactory = $responseFactory ?? Psr17FactoryDiscovery::findResponseFactory(); - $this->streamFactory = $streamFactory ?? Psr17FactoryDiscovery::findStreamFactory(); - $this->loop = $loop ?? Loop::get(); - } - - /** - * @param \Closure(): ResponseInterface $response - */ - private function createResponse(\Closure $response): mixed - { - $defer = new Deferred(); - - $this->loop->futureTick(static function () use ($defer, $response) { - $defer->resolve($response()); - }); - - return Async\await($defer->promise()); - } -} diff --git a/tests/AkamaiTileRequest.php b/tests/AkamaiTileRequest.php index 2902913..18593ef 100644 --- a/tests/AkamaiTileRequest.php +++ b/tests/AkamaiTileRequest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Jenky\Atlas\Pool\Tests; +namespace Fansipan\Peak\Tests; use Jenky\Atlas\Request; diff --git a/tests/DelayTest.php b/tests/DelayTest.php new file mode 100644 index 0000000..4c34202 --- /dev/null +++ b/tests/DelayTest.php @@ -0,0 +1,124 @@ +requestFactory = Psr17FactoryDiscovery::findRequestFactory(); + } + + public function test_psl_delay(): void + { + $request = $this->requestFactory->createRequest('GET', 'http://localhost'); + + $client = $this->mockGuzzleClient(new PslDeferred(), [ + MockResponse::create(''), + ]); + + $reflection = new \ReflectionProperty($client, 'delay'); + $reflection->setAccessible(true); + + $client->delay(1000); + + $this->assertSame(1000, $reflection->getValue($client)); + + $client->sendRequest($request); + + $this->assertSame(0, $reflection->getValue($client)); + } + + public function test_react_delay(): void + { + $request = $this->requestFactory->createRequest('GET', 'https://example.com'); + + $client = $this->mockSymfonyClient(new ReactDeferred()); + + $reflection = new \ReflectionProperty($client, 'delay'); + $reflection->setAccessible(true); + + $client->delay(1000); + + $this->assertSame(1000, $reflection->getValue($client)); + + $client->sendRequest($request); + + $this->assertSame(0, $reflection->getValue($client)); + + $client = new ReactClient(); + $client->delay(1000); + $client->sendRequest($request); + } + + public function test_pool_psl_delay(): void + { + $this->runPoolDelayTests( + $this->mockSymfonyClient(new PslDeferred()), 3 + ); + + $this->assertTrue(true); + } + + public function test_pool_react_delay(): void + { + $this->runPoolDelayTests( + $this->mockSymfonyClient(new ReactDeferred()), 3 + ); + + $this->assertTrue(true); + } + + private function runPoolDelayTests(AsyncClientInterface $client, int $totalRequests, int $delay = 1000): void + { + $requests = function (int $total) use ($delay) { + for ($i = 0; $i < $total; $i++) { + yield function (AsyncClientInterface $client) use ($delay): ResponseInterface { + $client->delay($delay); + + return $client->sendRequest($this->requestFactory->createRequest('GET', 'http://localhost')); + }; + } + }; + + $pool = PoolFactory::createFromClient($client); + $pool->send($requests($totalRequests)); + } + + private function mockGuzzleClient(Deferrable $defer, ?array $response = null): GuzzleClient + { + $handler = new MockHandler($response); + $handlerStack = HandlerStack::create($handler); + + return new GuzzleClient($defer, new Client(['handler' => $handlerStack])); + } + + private function mockSymfonyClient(Deferrable $defer, mixed $response = null): SymfonyClient + { + return new SymfonyClient($defer, new MockHttpClient($response)); + } +} diff --git a/tests/DummyRequest.php b/tests/DummyRequest.php index d369c84..5113b40 100644 --- a/tests/DummyRequest.php +++ b/tests/DummyRequest.php @@ -2,7 +2,7 @@ declare(strict_types=1); -namespace Jenky\Atlas\Pool\Tests; +namespace Fansipan\Peak\Tests; use Jenky\Atlas\Request; diff --git a/tests/PoolTest.php b/tests/PoolTest.php index 7bef911..4648663 100644 --- a/tests/PoolTest.php +++ b/tests/PoolTest.php @@ -1,17 +1,24 @@ concurrent(-1); } - public function test_factory_without_candidates(): void + public function test_driver_discovery(): void { - $factory = new PoolFactory(); - - $reflection = new \ReflectionClass($factory); - $reflection->setStaticPropertyValue('candidates', []); + $this->assertSame(Driver::PSL, DriverDiscovery::find(false)); - $this->expectException(UnsupportedFeatureException::class); + DriverDiscovery::prefer(Driver::REACT); - $factory->createPool(new NullConnector()); + $this->assertSame(Driver::REACT, DriverDiscovery::find(false)); } - private function createFactory(string $method): PoolFactory + public function test_async_client_factory(): void { - $factory = new PoolFactory(); - - $reflection = new \ReflectionClass($factory); + $client = AsyncClientFactory::create(new Client()); + $this->assertInstanceOf(GuzzleClient::class, $client); - $method = $reflection->getMethod($method); - $method->setAccessible(true); - - $reflection->setStaticPropertyValue('candidates', [fn (ConnectorInterface $connector) => $method->invoke($factory, $connector)]); - - return $factory; + $client = AsyncClientFactory::create(new Psr18Client()); + $this->assertInstanceOf(SymfonyClient::class, $client); } - /** - * @param class-string $poolClass - * @param class-string $clientClass - */ - private function assertPoolAndClient(string $poolClass, string $clientClass, PoolInterface $pool): void + public function test_invalid_pool_request(): void { - $this->assertInstanceOf($poolClass, $pool); + $client = new ReactClient(); - $reflection = new \ReflectionProperty($pool, 'connector'); - $reflection->setAccessible(true); + $clientPool = PoolFactory::createFromClient($client); - $this->assertInstanceOf($clientClass, $reflection->getValue($pool)->client()); - } + $this->expectException(InvalidPoolRequestException::class); - public function test_factory_using_supported_client(): void - { - // Reset the factory candidates - $reflection = new \ReflectionClass(PoolFactory::class); - $reflection->setStaticPropertyValue('candidates', []); + $clientPool->send([1, 2, 3]); - $pool = PoolFactory::create((new NullConnector())->withClient(new Pool\React\GuzzleClient())); - $this->assertPoolAndClient(Pool\React\Pool::class, Pool\React\GuzzleClient::class, $pool); + $connectorPool = PoolFactory::createFromConnector( + (new GenericConnector())->withClient($client) + ); - $pool = PoolFactory::create((new NullConnector())->withClient(new Pool\Psl\SymfonyClient())); - $this->assertPoolAndClient(Pool\Psl\Pool::class, Pool\Psl\SymfonyClient::class, $pool); + $this->expectException(InvalidPoolRequestException::class); + $connectorPool->send([1, fn () => new \stdClass()]); } - public function test_factory_react(): void + public function test_pool_factory(): void { - $factory = $this->createFactory('createReactPool'); + $pool = PoolFactory::createFromConnector((new GenericConnector())->withClient(new Client())); + $this->assertInstanceOf(GuzzleClient::class, $this->getClientFromPool($pool)); + + $pool = PoolFactory::createFromConnector((new GenericConnector())->withClient(new Psr18Client())); + $this->assertInstanceOf(SymfonyClient::class, $this->getClientFromPool($pool)); - $pool = $factory->createPool((new NullConnector())->withClient(new Psr18Client())); - $this->assertPoolAndClient(Pool\React\Pool::class, Pool\React\SymfonyClient::class, $pool); + $this->expectException(UnsupportedClientException::class); + $pool = PoolFactory::createFromClient(new FakeHttpClient()); - $pool = $factory->createPool((new NullConnector())->withClient(new Client())); - $this->assertPoolAndClient(Pool\React\Pool::class, Pool\React\GuzzleClient::class, $pool); + DriverDiscovery::prefer(Driver::REACT); - $pool = $factory->createPool((new NullConnector())->withClient(new FakeHttpClient())); - $this->assertPoolAndClient(Pool\React\Pool::class, Pool\React\Client::class, $pool); + $pool = PoolFactory::createFromClient(AsyncClientFactory::create(new FakeHttpClient())); + $this->assertInstanceOf(ReactClient::class, $this->getClientFromPool($pool)); } - public function test_factory_psl(): void + private function getClientFromPool(Pool $pool): ClientInterface { - $factory = $this->createFactory('createPslPool'); + if ($pool instanceof ConnectorPool) { + $reflection = new \ReflectionProperty($pool, 'connector'); + $reflection->setAccessible(true); - $pool = $factory->createPool((new NullConnector())->withClient(new Psr18Client())); - $this->assertPoolAndClient(Pool\Psl\Pool::class, Pool\Psl\SymfonyClient::class, $pool); + return $reflection->getValue($pool)->client(); + } - $pool = $factory->createPool((new NullConnector())->withClient(new Client())); - $this->assertPoolAndClient(Pool\Psl\Pool::class, Pool\Psl\GuzzleClient::class, $pool); + $reflection = new \ReflectionProperty($pool, 'client'); + $reflection->setAccessible(true); - $this->expectException(UnsupportedClientException::class); - $pool = $factory->createPool((new NullConnector())->withClient(new FakeHttpClient())); + return $reflection->getValue($pool); } } -final class NullPool implements PoolInterface +final class NullPool implements Pool { use PoolTrait; diff --git a/tests/PslPoolTest.php b/tests/PslPoolTest.php index 64c51fe..84864c3 100644 --- a/tests/PslPoolTest.php +++ b/tests/PslPoolTest.php @@ -2,28 +2,31 @@ declare(strict_types=1); -namespace Jenky\Atlas\Pool\Tests; +namespace Fansipan\Peak\Tests; -use Jenky\Atlas\Contracts\ConnectorInterface; -use Jenky\Atlas\Pool\Psl\GuzzleClient; -use Jenky\Atlas\Pool\Psl\Pool; -use Jenky\Atlas\Pool\Psl\SymfonyClient; -use Jenky\Concurrency\PoolInterface; +use Fansipan\Peak\Client\GuzzleClient; +use Fansipan\Peak\Client\SymfonyClient; +use Fansipan\Peak\Concurrency\PslDeferred; final class PslPoolTest extends TestCase { - protected function createPool(ConnectorInterface $connector): PoolInterface + private function createSymfonyClient(): SymfonyClient { - return new Pool($connector); + return new SymfonyClient(new PslDeferred()); } - public function test_psl_pool_using_symfony_http_client(): void + private function createGuzzleClient(): GuzzleClient { - $this->performTests($this->createConnector(new SymfonyClient())); + return new GuzzleClient(new PslDeferred()); } - public function test_psl_pool_using_guzzle(): void + public function test_react_pool_using_symfony_http_client(): void { - $this->performTests($this->createConnector(new GuzzleClient())); + $this->performConnectorTests($this->createConnector($this->createSymfonyClient())); + } + + public function test_react_pool_using_guzzle(): void + { + $this->performConnectorTests($this->createConnector($this->createGuzzleClient())); } } diff --git a/tests/ReactPoolTest.php b/tests/ReactPoolTest.php index 1fa24ff..79b4879 100644 --- a/tests/ReactPoolTest.php +++ b/tests/ReactPoolTest.php @@ -1,33 +1,36 @@ performTests($this->createConnector(new Client())); + $this->performConnectorTests($this->createConnector(new ReactClient())); } public function test_react_pool_using_symfony_http_client(): void { - $this->performTests($this->createConnector(new SymfonyClient())); + $this->performConnectorTests($this->createConnector($this->createSymfonyClient())); } public function test_react_pool_using_guzzle(): void { - $this->performTests($this->createConnector(new GuzzleClient())); + $this->performConnectorTests($this->createConnector($this->createGuzzleClient())); } } diff --git a/tests/TestCase.php b/tests/TestCase.php index 2b23d6d..9c12334 100644 --- a/tests/TestCase.php +++ b/tests/TestCase.php @@ -2,38 +2,44 @@ declare(strict_types=1); -namespace Jenky\Atlas\Pool\Tests; +namespace Fansipan\Peak\Tests; +use Fansipan\Peak\Client\AsyncClientInterface; +use Fansipan\Peak\ClientPool; +use Fansipan\Peak\ConnectorPool; use Jenky\Atlas\Contracts\ConnectorInterface; +use Jenky\Atlas\GenericConnector; use Jenky\Atlas\Middleware\Interceptor; -use Jenky\Atlas\NullConnector; use Jenky\Atlas\Response; -use Jenky\Concurrency\PoolInterface; use PHPUnit\Framework\TestCase as BaseTestCase; use Psr\Http\Client\ClientInterface; use Psr\Http\Message\ResponseInterface; abstract class TestCase extends BaseTestCase { - abstract protected function createPool(ConnectorInterface $connector): PoolInterface; + use TestRequestTrait; - protected function createRequests(int $total): iterable + protected function createConnector(?ClientInterface $client = null): ConnectorInterface { - for ($i = 1; $i <= $total; $i++) { - yield new AkamaiTileRequest($i); - } + $connector = new GenericConnector(); + + return $client ? $connector->withClient($client) : $connector; } - protected function createConnector(?ClientInterface $client = null): ConnectorInterface + protected function performClientTests(AsyncClientInterface $client): void { - $connector = new NullConnector(); + $total = (int) getenv('TEST_TOTAL_CONCURRENT_REQUESTS') ?: 100; - return $client ? $connector->withClient($client) : $connector; + $responses = (new ClientPool($client)) + ->send($this->createPsrRequests($total)); + + $this->assertCount($total, $responses); + $this->assertInstanceOf(ResponseInterface::class, $responses[0]); } - protected function performTests(ConnectorInterface $connector, int $totalRequests = 100): void + protected function performConnectorTests(ConnectorInterface $connector): void { - $total = (int) getenv('TEST_TOTAL_CONCURRENT_REQUESTS') ?: $totalRequests; + $total = (int) getenv('TEST_TOTAL_CONCURRENT_REQUESTS') ?: 100; $connector->middleware()->push( Interceptor::response(function (ResponseInterface $response) { @@ -41,7 +47,7 @@ protected function performTests(ConnectorInterface $connector, int $totalRequest }) ); - $responses = $this->createPool($connector) + $responses = (new ConnectorPool($connector)) ->send($this->createRequests($total)); $this->assertCount($total, $responses); diff --git a/tests/TestRequestTrait.php b/tests/TestRequestTrait.php new file mode 100644 index 0000000..1e02815 --- /dev/null +++ b/tests/TestRequestTrait.php @@ -0,0 +1,24 @@ +