Skip to content

Commit

Permalink
fix: Add configurable buffer threshold and change exception message
Browse files Browse the repository at this point in the history
  • Loading branch information
L3tum committed Feb 12, 2024
1 parent bead301 commit 74aa106
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 6 deletions.
30 changes: 25 additions & 5 deletions src/RPC/MultiRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@

class MultiRPC extends AbstractRPC implements AsyncRPCInterface
{
/**
* The default is at 10_000 because in tests things like Doctrine hammer this very hard when used in caching.
* A limit of 1_000 was hit repeatedly. Make it configurable though in case someone wants to change it.
*/
private const DEFAULT_BUFFER_THRESHOLD = 10_000;

/**
* @var array<int, RelayInterface>
*/
Expand All @@ -40,20 +46,24 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface
*/
private array $asyncResponseBuffer = [];

private int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD;

/**
* @param array<int, RelayInterface> $relays
*/
public function __construct(
array $relays,
int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD,
CodecInterface $codec = new JsonCodec()
) {
foreach ($relays as $relay) {
if (!($relay instanceof SocketRelay)) {
throw new RPCException("MultiRPC can only be used with sockets, no pipes allowed");
throw new RPCException("MultiRPC can only be used with SocketRelay");
}
}

$this->freeRelays = $relays;
$this->asyncBufferThreshold = $asyncBufferThreshold;
parent::__construct($codec);
}

Expand All @@ -63,6 +73,9 @@ public function __construct(
*/
public function __clone()
{
// Clone both freeRelays and occupiedRelays since both array handle Relay objects and thus, not cloning both of them
// would result in relays being mixed between instances and run into issues when one instance expects
// a relay to be in occupiedRelays when it's in freeRelays in the other instance.
foreach ($this->freeRelays as $key => $relay) {
$this->freeRelays[$key] = clone $relay;
}
Expand All @@ -76,21 +89,26 @@ public function __clone()
* @param non-empty-string $connection
* @param positive-int $count
*/
public static function create(string $connection, int $count = 50, CodecInterface $codec = new JsonCodec()): self
{
public static function create(

Check warning on line 92 in src/RPC/MultiRPC.php

View check run for this annotation

Codecov / codecov/patch

src/RPC/MultiRPC.php#L92

Added line #L92 was not covered by tests
string $connection,
int $count = 50,
int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD,
CodecInterface $codec = new JsonCodec()
): self {
assert($count > 0);
$relays = [];

Check warning on line 99 in src/RPC/MultiRPC.php

View check run for this annotation

Codecov / codecov/patch

src/RPC/MultiRPC.php#L99

Added line #L99 was not covered by tests

for ($i = 0; $i < $count; $i++) {
$relays[] = Relay::create($connection);

Check warning on line 102 in src/RPC/MultiRPC.php

View check run for this annotation

Codecov / codecov/patch

src/RPC/MultiRPC.php#L101-L102

Added lines #L101 - L102 were not covered by tests
}

return new self($relays, $codec);
return new self($relays, $asyncBufferThreshold, $codec);

Check warning on line 105 in src/RPC/MultiRPC.php

View check run for this annotation

Codecov / codecov/patch

src/RPC/MultiRPC.php#L105

Added line #L105 was not covered by tests
}

/**
* Force-connects all SocketRelays.
* Does nothing if no SocketRelay.
* @throws RelayException
*/
public function preConnectRelays(): void
{
Expand Down Expand Up @@ -142,7 +160,7 @@ public function callIgnoreResponse(string $method, mixed $payload): void
public function callAsync(string $method, mixed $payload): int
{
// Flush buffer if someone doesn't call getResponse
if (count($this->asyncResponseBuffer) > 10_000) {
if (count($this->asyncResponseBuffer) > $this->asyncBufferThreshold) {
// We don't need to clean up occupiedRelays here since the buffer is solely for responses already
// fetched from relays, and those relays are put back to freeRelays in getNextFreeRelay()
$this->seqToRelayMap = array_diff_key($this->seqToRelayMap, $this->asyncResponseBuffer);
Expand All @@ -166,10 +184,12 @@ public function callAsync(string $method, mixed $payload): int

public function hasResponse(int $seq): bool
{
// Check if we have the response buffered previously due to congestion
if (isset($this->asyncResponseBuffer[$seq])) {
return true;

Check warning on line 189 in src/RPC/MultiRPC.php

View check run for this annotation

Codecov / codecov/patch

src/RPC/MultiRPC.php#L189

Added line #L189 was not covered by tests
}

// Else check if the relay has the response in its buffer
if ($this->seqToRelayMap[$seq]->hasFrame()) {
return true;

Check warning on line 194 in src/RPC/MultiRPC.php

View check run for this annotation

Codecov / codecov/patch

src/RPC/MultiRPC.php#L194

Added line #L194 was not covered by tests
}
Expand Down
2 changes: 1 addition & 1 deletion tests/Goridge/MultiRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ public function testHandleCloneCorrectly(): void

public function testAllowsOnlySockets(): void{
$this->expectException(RPCException::class);
$this->expectExceptionMessage("MultiRPC can only be used with sockets, no pipes allowed");
$this->expectExceptionMessage("MultiRPC can only be used with SocketRelay");
$this->rpc = new GoridgeMultiRPC([new StreamRelay(STDIN, STDOUT)]);
}

Expand Down

0 comments on commit 74aa106

Please sign in to comment.