description |
---|
How to implement VerneMQ plugins using a HTTP/HTTPS interface |
The VerneMQ Webhooks plugin provides an easy and flexible way to build powerful plugins for VerneMQ using web hooks. With VerneMQ Webhooks you are free to select the implementation language to match your technical requirements or the language in which you feel comfortable and productive in. You can use any modern language such as Python, Go, C#/.Net and indeed any language in which you can build something that can handle HTTP(s) requests.
The idea of VerneMQ Webhooks very simple: you can register an HTTP(s) endpoint with a VerneMQ plugin hook and whenever the hook (such as auth_on_register
) is called, the VerneMQ Webhooks plugin dispatches a HTTP post request to the registered endpoint. The HTTP post request contains a HTTP header like vernemq-hook: auth_on_register
and a JSON encoded payload. The endpoint then responds with code 200 on success and with a JSON encoded payload informing the VerneMQ Webhooks plugin which action to take (if any).
To enable webhooks make sure to set:
plugins.vmq_webhooks = on
And then each webhook can be configured like this:
vmq_webhooks.mywebhook1.hook = auth_on_register
vmq_webhooks.mywebhook1.endpoint = http://127.0.0.1/myendpoints
It is possible to have the webhooks plugin omit sending the payload for the auth_on_publish and auth_on_publish_m5 webhooks by setting the no_payload
config:
vmq_webhooks.mywebhook1.no_payload = on
It is also possible to dynamically register webhooks at run-time:
$ vmq-admin webhooks register hook=auth_on_register endpoint="http://localhost"
See which endpoints are registered:
$ vmq-admin webhooks show
And finally deregistering an endpoint:
$ vmq-admin webhooks deregister hook=auth_on_register endpoint="http://localhost"
{% hint style="info" %} You might consider placing the endpoint implementation locally on each VerneMQ node such that each request can go over localhost without being subject to network issues. {% endhint %}
In case your WebHooks backend requires HTTPS, you can configure the VerneMQ internal HTTP client to do so as well. There are various option you can set in the vernemq.conf
file:
vmq_webhooks.cafile
vmq_webhooks.tls_version
vmq_webhooks.verify_peer
vmq_webhooks.depth
vmq_webhooks.certfile
vmq_webhooks.use_crls
vmq_webhooks.keyfile
vmq_webhooks.keyfile_password
Check the WebHooks Schema file for quick documentation on those options or to look up their configured defaults.
Each registered hook uses by default a connection pool containing maximally 100 connections. This can be changed by setting vmq_webhooks.pool_max_connections
to a different value. Similarly the vmq_webhooks.pool_timeout
configuration (value is in milliseconds) can be set to control how long an unused connection should stay in the connection pool before being closed and removed. The default value is 60000 (60 seconds).
VerneMQ webhooks support caching of the auth_on_register
, auth_on_publish
, auth_on_subscribe
, auth_on_register_m5
, auth_on_publish_m5
and auth_on_subscribe_m5
hooks.
This can be used to speed up authentication and authorization tremendously. All data passed to these hooks is used to look if the call is in the cache, except in the case of the auth_on_publish
and auth_on_publish_m5
where the payload is omitted.
To enable caching for an endpoint simply return the cache-control: max-age=AgeInSeconds
in the response headers to one of the mentioned hooks. If the call was successful (authentication granted), the request will be cached together with any modifiers, except for the payload
modifier in the auth_on_publish
hook.
Whenever a non-expired entry is looked up in the cache the endpoint will not be called and the modifiers of the cached entry will be returned, if any.
It is possible to inspect the cache using:
$ vmq-admin webhooks cache show
{% hint style="info" %} Cache entries are currently not actively disposed after expiry and will remain in memory. {% endhint %}
All webhooks are called with method POST
. All hooks need to be answered with the HTTP code 200
to be considered successful. Any hook called that does not return the 200
code will be logged as an error as will any hook with an unparseable payload.
All hooks are called with the header vernemq-hook
which contains the name of the hook in question.
For detailed information about the hooks and when they are called, see the sections Session Lifecycle, Subscribe Flow and Publish Flow.
{% hint style="info" %} Note, when overriding a mountpoint or a client-id both have to be returned by the webhook implementation for it to have an effect. {% endhint %}
All hooks, unless stated otherwise, respond with a JSON-encoded payload and a success code of 200. All hooks support responding with "ok", indicated that the request was successful.
{
"result": "ok"
}
Other possible responses are "next", meaning that the next callback should be tried.
{
"result": "next"
}
Errors, e.g. authentication failures, are returned by a an "error" payload, either with the predefined "not_allowed"
{
"result": {
"error": "not_allowed"
}
}
or some other error text:
{
"result": {
"error": "some_error_message"
}
}
Header: vernemq-hook: auth_on_register
Webhook example payload:
{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"username": "username",
"password": "password",
"mountpoint": "",
"client_id": "clientid",
"clean_session": false
}
Additionaly, to the standard "ok" response. It is also possible to override various client specific settings by returning an array of modifiers:
{
"result": "ok",
"modifiers": {
"max_message_size": 65535,
"max_inflight_messages": 10000,
"retry_interval": 20000
}
}
Note, the retry_interval
is in milli-seconds. It is possible to override many more settings, see the Session Lifecycle for more information.
Other possible responses are next and error (not_allowed).
Header: vernemq-hook: auth_on_subscribe
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": "",
"username": "username",
"topics":
[{"topic": "a/b",
"qos": 1},
{"topic": "c/d",
"qos": 2}]
}
An example where where the topics to subscribe have been rewritten looks like:
{
"result": "ok",
"topics":
[{"topic": "rewritten/topic",
"qos": 0}]
}
Note, you can also pass a qos
with value 128
which means it was either not possible or the client was not allowed to subscribe to that specific question.
Other possible responses are "next" and "error".
Header: vernemq-hook: auth_on_publish
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"qos": 1,
"topic": "a/b",
"payload": "hello",
"retain": false
}
A complex example where the publish topic, qos, payload and retain flag is rewritten looks like:
{
"result": "ok",
"modifiers": {
"topic": "rewritten/topic",
"qos": 2,
"payload": "rewritten payload",
"retain": true
}
}
Other possible responses are "next" and "error".
Header: vernemq-hook: on_register
Webhook example payload:
{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"username": "username",
"mountpoint": "",
"client_id": "clientid"
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_publish
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"qos": 1,
"topic": "a/b",
"payload": "hello",
"retain": false
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_subscribe
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": "",
"username": "username",
"topics":
[{"topic": "a/b",
"qos": 1},
{"topic": "c/d",
"qos": 2}]
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_unsubscribe
Webhook example payload:
{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"topics":
["a/b", "c/d"]
}
Example response:
{
"result": "ok",
"topics":
["rewritten/topic"]
}
Other possible responses are "next" and "error".
Header: vernemq-hook: on_deliver
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"client_id": "clientid",
"mountpoint": "",
"topic": "a/b",
"payload": "hello"
}
Example response:
{
"result": "ok",
"modifiers":
{
"topic": "rewritten/topic",
"payload": "rewritten payload"
}
}
An other possible response is "next".
Header: vernemq-hook: on_offline_message
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": "",
"qos": "1",
"topic": "sometopic",
"payload": "payload",
"retain": false
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_client_wakeup
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": ""
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_client_offline
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": ""
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_client_gone
Webhook example payload:
{
"client_id": "clientid",
"mountpoint": ""
}
The response should be an empty json object {}
.
Header: vernemq-hook: auth_on_register_m5
Webhook example payload:
{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"mountpoint": "",
"client_id": "client-id",
"username": "username",
"password": "password",
"clean_start": true,
"properties": {}
}
It is also possible to override various client specific settings by returning an array of modifiers:
{
"result": "ok",
"modifiers": {
"max_message_size": 65535,
"max_inflight_messages": 10000
}
}
Note, the retry_interval
is in milli-seconds. It is possible to override many more settings, see the Session Lifecycle for more information.
Other possible responses are "next" and "error".
Header vernemq-hook: on_auth_m5
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"properties": {
"p_authentication_data": "QVVUSF9EQVRBMA==",
"p_authentication_method": "AUTH_METHOD"
}
}
Note, as the authentication data is binary data it is base64 encoded.
A minimal response indicating the authentication was successful looks like:
"modifiers": {
"properties": {
"p_authentication_data": "QVVUSF9EQVRBMQ==",
"p_authentication_method": "AUTH_METHOD"
}
"reason_code": 0
},
"result": "ok"
}
If authentication were to continue for another round a reason code with value 24 (Continue Authentication) should be returned instead. See also the relevant section in the MQTT 5.0 specification.
Header: vernemq-hook: auth_on_subscribe_m5
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topics": [
{
"topic": "test/topic",
"qos": 1
}
],
"properties": {}
}
An example where where the topics to subscribe have been rewritten looks like:
{
"modifiers": {
"topics": [
{
"qos": 2,
"topic": "rewritten/topic"
},
{
"qos": 135,
"topic": "forbidden/topic"
}
]
},
"result": "ok"
}
Note, the forbidden/topic
has been rejected with the qos
value of 135 (Not authorized).
Other possible responses are "next" and "error".
Header: vernemq-hook: auth_on_publish_m5
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"qos": 1,
"topic": "some/topic",
"payload": "message payload",
"retain": false,
"properties": {
}
}
A response where the publish topic has been rewritten:
{
"modifiers": {
"topic": "rewritten/topic"
},
"result": "ok"
}
Other possible responses are "next" and "error" (not_allowed).
Header: vernemq-hook: on_register_m5
Webhook example payload:
{
"peer_addr": "127.0.0.1",
"peer_port": 8888,
"mountpoint": "",
"client_id": "client-id",
"username": "username",
"properties": {
}
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_publish_m5
Note, in the example below the payload is base64 encoded .
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"qos": 1,
"topic": "test/topic",
"payload": "message payload",
"retain": false,
"properties": {
}
}
The response should be an empty json object {}
.
Header: vernemq-hook: on_subscribe_m5
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topics": [
{
"topic": "test/topic",
"qos": 1
},
{
"topic": "test/topic",
"qos": 128
}
],
"properties": {
}
}
Note, the qos value of 128
(Unspecified error) means the subscription was rejected.
The response should be an empty json object {}
.
Header: vernemq-hook: on_unsubscribe_m5
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topics": [
"test/topic"
],
"properties": {
}
}
Example response:
{
"modifiers": {
"topics": [
"rewritten/topic"
]
},
"result": "ok"
}
It supports the standard "OK" response, as well "next".
Header: vernemq-hook: on_deliver_m5
Note, in the example below the payload is not base64 encoded which is not the default.
Webhook example payload:
{
"username": "username",
"mountpoint": "",
"client_id": "client-id",
"topic": "test/topic",
"payload": "message payload",
"properties": {
}
}
It supports the standard "OK" response, as well "next" and "error".
Below is a very simple example of an endpoint implemented in Python. It uses the web
and json
modules and implements handlers for six different hooks: auth_on_register
, auth_on_publish
, auth_on_subscribe
, auth_on_register_m5
, auth_on_publish_m5
and auth_on_subscribe_m5
.
The auth_on_register
hook only restricts access only to the user with username joe
and password secret
. It also shows how to cache the result. The auth_on_subscribe
and auth_on_publish
hooks allow any subscription or publish to continue as is. These last two hooks are needed as the default
policy is deny
.
import web
import json
urls = ('/.*', 'hooks')
app = web.application(urls, globals())
class hooks:
def POST(self):
# fetch hook and request data
hook = web.ctx.env.get('HTTP_VERNEMQ_HOOK')
data = json.loads(web.data())
# print the hook and request data to the console
print
print ('hook:', hook)
print (' data: ', data)
# dispatch to appropriate function based on the hook.
if hook == 'auth_on_register':
return handle_auth_on_register(data)
elif hook == 'auth_on_register_m5':
return handle_auth_on_register(data)
elif hook == 'auth_on_publish':
return handle_auth_on_publish(data)
elif hook == 'auth_on_publish_m5':
return handle_auth_on_publish(data)
elif hook == 'auth_on_subscribe':
return handle_auth_on_subscribe(data)
elif hook == 'auth_on_subscribe_m5':
return handle_auth_on_subscribe(data)
else:
web.ctx.status = 501
return "not implemented"
def handle_auth_on_register(data):
# Cache example
web.header('cache-control', 'max-age=30')
# only allow user 'joe' with password 'secret', reject all others.
if "joe" == data['username']:
if "secret" == data['password']:
return json.dumps({'result': 'ok'})
return json.dumps({'result': {'error': 'not allowed'}})
def handle_auth_on_publish(data):
# accept all publish requests
return json.dumps({'result': 'ok'})
def handle_auth_on_subscribe(data):
# accept all subscribe requests
return json.dumps({'result': 'ok'})
if __name__ == '__main__':
app.run()
The following configuration can be used for testing the Python example.
plugins.vmq_webhooks = on
# auth_on_register
vmq_webhooks.webhook1.hook = auth_on_register
vmq_webhooks.webhook1.endpoint = http://127.0.0.1:8080
# auth_on_subscribe
vmq_webhooks.webhook2.hook = auth_on_subscribe
vmq_webhooks.webhook2.endpoint = http://127.0.0.1:8080
# auth_on_register_m5
vmq_webhooks.webhook3.hook = auth_on_register_m5
vmq_webhooks.webhook3.endpoint = http://127.0.0.1:8080
# auth_on_subscribe_m5
vmq_webhooks.webhook4.hook = auth_on_subscribe_m5
vmq_webhooks.webhook4.endpoint = http://127.0.0.1:8080