From e31faab8d22c4ce21800f875e238d9ab0f160f87 Mon Sep 17 00:00:00 2001 From: RicYaben Date: Fri, 18 Oct 2024 12:50:20 +0200 Subject: [PATCH] adds limits to the number of topics and the count of messages per topic --- modules/mqtt/scanner.go | 56 +++++++++++++++++++++++++++++++++-------- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/modules/mqtt/scanner.go b/modules/mqtt/scanner.go index 17e9000e..7b7d88a8 100644 --- a/modules/mqtt/scanner.go +++ b/modules/mqtt/scanner.go @@ -9,6 +9,7 @@ import ( "fmt" "log" "strings" + "sync" "time" paho "github.com/eclipse/paho.mqtt.golang" @@ -28,6 +29,9 @@ type Flags struct { //UseWebSocket bool `long:"use-ws" description:"force use Web Sockets as the communication channel"` //UserAgent string `long:"user-agent" default:"Mozilla/5.0 zgrab/0.x" description:"Set a custom user agent"` + LimitMessages int `long:"limit-messages" description:"messages per topic, one is enough to prove read access. Default: 0; Limitless: -1;"` + LimitTopics int `long:"limit-topics" description:"number of topics to include, 100 topics cover most use cases. Default: 0; Limitless: -1;"` + SubscribeTopics string `long:"subscribe-topics" default:"#,$SYS/#" description:"list of topics to subscribe to. Defaults to wildcard all and system."` TopicsSeparator string `long:"separator" default:"," description:"subscribe topics separator"` SubscribeTimeout time.Duration `long:"wait" default:"10s" description:"time to accept messages from the subscribed topics. Defaults to 10 seconds"` @@ -140,40 +144,72 @@ func (scan *scan) Init() (*scan, error) { return scan, nil } +func (scan *scan) messageHandler(msgChan chan paho.Message) func(c paho.Client, m paho.Message) { + mLimit := scan.scanner.config.LimitMessages + tLimit := scan.scanner.config.LimitTopics + tCount := make(map[string]int) + + isFull := func(t string) bool { + tc, ok := tCount[t] + // if the array does not exist, check the number of topics + if !ok && (tLimit > -1 && len(tCount) >= tLimit) { + return true + } + + // check the number of messages in the topic + if mLimit > -1 && tc >= mLimit { + return true + } + return false + } + + var mu sync.Mutex + return func(c paho.Client, m paho.Message) { + topic := m.Topic() + mu.Lock() + if isFull(topic) { + c.Unsubscribe(m.Topic()) + return + } + tCount[topic]++ + mu.Unlock() + msgChan <- m + } +} + // Grab starts the scan func (scan *scan) Grab() *zgrab2.ScanError { if t := scan.client.Connect(); t.Wait() && t.Error() != nil { return zgrab2.NewScanError(zgrab2.SCAN_CONNECTION_REFUSED, t.Error()) } - topics := strings.Split(scan.scanner.config.SubscribeTopics, scan.scanner.config.TopicsSeparator) + subs := strings.Split(scan.scanner.config.SubscribeTopics, scan.scanner.config.TopicsSeparator) filt := make(map[string]byte) - for _, topic := range topics { + for _, topic := range subs { filt[topic] = 2 } + // Limit the number of messages we get msgs := make(chan paho.Message) - handler := func(c paho.Client, m paho.Message) { - msgs <- m - } - + handler := scan.messageHandler(msgs) if t := scan.client.SubscribeMultiple(filt, handler); t.Wait() && t.Error() != nil { return zgrab2.NewScanError(zgrab2.SCAN_CONNECTION_REFUSED, t.Error()) } go func() { time.Sleep(scan.scanner.config.SubscribeTimeout) - scan.client.Unsubscribe(topics...) + scan.client.Unsubscribe(subs...) close(msgs) }() + topics := make(map[string][]string) for m := range msgs { // handle here to addd the results to the scan - msgs := scan.results.Topics[m.Topic()] + msgs := topics[m.Topic()] msgs = append(msgs, string(m.Payload())) - scan.results.Topics[m.Topic()] = msgs + topics[m.Topic()] = msgs } - + scan.results.Topics = topics return nil }