Skip to content

Commit

Permalink
Rename
Browse files Browse the repository at this point in the history
  • Loading branch information
jenky committed Sep 23, 2023
1 parent 4b51193 commit ba233d2
Show file tree
Hide file tree
Showing 10 changed files with 25 additions and 24 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ A simple and efficient solution for concurrently sending HTTP requests using PSR
Peak is a library that enables concurrent request sending using a request pool. It leverages the event loop of [ReactPHP](https://github.com/reactphp) or [PSL](https://github.com/azjezz/psl) to handle and manage the requests concurrently.

## Requirements

- PHP 8.1 or higher.
- A package that supports non-block I/O using Fibers under the hood (now refer as **driver**).

Expand Down
2 changes: 1 addition & 1 deletion src/ClientPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public function send(iterable $requests): array
}
};

return $this->getRunner($this->client)->run(
return $this->createWorker($this->client)->run(
$promises($this->client, $requests)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,21 @@

use Psl\Async;

final class PslConcurrency implements Runner
final class PslWorker implements Worker
{
private readonly Async\Semaphore $semaphore;

/**
* @param int<1, max> $limit
*/
public function __construct(int $limit = 10, ?\Closure $operation = null)
public function __construct(int $limit = 10)
{
if ($limit < 1) {
throw new \ValueError('Argument #1 ($limit) must be positive, got '.$limit);
}

$this->semaphore = new Async\Semaphore(
$limit, $operation ?? static fn ($value) => $value
$limit, static fn ($value) => $value
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@
use Clue\React\Mq\Queue;
use React\Async;

final class ReactConcurrency implements Runner
final class ReactWorker implements Worker
{
private readonly Queue $queue;

/**
* @param int<1, max> $limit
*/
public function __construct(int $limit = 10, ?\Closure $operation = null)
public function __construct(int $limit = 10)
{
if ($limit < 1) {
throw new \ValueError('Argument #1 ($limit) must be positive, got '.$limit);
}

$this->queue = new Queue(
$limit, null, $operation ?? static fn (\Closure $cb) => Async\async($cb)()
$limit, null, static fn (\Closure $cb) => Async\async($cb)()
);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Concurrency/Runner.php → src/Concurrency/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

namespace Fansipan\Peak\Concurrency;

interface Runner
interface Worker
{
/**
* Run the functions in the tasks iterable concurrently, without waiting until the previous function has completed.
Expand Down
2 changes: 1 addition & 1 deletion src/ConnectorPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ public function send(iterable $requests): array
}
};

return $this->getRunner($this->client)->run($promises($this->connector, $requests));
return $this->createWorker($this->client)->run($promises($this->connector, $requests));
}
}
4 changes: 2 additions & 2 deletions src/PoolFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class PoolFactory
* @throws \Fansipan\Peak\Exception\UnsupportedClientException
* @throws \Fansipan\Peak\Exception\UnsupportedFeatureException
*/
public static function createForClient(ClientInterface $client): Pool
public static function createFromClient(ClientInterface $client): Pool
{
return new ClientPool(AsyncClientFactory::create($client));
}
Expand All @@ -28,7 +28,7 @@ public static function createForClient(ClientInterface $client): Pool
* @throws \Fansipan\Peak\Exception\UnsupportedClientException
* @throws \Fansipan\Peak\Exception\UnsupportedFeatureException
*/
public static function createForConnector(ConnectorInterface $connector): Pool
public static function createFromConnector(ConnectorInterface $connector): Pool
{
$client = $connector->client();

Expand Down
12 changes: 6 additions & 6 deletions src/PoolTrait.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

use Fansipan\Peak\Client\AsyncClientInterface;
use Fansipan\Peak\Concurrency\Driver;
use Fansipan\Peak\Concurrency\PslConcurrency;
use Fansipan\Peak\Concurrency\ReactConcurrency;
use Fansipan\Peak\Concurrency\Runner;
use Fansipan\Peak\Concurrency\PslWorker;
use Fansipan\Peak\Concurrency\ReactWorker;
use Fansipan\Peak\Concurrency\Worker;
use Fansipan\Peak\Exception\UnsupportedFeatureException;

trait PoolTrait
Expand All @@ -35,13 +35,13 @@ public function concurrent(int $concurrency): Pool
return $clone;
}

private function getRunner(AsyncClientInterface $client, ?\Closure $operation = null): Runner
private function createWorker(AsyncClientInterface $client): Worker
{
$driver = $client->driver();

return match (true) {
$driver === Driver::PSL => new PslConcurrency($this->concurrency, $operation),
$driver === Driver::REACT => new ReactConcurrency($this->concurrency, $operation),
$driver === Driver::PSL => new PslWorker($this->concurrency),
$driver === Driver::REACT => new ReactWorker($this->concurrency),
default => throw new UnsupportedFeatureException('You cannot use the concurrent request pool feature as the required packages are not installed.'),
};
}
Expand Down
2 changes: 1 addition & 1 deletion tests/DelayTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private function runPoolDelayTests(AsyncClientInterface $client, int $totalReque
}
};

$pool = PoolFactory::createForClient($client);
$pool = PoolFactory::createFromClient($client);
$pool->send($requests($totalRequests));
}

Expand Down
12 changes: 6 additions & 6 deletions tests/PoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ public function test_invalid_pool_request(): void
{
$client = new ReactClient();

$clientPool = PoolFactory::createForClient($client);
$clientPool = PoolFactory::createFromClient($client);

$this->expectException(InvalidPoolRequestException::class);

$clientPool->send([1, 2, 3]);

$connectorPool = PoolFactory::createForConnector(
$connectorPool = PoolFactory::createFromConnector(
(new GenericConnector())->withClient($client)
);

Expand All @@ -78,18 +78,18 @@ public function test_invalid_pool_request(): void

public function test_pool_factory(): void
{
$pool = PoolFactory::createForConnector((new GenericConnector())->withClient(new Client()));
$pool = PoolFactory::createFromConnector((new GenericConnector())->withClient(new Client()));
$this->assertInstanceOf(GuzzleClient::class, $this->getClientFromPool($pool));

$pool = PoolFactory::createForConnector((new GenericConnector())->withClient(new Psr18Client()));
$pool = PoolFactory::createFromConnector((new GenericConnector())->withClient(new Psr18Client()));
$this->assertInstanceOf(SymfonyClient::class, $this->getClientFromPool($pool));

$this->expectException(UnsupportedClientException::class);
$pool = PoolFactory::createForClient(new FakeHttpClient());
$pool = PoolFactory::createFromClient(new FakeHttpClient());

DriverDiscovery::prefer(Driver::REACT);

$pool = PoolFactory::createForClient(AsyncClientFactory::create(new FakeHttpClient()));
$pool = PoolFactory::createFromClient(AsyncClientFactory::create(new FakeHttpClient()));
$this->assertInstanceOf(ReactClient::class, $this->getClientFromPool($pool));
}

Expand Down

0 comments on commit ba233d2

Please sign in to comment.