Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions internal/bridge/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
)

// ipcRequest is a minimal view of the simulator.SimulationRequest used for
Expand Down Expand Up @@ -70,6 +71,96 @@ func CompressRequest(reqJSON []byte) ([]byte, error) {
return out, nil
}

// ---------------------------------------------------------------------------
// FETCH_SNAPSHOT command — bidirectional IPC (Go bridge → simulator → Go)
// ---------------------------------------------------------------------------

// CommandOpcode identifies the command sent to the simulator over stdin.
type CommandOpcode string

const (
// OpFetchSnapshot requests one or more snapshot frames by sequence ID.
// The simulator responds with a "fetch_response" NDJSON frame on stdout.
OpFetchSnapshot CommandOpcode = "FETCH_SNAPSHOT"
)

// fetchSnapshotRequest is the JSON payload written to the simulator's stdin.
//
// Wire format:
//
// {"op":"FETCH_SNAPSHOT","id":3,"batch_size":5}
type fetchSnapshotRequest struct {
Op CommandOpcode `json:"op"`
ID uint32 `json:"id"`
BatchSize uint32 `json:"batch_size,omitempty"`
}

// SnapshotEntry is one snapshot frame inside a FetchSnapshotResponse.
type SnapshotEntry struct {
// Seq is the original sequence number of this snapshot.
Seq uint32 `json:"seq"`
// Data is the raw ledger snapshot payload.
Data json.RawMessage `json:"data"`
}

// FetchSnapshotResponse is the parsed payload of a "fetch_response" frame
// emitted by the simulator on stdout.
//
// Wire format:
//
// {"type":"fetch_response","seq":3,"data":{"snapshots":[...]}}
type FetchSnapshotResponse struct {
FrameType string `json:"type"`
// Seq echoes the requested starting sequence ID.
Seq uint32 `json:"seq"`
Data struct {
Snapshots []SnapshotEntry `json:"snapshots"`
} `json:"data"`
}

// FetchSnapshot sends a FETCH_SNAPSHOT command for a single frame to the
// simulator's stdin pipe w.
//
// The caller is responsible for reading the corresponding "fetch_response"
// frame from the simulator's stdout stream.
func FetchSnapshot(w io.Writer, id uint32) error {
return writeCommand(w, fetchSnapshotRequest{
Op: OpFetchSnapshot,
ID: id,
BatchSize: 1,
})
}

// FetchSnapshotBatch sends a FETCH_SNAPSHOT command requesting up to count
// consecutive frames starting at id.
//
// The simulator caps batch_size at 5; requesting more is safe but the response
// will contain at most 5 entries.
//
// The caller is responsible for reading the corresponding "fetch_response"
// frame from the simulator's stdout stream.
func FetchSnapshotBatch(w io.Writer, id uint32, count uint32) error {
if count == 0 {
count = 1
}
return writeCommand(w, fetchSnapshotRequest{
Op: OpFetchSnapshot,
ID: id,
BatchSize: count,
})
}

