Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sequence counting #34

Draft
wants to merge 1 commit into
base: 4.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions examples/swoole.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

use Spiral\Goridge;
use Swoole\Coroutine as Co;
use Swoole\Coroutine\Barrier;

require 'vendor/autoload.php';

/**
* This example demonstrates how to use the package within Swoole coroutines.
*/
Co::set(['hook_flags'=> SWOOLE_HOOK_ALL]);
Co\Run(function () {
$barrier = Barrier::make();
for ($i = 0; $i < 3; $i++) {
go(function () use ($barrier) {
$rpc = new Goridge\RPC\RPC(
Goridge\Relay::create('tcp://127.0.0.1:6001')
);
echo $rpc->call('App.Hi', 'Antony');
});
Comment on lines +18 to +23
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider optimizing RPC usage and improving configuration management.

  1. Creating a new RPC instance for each coroutine might not be the most efficient approach. Consider reusing a single RPC instance across coroutines if possible.

  2. The hardcoded IP and port (tcp://127.0.0.1:6001) could be moved to a configuration variable for better maintainability and flexibility.

  3. The direct echo of the RPC call result doesn't allow for proper error handling or output formatting. Consider wrapping this in a try-catch block and formatting the output.

Here's a suggested refactor:

$config = [
    'rpc_address' => 'tcp://127.0.0.1:6001',
    'num_coroutines' => 3,
];

$rpc = new Goridge\RPC\RPC(Goridge\Relay::create($config['rpc_address']));

for ($i = 0; $i < $config['num_coroutines']; $i++) {
    go(function () use ($rpc, $barrier) {
        try {
            $result = $rpc->call('App.Hi', 'Antony');
            echo "Coroutine " . Co::getCid() . " result: " . $result . PHP_EOL;
        } catch (\Exception $e) {
            echo "Coroutine " . Co::getCid() . " error: " . $e->getMessage() . PHP_EOL;
        }
    });
}

This refactored version improves configuration management, reuses the RPC instance, and adds basic error handling and output formatting.

}
Barrier::wait($barrier);
});
21 changes: 11 additions & 10 deletions src/RPC/AbstractRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@
use Spiral\Goridge\Frame;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Stringable;
use function sprintf;
use function strlen;
use function substr;
use function ucfirst;

abstract class AbstractRPC implements RPCInterface
{
Expand All @@ -24,9 +19,15 @@ abstract class AbstractRPC implements RPCInterface

/**
* @var positive-int
* @deprecated Use $this->sequence instead.
*/
protected static int $seq = 1;

/**
* @var positive-int
*/
protected int $sequence = 1;

public function __construct(
protected CodecInterface $codec
) {
Expand Down Expand Up @@ -62,14 +63,14 @@ public function withCodec(CodecInterface $codec): self
protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $options = null): mixed
{
// exclude method name
$body = substr((string)$frame->payload, $frame->options[1]);
$body = \substr((string)$frame->payload, $frame->options[1]);

if ($frame->hasFlag(Frame::ERROR)) {
$name = $relay instanceof Stringable
$name = $relay instanceof \Stringable
? (string)$relay
: $relay::class;

throw new ServiceException(sprintf("Error '%s' on %s", $body, $name));
throw new ServiceException(\sprintf("Error '%s' on %s", $body, $name));
}

return $this->codec->decode($body, $options);
Expand All @@ -81,10 +82,10 @@ protected function decodeResponse(Frame $frame, RelayInterface $relay, mixed $op
protected function packFrame(string $method, mixed $payload): Frame
{
if ($this->service !== null) {
$method = $this->service . '.' . ucfirst($method);
$method = $this->service . '.' . \ucfirst($method);
}

$body = $method . $this->codec->encode($payload);
return new Frame($body, [self::$seq, strlen($method)], $this->codec->getIndex());
return new Frame($body, [$this->sequence, \strlen($method)], $this->codec->getIndex());
}
}
9 changes: 6 additions & 3 deletions src/RPC/MultiRPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,10 @@ public function call(string $method, mixed $payload, mixed $options = null): mix
$relay->send($this->packFrame($method, $payload));

// wait for the frame confirmation
$frame = $this->getResponseFromRelay($relay, self::$seq, true);
$frame = $this->getResponseFromRelay($relay, $this->sequence, true);

self::$seq++;
$this->sequence++;
Comment on lines +153 to +156
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Initialize $sequence property before usage

The $sequence property is used here but doesn't appear to be initialized in the class. This could lead to undefined behavior or errors due to null or unexpected initial values. Consider initializing $sequence in the constructor.

Apply this diff to initialize $sequence:

 public function __construct(
     array $relays,
     int $asyncBufferThreshold = self::DEFAULT_BUFFER_THRESHOLD,
     CodecInterface $codec = new JsonCodec()
 ) {
+    $this->sequence = 0;
     // existing code...
 }

Committable suggestion was skipped due to low confidence.


return $this->decodeResponse($frame, $relay, $options);
}
Expand All @@ -164,8 +165,9 @@ public function callIgnoreResponse(string $method, mixed $payload): void

$relay->send($this->packFrame($method, $payload));

$seq = self::$seq;
$seq = $this->sequence;
self::$seq++;
$this->sequence++;
self::$occupiedRelays[$seq] = $relay;
// Last index so no need for array_pop or stuff
unset(self::$freeRelays[$relayIndex]);
Expand All @@ -186,8 +188,9 @@ public function callAsync(string $method, mixed $payload): int

$relay->send($this->packFrame($method, $payload));

$seq = self::$seq;
$seq = $this->sequence;
self::$seq++;
$this->sequence++;
self::$occupiedRelays[$seq] = $relay;
self::$seqToRelayMap[$seq] = $relay;
// Last index so no need for array_pop or stuff
Expand Down
5 changes: 3 additions & 2 deletions src/RPC/RPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class RPC extends AbstractRPC

public function __construct(
private readonly RelayInterface $relay,
CodecInterface $codec = new JsonCodec(),
CodecInterface $codec = new JsonCodec(),
)
{
parent::__construct($codec);
Expand All @@ -32,11 +32,12 @@ public function call(string $method, mixed $payload, mixed $options = null): mix
throw new RPCException('Invalid RPC frame, options missing');
}

if ($frame->options[0] !== self::$seq) {
if ($frame->options[0] !== $this->sequence) {
throw new RPCException('Invalid RPC frame, sequence mismatch');
}

self::$seq++;
$this->sequence++;

Comment on lines +35 to 41
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Approve instance-based sequence management, but address inconsistency.

The changes from self::$seq to $this->sequence improve encapsulation and allow for better handling of concurrent RPC instances. This aligns with similar changes in other RPC classes.

However, there's an inconsistency:

The line self::$seq++; should be removed as it's no longer needed and inconsistent with the new instance-based approach.

Apply this diff to resolve the issue:

 if ($frame->options[0] !== $this->sequence) {
     throw new RPCException('Invalid RPC frame, sequence mismatch');
 }

-self::$seq++;
 $this->sequence++;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if ($frame->options[0] !== $this->sequence) {
throw new RPCException('Invalid RPC frame, sequence mismatch');
}
self::$seq++;
$this->sequence++;
if ($frame->options[0] !== $this->sequence) {
throw new RPCException('Invalid RPC frame, sequence mismatch');
}
$this->sequence++;

return $this->decodeResponse($frame, $this->relay, $options);
}
Expand Down
54 changes: 54 additions & 0 deletions tests/Goridge/RPC.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Exception;
use PHPUnit\Framework\TestCase;
use Spiral\Goridge\Frame;
use Spiral\Goridge\RelayInterface;
use Spiral\Goridge\RPC\Codec\RawCodec;
use Spiral\Goridge\RPC\Exception\CodecException;
Expand Down Expand Up @@ -249,6 +250,59 @@ public function testJsonException(): void
$conn->call('Service.Process', random_bytes(256));
}

public function testCallSequence(): void
{
$relay1Matcher = $this->exactly(3);
$relay1 = $this->createMock(RelayInterface::class);
$relay1
->method('waitFrame')
->willReturnOnConsecutiveCalls(
new Frame('Service.Process{}', [1, 15]),
new Frame('Service.Process{}', [2, 15]),
new Frame('Service.Process{}', [3, 15])
);
$relay1
->expects($relay1Matcher)
->method('send')
->willReturnCallback(function (Frame $value) use ($relay1Matcher) {
match ($relay1Matcher->numberOfInvocations()) {
1 => $this->assertEquals(new Frame('Service.Process{"Name":"foo","Value":18}', [1, 15], 8), $value),
2 => $this->assertEquals(new Frame('Service.Process{"Name":"foo","Value":18}', [2, 15], 8), $value),
3 => $this->assertEquals(new Frame('Service.Process{"Name":"foo","Value":18}', [3, 15], 8), $value),
};
});

$relay2Matcher = $this->exactly(3);
$relay2 = $this->createMock(RelayInterface::class);
$relay2
->method('waitFrame')
->willReturnOnConsecutiveCalls(
new Frame('Service.Process{}', [1, 15]),
new Frame('Service.Process{}', [2, 15]),
new Frame('Service.Process{}', [3, 15])
);
$relay2
->expects($relay2Matcher)
->method('send')
->willReturnCallback(function (Frame $value) use ($relay2Matcher) {
match ($relay2Matcher->numberOfInvocations()) {
1 => $this->assertEquals(new Frame('Service.Process{"Name":"bar","Value":18}', [1, 15], 8), $value),
2 => $this->assertEquals(new Frame('Service.Process{"Name":"bar","Value":18}', [2, 15], 8), $value),
3 => $this->assertEquals(new Frame('Service.Process{"Name":"bar","Value":18}', [3, 15], 8), $value),
};
});

$conn1 = new \Spiral\Goridge\RPC\RPC($relay1);
$conn2 = new \Spiral\Goridge\RPC\RPC($relay2);

$conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]);
$conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]);
$conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]);
$conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]);
$conn1->call('Service.Process', ['Name' => 'foo', 'Value' => 18]);
$conn2->call('Service.Process', ['Name' => 'bar', 'Value' => 18]);
}

/**
* @return GoridgeRPC
*/
Expand Down
Loading