Skip to content

Commit

Permalink
fix: Cloning MultiRPC could result in a lot of wasted memory due to a…
Browse files Browse the repository at this point in the history
…ll the sockets created
  • Loading branch information
L3tum committed Feb 23, 2024
1 parent 34b26f5 commit a7ed4fb
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 166 deletions.
1 change: 1 addition & 0 deletions src/ConnectedRelayInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public function close(): void;

/**
* Enforce implementation of __clone magic method
* @psalm-return void
*/
public function __clone();
}
3 changes: 1 addition & 2 deletions src/RPC/AbstractRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ abstract class AbstractRPC implements RPCInterface

public function __construct(
protected CodecInterface $codec
)
{
) {
}

/**
Expand Down
125 changes: 53 additions & 72 deletions src/RPC/MultiRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
use Spiral\Goridge\Frame;
use Spiral\Goridge\MultiRelayHelper;
use Spiral\Goridge\Relay;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Codec\JsonCodec;
use Spiral\Goridge\RPC\Exception\RPCException;
use Spiral\Goridge\SocketRelay;
Expand All @@ -26,29 +25,29 @@ class MultiRPC extends AbstractRPC implements AsyncRPCInterface
/**
* @var array<int, ConnectedRelayInterface>
*/
private array $freeRelays = [];
private static array $freeRelays = [];

/**
* Occupied Relays is a map of seq to relay to make removal easier once a response is received.
* @var array<positive-int, ConnectedRelayInterface>
*/
private array $occupiedRelays = [];
private static array $occupiedRelays = [];

/**
* A map of seq to relay to use for decodeResponse().
* Technically the relay there is only needed in case of an error.
*
* @var array<positive-int, ConnectedRelayInterface>
*/
private array $seqToRelayMap = [];
private static array $seqToRelayMap = [];

/**
* Map of seq to response Frame
* Should only really need to be used in cases of high amounts of traffic
*
* @var array<positive-int, Frame>
*/
private array $asyncResponseBuffer = [];
private static array $asyncResponseBuffer = [];

/**
* The threshold after which the asyncResponseBuffer is flushed of all entries.
Expand Down Expand Up @@ -79,29 +78,12 @@ public function __construct(
}
}

$this->freeRelays = $relays;
self::$freeRelays = $relays;
self::$occupiedRelays = self::$seqToRelayMap = self::$asyncResponseBuffer = [];
$this->asyncBufferThreshold = $asyncBufferThreshold;
parent::__construct($codec);
}

/**
* Without cloning relays explicitly they get shared and thus, when one RPC gets called, the freeRelays array
* in the other RPC stays the same, making it reuse the just-used and still occupied relay.
*/
public function __clone()
{
// Clone both freeRelays and occupiedRelays since both array handle Relay objects and thus, not cloning both of them
// would result in relays being mixed between instances and run into issues when one instance expects
// a relay to be in occupiedRelays when it's in freeRelays in the other instance.
foreach ($this->freeRelays as $key => $relay) {
$this->freeRelays[$key] = clone $relay;
}
foreach ($this->occupiedRelays as $key => $relay) {
$this->occupiedRelays[$key] = clone $relay;
}
$this->seqToRelayMap = $this->asyncResponseBuffer = [];
}

