Skip to content

Commit

Permalink
Delay for async client
Browse files Browse the repository at this point in the history
  • Loading branch information
jenky committed Sep 22, 2023
1 parent c546070 commit b539368
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 13 deletions.
7 changes: 4 additions & 3 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@
"psr/http-factory": "^1.0"
},
"require-dev": {
"jenky/atlas": "^0.5",
"azjezz/psl": "^2.7",
"clue/mq-react": "^1.6",
"fansipan/mock-client": "^1.0",
"friendsofphp/php-cs-fixer": "^3.15",
"guzzlehttp/guzzle": "^7.5",
"fansipan/mock-client": "^1.0",
"jenky/atlas": "^0.5",
"phpstan/phpstan": "^1.10",
"phpunit/phpunit": "^9.0",
"react/async": "^4.1",
"react/http": "^1.9",
"symfony/http-client": "^6.3"
"symfony/http-client": "^6.3",
"symfony/var-dumper": "^6.3"
},
"autoload": {
"psr-4": {
Expand Down
7 changes: 7 additions & 0 deletions src/Client/AsyncClientInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,13 @@

interface AsyncClientInterface extends ClientInterface
{
/**
* Delay the request sending in milliseconds.
*
* @param int<0, max> $milliseconds
*/
public function delay(int $milliseconds): void;

/**
* Get the underlying async driver type.
*/
Expand Down
32 changes: 32 additions & 0 deletions src/Client/DelayTrait.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace Fansipan\Concurrent\Client;

trait DelayTrait
{
/**
* @var int<0, max>
*/
private int $delay = 0;

public function delay(int $milliseconds): void
{
$this->delay = $milliseconds;
}

private function getDelay(bool $asSeconds = false): float|int
{
if ($this->delay <= 0) {
return 0;
}

$delay = $asSeconds ? $this->delay / 1000 : $this->delay;

// Reset the delay value to zero
$this->delay = 0;

return $delay;
}
}
4 changes: 4 additions & 0 deletions src/Client/Factory.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class Factory
*/
public static function createAsyncClient(ClientInterface $client): AsyncClientInterface
{
if ($client instanceof AsyncClientInterface) {
return $client;
}

$driver = DriverDiscovery::find();

if ($driver === Driver::PSL) {
Expand Down
3 changes: 2 additions & 1 deletion src/Client/GuzzleClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
final class GuzzleClient implements AsyncClientInterface
{
use AsyncClientTrait;
use DelayTrait;

private ClientInterface $client;

Expand All @@ -36,7 +37,7 @@ public function sendRequest(RequestInterface $request): ResponseInterface
static fn (ResponseInterface $response) => $resolve($response),
static fn (\Throwable $e) => $reject($e)
)->wait();
});
}, $this->getDelay(true));
}

private function getDeferrable(): Deferrable
Expand Down
8 changes: 8 additions & 0 deletions src/Client/ReactClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

final class ReactClient implements AsyncClientInterface
{
use DelayTrait;

private Browser $browser;

public function __construct(?Browser $browser = null)
Expand All @@ -23,6 +25,12 @@ public function __construct(?Browser $browser = null)

public function sendRequest(RequestInterface $request): ResponseInterface
{
$delay = $this->getDelay(true);

if ($delay > 0) {
Async\delay($delay);
}

return Async\await(
$this->browser->request(
$request->getMethod(),
Expand Down
3 changes: 2 additions & 1 deletion src/Client/SymfonyClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
final class SymfonyClient implements AsyncClientInterface, ResetInterface
{
use AsyncClientTrait;
use DelayTrait;

private HttpClientInterface $client;

Expand Down Expand Up @@ -81,7 +82,7 @@ private function createResponse(SymfonyResponseInterface $response): mixed
} catch (\Throwable $e) {
$reject($e);
}
});
}, $this->getDelay(true));
}

private function convertToPsrResponse(SymfonyResponseInterface $response): ResponseInterface
Expand Down
5 changes: 3 additions & 2 deletions src/Concurrency/Deferrable.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
interface Deferrable
{
/**
* Defer the operation.
* Defer the operation with optional delay in seconds.
*
* @param callable(\Closure(T): void, \Closure(\Throwable): void): void $callback
* @parma float<0, max> $delay
* @return T
*/
public function defer(callable $callback): mixed;
public function defer(callable $callback, float $delay = 0): mixed;
}
7 changes: 4 additions & 3 deletions src/Concurrency/PslDeferred.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@

