From 7c6d994781e7efb71cc7fcf8befd1b8b05daf894 Mon Sep 17 00:00:00 2001 From: Maxim Smakouz Date: Mon, 21 Oct 2024 13:52:29 +0300 Subject: [PATCH] Fix sequence counting --- examples/swoole.php | 26 ++++++++++++++++++++ src/RPC/AbstractRPC.php | 21 ++++++++-------- src/RPC/MultiRPC.php | 9 ++++--- src/RPC/RPC.php | 5 ++-- tests/Goridge/RPC.php | 54 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 100 insertions(+), 15 deletions(-) create mode 100644 examples/swoole.php diff --git a/examples/swoole.php b/examples/swoole.php new file mode 100644 index 0000000..7d19a61 --- /dev/null +++ b/examples/swoole.php @@ -0,0 +1,26 @@ + SWOOLE_HOOK_ALL]); +Co\Run(function () { + $barrier = Barrier::make(); + for ($i = 0; $i < 3; $i++) { + go(function () use ($barrier) { + $rpc = new Goridge\RPC\RPC( + Goridge\Relay::create('tcp://127.0.0.1:6001') + ); + echo $rpc->call('App.Hi', 'Antony'); + }); + } + Barrier::wait($barrier); +}); diff --git a/src/RPC/AbstractRPC.php b/src/RPC/AbstractRPC.php index ab4e593..fd748a7 100644 --- a/src/RPC/AbstractRPC.php +++ b/src/RPC/AbstractRPC.php @@ -7,11 +7,6 @@ use Spiral\Goridge\Frame; use Spiral\Goridge\RelayInterface; use Spiral\Goridge\RPC\Exception\ServiceException; -use Stringable; -use function sprintf; -use function strlen; -use function substr; -use function ucfirst; abstract class AbstractRPC implements RPCInterface { @@ -24,9 +19,15 @@ abstract class AbstractRPC implements RPCInterface /** * @var positive-int + * @deprecated Use $this->sequence instead. */ protected static int $seq = 1; + /** + * @var positive-int + */ + protected int $sequence = 1; + public function __construct( protected CodecInterface $codec ) { @@ -62,14 +63,14 @@ public function withCodec(CodecInterface $codec): self protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $options = null): mixed { // exclude method name - $body = substr((string)$frame->payload, $frame->options[1]); + $body = \substr((string)$frame->payload, $frame->options[1]); if ($frame->hasFlag(Frame::ERROR)) { - $name = $relay instanceof Stringable + $name = $relay instanceof \Stringable ? (string)$relay : $relay::class; - throw new ServiceException(sprintf("Error '%s' on %s", $body, $name)); + throw new ServiceException(\sprintf("Error '%s' on %s", $body, $name)); } return $this->codec->decode($body, $options); @@ -81,10 +82,10 @@ protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $op protected function packFrame(string $method, mixed $payload): Frame { if ($this->service !== null) { - $method = $this->service . '.' . ucfirst($method); + $method = $this->service . '.' . \ucfirst($method); } $body = $method . $this->codec->encode($payload); - return new Frame($body, [self::$seq, strlen($method)], $this->codec->getIndex()); + return new Frame($body, [$this->sequence, \strlen($method)], $this->codec->getIndex()); } } diff --git a/src/RPC/MultiRPC.php b/src/RPC/MultiRPC.php index e518947..2177310 100644 --- a/src/RPC/MultiRPC.php +++ b/src/RPC/MultiRPC.php @@ -150,9 +150,10 @@ public function call(string $method, mixed $payload, mixed $options = null): mix $relay->send($this->packFrame($method, $payload)); // wait for the frame confirmation - $frame = $this->getResponseFromRelay($relay, self::$seq, true); + $frame = $this->getResponseFromRelay($relay, $this->sequence, true); self::$seq++; + $this->sequence++; return $this->decodeResponse($frame, $relay, $options); } @@ -164,8 +165,9 @@ public function callIgnoreResponse(string $method, mixed $payload): void $relay->send($this->packFrame($method, $payload)); - $seq = self::$seq; + $seq = $this->sequence; self::$seq++; + $this->sequence++; self::$occupiedRelays[$seq] = $relay; // Last index so no need for array_pop or stuff unset(self::$freeRelays[$relayIndex]); @@ -186,8 +188,9 @@ public function callAsync(string $method, mixed $payload): int $relay->send($this->packFrame($method, $payload)); - $seq = self::$seq; + $seq = $this->sequence; self::$seq++; + $this->sequence++; self::$occupiedRelays[$seq] = $relay; self::$seqToRelayMap[$seq] = $relay; // Last index so no need for array_pop or stuff diff --git a/src/RPC/RPC.php b/src/RPC/RPC.php index 28979dc..7c9eb19 100644 --- a/src/RPC/RPC.php +++ b/src/RPC/RPC.php @@ -15,7 +15,7 @@ class RPC extends AbstractRPC public function __construct( private readonly RelayInterface $relay, - CodecInterface $codec = new JsonCodec(), + CodecInterface $codec = new JsonCodec(), ) { parent::__construct($codec); @@ -32,11 +32,12 @@ public function call(string $method, mixed $payload, mixed $options = null): mix throw new RPCException('Invalid RPC frame, options missing'); } - if ($frame->options[0] !== self::$seq) { + if ($frame->options[0] !== $this->sequence) { throw new RPCException('Invalid RPC frame, sequence mismatch'); } self::$seq++; + $this->sequence++; return $this->decodeResponse($frame, $this->relay, $options); } diff --git a/tests/Goridge/RPC.php b/tests/Goridge/RPC.php index e4decc7..41c0748 100644 --- a/tests/Goridge/RPC.php +++ b/tests/Goridge/RPC.php @@ -6,6 +6,7 @@ use Exception; use PHPUnit\Framework\TestCase; +use Spiral\Goridge\Frame; use Spiral\Goridge\RelayInterface; use Spiral\Goridge\RPC\Codec\RawCodec; use Spiral\Goridge\RPC\Exception\CodecException; @@ -249,6 +250,59 @@ public function testJsonException(): void $conn->call('Service.Process', random_bytes(256)); } + public function testCallSequence(): void + { + $relay1Matcher = $this->exactly(3); + $relay1 = $this->createMock(RelayInterface::class); + $relay1 + ->method('waitFrame') + ->willReturnOnConsecutiveCalls( + new Frame('Service.Process{}', [1, 15]), + new Frame('Service.Process{}', [2, 15]), + new Frame('Service.Process{}', [3, 15]) + ); + $relay1 + ->expects($relay1Matcher) + ->method('send') + ->willReturnCallback(function (Frame $value) use ($relay1Matcher) { + match ($relay1Matcher->numberOfInvocations()) { + 1 => $this->assertEquals(new Frame('Service.Process{"Name":"foo","Value":18}', [1, 15], 8), $value), + 2 => $this->assertEquals(new Frame('Service.Process{"Name":"foo","Value":18}', [2, 15], 8), $value), + 3 => $this->assertEquals(new Frame('Service.Process{"Name":"foo","Value":18}', [3, 15], 8), $value), + }; + }); + + $relay2Matcher = $this->exactly(3); + $relay2 = $this->createMock(RelayInterface::class); + $relay2 + ->method('waitFrame') + ->willReturnOnConsecutiveCalls( + new Frame('Service.Process{}', [1, 15]), + new Frame('Service.Process{}', [2, 15]), + new Frame('Service.Process{}', [3, 15]) + ); + $relay2 + ->expects($relay2Matcher) + ->method('send') + ->willReturnCallback(function (Frame $value) use ($relay2Matcher) { + match ($relay2Matcher->numberOfInvocations()) { + 1 => $this->assertEquals(new Frame('Service.Process{"Name":"bar","Value":18}', [1, 15], 8), $value), + 2 => $this->assertEquals(new Frame('Service.Process{"Name":"bar","Value":18}', [2, 15], 8), $value), + 3 => $this->assertEquals(new Frame('Service.Process{"Name":"bar","Value":18}', [3, 15], 8), $value), + }; + }); + + $conn1 = new \Spiral\Goridge\RPC\RPC($relay1); + $conn2 = new \Spiral\Goridge\RPC\RPC($relay2); + + $conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]); + $conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]); + $conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]); + $conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]); + $conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]); + $conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]); + } + /** * @return GoridgeRPC */