diff --git a/deno.lock b/deno.lock index f1716a6d2..ae9565bae 100644 --- a/deno.lock +++ b/deno.lock @@ -1,11 +1,33 @@ { "version": "5", "specifiers": { + "jsr:@cliffy/ansi@1.0.0-rc.7": "1.0.0-rc.7", + "jsr:@cliffy/command@1.0.0-rc.7": "1.0.0-rc.7", + "jsr:@cliffy/flags@1.0.0-rc.7": "1.0.0-rc.7", + "jsr:@cliffy/internal@1.0.0-rc.7": "1.0.0-rc.7", + "jsr:@cliffy/keycode@1.0.0-rc.7": "1.0.0-rc.7", + "jsr:@cliffy/prompt@1.0.0-rc.7": "1.0.0-rc.7", + "jsr:@cliffy/table@1.0.0-rc.7": "1.0.0-rc.7", + "jsr:@std/assert@~1.0.6": "1.0.16", + "jsr:@std/collections@^1.1.3": "1.1.3", "jsr:@std/encoding@^1.0.10": "1.0.10", + "jsr:@std/encoding@~1.0.5": "1.0.10", "jsr:@std/encoding@~1.0.8": "1.0.10", + "jsr:@std/fmt@^1.0.5": "1.0.8", + "jsr:@std/fmt@~1.0.2": "1.0.8", + "jsr:@std/fs@^1.0.11": "1.0.20", "jsr:@std/internal@^1.0.12": "1.0.12", + "jsr:@std/io@~0.224.9": "0.224.9", + "jsr:@std/io@~0.225.2": "0.225.2", + "jsr:@std/log@0.224": "0.224.14", + "jsr:@std/path@*": "1.1.3", "jsr:@std/path@^1.1.3": "1.1.3", + "jsr:@std/path@~1.0.6": "1.0.9", + "jsr:@std/text@~1.0.7": "1.0.16", + "jsr:@std/toml@*": "1.0.11", + "jsr:@sylc/dkill@~0.12.3": "0.12.3", "npm:@aurowallet/mina-provider@^1.0.12": "1.0.12", + "npm:@bloxbean/yaci-devkit@*": "0.10.6", "npm:@bloxbean/yaci-devkit@0.10.6": "0.10.6", "npm:@cardano-foundation/cardano-verify-datasignature@1.0.11": "1.0.11", "npm:@coderspirit/nominal@^4.1.1": "4.1.1_typescript@5.6.3", @@ -95,6 +117,7 @@ "npm:@sinclair/typebox@*": "0.34.41", "npm:@sinclair/typebox@0.34.41": "0.34.41", "npm:@subsquid/ss58-codec@^1.2.3": "1.2.3", + "npm:@txpipe/dolos@*": "0.19.1", "npm:@txpipe/dolos@0.19.1": "0.19.1", "npm:@types/node@*": "24.2.0", "npm:@types/react@18.3.1": "18.3.1", @@ -161,6 +184,7 @@ "npm:mina-signer@^3.0.7": "3.1.0", "npm:mqtt-pattern@^2.1.0": "2.1.0", "npm:mqtt@^5.14.0": "5.14.1", + "npm:mqtt@^5.4.1": "5.14.1", "npm:ntp-time-sync@0.5": "0.5.0", "npm:pg-gateway@~0.3.0-beta.4": "0.3.0-beta.4", "npm:pg-tx@^1.0.1": "1.0.1", @@ -197,6 +221,7 @@ "npm:vite-plugin-static-copy@^3.1.1": "3.1.4_vite@7.1.3__picomatch@4.0.3_@types+node@24.2.0", "npm:vite-plugin-top-level-await@^1.6.0": "1.6.0_vite@7.1.3__picomatch@4.0.3_@types+node@24.2.0", "npm:vite-plugin-wasm@^3.5.0": "3.5.0_vite@7.1.3__picomatch@4.0.3_@types+node@24.2.0", + "npm:vite@*": "7.1.3_picomatch@4.0.3_@types+node@24.2.0", "npm:vite@7.1.3": "7.1.3_picomatch@4.0.3_@types+node@24.2.0", "npm:vitest@^3.2.4": "3.2.4_vite@7.1.3__picomatch@4.0.3_@types+node@24.2.0", "npm:wagmi@^2.16.9": "2.19.5_@tanstack+react-query@5.90.12__react@18.3.1_react@18.3.1_typescript@5.6.3_viem@2.37.3__typescript@5.6.3__ws@8.18.3___bufferutil@4.1.0___utf-8-validate@5.0.10_@wagmi+core@2.22.1__typescript@5.6.3__viem@2.37.3___typescript@5.6.3___ws@8.18.3____bufferutil@4.1.0____utf-8-validate@5.0.10__@types+react@18.3.1__react@18.3.1__use-sync-external-store@1.4.0___react@18.3.1_@types+react@18.3.1_use-sync-external-store@1.4.0__react@18.3.1_ws@8.18.1", @@ -208,17 +233,111 @@ "npm:ws@^8.18.3": "8.18.3_bufferutil@4.1.0_utf-8-validate@5.0.10" }, "jsr": { + "@cliffy/ansi@1.0.0-rc.7": { + "integrity": "f71c921cce224c13d322e5cedba4f38e8f7354c7d855c9cb22729362a53f25aa", + "dependencies": [ + "jsr:@cliffy/internal", + "jsr:@std/encoding@~1.0.5", + "jsr:@std/io@~0.224.9" + ] + }, + "@cliffy/command@1.0.0-rc.7": { + "integrity": "1288808d7a3cd18b86c24c2f920e47a6d954b7e23cadc35c8cbd78f8be41f0cd", + "dependencies": [ + "jsr:@cliffy/flags", + "jsr:@cliffy/internal", + "jsr:@cliffy/table", + "jsr:@std/fmt@~1.0.2", + "jsr:@std/text" + ] + }, + "@cliffy/flags@1.0.0-rc.7": { + "integrity": "318d9be98f6a6417b108e03dec427dea96cdd41a15beb21d2554ae6da450a781", + "dependencies": [ + "jsr:@std/text" + ] + }, + "@cliffy/internal@1.0.0-rc.7": { + "integrity": "10412636ab3e67517d448be9eaab1b70c88eba9be22617b5d146257a11cc9b17" + }, + "@cliffy/keycode@1.0.0-rc.7": { + "integrity": "5b3f6c33994e81a76b79f108b1989642ac22705840da33781f7972d7dff05503" + }, + "@cliffy/prompt@1.0.0-rc.7": { + "integrity": "a9cbd13acd8073558447cae8ca4cf593c09d23bcbe429cc63346920c21187b83", + "dependencies": [ + "jsr:@cliffy/ansi", + "jsr:@cliffy/internal", + "jsr:@cliffy/keycode", + "jsr:@std/assert", + "jsr:@std/fmt@~1.0.2", + "jsr:@std/io@~0.224.9", + "jsr:@std/path@~1.0.6", + "jsr:@std/text" + ] + }, + "@cliffy/table@1.0.0-rc.7": { + "integrity": "9fdd9776eda28a0b397981c400eeb1aa36da2371b43eefe12e6ff555290e3180", + "dependencies": [ + "jsr:@std/fmt@~1.0.2" + ] + }, + "@std/assert@1.0.16": { + "integrity": "6a7272ed1eaa77defe76e5ff63ca705d9c495077e2d5fd0126d2b53fc5bd6532" + }, + "@std/collections@1.1.3": { + "integrity": "bf8b0818886df6a32b64c7d3b037a425111f28278d69fd0995aeb62777c986b0" + }, "@std/encoding@1.0.10": { "integrity": "8783c6384a2d13abd5e9e87a7ae0520a30e9f56aeeaa3bdf910a3eaaf5c811a1" }, + "@std/fmt@1.0.8": { + "integrity": "71e1fc498787e4434d213647a6e43e794af4fd393ef8f52062246e06f7e372b7" + }, + "@std/fs@1.0.20": { + "integrity": "e953206aae48d46ee65e8783ded459f23bec7dd1f3879512911c35e5484ea187" + }, "@std/internal@1.0.12": { "integrity": "972a634fd5bc34b242024402972cd5143eac68d8dffaca5eaa4dba30ce17b027" }, + "@std/io@0.224.9": { + "integrity": "4414664b6926f665102e73c969cfda06d2c4c59bd5d0c603fd4f1b1c840d6ee3" + }, + "@std/io@0.225.2": { + "integrity": "3c740cd4ee4c082e6cfc86458f47e2ab7cb353dc6234d5e9b1f91a2de5f4d6c7" + }, + "@std/log@0.224.14": { + "integrity": "257f7adceee3b53bb2bc86c7242e7d1bc59729e57d4981c4a7e5b876c808f05e", + "dependencies": [ + "jsr:@std/fmt@^1.0.5", + "jsr:@std/fs", + "jsr:@std/io@~0.225.2" + ] + }, + "@std/path@1.0.9": { + "integrity": "260a49f11edd3db93dd38350bf9cd1b4d1366afa98e81b86167b4e3dd750129e" + }, "@std/path@1.1.3": { "integrity": "b015962d82a5e6daea980c32b82d2c40142149639968549c649031a230b1afb3", "dependencies": [ "jsr:@std/internal" ] + }, + "@std/text@1.0.16": { + "integrity": "ddb9853b75119a2473857d691cf1ec02ad90793a2e8b4a4ac49d7354281a0cf8" + }, + "@std/toml@1.0.11": { + "integrity": "e084988b872ca4bad6aedfb7350f6eeed0e8ba88e9ee5e1590621c5b5bb8f715", + "dependencies": [ + "jsr:@std/collections" + ] + }, + "@sylc/dkill@0.12.3": { + "integrity": "7321d192fed6b09ad5bb1b7f6a4a3b89fcde50cecf5529914548415535a68704", + "dependencies": [ + "jsr:@cliffy/command", + "jsr:@cliffy/prompt" + ] } }, "npm": { @@ -19160,6 +19279,7 @@ "npm:ecpair@3", "npm:effection@^3.5.0", "npm:fastify@^5.4.0", + "npm:mqtt@^5.4.1", "npm:pg@^8.14.0", "npm:rxjs@*", "npm:tiny-secp256k1@^2.2.4", @@ -19304,11 +19424,14 @@ "npm:@midnight-ntwrk/wallet@5.0.0", "npm:@midnight-ntwrk/zswap@4.0.0", "npm:@sinclair/typebox@0.34.41", + "npm:aedes-server-factory@~0.2.1", + "npm:aedes@~0.51.3", "npm:bitcoinjs-lib@^6.1.5", "npm:bitcoinjs-message@^2.2.0", "npm:ecpair@^2.1.0", "npm:effection@^3.5.0", "npm:fastify@^5.4.0", + "npm:ip@^2.0.1", "npm:rxjs@7.8.2", "npm:tiny-secp256k1@^2.2.1", "npm:varuint-bitcoin@^1.1.2", diff --git a/docs/site/docs/home/100-components/108-batcher/1220-core-concepts.md b/docs/site/docs/home/100-components/108-batcher/1220-core-concepts.md index 967435b48..ef65ad758 100644 --- a/docs/site/docs/home/100-components/108-batcher/1220-core-concepts.md +++ b/docs/site/docs/home/100-components/108-batcher/1220-core-concepts.md @@ -214,6 +214,12 @@ const config: PaimaBatcherConfig = { enableHttpServer: true, port: 3334, enableEventSystem: true, + mqtt: { + enabled: true, + port: 8883, + host: "0.0.0.0", + retainLastMessage: true, + }, }; ``` @@ -253,6 +259,96 @@ const config: PaimaBatcherConfig = { }; ``` +### Realtime MQTT Updates + +The batcher can host a lightweight MQTT broker (powered by [Aedes](https://github.com/moscajs/aedes)) so React or mobile clients can subscribe to lifecycle updates for each submitted input. + +```typescript +const config: PaimaBatcherConfig = { + // ... + enableEventSystem: true, + mqtt: { + enabled: true, + port: 8883, // TCP + WebSocket listeners + host: "0.0.0.0", // Listen on all interfaces + retainLastMessage: true, // keep the latest state for late subscribers + }, +}; +``` + +Every successful `POST /send-input` response now includes an `inputId`: + +```json +{ + "success": true, + "message": "Input processed successfully", + "inputsProcessed": 1, + "transactionHash": "0x...", + "inputId": "ef07c6dd-e9c4-4bd7-99fb-9eae17f0f4c0" +} +``` + +Use that identifier to subscribe to the topic `batcher/inputs/{inputId}`. Each payload contains the phase, optional transaction metadata, and any errors: + +```json +{ + "inputId": "ef07c6dd-e9c4-4bd7-99fb-9eae17f0f4c0", + "target": "ethereum", + "phase": "receipt", + "txHash": "0x...", + "blockNumber": 19823423, + "time": 1734629912000 +} +``` + +Phases include `accepted`, `submitted`, `receipt`, `effectstream-processed`, and `error`. + +Clients can subscribe with [MQTT.js](https://github.com/mqttjs/MQTT.js): + +```typescript +import mqtt from "mqtt"; + +const client = mqtt.connect("ws://localhost:8883"); +const topic = `batcher/inputs/${inputId}`; + +client.on("connect", () => { + client.subscribe(topic, (err) => { + if (err) console.error("Subscription failed", err); + }); +}); + +client.on("message", (t, payload) => { + if (t !== topic) return; + const update = JSON.parse(payload.toString()); + // React components can set state directly from update.phase +}); +``` + +If you already consume batcher lifecycle events via `addStateTransition`, there is also a typed `input:update` event with the same payload for local observability or custom bridges. + +#### MQTT Security: Publish Authorization + +The batcher implements a **security-first approach** to MQTT publish operations. Only the batcher itself (localhost connections) can publish messages to prevent unauthorized actors from tampering with input status updates. + +**How it works:** + +The MQTT broker enforces **localhost-only publishing**: + +- ✅ **Localhost connections** (127.0.0.1, ::1) can publish → Only the batcher itself +- ❌ **Remote connections** cannot publish → Always rejected with error + +**Authorization flow for publish attempts:** + +``` +Client publishes → Check if localhost + ├─ If localhost → ✅ Allow + └─ If remote → ❌ Reject with error +``` + +This ensures that only the batcher process can update input statuses. There is no configuration option to disable this security — it's always enforced. + +**Subscribing remains unrestricted** — all clients (local or remote) can subscribe to topics to receive real-time status updates. This allows frontend apps running on different machines to monitor input progress without being able to forge status messages. + **What happens?** 1. Batcher queues inputs until it has **100 inputs** (criteria threshold) 2. When processing, `buildBatchData()` serializes inputs until reaching **10KB** (adapter limit) diff --git a/docs/site/docs/home/100-components/108-batcher/1240-configuration.md b/docs/site/docs/home/100-components/108-batcher/1240-configuration.md index 27ca9dcec..e1159a645 100644 --- a/docs/site/docs/home/100-components/108-batcher/1240-configuration.md +++ b/docs/site/docs/home/100-components/108-batcher/1240-configuration.md @@ -154,6 +154,52 @@ const baseConfig: PaimaBatcherConfig = { Set `pollingIntervalMs` to match your shortest batching time window. For example, if using `timeWindowMs: 5000`, a `pollingIntervalMs` of 1000 ensures responsive batch submission. ::: +#### MQTT Configuration + +The `mqtt` object controls the embedded MQTT broker for real-time input status updates: + +```typescript +const config: PaimaBatcherConfig = { + // ... + enableEventSystem: true, // Required for MQTT to work + mqtt: { + enabled: true, // Enable the MQTT broker + port: 8883, // Port for TCP + WebSocket + host: "0.0.0.0", // Listen on all interfaces + retainLastMessage: true, // Keep latest state for late subscribers + }, +}; +``` + +| Setting | Type | Default | Description | +|---------|------|---------|-------------| +| `mqtt.enabled` | `boolean` | `false` | Whether to start the embedded MQTT broker | +| `mqtt.port` | `number` | `8883` | Port for MQTT TCP and WebSocket connections | +| `mqtt.host` | `string` | `"0.0.0.0"` | Network interface to bind to (`"0.0.0.0"` = all interfaces, `"127.0.0.1"` = localhost only) | +| `mqtt.retainLastMessage` | `boolean` | `true` | Whether the broker should retain the latest message on each topic | + +**Security Note:** + +The MQTT broker **always** restricts publishing to localhost connections only. Only the batcher itself (running on 127.0.0.1 or ::1) can publish messages. Remote clients can subscribe to receive updates but cannot publish. This security is enforced and cannot be disabled. + +:::warning MQTT + Event System Dependency +The MQTT broker requires `enableEventSystem: true` to function, as it relies on state transition events to publish input status updates. + +```typescript +const config: PaimaBatcherConfig = { + enableEventSystem: true, // ✅ Required + mqtt: { + enabled: true, + // ... + }, +}; +``` +::: + +:::tip +Set `pollingIntervalMs` to match your shortest batching time window. For example, if using `timeWindowMs: 5000`, a `pollingIntervalMs` of 1000 ensures responsive batch submission. +::: + ### Step 2: Instantiate Blockchain Adapters Instantiate each blockchain adapter you want to use: diff --git a/e2e/client/batcher/config.ts b/e2e/client/batcher/config.ts index 9e89fe05d..63a7008e0 100644 --- a/e2e/client/batcher/config.ts +++ b/e2e/client/batcher/config.ts @@ -15,6 +15,12 @@ export const config: BatcherConfig = { confirmationLevel: "wait-effectstream-processed", enableEventSystem: true, // Important for adding state transitions to console logs port, + mqtt: { + enabled: true, + port: 8833, + host: "0.0.0.0", + retainLastMessage: true, + }, }; export const storage = new FileStorage("./batcher-data"); diff --git a/e2e/client/node/deno.json b/e2e/client/node/deno.json index 87d4e1d0b..0bcc934dd 100644 --- a/e2e/client/node/deno.json +++ b/e2e/client/node/deno.json @@ -46,6 +46,7 @@ "@polkadot/util-crypto": "npm:@polkadot/util-crypto@^13.4.3", "bitcoin": "npm:bitcoinjs-lib@^7.0.0", "ecpair": "npm:ecpair@^3.0.0", - "tiny-secp256k1": "npm:tiny-secp256k1@^2.2.4" + "tiny-secp256k1": "npm:tiny-secp256k1@^2.2.4", + "mqtt": "npm:mqtt@^5.4.1" } } \ No newline at end of file diff --git a/e2e/client/node/e2e-tests/e2e.midnight.test.ts b/e2e/client/node/e2e-tests/e2e.midnight.test.ts index c80e8bdd0..13ba0058f 100644 --- a/e2e/client/node/e2e-tests/e2e.midnight.test.ts +++ b/e2e/client/node/e2e-tests/e2e.midnight.test.ts @@ -1,3 +1,5 @@ +import { WebSocket as NodeWebSocket } from "ws"; +import mqtt from "mqtt"; import { assert, assertSQL, type SharedState } from "@e2e/engine"; import type { ContractAddress } from "@midnight-ntwrk/compact-runtime"; import { @@ -41,10 +43,17 @@ import { import type { Client } from "pg"; import { readMidnightContract } from "@effectstream/midnight-contracts/read-contract"; import { dirname, resolve } from "node:path"; +import type { BatcherResponse, InputUpdatePayload } from "@effectstream/batcher"; import { AddressType } from "@effectstream/utils"; const BATCHER_URL = "http://localhost:3334"; -globalThis.WebSocket = WebSocket; +const MQTT_WS_URL = "ws://localhost:8833"; +const MQTT_TIMEOUT_MS = 60_000; + +if (typeof globalThis.WebSocket === "undefined") { + (globalThis as unknown as { WebSocket: typeof NodeWebSocket }).WebSocket = + NodeWebSocket; +} // Inlined common types for standalone script type CounterCircuits = ImpureCircuitId; @@ -91,6 +100,16 @@ class StandaloneConfig implements Config { const GENESIS_MINT_WALLET_SEED = "0000000000000000000000000000000000000000000000000000000000000001"; +const MINT_ACCOUNT = { // Account in Either format + is_left: true, + left: { + bytes: "0x00112233445566778899AABBCCDDEEFF00112233445566778899AABBCCDDEEFF", + }, + right: { + bytes: "0x0000000000000000000000000000000000000000000000000000000000000000", + }, +}; + // Standalone helper functions const counterContractInstance: any = new Counter.Contract( witnesses, @@ -357,16 +376,8 @@ const getContractAddress = async (): Promise => { async function sendMintToBatcher( amount: number | string, confirmationLevel: string = "no-wait", -): Promise { - const account = { - is_left: true, - left: { - bytes: "0x00112233445566778899AABBCCDDEEFF00112233445566778899AABBCCDDEEFF", - }, - right: { - bytes: "0x0000000000000000000000000000000000000000000000000000000000000000", - }, - }; +): Promise { + const account = MINT_ACCOUNT; const input = JSON.stringify({ circuit: "mint", args: [account, amount], @@ -388,13 +399,87 @@ async function sendMintToBatcher( }, body: JSON.stringify(body), }); - const result = await response.json(); + const result = await response.json() as BatcherResponse; if (response.ok) { console.log("Mint sent to batcher successfully"); } else { console.error("[ERROR] Sending mint to batcher:", result); } - return response.status; + return result; +} + +async function waitForMqttPhase( + topic: string, + desiredPhase: InputUpdatePayload["phase"], + timeoutMs: number = MQTT_TIMEOUT_MS, +): Promise { + return await new Promise((resolve, reject) => { + const client = mqtt.connect(MQTT_WS_URL, { + protocolVersion: 4, + reconnectPeriod: 0, + }); + + let settled = false; + const timer = setTimeout(() => { + if (settled) return; + settled = true; + client.end(true, () => + reject( + new Error( + `[MQTT TEST] Timeout waiting for ${desiredPhase} on topic ${topic}`, + ), + ) + ); + }, timeoutMs); + + const finalize = (err?: Error, payload?: InputUpdatePayload) => { + if (settled) return; + settled = true; + clearTimeout(timer); + client.end(true, () => { + if (err) reject(err); + else resolve(payload!); + }); + }; + + client.on("connect", () => { + client.subscribe(topic, (error) => { + if (error) { + finalize( + new Error( + `[MQTT TEST] Failed to subscribe to ${topic}: ${error.message}`, + ), + ); + } + }); + }); + + client.on("message", (incomingTopic, payload) => { + if (incomingTopic !== topic) return; + let parsed: InputUpdatePayload; + try { + parsed = JSON.parse(payload.toString()); + } catch (error) { + finalize( + new Error( + `[MQTT TEST] Unable to parse payload for ${topic}: ${ + (error as Error).message + }`, + ), + ); + return; + } + if (parsed.phase === desiredPhase) { + finalize(undefined, parsed); + } + }); + + client.on("error", (error) => { + finalize( + new Error(`[MQTT TEST] MQTT client error: ${error.message}`), + ); + }); + }); } /** @@ -524,23 +609,30 @@ async function sendMintToBatcherTest( db: Client, sharedState: SharedState, ): Promise { - const status200 = await sendMintToBatcher(20000); - console.log("🪙 Correct input for Mint sent to batcher successfully with status:", status200); + const response = await sendMintToBatcher(20000) as BatcherResponse; + console.log("🪙 Correct input for Mint sent to batcher successfully with status:", response.success); await assert("Send Mint to Batcher Test", async () => { - return status200 === 200; + return response.success; }); +} - const statusBadInput = await sendMintToBatcher("not a number"); - console.log("🪙 Wrong input for Mint sent to batcher successfully:", statusBadInput); - await assert("Send Mint to Batcher Test Bad Input", async () => { - return statusBadInput === 400; - }); +async function testMqttSubscription( + _db: Client, + sharedState: SharedState, +): Promise { + const { inputId } = await sendMintToBatcher(20000); + const topic = `batcher/inputs/${inputId}`; - const statusBadConfirmationLevel = await sendMintToBatcher(20000, "wrong-confirmation-level"); - console.log("🪙 Wrong confirmation level for Mint sent to batcher successfully:", statusBadConfirmationLevel); - await assert("Send Mint to Batcher Test Bad Confirmation Level", async () => { - return statusBadConfirmationLevel === 400; + await assert("MQTT mint effectstream update", async () => { + const update = await waitForMqttPhase(topic, "effectstream-processed"); + if (update.inputId !== inputId || update.phase !== "effectstream-processed") { + throw new Error(`MQTT mint effectstream update failed: ${JSON.stringify(update)}`); + } + else { + sharedState.primitive_accounting_counter++; + return true; + } }); } -export { joinAndIncrementTest, sendMintToBatcherTest }; +export { joinAndIncrementTest, sendMintToBatcherTest, testMqttSubscription }; diff --git a/e2e/client/node/scripts/e2e.test.ts b/e2e/client/node/scripts/e2e.test.ts index 63d706f0d..8e972bb9c 100644 --- a/e2e/client/node/scripts/e2e.test.ts +++ b/e2e/client/node/scripts/e2e.test.ts @@ -8,7 +8,7 @@ import { startup, cleanup, shutdown } from "./e2e.start.ts"; import type { Client } from "pg"; import { accountTests } from "../e2e-tests/e2e.account.test.ts"; import { generalTest } from "../e2e-tests/e2e.general.test.ts"; -import { joinAndIncrementTest, sendMintToBatcherTest } from "../e2e-tests/e2e.midnight.test.ts"; +import { joinAndIncrementTest, sendMintToBatcherTest, testMqttSubscription } from "../e2e-tests/e2e.midnight.test.ts"; import { submitDataWithMessageAvailTest } from "../e2e-tests/e2e.avail.test.ts"; import { testMigrations } from "../e2e-tests/e2e.migrations.ts"; import { RPCTest } from "../e2e-tests/e2e.rpc.test.ts"; @@ -57,6 +57,7 @@ async function test() { ); await joinAndIncrementTest(db, sharedState); await sendMintToBatcherTest(db, sharedState); + await testMqttSubscription(db, sharedState); await submitDataWithMessageAvailTest(db, sharedState); await tokenTests(db, sharedState); if (bitcoin_enabled) { diff --git a/packages/batcher/README.md b/packages/batcher/README.md index 565b1f728..cb1c5787e 100644 --- a/packages/batcher/README.md +++ b/packages/batcher/README.md @@ -291,6 +291,11 @@ batcher.addStateTransition( batcher.addStateTransition("error", ({ phase, error }) => { errorReporter.report(phase, error); }); + +// Watch individual input lifecycles (accepted/submitted/receipt/etc.) +batcher.addStateTransition("input:update", (payload) => { + console.log(`[${payload.phase}] ${payload.inputId} -> ${payload.target}`); +}); ``` ## API Overview @@ -378,3 +383,49 @@ curl -X POST http://localhost:3334/batch-input \ ``` `signature` is optional for chains/adapters that override `verifySignature` (for example Midnight). When omitted, the adapter must implement its own verification semantics. + +Successful responses now include the generated `inputId` so frontend apps can subscribe to MQTT updates: + +```json +{ + "success": true, + "message": "Input processed successfully", + "inputsProcessed": 1, + "transactionHash": "0x...", + "inputId": "ef07c6dd-e9c4-4bd7-99fb-9eae17f0f4c0" +} +``` + +## Realtime client updates via MQTT + +Set `mqtt.enabled: true` to start the embedded Aedes broker (TCP + WebSocket) and emit per-input updates on the topic `batcher/inputs/{inputId}`: + +```typescript +const batcher = createNewBatcher({ + // ... + enableEventSystem: true, + mqtt: { + enabled: true, + port: 8883, + host: "0.0.0.0", + retainLastMessage: true, + }, +}); +``` + +Each payload contains the `phase` (`accepted`, `submitted`, `receipt`, `effectstream-processed`, or `error`), and optional metadata such as `txHash`, `blockNumber`, or `rollup`. + +```typescript +import mqtt from "mqtt"; + +const client = mqtt.connect("ws://localhost:8833"); +const topic = `batcher/inputs/${inputId}`; + +client.on("connect", () => client.subscribe(topic)); +client.on("message", (_topic, msg) => { + const update = JSON.parse(msg.toString()); + console.log(`Input ${update.inputId} is now ${update.phase}`); +}); +``` + +The snippet uses [MQTT.js](https://github.com/mqttjs/MQTT.js), the reference client library for browsers and Node.js. diff --git a/packages/batcher/core/api-types.ts b/packages/batcher/core/api-types.ts new file mode 100644 index 000000000..d97aaa1b2 --- /dev/null +++ b/packages/batcher/core/api-types.ts @@ -0,0 +1,21 @@ +import type { BatcherGrammar } from "./batcher-events.ts"; + +/** + * Response from the batcher `/send-input` endpoint. + */ +export interface BatcherResponse { + success: boolean; + message: string; + inputId: string; + inputsProcessed?: number; + transactionHash?: string; + rollup?: number; +} + +/** + * Payload for input update events published via MQTT or the event system. + * This represents the lifecycle state of a single input as it progresses + * through the batching pipeline. + */ +export type InputUpdatePayload = BatcherGrammar["input:update"]; + diff --git a/packages/batcher/core/batch-processor.ts b/packages/batcher/core/batch-processor.ts index 83afadc93..0621893bf 100644 --- a/packages/batcher/core/batch-processor.ts +++ b/packages/batcher/core/batch-processor.ts @@ -3,6 +3,7 @@ import type { BlockchainTransactionReceipt, } from "../adapters/adapter.ts"; import type { DefaultBatcherInput } from "./types.ts"; +import type { BatcherGrammar } from "./batcher-events.ts"; /** @@ -13,6 +14,11 @@ export class BatchProcessor { constructor( private batcher: { emitStateTransition: (prefix: string, payload: any) => Promise; + emitInputUpdate: ( + input: T, + payload: Omit, + ) => Promise; + removeProcessedIds: (inputs: T[]) => void; storage: { removeProcessedInputs: (inputs: T[], target: string) => Promise }; submissionCallbacks: Map< string, @@ -49,13 +55,21 @@ export class BatchProcessor { const { selectedInputs, data } = batchResult; // data is 'unknown' - await this.submitAndConfirmTransaction( - adapter, - target, - data, - selectedInputs as T[], - timeout, - ); + try { + await this.submitAndConfirmTransaction( + adapter, + target, + data, + selectedInputs as T[], + timeout, + ); + } catch (error) { + await this.emitInputUpdates(selectedInputs as T[], { + phase: "error", + error: error instanceof Error ? error.message : "Unknown error", + }); + throw error; + } } private async submitAndConfirmTransaction( @@ -82,6 +96,10 @@ export class BatchProcessor { txHash: hash, time: Date.now(), }); + await this.emitInputUpdates(selectedInputs, { + phase: "submitted", + txHash: hash, + }); // Wait for confirmation and EffectStream processing await this.handleTransactionConfirmation( @@ -106,9 +124,15 @@ export class BatchProcessor { blockNumber: receipt.blockNumber, time: Date.now(), }); + await this.emitInputUpdates(selectedInputs, { + phase: "receipt", + txHash: hash, + blockNumber: receipt.blockNumber, + }); // Remove processed inputs from storage after successful receipt await this.batcher.storage.removeProcessedInputs(selectedInputs, target); + this.batcher.removeProcessedIds(selectedInputs); // Resolve all callbacks with the receipt // Individual callers will decide if they want to continue waiting for EffectStream @@ -120,6 +144,8 @@ export class BatchProcessor { adapter, target, timeout, + selectedInputs, + hash, ).catch((error) => { console.error( `⚠️ Error waiting for EffectStream processing for target ${target}:`, @@ -133,6 +159,8 @@ export class BatchProcessor { adapter: BlockchainAdapter, target: string, timeout: number, + selectedInputs: T[], + txHash: string, ): Promise { try { const processingResult = await this.batcher.waitForEffectStreamProcessed( @@ -148,6 +176,11 @@ export class BatchProcessor { rollup: processingResult.rollup, time: Date.now(), }); + await this.emitInputUpdates(selectedInputs, { + phase: "effectstream-processed", + txHash, + rollup: processingResult.rollup, + }); } else { console.error( `❌ EffectStream processing validation failed for target ${target}`, @@ -158,6 +191,11 @@ export class BatchProcessor { error: new Error("EffectStream processing validation failed"), time: Date.now(), }); + await this.emitInputUpdates(selectedInputs, { + phase: "error", + txHash, + error: "EffectStream processing validation failed", + }); } } catch (error) { console.error( @@ -170,9 +208,23 @@ export class BatchProcessor { error, time: Date.now(), }); + await this.emitInputUpdates(selectedInputs, { + phase: "error", + txHash, + error: error instanceof Error ? error.message : "Unknown error", + }); } } + private async emitInputUpdates( + inputs: T[], + payload: Omit, + ): Promise { + await Promise.all(inputs.map((input) => + this.batcher.emitInputUpdate(input, payload) + )); + } + private resolveInputCallbacks( selectedInputs: T[], receipt: BlockchainTransactionReceipt, diff --git a/packages/batcher/core/batcher-events.ts b/packages/batcher/core/batcher-events.ts index a045b1190..f5cba4bf3 100644 --- a/packages/batcher/core/batcher-events.ts +++ b/packages/batcher/core/batcher-events.ts @@ -39,6 +39,21 @@ export type BatcherGrammar = Record & { success: boolean; time: number; }; + "input:update": { + inputId: string; + target: string; + phase: + | "accepted" + | "submitted" + | "receipt" + | "effectstream-processed" + | "error"; + txHash?: string; + blockNumber?: number | bigint; + rollup?: number; + error?: string; + time: number; + }; error: { phase: string; target?: string; error: unknown; time: number }; }; diff --git a/packages/batcher/core/batcher.ts b/packages/batcher/core/batcher.ts index 0d8607831..aae29f7ca 100644 --- a/packages/batcher/core/batcher.ts +++ b/packages/batcher/core/batcher.ts @@ -1,5 +1,5 @@ import { CryptoManager } from "@effectstream/crypto"; -import { call, lift, resource, sleep, spawn, suspend } from "effection"; +import { call, resource, sleep, spawn, suspend } from "effection"; import type { Operation } from "effection"; import type { BatcherStorage } from "./storage.ts"; import type { DefaultBatcherInput } from "./types.ts"; @@ -11,11 +11,16 @@ import type { BatchingCriteriaConfig, BatcherConfig } from "./config.ts"; import { applyBatcherConfigDefaults, DEFAULT_BATCHING_CRITERIA, + DEFAULT_CONFIG_VALUES, validateBatcherConfig, validateBatchingCriteria, validatePreInit, } from "./config.ts"; import { startBatcherHttpServer } from "../server/batcher-server.ts"; +import { + BatcherMqttServer, + type BatcherMqttServerOptions, +} from "../server/mqtt-server.ts"; import { BatcherFileStorage } from "./mod.ts"; import { BatchProcessor } from "./batch-processor.ts"; import { @@ -101,6 +106,17 @@ export class Batcher { > = new Map(); /** Batch processor for handling complex batch operations */ private readonly batchProcessor: BatchProcessor; + /** Async state transition dispatcher for non-Effection contexts */ + private readonly stateTransitionInvoker: ( + prefix: string, + payload: any, + ) => Promise; + /** Embedded MQTT server for per-input updates */ + private mqttServer?: BatcherMqttServer; + private readonly enableMqtt: boolean; + private readonly mqttOptions: BatcherMqttServerOptions; + /** Set of pending input IDs to prevent duplicates */ + private readonly pendingInputIds: Set = new Set(); /** Shutdown manager for handling graceful shutdowns */ private readonly shutdownManager: ShutdownManager; /** State transition listeners keyed by prefix */ @@ -167,33 +183,35 @@ export class Batcher { // Initialize last process times map (will be populated in init()/runBatcher()) this.lastProcessTime = new Map(); + this.stateTransitionInvoker = async (prefix, payload) => { + if (!this.enableEventSystem) return; + const listener = this.stateTransitionListeners.get(prefix); + if (!listener) return; + try { + await listener(payload); + } catch (error) { + const hasErrorListener = this.stateTransitionListeners.has("error"); + if (prefix !== "error" && hasErrorListener) { + try { + await this.stateTransitionListeners.get("error")!({ + phase: `event-listener:${prefix}`, + error, + time: Date.now(), + }); + } catch { + // swallow + } + } + } + }; + this.batchProcessor = new BatchProcessor({ - emitStateTransition: async (prefix: string, payload: any) => { - // For async contexts, we need to handle this differently - // Since we're in an async method but need to call an Effection operation, - // we'll create a simple non-blocking implementation - if (this.enableEventSystem) { - const listener = this.stateTransitionListeners.get(prefix); - if (listener) { - try { - // Execute the listener asynchronously without blocking - await listener(payload); - } catch (error) { - const hasErrorListener = this.stateTransitionListeners.has( - "error", - ); - if (prefix !== "error" && hasErrorListener) { - try { - await this.stateTransitionListeners.get("error")!({ - phase: `event-listener:${prefix}`, - error, - time: Date.now(), - }); - } catch { - // swallow - } - } - } + emitStateTransition: this.stateTransitionInvoker, + emitInputUpdate: (input, payload) => this.emitInputUpdate(input, payload), + removeProcessedIds: (inputs) => { + for (const input of inputs) { + if (input.inputId) { + this.pendingInputIds.delete(input.inputId); } } }, @@ -219,6 +237,14 @@ export class Batcher { this.enableHttpServer = this.config.enableHttpServer!; this.enableEventSystem = this.config.enableEventSystem!; this.namespace = this.config.namespace ?? this.namespace; + this.enableMqtt = !!this.config.mqtt?.enabled; + this.mqttOptions = { + host: this.config.mqtt?.host ?? DEFAULT_CONFIG_VALUES.mqtt.host, + port: this.config.mqtt?.port ?? DEFAULT_CONFIG_VALUES.mqtt.port, + retainLastMessage: + this.config.mqtt?.retainLastMessage ?? + DEFAULT_CONFIG_VALUES.mqtt.retainLastMessage, + }; } /** @@ -250,28 +276,8 @@ export class Batcher { */ *emitStateTransition(prefix: string, payload: any): Operation { if (!this.enableEventSystem) return; - const listener = this.stateTransitionListeners.get(prefix); - if (!listener) return; - - // `spawn` starts the listener in the background. - // The `emitStateTransition` operation can return immediately. yield* spawn((function* (this: Batcher) { - try { - // We still use `call` here to handle the listener being async. - yield* lift(listener)(payload); - } catch (error) { - // Error handling now happens inside the spawned fiber, - // preventing a listener crash from taking down the whole batcher. - const hasErrorListener = this.stateTransitionListeners.has("error"); - if (prefix !== "error" && hasErrorListener) { - // Re-emit the error, again in a supervised manner. - yield* lift(this.stateTransitionListeners.get("error")!)({ - phase: `event-listener:${prefix}`, - error, - time: Date.now(), - }); - } - } + yield* call(() => this.stateTransitionInvoker(prefix, payload)); }).bind(this)); } @@ -410,6 +416,7 @@ export class Batcher { } await this.storage.init(); + await this.populatePendingInputIds(); for (const [target, adapter] of Object.entries(this.adapters)) { if (typeof adapter.recoverState === "function") { @@ -432,6 +439,9 @@ export class Batcher { if (this.enableHttpServer) { await this.startHttpServer(); } + if (this.enableMqtt) { + await this.startMqttServer(); + } this.isInitialized = true; await this.emitStateTransition("startup", { @@ -460,7 +470,16 @@ export class Batcher { ); } - if (!this.defaultTarget && !input.target) { + const inputWithId = this.ensureInputHasId(input); + + if (this.pendingInputIds.has(inputWithId.inputId!)) { + throw new InputValidationError( + `Input with ID ${inputWithId.inputId} is already pending`, + 409, // Conflict + ); + } + + if (!this.defaultTarget && !inputWithId.target) { throw new InputValidationError( "No default target configured and input.target not specified. " + "Add adapters using addBlockchainAdapter() before initialization.", @@ -468,7 +487,7 @@ export class Batcher { ); } - const target = input.target || this.defaultTarget!; + const target = inputWithId.target || this.defaultTarget!; const adapter = this.adapters[target]; if (!adapter) { throw new InputValidationError(`Adapter for target ${target} not found. Available targets: ${Object.keys(this.adapters).join(", ")}`, 404); @@ -478,10 +497,10 @@ export class Batcher { let verifiedSignature: boolean; if (adapter && typeof adapter.verifySignature === "function") { - verifiedSignature = await adapter.verifySignature(input); - } else if (input.signature) { + verifiedSignature = await adapter.verifySignature(inputWithId); + } else if (inputWithId.signature) { // Fall back to the batcher's default EVM verification when a signature is provided - verifiedSignature = await this._defaultVerifyInputSignature(input); + verifiedSignature = await this._defaultVerifyInputSignature(inputWithId); } else { throw new InputValidationError( `Adapter for target ${target} requires either a signature or a custom verifySignature implementation`, @@ -494,7 +513,7 @@ export class Batcher { // 2. Adapter-Specific Input Validation (Pre-Queue) if (adapter && typeof adapter.validateInput === "function") { - const validationResult = await adapter.validateInput(input); + const validationResult = await adapter.validateInput(inputWithId); if (!validationResult.valid) { throw new InputValidationError( validationResult.error || "Invalid input for target adapter", @@ -503,10 +522,12 @@ export class Batcher { } // 3. Add to Storage (Only if all validation passes) - await this.addInput(input); + await this.addInput(inputWithId); + this.pendingInputIds.add(inputWithId.inputId!); const { count, size } = await this.storage.getInputCountAndSize(); + await this.emitInputUpdate(inputWithId, { phase: "accepted" }); console.log( - `✅ Added input from ${input.address} to batch queue. Queue size: ${count} inputs, ${size} bytes`, + `✅ Added input from ${inputWithId.address} to batch queue. Queue size: ${count} inputs, ${size} bytes`, ); if (confirmationLevel === "no-wait") { @@ -516,7 +537,7 @@ export class Batcher { // Create promise for callback with timeout const receiptPromise = new Promise( (resolve, reject) => { - const callbackKey = this.getInputCallbackKey(input); + const callbackKey = this.getInputCallbackKey(inputWithId); const timeoutId = setTimeout(() => { this.submissionCallbacks.delete(callbackKey); reject(new Error("Receipt confirmation timeout")); @@ -545,7 +566,7 @@ export class Batcher { // If waiting for EffectStream processing, continue waiting if (confirmationLevel === "wait-effectstream-processed") { - const target = input.target || this.defaultTarget; + const target = inputWithId.target || this.defaultTarget; if (!target) { throw new Error( "Cannot wait for EffectStream processing: no target specified and no default target configured.", @@ -641,13 +662,14 @@ export class Batcher { * Storage is the single source of truth - no pool needed */ async addInput(input: T): Promise { - const target = input.target ?? this.defaultTarget; + const normalizedInput = this.ensureInputHasId(input); + const target = normalizedInput.target ?? this.defaultTarget; if (!target) { throw new Error( "Cannot add input: no target specified and no default target configured.", ); } - await this.storage.addInput(input, target); + await this.storage.addInput(normalizedInput, target); } private async _defaultVerifyInputSignature( @@ -678,7 +700,47 @@ export class Batcher { return await cryptoManager.verifySignature(input.address, message, input.signature); } + private async populatePendingInputIds(): Promise { + const allPending = await this.storage.getAllInputs(); + for (const input of allPending) { + if (input.inputId) { + this.pendingInputIds.add(input.inputId); + } + } + } + + private ensureInputHasId(input: T): T { + if (!input.inputId) { + input.inputId = crypto.randomUUID(); + } + return input; + } + + private async emitInputUpdate( + input: T, + details: Omit, + ): Promise { + const normalizedInput = this.ensureInputHasId(input); + const target = normalizedInput.target || this.defaultTarget; + if (!target) return; + const payload = { + ...details, + inputId: normalizedInput.inputId!, + target, + time: Date.now(), + }; + if (this.enableEventSystem) { + await this.stateTransitionInvoker("input:update", payload); + } + if (this.enableMqtt) { + await this.publishInputUpdate(payload); + } + } + private getInputCallbackKey(input: T): string { + if (input.inputId) { + return input.inputId; + } const target = input.target || this.defaultTarget; if (!target) { throw new Error( @@ -695,6 +757,20 @@ export class Batcher { ].join("|"); } + private async publishInputUpdate( + payload: BatcherGrammar["input:update"], + ): Promise { + if (!this.enableMqtt || !this.mqttServer) return; + try { + await this.mqttServer.publish( + `batcher/inputs/${payload.inputId}`, + payload, + ); + } catch (error) { + console.error("❌ Failed to publish MQTT update:", error); + } + } + async pollBatcher(): Promise { if (this.shutdownState.isShuttingDown) return; @@ -870,6 +946,7 @@ export class Batcher { } await this.storage.clearAllInputs(); + this.pendingInputIds.clear(); } /** @@ -905,6 +982,19 @@ export class Batcher { } } + private async startMqttServer(): Promise { + if (!this.enableMqtt) return; + if (this.mqttServer) return; + this.mqttServer = new BatcherMqttServer(this.mqttOptions); + await this.mqttServer.start(); + } + + private async stopMqttServer(): Promise { + if (!this.mqttServer) return; + await this.mqttServer.stop(); + this.mqttServer = undefined; + } + /** * Get current batching status and statistics */ @@ -986,6 +1076,11 @@ export class Batcher { adapterTargets: string[]; /** Per-adapter batching criteria types */ criteriaTypes: Record; + mqtt: { + enabled: boolean; + host: string; + port: number; + }; } { const criteriaTypes: Record = {}; for (const [target, criteria] of this.batchingCriteria) { @@ -1001,6 +1096,11 @@ export class Batcher { port: this.port, adapterTargets: Object.keys(this.adapters), criteriaTypes, + mqtt: { + enabled: this.enableMqtt, + host: this.mqttOptions.host, + port: this.mqttOptions.port, + }, }; } @@ -1040,7 +1140,7 @@ export class Batcher { * Cleanup additional resources (can be overridden by subclasses) */ protected async cleanupResources(): Promise { - // Default implementation - can be extended by subclasses + await this.stopMqttServer(); } /** @@ -1287,6 +1387,7 @@ export class Batcher { // 3. Perform sequential setup tasks yield* call(() => this.storage.init()); + yield* call(() => this.populatePendingInputIds()); // 4. Recover adapter state from storage (e.g., Bitcoin reserved funds) if (this.defaultTarget) { @@ -1305,6 +1406,9 @@ export class Batcher { publicConfig: this.getPublicConfig(), time: Date.now(), }); + if (this.enableMqtt) { + yield* call(() => this.startMqttServer()); + } // 5. Run the main background tasks concurrently // Spawn ensures that if one task fails or stops, the other is also stopped. diff --git a/packages/batcher/core/config.ts b/packages/batcher/core/config.ts index 161dc7f36..a9840cbc5 100644 --- a/packages/batcher/core/config.ts +++ b/packages/batcher/core/config.ts @@ -17,6 +17,13 @@ import { Value } from "@sinclair/typebox/value"; */ export type ValidAdapterKey = T extends Record ? K : never; +export interface BatcherMqttConfig { + enabled?: boolean; + port?: number; + host?: string; + retainLastMessage?: boolean; +} + /** * Configuration for when and how batches should be processed. * Supports time-based, size-based, value-based, hybrid, and custom criteria. @@ -136,6 +143,7 @@ export interface BatcherConfig< port?: number; enableHttpServer?: boolean; enableEventSystem?: boolean; + mqtt?: BatcherMqttConfig; // Batching behavior batchingCriteria?: PerAdapterBatchingCriteria; @@ -167,6 +175,12 @@ export const DEFAULT_CONFIG_VALUES = { port: 3000, enableHttpServer: true, enableEventSystem: false, + mqtt: { + enabled: false, + port: 8833, + host: "0.0.0.0", + retainLastMessage: true, + }, maxRetries: 3, retryDelayMs: 1000, shutdown: { @@ -235,6 +249,22 @@ export const BatcherConfigSchema = Type.Object({ enableEventSystem: Type.Optional( Type.Boolean({ default: DEFAULT_CONFIG_VALUES.enableEventSystem }), ), + mqtt: Type.Optional(Type.Object({ + enabled: Type.Optional( + Type.Boolean({ default: DEFAULT_CONFIG_VALUES.mqtt.enabled }), + ), + port: Type.Optional(Type.Number({ + minimum: 1, + maximum: 65535, + default: DEFAULT_CONFIG_VALUES.mqtt.port, + })), + host: Type.Optional( + Type.String({ default: DEFAULT_CONFIG_VALUES.mqtt.host }), + ), + retainLastMessage: Type.Optional(Type.Boolean({ + default: DEFAULT_CONFIG_VALUES.mqtt.retainLastMessage, + })), + }, { additionalProperties: false })), shutdown: Type.Optional(Type.Object({ hooks: Type.Optional(Type.Object({ diff --git a/packages/batcher/core/mod.ts b/packages/batcher/core/mod.ts index 7d64081c6..671f1afac 100644 --- a/packages/batcher/core/mod.ts +++ b/packages/batcher/core/mod.ts @@ -2,6 +2,7 @@ export { Batcher } from "./batcher.ts"; export type { BatchingCriteriaConfig, BatcherConfig, + BatcherMqttConfig, ValidAdapterKey, } from "./config.ts"; export { validateBatcherConfig, validateBatchingCriteria } from "./config.ts"; @@ -11,3 +12,4 @@ export { FileStorage as BatcherFileStorage, } from "./storage.ts"; export type { DefaultBatcherInput } from "./types.ts"; +export type { BatcherResponse, InputUpdatePayload } from "./api-types.ts"; diff --git a/packages/batcher/core/storage.ts b/packages/batcher/core/storage.ts index 96b6b8559..b6da22df6 100644 --- a/packages/batcher/core/storage.ts +++ b/packages/batcher/core/storage.ts @@ -140,6 +140,9 @@ export class FileStorage * Create a unique key for a DefaultBatcherInput for comparison */ private createInputKey(input: T, target: string): string { + if (input.inputId) { + return `id-${input.inputId}`; + } return `${input.addressType}-${target}-${input.address}-${input.input}-${input.timestamp}-${input.signature ?? ""}`; } diff --git a/packages/batcher/core/types.ts b/packages/batcher/core/types.ts index 64ea8071f..90842747e 100644 --- a/packages/batcher/core/types.ts +++ b/packages/batcher/core/types.ts @@ -7,4 +7,9 @@ export interface DefaultBatcherInput { address: string; timestamp: string; target?: string; // Optional since by default we will target the PaimaL2 contract + /** + * Unique identifier for this input. Assigned server-side if omitted so + * clients can subscribe to lifecycle events safely. + */ + inputId?: string; } diff --git a/packages/batcher/deno.json b/packages/batcher/deno.json index d68877bc9..6bfa2e6b3 100644 --- a/packages/batcher/deno.json +++ b/packages/batcher/deno.json @@ -34,6 +34,9 @@ "ecpair": "npm:ecpair@^2.1.0", "tiny-secp256k1": "npm:tiny-secp256k1@^2.2.1", "varuint-bitcoin": "npm:varuint-bitcoin@^1.1.2", - "rxjs": "npm:rxjs@7.8.2" + "rxjs": "npm:rxjs@7.8.2", + "aedes": "npm:aedes@^0.51.3", + "aedes-server-factory": "npm:aedes-server-factory@^0.2.1", + "ip": "npm:ip@^2.0.1" } } \ No newline at end of file diff --git a/packages/batcher/mod.ts b/packages/batcher/mod.ts index 3510017fe..5591b0b1f 100644 --- a/packages/batcher/mod.ts +++ b/packages/batcher/mod.ts @@ -53,6 +53,9 @@ export type { DefaultBatcherInput } from "./core/types.ts"; export type { BatcherGrammar, BatcherListener } from "./core/batcher-events.ts"; export { attachDefaultConsoleListeners } from "./core/batcher-events.ts"; +// API types +export type { BatcherResponse, InputUpdatePayload } from "./core/api-types.ts"; + export { DefaultBatchBuilderLogic } from "./batch-data-builder/default-builder-logic.ts"; export { EvmBatchBuilderLogic, diff --git a/packages/batcher/server/batcher-server.ts b/packages/batcher/server/batcher-server.ts index 3a624940e..02a060d4b 100644 --- a/packages/batcher/server/batcher-server.ts +++ b/packages/batcher/server/batcher-server.ts @@ -251,6 +251,7 @@ export async function startBatcherHttpServer( inputsProcessed: Type.Number(), transactionHash: Type.Optional(Type.String()), rollup: Type.Optional(Type.Number()), + inputId: Type.String(), }), }, }, @@ -292,12 +293,14 @@ export async function startBatcherHttpServer( ); // Return appropriate response based on confirmation level + const inputId = (adaptedInput as any).inputId as string; switch (confirmationLevel) { case "no-wait": return { success: true, message: "Input queued for batching", inputsProcessed: 1, + inputId, }; case "wait-receipt": return { @@ -305,6 +308,7 @@ export async function startBatcherHttpServer( message: "Input processed successfully", transactionHash: result?.hash, inputsProcessed: 1, + inputId, }; case "wait-effectstream-processed": return { @@ -313,12 +317,14 @@ export async function startBatcherHttpServer( transactionHash: result?.hash, rollup: result?.rollup, inputsProcessed: 1, + inputId, }; default: return { success: true, message: "Input processed successfully", inputsProcessed: 1, + inputId, }; } } catch (error) { diff --git a/packages/batcher/server/mqtt-server.ts b/packages/batcher/server/mqtt-server.ts new file mode 100644 index 000000000..6bb947ce8 --- /dev/null +++ b/packages/batcher/server/mqtt-server.ts @@ -0,0 +1,149 @@ +import AedesModule, { type Client, type PublishPacket } from "aedes"; +import type { Server } from "aedes-server-factory"; +import { createServer } from "aedes-server-factory"; +import ip from "ip"; +import { Buffer } from "node:buffer"; + +function isLocalhost(ipAddress: string | undefined): boolean { + if (!ipAddress) return false; + try { + if (ip.isV4Format(ipAddress)) { + return ip.cidrSubnet("127.0.0.0/8").contains(ipAddress); + } + if (ip.isV6Format(ipAddress)) { + const localhostRange = ip.cidrSubnet("::1/128"); + const mappedRange = ip.cidrSubnet("::ffff:127.0.0.0/104"); + return localhostRange.contains(ipAddress) || mappedRange.contains(ipAddress); + } + } catch { + // ignore malformed addresses + } + return false; +} + +function extractRemoteAddress(client: Client | null): string | undefined { + // Deno currently hides remoteAddr behind symbols on the socket instance. + // This mirrors the workaround we already use inside the event-broker package. + const socket = client?.req?.socket; + if (!socket) return undefined; + const symbols = Object.getOwnPropertySymbols(socket); + const handleSymbol = symbols.find((symbol) => symbol.toString() === "Symbol(kHandle)"); + if (!handleSymbol) return undefined; + const socketHandle = (socket as any)[handleSymbol]; + const innerSymbols = Object.getOwnPropertySymbols(socketHandle ?? {}); + const streamBaseSymbol = innerSymbols.find((symbol) => + symbol.toString() === "Symbol(kStreamBaseField)" + ); + if (!streamBaseSymbol) return undefined; + const remoteAddr = socketHandle?.[streamBaseSymbol]?.remoteAddr; + return remoteAddr?.hostname; +} + +export interface BatcherMqttServerOptions { + host: string; + port: number; + retainLastMessage: boolean; +} + +type AedesConstructor = typeof import("aedes")["default"]; +type BrokerInstance = InstanceType; +type AedesFactory = (opts?: ConstructorParameters[0]) => BrokerInstance; + +const createAedesInstance: AedesFactory = (() => { + const moduleWithDefault = AedesModule as unknown as { + default?: AedesFactory; + }; + if (typeof moduleWithDefault.default === "function") { + return moduleWithDefault.default; + } + return AedesModule as unknown as AedesFactory; +})(); + +function stringifyPayload(payload: unknown): string { + return JSON.stringify( + payload, + (_key, value) => (typeof value === "bigint" ? value.toString() : value), + ); +} + +export class BatcherMqttServer { + private broker: BrokerInstance | null = null; + private server: Server | null = null; + + constructor(private readonly options: BatcherMqttServerOptions) {} + + async start(): Promise { + if (this.server) return; + + this.broker = createAedesInstance(); + this.broker.authorizePublish = ( + client: Client | null, + packet: PublishPacket, + callback: (error?: Error | null) => void, + ): void => { + if (isLocalhost(extractRemoteAddress(client))) { + callback(null); + return; + } + + console.error( + `[MQTT] Rejected publish from non-localhost origin for topic ${packet.topic}`, + ); + callback(new Error("MQTT publish restricted to localhost")); + }; + + this.server = createServer(this.broker, { ws: true }); + await new Promise((resolve, reject) => { + this.server!.listen(this.options.port, this.options.host, (err?: Error) => { + if (err) { + reject(err); + } else { + console.log( + `📡 Batcher MQTT broker listening on mqtt://${this.options.host}:${this.options.port}`, + ); + resolve(); + } + }); + }); + } + + async stop(): Promise { + if (!this.server) return; + await new Promise((resolve, reject) => { + this.server!.close((err?: Error) => { + if (err) reject(err); + else resolve(); + }); + }); + + if (this.broker) { + await new Promise((resolve) => { + this.broker!.close(() => { + resolve(); + }); + }); + } + + this.server = null; + this.broker = null; + } + + async publish(topic: string, payload: unknown): Promise { + if (!this.broker) return; + const packet: PublishPacket = { + cmd: "publish", + topic, + payload: Buffer.from(stringifyPayload(payload)), + qos: 0, + dup: false, + retain: this.options.retainLastMessage, + }; + await new Promise((resolve, reject) => { + this.broker!.publish(packet, (err?: Error) => { + if (err) reject(err); + else resolve(); + }); + }); + } +} +