Skip to content

Commit

Permalink
Fix the sequence count with backward compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
msmakouz committed Jun 26, 2024
1 parent ab4b100 commit c799049
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,4 @@ jobs:
token: ${{ secrets.CODECOV_TOKEN }}
file: ./coverage.xml
flags: php
fail_ci_if_error: false
fail_ci_if_error: false
5 changes: 4 additions & 1 deletion examples/swoole.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

require 'vendor/autoload.php';

/**
* This example demonstrates how to use the package within Swoole coroutines.
*/
Co::set(['hook_flags'=> SWOOLE_HOOK_ALL]);
Co\Run(function () {
$barrier = Barrier::make();
Expand All @@ -20,4 +23,4 @@
});
}
Barrier::wait($barrier);
});
});
24 changes: 23 additions & 1 deletion src/RPC/RPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ class RPC implements RPCInterface
*/
private ?string $service = null;

/**
* @var positive-int
* @deprecated since v3.2.1.
*/
private static int $seq = 1;

/**
* @deprecated since v3.2.1. Need for backward compatibility.
*/
private bool $hasSequence = false;

/**
* @param RelayInterface $relay
* @param CodecInterface|null $codec
Expand All @@ -38,6 +49,7 @@ public function __construct(RelayInterface $relay, CodecInterface $codec = null)
{
$this->relay = $relay;
$this->codec = $codec ?? new JsonCodec();
$this->hasSequence = \method_exists($this->relay, 'getNextSequence');
}

/**
Expand Down Expand Up @@ -71,7 +83,7 @@ public function withCodec(CodecInterface $codec): RPCInterface
*/
public function call(string $method, $payload, $options = null)
{
$seq = $this->relay->getNextSeq();
$seq = $this->getNextSequence();

$this->relay->send($this->packFrame($method, $payload, $seq));

Expand All @@ -86,6 +98,8 @@ public function call(string $method, $payload, $options = null)
throw new RPCException('Invalid RPC frame, sequence mismatch');
}

self::$seq++;

return $this->decodeResponse($frame, $options);
}

Expand Down Expand Up @@ -167,4 +181,12 @@ private function packFrame(string $method, $payload, int $seq): Frame
$body = $method . $this->codec->encode($payload);
return new Frame($body, [$seq, \strlen($method)], $this->codec->getIndex());
}

/**
* @deprecated since v3.2.1.
*/
private function getNextSequence(): int
{
return $this->hasSequence ? $this->relay->getNextSequence() : self::$seq;
}
}
12 changes: 3 additions & 9 deletions src/Relay.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ abstract class Relay implements RelayInterface
public const PIPES = 'pipes';
protected const CONNECTION_EXP = '/(?P<protocol>[^:\/]+):\/\/(?P<arg1>[^:]+)(:(?P<arg2>[^:]+))?/';

/**
* @var int
*/
private int $seq = 1;
private int $sequence = 1;

/**
* Create relay using string address.
Expand Down Expand Up @@ -99,11 +96,8 @@ private static function openOut(string $output)
return $resource;
}

/**
* @return int
*/
public function getNextSeq(): int
public function getNextSequence(): int
{
return $this->seq++;
return $this->sequence++;
}
}
10 changes: 1 addition & 9 deletions src/RelayInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,14 @@

/**
* Blocking, duplex relay.
* @method getNextSequence(): int
*/
interface RelayInterface
{
/**
* @return Frame
* @throws RelayException
*/
public function waitFrame(): Frame;

/**
* @param Frame $frame
*/
public function send(Frame $frame): void;

/**
* @return int
*/
public function getNextSeq(): int;
}
71 changes: 70 additions & 1 deletion tests/Goridge/RPCTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

use Exception;
use PHPUnit\Framework\TestCase;
use Spiral\Goridge\Frame;
use Spiral\Goridge\Relay;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Codec\RawCodec;
use Spiral\Goridge\RPC\Exception\CodecException;
Expand Down Expand Up @@ -155,7 +157,7 @@ public function testLongRawBody(): void
{
$conn = $this->makeRPC();
$payload = random_bytes(65000 * 1000);

$resp = $conn->withCodec(new RawCodec())->call(
'Service.EchoBinary',
$payload
Expand Down Expand Up @@ -248,6 +250,73 @@ public function testJsonException(): void
$conn->call('Service.Process', random_bytes(256));
}

public function testCallSequence(): void
{
$relay1 = $this->getMockBuilder(Relay::class)->onlyMethods(['waitFrame', 'send'])->getMock();
$relay1
->method('waitFrame')
->willReturnCallback(function () {
static $series = [
[new Frame('Service.Process{}', [1, 15])],
[new Frame('Service.Process{}', [2, 15])],
[new Frame('Service.Process{}', [3, 15])],
];

[$return] = \array_shift($series);

return $return;
});
$relay1
->method('send')
->willReturnCallback(function (Frame $frame) {
static $series = [
[new Frame('Service.Process{"Name":"foo","Value":18}', [1, 15], 8)],
[new Frame('Service.Process{"Name":"foo","Value":18}', [2, 15], 8)],
[new Frame('Service.Process{"Name":"foo","Value":18}', [3, 15], 8)],
];

[$expectedArgs] = \array_shift($series);
self::assertEquals($expectedArgs, $frame);
});

$relay2 = $this->getMockBuilder(Relay::class)->onlyMethods(['waitFrame', 'send'])->getMock();
$relay2
->method('waitFrame')
->willReturnCallback(function () {
static $series = [
[new Frame('Service.Process{}', [1, 15])],
[new Frame('Service.Process{}', [2, 15])],
[new Frame('Service.Process{}', [3, 15])],
];

[$return] = \array_shift($series);

return $return;
});
$relay2
->method('send')
->willReturnCallback(function (Frame $frame) {
static $series = [
[new Frame('Service.Process{"Name":"bar","Value":18}', [1, 15], 8)],
[new Frame('Service.Process{"Name":"bar","Value":18}', [2, 15], 8)],
[new Frame('Service.Process{"Name":"bar","Value":18}', [3, 15], 8)],
];

[$expectedArgs] = \array_shift($series);
self::assertEquals($expectedArgs, $frame);
});

$conn1 = new RPC($relay1);
$conn2 = new RPC($relay2);

$conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]);
$conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]);
$conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]);
$conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]);
$conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]);
$conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]);
}

/**
* @return RPC
*/
Expand Down

0 comments on commit c799049

Please sign in to comment.