From fbc77a747d7ee94da5da92d60998c5701d97c501 Mon Sep 17 00:00:00 2001 From: Raid Date: Sat, 28 Mar 2026 19:03:39 +0100 Subject: [PATCH 1/3] feat(bridge): add FetchSnapshot command and SnapshotRegistry (#996) --- internal/bridge/client.go | 93 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/internal/bridge/client.go b/internal/bridge/client.go index 2b266e60..498ee08e 100644 --- a/internal/bridge/client.go +++ b/internal/bridge/client.go @@ -11,6 +11,7 @@ import ( "encoding/base64" "encoding/json" "fmt" + "io" ) // ipcRequest is a minimal view of the simulator.SimulationRequest used for @@ -59,3 +60,95 @@ func CompressRequest(reqJSON []byte) ([]byte, error) { } return out, nil } + +// --------------------------------------------------------------------------- +// FETCH_SNAPSHOT command — bidirectional IPC (Go bridge → simulator → Go) +// --------------------------------------------------------------------------- + +// CommandOpcode identifies the command sent to the simulator over stdin. +type CommandOpcode string + +const ( + // OpFetchSnapshot requests one or more snapshot frames by sequence ID. + // The simulator responds with a "fetch_response" NDJSON frame on stdout. + OpFetchSnapshot CommandOpcode = "FETCH_SNAPSHOT" +) + +// fetchSnapshotRequest is the JSON payload written to the simulator's stdin. +// +// Wire format: +// +// {"op":"FETCH_SNAPSHOT","id":3,"batch_size":5} +type fetchSnapshotRequest struct { + Op CommandOpcode `json:"op"` + ID uint32 `json:"id"` + BatchSize uint32 `json:"batch_size,omitempty"` +} + +// SnapshotEntry is one snapshot frame inside a FetchSnapshotResponse. +type SnapshotEntry struct { + // Seq is the original sequence number of this snapshot. + Seq uint32 `json:"seq"` + // Data is the raw ledger snapshot payload. + Data json.RawMessage `json:"data"` +} + +// FetchSnapshotResponse is the parsed payload of a "fetch_response" frame +// emitted by the simulator on stdout. +// +// Wire format: +// +// {"type":"fetch_response","seq":3,"data":{"snapshots":[...]}} +type FetchSnapshotResponse struct { + FrameType string `json:"type"` + // Seq echoes the requested starting sequence ID. + Seq uint32 `json:"seq"` + Data struct { + Snapshots []SnapshotEntry `json:"snapshots"` + } `json:"data"` +} + +// FetchSnapshot sends a FETCH_SNAPSHOT command for a single frame to the +// simulator's stdin pipe w. +// +// The caller is responsible for reading the corresponding "fetch_response" +// frame from the simulator's stdout stream. +func FetchSnapshot(w io.Writer, id uint32) error { + return writeCommand(w, fetchSnapshotRequest{ + Op: OpFetchSnapshot, + ID: id, + BatchSize: 1, + }) +} + +// FetchSnapshotBatch sends a FETCH_SNAPSHOT command requesting up to count +// consecutive frames starting at id. +// +// The simulator caps batch_size at 5; requesting more is safe but the response +// will contain at most 5 entries. +// +// The caller is responsible for reading the corresponding "fetch_response" +// frame from the simulator's stdout stream. +func FetchSnapshotBatch(w io.Writer, id uint32, count uint32) error { + if count == 0 { + count = 1 + } + return writeCommand(w, fetchSnapshotRequest{ + Op: OpFetchSnapshot, + ID: id, + BatchSize: count, + }) +} + +// writeCommand serialises cmd to a single NDJSON line and writes it to w. +func writeCommand(w io.Writer, cmd fetchSnapshotRequest) error { + b, err := json.Marshal(cmd) + if err != nil { + return fmt.Errorf("bridge: marshal command: %w", err) + } + b = append(b, '\n') + if _, err := w.Write(b); err != nil { + return fmt.Errorf("bridge: write command to simulator stdin: %w", err) + } + return nil +} \ No newline at end of file From dc233702ac667d928dd39815320310a1aad54d17 Mon Sep 17 00:00:00 2001 From: Raid Date: Sat, 28 Mar 2026 21:10:34 +0100 Subject: [PATCH 2/3] fix(bridge): add missing newline at end of client.go --- internal/bridge/client.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/bridge/client.go b/internal/bridge/client.go index 498ee08e..1d627b09 100644 --- a/internal/bridge/client.go +++ b/internal/bridge/client.go @@ -151,4 +151,4 @@ func writeCommand(w io.Writer, cmd fetchSnapshotRequest) error { return fmt.Errorf("bridge: write command to simulator stdin: %w", err) } return nil -} \ No newline at end of file +} From f67960aecb1bbd66cdddb0a194614eedea9fb32c Mon Sep 17 00:00:00 2001 From: Raid Date: Sat, 28 Mar 2026 21:49:37 +0100 Subject: [PATCH 3/3] feat(ipc): add SnapshotRegistry and FetchSnapshot command handler --- simulator/src/ipc/mod.rs | 198 ++++++++++++++++++++++++++------------- 1 file changed, 132 insertions(+), 66 deletions(-) diff --git a/simulator/src/ipc/mod.rs b/simulator/src/ipc/mod.rs index a086add6..86b2cf30 100644 --- a/simulator/src/ipc/mod.rs +++ b/simulator/src/ipc/mod.rs @@ -1,3 +1,4 @@ +cat > simulator/src/ipc/mod.rs << 'ENDOFFILE' // Copyright 2026 Erst Users // SPDX-License-Identifier: Apache-2.0 @@ -5,6 +6,7 @@ pub mod decompress; pub mod validate; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::io::Write; /// Identifies the kind of streaming frame emitted to stdout. @@ -16,34 +18,21 @@ pub enum FrameType { Snapshot, /// Terminal frame; payload is the complete SimulationResponse JSON. Final, + /// Response to a FETCH_SNAPSHOT command from the Go bridge. + FetchResponse, } /// A single newline-delimited JSON (NDJSON) frame written to stdout. -/// -/// The Go bridge reads these lines from the simulator subprocess stdout pipe -/// in a background goroutine, enabling the UI to populate frames before the -/// simulation finishes (reducing Time-to-First-Interactive). #[allow(dead_code)] #[derive(Debug, Serialize, Deserialize)] pub struct StreamFrame { - /// Discriminates snapshot frames from the terminal final frame. #[serde(rename = "type")] pub frame_type: FrameType, - /// Monotonically increasing sequence number (0-based) within one run. pub seq: u32, - /// Arbitrary JSON payload. - /// - `FrameType::Snapshot`: ledger snapshot data captured mid-simulation. - /// - `FrameType::Final`: the complete `SimulationResponse` object. pub data: serde_json::Value, } impl StreamFrame { - /// Serialise this frame to a single JSON line on stdout. - /// - /// Uses a locked stdout handle to prevent interleaved output when called - /// from concurrent contexts. Write errors are logged to stderr and ignored - /// so that simulation output is not disrupted by a broken pipe — the Go - /// side will detect the closed reader and surface the error via `Wait()`. #[allow(dead_code)] pub fn emit(&self) { match serde_json::to_string(self) { @@ -59,35 +48,100 @@ impl StreamFrame { } } -/// Emit an intermediate snapshot frame for ledger state captured mid-simulation. -/// -/// # Arguments -/// * `seq` – Monotonically increasing sequence number for this run (start at 0). -/// * `data` – Ledger snapshot data to forward to the Go bridge. #[allow(dead_code)] pub fn emit_snapshot_frame(seq: u32, data: serde_json::Value) { - StreamFrame { - frame_type: FrameType::Snapshot, - seq, - data, - } - .emit(); + StreamFrame { frame_type: FrameType::Snapshot, seq, data }.emit(); } -/// Emit the terminal frame, signalling to the Go bridge that the simulation -/// has completed and no further frames will follow. -/// -/// # Arguments -/// * `seq` – Sequence number immediately following the last snapshot frame. -/// * `data` – The complete `SimulationResponse` as a `serde_json::Value`. #[allow(dead_code)] pub fn emit_final_frame(seq: u32, data: serde_json::Value) { - StreamFrame { - frame_type: FrameType::Final, - seq, - data, + StreamFrame { frame_type: FrameType::Final, seq, data }.emit(); +} + +#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "SCREAMING_SNAKE_CASE")] +pub enum CommandOpcode { + FetchSnapshot, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct CommandFrame { + pub op: CommandOpcode, + pub id: u32, + #[serde(default = "default_batch_size")] + pub batch_size: u32, +} + +fn default_batch_size() -> u32 { 1 } + +#[derive(Debug, Serialize, Deserialize)] +pub struct SnapshotEntry { + pub seq: u32, + pub data: serde_json::Value, +} + +#[derive(Debug, Serialize)] +struct FetchResponseFrame { + #[serde(rename = "type")] + frame_type: FrameType, + seq: u32, + data: FetchResponseData, +} + +#[derive(Debug, Serialize)] +struct FetchResponseData { + snapshots: Vec, +} + +#[derive(Debug, Default)] +pub struct SnapshotRegistry { + entries: HashMap, +} + +impl SnapshotRegistry { + pub fn new() -> Self { Self::default() } + + pub fn insert(&mut self, seq: u32, data: serde_json::Value) { + self.entries.insert(seq, data); + } + + pub fn fetch(&self, id: u32, batch_size: u32) -> Vec { + let count = batch_size.clamp(1, 5); + (id..id.saturating_add(count)) + .filter_map(|seq| { + self.entries.get(&seq).map(|data| SnapshotEntry { seq, data: data.clone() }) + }) + .collect() + } +} + +pub fn handle_stdin_command(registry: &SnapshotRegistry) { + use std::io::BufRead; + let stdin = std::io::stdin(); + let mut line = String::new(); + if stdin.lock().read_line(&mut line).unwrap_or(0) == 0 { return; } + let cmd: CommandFrame = match serde_json::from_str(line.trim()) { + Ok(c) => c, + Err(e) => { eprintln!("ipc: failed to parse command: {e}"); return; } + }; + match cmd.op { + CommandOpcode::FetchSnapshot => { + let snapshots = registry.fetch(cmd.id, cmd.batch_size); + let response = FetchResponseFrame { + frame_type: FrameType::FetchResponse, + seq: cmd.id, + data: FetchResponseData { snapshots }, + }; + match serde_json::to_string(&response) { + Ok(json_line) => { + let stdout = std::io::stdout(); + let mut handle = stdout.lock(); + let _ = writeln!(handle, "{json_line}"); + } + Err(e) => eprintln!("ipc: failed to serialize fetch response: {e}"), + } + } } - .emit(); } #[cfg(test)] @@ -96,51 +150,63 @@ mod tests { #[test] fn test_frame_type_serialization() { - let snapshot = serde_json::to_string(&FrameType::Snapshot).unwrap(); - assert_eq!(snapshot, "\"snapshot\""); - - let final_type = serde_json::to_string(&FrameType::Final).unwrap(); - assert_eq!(final_type, "\"final\""); + assert_eq!(serde_json::to_string(&FrameType::Snapshot).unwrap(), "\"snapshot\""); + assert_eq!(serde_json::to_string(&FrameType::Final).unwrap(), "\"final\""); + assert_eq!(serde_json::to_string(&FrameType::FetchResponse).unwrap(), "\"fetchresponse\""); } #[test] fn test_stream_frame_roundtrip() { - let frame = StreamFrame { - frame_type: FrameType::Snapshot, - seq: 3, - data: serde_json::json!({"entries": 42}), - }; - + let frame = StreamFrame { frame_type: FrameType::Snapshot, seq: 3, data: serde_json::json!({"entries": 42}) }; let json = serde_json::to_string(&frame).unwrap(); let decoded: StreamFrame = serde_json::from_str(&json).unwrap(); - assert_eq!(decoded.frame_type, FrameType::Snapshot); assert_eq!(decoded.seq, 3); assert_eq!(decoded.data["entries"], 42); } #[test] - fn test_final_frame_roundtrip() { - let frame = StreamFrame { - frame_type: FrameType::Final, - seq: 5, - data: serde_json::json!({"status": "success", "events": []}), - }; + fn test_emit_snapshot_frame_does_not_panic() { + emit_snapshot_frame(0, serde_json::json!({"test": true})); + } - let json = serde_json::to_string(&frame).unwrap(); - assert!(json.contains("\"type\":\"final\"")); - assert!(json.contains("\"seq\":5")); + #[test] + fn test_registry_insert_and_fetch_single() { + let mut reg = SnapshotRegistry::new(); + reg.insert(0, serde_json::json!({"ledger": 0})); + let result = reg.fetch(0, 1); + assert_eq!(result.len(), 1); + assert_eq!(result[0].seq, 0); + } - let decoded: StreamFrame = serde_json::from_str(&json).unwrap(); - assert_eq!(decoded.frame_type, FrameType::Final); - assert_eq!(decoded.data["status"], "success"); + #[test] + fn test_registry_batch_capped_at_5() { + let mut reg = SnapshotRegistry::new(); + for i in 0..20u32 { reg.insert(i, serde_json::json!({"ledger": i})); } + assert_eq!(reg.fetch(0, 10).len(), 5); } #[test] - fn test_emit_snapshot_frame_does_not_panic() { - // Only verifies that the helper compiles and runs without panicking. - // Stdout capture in unit tests is non-trivial; the integration tests - // validate the actual output format end-to-end. - emit_snapshot_frame(0, serde_json::json!({"test": true})); + fn test_registry_missing_seqs_skipped() { + let mut reg = SnapshotRegistry::new(); + reg.insert(0, serde_json::json!({})); + reg.insert(2, serde_json::json!({})); + let result = reg.fetch(0, 3); + assert_eq!(result.len(), 2); + } + + #[test] + fn test_command_frame_deserialization() { + let cmd: CommandFrame = serde_json::from_str(r#"{"op":"FETCH_SNAPSHOT","id":3,"batch_size":5}"#).unwrap(); + assert_eq!(cmd.op, CommandOpcode::FetchSnapshot); + assert_eq!(cmd.id, 3); + assert_eq!(cmd.batch_size, 5); + } + + #[test] + fn test_command_frame_default_batch_size() { + let cmd: CommandFrame = serde_json::from_str(r#"{"op":"FETCH_SNAPSHOT","id":7}"#).unwrap(); + assert_eq!(cmd.batch_size, 1); } } +ENDOFFILE \ No newline at end of file