Skip to content

Commit

Permalink
fix: StreamRelay in MultiRPC & Tests
Browse files Browse the repository at this point in the history
fix: Allowed usage of StreamRelay in MultiRPC since cloning isn't an issue
Also added a few more tests and failure checks
  • Loading branch information
L3tum committed Feb 23, 2024
1 parent a7ed4fb commit 4c621e4
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 50 deletions.
5 changes: 2 additions & 3 deletions src/MultiRelayHelper.php
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public static function findRelayWithMessage(array $relays, int $timeoutInMicrose
}

/**
* @param array<array-key, ConnectedRelayInterface> $relays
* @param array<array-key, RelayInterface> $relays
* @return array-key[]|false
* @internal
* Returns either
Expand All @@ -104,8 +104,7 @@ public static function checkConnected(array $relays): array|false

$keysNotConnected = [];
foreach ($relays as $key => $relay) {
assert($relay instanceof ConnectedRelayInterface);
if (!$relay->isConnected()) {
if ($relay instanceof ConnectedRelayInterface && !$relay->isConnected()) {
$relay->connect();
$keysNotConnected[] = $key;
}
Expand Down
74 changes: 50 additions & 24 deletions src/RPC/MultiRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
use Spiral\Goridge\Frame;
use Spiral\Goridge\MultiRelayHelper;
use Spiral\Goridge\Relay;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Codec\JsonCodec;
use Spiral\Goridge\RPC\Exception\RPCException;
use Spiral\Goridge\SocketRelay;
Expand All @@ -21,23 +22,24 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface
* A limit of 1_000 was hit repeatedly. Make it configurable though in case someone wants to change it.
*/
private const DEFAULT_BUFFER_THRESHOLD = 10_000;
const ERR_INVALID_SEQ_NUMBER = 'Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain incorrect number. Please check your code.';

/**
* @var array<int, ConnectedRelayInterface>
* @var array<int, RelayInterface>
*/
private static array $freeRelays = [];

/**
* Occupied Relays is a map of seq to relay to make removal easier once a response is received.
* @var array<positive-int, ConnectedRelayInterface>
* @var array<positive-int, RelayInterface>
*/
private static array $occupiedRelays = [];

/**
* A map of seq to relay to use for decodeResponse().
* Technically the relay there is only needed in case of an error.
*
* @var array<positive-int, ConnectedRelayInterface>
* @var array<positive-int, RelayInterface>
*/
private static array $seqToRelayMap = [];

Expand All @@ -55,31 +57,51 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface
private int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD;

