diff --git a/src/ClientProxy.php b/src/ClientProxy.php index b9f5af2..8ae1ed4 100644 --- a/src/ClientProxy.php +++ b/src/ClientProxy.php @@ -4,6 +4,7 @@ namespace Nashgao\MQTT; +use Hyperf\Contract\StdoutLoggerInterface; use Hyperf\Context\ApplicationContext; use Hyperf\Engine\Channel; use Nashgao\MQTT\Config\ClientConfig; @@ -22,6 +23,8 @@ class ClientProxy extends Client protected EventDispatcherInterface $dispatcher; + protected StdoutLoggerInterface $logger; + protected string $poolName; protected ClientConfig $config; @@ -35,6 +38,7 @@ public function __construct(ClientConfig $config, string $poolName) $this->config = $config; $this->poolName = $poolName; $this->dispatcher = ApplicationContext::getContainer()->get(EventDispatcherInterface::class); + $this->logger = ApplicationContext::getContainer()->get(StdoutLoggerInterface::class); $this->channel = new Channel(); $this->timeSincePing = time(); parent::__construct($config->host, $config->port, $config->clientConfig, $config->clientType); @@ -90,15 +94,21 @@ public function unsubscribe(array $topics, array $properties = []) public function receive() { $cont = new Channel(); + // 检查是否需要发送PING请求以保持连接 + // Check if ping is needed + if ((time() - $this->timeSincePing) >= $this->config->clientConfig->getKeepAlive()) { + call_user_func([$this, 'executePing']); + } + $this->channel->push( function () use ($cont) { $message = parent::recv(); - if ($this->timeSincePing <= (time() - ($this->config->clientConfig->getKeepAlive() + $this->delayTime))) { - if (parent::ping()) { - $this->timeSincePing = time(); - } else { - return $cont->push(false); - } + // 计算延迟时间 + $delayDateTime = (time() - ($this->config->clientConfig->getKeepAlive() + $this->delayTime)); + // 检查是否需要发送PING请求以保持连接 + // 在接收到消息后,检查是否需要执行ping操作 + if ($this->timeSincePing <= $delayDateTime) { + call_user_func([$this, 'executePing']); } if (! is_bool($message)) { @@ -107,19 +117,16 @@ function () use ($cont) { parent::send(['type' => Types::PUBACK, 'message_id' => $message['message_id']]); } + // 对于QoS为2的消息,发送PUBREC响应 /* qos 2 pubrel */ if ($message['type'] === Types::PUBLISH && $message['qos'] === Qos::QOS_EXACTLY_ONCE) { - parent::send(['type' => Types::PUBREC, 'message_id' => $message['message_id']]); + parent::send(['type' => Types::PUBREC, 'message_id' => $message['message_id']], false); } + // 对于收到的PUBREL消息,发送PUBCOMP响应 /* qos 2 pub comp */ if ($message['type'] === Types::PUBREL) { - parent::send( - [ - 'type' => Types::PUBCOMP, - 'message_id' => $message['message_id'], - ] - ); + parent::send(['type' => Types::PUBCOMP, 'message_id' => $message['message_id']], false); } if ($message['type'] === Types::DISCONNECT) { @@ -158,4 +165,40 @@ function () use ($cont) { return $cont->pop($this->config->clientConfig->getKeepAlive() + $this->delayTime); /* 30 seconds to pop the channel, since subscribe may not be able to receive the disconnect info immediately */ } + + /** + * 发送心跳包(此方法重写了父类的心跳包方法,并增加了记录最后一次成功心跳时间的功能。) + * Author: m + * DateTime: 2024/11/7 15:54 + * @return bool 如果心跳成功则返回true,否则返回false。 + * Remark + */ + private function executePing(): bool + { + try { + // 调用父类的心跳方法尝试与服务器进行心跳通信。 + if (!parent::ping()) { + throw new \Exception("mqtt ping failed"); + } + + // 如果心跳成功,记录当前时间为最后一次成功心跳的时间。 + $this->timeSincePing = time(); + // 输出心跳操作的结果以及自上次成功心跳以来的时间。 + $this->logger->debug("mqtt ping success,timestamp=" . date('Y-m-d H:i:s', intval($this->timeSincePing))); + } catch (\Exception $e) { + // 捕获并处理在心跳过程中发生的异常。 + echo "心跳过程中发生错误:" . $e->getMessage() . "\n"; + $this->logger->warning(sprintf("%s: %s(%s) in %s:%s\nStack trace:\n%s", + get_class($e), + $e->getMessage(), + $e->getCode(), + $e->getFile(), + $e->getLine(), + $e->getTraceAsString()) + ); + return false; + } + + return true; + } }