From 74aa106c37283e746fab112d655c1ee0065d3bd1 Mon Sep 17 00:00:00 2001 From: L3tum <9307432+L3tum@users.noreply.github.com> Date: Mon, 12 Feb 2024 14:36:15 +0100 Subject: [PATCH] fix: Add configurable buffer threshold and change exception message --- src/RPC/MultiRPC.php | 30 +++++++++++++++++++++++++----- tests/Goridge/MultiRPC.php | 2 +- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/src/RPC/MultiRPC.php b/src/RPC/MultiRPC.php index 5171d58..6ccee37 100644 --- a/src/RPC/MultiRPC.php +++ b/src/RPC/MultiRPC.php @@ -16,6 +16,12 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface { + /** + * The default is at 10_000 because in tests things like Doctrine hammer this very hard when used in caching. + * A limit of 1_000 was hit repeatedly. Make it configurable though in case someone wants to change it. + */ + private const DEFAULT_BUFFER_THRESHOLD = 10_000; + /** * @var array */ @@ -40,20 +46,24 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface */ private array $asyncResponseBuffer = []; + private int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD; + /** * @param array $relays */ public function __construct( array $relays, + int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD, CodecInterface $codec = new JsonCodec() ) { foreach ($relays as $relay) { if (!($relay instanceof SocketRelay)) { - throw new RPCException("MultiRPC can only be used with sockets, no pipes allowed"); + throw new RPCException("MultiRPC can only be used with SocketRelay"); } } $this->freeRelays = $relays; + $this->asyncBufferThreshold = $asyncBufferThreshold; parent::__construct($codec); } @@ -63,6 +73,9 @@ public function __construct( */ public function __clone() { + // Clone both freeRelays and occupiedRelays since both array handle Relay objects and thus, not cloning both of them + // would result in relays being mixed between instances and run into issues when one instance expects + // a relay to be in occupiedRelays when it's in freeRelays in the other instance. foreach ($this->freeRelays as $key => $relay) { $this->freeRelays[$key] = clone $relay; } @@ -76,8 +89,12 @@ public function __clone() * @param non-empty-string $connection * @param positive-int $count */ - public static function create(string $connection, int $count = 50, CodecInterface $codec = new JsonCodec()): self - { + public static function create( + string $connection, + int $count = 50, + int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD, + CodecInterface $codec = new JsonCodec() + ): self { assert($count > 0); $relays = []; @@ -85,12 +102,13 @@ public static function create(string $connection, int $count = 50, CodecInterfac $relays[] = Relay::create($connection); } - return new self($relays, $codec); + return new self($relays, $asyncBufferThreshold, $codec); } /** * Force-connects all SocketRelays. * Does nothing if no SocketRelay. + * @throws RelayException */ public function preConnectRelays(): void { @@ -142,7 +160,7 @@ public function callIgnoreResponse(string $method, mixed $payload): void public function callAsync(string $method, mixed $payload): int { // Flush buffer if someone doesn't call getResponse - if (count($this->asyncResponseBuffer) > 10_000) { + if (count($this->asyncResponseBuffer) > $this->asyncBufferThreshold) { // We don't need to clean up occupiedRelays here since the buffer is solely for responses already // fetched from relays, and those relays are put back to freeRelays in getNextFreeRelay() $this->seqToRelayMap = array_diff_key($this->seqToRelayMap, $this->asyncResponseBuffer); @@ -166,10 +184,12 @@ public function callAsync(string $method, mixed $payload): int public function hasResponse(int $seq): bool { + // Check if we have the response buffered previously due to congestion if (isset($this->asyncResponseBuffer[$seq])) { return true; } + // Else check if the relay has the response in its buffer if ($this->seqToRelayMap[$seq]->hasFrame()) { return true; } diff --git a/tests/Goridge/MultiRPC.php b/tests/Goridge/MultiRPC.php index 6cb5f2c..76e4927 100644 --- a/tests/Goridge/MultiRPC.php +++ b/tests/Goridge/MultiRPC.php @@ -686,7 +686,7 @@ public function testHandleCloneCorrectly(): void public function testAllowsOnlySockets(): void{ $this->expectException(RPCException::class); - $this->expectExceptionMessage("MultiRPC can only be used with sockets, no pipes allowed"); + $this->expectExceptionMessage("MultiRPC can only be used with SocketRelay"); $this->rpc = new GoridgeMultiRPC([new StreamRelay(STDIN, STDOUT)]); }