Skip to content

Commit

Permalink
try fix scan issue
Browse files Browse the repository at this point in the history
  • Loading branch information
RicYaben committed Nov 6, 2024
1 parent d2bd5a7 commit 57029a1
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 74 deletions.
4 changes: 2 additions & 2 deletions modules/mqtt/mqtt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ func (t *mqttTester) getScanner() (*Scanner, error) {

// Identifiers
flags.ClientID = "testClient" // MQTT-specific
flags.ClientRandom = "blabla" // on the TCP handshake

// Client and user
flags.SubscribeTopics = "#,$SYS/#"
flags.TopicsSeparator = ","
flags.LimitMessages = 1
flags.LimitTopics = 10
flags.UseTLS = true

// Attempt anonymous auth with
// an empty user and password as the
Expand Down Expand Up @@ -69,7 +69,7 @@ func (t *mqttTester) runTest(test *testing.T, name string) {
var tests = map[string]*mqttTester{
"success": {
addr: "test.mosquitto.org",
port: 1883,
port: 8883,
expectedStatus: zgrab2.SCAN_SUCCESS,
},
}
Expand Down
66 changes: 44 additions & 22 deletions modules/mqtt/scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (scan *scan) getTLSConfig() (*tls.Config, error) {

func (scan *scan) makeClient() (paho.Client, error) {
// TODO: implement support for web-sockets as well?
o := paho.NewClientOptions()
opts := paho.NewClientOptions()

// Add TLS
scheme := "tcp"
Expand All @@ -110,7 +110,7 @@ func (scan *scan) makeClient() (paho.Client, error) {
if err != nil {
return nil, err
}
o.SetTLSConfig(cfg)
opts.SetTLSConfig(cfg)
}

// Add broker
Expand All @@ -119,18 +119,21 @@ func (scan *scan) makeClient() (paho.Client, error) {
port = scan.target.Port
}
t := fmt.Sprintf("%s://%s:%d", scheme, scan.target.Host(), *port)
o.AddBroker(t)
opts.AddBroker(t)

// Add auth details
if scan.scanner.config.UserAuth {
o.SetUsername(scan.scanner.config.Username)
o.SetPassword(scan.scanner.config.Password)
opts.SetUsername(scan.scanner.config.Username)
opts.SetPassword(scan.scanner.config.Password)
}

o.SetClientID(scan.scanner.config.ClientID)
o.SetCleanSession(true)
o.SetOrderMatters(false)
return paho.NewClient(o), nil
// TODO: change the dialer to a zgrab2 one.
// opts.SetDialer()
opts.SetClientID(scan.scanner.config.ClientID)
opts.SetCleanSession(true)
opts.SetOrderMatters(false)
opts.SetAutoReconnect(true)
return paho.NewClient(opts), nil
}

func (scan *scan) Init() (*scan, error) {
Expand All @@ -148,7 +151,11 @@ func (scan *scan) messageHandler(msgChan chan paho.Message) func(c paho.Client,
tCount := make(map[string]int)

var mu sync.Mutex

isFull := func(t string) bool {
mu.Lock()
defer mu.Unlock()

tc, ok := tCount[t]
// if the array does not exist, check the number of topics
if !ok && (tLimit > -1 && len(tCount) >= tLimit) {
Expand All @@ -162,24 +169,32 @@ func (scan *scan) messageHandler(msgChan chan paho.Message) func(c paho.Client,
return false
}

return func(c paho.Client, m paho.Message) {
topic := m.Topic()
var addToCount = func(topic string) {
mu.Lock()
defer mu.Unlock()
tCount[topic]++
}

var addMessage = func(c paho.Client, m paho.Message) {
topic := m.Topic()
if isFull(topic) {
c.Unsubscribe(m.Topic())
mu.Unlock()
c.Unsubscribe(topic)
return
}
tCount[topic]++
mu.Unlock()

addToCount(topic)
select {
case msgChan <- m:
// sent
default:
//ignore
}
}

// We cannot block here, so call a goroutine to handle
// the message instead.
return func(c paho.Client, m paho.Message) {
go addMessage(c, m)
}
}

// Grab starts the scan
Expand All @@ -199,18 +214,26 @@ func (scan *scan) Grab() *zgrab2.ScanError {
msgs := make(chan paho.Message)
handler := scan.messageHandler(msgs)

var wg *sync.WaitGroup
wg.Add(1)
go scan.handleMessages(msgs, wg)

if t := scan.client.SubscribeMultiple(filt, handler); t.Wait() && t.Error() != nil {
return zgrab2.NewScanError(zgrab2.SCAN_CONNECTION_REFUSED, t.Error())
}

ctx, cancel := context.WithTimeout(context.Background(), scan.scanner.config.Timeout)
defer cancel()

go func() {
<-ctx.Done()
scan.client.Unsubscribe(subs...)
close(msgs)
}()
<-ctx.Done()
scan.client.Unsubscribe(subs...)
close(msgs)
wg.Wait()
return nil
}

func (scan *scan) handleMessages(msgs chan paho.Message, wg *sync.WaitGroup) {
defer wg.Done()

topics := make(map[string][]string)
for m := range msgs {
Expand All @@ -220,7 +243,6 @@ func (scan *scan) Grab() *zgrab2.ScanError {
topics[m.Topic()] = msg
}
scan.results.Topics = topics
return nil
}

// Scanner implements the zgrab2.Scanner interface.
Expand Down
48 changes: 0 additions & 48 deletions modules/mqtt/test/docker-compose.yml

This file was deleted.

2 changes: 0 additions & 2 deletions modules/mqtt/test/mosquitto.conf

This file was deleted.

0 comments on commit 57029a1

Please sign in to comment.