diff --git a/README.md b/README.md index 312116f55d..301f9f1662 100644 --- a/README.md +++ b/README.md @@ -99,6 +99,7 @@ Have any feedback or questions? [Create a discussion](https://github.com/TwiN/ga - [Monitoring a WebSocket endpoint](#monitoring-a-websocket-endpoint) - [Monitoring an endpoint using ICMP](#monitoring-an-endpoint-using-icmp) - [Monitoring an endpoint using DNS queries](#monitoring-an-endpoint-using-dns-queries) + - [Monitoring an endpoint using MQTT](#monitoring-an-endpoint-using-mqtt) - [Monitoring an endpoint using SSH](#monitoring-an-endpoint-using-ssh) - [Monitoring an endpoint using STARTTLS](#monitoring-an-endpoint-using-starttls) - [Monitoring an endpoint using TLS](#monitoring-an-endpoint-using-tls) @@ -2037,6 +2038,28 @@ There are two placeholders that can be used in the conditions for endpoints of t `NOERROR`, `FORMERR`, `SERVFAIL`, `NXDOMAIN`, etc. +### Monitoring an endpoint using MQTT +Defining an `mqtt` configuration in an endpoint will automatically mark said endpoint as an endpoint of type MQTT: +```yaml +endpoints: + - name: example-mqtt-query + url: "wss://example.com/mqtt" + mqtt: + topic: "my_topic" + username: "username" # Optional + password: "password" # Optional + body: "gatus check - {{ uuidv4 }}" + conditions: + - "[CONNECTED] == true" +``` + +The body can be plain text or a text/template. If it is a text/template, the following functions are available: +- `uuidv4` returns a UUID v4 universally unique ID + +The following placeholders are supported for endpoints of type MQTT: +- `[CONNECTED]` resolves to `true` if the MQTT message was published and the same message was consumed, `false` otherwise + + ### Monitoring an endpoint using SSH You can monitor endpoints using SSH by prefixing `endpoints[].url` with `ssh://`: ```yaml diff --git a/client/client.go b/client/client.go index a7bb18da13..0853876185 100644 --- a/client/client.go +++ b/client/client.go @@ -1,6 +1,7 @@ package client import ( + "bytes" "crypto/tls" "crypto/x509" "encoding/json" @@ -11,10 +12,13 @@ import ( "net/smtp" "runtime" "strings" + "text/template" "time" "github.com/TwiN/gocache/v2" "github.com/TwiN/whois" + "github.com/eclipse/paho.mqtt.golang" + "github.com/google/uuid" "github.com/ishidawataru/sctp" "github.com/miekg/dns" ping "github.com/prometheus-community/pro-bing" @@ -32,6 +36,12 @@ var ( whoisClient = whois.NewClient().WithReferralCache(true) whoisExpirationDateCache = gocache.NewCache().WithMaxSize(10000).WithDefaultTTL(24 * time.Hour) + + mqttTemplateEngine = template.New("base").Funcs(template.FuncMap{ + "uuidv4": func() string { + return uuid.New().String() + }, + }) ) // GetHTTPClient returns the shared HTTP client, or the client from the configuration passed @@ -332,6 +342,58 @@ func QueryDNS(queryType, queryName, url string) (connected bool, dnsRcode string return connected, dnsRcode, body, nil } +func QueryMQTT(address, topic, username, password, body string, config *Config) (bool, error) { + bodyTemplate, err := mqttTemplateEngine.Parse(body) + if err != nil { + return false, fmt.Errorf("error parsing mqtt request body: %w", err) + } + + var renderedBodyBuffer bytes.Buffer + err = bodyTemplate.Execute(&renderedBodyBuffer, nil) + if err != nil { + return false, fmt.Errorf("error rendering mqtt request body: %w", err) + } + renderedBody := renderedBodyBuffer.String() + + opts := mqtt.NewClientOptions() + opts.AddBroker(address) + opts.SetClientID(fmt.Sprintf("gatus-client-%d", time.Now().UnixMilli())) + if len(username) > 0 { + opts.SetUsername(username) + } + if len(password) > 0 { + opts.SetPassword(password) + } + client := mqtt.NewClient(opts) + if token := client.Connect(); token.WaitTimeout(config.Timeout) && token.Error() != nil { + return false, fmt.Errorf("error connecting to mqtt: %w", token.Error()) + } + defer client.Disconnect(1) + + done := make(chan struct{}) + defer close(done) + + if token := client.Subscribe(topic, 0, func(client mqtt.Client, message mqtt.Message) { + message.Ack() + if string(message.Payload()) == renderedBody { + done <- struct{}{} + } + }); token.WaitTimeout(config.Timeout) && token.Error() != nil { + return false, fmt.Errorf("error subscribing to mqtt topic: %w", token.Error()) + } + + if token := client.Publish(topic, 0, false, renderedBody); token.WaitTimeout(config.Timeout) && token.Error() != nil { + return false, fmt.Errorf("error publishing to mqtt topic: %w", token.Error()) + } + + select { + case <-done: + return true, nil + case <-time.After(config.Timeout): + return false, fmt.Errorf("timout while waiting for mqtt message: %w") + } +} + // InjectHTTPClient is used to inject a custom HTTP client for testing purposes func InjectHTTPClient(httpClient *http.Client) { injectedHTTPClient = httpClient diff --git a/config/endpoint/endpoint.go b/config/endpoint/endpoint.go index ac765c1a5b..0cc14f4957 100644 --- a/config/endpoint/endpoint.go +++ b/config/endpoint/endpoint.go @@ -16,6 +16,7 @@ import ( "github.com/TwiN/gatus/v5/alerting/alert" "github.com/TwiN/gatus/v5/client" "github.com/TwiN/gatus/v5/config/endpoint/dns" + "github.com/TwiN/gatus/v5/config/endpoint/mqtt" sshconfig "github.com/TwiN/gatus/v5/config/endpoint/ssh" "github.com/TwiN/gatus/v5/config/endpoint/ui" "golang.org/x/crypto/ssh" @@ -46,6 +47,7 @@ const ( TypeHTTP Type = "HTTP" TypeWS Type = "WEBSOCKET" TypeSSH Type = "SSH" + TypeMQTT Type = "MQTT" TypeUNKNOWN Type = "UNKNOWN" ) @@ -110,6 +112,9 @@ type Endpoint struct { // SSH is the configuration for SSH monitoring SSHConfig *sshconfig.Config `yaml:"ssh,omitempty"` + // MQTT is the configuration for MQTT monitoring + MQTTConfig *mqtt.Config `yaml:"mqtt,omitempty"` + // ClientConfig is the configuration of the client used to communicate with the endpoint's target ClientConfig *client.Config `yaml:"client,omitempty"` @@ -136,6 +141,8 @@ func (e *Endpoint) Type() Type { switch { case e.DNSConfig != nil: return TypeDNS + case e.MQTTConfig != nil: + return TypeMQTT case strings.HasPrefix(e.URL, "tcp://"): return TypeTCP case strings.HasPrefix(e.URL, "sctp://"): @@ -216,6 +223,9 @@ func (e *Endpoint) ValidateAndSetDefaults() error { if e.SSHConfig != nil { return e.SSHConfig.Validate() } + if e.MQTTConfig != nil { + return e.MQTTConfig.Validate() + } if e.Type() == TypeUNKNOWN { return ErrUnknownEndpointType } @@ -375,6 +385,13 @@ func (e *Endpoint) call(result *Result) { return } result.Duration = time.Since(startTime) + } else if endpointType == TypeMQTT { + result.Connected, err = client.QueryMQTT(e.URL, e.MQTTConfig.Topic, e.MQTTConfig.Username, e.MQTTConfig.Password, e.Body, e.ClientConfig) + if err != nil { + result.AddError(err.Error()) + return + } + result.Duration = time.Since(startTime) } else { response, err = client.GetHTTPClient(e.ClientConfig).Do(request) result.Duration = time.Since(startTime) diff --git a/config/endpoint/endpoint_test.go b/config/endpoint/endpoint_test.go index c2d434b01a..d128d7605e 100644 --- a/config/endpoint/endpoint_test.go +++ b/config/endpoint/endpoint_test.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "crypto/x509" "errors" + "github.com/TwiN/gatus/v5/config/endpoint/mqtt" "io" "net/http" "strings" @@ -280,9 +281,10 @@ func TestEndpoint_IsEnabled(t *testing.T) { func TestEndpoint_Type(t *testing.T) { type args struct { - URL string - DNS *dns.Config - SSH *ssh.Config + URL string + DNS *dns.Config + SSH *ssh.Config + MQTT *mqtt.Config } tests := []struct { args args @@ -298,6 +300,15 @@ func TestEndpoint_Type(t *testing.T) { }, want: TypeDNS, }, + { + args: args{ + URL: "wss://example.com/mqtt", + MQTT: &mqtt.Config{ + Topic: "my_topic", + }, + }, + want: TypeMQTT, + }, { args: args{ URL: "tcp://127.0.0.1:6379", @@ -540,6 +551,57 @@ func TestEndpoint_ValidateAndSetDefaultsWithSSH(t *testing.T) { } } +func TestEndpoint_ValidateAndSetDefaultsWithMQTT(t *testing.T) { + scenarios := []struct { + name string + topic string + username string + password string + expectedErr error + }{ + { + name: "fail when has no topic", + topic: "", + username: "", + password: "", + expectedErr: mqtt.ErrEndpointWithoutMQTTTopic, + }, + { + name: "success when only topic is set", + topic: "my_topic", + username: "", + password: "", + expectedErr: nil, + }, + { + name: "success when all fields are set", + topic: "my_topic", + username: "username", + password: "password", + expectedErr: nil, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + endpoint := &Endpoint{ + Name: "mqtt-test", + URL: "https://example.com", + MQTTConfig: &mqtt.Config{ + Topic: scenario.topic, + Username: scenario.username, + Password: scenario.password, + }, + Conditions: []Condition{Condition("[STATUS] == 0")}, + } + err := endpoint.ValidateAndSetDefaults() + if !errors.Is(err, scenario.expectedErr) { + t.Errorf("expected error %v, got %v", scenario.expectedErr, err) + } + }) + } +} + func TestEndpoint_ValidateAndSetDefaultsWithSimpleErrors(t *testing.T) { scenarios := []struct { endpoint *Endpoint @@ -787,6 +849,42 @@ func TestIntegrationEvaluateHealthForDNS(t *testing.T) { } } +func TestIntegrationEvaluateHealthForMQTT(t *testing.T) { + scenarios := []struct { + name string + endpoint Endpoint + conditions []Condition + success bool + }{ + { + name: "mqtt-failure", + endpoint: Endpoint{ + Name: "mqtt-failure", + URL: "wss://example.com/mqtt", + MQTTConfig: &mqtt.Config{ + Topic: "my_topic", + Username: "gatus", + Password: "", + }, + Body: "This is a test: {{ uuidv4 }}", + }, + conditions: []Condition{Condition("[CONNECTED] == true")}, + success: false, + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + scenario.endpoint.ValidateAndSetDefaults() + scenario.endpoint.Conditions = scenario.conditions + result := scenario.endpoint.EvaluateHealth() + if result.Success != scenario.success { + t.Errorf("Expected success to be %v, but was %v", scenario.success, result.Success) + } + }) + } +} + func TestIntegrationEvaluateHealthForSSH(t *testing.T) { scenarios := []struct { name string diff --git a/config/endpoint/mqtt/mqtt.go b/config/endpoint/mqtt/mqtt.go new file mode 100644 index 0000000000..828c05c5a6 --- /dev/null +++ b/config/endpoint/mqtt/mqtt.go @@ -0,0 +1,24 @@ +package mqtt + +import ( + "errors" +) + +var ( + // ErrEndpointWithoutMQTTTopic is the error with which Gatus will panic if an endpoint with MQTT monitoring is configured without a topic. + ErrEndpointWithoutMQTTTopic = errors.New("you must specify a topic for each MQTT endpoint") +) + +type Config struct { + Topic string `yaml:"topic,omitempty"` + Username string `yaml:"username,omitempty"` + Password string `yaml:"password,omitempty"` +} + +// Validate the SSH configuration +func (cfg *Config) Validate() error { + if len(cfg.Topic) == 0 { + return ErrEndpointWithoutMQTTTopic + } + return nil +} diff --git a/config/endpoint/mqtt/mqtt_test.go b/config/endpoint/mqtt/mqtt_test.go new file mode 100644 index 0000000000..d7b294f861 --- /dev/null +++ b/config/endpoint/mqtt/mqtt_test.go @@ -0,0 +1,23 @@ +package mqtt + +import ( + "errors" + "testing" +) + +func TestMQTT_validate(t *testing.T) { + cfg := &Config{} + if err := cfg.Validate(); err == nil { + t.Error("expected an error") + } else if !errors.Is(err, ErrEndpointWithoutMQTTTopic) { + t.Errorf("expected error to be '%v', got '%v'", ErrEndpointWithoutMQTTTopic, err) + } + cfg.Username = "username" + if err := cfg.Validate(); err != nil { + t.Errorf("expected no error, got '%v'", err) + } + cfg.Password = "password" + if err := cfg.Validate(); err != nil { + t.Errorf("expected no error, got '%v'", err) + } +} diff --git a/go.mod b/go.mod index 48fffef847..06b2555339 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/TwiN/whois v1.1.9 github.com/aws/aws-sdk-go v1.55.5 github.com/coreos/go-oidc/v3 v3.11.0 + github.com/eclipse/paho.mqtt.golang v1.5.0 github.com/gofiber/fiber/v2 v2.52.5 github.com/google/go-github/v48 v48.2.0 github.com/google/uuid v1.6.0 @@ -50,6 +51,7 @@ require ( github.com/google/s2a-go v0.1.8 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.14.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect github.com/hashicorp/go-version v1.6.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/go.sum b/go.sum index 33fbe4426c..2f47f930ef 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,8 @@ github.com/davidmz/go-pageant v1.0.2 h1:bPblRCh5jGU+Uptpz6LgMZGD5hJoOt7otgT454Wv github.com/davidmz/go-pageant v1.0.2/go.mod h1:P2EDDnMqIwG5Rrp05dTRITj9z2zpGcD9efWSkTNKLIE= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/go-fed/httpsig v1.1.0 h1:9M+hb0jkEICD8/cAiNqEB66R87tTINszBRTjwjQzWcI= @@ -68,6 +70,8 @@ github.com/googleapis/enterprise-certificate-proxy v0.3.4 h1:XYIDZApgAnrN1c855gT github.com/googleapis/enterprise-certificate-proxy v0.3.4/go.mod h1:YKe7cfqYXjKGpGvmSg28/fFvhNzinZQm8DGnaburhGA= github.com/googleapis/gax-go/v2 v2.14.0 h1:f+jMrjBPl+DL9nI4IQzLUxMq7XrAqFYB7hBPqMNIe8o= github.com/googleapis/gax-go/v2 v2.14.0/go.mod h1:lhBCnjdLrWRaPvLWhmc8IS24m9mr07qSYnHncrgo+zk= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek= github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=