Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement "Async" RPC #22

Closed
wants to merge 28 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
a17bcfe
feat: Implement "Async" RPC
L3tum Feb 5, 2024
0179dc4
fix: Issues uncovered by Psalm
L3tum Feb 5, 2024
e7372c9
feat: Add getResponses as well as fix a number of logic or Psalm errors
L3tum Feb 5, 2024
0a71a52
feat: Add an unholy amount of tests
L3tum Feb 5, 2024
38d73e6
feat: Optimize getResponse to remove array_search and fix potential o…
L3tum Feb 5, 2024
1edc9ae
chore: Add test for response buffer handling
L3tum Feb 5, 2024
f366338
fix: Accidentally saved too many responses
L3tum Feb 6, 2024
4605e3f
fix: Reorder methods to make sure we do not lose a relay
L3tum Feb 6, 2024
1648eeb
fix: Wrong order for $seq
L3tum Feb 6, 2024
05fee35
fix: Up response buffer maximum
L3tum Feb 6, 2024
a52fda3
fix: Add Error Handling to MultiRPC::getResponses()
L3tum Feb 6, 2024
e7dd727
feat: Simplify MultiRPC and MultiRelayHelper, fixes some issues resul…
L3tum Feb 7, 2024
419fa7e
fix: Actually call tests
L3tum Feb 7, 2024
c8ac30b
fix: Model array_key_last output as docblock
L3tum Feb 7, 2024
2122e95
fix: Issues introduced by simplification of relay handling
L3tum Feb 7, 2024
96fafa7
fix: Gracefully handle socket disconnect without blocking (too much) …
L3tum Feb 7, 2024
43fadcf
fix: Handle cloning of MultiRPC
L3tum Feb 8, 2024
812da39
fix: Handle cloning of MultiRPC with SocketRelay
L3tum Feb 8, 2024
6adeb39
fix: Typo in Testclass Name
L3tum Feb 12, 2024
8c0018e
fix: Add comments documenting ensureFreeRelayAvailable and getRespons…
L3tum Feb 12, 2024
bead301
fix: Simplify getResponses() a bit
L3tum Feb 12, 2024
74aa106
fix: Add configurable buffer threshold and change exception message
L3tum Feb 12, 2024
bb190ce
feat: Refactor code around specialty handling of SocketRelay and add …
L3tum Feb 13, 2024
33b234a
fix: Make exception more descriptive
L3tum Feb 13, 2024
9bea714
fix: Missing extends statement in interface
L3tum Feb 13, 2024
0514503
fix: Enforce __clone impl and test streams with data on them
L3tum Feb 13, 2024
c4b41cf
fix: Remove @throws Error annotation
L3tum Feb 13, 2024
e19db31
Fix code style
roxblnfk Feb 19, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"scripts": {
"test": "phpunit --no-coverage --colors=always",
"test-cover": "phpunit --coverage-clover=coverage.xml",
"test-static": "psalm",
"test-static": "psalm --no-cache",
"test-mutations": "infection"
},
"minimum-stability": "dev",
Expand Down
35 changes: 35 additions & 0 deletions src/ConnectedRelayInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Spiral\Goridge;

use Spiral\Goridge\Exception\RelayException;

/**
* This interface describes a Relay that explictily establishes a connection.
* That connection can also be re-established on the fly (in comparison to StreamRelay, which relies on the existence of the streams).
* The object is also clonable, i.e. supports cloning without data errors due to shared state.
L3tum marked this conversation as resolved.
Show resolved Hide resolved
*/
interface ConnectedRelayInterface extends RelayInterface
{
/**
* Returns true if the underlying connection is already established
*/
public function isConnected(): bool;

/**
* Establishes the underlying connection and returns true on success, false on failure, or throws an exception in case of an error.
*
* @throws RelayException
*/
public function connect(): bool;

/**
* Closes the underlying connection.
*/
public function close(): void;

/**
* Enforce implementation of __clone magic method
*/
public function __clone();
}
115 changes: 115 additions & 0 deletions src/MultiRelayHelper.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
<?php

declare(strict_types=1);

namespace Spiral\Goridge;

use Spiral\Goridge\RPC\Exception\RPCException;

