Skip to content

Commit

Permalink
recover from mqtt client panics
Browse files Browse the repository at this point in the history
  • Loading branch information
RicYaben committed Nov 15, 2024
1 parent 4431a6a commit cf600ac
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 17 deletions.
47 changes: 30 additions & 17 deletions modules/mqtt/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ func (s *scan) getClientOptions() (*paho.ClientOptions, error) {
opts := paho.NewClientOptions().
SetClientID(s.scanner.config.ClientID).
SetCleanSession(true).
SetAutoReconnect(true)
SetAutoReconnect(true).
SetOrderMatters(false)

switch s.scheme {
case "ssl":
Expand Down Expand Up @@ -99,7 +100,11 @@ func (s *scan) makeMessageHandler() func(c paho.Client, m paho.Message) {
tLimit := s.scanner.config.LimitTopics
tCount := make(map[string]int)

var mu sync.Mutex
var isFull = func(topic string) bool {
mu.Lock()
defer mu.Unlock()

tc, ok := tCount[topic]
// if the array does not exist, check the number of topics
if !ok && (tLimit > -1 && len(tCount) >= tLimit) {
Expand All @@ -113,17 +118,10 @@ func (s *scan) makeMessageHandler() func(c paho.Client, m paho.Message) {
return false
}

var mu sync.Mutex
var handler = func(c paho.Client, m paho.Message) {
mu.Lock()
defer mu.Unlock()

topic := m.Topic()
if isFull(topic) {
if t := c.Unsubscribe(topic); t.Wait() && t.Error() != nil {
// ignore
return
}
// ignore the message
return
}

Expand All @@ -140,12 +138,23 @@ func (s *scan) wait(client paho.Client) {
ctx, cancel := context.WithTimeout(context.Background(), s.scanner.config.Timeout)
defer cancel()

// timeout, finish now
<-ctx.Done()
// do **not** wait for the unsubscribe to return anything
client.Unsubscribe(s.topics...)
client.Disconnect(250)
}

func (s *scan) Grab() *zgrab2.ScanError {
defer func() {
// Stop panic
// The paho client tends to panic on:
// github.com/eclipse/paho%2emqtt%2egolang.startIncomingComms.func1()
// ...github.com/eclipse/[email protected]/net.go:212 +0x101d
if r := recover(); r != nil {
s.result.Error = r
}
}()

options, err := s.getClientOptions()
if err != nil {
return zgrab2.NewScanError(zgrab2.SCAN_APPLICATION_ERROR, err)
Expand All @@ -155,15 +164,19 @@ func (s *scan) Grab() *zgrab2.ScanError {
if t := client.Connect(); t.Wait() && t.Error() != nil {
return zgrab2.NewScanError(zgrab2.SCAN_CONNECTION_REFUSED, t.Error())
}

s.SetFilters()
handler := s.makeMessageHandler()
if t := client.SubscribeMultiple(s.filters, handler); t.Wait() && t.Error() != nil {
return zgrab2.NewScanError(zgrab2.SCAN_CONNECTION_REFUSED, t.Error())
}
defer client.Disconnect(250)

var subErr *zgrab2.ScanError
go func() {
s.SetFilters()
handler := s.makeMessageHandler()
if t := client.SubscribeMultiple(s.filters, handler); t.Wait() && t.Error() != nil {
subErr = zgrab2.NewScanError(zgrab2.SCAN_CONNECTION_REFUSED, t.Error())
}
}()

s.wait(client)
return nil
return subErr
}

type ScanBuilder struct {
Expand Down
1 change: 1 addition & 0 deletions modules/mqtt/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ type Result struct {
Topics map[string][]string `json:"topics,omitempty"`
Certificates [][]byte `json:"certificate,omitempty"`
Scheme string `json:"scheme"`
Error any `json:"error,omitempty"`
}

// Scanner implements the zgrab2.Scanner interface.
Expand Down

0 comments on commit cf600ac

Please sign in to comment.