Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions deno.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,12 @@ const config: PaimaBatcherConfig = {
enableHttpServer: true,
port: 3334,
enableEventSystem: true,
mqtt: {
enabled: true,
port: 8883,
host: "0.0.0.0",
retainLastMessage: true,
},
};
```

Expand Down Expand Up @@ -253,6 +259,96 @@ const config: PaimaBatcherConfig = {
};
```

### Realtime MQTT Updates

The batcher can host a lightweight MQTT broker (powered by [Aedes](https://github.com/moscajs/aedes)) so React or mobile clients can subscribe to lifecycle updates for each submitted input.

```typescript
const config: PaimaBatcherConfig = {
// ...
enableEventSystem: true,
mqtt: {
enabled: true,
port: 8883, // TCP + WebSocket listeners
host: "0.0.0.0", // Listen on all interfaces
retainLastMessage: true, // keep the latest state for late subscribers
},
};
```

Every successful `POST /send-input` response now includes an `inputId`:

```json
{
"success": true,
"message": "Input processed successfully",
"inputsProcessed": 1,
"transactionHash": "0x...",
"inputId": "ef07c6dd-e9c4-4bd7-99fb-9eae17f0f4c0"
}
```

Use that identifier to subscribe to the topic `batcher/inputs/{inputId}`. Each payload contains the phase, optional transaction metadata, and any errors:

```json
{
"inputId": "ef07c6dd-e9c4-4bd7-99fb-9eae17f0f4c0",
"target": "ethereum",
"phase": "receipt",
"txHash": "0x...",
"blockNumber": 19823423,
"time": 1734629912000
}
```

Phases include `accepted`, `submitted`, `receipt`, `effectstream-processed`, and `error`.

Clients can subscribe with [MQTT.js](https://github.com/mqttjs/MQTT.js):

```typescript
import mqtt from "mqtt";

const client = mqtt.connect("ws://localhost:8883");
const topic = `batcher/inputs/${inputId}`;

client.on("connect", () => {
client.subscribe(topic, (err) => {
if (err) console.error("Subscription failed", err);
});
});

client.on("message", (t, payload) => {
if (t !== topic) return;
const update = JSON.parse(payload.toString());
// React components can set state directly from update.phase
});
```

If you already consume batcher lifecycle events via `addStateTransition`, there is also a typed `input:update` event with the same payload for local observability or custom bridges.

#### MQTT Security: Publish Authorization

The batcher implements a **security-first approach** to MQTT publish operations. Only the batcher itself (localhost connections) can publish messages to prevent unauthorized actors from tampering with input status updates.

**How it works:**

The MQTT broker enforces **localhost-only publishing**:

- ✅ **Localhost connections** (127.0.0.1, ::1) can publish → Only the batcher itself
- ❌ **Remote connections** cannot publish → Always rejected with error

**Authorization flow for publish attempts:**

```
Client publishes → Check if localhost
├─ If localhost → ✅ Allow
└─ If remote → ❌ Reject with error
```

This ensures that only the batcher process can update input statuses. There is no configuration option to disable this security — it's always enforced.

**Subscribing remains unrestricted** — all clients (local or remote) can subscribe to topics to receive real-time status updates. This allows frontend apps running on different machines to monitor input progress without being able to forge status messages.

**What happens?**
1. Batcher queues inputs until it has **100 inputs** (criteria threshold)
2. When processing, `buildBatchData()` serializes inputs until reaching **10KB** (adapter limit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,52 @@ const baseConfig: PaimaBatcherConfig = {
Set `pollingIntervalMs` to match your shortest batching time window. For example, if using `timeWindowMs: 5000`, a `pollingIntervalMs` of 1000 ensures responsive batch submission.
:::

#### MQTT Configuration

The `mqtt` object controls the embedded MQTT broker for real-time input status updates:

```typescript
const config: PaimaBatcherConfig = {
// ...
enableEventSystem: true, // Required for MQTT to work
mqtt: {
enabled: true, // Enable the MQTT broker
port: 8883, // Port for TCP + WebSocket
host: "0.0.0.0", // Listen on all interfaces
retainLastMessage: true, // Keep latest state for late subscribers
},
};
```

| Setting | Type | Default | Description |
|---------|------|---------|-------------|
| `mqtt.enabled` | `boolean` | `false` | Whether to start the embedded MQTT broker |
| `mqtt.port` | `number` | `8883` | Port for MQTT TCP and WebSocket connections |
| `mqtt.host` | `string` | `"0.0.0.0"` | Network interface to bind to (`"0.0.0.0"` = all interfaces, `"127.0.0.1"` = localhost only) |
| `mqtt.retainLastMessage` | `boolean` | `true` | Whether the broker should retain the latest message on each topic |

**Security Note:**

The MQTT broker **always** restricts publishing to localhost connections only. Only the batcher itself (running on 127.0.0.1 or ::1) can publish messages. Remote clients can subscribe to receive updates but cannot publish. This security is enforced and cannot be disabled.

:::warning MQTT + Event System Dependency
The MQTT broker requires `enableEventSystem: true` to function, as it relies on state transition events to publish input status updates.

```typescript
const config: PaimaBatcherConfig = {
enableEventSystem: true, // ✅ Required
mqtt: {
enabled: true,
// ...
},
};
```
:::

:::tip
Set `pollingIntervalMs` to match your shortest batching time window. For example, if using `timeWindowMs: 5000`, a `pollingIntervalMs` of 1000 ensures responsive batch submission.
:::

### Step 2: Instantiate Blockchain Adapters

Instantiate each blockchain adapter you want to use:
Expand Down
6 changes: 6 additions & 0 deletions e2e/client/batcher/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ export const config: BatcherConfig<DefaultBatcherInput> = {
confirmationLevel: "wait-effectstream-processed",
enableEventSystem: true, // Important for adding state transitions to console logs
port,
mqtt: {
enabled: true,
port: 8833,
host: "0.0.0.0",
retainLastMessage: true,
},
};

export const storage = new FileStorage("./batcher-data");
3 changes: 2 additions & 1 deletion e2e/client/node/deno.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
"@polkadot/util-crypto": "npm:@polkadot/util-crypto@^13.4.3",
"bitcoin": "npm:bitcoinjs-lib@^7.0.0",
"ecpair": "npm:ecpair@^3.0.0",
"tiny-secp256k1": "npm:tiny-secp256k1@^2.2.4"
"tiny-secp256k1": "npm:tiny-secp256k1@^2.2.4",
"mqtt": "npm:mqtt@^5.4.1"
}
}
Loading