Skip to content

Commit aaabf0a

Browse files
committed
Added connection monitoring
1 parent d44df87 commit aaabf0a

File tree

5 files changed

+39
-18
lines changed

5 files changed

+39
-18
lines changed

Diff for: examples/basic.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
}
2929

3030
yield $channel->consume(function (Message $message, Channel $channel) {
31-
echo $message->content() . \PHP_EOL;
31+
echo $message->content . \PHP_EOL;
3232

3333
yield $channel->ack($message);
3434
}, 'basic_queue');

Diff for: examples/worker.php

+3-3
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
echo '[*] Waiting for messages. To exit press CTRL+C', \PHP_EOL;
3333

3434
$tag = yield $channel->consume(function (Message $message, Channel $channel) {
35-
echo "[x] Received message: {$message->content()}.", \PHP_EOL;
35+
echo "[x] Received message: {$message->content}.", \PHP_EOL;
3636

3737
// Do some work - we generate password hashes with a high cost
3838
// sleep() gets interrupted by Ctrl+C so it's not very good for demos
@@ -43,12 +43,12 @@
4343
password_hash(random_bytes(255), PASSWORD_BCRYPT, ["cost" => 15]);
4444
}
4545

46-
echo "[x] Done ", $message->content(), \PHP_EOL;
46+
echo "[x] Done ", $message->content, \PHP_EOL;
4747

4848
try {
4949
yield $channel->ack($message);
5050

51-
echo "ACK SUCCESS:: {$message->content()}", \PHP_EOL;
51+
echo "ACK SUCCESS:: {$message->content}", \PHP_EOL;
5252
} catch (\Throwable $error) {
5353

5454
echo "ACK FAILED:: {$error->getMessage()}", \PHP_EOL;

Diff for: src/Client.php

+25-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
namespace PHPinnacle\Ridge;
1414

15+
use Amp\Loop;
1516
use function Amp\asyncCall;
1617
use function Amp\call;
1718
use Amp\Deferred;
@@ -24,6 +25,8 @@ final class Client
2425
private const STATE_CONNECTED = 2;
2526
private const STATE_DISCONNECTING = 3;
2627

28+
private const CONNECTION_MONITOR_INTERVAL = 5000;
29+
2730
/**
2831
* @var Config
2932
*/
@@ -54,6 +57,11 @@ final class Client
5457
*/
5558
private $properties;
5659

60+
/**
61+
* @var string|null
62+
*/
63+
private $connectionMonitorWatcherId;
64+
5765
public function __construct(Config $config)
5866
{
5967
$this->config = $config;
@@ -91,7 +99,7 @@ function () {
9199

92100
$this->state = self::STATE_CONNECTING;
93101

94-
$this->connection = new Connection($this->config->uri(), fn() => $this->state = self::STATE_NOT_CONNECTED);
102+
$this->connection = new Connection($this->config->uri());
95103

96104
yield $this->connection->open(
97105
$this->config->timeout,
@@ -132,6 +140,16 @@ function () {
132140
);
133141

134142
$this->state = self::STATE_CONNECTED;
143+
144+
$this->connectionMonitorWatcherId = Loop::repeat(
145+
self::CONNECTION_MONITOR_INTERVAL,
146+
function(): void
147+
{
148+
if($this->connection->connected() === false) {
149+
throw Exception\ClientException::disconnected();
150+
}
151+
}
152+
);
135153
}
136154
);
137155
}
@@ -153,6 +171,12 @@ function () use ($code, $reason) {
153171
throw Exception\ClientException::notConnected();
154172
}
155173

174+
if($this->connectionMonitorWatcherId !== null){
175+
Loop::cancel($this->connectionMonitorWatcherId);
176+
177+
$this->connectionMonitorWatcherId = null;
178+
}
179+
156180
$this->state = self::STATE_DISCONNECTING;
157181

158182
if ($code === 0) {

Diff for: src/Connection.php

+6-13
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,15 @@ final class Connection
5858
*/
5959
private $heartbeatWatcherId;
6060

61-
/**
62-
* @var callable|null
63-
*/
64-
private $connectionLost;
65-
66-
public function __construct(string $uri, ?callable $connectionLost = null)
61+
public function __construct(string $uri)
6762
{
6863
$this->uri = $uri;
6964
$this->parser = new Parser;
70-
$this->connectionLost = $connectionLost;
65+
}
66+
67+
public function connected(): bool
68+
{
69+
return $this->socket !== null && $this->socket->isClosed() === false;
7170
}
7271

7372
/**
@@ -195,12 +194,10 @@ function (string $watcherId) use ($interval){
195194
}
196195

197196
if (
198-
null !== $this->connectionLost &&
199197
0 !== $this->lastRead &&
200198
$currentTime > ($this->lastRead + $interval + 1000)
201199
)
202200
{
203-
call_user_func($this->connectionLost);
204201
Loop::cancel($watcherId);
205202
}
206203

@@ -218,10 +215,6 @@ public function close(): void
218215
$this->heartbeatWatcherId = null;
219216
}
220217

221-
if ($this->connectionLost !== null) {
222-
call_user_func($this->connectionLost);
223-
}
224-
225218
if ($this->socket !== null) {
226219
$this->socket->close();
227220
}

Diff for: src/Exception/ClientException.php

+4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ public static function notConnected(): self
2626
return new self('Client is not connected to server.');
2727
}
2828

29+
public static function disconnected(): self {
30+
return new self('The client was unexpectedly disconnected from the server');
31+
}
32+
2933
public static function alreadyConnected(): self
3034
{
3135
return new self('Client is already connected/connecting.');

0 commit comments

Comments
 (0)