Skip to content

Commit

Permalink
Export of mqttCon and persistence
Browse files Browse the repository at this point in the history
  • Loading branch information
seriousme committed Feb 8, 2025
1 parent d0f334a commit e856584
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 14 deletions.
21 changes: 17 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ persistence.

## Example

A simple server example using Deno, for more elaborate examples see the Usage
section below.
A simple server example using Deno.

```typescript
import { MqttServer } from "./server/mod.ts";
Expand All @@ -26,6 +25,9 @@ for await (const conn of this.listener) {
}
```

A more elaborate example including client and server can be found in the
[examples](/examples/) folder.

## Architecture

1. The basis of Opifex is the MQTT packet module
Expand Down Expand Up @@ -76,8 +78,6 @@ If you want to change the behaviour of the server and/or the client beyond what
can be done with CLI options then the next step is to clone the demo server
and/or the client scripts and modify them to your liking.

An example can be found in the [examples](/examples/) folder.

If you want to port the platform independent client and server libs to other
types of transport (e.g. Unix sockets or websocketstream) then its recommended
to clone and modify the platform specific code in `/node` or `/deno` as well.
Expand All @@ -88,6 +88,19 @@ inspiration.

Bun (as of version 1.2) is capable of running the NodeJS version.

## Exports

| Export | Description |
| ---------------------------- | ------------------------------------------------------- |
| @seriousme/opifex/tcpClient | Exports a MQTT over TCP client |
| @seriousme/opifex/tcpServer | Exports a MQTT over TCP server |
| @seriousme/opifex/server | Exports a transport agnostic MQTT server |
| @seriousme/opifex/client | Exports a transport agnostic MQTT client |
| @seriousme/opifex/client | Exports an Typescript interface for storage persistence |
| @seriousme/opifex/mqttConn | Exports MQTT connection handling |
| @seriousme/opifex/mqttPacket | Exports MQTT packet handling |
| @seriousme/opifex/utils | Exports various utilities |

## Naming

Some MQTT servers have names like:
Expand Down
60 changes: 58 additions & 2 deletions mqttConn/mqttConn.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
/**
* @module mqttConn
*/

