
Channel mqtt #1706

Open
avaksru wants to merge 10 commits into sipeed:main from avaksru:channel_mqtt

Conversation

avaksru (Contributor) commented Mar 17, 2026

📝 Description

Implemented a periodic MQTT connection health check with automatic reconnection. Added a startHealthCheck() method that checks the connection status every 30 seconds and reconnects automatically when the connection is lost. The application continues to operate normally when the MQTT server is unavailable; the MQTT channel does not interfere with other functionality.

🗣️ Type of Change

  • [✅] ✨ New feature (non-breaking change which adds functionality)
  • [✅] 📖 Documentation update

🤖 AI Code Generation

  • [ ] 🤖 Fully AI-generated (100% AI, 0% Human)
  • [ ] 🛠️ Mostly AI-generated (AI draft, Human verified/modified)
  • [✅] 👨‍💻 Mostly Human-written (Human lead, AI assisted or none)

🔗 Related Issue

No related issue

📚 Technical Context (Skip for Docs)

  • Reference URL: pkg/channels/mqtt/mqtt.go
  • Reasoning: Added periodic connection health check function to ensure reliable MQTT connection

🧪 Test Environment

  • Hardware: Any hardware supporting Go
  • OS: Any OS supporting Go (Linux, Windows, macOS)
  • Model/Provider: N/A (not required)
  • Channels: MQTT

📸 Evidence (Optional)

Logs/Screenshots: [image]

Key code changes:

  1. startHealthCheck() method - added to pkg/channels/mqtt/mqtt.go:
func (c *MQTTChannel) startHealthCheck() {
    ticker := time.NewTicker(30 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-c.ctx.Done():
            return
        case <-ticker.C:
            if c.client == nil {
                // Client not created yet; calling Connect() on it would panic
                continue
            }
            if c.client.IsConnected() {
                // Connection is healthy
                continue
            }
            
            // Connection is lost, attempt to reconnect
            logger.WarnC("mqtt", "MQTT connection lost, attempting to reconnect")
            
            // Try to reconnect
            if token := c.client.Connect(); token.Wait() && token.Error() != nil {
                logger.ErrorCF("mqtt", "Failed to reconnect to MQTT broker", map[string]any{
                    "error": token.Error(),
                })
            } else {
                logger.InfoC("mqtt", "Successfully reconnected to MQTT broker")
            }
        }
    }
}
  2. Updated Start() method - added health check call and connection handlers:
// Set connection lost handler
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
    logger.ErrorCF("mqtt", "Connection lost", map[string]any{
        "error": err,
    })
    // Connection will be automatically reconnected by the client
})

// Set connect handler
opts.SetOnConnectHandler(func(client mqtt.Client) {
    logger.InfoC("mqtt", "Connected to MQTT broker")
    
    // Subscribe to topics after successful connection
    for _, topic := range c.config.SubscribeTopics {
        if token := c.client.Subscribe(topic, byte(c.config.QoS), nil); token.Wait() && token.Error() != nil {
            logger.ErrorCF("mqtt", "Failed to subscribe to topic", map[string]any{
                "topic": topic,
                "error": token.Error(),
            })
        } else {
            logger.InfoCF("mqtt", "Subscribed to topic", map[string]any{
                "topic": topic,
            })
        }
    }
})

// Start periodic connection health check
go c.startHealthCheck()

Implementation features:

  • Connection check every 30 seconds
  • Automatic reconnection when connection is lost
  • Topic subscription after each successful connection
  • Detailed logging of all events
  • Non-disruptive operation - doesn't interfere with other application functions when server is unavailable
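
The per-tick decision described above (healthy, reconnect, or report failure) can be sketched as a small function that is testable without a broker. This is only an illustration under stated assumptions: the reconnector interface, checkOnce, the result constants, and fakeClient are hypothetical names that are not part of this PR or of paho.mqtt.golang.

```go
package main

import (
	"errors"
	"fmt"
)

// reconnector is a hypothetical, minimal view of an MQTT client: only the two
// operations a health check needs. It is not the paho client interface.
type reconnector interface {
	IsConnected() bool
	Reconnect() error
}

// checkResult describes what a single health-check tick did.
type checkResult string

const (
	resultNoClient    checkResult = "no-client"
	resultHealthy     checkResult = "healthy"
	resultReconnected checkResult = "reconnected"
	resultFailed      checkResult = "reconnect-failed"
)

// checkOnce performs one health-check step. It returns early for a nil
// interface value so that no method is ever called on a missing client.
// Note: a typed nil pointer stored in the interface is not caught by this check.
func checkOnce(c reconnector) (checkResult, error) {
	if c == nil {
		return resultNoClient, nil
	}
	if c.IsConnected() {
		return resultHealthy, nil
	}
	if err := c.Reconnect(); err != nil {
		return resultFailed, err
	}
	return resultReconnected, nil
}

// fakeClient is a test double standing in for a real broker connection.
type fakeClient struct {
	connected bool
	failNext  bool // when true, Reconnect reports an error
}

func (f *fakeClient) IsConnected() bool { return f.connected }

func (f *fakeClient) Reconnect() error {
	if f.failNext {
		return errors.New("broker unavailable")
	}
	f.connected = true
	return nil
}

func main() {
	// A disconnected fake client: one tick should attempt a reconnect.
	res, err := checkOnce(&fakeClient{})
	fmt.Println(res, err)
}
```

Keeping the decision in one function separate from the ticker loop makes the "server unavailable" case easy to cover in a unit test.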

☑️ Checklist

  • [✅] My code/docs follow the style of this project.
  • [✅] I have performed a self-review of my own changes.
  • [✅] I have updated the documentation accordingly.

Ready to switch to Act mode to apply these changes to the documentation if needed.

CLAassistant commented Mar 17, 2026

CLA assistant check
All committers have signed the CLA.

sipeed-bot added the labels type: enhancement (New feature or request), domain: channel, go (Pull requests that update go code), and dependencies (Pull requests that update a dependency file) on Mar 17, 2026
horsley (Contributor) left a comment

TLS Configuration (Lines 106-118): The variables TLSCA, TLSCert, and TLSKey are checked but never actually used. The code has empty comment blocks where the certificate loading logic should be implemented. You need to implement actual certificate loading using tls.LoadX509KeyPair or similar.
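
A minimal sketch of what the missing certificate loading could look like, using only the Go standard library. The helper name buildTLSConfig and its three path parameters are hypothetical; they stand in for the TLSCA, TLSCert, and TLSKey settings, and treating an empty string as "not configured" is an assumption about the config, not something taken from the PR.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
	"os"
)

// buildTLSConfig is a hypothetical helper. caPath, certPath and keyPath stand
// in for the TLSCA, TLSCert and TLSKey settings; "" means "not configured".
func buildTLSConfig(caPath, certPath, keyPath string) (*tls.Config, error) {
	cfg := &tls.Config{}

	// Optional custom CA: replaces the system roots for server verification.
	if caPath != "" {
		pem, err := os.ReadFile(caPath)
		if err != nil {
			return nil, fmt.Errorf("read CA file: %w", err)
		}
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(pem) {
			return nil, errors.New("CA file contains no valid PEM certificates")
		}
		cfg.RootCAs = pool
	}

	// A client certificate is only usable when both halves are present.
	if (certPath == "") != (keyPath == "") {
		return nil, errors.New("client certificate and key must be set together")
	}
	if certPath != "" {
		pair, err := tls.LoadX509KeyPair(certPath, keyPath)
		if err != nil {
			return nil, fmt.Errorf("load client key pair: %w", err)
		}
		cfg.Certificates = []tls.Certificate{pair}
	}
	return cfg, nil
}

func main() {
	// With nothing configured, the result is a plain config using system roots.
	cfg, err := buildTLSConfig("", "", "")
	fmt.Println(cfg != nil, err)
}
```

With paho.mqtt.golang, the resulting config would typically be passed to the client options through SetTLSConfig.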

horsley (Contributor) left a comment

Redundant Health Check (Lines 206-232): The paho.mqtt.golang library already has built-in automatic reconnection. Looking at lines 79-84, you set SetAutoReconnect(true), SetConnectRetry(true), SetConnectRetryInterval(5s), and SetMaxReconnectInterval(5m). The manual reconnection logic in startHealthCheck() is redundant and may cause race conditions with the library's internal reconnection mechanism. Consider removing the startHealthCheck function or at least the manual reconnection code.
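
If the periodic check is kept, one way to avoid competing with the library is to make it observe-only. The following is a rough sketch under assumptions: statusSource, watchConnection, scriptedSource, and runDemo are hypothetical names, and the loop deliberately never calls Connect, leaving reconnection to the client's own mechanism.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// statusSource is a hypothetical read-only view of a client.
type statusSource interface {
	IsConnected() bool
}

// watchConnection polls src at the given interval and calls onChange whenever
// the connection state flips. It never attempts to reconnect.
func watchConnection(ctx context.Context, src statusSource, interval time.Duration, onChange func(connected bool)) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	last := src.IsConnected()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			now := src.IsConnected()
			if now != last {
				onChange(now)
				last = now
			}
		}
	}
}

// scriptedSource replays a fixed sequence of states, then repeats the last one.
type scriptedSource struct {
	states []bool
	next   int
}

func (s *scriptedSource) IsConnected() bool {
	v := s.states[s.next]
	if s.next < len(s.states)-1 {
		s.next++
	}
	return v
}

// runDemo simulates connected -> lost -> lost -> restored and returns the
// state changes that were reported. It stops once the connection is restored.
func runDemo() []bool {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var changes []bool
	src := &scriptedSource{states: []bool{true, false, false, true}}
	watchConnection(ctx, src, time.Millisecond, func(connected bool) {
		changes = append(changes, connected)
		if connected {
			cancel() // connection is back; end the demo
		}
	})
	return changes
}

func main() {
	fmt.Println(runDemo())
}
```

In real code the onChange callback would only log or update a status field; in this sketch it runs on the polling goroutine, so it should not block.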

// TLS configuration
if c.config.TLS {
    tlsConfig := &tls.Config{
        InsecureSkipVerify: false,

Issue 1: Incomplete TLS Implementation

The TLS certificate loading code (around line 86) has comments but no actual implementation. The code creates a tls.Config but never loads the certificates from the configured paths (TLSCA, TLSCert, TLSKey). Consider implementing the certificate loading logic.

}

// startHealthCheck starts a periodic health check for the MQTT connection.
func (c *MQTTChannel) startHealthCheck() {

Issue 2: Redundant Health Check

The paho.mqtt library already has built-in automatic reconnection with the options SetAutoReconnect(true), SetConnectRetry(true), SetMaxReconnectInterval(5 * time.Minute) configured. The manual health check (startHealthCheck) and reconnection logic may cause race conditions with the library's internal reconnection mechanism. Consider removing the manual health check and relying solely on the library's built-in reconnection.

horsley (Contributor) commented Mar 18, 2026

@avaksru LGTM, try rebasing onto the main branch to resolve the conflicts :)

yinwm (Collaborator) commented Mar 18, 2026

@avaksru plz resolve conflicts

yinwm (Collaborator) commented Mar 19, 2026

@avaksru there are still conflicts, plz resolve them

avaksru force-pushed the channel_mqtt branch 5 times, most recently from fcaef36 to 9aa2361, on March 20, 2026 13:39
- Fix pkg/gateway/gateway.go: remove unnecessary leading/trailing newlines and fix gofumpt formatting
- Fix pkg/config/config.go: reformat long struct tags to meet golines 120-char limit
- Fix pkg/channels/mqtt/mqtt.go: fix gofmt formatting
- Fix gci import ordering across pkg/config/ and pkg/channels/mqtt/ packages

All linter errors resolved: 0 issues remaining

4 participants