A PHP MQTT client library for the Hyperf framework with MQTT 5 shared subscription support.
Install the package with Composer:

    composer require nashgao/mqtt

Publish the configuration file:

    php bin/hyperf.php vendor:publish nashgao/mqtt

Publish a message by dispatching a PublishEvent:

    use Hyperf\Context\ApplicationContext;
    use Nashgao\MQTT\Event\PublishEvent;
    use Psr\EventDispatcher\EventDispatcherInterface;

    // Resolve the PSR-14 event dispatcher from the Hyperf container
    $dispatcher = ApplicationContext::getContainer()->get(EventDispatcherInterface::class);
    $dispatcher->dispatch(new PublishEvent('topic/test', 'Hello MQTT!', 2));

Subscribe to a topic by dispatching a SubscribeEvent:

    use Nashgao\MQTT\Config\TopicConfig;
    use Nashgao\MQTT\Event\SubscribeEvent;

    $event = new SubscribeEvent(topicConfigs: [
        new TopicConfig(['topic' => 'topic/test', 'qos' => 2])
    ]);
    $dispatcher->dispatch($event);

Nashgao/MQTT is a PHP MQTT client library that provides coroutine-based MQTT 5 functionality integrated with the Hyperf framework. Built on top of simps/mqtt, it focuses on MQTT 5's shared subscription feature for load-balanced message consumption across multiple subscribers.
The library was designed around MQTT 5's Shared Subscription feature, which lets multiple subscribers subscribe to the same topic while ensuring that only one of them receives each message (load balancing). Two topic forms are supported:
- Queue Topics: $queue/topic - Simple load balancing
- Shared Topics: $share/group/topic - Group-based load balancing with custom group names

                                                     [subscriber1] got msg1
                msg1, msg2, msg3                   /
    [publisher] ---------------> "$share/g/topic" -- [subscriber2] got msg2
                                                   \
                                                     [subscriber3] got msg3

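The behaviour in the diagram can be illustrated without a broker. The following is a toy, in-memory simulation, not the library's or any broker's implementation: the function name distributeRoundRobin is hypothetical, and round-robin is only one of the strategies a broker may use to pick a member of the group.

```php
<?php
declare(strict_types=1);

/**
 * Toy model of a shared subscription group: every message is delivered
 * to exactly one subscriber, chosen in round-robin order.
 * Hypothetical helper; real brokers may use other dispatch strategies.
 *
 * @param string[] $subscribers names of the group members
 * @param string[] $messages    payloads published to the shared topic
 * @return array<string, string[]> messages received per subscriber
 */
function distributeRoundRobin(array $subscribers, array $messages): array
{
    if ($subscribers === []) {
        throw new InvalidArgumentException('At least one subscriber is required.');
    }

    $subscribers = array_values($subscribers);
    $received = array_fill_keys($subscribers, []);

    foreach (array_values($messages) as $i => $message) {
        // Pick the next group member; no message is delivered twice.
        $target = $subscribers[$i % count($subscribers)];
        $received[$target][] = $message;
    }

    return $received;
}

$result = distributeRoundRobin(
    ['subscriber1', 'subscriber2', 'subscriber3'],
    ['msg1', 'msg2', 'msg3']
);
```

With three subscribers and three messages, each subscriber ends up with exactly one message, which matches the picture above.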
- Client (src/Client.php): Main facade providing proxy methods for all MQTT operations
- ClientFactory (src/ClientFactory.php): Factory for creating client instances with proper pool management
- ClientProxy (src/ClientProxy.php): Proxy implementation handling method delegation and connection management
- MQTTConnection (src/MQTTConnection.php): Wrapper around the simps/mqtt client with Hyperf coroutine integration
- MQTTPool (src/Pool/MQTTPool.php): Connection pool implementation for efficient resource management
- PoolFactory (src/Pool/PoolFactory.php): Factory for creating and managing connection pools
- ClientConfig (src/Config/ClientConfig.php): Connection configuration (broker, credentials, timeouts)
- TopicConfig (src/Config/TopicConfig.php): Topic-specific configuration for subscriptions and publishing
- ConfigProvider (src/ConfigProvider.php): Hyperf service provider for dependency injection and event listeners
The library uses Hyperf's event system for decoupled MQTT operations:
Command Events (trigger actions):
- PublishEvent: Dispatch to publish messages
- SubscribeEvent: Dispatch to subscribe to topics

Notification Events (reactions to MQTT events):
- OnReceiveEvent: Triggered when messages are received
- OnPublishEvent: Triggered when messages are published
- OnSubscribeEvent: Triggered when subscriptions are established
- OnDisconnectEvent: Triggered on connection loss

Listeners:
- PublishListener: Handles PublishEvent dispatching
- SubscribeListener: Handles SubscribeEvent dispatching
- AfterWorkerStartListener: Handles auto-subscription on server startup
- OnReceiveListener: Handles incoming message processing
- ServerIdListener: Manages server identification for clustering
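
The split between command events and notification events can be sketched in plain PHP. Everything in this block is a stand-in written for illustration: DemoDispatcher, DemoPublishEvent and DemoOnPublishEvent are hypothetical names, not the library's classes, and whether the real PublishListener emits OnPublishEvent in exactly this way is an assumption.

```php
<?php
declare(strict_types=1);

// Stand-in command event (hypothetical, not the library's PublishEvent).
final class DemoPublishEvent
{
    public function __construct(
        public string $topic,
        public string $message,
        public int $qos = 0,
    ) {
    }
}

// Stand-in notification event (hypothetical).
final class DemoOnPublishEvent
{
    public function __construct(public string $topic)
    {
    }
}

// Minimal dispatcher: maps an event class to the callables listening to it.
final class DemoDispatcher
{
    /** @var array<string, callable[]> */
    private array $listeners = [];

    public function listen(string $eventClass, callable $listener): void
    {
        $this->listeners[$eventClass][] = $listener;
    }

    public function dispatch(object $event): object
    {
        foreach ($this->listeners[$event::class] ?? [] as $listener) {
            $listener($event);
        }
        return $event;
    }
}

$log = [];
$demoDispatcher = new DemoDispatcher();

// Command listener: performs the action, then emits a notification event.
$demoDispatcher->listen(
    DemoPublishEvent::class,
    function (DemoPublishEvent $event) use ($demoDispatcher, &$log): void {
        $log[] = "publish {$event->topic} qos={$event->qos}";
        $demoDispatcher->dispatch(new DemoOnPublishEvent($event->topic));
    }
);

// Notification listener: reacts after the action has happened.
$demoDispatcher->listen(
    DemoOnPublishEvent::class,
    function (DemoOnPublishEvent $event) use (&$log): void {
        $log[] = "published {$event->topic}";
    }
);

$demoDispatcher->dispatch(new DemoPublishEvent('topic/test', 'Hello MQTT!', 2));
```

The caller only knows about the command event; code that wants to react to the outcome subscribes to the notification event instead of being called directly.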
The library supports multiple subscription patterns:
- Auto-subscription: Configure topics in mqtt.php with auto_subscribe: true
- Multi-subscriber: Use enable_multisub: true to create multiple clients for the same topic
- Shared subscriptions: Configure enable_share_topic: true with group names
- Queue subscriptions: Enable enable_queue_topic: true for simple load balancing
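
As a rough illustration of how the shared and queue options could translate into the topic filter sent to the broker, consider the sketch below. It is not the library's code: the function buildTopicFilter and the group_name key are hypothetical, and giving enable_share_topic precedence over enable_queue_topic is an assumption. The $share/group/topic and $queue/topic formats are the ones described earlier.

```php
<?php
declare(strict_types=1);

/**
 * Build the topic filter for one topic entry (hypothetical helper).
 *
 * @param array{topic?: string, enable_share_topic?: bool, group_name?: string, enable_queue_topic?: bool} $config
 */
function buildTopicFilter(array $config): string
{
    $topic = $config['topic'] ?? '';
    if ($topic === '') {
        throw new InvalidArgumentException('A topic is required.');
    }

    if (!empty($config['enable_share_topic'])) {
        $group = $config['group_name'] ?? '';
        // MQTT 5 does not allow "/", "+" or "#" in the share name.
        if ($group === '' || strpbrk($group, '/+#') !== false) {
            throw new InvalidArgumentException('Invalid share group name.');
        }
        return '$share/' . $group . '/' . $topic;
    }

    if (!empty($config['enable_queue_topic'])) {
        return '$queue/' . $topic;
    }

    // Plain subscription: the topic is used unchanged.
    return $topic;
}
```

Single-quoted strings are used on purpose so that PHP does not try to interpolate $share or $queue as variables.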
Key environment variables:
- MQTT_HOST: MQTT broker host (default: localhost)
- MQTT_PORT: MQTT broker port (default: 1883)
- MQTT_USERNAME: MQTT username (default: admin)
- MQTT_PASSWORD: MQTT password (default: public)
- MQTT_PROTOCOL_LEVEL: MQTT protocol level (default: 5)
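
To make the fallback behaviour concrete, here is a plain-PHP sketch that resolves these variables with the defaults listed above. It is only a stand-in: the function mqttSettingsFromEnv and the returned array keys are hypothetical, and the published mqtt.php may read the environment differently.

```php
<?php
declare(strict_types=1);

/**
 * Resolve broker settings from the environment, falling back to the
 * documented defaults (hypothetical helper, keys chosen for illustration).
 *
 * @return array{host: string, port: int, username: string, password: string, protocol_level: int}
 */
function mqttSettingsFromEnv(): array
{
    $read = static function (string $name, string $default): string {
        $value = getenv($name);
        // Treat an unset or empty variable as "use the default".
        return ($value === false || $value === '') ? $default : $value;
    };

    return [
        'host' => $read('MQTT_HOST', 'localhost'),
        'port' => (int) $read('MQTT_PORT', '1883'),
        'username' => $read('MQTT_USERNAME', 'admin'),
        'password' => $read('MQTT_PASSWORD', 'public'),
        'protocol_level' => (int) $read('MQTT_PROTOCOL_LEVEL', '5'),
    ];
}
```

The default credentials are convenient for local testing; for any shared or production broker, set MQTT_USERNAME and MQTT_PASSWORD explicitly.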
Connection pool settings are located in config/autoload/mqtt.php:
- min_connections: Minimum pool connections
- max_connections: Maximum pool connections
- connect_timeout: Connection timeout
- wait_timeout: Wait timeout for pool exhaustion
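
The roles of min_connections and max_connections can be shown with a small, non-coroutine pool. This is a simplified sketch under stated assumptions, not MQTTPool: the class TinyPool is hypothetical, and it throws immediately when the pool is exhausted, whereas a coroutine pool would normally wait up to wait_timeout for a connection to be released.

```php
<?php
declare(strict_types=1);

/**
 * Minimal bounded pool (hypothetical, for illustration only).
 */
final class TinyPool
{
    private SplQueue $idle;
    private int $created = 0;

    public function __construct(
        private Closure $factory,
        private int $minConnections,
        private int $maxConnections,
    ) {
        if ($minConnections < 0 || $maxConnections < 1 || $minConnections > $maxConnections) {
            throw new InvalidArgumentException('Invalid pool bounds.');
        }
        $this->idle = new SplQueue();
        // Pre-create the minimum number of connections.
        for ($i = 0; $i < $minConnections; $i++) {
            $this->idle->enqueue($this->create());
        }
    }

    public function acquire(): object
    {
        if (!$this->idle->isEmpty()) {
            return $this->idle->dequeue();
        }
        if ($this->created < $this->maxConnections) {
            return $this->create();
        }
        // A coroutine pool would suspend here for up to wait_timeout.
        throw new RuntimeException('Pool exhausted.');
    }

    public function release(object $connection): void
    {
        $this->idle->enqueue($connection);
    }

    public function createdCount(): int
    {
        return $this->created;
    }

    private function create(): object
    {
        $this->created++;
        return ($this->factory)();
    }
}
```

A usage example would pass a factory closure that opens a broker connection; in the sketch any object works, which keeps it runnable without a broker.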

Publish a message:

    // Via event dispatch
    $dispatcher->dispatch(new PublishEvent('topic/test', 'message', 2));

    // Direct client usage
    $client = make(Client::class);
    $client->publish('topic/test', 'message', 2);

Subscribe to topics:

    // Via event dispatch
    $event = new SubscribeEvent(topicConfigs: [
        new TopicConfig(['topic' => 'topic/test', 'qos' => 2])
    ]);
    $dispatcher->dispatch($event);

    // Direct client usage
    $client->subscribe(['topic/test' => ['qos' => 2]]);

Configure topics in config/autoload/mqtt.php with auto_subscribe: true to automatically subscribe when the Hyperf server starts.

Requirements:
- PHP >= 8.3
- Hyperf Framework ~3.1
- simps/mqtt ~2.0
- Swoole >= 5.0 or Swow >= 1.5 (extensions)
The test suite requires:
- PHPUnit with co-phpunit for coroutine testing
- Hyperf testing framework
- Redis for integration tests (optional)
- MQTT broker running for integration tests