Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make use of new "Async" RPC #13

Merged
merged 9 commits into from
Sep 15, 2024
Merged
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"require": {
"php": ">=8.1",
"psr/log": ">=2.0",
"spiral/goridge": "^4.0",
"spiral/goridge": "^4.2",
"spiral/roadrunner": "^2023.1 || ^2024.1"
},
"autoload": {
Expand Down
13 changes: 13 additions & 0 deletions src/AbstractMetrics.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Spiral\RoadRunner\Metrics;

use Spiral\Goridge\RPC\RPCInterface;

abstract class AbstractMetrics implements MetricsInterface
{
/**
* @var string
*/
protected const SERVICE_NAME = 'metrics';
}
29 changes: 14 additions & 15 deletions src/Metrics.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,24 @@

namespace Spiral\RoadRunner\Metrics;

use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\Metrics\Exception\MetricsException;
use function compact;
use function str_contains;

class Metrics implements MetricsInterface
class Metrics extends AbstractMetrics
{
private const SERVICE_NAME = 'metrics';

private readonly RPCInterface $rpc;

public function __construct(RPCInterface $rpc)
{
$this->rpc = $rpc->withServicePrefix(self::SERVICE_NAME);
public function __construct(
protected readonly RPCInterface $rpc
) {
}

public function add(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->call('Add', \compact('name', 'value', 'labels'));
$this->rpc->call('metrics.Add', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand All @@ -31,7 +30,7 @@ public function add(string $name, float $value, array $labels = []): void
public function sub(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->call('Sub', \compact('name', 'value', 'labels'));
$this->rpc->call('metrics.Sub', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand All @@ -40,7 +39,7 @@ public function sub(string $name, float $value, array $labels = []): void
public function observe(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->call('Observe', \compact('name', 'value', 'labels'));
$this->rpc->call('metrics.Observe', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand All @@ -49,7 +48,7 @@ public function observe(string $name, float $value, array $labels = []): void
public function set(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->call('Set', \compact('name', 'value', 'labels'));
$this->rpc->call('metrics.Set', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand All @@ -58,12 +57,12 @@ public function set(string $name, float $value, array $labels = []): void
public function declare(string $name, CollectorInterface $collector): void
{
try {
$this->rpc->call('Declare', [
$this->rpc->call('metrics.Declare', [
'name' => $name,
'collector' => $collector->toArray(),
]);
} catch (ServiceException $e) {
if (\str_contains($e->getMessage(), 'tried to register existing collector')) {
if (str_contains($e->getMessage(), 'tried to register existing collector')) {
// suppress duplicate metric error
return;
}
Expand All @@ -75,7 +74,7 @@ public function declare(string $name, CollectorInterface $collector): void
public function unregister(string $name): void
{
try {
$this->rpc->call('Unregister', $name);
$this->rpc->call('metrics.Unregister', $name);
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand Down
36 changes: 30 additions & 6 deletions src/MetricsFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\RPCInterface;

class MetricsFactory
Expand All @@ -13,18 +14,41 @@ public function __construct(
) {
}

public function create(RPCInterface $rpc, MetricsOptions $options): MetricsInterface
public function create(RPCInterface $rpc, MetricsOptions $options = new MetricsOptions()): MetricsInterface
{
$metrics = new RetryMetrics(
new Metrics($rpc),
$options->retryAttempts,
$options->retrySleepMicroseconds,
);
if ($options->ignoreResponsesWherePossible && !($rpc instanceof AsyncRPCInterface)) {
$this->logger->warning("ignoreResponsesWherePossible is true but no AsyncRPCInterface provided");
} elseif (!$options->ignoreResponsesWherePossible && $rpc instanceof AsyncRPCInterface) {
$this->logger->warning("ignoreResponsesWherePossible is false but an AsyncRPCInterface was provided");
}

if ($options->ignoreResponsesWherePossible && $rpc instanceof AsyncRPCInterface) {
$metrics = new MetricsIgnoreResponse($rpc);
} else {
$metrics = new Metrics($rpc);
}

if ($options->retryAttempts > 0) {
$metrics = new RetryMetrics(
$metrics,
$options->retryAttempts,
$options->retrySleepMicroseconds,
);
}

if ($options->suppressExceptions) {
$metrics = new SuppressExceptionsMetrics($metrics, $this->logger);
}

return $metrics;
}

public static function createMetrics(
RPCInterface $rpc,
MetricsOptions $options = new MetricsOptions(),
LoggerInterface $logger = new NullLogger()
): MetricsInterface
{
return (new self($logger))->create($rpc, $options);
}
}
77 changes: 77 additions & 0 deletions src/MetricsIgnoreResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

namespace Spiral\RoadRunner\Metrics;

use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Spiral\RoadRunner\Metrics\Exception\MetricsException;

class MetricsIgnoreResponse extends AbstractMetrics
{
public function __construct(
protected readonly AsyncRPCInterface $rpc
) {
}

public function add(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->callIgnoreResponse('metrics.Add', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function sub(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->callIgnoreResponse('metrics.Sub', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function observe(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->callIgnoreResponse('metrics.Observe', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function set(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->callIgnoreResponse('metrics.Set', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function declare(string $name, CollectorInterface $collector): void
{
try {
$this->rpc->call('metrics.Declare', [
'name' => $name,
'collector' => $collector->toArray(),
]);
} catch (ServiceException $e) {
if (str_contains($e->getMessage(), 'tried to register existing collector')) {
// suppress duplicate metric error
return;
}

throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function unregister(string $name): void
{
try {
$this->rpc->call('metrics.Unregister', $name);
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}
}
7 changes: 5 additions & 2 deletions src/MetricsOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
class MetricsOptions
{
/**
* @param int<0, max> $retryAttempts
* @param int<0, max> $retrySleepMicroseconds
* @param int<0, max> $retryAttempts Number of retry attempts done
* @param int<0, max> $retrySleepMicroseconds Amount of microSeconds slept between retry attempts
* @param bool $suppressExceptions Whether to suppress the exceptions usually thrown if something went wrong
* @param bool $ignoreResponsesWherePossible Whether to use the new callIgnoreResponse RPC interface to speed up Metric collection. May result in lost metrics
*/
public function __construct(
public readonly int $retryAttempts = 3,
public readonly int $retrySleepMicroseconds = 50,
public readonly bool $suppressExceptions = false,
public readonly bool $ignoreResponsesWherePossible = false
) {
}
}
75 changes: 69 additions & 6 deletions tests/Unit/MetricsFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

namespace Spiral\RoadRunner\Metrics\Tests\Unit;

use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Spiral\Goridge\RPC\RPC;
use Psr\Log\LoggerInterface;
use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\Metrics\MetricsFactory;
use Spiral\RoadRunner\Metrics\MetricsIgnoreResponse;
use Spiral\RoadRunner\Metrics\MetricsOptions;
use Spiral\RoadRunner\Metrics\RetryMetrics;
use Spiral\RoadRunner\Metrics\SuppressExceptionsMetrics;
Expand All @@ -15,28 +18,88 @@ final class MetricsFactoryTest extends TestCase
/**
* @dataProvider providerForTestCreate
*/
public function testCreate(MetricsOptions $options, string $expectedClass): void
public function testCreate(MetricsOptions $options, string $expectedClass, string $rpcInterfaceClass): void
{
$factory = new MetricsFactory();

$rpc = $this->createMock(RPCInterface::class);
$rpc->expects($this->once())->method('withServicePrefix')
->with('metrics')
->willReturn($rpc);
/** @var MockObject&RPCInterface $rpc */
$rpc = $this->createMock($rpcInterfaceClass);

self::assertInstanceOf($expectedClass, $factory->create($rpc, $options));
}

/**
* @dataProvider providerForTestCreate
*/
public function testCreateStatic(MetricsOptions $options, string $expectedClass, string $rpcInterfaceClass): void
{
/** @var MockObject&RPCInterface $rpc */
$rpc = $this->createMock($rpcInterfaceClass);

self::assertInstanceOf($expectedClass, MetricsFactory::createMetrics($rpc, $options));
}

public function testLogsIfIgnoreResponseButNoAsyncRPCInterface(): void
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('warning')
->with('ignoreResponsesWherePossible is true but no AsyncRPCInterface provided');

$rpc = $this->createMock(RPCInterface::class);

$factory = new MetricsFactory($logger);
$factory->create($rpc, new MetricsOptions(ignoreResponsesWherePossible: true));
}

public function testLogsIfAsyncRPCInterfaceButNoIgnoreResponses(): void
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('warning')
->with('ignoreResponsesWherePossible is false but an AsyncRPCInterface was provided');

$rpc = $this->createMock(AsyncRPCInterface::class);

$factory = new MetricsFactory($logger);
$factory->create($rpc, new MetricsOptions(ignoreResponsesWherePossible: false));
}

public static function providerForTestCreate(): array
{
return [
'create RetryMetrics' => [
'options' => new MetricsOptions(),
'expectedClass' => RetryMetrics::class,
'rpcInterfaceClass' => RPCInterface::class
],
'create SuppressExceptionsMetrics' => [
'options' => new MetricsOptions(suppressExceptions: true),
'expectedClass' => SuppressExceptionsMetrics::class,
'rpcInterfaceClass' => RPCInterface::class
],
'create Metrics if no AsyncRPCInterface' => [
'options' => new MetricsOptions(ignoreResponsesWherePossible: true),
'expectedClass' => RetryMetrics::class,
'rpcInterfaceClass' => RPCInterface::class
],
'create Metrics if AsyncRPCInterface but ignoreResponse... false' => [
'options' => new MetricsOptions(ignoreResponsesWherePossible: false),
'expectedClass' => RetryMetrics::class,
'rpcInterfaceClass' => RPCInterface::class
],
'create MetricsIgnoreResponse if AsyncRPCInterface' => [
'options' => new MetricsOptions(retryAttempts: 0, suppressExceptions: false, ignoreResponsesWherePossible: true),
'expectedClass' => MetricsIgnoreResponse::class,
'rpcInterfaceClass' => AsyncRPCInterface::class
],
'create MetricsIgnoreResponse with RetryMetrics if AsyncRPCInterface' => [
'options' => new MetricsOptions(retryAttempts: 3, suppressExceptions: false, ignoreResponsesWherePossible: true),
'expectedClass' => RetryMetrics::class,
'rpcInterfaceClass' => AsyncRPCInterface::class
],
'create MetricsIgnoreResponse with SuppressExceptions if AsyncRPCInterface' => [
'options' => new MetricsOptions(retryAttempts: 3, suppressExceptions: true, ignoreResponsesWherePossible: true),
'expectedClass' => SuppressExceptionsMetrics::class,
'rpcInterfaceClass' => AsyncRPCInterface::class
],
];
}
Expand Down
Loading
Loading