class MultiRelayHelper
{
/**
* @param array<array-key, RelayInterface> $relays
* @return array-key[]|false
* @internal
* Returns either
* - an array of array keys, even if only one
* - or false if none
*/
public static function findRelayWithMessage(array $relays, int $timeoutInMicroseconds = 0): array|false
{
if (\count($relays) === 0) {
return false;
}

if ($relays[\array_key_first($relays)] instanceof SocketRelay) {
$sockets = [];
$socketIdToRelayIndexMap = [];
foreach ($relays as $relayIndex => $relay) {
\assert($relay instanceof SocketRelay);

// Enforce connection
if ($relay->socket === null) {
// Important: Do not force reconnect here as it would otherwise completely ruin further handling
continue;
}

$sockets[] = $relay->socket;
$socketIdToRelayIndexMap[\spl_object_id($relay->socket)] = $relayIndex;
}

if (\count($sockets) === 0) {
return false;
}

$writes = null;
$except = null;
$changes = \socket_select($sockets, $writes, $except, 0, $timeoutInMicroseconds);

if ($changes > 0) {
$indexes = [];
foreach ($sockets as $socket) {
$indexes[] = $socketIdToRelayIndexMap[\spl_object_id($socket)] ?? throw new RPCException("Invalid socket??");
}

return $indexes;
}

return false;
}

if ($relays[\array_key_first($relays)] instanceof StreamRelay) {
$streams = [];
$streamNameToRelayIndexMap = [];
foreach ($relays as $relayIndex => $relay) {
\assert($relay instanceof StreamRelay);

$streams[] = $relay->in;
$streamNameToRelayIndexMap[(string)$relay->in] = $relayIndex;
}

$writes = null;
$except = null;
$changes = \stream_select($streams, $writes, $except, 0, $timeoutInMicroseconds);

if ($changes > 0) {
$indexes = [];
foreach ($streams as $stream) {
$indexes[] = $streamNameToRelayIndexMap[(string)$stream] ?? throw new RPCException("Invalid stream??");
}

return $indexes;
}

return false;
}

return false;
}

/**
* @param array<array-key, ConnectedRelayInterface> $relays
* @return array-key[]|false
* @internal
* Returns either
* - an array of array keys, even if only one
* - or false if none
*/
public static function checkConnected(array $relays): array|false
{
if (\count($relays) === 0) {
return false;
}

$keysNotConnected = [];
foreach ($relays as $key => $relay) {
\assert($relay instanceof ConnectedRelayInterface);
if (!$relay->isConnected()) {
$relay->connect();
$keysNotConnected[] = $key;
}
}

return $keysNotConnected;
}
}
85 changes: 85 additions & 0 deletions src/RPC/AbstractRPC.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
<?php

declare(strict_types=1);

namespace Spiral\Goridge\RPC;

use Spiral\Goridge\Frame;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Exception\ServiceException;

abstract class AbstractRPC implements RPCInterface
{
/**
* RPC calls service prefix.
*
* @var non-empty-string|null
*/
protected ?string $service = null;

/**
* @var positive-int
*/
protected static int $seq = 1;

public function __construct(
protected CodecInterface $codec,
) {
}

/**
* @psalm-pure
*/
public function withServicePrefix(string $service): static
{
/** @psalm-suppress ImpureVariable */
$rpc = clone $this;
$rpc->service = $service;

return $rpc;
}

/**
* @psalm-pure
*/
public function withCodec(CodecInterface $codec): self
{
/** @psalm-suppress ImpureVariable */
$rpc = clone $this;
$rpc->codec = $codec;

return $rpc;
}

/**
* @throws Exception\ServiceException
*/
protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $options = null): mixed
{
// exclude method name
$body = \substr((string)$frame->payload, $frame->options[1]);

if ($frame->hasFlag(Frame::ERROR)) {
$name = $relay instanceof \Stringable
? (string)$relay
: $relay::class;

throw new ServiceException(\sprintf("Error '%s' on %s", $body, $name));
}

return $this->codec->decode($body, $options);
}

/**
* @param non-empty-string $method
*/
protected function packFrame(string $method, mixed $payload): Frame
{
if ($this->service !== null) {
$method = $this->service . '.' . \ucfirst($method);
}

$body = $method . $this->codec->encode($payload);
return new Frame($body, [self::$seq, \strlen($method)], $this->codec->getIndex());
}
}
69 changes: 69 additions & 0 deletions src/RPC/AsyncRPCInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
<?php

namespace Spiral\Goridge\RPC;

use Spiral\Goridge\Exception\GoridgeException;
use Spiral\Goridge\Exception\RelayException;
use Spiral\Goridge\RPC\Exception\RPCException;
use Spiral\Goridge\RPC\Exception\ServiceException;

interface AsyncRPCInterface extends RPCInterface
{
/**
* Invoke remote RoadRunner service method using given payload (free form) non-blockingly and ignore the response.
*
* @param non-empty-string $method
*
* @throws GoridgeException
*/
public function callIgnoreResponse(string $method, mixed $payload): void;

/**
* Invoke remote RoadRunner service method using given payload (free form) non-blockingly but accept a response.
*
* @param non-empty-string $method
* @return positive-int An "ID" to check whether a response has been received and to fetch said response.
*
* @throws GoridgeException
*/
public function callAsync(string $method, mixed $payload): int;

/**
* Check whether a response has been received using the "ID" obtained through @see AsyncRPCInterface::callAsync() .
*
* @param positive-int $seq
* @return bool
*/
public function hasResponse(int $seq): bool;

/**
* Checks the "ID"s obtained through @see AsyncRPCInterface::callAsync() if they've got a response yet.
* Returns an array of "ID"s that do.
*
* @param positive-int[] $seqs
* @return positive-int[]
*/
public function hasResponses(array $seqs): array;

/**
* Fetch the response for the "ID" obtained through @see AsyncRPCInterface::callAsync() .
* @param positive-int $seq
* @throws RPCException
* @throws ServiceException
* @throws RelayException
*/
public function getResponse(int $seq, mixed $options = null): mixed;

/**
* Fetches the responses for the "ID"s obtained through @see AsyncRPCInterface::callAsync()
* and returns a map of "ID" => Response.
* @throws RelayException
* @throws ServiceException
* @throws RPCException
*
* @param array<array-key, positive-int> $seqs
* @return iterable<positive-int, mixed>
*
*/
public function getResponses(array $seqs, mixed $options = null): iterable;
}
Loading
Loading