import {
type AnyPacket,
decodePayload,
Expand All @@ -10,21 +14,39 @@ import { assert } from "../utils/mod.ts";
import type { SockConn } from "../socket/socket.ts";
import { Conn } from "../socket/socket.ts";

/**
* Common MQTT connection error messages
*/
export const MqttConnError = {
invalidPacket: "Invalid Packet",
packetTooLarge: "Packet too large",
UnexpectedEof: "Unexpected EOF",
} as const;

/**
* Interface for MQTT connection handling
*/
export interface IMqttConn extends AsyncIterable<AnyPacket> {
/** Underlying connection */
readonly conn: Conn;
/** Whether connection is closed */
readonly isClosed: boolean;
/** Reason for connection closure if any */
readonly reason: string | undefined;
/** Async iterator for receiving packets */
[Symbol.asyncIterator](): AsyncIterableIterator<AnyPacket>;
/** Send an MQTT packet */
send(data: AnyPacket): Promise<void>;
/** Close the connection */
close(): void;
}

/**
* Read a single byte from the connection
* @param conn Connection to read from
* @returns Single byte as number
* @throws Error if EOF reached unexpectedly
*/
async function readByte(conn: Conn): Promise<number> {
const buf = new Uint8Array(1);
const bytesRead = await conn.read(buf);
Expand All @@ -33,6 +55,12 @@ async function readByte(conn: Conn): Promise<number> {
return buf[0];
}

/**
* Read exact number of bytes into buffer
* @param conn Connection to read from
* @param buf Buffer to read into
* @throws Error if EOF reached unexpectedly
*/
async function readFull(conn: Conn, buf: Uint8Array): Promise<void> {
let bytesRead = 0;
while (bytesRead < buf.length) {
Expand All @@ -43,8 +71,12 @@ async function readFull(conn: Conn, buf: Uint8Array): Promise<void> {
}
}

/** Read MQTT packet
* @throws `Error` if packet is invalid
/**
* Read a complete MQTT packet from the connection
* @param conn Connection to read from
* @param maxPacketSize Maximum allowed packet size
* @returns Decoded MQTT packet
* @throws Error if packet is invalid or too large
*/
export async function readPacket(
conn: Conn,
Expand All @@ -70,12 +102,25 @@ export async function readPacket(
return packet;
}

/**
* MQTT Connection class implementing IMqttConn interface
*/
export class MqttConn implements IMqttConn {
/** Underlying connection */
readonly conn: Conn;
/** Maximum allowed packet size */
private readonly maxPacketSize: number;
/** Reason for connection closure if any */
private _reason: string | undefined = undefined;
/** Whether connection is closed */
private _isClosed = false;

/**
* Create new MQTT connection
* @param options Connection options
* @param options.conn Underlying socket connection
* @param options.maxPacketSize Maximum allowed packet size (default 2MB)
*/
constructor({
conn,
maxPacketSize,
Expand All @@ -87,10 +132,15 @@ export class MqttConn implements IMqttConn {
this.maxPacketSize = maxPacketSize || 2 * 1024 * 1024;
}

/** Get reason for connection closure */
get reason(): string | undefined {
return this._reason;
}

/**
* Async iterator for receiving packets
* @yields MQTT packets
*/
async *[Symbol.asyncIterator](): AsyncIterableIterator<AnyPacket> {
while (!this._isClosed) {
try {
Expand All @@ -109,6 +159,10 @@ export class MqttConn implements IMqttConn {
}
}

/**
* Send an MQTT packet
* @param data Packet to send
*/
async send(data: AnyPacket): Promise<void> {
try {
await this.conn.write(encode(data));
Expand All @@ -120,10 +174,12 @@ export class MqttConn implements IMqttConn {
}
}

/** Whether connection is closed */
get isClosed(): boolean {
return this._isClosed;
}

/** Close the connection */
close(): void {
if (this.isClosed) return;
try {
Expand Down
16 changes: 10 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,21 +32,25 @@
"dist/**/*.d.ts.map"
],
"exports": {
"./tcpClient": "./dist/node/tcpClient.js",
"./tcpServer": "./dist/node/tcpServer.js",
"./server": "./dist/server/mod.js",
"./client": "./dist/client/mod.js",
"./persistence": "./dist/persistence/persistence.js",
"./mqttConn": "./dist/mqttConn/mod.js",
"./mqttPacket": "./dist/mqttPacket/mod.js",
"./utils": "./dist/utils/mod.js",
"./tcpClient": "./dist/node/tcpClient.js",
"./tcpServer": "./dist/node/tcpServer.js"
"./utils": "./dist/utils/mod.js"
},
"deno": {
"exports": {
"./tcpClient": "./deno/tcpClient.ts",
"./tcpServer": "./deno/tcpServer.ts",
"./server": "./server/mod.ts",
"./client": "./client/mod.ts",
"./persistence": "./persistence/persistence.ts",
"./mqttConn": "./mqttConn/mod.ts",
"./mqttPacket": "./mqttPacket/mod.ts",
"./utils": "./utils/mod.ts",
"./tcpClient": "./deno/tcpClient.ts",
"./tcpServer": "./deno/tcpServer.ts"
"./utils": "./utils/mod.ts"
},
"exclude": [
"dist"
Expand Down
68 changes: 66 additions & 2 deletions persistence/persistence.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,88 @@
/**
* @module persistence
* @description Module for handling MQTT message persistence and client management
*/
import type { ClientId, PublishPacket, QoS, Topic } from "./deps.ts";
import type { IStore } from "./store.ts";

/**
* Handler function type for processing publish packets
* @callback Handler
* @param {PublishPacket} packet - The MQTT publish packet to handle
*/
export type Handler = (packet: PublishPacket) => void;

/**
* Store type for retained messages mapped by topic
* @typedef {Map<Topic, PublishPacket>} RetainStore
*/
export type RetainStore = Map<Topic, PublishPacket>;

/**
* Client type containing message store and packet handler
* @typedef {Object} Client
* @property {IStore} store - The client's message store
* @property {Handler} handler - The client's packet handler function
*/
export type Client = { store: IStore; handler: Handler };

/**
* IPersistence is the interface for various types of Persistence
* to store messages and subscriptions
* Interface for persistence implementations to store messages and subscriptions
* @interface IPersistence
*/
export interface IPersistence {
/**
* Map of connected clients by client ID
* @type {Map<ClientId, Client>}
*/
clientList: Map<ClientId, Client>;

/**
* Map of retained messages by topic
* @type {RetainStore}
*/
retained: RetainStore;

/**
* Register a new client with the persistence layer
* @param {ClientId} clientId - Unique identifier for the client
* @param {Handler} handler - Packet handler function for the client
* @param {boolean} clean - Whether to start with a clean session
* @returns {IStore} The client's message store
*/
registerClient(clientId: ClientId, handler: Handler, clean: boolean): IStore;

/**
* Remove a client from the persistence layer
* @param {ClientId} clientId - ID of client to deregister
*/
deregisterClient(clientId: ClientId): void;

/**
* Publish a message to all subscribed clients
* @param {Topic} topic - Topic to publish to
* @param {PublishPacket} packet - Packet containing the message
*/
publish(topic: Topic, packet: PublishPacket): void;

/**
* Subscribe a client to a topic
* @param {IStore} store - Client's message store
* @param {Topic} topic - Topic to subscribe to
* @param {QoS} qos - Quality of Service level
*/
subscribe(store: IStore, topic: Topic, qos: QoS): void;

/**
* Unsubscribe a client from a topic
* @param {IStore} store - Client's message store
* @param {Topic} topic - Topic to unsubscribe from
*/
unsubscribe(store: IStore, topic: Topic): void;

/**
* Send retained messages to a client
* @param {ClientId} clientId - ID of client to send retained messages to
*/
handleRetained(clientId: ClientId): void;
}

0 comments on commit e856584

Please sign in to comment.