final class PslDeferred implements Deferrable
{
public function defer(callable $callback): mixed
public function defer(callable $callback, float $delay = 0): mixed
{
$defer = new Async\Deferred();

Async\Scheduler::defer(static function () use ($defer, $callback) {
Async\Scheduler::defer(static function () use ($defer, $callback, $delay) {
$resolve = static fn (mixed $value) => $defer->complete($value);
$reject = static fn (\Throwable $e) => $defer->error($e);
$callback($resolve, $reject);

Async\Scheduler::delay($delay, static fn () => $callback($resolve, $reject));
});

return $defer->getAwaitable()->await();
Expand Down
7 changes: 4 additions & 3 deletions src/Concurrency/ReactDeferred.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ public function __construct(?LoopInterface $loop = null)
$this->loop = $loop ?: Loop::get();
}

public function defer(callable $callback): mixed
public function defer(callable $callback, float $delay = 0): mixed
{
$defer = new Deferred();

$this->loop->futureTick(static function () use ($defer, $callback) {
$this->loop->futureTick(function () use ($defer, $callback, $delay) {
$resolve = static fn (mixed $value) => $defer->resolve($value);
$reject = static fn (\Throwable $e) => $defer->reject($e);
$callback($resolve, $reject);

$this->loop->addTimer($delay, static fn () => $callback($resolve, $reject));
});

return Async\await($defer->promise());
Expand Down
4 changes: 4 additions & 0 deletions src/ConnectorPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
use Jenky\Atlas\Request;
use Jenky\Atlas\Response;

if (! \interface_exists(ConnectorInterface::class)) {
throw new \LogicException('You cannot use the ConnectorPool as the "fansipan/fansipan" package is not installed.');
}

/**
* @implements Pool<Request|callable(ConnectorInterface): Response, Response>
*/
Expand Down
124 changes: 124 additions & 0 deletions tests/DelayTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
<?php

declare(strict_types=1);

namespace Fansipan\Concurrent\Tests;

use Fansipan\Concurrent\Client\AsyncClientInterface;
use Fansipan\Concurrent\Client\GuzzleClient;
use Fansipan\Concurrent\Client\ReactClient;
use Fansipan\Concurrent\Client\SymfonyClient;
use Fansipan\Concurrent\Concurrency\Deferrable;
use Fansipan\Concurrent\Concurrency\PslDeferred;
use Fansipan\Concurrent\Concurrency\ReactDeferred;
use Fansipan\Concurrent\PoolFactory;
use Fansipan\Mock\MockResponse;
use GuzzleHttp\Client;
use GuzzleHttp\Handler\MockHandler;
use GuzzleHttp\HandlerStack;
use Http\Discovery\Psr17FactoryDiscovery;
use PHPUnit\Framework\TestCase;
use Psr\Http\Message\RequestFactoryInterface;
use Psr\Http\Message\ResponseInterface;
use Symfony\Component\HttpClient\MockHttpClient;

final class DelayTest extends TestCase
{
private RequestFactoryInterface $requestFactory;

protected function setUp(): void
{
parent::setUp();

$this->requestFactory = Psr17FactoryDiscovery::findRequestFactory();
}

public function test_psl_delay(): void
{
$request = $this->requestFactory->createRequest('GET', 'http://localhost');

$client = $this->mockGuzzleClient(new PslDeferred(), [
MockResponse::create(''),
]);

$reflection = new \ReflectionProperty($client, 'delay');
$reflection->setAccessible(true);

$client->delay(1000);

$this->assertSame(1000, $reflection->getValue($client));

$client->sendRequest($request);

$this->assertSame(0, $reflection->getValue($client));
}

public function test_react_delay(): void
{
$request = $this->requestFactory->createRequest('GET', 'http://localhost');

$client = $this->mockSymfonyClient(new ReactDeferred());

$reflection = new \ReflectionProperty($client, 'delay');
$reflection->setAccessible(true);

$client->delay(1000);

$this->assertSame(1000, $reflection->getValue($client));

$client->sendRequest($request);

$this->assertSame(0, $reflection->getValue($client));

$client = new ReactClient();
$client->delay(1000);
$client->sendRequest($request);
}

public function test_pool_psl_delay(): void
{
$this->runPoolDelayTests(
$client = $this->mockSymfonyClient(new PslDeferred()), 3
);

$this->assertTrue(true);
}

public function test_pool_react_delay(): void
{
$this->runPoolDelayTests(
$this->mockSymfonyClient(new ReactDeferred()), 3
);

$this->assertTrue(true);
}

private function runPoolDelayTests(AsyncClientInterface $client, int $totalRequests, int $delay = 1000): void
{
$requests = function (int $total) use ($delay) {
for ($i = 0; $i < $total; $i++) {
yield function (AsyncClientInterface $client) use ($delay): ResponseInterface {
$client->delay($delay);

return $client->sendRequest($this->requestFactory->createRequest('GET', 'http://localhost'));
};
}
};

$pool = PoolFactory::createForClient($client);
$pool->send($requests($totalRequests));
}

private function mockGuzzleClient(Deferrable $defer, ?array $response = null): GuzzleClient
{
$handler = new MockHandler($response);
$handlerStack = HandlerStack::create($handler);

return new GuzzleClient($defer, new Client(['handler' => $handlerStack]));
}

private function mockSymfonyClient(Deferrable $defer, mixed $response = null): SymfonyClient
{
return new SymfonyClient($defer, new MockHttpClient($response));
}
}

0 comments on commit b539368

Please sign in to comment.