Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 56 additions & 13 deletions src/ClientProxy.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Nashgao\MQTT;

use Hyperf\Contract\StdoutLoggerInterface;
use Hyperf\Context\ApplicationContext;
use Hyperf\Engine\Channel;
use Nashgao\MQTT\Config\ClientConfig;
Expand All @@ -22,6 +23,8 @@ class ClientProxy extends Client

protected EventDispatcherInterface $dispatcher;

protected StdoutLoggerInterface $logger;

protected string $poolName;

protected ClientConfig $config;
Expand All @@ -35,6 +38,7 @@ public function __construct(ClientConfig $config, string $poolName)
$this->config = $config;
$this->poolName = $poolName;
$this->dispatcher = ApplicationContext::getContainer()->get(EventDispatcherInterface::class);
$this->logger = ApplicationContext::getContainer()->get(StdoutLoggerInterface::class);
$this->channel = new Channel();
$this->timeSincePing = time();
parent::__construct($config->host, $config->port, $config->clientConfig, $config->clientType);
Expand Down Expand Up @@ -90,15 +94,21 @@ public function unsubscribe(array $topics, array $properties = [])
public function receive()
{
$cont = new Channel();
// 检查是否需要发送PING请求以保持连接
// Check if ping is needed
if ((time() - $this->timeSincePing) >= $this->config->clientConfig->getKeepAlive()) {
call_user_func([$this, 'executePing']);
}

$this->channel->push(
function () use ($cont) {
$message = parent::recv();
if ($this->timeSincePing <= (time() - ($this->config->clientConfig->getKeepAlive() + $this->delayTime))) {
if (parent::ping()) {
$this->timeSincePing = time();
} else {
return $cont->push(false);
}
// 计算延迟时间
$delayDateTime = (time() - ($this->config->clientConfig->getKeepAlive() + $this->delayTime));
// 检查是否需要发送PING请求以保持连接
// 在接收到消息后,检查是否需要执行ping操作
if ($this->timeSincePing <= $delayDateTime) {
call_user_func([$this, 'executePing']);
}

if (! is_bool($message)) {
Expand All @@ -107,19 +117,16 @@ function () use ($cont) {
parent::send(['type' => Types::PUBACK, 'message_id' => $message['message_id']]);
}

// 对于QoS为2的消息,发送PUBREC响应
/* qos 2 pubrel */
if ($message['type'] === Types::PUBLISH && $message['qos'] === Qos::QOS_EXACTLY_ONCE) {
parent::send(['type' => Types::PUBREC, 'message_id' => $message['message_id']]);
parent::send(['type' => Types::PUBREC, 'message_id' => $message['message_id']], false);
}

// 对于收到的PUBREL消息,发送PUBCOMP响应
/* qos 2 pub comp */
if ($message['type'] === Types::PUBREL) {
parent::send(
[
'type' => Types::PUBCOMP,
'message_id' => $message['message_id'],
]
);
parent::send(['type' => Types::PUBCOMP, 'message_id' => $message['message_id']], false);
}

if ($message['type'] === Types::DISCONNECT) {
Expand Down Expand Up @@ -158,4 +165,40 @@ function () use ($cont) {

return $cont->pop($this->config->clientConfig->getKeepAlive() + $this->delayTime); /* 30 seconds to pop the channel, since subscribe may not be able to receive the disconnect info immediately */
}

/**
* 发送心跳包(此方法重写了父类的心跳包方法,并增加了记录最后一次成功心跳时间的功能。)
* Author: m
* DateTime: 2024/11/7 15:54
* @return bool 如果心跳成功则返回true,否则返回false。
* Remark
*/
private function executePing(): bool
{
try {
// 调用父类的心跳方法尝试与服务器进行心跳通信。
if (!parent::ping()) {
throw new \Exception("mqtt ping failed");
}

// 如果心跳成功,记录当前时间为最后一次成功心跳的时间。
$this->timeSincePing = time();
// 输出心跳操作的结果以及自上次成功心跳以来的时间。
$this->logger->debug("mqtt ping success,timestamp=" . date('Y-m-d H:i:s', intval($this->timeSincePing)));
} catch (\Exception $e) {
// 捕获并处理在心跳过程中发生的异常。
echo "心跳过程中发生错误:" . $e->getMessage() . "\n";
$this->logger->warning(sprintf("%s: %s(%s) in %s:%s\nStack trace:\n%s",
get_class($e),
$e->getMessage(),
$e->getCode(),
$e->getFile(),
$e->getLine(),
$e->getTraceAsString())
);
return false;
}

return true;
}
}