Skip to content

Commit

Permalink
AMPHP driver (#7)
Browse files Browse the repository at this point in the history
* Fix generics

* Refactor defer

* Add amphp driver

* Fix test case

* Add @return doc for generic

* Support keyed response for amp

* Update readme

* refactor AmpDeferred:

move delay check and callback invocation inside Amp\async function

* Add suggest

* Separate client and connector test

* Ignore coverage

* Update test case

* Ignore coverage

* Merge driver discovery and async client factory test

* Fix issue with incorrect driver

* Test driver discovery

* Revert test case

* Ignore coverage for now

* Update benchmark

* Add benchmark workflow

* Update benchmark workflow

* Change bench command

* Add generic docblock
  • Loading branch information
jenky committed Oct 6, 2023
1 parent e77512e commit 4c03372
Show file tree
Hide file tree
Showing 36 changed files with 480 additions and 104 deletions.
12 changes: 11 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

A simple and efficient solution for concurrently sending HTTP requests using PSR-18 client implementations.

Peak is a library that enables concurrent request sending using a request pool. It leverages the event loop of [ReactPHP](https://github.com/reactphp) or [PSL](https://github.com/azjezz/psl) to handle and manage the requests concurrently.
Peak is a library that enables concurrent request sending using a request pool. It leverages the event loop of [AMPHP](https://github.com/amphp), [ReactPHP](https://github.com/reactphp) or [PSL](https://github.com/azjezz/psl) to handle and manage the requests concurrently.

## Requirements

Expand All @@ -26,6 +26,12 @@ composer require fansipan/peak

Additionally, depending on your choice of driver, these packages may also need to be installed.

### AMPHP

```bash
composer require amphp/pipeline
```

### PSL

```bash
Expand Down Expand Up @@ -60,9 +66,13 @@ The `Fansipan\Peak\PoolFactory` provides a configured request pool based on the
First, you need to create your desired driver:

```php
use Fansipan\Peak\Concurrency\AmpDeferred;
use Fansipan\Peak\Concurrency\PslDeferred;
use Fansipan\Peak\Concurrency\ReactDeferred;

// AMPHP
$defer = new AmpDeferred();

// PSL
$defer = new PslDeferred();

Expand Down
4 changes: 3 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
"psr/http-factory": "^1.0"
},
"require-dev": {
"amphp/pipeline": "^1.0",
"azjezz/psl": "^2.7",
"clue/mq-react": "^1.6",
"fansipan/mock-client": "^1.0",
Expand Down Expand Up @@ -55,7 +56,7 @@
"vendor/bin/phpstan analyse"
],
"bench": [
"./vendor/bin/phpbench run --report=aggregate"
"./vendor/bin/phpbench run --report=all --progress=blinken"
],
"cs": [
"vendor/bin/php-cs-fixer fix"
Expand All @@ -73,6 +74,7 @@
]
},
"suggest": {
"php-http/discovery": "Provides a convenient solution for auto-discovery and auto-installation of well-known implementations of PSR-17 (HTTP Factories), PSR-18 (HTTP Clients)",
"azjezz/psl": "Required for the PSL driver",
"clue/mq-react": "Required for the ReactPHP driver",
"react/async": "Required for the ReactPHP driver"
Expand Down
33 changes: 32 additions & 1 deletion phpbench.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,36 @@
"$schema":"./vendor/phpbench/phpbench/phpbench.schema.json",
"runner.bootstrap": "vendor/autoload.php",
"runner.path": "tests/Benchmark",
"runner.file_pattern": "*Bench.php"
"runner.file_pattern": "*Bench.php",
"report.generators": {
"chart": {
"generator": "component",
"components": [
{
"title": "Request average time comparison",
"component": "bar_chart_aggregate",
"x_partition": "variant_params['limit'] ~ ' requests'",
"bar_partition": "subject_name",
"y_expr": "mode(partition['result_time_avg']) as time",
"y_axes_label": "yValue as time"
},
{
"title": "Peak memory usage comparison",
"component": "bar_chart_aggregate",
"x_partition": "variant_params['limit'] ~ ' requests'",
"bar_partition": "subject_name",
"y_expr": "mode(partition['result_mem_peak']) as memory",
"y_axes_label": "yValue as memory"
}
]
},
"all": {
"generator": "composite",
"reports": [
"default",
"aggregate",
"chart"
]
}
}
}
16 changes: 16 additions & 0 deletions src/Client/AsyncClientFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Fansipan\Peak\Client;

use Fansipan\Peak\Concurrency\AmpDeferred;
use Fansipan\Peak\Concurrency\Driver;
use Fansipan\Peak\Concurrency\DriverDiscovery;
use Fansipan\Peak\Concurrency\PslDeferred;
Expand Down Expand Up @@ -36,6 +37,21 @@ public static function create(?ClientInterface $client = null): AsyncClientInter

$driver = DriverDiscovery::find();

if ($driver === Driver::AMP) {
if ($client instanceof GuzzleClientInterface) {
return new GuzzleClient(new AmpDeferred(), $client);
}

if ($client instanceof Psr18Client) {
return new SymfonyClient(new AmpDeferred(), self::getUnderlyingSymfonyHttpClient($client));
}

throw new UnsupportedClientException(\sprintf(
'The client %s is not supported. The PSL Pool only supports "guzzlehttp/guzzle" and "symfony/http-client".',
\get_debug_type($client)
));
}

if ($driver === Driver::PSL) {
if ($client instanceof GuzzleClientInterface) {
return new GuzzleClient(new PslDeferred(), $client);
Expand Down
2 changes: 2 additions & 0 deletions src/Client/AsyncClientTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Fansipan\Peak\Client;

use Fansipan\Peak\Concurrency\AmpDeferred;
use Fansipan\Peak\Concurrency\Deferrable;
use Fansipan\Peak\Concurrency\Driver;
use Fansipan\Peak\Concurrency\PslDeferred;
Expand All @@ -18,6 +19,7 @@ public function driver(): ?Driver
$deferrable = $this->getDeferrable();

return match (true) {
$deferrable instanceof AmpDeferred => Driver::AMP,
$deferrable instanceof PslDeferred => Driver::PSL,
$deferrable instanceof ReactDeferred => Driver::REACT,
default => null,
Expand Down
2 changes: 2 additions & 0 deletions src/Client/DelayTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ trait DelayTrait
public function delay(int $milliseconds): void
{
if ($milliseconds < 0) {
// @codeCoverageIgnoreStart
throw new \ValueError('Delay must be positive, got '.$milliseconds);
// @codeCoverageIgnoreEnd
}

$this->delay = $milliseconds;
Expand Down
9 changes: 3 additions & 6 deletions src/Client/GuzzleClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,9 @@ public function sendRequest(RequestInterface $request): ResponseInterface

$this->delay = 0;

return $this->deferred->defer(static function (\Closure $resolve, \Closure $reject) use ($promise) {
$promise->then(
static fn (ResponseInterface $response) => $resolve($response),
static fn (\Throwable $e) => $reject($e)
)->wait();
}, $delay);
return $this->deferred->defer(
static fn () => $promise->wait(), $delay
);
}

private function getDeferrable(): Deferrable
Expand Down
10 changes: 3 additions & 7 deletions src/Client/SymfonyClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,9 @@ private function createResponse(SymfonyResponseInterface $response): mixed

$this->delay = 0;

return $this->deferred->defer(function (\Closure $resolve, \Closure $reject) use ($response) {
try {
$resolve($this->convertToPsrResponse($response));
} catch (\Throwable $e) {
$reject($e);
}
}, $delay);
return $this->deferred->defer(
fn () => $this->convertToPsrResponse($response), $delay
);
}

private function convertToPsrResponse(SymfonyResponseInterface $response): ResponseInterface
Expand Down
1 change: 1 addition & 0 deletions src/ClientPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public function __construct(private readonly AsyncClientInterface $client)
public function send(iterable $requests): array
{
$promises = static function (AsyncClientInterface $client, iterable $requests) {
/** @var array-key $key */
foreach ($requests as $key => $request) {
if ($request instanceof RequestInterface) {
yield $key => static fn (): ResponseInterface => $client->sendRequest($request);
Expand Down
21 changes: 21 additions & 0 deletions src/Concurrency/AmpDeferred.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

declare(strict_types=1);

namespace Fansipan\Peak\Concurrency;

use Amp;

final class AmpDeferred implements Deferrable
{
public function defer(callable $callback, float $delay = 0): mixed
{
return Amp\async(static function (callable $callback, float $delay) {
if ($delay > 0) {
Amp\delay($delay);
}

return $callback();
}, $callback, $delay)->await();
}
}
45 changes: 45 additions & 0 deletions src/Concurrency/AmpTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

declare(strict_types=1);

namespace Fansipan\Peak\Concurrency;

/**
* @template T
*/
final class AmpTask
{
/**
* @var T
*/
private mixed $value = null;

/**
* @param \Closure(): T $task
*/
public function __construct(
private readonly string|int $key,
private readonly \Closure $task
) {
}

public function key(): string|int
{
return $this->key;
}

/**
* @return T
*/
public function value(): mixed
{
return $this->value;
}

public function __invoke(): self
{
$this->value = ($this->task)();

return $this;
}
}
49 changes: 49 additions & 0 deletions src/Concurrency/AmpWorker.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
<?php

declare(strict_types=1);

namespace Fansipan\Peak\Concurrency;

use Amp;
use Amp\Future;
use Amp\Pipeline\Pipeline;

final class AmpWorker implements Worker
{
/**
* @param int<1, max> $limit
*/
public function __construct(private readonly int $limit = 10)
{
if ($limit < 1) {
// @codeCoverageIgnoreStart
throw new \ValueError('Argument #1 ($limit) must be positive, got '.$limit);
// @codeCoverageIgnoreEnd
}
}

public function run(iterable $tasks): array
{
$promises = Pipeline::fromIterable(static function () use ($tasks): \Generator {
foreach ($tasks as $key => $task) {
yield new AmpTask($key, $task);
}
})
->concurrent($this->limit)
->unordered()
->map(static fn (AmpTask $task) => Amp\async(\Closure::fromCallable($task)));

$results = [];

foreach (Future::iterate($promises) as $promise) {
try {
/** @var AmpTask $t */
$t = $promise->await();
$results[$t->key()] = $t->value();
} catch (\Throwable) {
}
}

return $results;
}
}
8 changes: 3 additions & 5 deletions src/Concurrency/Deferrable.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@

namespace Fansipan\Peak\Concurrency;

/**
* @template-covariant T
*/
interface Deferrable
{
/**
* Defer the operation with optional delay in seconds.
*
* @param callable(\Closure(T): void, \Closure(\Throwable): void): void $callback
* @parma float<0, max> $delay
* @template T
*
* @param callable(): T $callback
* @return T
*/
public function defer(callable $callback, float $delay = 0): mixed;
Expand Down
1 change: 1 addition & 0 deletions src/Concurrency/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

enum Driver: string
{
case AMP = 'amphp/pipeline';
case PSL = 'azjezz/psl';
case REACT = 'react/async clue/mq-react';
}
14 changes: 13 additions & 1 deletion src/Concurrency/DriverDiscovery.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

namespace Fansipan\Peak\Concurrency;

use Amp\Future;
use Amp\Pipeline\Pipeline;
use Clue\React\Mq\Queue;
use Psl\Async\Awaitable;

Expand All @@ -28,13 +30,17 @@ public static function find(bool $cacheResult = true): Driver
return self::$cached;
}

if (self::isPslInstalled()) {
// @codeCoverageIgnoreStart
if (self::isAmpInstalled()) {
$driver = Driver::AMP;
} elseif (self::isPslInstalled()) {
$driver = Driver::PSL;
} elseif (self::isReactInstalled()) {
$driver = Driver::REACT;
} else {
throw new \RuntimeException('Unable to find async driver.');
}
// @codeCoverageIgnoreEnd

if ($cacheResult) {
self::$cached = $driver;
Expand All @@ -49,6 +55,7 @@ public static function find(bool $cacheResult = true): Driver
public static function prefer(Driver $driver): void
{
$check = match ($driver) {
Driver::AMP => self::isAmpInstalled(),
Driver::PSL => self::isPslInstalled(),
Driver::REACT => self::isReactInstalled(),
};
Expand All @@ -66,6 +73,11 @@ public static function prefer(Driver $driver): void
self::$preferred = $driver;
}

public static function isAmpInstalled(): bool
{
return \class_exists(Future::class) && \class_exists(Pipeline::class);
}

public static function isReactInstalled(): bool
{
return \function_exists('React\\Async\\async') && \class_exists(Queue::class);
Expand Down
Loading

0 comments on commit 4c03372

Please sign in to comment.