Skip to content

Commit

Permalink
Fix sequence counting
Browse files Browse the repository at this point in the history
  • Loading branch information
msmakouz committed Oct 21, 2024
1 parent 8fe68b0 commit 7c6d994
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 15 deletions.
26 changes: 26 additions & 0 deletions examples/swoole.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

use Spiral\Goridge;
use Swoole\Coroutine as Co;
use Swoole\Coroutine\Barrier;

require 'vendor/autoload.php';

/**
* This example demonstrates how to use the package within Swoole coroutines.
*/
Co::set(['hook_flags'=> SWOOLE_HOOK_ALL]);
Co\Run(function () {
$barrier = Barrier::make();
for ($i = 0; $i < 3; $i++) {
go(function () use ($barrier) {
$rpc = new Goridge\RPC\RPC(
Goridge\Relay::create('tcp://127.0.0.1:6001')
);
echo $rpc->call('App.Hi', 'Antony');
});
}
Barrier::wait($barrier);
});
21 changes: 11 additions & 10 deletions src/RPC/AbstractRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@
use Spiral\Goridge\Frame;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Stringable;
use function sprintf;
use function strlen;
use function substr;
use function ucfirst;

abstract class AbstractRPC implements RPCInterface
{
Expand All @@ -24,9 +19,15 @@ abstract class AbstractRPC implements RPCInterface

/**
* @var positive-int
* @deprecated Use $this->sequence instead.
*/
protected static int $seq = 1;

/**
* @var positive-int
*/
protected int $sequence = 1;

public function __construct(
protected CodecInterface $codec
) {
Expand Down Expand Up @@ -62,14 +63,14 @@ public function withCodec(CodecInterface $codec): self
protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $options = null): mixed
{
// exclude method name
$body = substr((string)$frame->payload, $frame->options[1]);
$body = \substr((string)$frame->payload, $frame->options[1]);

if ($frame->hasFlag(Frame::ERROR)) {
$name = $relay instanceof Stringable
$name = $relay instanceof \Stringable
? (string)$relay
: $relay::class;

throw new ServiceException(sprintf("Error '%s' on %s", $body, $name));
throw new ServiceException(\sprintf("Error '%s' on %s", $body, $name));
}

return $this->codec->decode($body, $options);
Expand All @@ -81,10 +82,10 @@ protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $op
protected function packFrame(string $method, mixed $payload): Frame
{
if ($this->service !== null) {
$method = $this->service . '.' . ucfirst($method);
$method = $this->service . '.' . \ucfirst($method);
}

$body = $method . $this->codec->encode($payload);
return new Frame($body, [self::$seq, strlen($method)], $this->codec->getIndex());
return new Frame($body, [$this->sequence, \strlen($method)], $this->codec->getIndex());
}
}
9 changes: 6 additions & 3 deletions src/RPC/MultiRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,10 @@ public function call(string $method, mixed $payload, mixed $options = null): mix
$relay->send($this->packFrame($method, $payload));

// wait for the frame confirmation
$frame = $this->getResponseFromRelay($relay, self::$seq, true);
$frame = $this->getResponseFromRelay($relay, $this->sequence, true);

self::$seq++;
$this->sequence++;

return $this->decodeResponse($frame, $relay, $options);
}
Expand All @@ -164,8 +165,9 @@ public function callIgnoreResponse(string $method, mixed $payload): void

$relay->send($this->packFrame($method, $payload));

$seq = self::$seq;
$seq = $this->sequence;
self::$seq++;
$this->sequence++;
self::$occupiedRelays[$seq] = $relay;
// Last index so no need for array_pop or stuff
unset(self::$freeRelays[$relayIndex]);
Expand All @@ -186,8 +188,9 @@ public function callAsync(string $method, mixed $payload): int

$relay->send($this->packFrame($method, $payload));

$seq = self::$seq;
$seq = $this->sequence;
self::$seq++;
$this->sequence++;
self::$occupiedRelays[$seq] = $relay;
self::$seqToRelayMap[$seq] = $relay;
// Last index so no need for array_pop or stuff
Expand Down
5 changes: 3 additions & 2 deletions src/RPC/RPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class RPC extends AbstractRPC

public function __construct(
private readonly RelayInterface $relay,
CodecInterface $codec = new JsonCodec(),
CodecInterface $codec = new JsonCodec(),
)
{
parent::__construct($codec);
Expand All @@ -32,11 +32,12 @@ public function call(string $method, mixed $payload, mixed $options = null): mix
throw new RPCException('Invalid RPC frame, options missing');
}

if ($frame->options[0] !== self::$seq) {
if ($frame->options[0] !== $this->sequence) {
throw new RPCException('Invalid RPC frame, sequence mismatch');
}

self::$seq++;
$this->sequence++;

return $this->decodeResponse($frame, $this->relay, $options);
}
Expand Down
54 changes: 54 additions & 0 deletions tests/Goridge/RPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Exception;
use PHPUnit\Framework\TestCase;
use Spiral\Goridge\Frame;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Codec\RawCodec;
use Spiral\Goridge\RPC\Exception\CodecException;
Expand Down Expand Up @@ -249,6 +250,59 @@ public function testJsonException(): void
$conn->call('Service.Process', random_bytes(256));
}

public function testCallSequence(): void
{
$relay1Matcher = $this->exactly(3);
$relay1 = $this->createMock(RelayInterface::class);
$relay1
->method('waitFrame')
->willReturnOnConsecutiveCalls(
new Frame('Service.Process{}', [1, 15]),
new Frame('Service.Process{}', [2, 15]),
new Frame('Service.Process{}', [3, 15])
);
$relay1
->expects($relay1Matcher)
->method('send')
->willReturnCallback(function (Frame $value) use ($relay1Matcher) {
match ($relay1Matcher->numberOfInvocations()) {
1 => $this->assertEquals(new Frame('Service.Process{"Name":"foo","Value":18}', [1, 15], 8), $value),
2 => $this->assertEquals(new Frame('Service.Process{"Name":"foo","Value":18}', [2, 15], 8), $value),
3 => $this->assertEquals(new Frame('Service.Process{"Name":"foo","Value":18}', [3, 15], 8), $value),
};
});

$relay2Matcher = $this->exactly(3);
$relay2 = $this->createMock(RelayInterface::class);
$relay2
->method('waitFrame')
->willReturnOnConsecutiveCalls(
new Frame('Service.Process{}', [1, 15]),
new Frame('Service.Process{}', [2, 15]),
new Frame('Service.Process{}', [3, 15])
);
$relay2
->expects($relay2Matcher)
->method('send')
->willReturnCallback(function (Frame $value) use ($relay2Matcher) {
match ($relay2Matcher->numberOfInvocations()) {
1 => $this->assertEquals(new Frame('Service.Process{"Name":"bar","Value":18}', [1, 15], 8), $value),
2 => $this->assertEquals(new Frame('Service.Process{"Name":"bar","Value":18}', [2, 15], 8), $value),
3 => $this->assertEquals(new Frame('Service.Process{"Name":"bar","Value":18}', [3, 15], 8), $value),
};
});

$conn1 = new \Spiral\Goridge\RPC\RPC($relay1);
$conn2 = new \Spiral\Goridge\RPC\RPC($relay2);

$conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]);
$conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]);
$conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]);
$conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]);
$conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]);
$conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]);
}

/**
* @return GoridgeRPC
*/
Expand Down

0 comments on commit 7c6d994

Please sign in to comment.