Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mqtt support #950

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Have any feedback or questions? [Create a discussion](https://github.com/TwiN/ga
- [Monitoring a WebSocket endpoint](#monitoring-a-websocket-endpoint)
- [Monitoring an endpoint using ICMP](#monitoring-an-endpoint-using-icmp)
- [Monitoring an endpoint using DNS queries](#monitoring-an-endpoint-using-dns-queries)
- [Monitoring an endpoint using MQTT](#monitoring-an-endpoint-using-mqtt)
- [Monitoring an endpoint using SSH](#monitoring-an-endpoint-using-ssh)
- [Monitoring an endpoint using STARTTLS](#monitoring-an-endpoint-using-starttls)
- [Monitoring an endpoint using TLS](#monitoring-an-endpoint-using-tls)
Expand Down Expand Up @@ -2037,6 +2038,28 @@ There are two placeholders that can be used in the conditions for endpoints of t
`NOERROR`, `FORMERR`, `SERVFAIL`, `NXDOMAIN`, etc.


### Monitoring an endpoint using MQTT
Defining an `mqtt` configuration in an endpoint will automatically mark said endpoint as an endpoint of type MQTT:
```yaml
endpoints:
- name: example-mqtt-query
url: "wss://example.com/mqtt"
mqtt:
topic: "my_topic"
username: "username" # Optional
password: "password" # Optional
body: "gatus check - {{ uuidv4 }}"
conditions:
- "[CONNECTED] == true"
```

The body can be plain text or a text/template. If it is a text/template, the following functions are available:
- `uuidv4` returns a UUID v4 universally unique ID

The following placeholders are supported for endpoints of type MQTT:
- `[CONNECTED]` resolves to `true` if the MQTT message was published and the same message was consumed, `false` otherwise


### Monitoring an endpoint using SSH
You can monitor endpoints using SSH by prefixing `endpoints[].url` with `ssh://`:
```yaml
Expand Down
62 changes: 62 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package client

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
Expand All @@ -11,10 +12,13 @@ import (
"net/smtp"
"runtime"
"strings"
"text/template"
"time"

"github.com/TwiN/gocache/v2"
"github.com/TwiN/whois"
"github.com/eclipse/paho.mqtt.golang"
"github.com/google/uuid"
"github.com/ishidawataru/sctp"
"github.com/miekg/dns"
ping "github.com/prometheus-community/pro-bing"
Expand All @@ -32,6 +36,12 @@ var (

whoisClient = whois.NewClient().WithReferralCache(true)
whoisExpirationDateCache = gocache.NewCache().WithMaxSize(10000).WithDefaultTTL(24 * time.Hour)

mqttTemplateEngine = template.New("base").Funcs(template.FuncMap{
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only think I'm unsure about is the fact that this uses a templating engine, while everything else leverages placeholder and functions. Maybe this should be exposed as a placeholder instead? I feel supporting [UUID4] globally (and not just for MQTT) would be more beneficial, albeit more complicated.

The UUID4 placeholder should also be unique per endpoint, meaning that if it's in the url and the body, it should be the same. Let me know if that doesn't make sense, I can give you some examples

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. I will look around and see if I can just figure it out.

"uuidv4": func() string {
return uuid.New().String()
},
})
)

// GetHTTPClient returns the shared HTTP client, or the client from the configuration passed
Expand Down Expand Up @@ -332,6 +342,58 @@ func QueryDNS(queryType, queryName, url string) (connected bool, dnsRcode string
return connected, dnsRcode, body, nil
}

func QueryMQTT(address, topic, username, password, body string, config *Config) (bool, error) {
bodyTemplate, err := mqttTemplateEngine.Parse(body)
if err != nil {
return false, fmt.Errorf("error parsing mqtt request body: %w", err)
}

var renderedBodyBuffer bytes.Buffer
err = bodyTemplate.Execute(&renderedBodyBuffer, nil)
if err != nil {
return false, fmt.Errorf("error rendering mqtt request body: %w", err)
}
renderedBody := renderedBodyBuffer.String()

opts := mqtt.NewClientOptions()
opts.AddBroker(address)
opts.SetClientID(fmt.Sprintf("gatus-client-%d", time.Now().UnixMilli()))
if len(username) > 0 {
opts.SetUsername(username)
}
if len(password) > 0 {
opts.SetPassword(password)
}
client := mqtt.NewClient(opts)
if token := client.Connect(); token.WaitTimeout(config.Timeout) && token.Error() != nil {
return false, fmt.Errorf("error connecting to mqtt: %w", token.Error())
}
defer client.Disconnect(1)

done := make(chan struct{})
defer close(done)

if token := client.Subscribe(topic, 0, func(client mqtt.Client, message mqtt.Message) {
message.Ack()
if string(message.Payload()) == renderedBody {
done <- struct{}{}
}
}); token.WaitTimeout(config.Timeout) && token.Error() != nil {
return false, fmt.Errorf("error subscribing to mqtt topic: %w", token.Error())
}

if token := client.Publish(topic, 0, false, renderedBody); token.WaitTimeout(config.Timeout) && token.Error() != nil {
return false, fmt.Errorf("error publishing to mqtt topic: %w", token.Error())
}

select {
case <-done:
return true, nil
case <-time.After(config.Timeout):
return false, fmt.Errorf("timout while waiting for mqtt message: %w")
}
}

// InjectHTTPClient is used to inject a custom HTTP client for testing purposes
func InjectHTTPClient(httpClient *http.Client) {
injectedHTTPClient = httpClient
Expand Down
17 changes: 17 additions & 0 deletions config/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/TwiN/gatus/v5/alerting/alert"
"github.com/TwiN/gatus/v5/client"
"github.com/TwiN/gatus/v5/config/endpoint/dns"
"github.com/TwiN/gatus/v5/config/endpoint/mqtt"
sshconfig "github.com/TwiN/gatus/v5/config/endpoint/ssh"
"github.com/TwiN/gatus/v5/config/endpoint/ui"
"golang.org/x/crypto/ssh"
Expand Down Expand Up @@ -46,6 +47,7 @@ const (
TypeHTTP Type = "HTTP"
TypeWS Type = "WEBSOCKET"
TypeSSH Type = "SSH"
TypeMQTT Type = "MQTT"
TypeUNKNOWN Type = "UNKNOWN"
)

Expand Down Expand Up @@ -110,6 +112,9 @@ type Endpoint struct {
// SSH is the configuration for SSH monitoring
SSHConfig *sshconfig.Config `yaml:"ssh,omitempty"`

// MQTT is the configuration for MQTT monitoring
MQTTConfig *mqtt.Config `yaml:"mqtt,omitempty"`

// ClientConfig is the configuration of the client used to communicate with the endpoint's target
ClientConfig *client.Config `yaml:"client,omitempty"`

Expand All @@ -136,6 +141,8 @@ func (e *Endpoint) Type() Type {
switch {
case e.DNSConfig != nil:
return TypeDNS
case e.MQTTConfig != nil:
return TypeMQTT
case strings.HasPrefix(e.URL, "tcp://"):
return TypeTCP
case strings.HasPrefix(e.URL, "sctp://"):
Expand Down Expand Up @@ -216,6 +223,9 @@ func (e *Endpoint) ValidateAndSetDefaults() error {
if e.SSHConfig != nil {
return e.SSHConfig.Validate()
}
if e.MQTTConfig != nil {
return e.MQTTConfig.Validate()
}
if e.Type() == TypeUNKNOWN {
return ErrUnknownEndpointType
}
Expand Down Expand Up @@ -375,6 +385,13 @@ func (e *Endpoint) call(result *Result) {
return
}
result.Duration = time.Since(startTime)
} else if endpointType == TypeMQTT {
result.Connected, err = client.QueryMQTT(e.URL, e.MQTTConfig.Topic, e.MQTTConfig.Username, e.MQTTConfig.Password, e.Body, e.ClientConfig)
if err != nil {
result.AddError(err.Error())
return
}
result.Duration = time.Since(startTime)
} else {
response, err = client.GetHTTPClient(e.ClientConfig).Do(request)
result.Duration = time.Since(startTime)
Expand Down
104 changes: 101 additions & 3 deletions config/endpoint/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"crypto/x509"
"errors"
"github.com/TwiN/gatus/v5/config/endpoint/mqtt"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be separate from the standard library dependencies, namely, at the bottom. See other go files for example.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know/ping me when you resolved it 👀

"io"
"net/http"
"strings"
Expand Down Expand Up @@ -280,9 +281,10 @@ func TestEndpoint_IsEnabled(t *testing.T) {

func TestEndpoint_Type(t *testing.T) {
type args struct {
URL string
DNS *dns.Config
SSH *ssh.Config
URL string
DNS *dns.Config
SSH *ssh.Config
MQTT *mqtt.Config
}
tests := []struct {
args args
Expand All @@ -298,6 +300,15 @@ func TestEndpoint_Type(t *testing.T) {
},
want: TypeDNS,
},
{
args: args{
URL: "wss://example.com/mqtt",
MQTT: &mqtt.Config{
Topic: "my_topic",
},
},
want: TypeMQTT,
},
{
args: args{
URL: "tcp://127.0.0.1:6379",
Expand Down Expand Up @@ -540,6 +551,57 @@ func TestEndpoint_ValidateAndSetDefaultsWithSSH(t *testing.T) {
}
}

func TestEndpoint_ValidateAndSetDefaultsWithMQTT(t *testing.T) {
scenarios := []struct {
name string
topic string
username string
password string
expectedErr error
}{
{
name: "fail when has no topic",
topic: "",
username: "",
password: "",
expectedErr: mqtt.ErrEndpointWithoutMQTTTopic,
},
{
name: "success when only topic is set",
topic: "my_topic",
username: "",
password: "",
expectedErr: nil,
},
{
name: "success when all fields are set",
topic: "my_topic",
username: "username",
password: "password",
expectedErr: nil,
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
endpoint := &Endpoint{
Name: "mqtt-test",
URL: "https://example.com",
MQTTConfig: &mqtt.Config{
Topic: scenario.topic,
Username: scenario.username,
Password: scenario.password,
},
Conditions: []Condition{Condition("[STATUS] == 0")},
}
err := endpoint.ValidateAndSetDefaults()
if !errors.Is(err, scenario.expectedErr) {
t.Errorf("expected error %v, got %v", scenario.expectedErr, err)
}
})
}
}

func TestEndpoint_ValidateAndSetDefaultsWithSimpleErrors(t *testing.T) {
scenarios := []struct {
endpoint *Endpoint
Expand Down Expand Up @@ -787,6 +849,42 @@ func TestIntegrationEvaluateHealthForDNS(t *testing.T) {
}
}

func TestIntegrationEvaluateHealthForMQTT(t *testing.T) {
scenarios := []struct {
name string
endpoint Endpoint
conditions []Condition
success bool
}{
{
name: "mqtt-failure",
endpoint: Endpoint{
Name: "mqtt-failure",
URL: "wss://example.com/mqtt",
MQTTConfig: &mqtt.Config{
Topic: "my_topic",
Username: "gatus",
Password: "",
},
Body: "This is a test: {{ uuidv4 }}",
},
conditions: []Condition{Condition("[CONNECTED] == true")},
success: false,
},
}

for _, scenario := range scenarios {
t.Run(scenario.name, func(t *testing.T) {
scenario.endpoint.ValidateAndSetDefaults()
scenario.endpoint.Conditions = scenario.conditions
result := scenario.endpoint.EvaluateHealth()
if result.Success != scenario.success {
t.Errorf("Expected success to be %v, but was %v", scenario.success, result.Success)
}
})
}
}

func TestIntegrationEvaluateHealthForSSH(t *testing.T) {
scenarios := []struct {
name string
Expand Down
24 changes: 24 additions & 0 deletions config/endpoint/mqtt/mqtt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package mqtt

import (
"errors"
)

var (
// ErrEndpointWithoutMQTTTopic is the error with which Gatus will panic if an endpoint with MQTT monitoring is configured without a topic.
ErrEndpointWithoutMQTTTopic = errors.New("you must specify a topic for each MQTT endpoint")
)

type Config struct {
Topic string `yaml:"topic,omitempty"`
Username string `yaml:"username,omitempty"`
Password string `yaml:"password,omitempty"`
}

// Validate the SSH configuration
func (cfg *Config) Validate() error {
if len(cfg.Topic) == 0 {
return ErrEndpointWithoutMQTTTopic
}
return nil
}
23 changes: 23 additions & 0 deletions config/endpoint/mqtt/mqtt_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package mqtt

import (
"errors"
"testing"
)

func TestMQTT_validate(t *testing.T) {
cfg := &Config{}
if err := cfg.Validate(); err == nil {
t.Error("expected an error")
} else if !errors.Is(err, ErrEndpointWithoutMQTTTopic) {
t.Errorf("expected error to be '%v', got '%v'", ErrEndpointWithoutMQTTTopic, err)
}
cfg.Username = "username"
if err := cfg.Validate(); err != nil {
t.Errorf("expected no error, got '%v'", err)
}
cfg.Password = "password"
if err := cfg.Validate(); err != nil {
t.Errorf("expected no error, got '%v'", err)
}
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/TwiN/whois v1.1.9
github.com/aws/aws-sdk-go v1.55.5
github.com/coreos/go-oidc/v3 v3.11.0
github.com/eclipse/paho.mqtt.golang v1.5.0
github.com/gofiber/fiber/v2 v2.52.5
github.com/google/go-github/v48 v48.2.0
github.com/google/uuid v1.6.0
Expand Down Expand Up @@ -50,6 +51,7 @@ require (
github.com/google/s2a-go v0.1.8 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect
github.com/googleapis/gax-go/v2 v2.14.0 // indirect
github.com/gorilla/websocket v1.5.3 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
Loading