Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions en_US/data-integration/rule-sql-builtin-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ map_get('a', map_put('a', 2, json_decode('{"a": 1}'))) = 2

### map_to_redis_hset_args(Map) -> list

::: tip
::: tip

This function has been introduced since EMQX v5.7.1.

Expand Down Expand Up @@ -1413,7 +1413,7 @@ The Schema Resigtry is an EMQX Enterprise edition feature.

:::

EMQX Enterprise also supports using `schema_encode` and `schema_decode` functions to decode and encode [Protobuf (Protocol Buffers)](https://developers.google.com/protocol-buffers) and [Avro](https://avro.apache.org/) data according to a specified schema. You can read more about these functions in [Schema Registry](./schema-registry.md).
EMQX Enterprise also supports using `schema_encode` and `schema_decode` functions to decode and encode [Protobuf (Protocol Buffers)](https://developers.google.com/protocol-buffers) and [Avro](https://avro.apache.org/) data according to a specified schema. You can read more about these functions in [Schema Registry](./schema-registry.md).

### schema_encode(SchemaID: string, Data: map) -> binary

Expand All @@ -1433,7 +1433,7 @@ Decodes `Bin` using the specified Protobuf Schema. Create a schema in the Schema

### **Sparkplug B Functions**

EMQX Enterprise also has special purpose functions for decoding and encoding Sparkplug B messages (`sparkplug_decode` and `sparkplug_encode`). You can read more about the sparkplug functions in [Sparkplug B](./sparkplug.md).
EMQX Enterprise also has special purpose functions for decoding and encoding Sparkplug B messages (`spb_decode` and `spb_encode`). You can read more about the sparkplug functions in [Sparkplug B](./sparkplug.md).

## Date and Time Conversion Functions

Expand Down
70 changes: 38 additions & 32 deletions en_US/data-integration/sparkplug.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,24 @@ Sparkplug encoding scheme version B (Sparkplug B) defines the MQTT namespace for

This page guides you through the implementation of Sparkplug B in EMQX including data format, functions, and practical examples.

## Sparkplug B Data Format
## Sparkplug B Data Format


Sparkplug B utilizes a well-defined payload structure to standardize data communication. At its core, it employs [Protocol Buffers (Protobuf)](https://developers.google.com/protocol-buffers) for structuring Sparkplug messages, resulting in lightweight, efficient, and flexible data interchange.

EMQX offers advanced support for Sparkplug B through the [Schema Registry](./schema-registry.md) feature. With the Schema Registry, you can create custom encoders and decoders for various data formats, including Sparkplug B. By defining the [appropriate Sparkplug B schema](https://github.com/eclipse/tahu/blob/46f25e79f34234e6145d11108660dfd9133ae50d/sparkplug_b/sparkplug_b.proto) in the registry, you can use the `schema_decode` and `schema_encode` functions in EMQX's rule engine to access and manipulate data adhering to the specified format.

Additionally, EMQX offers built-in support for Sparkplug B, eliminating the need for the schema registry for this specific format. The `sparkplug_encode` and `sparkplug_decode` functions are readily available in EMQX, simplifying the encoding and decoding of Sparkplug B messages within the rule engine.
Additionally, EMQX offers built-in support for Sparkplug B, eliminating the need for the schema registry for this specific format. The `spb_encode` and `spb_decode` functions are readily available in EMQX, simplifying the encoding and decoding of Sparkplug B messages within the rule engine.

:::: tip

The previous versions of these functions, `sparkplug_encode` and `sparkplug_decode`, have been deprecated. Please switch to using `spb_encode` and `spb_decode`, respectively.

::::

## Sparkplug B Functions

EMQX provides two rule engine SQL functions for encoding and decoding Sparkplug B data: `sparkplug_encode` and `sparkplug_decode`. The [Practical Examples](#practical-examples) section helps you to understand how to use these functions in different scenarios.
EMQX provides two rule engine SQL functions for encoding and decoding Sparkplug B data: `spb_encode` and `spb_decode`. The [Practical Examples](#practical-examples) section helps you to understand how to use these functions in different scenarios.

The Sparkplug B encoding and decoding functions can be used to perform a wide variety of tasks due to the flexibility of the rule engine and its `jq` function. To learn more about the rule engine and its `jq` function, refer to the following pages:

Expand All @@ -32,39 +38,39 @@ The Sparkplug B encoding and decoding functions can be used to perform a wide va
* [The Rule Engine JQ Fuction](./rule-sql-jq.md)
* [Full Description of the JQ Programming Language](https://stedolan.github.io/jq/manual/)

### sparkplug_decode
### spb_decode

The `sparkplug_decode` function is used to decode Sparkplug B messages, for example, if you want forward a message to a specific topic based on the contents of a Sparkplug B encoded message or change the Sparkplug B message in some way. It converts the raw Sparkplug B encoded payload into a more user-friendly format that can be further processed or analyzed.
The `spb_decode` function is used to decode Sparkplug B messages, for example, if you want forward a message to a specific topic based on the contents of a Sparkplug B encoded message or change the Sparkplug B message in some way. It converts the raw Sparkplug B encoded payload into a more user-friendly format that can be further processed or analyzed.

Example usage:

```sql
select
sparkplug_decode(payload) as decoded
spb_decode(payload) as decoded
from t
```

In the example above, `payload` refers to the raw Sparkplug B message that you wish to decode.

The [Sparkplug B Protobuf schema](https://github.com/emqx/emqx/blob/039e27a153422028e3d0e7d517a521a84787d4a8/lib-ee/emqx_ee_schema_registry/priv/sparkplug_b.proto) can provide further insights into the structure of messages.

### sparkplug_encode
### spb_encode

The `sparkplug_encode` function is used to encode data into a Sparkplug B message. This is particularly useful when you need to send Sparkplug B messages to MQTT clients or other components of your system.
The `spb_encode` function is used to encode data into a Sparkplug B message. This is particularly useful when you need to send Sparkplug B messages to MQTT clients or other components of your system.

Example usage:

```sql
select
sparkplug_encode(json_decode(payload)) as encoded
spb_encode(json_decode(payload)) as encoded
from t
```

In the example above, `payload` refers to the data that you wish to encode into a Sparkplug B message.

## Practical Examples

This section provides practical examples for handling Sparkplug B messages using the `sparkplug_decode` and `sparkplug_encode` functions. Note that the examples given are just a small subset of what you can do.
This section provides practical examples for handling Sparkplug B messages using the `spb_decode` and `spb_encode` functions. Note that the examples given are just a small subset of what you can do.

Consider scenarios where you have a Sparkplug B encoded message with the following structure:

Expand Down Expand Up @@ -108,7 +114,7 @@ Consider scenarios where you have a Sparkplug B encoded message with the followi
```


### Extract Data
### Extract Data

Suppose you get messages from a device on the topic `my/sparkplug/topic` and want to forward just the `counter_group1/counter1_run` metric to another topic called `intresting_counters/counter1_run_updates` as a JSON formatted message. The instructions below demonstrate how to achieve this task by creating a rule in EMQX Dashboard and testing the rule using [MQTTX](https://mqttx.app/) client tool.

Expand All @@ -124,7 +130,7 @@ Suppose you get messages from a device on the topic `my/sparkplug/topic` and wan
.metrics[] |
select(.name == "counter_group1/counter1_run")
',
sparkplug_decode(payload)) AS item
spb_decode(payload)) AS item
DO item
FROM "my/sparkplug/topic"
```
Expand All @@ -140,7 +146,7 @@ Suppose you get messages from a device on the topic `my/sparkplug/topic` and wan
2. Click **+ Add Action** on the right side of the page. Select`Republish` from the **Action** drop-down list. Enter `intresting_counters/counter1_run_updates` as the republish topic and enter `${item}` in the **Payload** field for the action. Click **Add**.
3. Back on the **Create Rule** page, click **Create**. You can see a rule is created in the Rule list.

#### Test the Rule
#### Test the Rule

You can simulate an MQTT client using the MQTTX client tool to publish the Sparkplug B message to the topic `my/sparkplug/topic`. Then, you can verify that the message is transformed and forwarded to the topic `intresting_counters/counter1_run_updates` as a JSON formatted message:

Expand All @@ -167,7 +173,7 @@ You can simulate an MQTT client using the MQTTX client tool to publish the Spark

### Update Data

Consider a scenario where you discover an incorrect metric named `counter_group1/counter1_run` and want to remove it from the Sparkplug B encoded payload before forwarding the message.
Consider a scenario where you discover an incorrect metric named `counter_group1/counter1_run` and want to remove it from the Sparkplug B encoded payload before forwarding the message.

Similar to the demonstration in [Extract Data](#extract-data), you can create the following rule with a republish action in EMQX Dashboard.

Expand All @@ -183,16 +189,16 @@ jq('
# Update payload with new metrics
$payload | .metrics = $updated_metrics
',
sparkplug_decode(payload)) AS item
DO sparkplug_encode(item) AS updated_payload
spb_decode(payload)) AS item
DO spb_encode(item) AS updated_payload
FROM "my/sparkplug/topic"
```

In this rule, `sparkplug_decode` is used to decode the message and then `jq` is used to filter out the metric with the name `counter_group1/counter1_run`. Then, `sparkplug_encode` in the `DO` clause is used to encode the message again.
In this rule, `spb_decode` is used to decode the message and then `jq` is used to filter out the metric with the name `counter_group1/counter1_run`. Then, `spb_encode` in the `DO` clause is used to encode the message again.

In the republish action, use `${updated_payload}` as the payload because it is the name assigned to the updated Sparkplug B encoded message.

Similarly, you can also use `sparkplug_decode` and `sparkplug_encode` to update the value of a metric. Consider a scenario where you want to update the value of the metric with the name `counter_group1/counter1_run` to 0. You can achieve this by using the following rule:
Similarly, you can also use `spb_decode` and `spb_encode` to update the value of a metric. Consider a scenario where you want to update the value of the metric with the name `counter_group1/counter1_run` to 0. You can achieve this by using the following rule:

```sql
FOREACH
Expand All @@ -212,8 +218,8 @@ jq('
# Update payload with new metrics
$payload | .metrics = $updated_metrics
',
sparkplug_decode(payload)) AS item
DO sparkplug_encode(item) AS item
spb_decode(payload)) AS item
DO spb_encode(item) AS item
FROM "my/sparkplug/topic"
```

Expand All @@ -232,17 +238,17 @@ jq('
"int_value": 42,
"datatype": 5
} as $new_value |
# Create new metrics array
# Create new metrics array
($old_metrics + [ $new_value ]) as $updated_metrics |
# Update payload with new metrics
$payload | .metrics = $updated_metrics
',
sparkplug_decode(payload)) AS item
DO sparkplug_encode(item) AS item
spb_decode(payload)) AS item
DO spb_encode(item) AS item
FROM "my/sparkplug/topic"
```

### Filter Messages
### Filter Messages

Consider a scenario where you want to forward only the messages where the value of the metric with the name `counter_group1/counter1_run` is greater than 0. You can achieve this by using the following rule:

Expand All @@ -257,14 +263,14 @@ jq('
# Filter out messages where value of metric with name $to_filter is 0 or smaller
if $value > 0 then $payload else empty end
',
sparkplug_decode(payload)) AS item
DO sparkplug_encode(item) AS item
spb_decode(payload)) AS item
DO spb_encode(item) AS item
FROM "my/sparkplug/topic"
```

In the above rule, the `jq` function outputs an empty array if the value of the metric with the name `counter_group1/counter1_run` is 0 or smaller. This means that the message will not be forwarded to any of the actions connected to the rule, if the value is 0 or smaller.

### Split Messages
### Split Messages

Consider a scenario where you want to split a Sparkplug B encoded message into multiple messages, with each metric in the metrics array is republished as a separate Sparkplug B encoded message. This can be accomplished with the following rule:

Expand All @@ -279,16 +285,16 @@ jq('
# Let the current metric be the only one in the metrics array
$payload | .metrics = [ $metric ]
',
sparkplug_decode(payload)) AS item
DO sparkplug_encode(item) AS output_payload
spb_decode(payload)) AS item
DO spb_encode(item) AS output_payload
FROM "my/sparkplug/topic"
```

In the above rule, the `jq` function outputs an array with multiple items (given that there is more than one item in the metrics array).
All the actions connected to the rule will be triggered for each item in the array.
With the rule above you need to set the payload in the republish action to `${output_payload}` as `output_payload` is the name we assigned to the Sparkplug B encoded message in the `DO` clause.

### Split Messages and Send to Topics Based on Content
### Split Messages and Send to Topics Based on Content

Consider a scenario where you want to split a Sparkplug B encoded message but you also want to send each message to a different topic based on, for example, the metrics name. Suppose that the output topic name should be constructed by concatenating the strings `"my_metrics/"` with the name of the metric contained in the message. You can accomplish this with the following slightly modified code:

Expand All @@ -304,9 +310,9 @@ jq('
# Let the current metric be the only one in the metrics array
$payload | .metrics = [ $metric ]
',
sparkplug_decode(payload)) AS item
spb_decode(payload)) AS item
DO
sparkplug_encode(item) AS output_payload,
spb_encode(item) AS output_payload,
first(jq('"my_metrics/" + .metrics[0].name', item)) AS output_topic
FROM "my/sparkplug/topic"
```
Expand Down
2 changes: 1 addition & 1 deletion zh_CN/data-integration/rule-sql-builtin-functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ sqlserver_bin2hexstr(str_utf16_le('你好')) = '0x604F7D59'

### Sparkplug B

EMQX 企业版还有专门用于解码和编码 Sparkplug B 消息的特殊用途函数(`sparkplug_decode` 和`sparkplug_encode`)。您可以在 [Sparkplug B](./sparkplug.md) 中了解有关 Sparkplug 函数的更多信息。
EMQX 企业版还有专门用于解码和编码 Sparkplug B 消息的特殊用途函数(`spb_decode` 和`spb_encode`)。您可以在 [Sparkplug B](./sparkplug.md) 中了解有关 Sparkplug 函数的更多信息。

## 日期与时间函数

Expand Down
Loading