diff --git a/src/ConnectedRelayInterface.php b/src/ConnectedRelayInterface.php index 26c63c3..ce15cd3 100644 --- a/src/ConnectedRelayInterface.php +++ b/src/ConnectedRelayInterface.php @@ -30,6 +30,7 @@ public function close(): void; /** * Enforce implementation of __clone magic method + * @psalm-return void */ public function __clone(); } diff --git a/src/RPC/AbstractRPC.php b/src/RPC/AbstractRPC.php index e23002b..ab4e593 100644 --- a/src/RPC/AbstractRPC.php +++ b/src/RPC/AbstractRPC.php @@ -29,8 +29,7 @@ abstract class AbstractRPC implements RPCInterface public function __construct( protected CodecInterface $codec - ) - { + ) { } /** diff --git a/src/RPC/MultiRPC.php b/src/RPC/MultiRPC.php index cf6544f..0661553 100644 --- a/src/RPC/MultiRPC.php +++ b/src/RPC/MultiRPC.php @@ -10,7 +10,6 @@ use Spiral\Goridge\Frame; use Spiral\Goridge\MultiRelayHelper; use Spiral\Goridge\Relay; -use Spiral\Goridge\RelayInterface; use Spiral\Goridge\RPC\Codec\JsonCodec; use Spiral\Goridge\RPC\Exception\RPCException; use Spiral\Goridge\SocketRelay; @@ -26,13 +25,13 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface /** * @var array */ - private array $freeRelays = []; + private static array $freeRelays = []; /** * Occupied Relays is a map of seq to relay to make removal easier once a response is received. * @var array */ - private array $occupiedRelays = []; + private static array $occupiedRelays = []; /** * A map of seq to relay to use for decodeResponse(). @@ -40,7 +39,7 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface * * @var array */ - private array $seqToRelayMap = []; + private static array $seqToRelayMap = []; /** * Map of seq to response Frame @@ -48,7 +47,7 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface * * @var array */ - private array $asyncResponseBuffer = []; + private static array $asyncResponseBuffer = []; /** * The threshold after which the asyncResponseBuffer is flushed of all entries. @@ -79,29 +78,12 @@ public function __construct( } } - $this->freeRelays = $relays; + self::$freeRelays = $relays; + self::$occupiedRelays = self::$seqToRelayMap = self::$asyncResponseBuffer = []; $this->asyncBufferThreshold = $asyncBufferThreshold; parent::__construct($codec); } - /** - * Without cloning relays explicitly they get shared and thus, when one RPC gets called, the freeRelays array - * in the other RPC stays the same, making it reuse the just-used and still occupied relay. - */ - public function __clone() - { - // Clone both freeRelays and occupiedRelays since both array handle Relay objects and thus, not cloning both of them - // would result in relays being mixed between instances and run into issues when one instance expects - // a relay to be in occupiedRelays when it's in freeRelays in the other instance. - foreach ($this->freeRelays as $key => $relay) { - $this->freeRelays[$key] = clone $relay; - } - foreach ($this->occupiedRelays as $key => $relay) { - $this->occupiedRelays[$key] = clone $relay; - } - $this->seqToRelayMap = $this->asyncResponseBuffer = []; - } - /** * @param non-empty-string $connection * @param positive-int $count @@ -130,17 +112,16 @@ public static function create( */ public function preConnectRelays(): void { - foreach ($this->freeRelays as $relay) { + foreach (self::$freeRelays as $relay) { // Force connect $relay->connect(); } } - public function call(string $method, mixed $payload, mixed $options = null): mixed { $relayIndex = $this->ensureFreeRelayAvailable(); - $relay = $this->freeRelays[$relayIndex]; + $relay = self::$freeRelays[$relayIndex]; $relay->send($this->packFrame($method, $payload)); @@ -155,38 +136,38 @@ public function call(string $method, mixed $payload, mixed $options = null): mix public function callIgnoreResponse(string $method, mixed $payload): void { $relayIndex = $this->ensureFreeRelayAvailable(); - $relay = $this->freeRelays[$relayIndex]; + $relay = self::$freeRelays[$relayIndex]; $relay->send($this->packFrame($method, $payload)); $seq = self::$seq; self::$seq++; - $this->occupiedRelays[$seq] = $relay; + self::$occupiedRelays[$seq] = $relay; // Last index so no need for array_pop or stuff - unset($this->freeRelays[$relayIndex]); + unset(self::$freeRelays[$relayIndex]); } public function callAsync(string $method, mixed $payload): int { // Flush buffer if someone doesn't call getResponse - if (count($this->asyncResponseBuffer) > $this->asyncBufferThreshold) { + if (count(self::$asyncResponseBuffer) > $this->asyncBufferThreshold) { // We don't need to clean up occupiedRelays here since the buffer is solely for responses already // fetched from relays, and those relays are put back to freeRelays in getNextFreeRelay() - $this->seqToRelayMap = array_diff_key($this->seqToRelayMap, $this->asyncResponseBuffer); - $this->asyncResponseBuffer = []; + self::$seqToRelayMap = array_diff_key(self::$seqToRelayMap, self::$asyncResponseBuffer); + self::$asyncResponseBuffer = []; } $relayIndex = $this->ensureFreeRelayAvailable(); - $relay = $this->freeRelays[$relayIndex]; + $relay = self::$freeRelays[$relayIndex]; $relay->send($this->packFrame($method, $payload)); $seq = self::$seq; self::$seq++; - $this->occupiedRelays[$seq] = $relay; - $this->seqToRelayMap[$seq] = $relay; + self::$occupiedRelays[$seq] = $relay; + self::$seqToRelayMap[$seq] = $relay; // Last index so no need for array_pop or stuff - unset($this->freeRelays[$relayIndex]); + unset(self::$freeRelays[$relayIndex]); return $seq; } @@ -194,12 +175,12 @@ public function callAsync(string $method, mixed $payload): int public function hasResponse(int $seq): bool { // Check if we have the response buffered previously due to congestion - if (isset($this->asyncResponseBuffer[$seq])) { + if (isset(self::$asyncResponseBuffer[$seq])) { return true; } // Else check if the relay has the response in its buffer - if ($this->seqToRelayMap[$seq]->hasFrame()) { + if (self::$seqToRelayMap[$seq]->hasFrame()) { return true; } @@ -214,11 +195,11 @@ public function hasResponses(array $seqs): array $seqsWithResponse = []; foreach ($seqs as $seq) { - if (isset($this->asyncResponseBuffer[$seq])) { + if (isset(self::$asyncResponseBuffer[$seq])) { $seqsWithResponse[] = $seq; - } elseif (isset($this->seqToRelayMap[$seq])) { + } elseif (isset(self::$seqToRelayMap[$seq])) { $relayIndexToSeq[count($relays)] = $seq; - $relays[] = $this->seqToRelayMap[$seq]; + $relays[] = self::$seqToRelayMap[$seq]; } } @@ -238,8 +219,8 @@ public function hasResponses(array $seqs): array public function getResponse(int $seq, mixed $options = null): mixed { - $relay = $this->seqToRelayMap[$seq] ?? throw new RPCException('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.'); - unset($this->seqToRelayMap[$seq]); + $relay = self::$seqToRelayMap[$seq] ?? throw new RPCException('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.'); + unset(self::$seqToRelayMap[$seq]); if (($frame = $this->getResponseFromBuffer($seq)) !== null) { /** @@ -248,8 +229,8 @@ public function getResponse(int $seq, mixed $options = null): mixed * Thus we only re-add (and do so without searching for it first) if we don't have the response yet. */ } else { - $this->freeRelays[] = $this->occupiedRelays[$seq]; - unset($this->occupiedRelays[$seq]); + self::$freeRelays[] = self::$occupiedRelays[$seq]; + unset(self::$occupiedRelays[$seq]); $frame = $this->getResponseFromRelay($relay, $seq, true); } @@ -268,7 +249,7 @@ public function getResponses(array $seqs, mixed $options = null): iterable $seqsKeyed = []; foreach ($seqs as $seq) { - if (isset($this->asyncResponseBuffer[$seq])) { + if (isset(self::$asyncResponseBuffer[$seq])) { // We can use getResponse() here since it's doing basically what we want to do here anyway yield $seq => $this->getResponse($seq, $options); } else { @@ -277,7 +258,7 @@ public function getResponses(array $seqs, mixed $options = null): iterable } // Fetch all relays that are still occupied and which we need responses from - $seqsToRelays = array_intersect_key($this->occupiedRelays, $seqsKeyed); + $seqsToRelays = array_intersect_key(self::$occupiedRelays, $seqsKeyed); // Make sure we have relays for all $seqs, otherwise something went wrong if (count($seqsToRelays) !== count($seqsKeyed)) { @@ -294,7 +275,7 @@ public function getResponses(array $seqs, mixed $options = null): iterable if ($seqsReceivedResponse === false) { if ($this->checkAllOccupiedRelaysStillConnected()) { // Check if we've lost a relay we were waiting on, if so we need to quit since something is wrong. - if (count(array_diff_key($seqsToRelays, $this->occupiedRelays)) > 0) { + if (count(array_diff_key($seqsToRelays, self::$occupiedRelays)) > 0) { throw new RPCException("Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code."); } } @@ -304,15 +285,15 @@ public function getResponses(array $seqs, mixed $options = null): iterable foreach ($seqsReceivedResponse as $seq) { // Add the previously occupied relay to freeRelays here so that we don't lose it in case of an error $relay = $seqsToRelays[$seq]; - $this->freeRelays[] = $relay; - unset($this->occupiedRelays[$seq]); + self::$freeRelays[] = $relay; + unset(self::$occupiedRelays[$seq]); // Yield the response $frame = $this->getResponseFromRelay($relay, $seq, true); yield $seq => $this->decodeResponse($frame, $relay, $options); // Unset tracking map - unset($seqsToRelays[$seq], $this->seqToRelayMap[$seq]); + unset($seqsToRelays[$seq], self::$seqToRelayMap[$seq]); } } } @@ -323,21 +304,21 @@ public function getResponses(array $seqs, mixed $options = null): iterable */ private function ensureFreeRelayAvailable(): int { - if (count($this->freeRelays) > 0) { - // Return the last entry on $this->freeRelays so that further code can use unset() instead of array_splice (index handling) + if (count(self::$freeRelays) > 0) { + // Return the last entry on self::$freeRelays so that further code can use unset() instead of array_splice (index handling) /** @psalm-return int */ - return array_key_last($this->freeRelays); + return array_key_last(self::$freeRelays); } - if (count($this->occupiedRelays) === 0) { + if (count(self::$occupiedRelays) === 0) { // If we have neither freeRelays nor occupiedRelays then someone either initialized this with 0 relays // or something went terribly wrong. Either way we need to quit. throw new RPCException("No relays available at all"); } - while (count($this->freeRelays) === 0) { + while (count(self::$freeRelays) === 0) { /** @var positive-int[]|false $index */ - $index = MultiRelayHelper::findRelayWithMessage($this->occupiedRelays); + $index = MultiRelayHelper::findRelayWithMessage(self::$occupiedRelays); if ($index === false) { // Check if all currently occupied relays are even still connected. Do another loop if they aren't. @@ -346,7 +327,7 @@ private function ensureFreeRelayAvailable(): int } else { // Just choose the first occupiedRelay to wait on since instead we may busyloop here // checking relay status and not giving RR the chance to actually answer (in a single core env for example). - $index = [array_key_first($this->occupiedRelays)]; + $index = [array_key_first(self::$occupiedRelays)]; } } @@ -356,13 +337,13 @@ private function ensureFreeRelayAvailable(): int $seq = $index[$i]; // Move relay from occupiedRelays into freeRelays before trying to get the response from it // in case something happens, so we don't lose it. - $relay = $this->occupiedRelays[$seq]; - $this->freeRelays[] = $relay; - unset($this->occupiedRelays[$seq]); + $relay = self::$occupiedRelays[$seq]; + self::$freeRelays[] = $relay; + unset(self::$occupiedRelays[$seq]); // Save response if in seqToRelayMap (aka a response is expected) // only save response in case of mismatched seq = response not in seqToRelayMap try { - $this->getResponseFromRelay($relay, $seq, !isset($this->seqToRelayMap[$seq])); + $this->getResponseFromRelay($relay, $seq, !isset(self::$seqToRelayMap[$seq])); } catch (RelayException|RPCException) { // Intentionally left blank } @@ -372,8 +353,8 @@ private function ensureFreeRelayAvailable(): int // Sometimes check if all occupied relays are even still connected $this->checkAllOccupiedRelaysStillConnected(); - // Return the last entry on $this->freeRelays so that further code can use unset() instead of array_splice (index handling) - return array_key_last($this->freeRelays); + // Return the last entry on self::$freeRelays so that further code can use unset() instead of array_splice (index handling) + return array_key_last(self::$freeRelays); } /** @@ -398,7 +379,7 @@ private function getResponseFromRelay(ConnectedRelayInterface $relay, int $expec // Save response since $seq was invalid but the response may not /** @var positive-int $responseSeq */ $responseSeq = $frame->options[0]; - $this->asyncResponseBuffer[$responseSeq] = $frame; + self::$asyncResponseBuffer[$responseSeq] = $frame; throw new RPCException('Invalid RPC frame, sequence mismatch'); } @@ -408,7 +389,7 @@ private function getResponseFromRelay(ConnectedRelayInterface $relay, int $expec // we'll need to add it to the buffer. // This is used in e.g. flushing a relay in ensureFreeRelay() // so that we can at least *try* to get the resonse back to the user. - $this->asyncResponseBuffer[$expectedSeq] = $frame; + self::$asyncResponseBuffer[$expectedSeq] = $frame; } return $frame; @@ -422,8 +403,8 @@ private function getResponseFromRelay(ConnectedRelayInterface $relay, int $expec */ private function getResponseFromBuffer(int $seq): ?Frame { - if (($frame = $this->asyncResponseBuffer[$seq] ?? null) !== null) { - unset($this->asyncResponseBuffer[$seq]); + if (($frame = self::$asyncResponseBuffer[$seq] ?? null) !== null) { + unset(self::$asyncResponseBuffer[$seq]); } return $frame; @@ -431,11 +412,11 @@ private function getResponseFromBuffer(int $seq): ?Frame private function checkAllOccupiedRelaysStillConnected(): bool { - if (($relaysNotConnected = MultiRelayHelper::checkConnected($this->occupiedRelays)) !== false) { + if (($relaysNotConnected = MultiRelayHelper::checkConnected(self::$occupiedRelays)) !== false) { /** @var positive-int $seq */ foreach ($relaysNotConnected as $seq) { - $this->freeRelays[] = $this->occupiedRelays[$seq]; - unset($this->seqToRelayMap[$seq], $this->occupiedRelays[$seq]); + self::$freeRelays[] = self::$occupiedRelays[$seq]; + unset(self::$seqToRelayMap[$seq], self::$occupiedRelays[$seq]); } return true; diff --git a/tests/Goridge/MsgPackMultiRPCTest.php b/tests/Goridge/MsgPackMultiRPCTest.php index fd451bb..a66e2e1 100644 --- a/tests/Goridge/MsgPackMultiRPCTest.php +++ b/tests/Goridge/MsgPackMultiRPCTest.php @@ -7,7 +7,6 @@ use Exception; use Spiral\Goridge\RPC\Codec\MsgpackCodec; use Spiral\Goridge\RPC\Exception\ServiceException; -use Spiral\Goridge\RPC\MultiRPC; class MsgPackMultiRPCTest extends \Spiral\Goridge\Tests\MultiRPC { @@ -17,34 +16,25 @@ class MsgPackMultiRPCTest extends \Spiral\Goridge\Tests\MultiRPC public function testJsonException(): void { $this->expectException(ServiceException::class); - - $conn = $this->makeRPC(); - - $conn->call('Service.Process', random_bytes(256)); + $this->rpc->call('Service.Process', random_bytes(256)); } public function testJsonExceptionAsync(): void { - $conn = $this->makeRPC(); - $id = $conn->callAsync('Service.Process', random_bytes(256)); + $id = $this->rpc->callAsync('Service.Process', random_bytes(256)); $this->expectException(ServiceException::class); - $conn->getResponse($id); + $this->rpc->getResponse($id); } public function testJsonExceptionNotThrownWithIgnoreResponse(): void { - $conn = $this->makeRPC(); - $conn->callIgnoreResponse('Service.Process', random_bytes(256)); - - $this->forceFlushRpc($conn); + $this->rpc->callIgnoreResponse('Service.Process', random_bytes(256)); + $this->forceFlushRpc(); } - - /** - * @return MultiRPC - */ - protected function makeRPC(int $count = 10): MultiRPC + protected function makeRPC(int $count = 10): void { - return parent::makeRPC($count)->withCodec(new MsgpackCodec()); + parent::makeRPC($count); + $this->rpc = $this->rpc->withCodec(new MsgpackCodec()); } } diff --git a/tests/Goridge/MultiRPC.php b/tests/Goridge/MultiRPC.php index 1bed70c..0e6616d 100644 --- a/tests/Goridge/MultiRPC.php +++ b/tests/Goridge/MultiRPC.php @@ -10,6 +10,7 @@ use Spiral\Goridge\ConnectedRelayInterface; use Spiral\Goridge\Exception\TransportException; use Spiral\Goridge\RelayInterface; +use Spiral\Goridge\RPC\Codec\JsonCodec; use Spiral\Goridge\RPC\Codec\MsgpackCodec; use Spiral\Goridge\RPC\Codec\RawCodec; use Spiral\Goridge\RPC\Exception\CodecException; @@ -26,7 +27,8 @@ abstract class MultiRPC extends TestCase public const SOCK_ADDR = '127.0.0.1'; public const SOCK_PORT = 7079; public const SOCK_TYPE = SocketType::TCP; - private GoridgeMultiRPC $rpc; + protected GoridgeMultiRPC $rpc; + private int $expectedNumberOfRelays; public function testManualConnect(): void { @@ -36,39 +38,39 @@ public function testManualConnect(): void } /** @var SocketRelay $relay */ $relay = $relays[0]; - $conn = new GoridgeMultiRPC($relays); + $this->rpc = new GoridgeMultiRPC($relays); + $this->expectedNumberOfRelays = 10; $this->assertFalse($relay->isConnected()); $relay->connect(); $this->assertTrue($relay->isConnected()); - $this->assertSame('pong', $conn->call('Service.Ping', 'ping')); + $this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); $this->assertTrue($relay->isConnected()); - $conn->preConnectRelays(); + $this->rpc->preConnectRelays(); foreach ($relays as $relay) { $this->assertTrue($relay->isConnected()); } - - $this->assertFreeRelaysCorrectNumber($conn); } public function testReconnect(): void { /** @var SocketRelay $relay */ $relay = $this->makeRelay(); - $conn = new GoridgeMultiRPC([$relay]); + $this->rpc = new GoridgeMultiRPC([$relay]); + $this->expectedNumberOfRelays = 1; $this->assertFalse($relay->isConnected()); - $this->assertSame('pong', $conn->call('Service.Ping', 'ping')); + $this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); $this->assertTrue($relay->isConnected()); $relay->close(); $this->assertFalse($relay->isConnected()); - $this->assertSame('pong', $conn->call('Service.Ping', 'ping')); + $this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); $this->assertTrue($relay->isConnected()); } @@ -85,13 +87,13 @@ public function testPingPongAsync(): void public function testPrefixPingPong(): void { - $this->rpc = $this->makeRPC()->withServicePrefix('Service'); + $this->rpc = $this->rpc->withServicePrefix('Service'); $this->assertSame('pong', $this->rpc->call('Ping', 'ping')); } public function testPrefixPingPongAsync(): void { - $this->rpc = $this->makeRPC()->withServicePrefix('Service'); + $this->rpc = $this->rpc->withServicePrefix('Service'); $id = $this->rpc->callAsync('Ping', 'ping'); $this->assertSame('pong', $this->rpc->getResponse($id)); } @@ -132,13 +134,13 @@ public function testNegateNegativeAsync(): void public function testInvalidService(): void { $this->expectException(ServiceException::class); - $this->rpc = $this->makeRPC()->withServicePrefix('Service2'); + $this->rpc = $this->rpc->withServicePrefix('Service2'); $this->assertSame('pong', $this->rpc->call('Ping', 'ping')); } public function testInvalidServiceAsync(): void { - $this->rpc = $this->makeRPC()->withServicePrefix('Service2'); + $this->rpc = $this->rpc->withServicePrefix('Service2'); $id = $this->rpc->callAsync('Ping', 'ping'); $this->expectException(ServiceException::class); $this->assertSame('pong', $this->rpc->getResponse($id)); @@ -147,13 +149,13 @@ public function testInvalidServiceAsync(): void public function testInvalidMethod(): void { $this->expectException(ServiceException::class); - $this->rpc = $this->makeRPC()->withServicePrefix('Service'); + $this->rpc = $this->rpc->withServicePrefix('Service'); $this->assertSame('pong', $this->rpc->call('Ping2', 'ping')); } public function testInvalidMethodAsync(): void { - $this->rpc = $this->makeRPC()->withServicePrefix('Service'); + $this->rpc = $this->rpc->withServicePrefix('Service'); $id = $this->rpc->callAsync('Ping2', 'ping'); $this->expectException(ServiceException::class); $this->assertSame('pong', $this->rpc->getResponse($id)); @@ -438,7 +440,7 @@ public function testSleepEchoIgnoreResponse(): void // Wait for response usleep(100_000); - $this->forceFlushRpc($this->rpc); + $this->forceFlushRpc(); } public function testCannotGetSameResponseTwice(): void @@ -477,7 +479,7 @@ public function testCanCallMoreTimesThanBufferAndNotGetResponses(): void // We cheat here since the order in which responses are discarded depends on when they are received $property = new ReflectionProperty(GoridgeMultiRPC::class, 'asyncResponseBuffer'); - $buffer = $property->getValue($this->rpc); + $buffer = $property->getValue(); foreach ($ids as $id) { if (!isset($buffer[$id])) { @@ -510,7 +512,7 @@ public function testHandleRelayDisconnect(): void { $id = $this->rpc->callAsync('Service.Ping', 'ping'); $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); - $occupiedRelays = $property->getValue($this->rpc); + $occupiedRelays = $property->getValue(); $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); $occupiedRelays[$id]->close(); $this->expectException(TransportException::class); @@ -521,7 +523,7 @@ public function testHandleRelayDisconnectWithPressure(): void { $id = $this->rpc->callAsync('Service.Ping', 'ping'); $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); - $occupiedRelays = $property->getValue($this->rpc); + $occupiedRelays = $property->getValue(); $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); $occupiedRelays[$id]->close(); @@ -539,7 +541,7 @@ public function testHandleRelayDisconnectWithPressure(): void // In the second one, the disconnected relay is only now discovered, which throws a TransportException instead. // We need to kind of force the issue in the second two tests. This one does whatever the MultiRPC has done. $property = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap'); - $discovered = !isset($property->getValue($this->rpc)[$id]); + $discovered = !isset($property->getValue()[$id]); if ($discovered) { $this->expectException(RPCException::class); @@ -555,7 +557,7 @@ public function testHandleRelayDisconnectWithPressureForceDiscovered(): void { $id = $this->rpc->callAsync('Service.Ping', 'ping'); $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); - $occupiedRelays = $property->getValue($this->rpc); + $occupiedRelays = $property->getValue(); $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); $occupiedRelays[$id]->close(); @@ -573,7 +575,7 @@ public function testHandleRelayDisconnectWithPressureForceDiscovered(): void // In the second one, the disconnected relay is only now discovered, which throws a TransportException instead. // We need to kind of force the issue in the second two tests. This one does whatever the MultiRPC has done. $property = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap'); - $discovered = !isset($property->getValue($this->rpc)[$id]); + $discovered = !isset($property->getValue()[$id]); if (!$discovered) { $method = new ReflectionMethod(GoridgeMultiRPC::class, 'checkAllOccupiedRelaysStillConnected'); @@ -589,7 +591,7 @@ public function testHandleRelayDisconnectWithPressureForceUndiscovered(): void { $id = $this->rpc->callAsync('Service.Ping', 'ping'); $occupiedProperty = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); - $occupiedRelays = $occupiedProperty->getValue($this->rpc); + $occupiedRelays = $occupiedProperty->getValue(); $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); $occupiedRelays[$id]->close(); @@ -607,20 +609,20 @@ public function testHandleRelayDisconnectWithPressureForceUndiscovered(): void // In the second one, the disconnected relay is only now discovered, which throws a TransportException instead. // We need to kind of force the issue in the second two tests. This one does whatever the MultiRPC has done. $mapProperty = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap'); - $seqToRelayMap = $mapProperty->getValue($this->rpc); + $seqToRelayMap = $mapProperty->getValue(); $discovered = !isset($seqToRelayMap[$id]); if ($discovered) { $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); - $freeRelays = $property->getValue($this->rpc); + $freeRelays = $property->getValue(); $relay = array_pop($freeRelays); - $property->setValue($this->rpc, $freeRelays); + $property->setValue($freeRelays); assert($relay instanceof SocketRelay); $relay->close(); $seqToRelayMap[$id] = $relay; $occupiedRelays[$id] = $relay; - $mapProperty->setValue($this->rpc, $seqToRelayMap); - $occupiedProperty->setValue($this->rpc, $occupiedRelays); + $mapProperty->setValue($seqToRelayMap); + $occupiedProperty->setValue($occupiedRelays); $this->expectException(RPCException::class); @@ -637,7 +639,7 @@ public function testHandleRelayDisconnectWithPressureGetResponses(): void $ids = []; $ids[] = $id = $this->rpc->callAsync('Service.Ping', 'ping'); $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); - $occupiedRelays = $property->getValue($this->rpc); + $occupiedRelays = $property->getValue(); $this->assertInstanceOf(SocketRelay::class, $occupiedRelays[$id]); $occupiedRelays[$id]->close(); @@ -657,33 +659,72 @@ public function testHandleRelayDisconnectWithPressureGetResponses(): void * Without cloning them explicitly they get shared and thus, when one RPC gets called, the freeRelays array * in the other RPC stays the same, making it reuse the just-used and still occupied relay. */ - public function testHandleCloneCorrectly(): void + public function testHandlesCloneCorrectly(): void { $this->rpc->preConnectRelays(); - $this->rpc->callIgnoreResponse('Service.Ping', 'ping'); - $clonedRpc = $this->rpc->withCodec(new MsgpackCodec()); + + // This is to support the MsgPackMultiRPC Tests + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'codec'); + $codec = $property->getValue($this->rpc); + $clonedRpc = $this->rpc->withCodec($codec instanceof MsgpackCodec ? new JsonCodec() : new MsgpackCodec()); + + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); + foreach ($property->getValue() as $relay) { + /** @var ConnectedRelayInterface $relay */ + $this->assertTrue($relay->isConnected()); + } + + $ids = []; + $clonedIds = []; + + for ($i = 0; $i < 50; $i++) { + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); + } + for ($i = 0; $i < 50; $i++) { - $clonedRpc->callIgnoreResponse('Service.Ping', 'ping'); + $clonedIds[] = $clonedRpc->callAsync('Service.Echo', 'Hello'); } // Wait 100ms for the response(s) usleep(100 * 1000); - // Close all relays in cloned RPC - $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); - $propertyOccupied = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); - $allRelays = [...$property->getValue($clonedRpc), ...$propertyOccupied->getValue($clonedRpc)]; - foreach ($allRelays as $relay) { - if ($relay instanceof SocketRelay && $relay->isConnected()) { - $relay->close(); + // Can use wrong RPC for response (unfortunately, but there's no easy solution) + try { + $response = $this->rpc->getResponse($clonedIds[0]); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'codec'); + + if ($property->getValue($this->rpc) instanceof MsgpackCodec) { + // Msgpack internally does not throw an error, only returns the encoded response because of course why + // would normal error handling be something that is important in a library. + // Locally this returned the number 34, but I'm not sure if there's some variation in that + // so we test on the expected response. + // This also notifies PHPUnit since msgpack logs a warning. + if ($response !== 'Hello') { + throw new CodecException("msgpack is a big meany"); + } } + + $this->fail("Should've thrown an Exception due to wrong codec"); + } catch (CodecException $exception) { + $this->assertNotEmpty($exception->getMessage()); } - foreach ($property->getValue($this->rpc) as $relay) { - if ($relay instanceof SocketRelay) { - $this->assertTrue($relay->isConnected()); - } + // The $seq should not be available anymore + try { + $response = $clonedRpc->getResponse($clonedIds[0]); + $this->fail("Should've thrown an exception due to wrong seq"); + } catch (RPCException $exception) { + $this->assertNotEmpty($exception->getMessage()); + } + + array_shift($clonedIds); + + foreach ($this->rpc->getResponses($ids) as $response) { + $this->assertSame('pong', $response); + } + + foreach ($clonedRpc->getResponses($clonedIds) as $response) { + $this->assertSame('Hello', $response); } - $this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping')); } public function testAllowsOnlySockets(): void @@ -696,14 +737,14 @@ public function testAllowsOnlySockets(): void SocketRelay::class ) ); - $this->rpc = new GoridgeMultiRPC([new StreamRelay(STDIN, STDOUT)]); + new GoridgeMultiRPC([new StreamRelay(STDIN, STDOUT)]); } public function testNeedsAtLeastOne(): void { $this->expectException(RPCException::class); $this->expectExceptionMessage("MultiRPC needs at least one relay. Zero provided."); - $this->rpc = new GoridgeMultiRPC([]); + new GoridgeMultiRPC([]); } public function testChecksIfResponseIsInRelay(): void @@ -720,7 +761,7 @@ public function testChecksIfResponseIsInBuffer(): void $id = $this->rpc->callAsync('Service.Ping', 'ping'); // Wait a bit usleep(100 * 1000); - $this->forceFlushRpc($this->rpc); + $this->forceFlushRpc(); $this->assertTrue($this->rpc->hasResponse($id)); } @@ -735,7 +776,7 @@ public function testChecksMultipleResponses(): void { $ids = []; $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); - $this->forceFlushRpc($this->rpc); + $this->forceFlushRpc(); $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); usleep(100 * 1000); $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); @@ -753,39 +794,38 @@ public function testHasResponsesReturnsEmptyArrayWhenNoResponses(): void public function testGetResponsesReturnsWhenNoRelaysAvailableToAvoidInfiniteLoop(): void { - // occupiedRelays is already empty - $rpc = $this->makeRPC(); $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); - $property->setValue($rpc, []); + $property->setValue([]); + $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); + $property->setValue([]); + $this->expectedNumberOfRelays = 0; $this->expectException(RPCException::class); $this->expectExceptionMessage("No relays available at all"); - $rpc->call('Service.Ping', 'ping'); + $this->rpc->call('Service.Ping', 'ping'); } public function testMultiRPCIsUsableWithOneRelay(): void { - $rpc = $this->makeRPC(1); - $rpc->callIgnoreResponse('Service.Ping', 'ping'); - $rpc->callIgnoreResponse('Service.SleepEcho', 'Hello'); - $id = $rpc->callAsync('Service.Ping', 'ping'); - $rpc->callIgnoreResponse('Service.Echo', 'Hello'); - $this->assertSame('pong', $rpc->getResponse($id)); + $this->makeRPC(1); + $this->rpc->callIgnoreResponse('Service.Ping', 'ping'); + $this->rpc->callIgnoreResponse('Service.SleepEcho', 'Hello'); + $id = $this->rpc->callAsync('Service.Ping', 'ping'); + $this->rpc->callIgnoreResponse('Service.Echo', 'Hello'); + $this->assertSame('pong', $this->rpc->getResponse($id)); } protected function setUp(): void { - $this->rpc = $this->makeRPC(); + $this->makeRPC(); } - /** - * @return GoridgeMultiRPC - */ - protected function makeRPC(int $count = 10): GoridgeMultiRPC + protected function makeRPC(int $count = 10): void { $type = self::SOCK_TYPE->value; $address = self::SOCK_ADDR; $port = self::SOCK_PORT; - return GoridgeMultiRPC::create("$type://$address:$port", $count); + $this->rpc = GoridgeMultiRPC::create("$type://$address:$port", $count); + $this->expectedNumberOfRelays = $count; } /** @@ -798,33 +838,36 @@ protected function makeRelay(): RelayInterface protected function tearDown(): void { - $this->assertFreeRelaysCorrectNumber($this->rpc); + if (isset($this->rpc)) { + $this->assertFreeRelaysCorrectNumber(); + unset($this->rpc); + } } - protected function assertFreeRelaysCorrectNumber(GoridgeMultiRPC $rpc): void + protected function assertFreeRelaysCorrectNumber(): void { $property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays'); - $numberOfFreeRelays = count($property->getValue($rpc)); + $numberOfFreeRelays = count($property->getValue()); $property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays'); - $numberOfOccupiedRelays = count($property->getValue($rpc)); + $numberOfOccupiedRelays = count($property->getValue()); $property = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap'); - $numberOfWaitingResponses = count($property->getValue($rpc)); + $numberOfWaitingResponses = count($property->getValue()); $this->assertSame( - 10, + $this->expectedNumberOfRelays, $numberOfFreeRelays + $numberOfOccupiedRelays, "RPC has lost at least one relay! Waiting Responses: $numberOfWaitingResponses, Free Relays: $numberOfFreeRelays, Occupied Relays: $numberOfOccupiedRelays" ); } - protected function forceFlushRpc(GoridgeMultiRPC $rpc): void + protected function forceFlushRpc(): void { // Force consuming relay by flooding requests $ids = []; for ($i = 0; $i < 50; $i++) { - $ids[] = $rpc->callAsync('Service.Ping', 'ping'); + $ids[] = $this->rpc->callAsync('Service.Ping', 'ping'); } - foreach ($rpc->getResponses($ids) as $id => $response) { + foreach ($this->rpc->getResponses($ids) as $id => $response) { $this->assertSame('pong', $response); array_splice($ids, array_search($id, $ids, true), 1); }