Skip to content

Commit

Permalink
feat: add async metrics implementation (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
L3tum authored Sep 15, 2024
1 parent ad5e245 commit 9a7518e
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 40 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
"require": {
"php": ">=8.1",
"psr/log": ">=2.0",
"spiral/goridge": "^4.0",
"spiral/goridge": "^4.2",
"spiral/roadrunner": "^2023.1 || ^2024.1"
},
"autoload": {
Expand Down
13 changes: 13 additions & 0 deletions src/AbstractMetrics.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<?php

namespace Spiral\RoadRunner\Metrics;

use Spiral\Goridge\RPC\RPCInterface;

abstract class AbstractMetrics implements MetricsInterface
{
/**
* @var string
*/
protected const SERVICE_NAME = 'metrics';
}
29 changes: 14 additions & 15 deletions src/Metrics.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,24 @@

namespace Spiral\RoadRunner\Metrics;

use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\Metrics\Exception\MetricsException;
use function compact;
use function str_contains;

class Metrics implements MetricsInterface
class Metrics extends AbstractMetrics
{
private const SERVICE_NAME = 'metrics';

private readonly RPCInterface $rpc;

public function __construct(RPCInterface $rpc)
{
$this->rpc = $rpc->withServicePrefix(self::SERVICE_NAME);
public function __construct(
protected readonly RPCInterface $rpc
) {
}

public function add(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->call('Add', \compact('name', 'value', 'labels'));
$this->rpc->call('metrics.Add', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand All @@ -31,7 +30,7 @@ public function add(string $name, float $value, array $labels = []): void
public function sub(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->call('Sub', \compact('name', 'value', 'labels'));
$this->rpc->call('metrics.Sub', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand All @@ -40,7 +39,7 @@ public function sub(string $name, float $value, array $labels = []): void
public function observe(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->call('Observe', \compact('name', 'value', 'labels'));
$this->rpc->call('metrics.Observe', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand All @@ -49,7 +48,7 @@ public function observe(string $name, float $value, array $labels = []): void
public function set(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->call('Set', \compact('name', 'value', 'labels'));
$this->rpc->call('metrics.Set', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand All @@ -58,12 +57,12 @@ public function set(string $name, float $value, array $labels = []): void
public function declare(string $name, CollectorInterface $collector): void
{
try {
$this->rpc->call('Declare', [
$this->rpc->call('metrics.Declare', [
'name' => $name,
'collector' => $collector->toArray(),
]);
} catch (ServiceException $e) {
if (\str_contains($e->getMessage(), 'tried to register existing collector')) {
if (str_contains($e->getMessage(), 'tried to register existing collector')) {
// suppress duplicate metric error
return;
}
Expand All @@ -75,7 +74,7 @@ public function declare(string $name, CollectorInterface $collector): void
public function unregister(string $name): void
{
try {
$this->rpc->call('Unregister', $name);
$this->rpc->call('metrics.Unregister', $name);
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
Expand Down
36 changes: 30 additions & 6 deletions src/MetricsFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\RPCInterface;

class MetricsFactory
Expand All @@ -13,18 +14,41 @@ public function __construct(
) {
}

public function create(RPCInterface $rpc, MetricsOptions $options): MetricsInterface
public function create(RPCInterface $rpc, MetricsOptions $options = new MetricsOptions()): MetricsInterface
{
$metrics = new RetryMetrics(
new Metrics($rpc),
$options->retryAttempts,
$options->retrySleepMicroseconds,
);
if ($options->ignoreResponsesWherePossible && !($rpc instanceof AsyncRPCInterface)) {
$this->logger->warning("ignoreResponsesWherePossible is true but no AsyncRPCInterface provided");
} elseif (!$options->ignoreResponsesWherePossible && $rpc instanceof AsyncRPCInterface) {
$this->logger->warning("ignoreResponsesWherePossible is false but an AsyncRPCInterface was provided");
}

if ($options->ignoreResponsesWherePossible && $rpc instanceof AsyncRPCInterface) {
$metrics = new MetricsIgnoreResponse($rpc);
} else {
$metrics = new Metrics($rpc);
}

if ($options->retryAttempts > 0) {
$metrics = new RetryMetrics(
$metrics,
$options->retryAttempts,
$options->retrySleepMicroseconds,
);
}

if ($options->suppressExceptions) {
$metrics = new SuppressExceptionsMetrics($metrics, $this->logger);
}

return $metrics;
}

public static function createMetrics(
RPCInterface $rpc,
MetricsOptions $options = new MetricsOptions(),
LoggerInterface $logger = new NullLogger()
): MetricsInterface
{
return (new self($logger))->create($rpc, $options);
}
}
77 changes: 77 additions & 0 deletions src/MetricsIgnoreResponse.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

namespace Spiral\RoadRunner\Metrics;

use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\Exception\ServiceException;
use Spiral\RoadRunner\Metrics\Exception\MetricsException;

class MetricsIgnoreResponse extends AbstractMetrics
{
public function __construct(
protected readonly AsyncRPCInterface $rpc
) {
}

public function add(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->callIgnoreResponse('metrics.Add', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function sub(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->callIgnoreResponse('metrics.Sub', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function observe(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->callIgnoreResponse('metrics.Observe', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function set(string $name, float $value, array $labels = []): void
{
try {
$this->rpc->callIgnoreResponse('metrics.Set', compact('name', 'value', 'labels'));
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function declare(string $name, CollectorInterface $collector): void
{
try {
$this->rpc->call('metrics.Declare', [
'name' => $name,
'collector' => $collector->toArray(),
]);
} catch (ServiceException $e) {
if (str_contains($e->getMessage(), 'tried to register existing collector')) {
// suppress duplicate metric error
return;
}

throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}

public function unregister(string $name): void
{
try {
$this->rpc->call('metrics.Unregister', $name);
} catch (ServiceException $e) {
throw new MetricsException($e->getMessage(), $e->getCode(), $e);
}
}
}
7 changes: 5 additions & 2 deletions src/MetricsOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@
class MetricsOptions
{
/**
* @param int<0, max> $retryAttempts
* @param int<0, max> $retrySleepMicroseconds
* @param int<0, max> $retryAttempts Number of retry attempts done
* @param int<0, max> $retrySleepMicroseconds Amount of microSeconds slept between retry attempts
* @param bool $suppressExceptions Whether to suppress the exceptions usually thrown if something went wrong
* @param bool $ignoreResponsesWherePossible Whether to use the new callIgnoreResponse RPC interface to speed up Metric collection. May result in lost metrics
*/
public function __construct(
public readonly int $retryAttempts = 3,
public readonly int $retrySleepMicroseconds = 50,
public readonly bool $suppressExceptions = false,
public readonly bool $ignoreResponsesWherePossible = false
) {
}
}
75 changes: 69 additions & 6 deletions tests/Unit/MetricsFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

namespace Spiral\RoadRunner\Metrics\Tests\Unit;

use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
use Spiral\Goridge\RPC\RPC;
use Psr\Log\LoggerInterface;
use Spiral\Goridge\RPC\AsyncRPCInterface;
use Spiral\Goridge\RPC\RPCInterface;
use Spiral\RoadRunner\Metrics\MetricsFactory;
use Spiral\RoadRunner\Metrics\MetricsIgnoreResponse;
use Spiral\RoadRunner\Metrics\MetricsOptions;
use Spiral\RoadRunner\Metrics\RetryMetrics;
use Spiral\RoadRunner\Metrics\SuppressExceptionsMetrics;
Expand All @@ -15,28 +18,88 @@ final class MetricsFactoryTest extends TestCase
/**
* @dataProvider providerForTestCreate
*/
public function testCreate(MetricsOptions $options, string $expectedClass): void
public function testCreate(MetricsOptions $options, string $expectedClass, string $rpcInterfaceClass): void
{
$factory = new MetricsFactory();

$rpc = $this->createMock(RPCInterface::class);
$rpc->expects($this->once())->method('withServicePrefix')
->with('metrics')
->willReturn($rpc);
/** @var MockObject&RPCInterface $rpc */
$rpc = $this->createMock($rpcInterfaceClass);

self::assertInstanceOf($expectedClass, $factory->create($rpc, $options));
}

/**
* @dataProvider providerForTestCreate
*/
public function testCreateStatic(MetricsOptions $options, string $expectedClass, string $rpcInterfaceClass): void
{
/** @var MockObject&RPCInterface $rpc */
$rpc = $this->createMock($rpcInterfaceClass);

self::assertInstanceOf($expectedClass, MetricsFactory::createMetrics($rpc, $options));
}

public function testLogsIfIgnoreResponseButNoAsyncRPCInterface(): void
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('warning')
->with('ignoreResponsesWherePossible is true but no AsyncRPCInterface provided');

$rpc = $this->createMock(RPCInterface::class);

$factory = new MetricsFactory($logger);
$factory->create($rpc, new MetricsOptions(ignoreResponsesWherePossible: true));
}

public function testLogsIfAsyncRPCInterfaceButNoIgnoreResponses(): void
{
$logger = $this->createMock(LoggerInterface::class);
$logger->expects($this->once())->method('warning')
->with('ignoreResponsesWherePossible is false but an AsyncRPCInterface was provided');

$rpc = $this->createMock(AsyncRPCInterface::class);

$factory = new MetricsFactory($logger);
$factory->create($rpc, new MetricsOptions(ignoreResponsesWherePossible: false));
}

public static function providerForTestCreate(): array
{
return [
'create RetryMetrics' => [
'options' => new MetricsOptions(),
'expectedClass' => RetryMetrics::class,
'rpcInterfaceClass' => RPCInterface::class
],
'create SuppressExceptionsMetrics' => [
'options' => new MetricsOptions(suppressExceptions: true),
'expectedClass' => SuppressExceptionsMetrics::class,
'rpcInterfaceClass' => RPCInterface::class
],
'create Metrics if no AsyncRPCInterface' => [
'options' => new MetricsOptions(ignoreResponsesWherePossible: true),
'expectedClass' => RetryMetrics::class,
'rpcInterfaceClass' => RPCInterface::class
],
'create Metrics if AsyncRPCInterface but ignoreResponse... false' => [
'options' => new MetricsOptions(ignoreResponsesWherePossible: false),
'expectedClass' => RetryMetrics::class,
'rpcInterfaceClass' => RPCInterface::class
],
'create MetricsIgnoreResponse if AsyncRPCInterface' => [
'options' => new MetricsOptions(retryAttempts: 0, suppressExceptions: false, ignoreResponsesWherePossible: true),
'expectedClass' => MetricsIgnoreResponse::class,
'rpcInterfaceClass' => AsyncRPCInterface::class
],
'create MetricsIgnoreResponse with RetryMetrics if AsyncRPCInterface' => [
'options' => new MetricsOptions(retryAttempts: 3, suppressExceptions: false, ignoreResponsesWherePossible: true),
'expectedClass' => RetryMetrics::class,
'rpcInterfaceClass' => AsyncRPCInterface::class
],
'create MetricsIgnoreResponse with SuppressExceptions if AsyncRPCInterface' => [
'options' => new MetricsOptions(retryAttempts: 3, suppressExceptions: true, ignoreResponsesWherePossible: true),
'expectedClass' => SuppressExceptionsMetrics::class,
'rpcInterfaceClass' => AsyncRPCInterface::class
],
];
}
Expand Down
Loading

0 comments on commit 9a7518e

Please sign in to comment.