/**
* @param array<int, ConnectedRelayInterface> $relays
* @param array<int, RelayInterface> $relays
*/
public function __construct(
array $relays,
int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD,
CodecInterface $codec = new JsonCodec()
) {
if (count($relays) === 0) {
// Check if we have at least one either existing or new relay here
if (count($relays) === 0 && count(self::$freeRelays) === 0 && count(self::$occupiedRelays) === 0) {
throw new RPCException("MultiRPC needs at least one relay. Zero provided.");
}

foreach ($relays as $relay) {
if (!($relay instanceof ConnectedRelayInterface)) {
throw new RPCException(
sprintf(
"MultiRPC can only be used with relays implementing the %s, such as %s",
ConnectedRelayInterface::class,
SocketRelay::class
)
);
if (count($relays) > 0) {
// Check if all new relays are of the same type
if (count(array_unique(array_map(static fn(RelayInterface $relay) => $relay::class, $relays))) > 1) {
throw new RPCException("MultiRPC can only be used with all relays of the same type, such as a " . SocketRelay::class);
}

// Check if the existing relays (if any) and the new relays are of the same type.
if (count(self::$freeRelays) > 0) {
$existingRelay = self::$freeRelays[0];
} elseif (count(self::$occupiedRelays) > 0) {
$existingRelay = self::$occupiedRelays[array_key_first(self::$occupiedRelays)];

Check warning on line 82 in src/RPC/MultiRPC.php

View check run for this annotation

Codecov / codecov/patch

src/RPC/MultiRPC.php#L82

Added line #L82 was not covered by tests
} else {
$existingRelay = null;
}

if ($existingRelay !== null && $existingRelay::class !== $relays[0]::class) {
throw new RPCException("MultiRPC can only be used with all relays of the same type, such as a " . SocketRelay::class);
}
}

self::$freeRelays = $relays;
self::$occupiedRelays = self::$seqToRelayMap = self::$asyncResponseBuffer = [];
// The relays (and related arrays) are static to support cloning this class.
// Basically the following problem exists:
// - If we make these arrays instance variables, then we need to recreate the relays on clone, otherwise we'd run into data issues.
// When we do that, the number of relays in existence can increase quite dramatically, resulting in balooning memory usage for socket buffers.
// - If we make these arrays static variables, then we need to make certain that they stay the same across all instances
// of this class. As a result the arrays are basically only appended on, and never deleted or modified.
// In the end that *can* mean that if someone were to repeatedly call `new MultiRPC([a bunch of relays])` that we'd
// tack all those relays into this array resulting in the same problem.
// It also means that different services can cannibalize the number of relays available to them,
// for example a Metrics service and a KV (Cache) service.
// IMHO(L3tum) a balooning memory usage that occurs unexpectly is way worse, than any of the other problems. In the end
// one can work against cannibalized relays by simply upping the number of relays at any point.
self::$freeRelays = [...self::$freeRelays, ...$relays];
$this->asyncBufferThreshold = $asyncBufferThreshold;
parent::__construct($codec);
}
Expand All @@ -95,11 +117,11 @@ public static function create(
CodecInterface $codec = new JsonCodec()
): self {
assert($count > 0);
$count = $count - count(self::$freeRelays) - count(self::$occupiedRelays);
$relays = [];

for ($i = 0; $i < $count; $i++) {
$relay = Relay::create($connection);
assert($relay instanceof ConnectedRelayInterface);
$relays[] = $relay;
}

Expand All @@ -113,8 +135,10 @@ public static function create(
public function preConnectRelays(): void
{
foreach (self::$freeRelays as $relay) {
// Force connect
$relay->connect();
if ($relay instanceof ConnectedRelayInterface) {
// Force connect
$relay->connect();
}
}
}

Expand Down Expand Up @@ -194,6 +218,8 @@ public function hasResponses(array $seqs): array
$relayIndexToSeq = [];
$seqsWithResponse = [];

// The behaviour is essentially the same as self::hasResponse, just mapped to multiple $seqs aka $relays.
// In order to use MultiRelayHelper we create a map of index => seq to map it back after checking for messages.
foreach ($seqs as $seq) {
if (isset(self::$asyncResponseBuffer[$seq])) {
$seqsWithResponse[] = $seq;
Expand All @@ -219,7 +245,7 @@ public function hasResponses(array $seqs): array

public function getResponse(int $seq, mixed $options = null): mixed
{
$relay = self::$seqToRelayMap[$seq] ?? throw new RPCException('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.');
$relay = self::$seqToRelayMap[$seq] ?? throw new RPCException(self::ERR_INVALID_SEQ_NUMBER);
unset(self::$seqToRelayMap[$seq]);

if (($frame = $this->getResponseFromBuffer($seq)) !== null) {
Expand Down Expand Up @@ -262,7 +288,7 @@ public function getResponses(array $seqs, mixed $options = null): iterable

// Make sure we have relays for all $seqs, otherwise something went wrong
if (count($seqsToRelays) !== count($seqsKeyed)) {
throw new RPCException("Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.");
throw new RPCException(self::ERR_INVALID_SEQ_NUMBER);
}

$timeoutInMicroseconds = 0;
Expand All @@ -276,7 +302,7 @@ public function getResponses(array $seqs, mixed $options = null): iterable
if ($this->checkAllOccupiedRelaysStillConnected()) {
// Check if we've lost a relay we were waiting on, if so we need to quit since something is wrong.
if (count(array_diff_key($seqsToRelays, self::$occupiedRelays)) > 0) {
throw new RPCException("Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.");
throw new RPCException(self::ERR_INVALID_SEQ_NUMBER);

Check warning on line 305 in src/RPC/MultiRPC.php

View check run for this annotation

Codecov / codecov/patch

src/RPC/MultiRPC.php#L305

Added line #L305 was not covered by tests
}
}
continue;
Expand Down Expand Up @@ -362,9 +388,9 @@ private function ensureFreeRelayAvailable(): int
*
* @param positive-int $expectedSeq
*/
private function getResponseFromRelay(ConnectedRelayInterface $relay, int $expectedSeq, bool $onlySaveResponseInCaseOfMismatchedSeq = false): Frame
private function getResponseFromRelay(RelayInterface $relay, int $expectedSeq, bool $onlySaveResponseInCaseOfMismatchedSeq = false): Frame
{
if (!$relay->isConnected()) {
if ($relay instanceof ConnectedRelayInterface && !$relay->isConnected()) {
throw new TransportException("Unable to read payload from the stream");
}

Expand Down
67 changes: 45 additions & 22 deletions tests/Goridge/MultiRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ abstract class MultiRPC extends TestCase

public function testManualConnect(): void
{
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays');
$property->setValue([]);

$relays = [];
for ($i = 0; $i < 10; $i++) {
$relays[] = $this->makeRelay();
Expand All @@ -57,6 +60,9 @@ public function testManualConnect(): void

public function testReconnect(): void
{
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays');
$property->setValue([]);

/** @var SocketRelay $relay */
$relay = $this->makeRelay();
$this->rpc = new GoridgeMultiRPC([$relay]);
Expand Down Expand Up @@ -449,7 +455,7 @@ public function testCannotGetSameResponseTwice(): void
$this->assertSame('pong', $this->rpc->getResponse($id));
$this->assertFreeRelaysCorrectNumber($this->rpc);
$this->expectException(RPCException::class);
$this->expectExceptionMessage('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.');
$this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER);
$this->assertSame('pong', $this->rpc->getResponse($id));
}

Expand Down Expand Up @@ -545,7 +551,7 @@ public function testHandleRelayDisconnectWithPressure(): void

if ($discovered) {
$this->expectException(RPCException::class);
$this->expectExceptionMessage('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.');
$this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER);
} else {
$this->expectException(TransportException::class);
$this->expectExceptionMessage('Unable to read payload from the stream');
Expand Down Expand Up @@ -583,7 +589,7 @@ public function testHandleRelayDisconnectWithPressureForceDiscovered(): void
}

$this->expectException(RPCException::class);
$this->expectExceptionMessage('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.');
$this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER);
$this->rpc->getResponse($id);
}

Expand Down Expand Up @@ -626,7 +632,7 @@ public function testHandleRelayDisconnectWithPressureForceUndiscovered(): void


$this->expectException(RPCException::class);
$this->expectExceptionMessage('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.');
$this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER);
}

$this->expectException(TransportException::class);
Expand All @@ -648,7 +654,7 @@ public function testHandleRelayDisconnectWithPressureGetResponses(): void
}

$this->expectException(RPCException::class);
$this->expectExceptionMessage('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.');
$this->expectExceptionMessage(GoridgeMultiRPC::ERR_INVALID_SEQ_NUMBER);
foreach ($this->rpc->getResponses($ids) as $response) {
$this->assertSame('pong', $response);
}
Expand Down Expand Up @@ -727,21 +733,11 @@ public function testHandlesCloneCorrectly(): void
}
}

public function testAllowsOnlySockets(): void
{
$this->expectException(RPCException::class);
$this->expectExceptionMessage(
sprintf(
"MultiRPC can only be used with relays implementing the %s, such as %s",
ConnectedRelayInterface::class,
SocketRelay::class
)
);
new GoridgeMultiRPC([new StreamRelay(STDIN, STDOUT)]);
}

public function testNeedsAtLeastOne(): void
{
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays');
$property->setValue([]);
$this->expectedNumberOfRelays = 0;
$this->expectException(RPCException::class);
$this->expectExceptionMessage("MultiRPC needs at least one relay. Zero provided.");
new GoridgeMultiRPC([]);
Expand Down Expand Up @@ -811,16 +807,46 @@ public function testMultiRPCIsUsableWithOneRelay(): void
$this->rpc->callIgnoreResponse('Service.SleepEcho', 'Hello');
$id = $this->rpc->callAsync('Service.Ping', 'ping');
$this->rpc->callIgnoreResponse('Service.Echo', 'Hello');
$this->assertSame('pong', $this->rpc->call('Service.Ping', 'ping'));
$this->assertSame('pong', $this->rpc->getResponse($id));
}

public function testThrowsWhenMixedRelaysProvided(): void
{
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays');
$property->setValue([]);
$this->expectedNumberOfRelays = 0;
$relays = [new StreamRelay(STDIN, STDOUT), $this->makeRelay()];
$this->expectException(RPCException::class);
$this->expectExceptionMessage("MultiRPC can only be used with all relays of the same type, such as a " . SocketRelay::class);
new GoridgeMultiRPC($relays);
}

public function testThrowsWhenRelaysDontMatchExistingOnes(): void
{
$relays = [new StreamRelay(STDIN, STDOUT)];
$this->expectException(RPCException::class);
$this->expectExceptionMessage("MultiRPC can only be used with all relays of the same type, such as a " . SocketRelay::class);
new GoridgeMultiRPC($relays);
}

protected function setUp(): void
{
$this->makeRPC();
}

protected function makeRPC(int $count = 10): void
{
// We need to manually clean the static properties between test runs.
// In an actual application this would never happen.
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'freeRelays');
$property->setValue([]);
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'occupiedRelays');
$property->setValue([]);
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'seqToRelayMap');
$property->setValue([]);
$property = new ReflectionProperty(GoridgeMultiRPC::class, 'asyncResponseBuffer');
$property->setValue([]);
$type = self::SOCK_TYPE->value;
$address = self::SOCK_ADDR;
$port = self::SOCK_PORT;
Expand All @@ -838,10 +864,7 @@ protected function makeRelay(): RelayInterface

protected function tearDown(): void
{
if (isset($this->rpc)) {
$this->assertFreeRelaysCorrectNumber();
unset($this->rpc);
}
$this->assertFreeRelaysCorrectNumber();
}

protected function assertFreeRelaysCorrectNumber(): void
Expand Down
15 changes: 14 additions & 1 deletion tests/Goridge/MultiRelayHelperTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,34 @@
use PHPUnit\Framework\TestCase;
use Spiral\Goridge\MultiRelayHelper;
use Spiral\Goridge\StreamRelay;
use Spiral\Goridge\Tests\MultiRPC;

class MultiRelayHelperTest extends TestCase
{
// Unfortunately a locally created stream is always "available" and will just return an empty string if no data is available.
// Thus the test below could only work with a remote stream
public function testSupportsStreamRelay(): void
{
$relays = [new StreamRelay(STDIN, STDOUT), new StreamRelay(STDIN, STDERR)];
$type = MultiRPC::SOCK_TYPE->value;
$address = MultiRPC::SOCK_ADDR;
$port = MultiRPC::SOCK_PORT;

$in = stream_socket_client("$type://$address:$port");
$this->assertTrue(stream_set_blocking($in, true));
$this->assertFalse(feof($in));
$relays = [new StreamRelay($in, STDOUT), new StreamRelay($in, STDERR)];
// No message available on STDIN, aka a read would block, so this returns false
$this->assertFalse(MultiRelayHelper::findRelayWithMessage($relays));
fclose($in);
}

public function testSupportsReadingFromStreamRelay(): void
{
$stream = fopen('php://temp', 'rw+');
fwrite($stream, 'Hello');
fseek($stream, 0);
$this->assertTrue(stream_set_blocking($stream, true));
$this->assertFalse(feof($stream));
$relays = [new StreamRelay($stream, STDOUT)];
$this->assertCount(1, MultiRelayHelper::findRelayWithMessage($relays));
fclose($stream);
Expand Down

0 comments on commit 4c621e4

Please sign in to comment.