Skip to content

Commit

Permalink
feat: add sparkplug b encoder
Browse files Browse the repository at this point in the history
needs error handling
  • Loading branch information
ferm10n committed Jun 7, 2022
1 parent d1de077 commit 3dc2019
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 2 deletions.
9 changes: 9 additions & 0 deletions app/src/actions/Publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { AppState } from '../reducers'
import { Base64Message } from '../../../backend/src/Model/Base64Message'
import { Dispatch } from 'redux'
import { makePublishEvent, rendererEvents } from '../../../events'
import { Decoder } from '../../../backend/src/Model/Decoder'

export const setTopic = (topic?: string): Action => {
return {
Expand Down Expand Up @@ -47,6 +48,14 @@ export const publish = (connectionId: string) => (dispatch: Dispatch<Action>, ge
retain: state.publish.retain,
qos: state.publish.qos,
}

if (
mqttMessage.payload &&
mqttMessage.topic.match(/spBv1\.0\/[^/]+\/(DDATA|NDATA|NCMD|DCMD|NBIRTH|DBIRTH|NDEATH|DDEATH\/[^/]+\/)/u)
) {
mqttMessage.payload.decoder = Decoder.SPARKPLUG
}

rendererEvents.emit(publishEvent, mqttMessage)
}

Expand Down
10 changes: 8 additions & 2 deletions backend/src/DataSource/MqttSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { Client, connect as mqttConnect } from 'mqtt'
import { DataSource, DataSourceStateMachine } from './'
import { MqttMessage } from '../../../events'
import { Base64Message } from '../Model/Base64Message'
import { Decoder } from '../Model/Decoder'
import { SparkplugEncoder } from '../Model/sparkplugb'

export interface MqttOptions {
url: string
Expand Down Expand Up @@ -47,7 +49,6 @@ export class MqttSource implements DataSource<MqttOptions> {
throw error
}


const client = mqttConnect(url.toString(), {
resubscribe: false,
rejectUnauthorized: options.certValidation,
Expand Down Expand Up @@ -99,7 +100,12 @@ export class MqttSource implements DataSource<MqttOptions> {

public publish(msg: MqttMessage) {
if (this.client) {
this.client.publish(msg.topic, msg.payload ? Base64Message.toUnicodeString(msg.payload) : '', {
let payload: string | Buffer = msg.payload ? Base64Message.toUnicodeString(msg.payload) : ''
if (msg.payload?.decoder === Decoder.SPARKPLUG) {
payload = SparkplugEncoder.encode(payload) || payload
}

this.client.publish(msg.topic, payload, {
qos: msg.qos,
retain: msg.retain,
})
Expand Down
18 changes: 18 additions & 0 deletions backend/src/Model/sparkplugb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,22 @@ export const SparkplugDecoder = {
// ignore
}
}

export const SparkplugEncoder = {
encode(input: string): Buffer | undefined {
try {
console.log(input)
const payload = JSON.parse(input)
return Buffer.from(
SparkplugPayload.encode(
SparkplugPayload.create({
timestamp: Date.now(),
...payload,
})
).finish()
)
} catch (err) {
// todo ?
}
},
}

0 comments on commit 3dc2019

Please sign in to comment.