From b8a149dd190e614f42fb709bdfc1873ffa5247a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miles=20St=C3=B6tzner?= Date: Sun, 3 Sep 2023 17:44:13 +0200 Subject: [PATCH] fix mqtt --- src/core/bridge.ts | 5 +++++ src/receiver/mqtt.ts | 10 +++++++++- src/sender/file.ts | 3 +++ src/sender/mqtt.ts | 15 +++++++-------- tests/mqtt/mqtt.test.ts | 9 ++++++++- tests/socket-io/socket-io.test.ts | 4 +++- tests/ws/ws.test.ts | 4 +++- 7 files changed, 38 insertions(+), 12 deletions(-) diff --git a/src/core/bridge.ts b/src/core/bridge.ts index d0e0900..ae01b64 100644 --- a/src/core/bridge.ts +++ b/src/core/bridge.ts @@ -1,5 +1,6 @@ import {Receiver} from '#/receiver/receiver' import {Sender} from '#/sender/sender' +import * as assert from '#assert' import std from '#std' import hae from '#utils/hae' @@ -28,6 +29,10 @@ export class Bridge { std.log('bridge started') await this.receiver.receive( hae.log(async message => { + assert.isNumber(message.id) + assert.isArray(message.data) + message.data.forEach(assert.isNumber) + std.log('bridging', {message}) await this.sender.send(message) if (!this.receiver.continuous) await this.stop() diff --git a/src/receiver/mqtt.ts b/src/receiver/mqtt.ts index 7f120f4..d3967af 100644 --- a/src/receiver/mqtt.ts +++ b/src/receiver/mqtt.ts @@ -69,6 +69,14 @@ export class MQTTReceiver extends Receiver { } async stop() { - if (check.isDefined(this.server)) this.server.close() + std.log('stopping mqtt server') + if (check.isUndefined(this.server)) return std.log('mqtt server not defined') + + const server = this.server + return new Promise((resolve, reject) => { + server.close(error => { + std.log('mqtt server stopped') + }) + }) } } diff --git a/src/sender/file.ts b/src/sender/file.ts index 486be1e..906455f 100644 --- a/src/sender/file.ts +++ b/src/sender/file.ts @@ -1,6 +1,7 @@ import {Sender} from '#/sender/sender' import {Message} from '#/types' import * as files from '#files' +import std from '#std' export type FileSenderOptions = { file: string @@ -15,7 +16,9 @@ export class FileSender extends Sender { } async send(message: Message) { + std.log('writing to file') await files.createFile(this.options.file) await files.appendFile(this.options.file, JSON.stringify(message) + '\n') + std.log('wrote to file') } } diff --git a/src/sender/mqtt.ts b/src/sender/mqtt.ts index a3e079b..e2778e7 100644 --- a/src/sender/mqtt.ts +++ b/src/sender/mqtt.ts @@ -29,37 +29,36 @@ export class MQTTSender extends Sender { std.log(`mqtt client subscribed`) this.client.on('error', error => { - std.log(`websocket client errored`, {error}) + std.log(`mqtt client errored`, {error}) }) this.client.on('disconnect', () => { - std.log(`websocket client disconnected`) + std.log(`mqtt client disconnected`) }) this.client.on('offline', () => { - std.log(`websocket client offline`) + std.log(`mqtt client offline`) }) this.client.on('close', () => { - std.log(`websocket client closed`) + std.log(`mqtt client closed`) }) this.client.on('end', () => { - std.log(`websocket client ended`) + std.log(`mqtt client ended`) }) this.resolveReady() } async send(message: Message) { - assert.isDefined(this.client, 'websocket sender not started') + assert.isDefined(this.client, 'mqtt sender not started') await this.client.publishAsync(this.options.topic, JSON.stringify(message)) } async stop() { - // TODO: wait for disconnect std.log('stopping mqtt client') - if (check.isDefined(this.client)) this.client.end() + if (check.isDefined(this.client)) await this.client.endAsync(true) std.log('mqtt client stopped') } } diff --git a/tests/mqtt/mqtt.test.ts b/tests/mqtt/mqtt.test.ts index ef8977a..53cca07 100644 --- a/tests/mqtt/mqtt.test.ts +++ b/tests/mqtt/mqtt.test.ts @@ -9,10 +9,12 @@ describe('mqtt', () => { it('sender-receiver', async () => { const message: Message = {id: 69, data: [1, 2, 3]} const output = files.temporary() + const port = 3000 // Start mqtt receiver with file sender const receiver = await actions.createBridge({ receiver: 'mqtt', + receiverPort: String(port), sender: 'file', senderFile: output, }) @@ -23,7 +25,7 @@ describe('mqtt', () => { receiverId: String(message.id), receiverData: message.data.map(String), sender: 'mqtt', - senderEndpoint: 'mqtt://localhost:3000', + senderEndpoint: `mqtt://localhost:${port}`, }) std.log('waiting for message being bridged') @@ -34,5 +36,10 @@ describe('mqtt', () => { await files.deleteFile(output) await sender.stop() await receiver.stop() + + await utils.sleep(1000) }) }) + +// TODO: does not work anymore +// TODO: something at mqtt is not correctly stopped (or acked?) when using "yarn test" diff --git a/tests/socket-io/socket-io.test.ts b/tests/socket-io/socket-io.test.ts index ffaae5c..80d541a 100644 --- a/tests/socket-io/socket-io.test.ts +++ b/tests/socket-io/socket-io.test.ts @@ -9,11 +9,13 @@ describe('socket-io', () => { it('sender-receiver', async () => { const message: Message = {id: 69, data: [1, 2, 3]} const output = files.temporary() + const port = 3001 // Start socket-io receiver with file sender // TODO: if an error is thrown then the test does not abort ... const receiver = await actions.createBridge({ receiver: 'socket-io', + receiverPort: String(port), sender: 'file', senderFile: output, }) @@ -24,7 +26,7 @@ describe('socket-io', () => { receiverId: String(message.id), receiverData: message.data.map(String), sender: 'socket-io', - senderEndpoint: 'http://localhost:3000', + senderEndpoint: `http://localhost:${port}`, }) std.log('waiting for message being bridged') diff --git a/tests/ws/ws.test.ts b/tests/ws/ws.test.ts index 213c8a0..6e65e72 100644 --- a/tests/ws/ws.test.ts +++ b/tests/ws/ws.test.ts @@ -9,10 +9,12 @@ describe('websocket', () => { it('sender-receiver', async () => { const message: Message = {id: 69, data: [1, 2, 3]} const output = files.temporary() + const port = 3002 // Start websocket receiver with file sender const receiver = await actions.createBridge({ receiver: 'ws', + receiverPort: String(port), sender: 'file', senderFile: output, }) @@ -23,7 +25,7 @@ describe('websocket', () => { receiverId: String(message.id), receiverData: message.data.map(String), sender: 'ws', - senderEndpoint: 'ws://localhost:3000', + senderEndpoint: `ws://localhost:${port}`, }) std.log('waiting for message being bridged')