Skip to content

Commit

Permalink
fix mqtt
Browse files Browse the repository at this point in the history
  • Loading branch information
milesstoetzner committed Sep 3, 2023
1 parent c89c155 commit b8a149d
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 12 deletions.
5 changes: 5 additions & 0 deletions src/core/bridge.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {Receiver} from '#/receiver/receiver'
import {Sender} from '#/sender/sender'
import * as assert from '#assert'
import std from '#std'
import hae from '#utils/hae'

Expand Down Expand Up @@ -28,6 +29,10 @@ export class Bridge {
std.log('bridge started')
await this.receiver.receive(
hae.log(async message => {
assert.isNumber(message.id)
assert.isArray(message.data)
message.data.forEach(assert.isNumber)

std.log('bridging', {message})
await this.sender.send(message)
if (!this.receiver.continuous) await this.stop()
Expand Down
10 changes: 9 additions & 1 deletion src/receiver/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,14 @@ export class MQTTReceiver extends Receiver {
}

async stop() {
if (check.isDefined(this.server)) this.server.close()
std.log('stopping mqtt server')
if (check.isUndefined(this.server)) return std.log('mqtt server not defined')

const server = this.server
return new Promise<void>((resolve, reject) => {
server.close(error => {
std.log('mqtt server stopped')
})
})
}
}
3 changes: 3 additions & 0 deletions src/sender/file.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import {Sender} from '#/sender/sender'
import {Message} from '#/types'
import * as files from '#files'
import std from '#std'

export type FileSenderOptions = {
file: string
Expand All @@ -15,7 +16,9 @@ export class FileSender extends Sender {
}

async send(message: Message) {
std.log('writing to file')
await files.createFile(this.options.file)
await files.appendFile(this.options.file, JSON.stringify(message) + '\n')
std.log('wrote to file')
}
}
15 changes: 7 additions & 8 deletions src/sender/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,37 +29,36 @@ export class MQTTSender extends Sender {
std.log(`mqtt client subscribed`)

this.client.on('error', error => {
std.log(`websocket client errored`, {error})
std.log(`mqtt client errored`, {error})
})

this.client.on('disconnect', () => {
std.log(`websocket client disconnected`)
std.log(`mqtt client disconnected`)
})

this.client.on('offline', () => {
std.log(`websocket client offline`)
std.log(`mqtt client offline`)
})

this.client.on('close', () => {
std.log(`websocket client closed`)
std.log(`mqtt client closed`)
})

this.client.on('end', () => {
std.log(`websocket client ended`)
std.log(`mqtt client ended`)
})

this.resolveReady()
}

async send(message: Message) {
assert.isDefined(this.client, 'websocket sender not started')
assert.isDefined(this.client, 'mqtt sender not started')
await this.client.publishAsync(this.options.topic, JSON.stringify(message))
}

async stop() {
// TODO: wait for disconnect
std.log('stopping mqtt client')
if (check.isDefined(this.client)) this.client.end()
if (check.isDefined(this.client)) await this.client.endAsync(true)
std.log('mqtt client stopped')
}
}
9 changes: 8 additions & 1 deletion tests/mqtt/mqtt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ describe('mqtt', () => {
it('sender-receiver', async () => {
const message: Message = {id: 69, data: [1, 2, 3]}
const output = files.temporary()
const port = 3000

// Start mqtt receiver with file sender
const receiver = await actions.createBridge({
receiver: 'mqtt',
receiverPort: String(port),
sender: 'file',
senderFile: output,
})
Expand All @@ -23,7 +25,7 @@ describe('mqtt', () => {
receiverId: String(message.id),
receiverData: message.data.map(String),
sender: 'mqtt',
senderEndpoint: 'mqtt://localhost:3000',
senderEndpoint: `mqtt://localhost:${port}`,
})

std.log('waiting for message being bridged')
Expand All @@ -34,5 +36,10 @@ describe('mqtt', () => {
await files.deleteFile(output)
await sender.stop()
await receiver.stop()

await utils.sleep(1000)
})
})

// TODO: does not work anymore
// TODO: something at mqtt is not correctly stopped (or acked?) when using "yarn test"
4 changes: 3 additions & 1 deletion tests/socket-io/socket-io.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ describe('socket-io', () => {
it('sender-receiver', async () => {
const message: Message = {id: 69, data: [1, 2, 3]}
const output = files.temporary()
const port = 3001

// Start socket-io receiver with file sender
// TODO: if an error is thrown then the test does not abort ...
const receiver = await actions.createBridge({
receiver: 'socket-io',
receiverPort: String(port),
sender: 'file',
senderFile: output,
})
Expand All @@ -24,7 +26,7 @@ describe('socket-io', () => {
receiverId: String(message.id),
receiverData: message.data.map(String),
sender: 'socket-io',
senderEndpoint: 'http://localhost:3000',
senderEndpoint: `http://localhost:${port}`,
})

std.log('waiting for message being bridged')
Expand Down
4 changes: 3 additions & 1 deletion tests/ws/ws.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ describe('websocket', () => {
it('sender-receiver', async () => {
const message: Message = {id: 69, data: [1, 2, 3]}
const output = files.temporary()
const port = 3002

// Start websocket receiver with file sender
const receiver = await actions.createBridge({
receiver: 'ws',
receiverPort: String(port),
sender: 'file',
senderFile: output,
})
Expand All @@ -23,7 +25,7 @@ describe('websocket', () => {
receiverId: String(message.id),
receiverData: message.data.map(String),
sender: 'ws',
senderEndpoint: 'ws://localhost:3000',
senderEndpoint: `ws://localhost:${port}`,
})

std.log('waiting for message being bridged')
Expand Down

0 comments on commit b8a149d

Please sign in to comment.