// writeCommand serialises cmd to a single NDJSON line and writes it to w.
func writeCommand(w io.Writer, cmd fetchSnapshotRequest) error {
b, err := json.Marshal(cmd)
if err != nil {
return fmt.Errorf("bridge: marshal command: %w", err)
}
b = append(b, '\n')
if _, err := w.Write(b); err != nil {
return fmt.Errorf("bridge: write command to simulator stdin: %w", err)
}
return nil
// WithRollbackAndResume injects a rollback-and-resume control command into a
// simulation request JSON payload.
func WithRollbackAndResume(reqJSON []byte, rewindStep int, forkParams map[string]string, harnessReset bool) ([]byte, error) {
Expand Down
198 changes: 132 additions & 66 deletions simulator/src/ipc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
cat > simulator/src/ipc/mod.rs << 'ENDOFFILE'
// Copyright 2026 Erst Users
// SPDX-License-Identifier: Apache-2.0

pub mod decompress;
pub mod validate;

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::io::Write;

/// Identifies the kind of streaming frame emitted to stdout.
Expand All @@ -16,24 +18,17 @@ pub enum FrameType {
Snapshot,
/// Terminal frame; payload is the complete SimulationResponse JSON.
Final,
/// Response to a FETCH_SNAPSHOT command from the Go bridge.
FetchResponse,
}

/// A single newline-delimited JSON (NDJSON) frame written to stdout.
///
/// The Go bridge reads these lines from the simulator subprocess stdout pipe
/// in a background goroutine, enabling the UI to populate frames before the
/// simulation finishes (reducing Time-to-First-Interactive).
#[allow(dead_code)]
#[derive(Debug, Serialize, Deserialize)]
pub struct StreamFrame {
/// Discriminates snapshot frames from the terminal final frame.
#[serde(rename = "type")]
pub frame_type: FrameType,
/// Monotonically increasing sequence number (0-based) within one run.
pub seq: u32,
/// Arbitrary JSON payload.
/// - `FrameType::Snapshot`: ledger snapshot data captured mid-simulation.
/// - `FrameType::Final`: the complete `SimulationResponse` object.
pub data: serde_json::Value,
}

Expand All @@ -45,12 +40,6 @@ pub enum BridgeControlCommand {
}

impl StreamFrame {
/// Serialise this frame to a single JSON line on stdout.
///
/// Uses a locked stdout handle to prevent interleaved output when called
/// from concurrent contexts. Write errors are logged to stderr and ignored
/// so that simulation output is not disrupted by a broken pipe — the Go
/// side will detect the closed reader and surface the error via `Wait()`.
#[allow(dead_code)]
pub fn emit(&self) {
match serde_json::to_string(self) {
Expand All @@ -66,35 +55,100 @@ impl StreamFrame {
}
}

/// Emit an intermediate snapshot frame for ledger state captured mid-simulation.
///
/// # Arguments
/// * `seq` – Monotonically increasing sequence number for this run (start at 0).
/// * `data` – Ledger snapshot data to forward to the Go bridge.
#[allow(dead_code)]
pub fn emit_snapshot_frame(seq: u32, data: serde_json::Value) {
StreamFrame {
frame_type: FrameType::Snapshot,
seq,
data,
}
.emit();
StreamFrame { frame_type: FrameType::Snapshot, seq, data }.emit();
}

/// Emit the terminal frame, signalling to the Go bridge that the simulation
/// has completed and no further frames will follow.
///
/// # Arguments
/// * `seq` – Sequence number immediately following the last snapshot frame.
/// * `data` – The complete `SimulationResponse` as a `serde_json::Value`.
#[allow(dead_code)]
pub fn emit_final_frame(seq: u32, data: serde_json::Value) {
StreamFrame {
frame_type: FrameType::Final,
seq,
data,
StreamFrame { frame_type: FrameType::Final, seq, data }.emit();
}

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum CommandOpcode {
FetchSnapshot,
}

#[derive(Debug, Serialize, Deserialize)]
pub struct CommandFrame {
pub op: CommandOpcode,
pub id: u32,
#[serde(default = "default_batch_size")]
pub batch_size: u32,
}

fn default_batch_size() -> u32 { 1 }

#[derive(Debug, Serialize, Deserialize)]
pub struct SnapshotEntry {
pub seq: u32,
pub data: serde_json::Value,
}

#[derive(Debug, Serialize)]
struct FetchResponseFrame {
#[serde(rename = "type")]
frame_type: FrameType,
seq: u32,
data: FetchResponseData,
}

#[derive(Debug, Serialize)]
struct FetchResponseData {
snapshots: Vec<SnapshotEntry>,
}

#[derive(Debug, Default)]
pub struct SnapshotRegistry {
entries: HashMap<u32, serde_json::Value>,
}

impl SnapshotRegistry {
pub fn new() -> Self { Self::default() }

pub fn insert(&mut self, seq: u32, data: serde_json::Value) {
self.entries.insert(seq, data);
}

pub fn fetch(&self, id: u32, batch_size: u32) -> Vec<SnapshotEntry> {
let count = batch_size.clamp(1, 5);
(id..id.saturating_add(count))
.filter_map(|seq| {
self.entries.get(&seq).map(|data| SnapshotEntry { seq, data: data.clone() })
})
.collect()
}
}

pub fn handle_stdin_command(registry: &SnapshotRegistry) {
use std::io::BufRead;
let stdin = std::io::stdin();
let mut line = String::new();
if stdin.lock().read_line(&mut line).unwrap_or(0) == 0 { return; }
let cmd: CommandFrame = match serde_json::from_str(line.trim()) {
Ok(c) => c,
Err(e) => { eprintln!("ipc: failed to parse command: {e}"); return; }
};
match cmd.op {
CommandOpcode::FetchSnapshot => {
let snapshots = registry.fetch(cmd.id, cmd.batch_size);
let response = FetchResponseFrame {
frame_type: FrameType::FetchResponse,
seq: cmd.id,
data: FetchResponseData { snapshots },
};
match serde_json::to_string(&response) {
Ok(json_line) => {
let stdout = std::io::stdout();
let mut handle = stdout.lock();
let _ = writeln!(handle, "{json_line}");
}
Err(e) => eprintln!("ipc: failed to serialize fetch response: {e}"),
}
}
}
.emit();
}

#[cfg(test)]
Expand All @@ -103,51 +157,63 @@ mod tests {

#[test]
fn test_frame_type_serialization() {
let snapshot = serde_json::to_string(&FrameType::Snapshot).unwrap();
assert_eq!(snapshot, "\"snapshot\"");

let final_type = serde_json::to_string(&FrameType::Final).unwrap();
assert_eq!(final_type, "\"final\"");
assert_eq!(serde_json::to_string(&FrameType::Snapshot).unwrap(), "\"snapshot\"");
assert_eq!(serde_json::to_string(&FrameType::Final).unwrap(), "\"final\"");
assert_eq!(serde_json::to_string(&FrameType::FetchResponse).unwrap(), "\"fetchresponse\"");
}

#[test]
fn test_stream_frame_roundtrip() {
let frame = StreamFrame {
frame_type: FrameType::Snapshot,
seq: 3,
data: serde_json::json!({"entries": 42}),
};

let frame = StreamFrame { frame_type: FrameType::Snapshot, seq: 3, data: serde_json::json!({"entries": 42}) };
let json = serde_json::to_string(&frame).unwrap();
let decoded: StreamFrame = serde_json::from_str(&json).unwrap();

assert_eq!(decoded.frame_type, FrameType::Snapshot);
assert_eq!(decoded.seq, 3);
assert_eq!(decoded.data["entries"], 42);
}

#[test]
fn test_final_frame_roundtrip() {
let frame = StreamFrame {
frame_type: FrameType::Final,
seq: 5,
data: serde_json::json!({"status": "success", "events": []}),
};
fn test_emit_snapshot_frame_does_not_panic() {
emit_snapshot_frame(0, serde_json::json!({"test": true}));
}

let json = serde_json::to_string(&frame).unwrap();
assert!(json.contains("\"type\":\"final\""));
assert!(json.contains("\"seq\":5"));
#[test]
fn test_registry_insert_and_fetch_single() {
let mut reg = SnapshotRegistry::new();
reg.insert(0, serde_json::json!({"ledger": 0}));
let result = reg.fetch(0, 1);
assert_eq!(result.len(), 1);
assert_eq!(result[0].seq, 0);
}

let decoded: StreamFrame = serde_json::from_str(&json).unwrap();
assert_eq!(decoded.frame_type, FrameType::Final);
assert_eq!(decoded.data["status"], "success");
#[test]
fn test_registry_batch_capped_at_5() {
let mut reg = SnapshotRegistry::new();
for i in 0..20u32 { reg.insert(i, serde_json::json!({"ledger": i})); }
assert_eq!(reg.fetch(0, 10).len(), 5);
}

#[test]
fn test_emit_snapshot_frame_does_not_panic() {
// Only verifies that the helper compiles and runs without panicking.
// Stdout capture in unit tests is non-trivial; the integration tests
// validate the actual output format end-to-end.
emit_snapshot_frame(0, serde_json::json!({"test": true}));
fn test_registry_missing_seqs_skipped() {
let mut reg = SnapshotRegistry::new();
reg.insert(0, serde_json::json!({}));
reg.insert(2, serde_json::json!({}));
let result = reg.fetch(0, 3);
assert_eq!(result.len(), 2);
}

#[test]
fn test_command_frame_deserialization() {
let cmd: CommandFrame = serde_json::from_str(r#"{"op":"FETCH_SNAPSHOT","id":3,"batch_size":5}"#).unwrap();
assert_eq!(cmd.op, CommandOpcode::FetchSnapshot);
assert_eq!(cmd.id, 3);
assert_eq!(cmd.batch_size, 5);
}

#[test]
fn test_command_frame_default_batch_size() {
let cmd: CommandFrame = serde_json::from_str(r#"{"op":"FETCH_SNAPSHOT","id":7}"#).unwrap();
assert_eq!(cmd.batch_size, 1);
}
}
ENDOFFILE