Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
milesstoetzner committed Sep 22, 2023
1 parent 70e26ba commit 52cc3b5
Show file tree
Hide file tree
Showing 13 changed files with 85 additions and 30 deletions.
7 changes: 6 additions & 1 deletion src/bus/socketio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Bus from '#/bus/bus'
import * as check from '#check'
import {JSONMessage} from '#core/message'
import std from '#std'
import hae from '#utils/hae'
import http from 'http'
import SocketIO from 'socket.io'

Expand Down Expand Up @@ -48,7 +49,11 @@ export class SocketIOBus extends Bus {

async stop() {
std.log('stopping socketio bus')
await this.stopServer()

await hae.try(async () => {
await this.stopServer()
}, 'problem while stopping socketio bus server')

std.log('socketio bus stopped')
}

Expand Down
13 changes: 12 additions & 1 deletion src/source/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,20 @@ export class ConsoleSource extends Source {
this.options = options
}

async start() {
std.log('starting console source')
this.readyPromise.resolve()
std.log('console source started')
}

async stop() {
std.log('stopping console source')
std.log('console source stopped')
}

async receive(processor: Processor) {
const message = Message.fromJSON(this.options)
std.log('console received', {message})
std.log('console source received', {message})
this.processor = processor
this.processor(message)
}
Expand Down
9 changes: 7 additions & 2 deletions src/source/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Source from '#/source/source'
import Message from '#core/message'
import std from '#std'
import * as check from '#utils/check'
import hae from '#utils/hae'
import {Tail} from 'tail'

export type FileSourceOptions = {
Expand Down Expand Up @@ -39,8 +40,12 @@ export class FileSource extends Source {

async stop() {
std.log('stopping file source')
if (check.isUndefined(this.source)) return std.log('file source not defined')
this.source.unwatch()

await hae.try(async () => {
if (check.isUndefined(this.source)) return std.log('file source not defined')
this.source.unwatch()
}, 'problem while stopping file source')

std.log('file source stopped')
}
}
4 changes: 3 additions & 1 deletion src/source/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ export class HTTPSource extends Source {

async stop() {
std.log('stopping http source')
await this.stopSource()
await hae.try(async () => {
await this.stopSource()
}, 'problem while stopping http source')
std.log('http source stopped')
}

Expand Down
9 changes: 7 additions & 2 deletions src/source/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Source from '#/source/source'
import Message from '#core/message'
import std from '#std'
import * as check from '#utils/check'
import hae from '#utils/hae'
import Aedes from 'aedes'
import net from 'net'

Expand Down Expand Up @@ -95,11 +96,15 @@ export class MQTTSource extends Source {
std.log('stopping mqtt source')

std.log('stopping mqtt aedes server')
await this.stopAedes()
await hae.try(async () => {
await this.stopAedes()
}, 'problem when stopping mqtt aedes server')
std.log('mqtt aeded server stopped')

std.log('stopping mqtt http server')
await this.stopServer()
await hae.try(async () => {
await this.stopServer()
}, 'problem when stopping mqtt http server')
std.log('mqtt http server stopped')

std.log('mqtt source stopped')
Expand Down
6 changes: 4 additions & 2 deletions src/source/socketio.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Source from '#/source/source'
import Message, {JSONMessage} from '#core/message'
import std from '#std'
import * as check from '#utils/check'
import hae from '#utils/hae'
import http from 'http'
import SocketIO from 'socket.io'

Expand Down Expand Up @@ -54,7 +55,6 @@ export class SocketIOSource extends Source {
std.log('sending socketio source')
if (!this.options.bidirectional) return std.log('socketio source not bidirectional')

// TODO: broadcast?
if (check.isUndefined(this.socket)) return std.log('socketio socket not defined')
this.socket.emit(this.options.event, message.toJSON())

Expand All @@ -63,7 +63,9 @@ export class SocketIOSource extends Source {

async stop() {
std.log('stopping socketio source')
await this.stopServer()
await hae.try(async () => {
await this.stopServer()
}, 'problem when stopping socketio http server')
std.log('socketio source stopped')
}

Expand Down
8 changes: 2 additions & 6 deletions src/source/source.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@ export default abstract class Source {
this.readyPromise = utils.createDecomposedPromise()
}

async start() {
this.readyPromise.resolve()
}
abstract start(): Promise<void>

async ready() {
return this.readyPromise.promise
}

async stop() {
// nil
}
abstract stop(): Promise<void>

async receive(processor: Processor) {
this.processor = processor
Expand Down
5 changes: 4 additions & 1 deletion src/source/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import Source from '#/source/source'
import Message from '#core/message'
import std from '#std'
import * as check from '#utils/check'
import hae from '#utils/hae'
import http from 'http'
import {WebSocket, WebSocketServer} from 'ws'

Expand Down Expand Up @@ -57,7 +58,9 @@ export class WSSource extends Source {

async stop() {
std.log('stopping websocket source')
await this.stopServer()
await hae.try(async () => {
await this.stopServer()
}, 'problem when stopping websocket http server')
std.log('websocket source stopped')
}

Expand Down
13 changes: 5 additions & 8 deletions src/target/can.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import * as assert from '#assert'
import * as check from '#check'
import Message, {CANMessage} from '#core/message'
import std from '#std'
import hae from '#utils/hae'
import {RawChannel} from '*can.node'
import * as can from 'socketcan'

Expand Down Expand Up @@ -47,14 +48,10 @@ export class CANTarget extends Target {

async stop() {
std.log('stopping can target')
if (check.isUndefined(this.target)) return std.log('can target undefined')
try {
// TODO: does this have a site-effect on the os?
await hae.try(async () => {
if (check.isUndefined(this.target)) return std.log('can target undefined')
this.target.stop()
std.log('can target stopped')
} catch (error) {
// TODO: why doesnt this throw the same error when stopping the can server
std.log('stopping can target failed', {error: error})
}
}, 'problem when stopping can target')
std.log('can target stopped')
}
}
11 changes: 11 additions & 0 deletions src/target/console.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,20 @@ export class ConsoleTarget extends Target {
super()
}

async start() {
std.log('starting console target')
this.readyPromise.resolve()
std.log('console target started')
}

async send(message: Message) {
std.log('console target sending', {message})
std.out(message.toString())
std.log('console target sent')
}

async stop() {
std.log('stopping console target')
std.log('console target stopped')
}
}
11 changes: 11 additions & 0 deletions src/target/file.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@ export class FileTarget extends Target {
this.options = options
}

async start() {
std.log('starting file target')
this.readyPromise.resolve()
std.log('file target started')
}

async stop() {
std.log('stopping file target')
std.log('file target stopped')
}

async send(message: Message) {
std.log('file target sending', {message})
await files.createFile(this.options.file)
Expand Down
11 changes: 11 additions & 0 deletions src/target/http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,15 @@ export class HTTPTarget extends Target {
})
std.log('http target sent')
}

async start() {
std.log('starting http target')
this.readyPromise.resolve()
std.log('http target started')
}

async stop() {
std.log('stopping http target')
std.log('http target stopped')
}
}
8 changes: 2 additions & 6 deletions src/target/target.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,13 @@ export default abstract class Target {
this.readyPromise = utils.createDecomposedPromise()
}

async start() {
this.readyPromise.resolve()
}
abstract start(): Promise<void>

async ready() {
return this.readyPromise.promise
}

async stop() {
// nil
}
abstract stop(): Promise<void>

abstract send(message: Message): Promise<void>

Expand Down

0 comments on commit 52cc3b5

Please sign in to comment.