diff --git a/app/src/actions/Publish.ts b/app/src/actions/Publish.ts index 91130b41..ffebcc57 100644 --- a/app/src/actions/Publish.ts +++ b/app/src/actions/Publish.ts @@ -3,6 +3,7 @@ import { AppState } from '../reducers' import { Base64Message } from '../../../backend/src/Model/Base64Message' import { Dispatch } from 'redux' import { makePublishEvent, rendererEvents } from '../../../events' +import { Decoder } from '../../../backend/src/Model/Decoder' export const setTopic = (topic?: string): Action => { return { @@ -47,6 +48,14 @@ export const publish = (connectionId: string) => (dispatch: Dispatch, ge retain: state.publish.retain, qos: state.publish.qos, } + + if ( + mqttMessage.payload && + mqttMessage.topic.match(/spBv1\.0\/[^/]+\/(DDATA|NDATA|NCMD|DCMD|NBIRTH|DBIRTH|NDEATH|DDEATH\/[^/]+\/)/u) + ) { + mqttMessage.payload.decoder = Decoder.SPARKPLUG + } + rendererEvents.emit(publishEvent, mqttMessage) } diff --git a/backend/src/DataSource/MqttSource.ts b/backend/src/DataSource/MqttSource.ts index fb76c2f6..0ccd97f7 100644 --- a/backend/src/DataSource/MqttSource.ts +++ b/backend/src/DataSource/MqttSource.ts @@ -4,6 +4,8 @@ import { Client, connect as mqttConnect } from 'mqtt' import { DataSource, DataSourceStateMachine } from './' import { MqttMessage } from '../../../events' import { Base64Message } from '../Model/Base64Message' +import { Decoder } from '../Model/Decoder' +import { SparkplugEncoder } from '../Model/sparkplugb' export interface MqttOptions { url: string @@ -47,7 +49,6 @@ export class MqttSource implements DataSource { throw error } - const client = mqttConnect(url.toString(), { resubscribe: false, rejectUnauthorized: options.certValidation, @@ -99,7 +100,12 @@ export class MqttSource implements DataSource { public publish(msg: MqttMessage) { if (this.client) { - this.client.publish(msg.topic, msg.payload ? Base64Message.toUnicodeString(msg.payload) : '', { + let payload: string | Buffer = msg.payload ? Base64Message.toUnicodeString(msg.payload) : '' + if (msg.payload?.decoder === Decoder.SPARKPLUG) { + payload = SparkplugEncoder.encode(payload) || payload + } + + this.client.publish(msg.topic, payload, { qos: msg.qos, retain: msg.retain, }) diff --git a/backend/src/Model/sparkplugb.ts b/backend/src/Model/sparkplugb.ts index 467752d9..e64d66a6 100644 --- a/backend/src/Model/sparkplugb.ts +++ b/backend/src/Model/sparkplugb.ts @@ -21,4 +21,22 @@ export const SparkplugDecoder = { // ignore } } + +export const SparkplugEncoder = { + encode(input: string): Buffer | undefined { + try { + console.log(input) + const payload = JSON.parse(input) + return Buffer.from( + SparkplugPayload.encode( + SparkplugPayload.create({ + timestamp: Date.now(), + ...payload, + }) + ).finish() + ) + } catch (err) { + // todo ? + } + }, }