Skip to content

Commit

Permalink
adds limits to the number of topics and the count of messages per topic
Browse files Browse the repository at this point in the history
  • Loading branch information
RicYaben committed Oct 18, 2024
1 parent d6b82ae commit e31faab
Showing 1 changed file with 46 additions and 10 deletions.
56 changes: 46 additions & 10 deletions modules/mqtt/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"log"
"strings"
"sync"
"time"

paho "github.com/eclipse/paho.mqtt.golang"
Expand All @@ -28,6 +29,9 @@ type Flags struct {
//UseWebSocket bool `long:"use-ws" description:"force use Web Sockets as the communication channel"`
//UserAgent string `long:"user-agent" default:"Mozilla/5.0 zgrab/0.x" description:"Set a custom user agent"`

LimitMessages int `long:"limit-messages" description:"messages per topic, one is enough to prove read access. Default: 0; Limitless: -1;"`
LimitTopics int `long:"limit-topics" description:"number of topics to include, 100 topics cover most use cases. Default: 0; Limitless: -1;"`

SubscribeTopics string `long:"subscribe-topics" default:"#,$SYS/#" description:"list of topics to subscribe to. Defaults to wildcard all and system."`
TopicsSeparator string `long:"separator" default:"," description:"subscribe topics separator"`
SubscribeTimeout time.Duration `long:"wait" default:"10s" description:"time to accept messages from the subscribed topics. Defaults to 10 seconds"`
Expand Down Expand Up @@ -140,40 +144,72 @@ func (scan *scan) Init() (*scan, error) {
return scan, nil
}

func (scan *scan) messageHandler(msgChan chan paho.Message) func(c paho.Client, m paho.Message) {
mLimit := scan.scanner.config.LimitMessages
tLimit := scan.scanner.config.LimitTopics
tCount := make(map[string]int)

isFull := func(t string) bool {
tc, ok := tCount[t]
// if the array does not exist, check the number of topics
if !ok && (tLimit > -1 && len(tCount) >= tLimit) {
return true
}

// check the number of messages in the topic
if mLimit > -1 && tc >= mLimit {
return true
}
return false
}

var mu sync.Mutex
return func(c paho.Client, m paho.Message) {
topic := m.Topic()
mu.Lock()
if isFull(topic) {
c.Unsubscribe(m.Topic())
return
}
tCount[topic]++
mu.Unlock()
msgChan <- m
}
}

// Grab starts the scan
func (scan *scan) Grab() *zgrab2.ScanError {
if t := scan.client.Connect(); t.Wait() && t.Error() != nil {
return zgrab2.NewScanError(zgrab2.SCAN_CONNECTION_REFUSED, t.Error())
}

topics := strings.Split(scan.scanner.config.SubscribeTopics, scan.scanner.config.TopicsSeparator)
subs := strings.Split(scan.scanner.config.SubscribeTopics, scan.scanner.config.TopicsSeparator)
filt := make(map[string]byte)
for _, topic := range topics {
for _, topic := range subs {
filt[topic] = 2
}

// Limit the number of messages we get
msgs := make(chan paho.Message)
handler := func(c paho.Client, m paho.Message) {
msgs <- m
}

handler := scan.messageHandler(msgs)
if t := scan.client.SubscribeMultiple(filt, handler); t.Wait() && t.Error() != nil {
return zgrab2.NewScanError(zgrab2.SCAN_CONNECTION_REFUSED, t.Error())
}

go func() {
time.Sleep(scan.scanner.config.SubscribeTimeout)
scan.client.Unsubscribe(topics...)
scan.client.Unsubscribe(subs...)
close(msgs)
}()

topics := make(map[string][]string)
for m := range msgs {
// handle here to addd the results to the scan
msgs := scan.results.Topics[m.Topic()]
msgs := topics[m.Topic()]
msgs = append(msgs, string(m.Payload()))
scan.results.Topics[m.Topic()] = msgs
topics[m.Topic()] = msgs
}

scan.results.Topics = topics
return nil
}

Expand Down

0 comments on commit e31faab

Please sign in to comment.