/**
* @param non-empty-string $connection
* @param positive-int $count
Expand Down Expand Up @@ -130,17 +112,16 @@ public static function create(
*/
public function preConnectRelays(): void
{
foreach ($this->freeRelays as $relay) {
foreach (self::$freeRelays as $relay) {
// Force connect
$relay->connect();
}
}


public function call(string $method, mixed $payload, mixed $options = null): mixed
{
$relayIndex = $this->ensureFreeRelayAvailable();
$relay = $this->freeRelays[$relayIndex];
$relay = self::$freeRelays[$relayIndex];

$relay->send($this->packFrame($method, $payload));

Expand All @@ -155,51 +136,51 @@ public function call(string $method, mixed $payload, mixed $options = null): mix
public function callIgnoreResponse(string $method, mixed $payload): void
{
$relayIndex = $this->ensureFreeRelayAvailable();
$relay = $this->freeRelays[$relayIndex];
$relay = self::$freeRelays[$relayIndex];

$relay->send($this->packFrame($method, $payload));

$seq = self::$seq;
self::$seq++;
$this->occupiedRelays[$seq] = $relay;
self::$occupiedRelays[$seq] = $relay;
// Last index so no need for array_pop or stuff
unset($this->freeRelays[$relayIndex]);
unset(self::$freeRelays[$relayIndex]);
}

public function callAsync(string $method, mixed $payload): int
{
// Flush buffer if someone doesn't call getResponse
if (count($this->asyncResponseBuffer) > $this->asyncBufferThreshold) {
if (count(self::$asyncResponseBuffer) > $this->asyncBufferThreshold) {
// We don't need to clean up occupiedRelays here since the buffer is solely for responses already
// fetched from relays, and those relays are put back to freeRelays in getNextFreeRelay()
$this->seqToRelayMap = array_diff_key($this->seqToRelayMap, $this->asyncResponseBuffer);
$this->asyncResponseBuffer = [];
self::$seqToRelayMap = array_diff_key(self::$seqToRelayMap, self::$asyncResponseBuffer);
self::$asyncResponseBuffer = [];
}

$relayIndex = $this->ensureFreeRelayAvailable();
$relay = $this->freeRelays[$relayIndex];
$relay = self::$freeRelays[$relayIndex];

$relay->send($this->packFrame($method, $payload));

$seq = self::$seq;
self::$seq++;
$this->occupiedRelays[$seq] = $relay;
$this->seqToRelayMap[$seq] = $relay;
self::$occupiedRelays[$seq] = $relay;
self::$seqToRelayMap[$seq] = $relay;
// Last index so no need for array_pop or stuff
unset($this->freeRelays[$relayIndex]);
unset(self::$freeRelays[$relayIndex]);

return $seq;
}

public function hasResponse(int $seq): bool
{
// Check if we have the response buffered previously due to congestion
if (isset($this->asyncResponseBuffer[$seq])) {
if (isset(self::$asyncResponseBuffer[$seq])) {
return true;
}

// Else check if the relay has the response in its buffer
if ($this->seqToRelayMap[$seq]->hasFrame()) {
if (self::$seqToRelayMap[$seq]->hasFrame()) {
return true;
}

Expand All @@ -214,11 +195,11 @@ public function hasResponses(array $seqs): array
$seqsWithResponse = [];

foreach ($seqs as $seq) {
if (isset($this->asyncResponseBuffer[$seq])) {
if (isset(self::$asyncResponseBuffer[$seq])) {
$seqsWithResponse[] = $seq;
} elseif (isset($this->seqToRelayMap[$seq])) {
} elseif (isset(self::$seqToRelayMap[$seq])) {
$relayIndexToSeq[count($relays)] = $seq;
$relays[] = $this->seqToRelayMap[$seq];
$relays[] = self::$seqToRelayMap[$seq];
}
}

Expand All @@ -238,8 +219,8 @@ public function hasResponses(array $seqs): array

public function getResponse(int $seq, mixed $options = null): mixed
{
$relay = $this->seqToRelayMap[$seq] ?? throw new RPCException('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.');
unset($this->seqToRelayMap[$seq]);
$relay = self::$seqToRelayMap[$seq] ?? throw new RPCException('Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.');
unset(self::$seqToRelayMap[$seq]);

if (($frame = $this->getResponseFromBuffer($seq)) !== null) {
/**
Expand All @@ -248,8 +229,8 @@ public function getResponse(int $seq, mixed $options = null): mixed
* Thus we only re-add (and do so without searching for it first) if we don't have the response yet.
*/
} else {
$this->freeRelays[] = $this->occupiedRelays[$seq];
unset($this->occupiedRelays[$seq]);
self::$freeRelays[] = self::$occupiedRelays[$seq];
unset(self::$occupiedRelays[$seq]);

$frame = $this->getResponseFromRelay($relay, $seq, true);
}
Expand All @@ -268,7 +249,7 @@ public function getResponses(array $seqs, mixed $options = null): iterable
$seqsKeyed = [];

foreach ($seqs as $seq) {
if (isset($this->asyncResponseBuffer[$seq])) {
if (isset(self::$asyncResponseBuffer[$seq])) {
// We can use getResponse() here since it's doing basically what we want to do here anyway
yield $seq => $this->getResponse($seq, $options);
} else {
Expand All @@ -277,7 +258,7 @@ public function getResponses(array $seqs, mixed $options = null): iterable
}

// Fetch all relays that are still occupied and which we need responses from
$seqsToRelays = array_intersect_key($this->occupiedRelays, $seqsKeyed);
$seqsToRelays = array_intersect_key(self::$occupiedRelays, $seqsKeyed);

// Make sure we have relays for all $seqs, otherwise something went wrong
if (count($seqsToRelays) !== count($seqsKeyed)) {
Expand All @@ -294,7 +275,7 @@ public function getResponses(array $seqs, mixed $options = null): iterable
if ($seqsReceivedResponse === false) {
if ($this->checkAllOccupiedRelaysStillConnected()) {
// Check if we've lost a relay we were waiting on, if so we need to quit since something is wrong.
if (count(array_diff_key($seqsToRelays, $this->occupiedRelays)) > 0) {
if (count(array_diff_key($seqsToRelays, self::$occupiedRelays)) > 0) {
throw new RPCException("Invalid sequence number. This may occur if the number was already used, the buffers were flushed due to insufficient getResponse calling, or with a plain inccorect number. Please check your code.");
}
}
Expand All @@ -304,15 +285,15 @@ public function getResponses(array $seqs, mixed $options = null): iterable
foreach ($seqsReceivedResponse as $seq) {
// Add the previously occupied relay to freeRelays here so that we don't lose it in case of an error
$relay = $seqsToRelays[$seq];
$this->freeRelays[] = $relay;
unset($this->occupiedRelays[$seq]);
self::$freeRelays[] = $relay;
unset(self::$occupiedRelays[$seq]);

// Yield the response
$frame = $this->getResponseFromRelay($relay, $seq, true);
yield $seq => $this->decodeResponse($frame, $relay, $options);

// Unset tracking map
unset($seqsToRelays[$seq], $this->seqToRelayMap[$seq]);
unset($seqsToRelays[$seq], self::$seqToRelayMap[$seq]);
}
}
}
Expand All @@ -323,21 +304,21 @@ public function getResponses(array $seqs, mixed $options = null): iterable
*/
private function ensureFreeRelayAvailable(): int
{
if (count($this->freeRelays) > 0) {
// Return the last entry on $this->freeRelays so that further code can use unset() instead of array_splice (index handling)
if (count(self::$freeRelays) > 0) {
// Return the last entry on self::$freeRelays so that further code can use unset() instead of array_splice (index handling)
/** @psalm-return int */
return array_key_last($this->freeRelays);
return array_key_last(self::$freeRelays);
}

if (count($this->occupiedRelays) === 0) {
if (count(self::$occupiedRelays) === 0) {
// If we have neither freeRelays nor occupiedRelays then someone either initialized this with 0 relays
// or something went terribly wrong. Either way we need to quit.
throw new RPCException("No relays available at all");
}

while (count($this->freeRelays) === 0) {
while (count(self::$freeRelays) === 0) {
/** @var positive-int[]|false $index */
$index = MultiRelayHelper::findRelayWithMessage($this->occupiedRelays);
$index = MultiRelayHelper::findRelayWithMessage(self::$occupiedRelays);

if ($index === false) {
// Check if all currently occupied relays are even still connected. Do another loop if they aren't.
Expand All @@ -346,7 +327,7 @@ private function ensureFreeRelayAvailable(): int
} else {
// Just choose the first occupiedRelay to wait on since instead we may busyloop here
// checking relay status and not giving RR the chance to actually answer (in a single core env for example).
$index = [array_key_first($this->occupiedRelays)];
$index = [array_key_first(self::$occupiedRelays)];
}
}

Expand All @@ -356,13 +337,13 @@ private function ensureFreeRelayAvailable(): int
$seq = $index[$i];
// Move relay from occupiedRelays into freeRelays before trying to get the response from it
// in case something happens, so we don't lose it.
$relay = $this->occupiedRelays[$seq];
$this->freeRelays[] = $relay;
unset($this->occupiedRelays[$seq]);
$relay = self::$occupiedRelays[$seq];
self::$freeRelays[] = $relay;
unset(self::$occupiedRelays[$seq]);
// Save response if in seqToRelayMap (aka a response is expected)
// only save response in case of mismatched seq = response not in seqToRelayMap
try {
$this->getResponseFromRelay($relay, $seq, !isset($this->seqToRelayMap[$seq]));
$this->getResponseFromRelay($relay, $seq, !isset(self::$seqToRelayMap[$seq]));
} catch (RelayException|RPCException) {
// Intentionally left blank
}
Expand All @@ -372,8 +353,8 @@ private function ensureFreeRelayAvailable(): int
// Sometimes check if all occupied relays are even still connected
$this->checkAllOccupiedRelaysStillConnected();

// Return the last entry on $this->freeRelays so that further code can use unset() instead of array_splice (index handling)
return array_key_last($this->freeRelays);
// Return the last entry on self::$freeRelays so that further code can use unset() instead of array_splice (index handling)
return array_key_last(self::$freeRelays);
}

/**
Expand All @@ -398,7 +379,7 @@ private function getResponseFromRelay(ConnectedRelayInterface $relay, int $expec
// Save response since $seq was invalid but the response may not
/** @var positive-int $responseSeq */
$responseSeq = $frame->options[0];
$this->asyncResponseBuffer[$responseSeq] = $frame;
self::$asyncResponseBuffer[$responseSeq] = $frame;

throw new RPCException('Invalid RPC frame, sequence mismatch');
}
Expand All @@ -408,7 +389,7 @@ private function getResponseFromRelay(ConnectedRelayInterface $relay, int $expec
// we'll need to add it to the buffer.
// This is used in e.g. flushing a relay in ensureFreeRelay()
// so that we can at least *try* to get the resonse back to the user.
$this->asyncResponseBuffer[$expectedSeq] = $frame;
self::$asyncResponseBuffer[$expectedSeq] = $frame;
}

return $frame;
Expand All @@ -422,20 +403,20 @@ private function getResponseFromRelay(ConnectedRelayInterface $relay, int $expec
*/
private function getResponseFromBuffer(int $seq): ?Frame
{
if (($frame = $this->asyncResponseBuffer[$seq] ?? null) !== null) {
unset($this->asyncResponseBuffer[$seq]);
if (($frame = self::$asyncResponseBuffer[$seq] ?? null) !== null) {
unset(self::$asyncResponseBuffer[$seq]);
}

return $frame;
}

private function checkAllOccupiedRelaysStillConnected(): bool
{
if (($relaysNotConnected = MultiRelayHelper::checkConnected($this->occupiedRelays)) !== false) {
if (($relaysNotConnected = MultiRelayHelper::checkConnected(self::$occupiedRelays)) !== false) {
/** @var positive-int $seq */
foreach ($relaysNotConnected as $seq) {
$this->freeRelays[] = $this->occupiedRelays[$seq];
unset($this->seqToRelayMap[$seq], $this->occupiedRelays[$seq]);
self::$freeRelays[] = self::$occupiedRelays[$seq];
unset(self::$seqToRelayMap[$seq], self::$occupiedRelays[$seq]);
}

return true;
Expand Down
Loading

0 comments on commit a7ed4fb

Please sign in to comment.