diff --git a/internal/ipc/types.go b/internal/ipc/types.go index 51e0a5c7..d879d6ce 100644 --- a/internal/ipc/types.go +++ b/internal/ipc/types.go @@ -11,6 +11,17 @@ import ( "github.com/dotandev/hintents/internal/errors" ) +const ( + ChunkFrameStart = "chunk_start" + ChunkFrameData = "chunk_data" + ChunkFrameEnd = "chunk_end" + + DefaultChunkBytes = 1024 * 1024 + DefaultChunkTimeoutMs = 5000 + DefaultChunkRetryLimit = 2 + DefaultChunkMaxLineSize = 2 * 1024 * 1024 +) + // ToErstError converts an IPC Error from the Rust simulator into the unified ErstError type. // The original Code and Message strings are preserved in OrigErr. // Note: the Rust simulator currently emits plain message strings without structured codes, @@ -106,6 +117,22 @@ type SimulationResponseSchema struct { Version string `json:"version"` } +type SimulationStreamFrame struct { + Kind string `json:"kind"` + StreamID string `json:"stream_id,omitempty"` + Index int `json:"index,omitempty"` + Total int `json:"total,omitempty"` + ChunkBytes int `json:"chunk_bytes,omitempty"` + TimeoutMs int `json:"timeout_ms,omitempty"` + RetryLimit int `json:"retry_limit,omitempty"` + DataBase64 string `json:"data_b64,omitempty"` + TotalBytes int64 `json:"total_bytes,omitempty"` +} + +func (f SimulationStreamFrame) IsChunkFrame() bool { + return f.Kind == ChunkFrameStart || f.Kind == ChunkFrameData || f.Kind == ChunkFrameEnd +} + type Error struct { Code string `json:"code"` Message string `json:"message"` diff --git a/internal/simulator/runner.go b/internal/simulator/runner.go index f8c84619..6f84fcce 100644 --- a/internal/simulator/runner.go +++ b/internal/simulator/runner.go @@ -244,6 +244,49 @@ func (r *Runner) Run(ctx context.Context, req *SimulationRequest) (*SimulationRe req.Timestamp = r.MockTime } + const maxChunkReadRetries = ipc.DefaultChunkRetryLimit + + var ( + resp *SimulationResponse + err error + ) + for attempt := 0; attempt <= maxChunkReadRetries; attempt++ { + resp, err = r.runAttempt(ctx, req) + if err == nil { + break + } + + chunkErr, ok := err.(*chunkStreamError) + if !ok || attempt >= chunkErr.retryLimit { + return nil, err + } + + logger.Logger.Warn( + "Retrying simulator after chunk delivery failure", + "attempt", attempt+1, + "retry_limit", chunkErr.retryLimit, + "error", chunkErr.Error(), + ) + } + + // If the simulator returned a logical error inside the response payload, + // classify it into a unified ErstError before returning to the caller. + if resp.Error != "" { + classified := (&ipc.Error{Code: resp.ErrorCode, Message: resp.Error}).ToErstError() + logger.Logger.Error("Simulator returned error", + "code", classified.Code, + "original", classified.OrigErr, + ) + return nil, classified + } + + resp.ProtocolVersion = &proto.Version + success = true + + return resp, nil +} + +func (r *Runner) runAttempt(ctx context.Context, req *SimulationRequest) (*SimulationResponse, error) { inputBytes, err := json.Marshal(req) if err != nil { logger.Logger.Error("Failed to marshal simulation request", "error", err) @@ -261,11 +304,12 @@ func (r *Runner) Run(ctx context.Context, req *SimulationRequest) (*SimulationRe cmd.Stdin = bytes.NewReader(inputBytes) cmd.Env = simulatorEnv() - // Use limited-size buffers to prevent memory growth in daemon mode - // Set reasonable limits (10MB stdout, 1MB stderr) for typical simulation responses - stdout := limitedBuffer{Buffer: bytes.Buffer{}, limit: 10 * 1024 * 1024} + stdoutPipe, err := cmd.StdoutPipe() + if err != nil { + return nil, errors.WrapSimCrash(err, "failed to open simulator stdout") + } + stderr := limitedBuffer{Buffer: bytes.Buffer{}, limit: 1 * 1024 * 1024} - cmd.Stdout = &stdout cmd.Stderr = &stderr if err := cmd.Start(); err != nil { @@ -284,6 +328,19 @@ func (r *Runner) Run(ctx context.Context, req *SimulationRequest) (*SimulationRe waitCh <- cmd.Wait() }() + resp, readErr := decodeSimulationResponseStream(ctx, stdoutPipe) + if readErr != nil { + _ = r.terminateProcessGroup(cmd, 1500*time.Millisecond) + <-waitCh + if ctx.Err() != nil { + return nil, ctx.Err() + } + if chunkErr, ok := readErr.(*chunkStreamError); ok { + return nil, chunkErr + } + return nil, errors.WrapUnmarshalFailed(readErr, stderr.String()) + } + select { case err := <-waitCh: if err != nil { @@ -299,27 +356,7 @@ func (r *Runner) Run(ctx context.Context, req *SimulationRequest) (*SimulationRe return nil, ctx.Err() } - var resp SimulationResponse - if err := json.Unmarshal(stdout.Bytes(), &resp); err != nil { - logger.Logger.Error("Failed to unmarshal response", "error", err) - return nil, errors.WrapUnmarshalFailed(err, stdout.String()) - } - - // If the simulator returned a logical error inside the response payload, - // classify it into a unified ErstError before returning to the caller. - if resp.Error != "" { - classified := (&ipc.Error{Code: resp.ErrorCode, Message: resp.Error}).ToErstError() - logger.Logger.Error("Simulator returned error", - "code", classified.Code, - "original", classified.OrigErr, - ) - return nil, classified - } - - resp.ProtocolVersion = &proto.Version - success = true - - return &resp, nil + return resp, nil } // limitedBuffer wraps bytes.Buffer with a size limit to prevent memory leaks diff --git a/internal/simulator/stream.go b/internal/simulator/stream.go new file mode 100644 index 00000000..4345379d --- /dev/null +++ b/internal/simulator/stream.go @@ -0,0 +1,208 @@ +// Copyright 2026 Erst Users +// SPDX-License-Identifier: Apache-2.0 + +package simulator + +import ( + "bufio" + "context" + "encoding/base64" + "encoding/json" + "fmt" + "io" + "os" + "strings" + "time" + + "github.com/dotandev/hintents/internal/ipc" +) + +type chunkStreamError struct { + retryLimit int + err error +} + +func (e *chunkStreamError) Error() string { + return e.err.Error() +} + +func (e *chunkStreamError) Unwrap() error { + return e.err +} + +type lineReadResult struct { + line []byte + err error +} + +func decodeSimulationResponseStream( + ctx context.Context, + reader io.Reader, +) (*SimulationResponse, error) { + lineCh := make(chan lineReadResult, 4) + go readChunkLines(reader, lineCh) + + first, err := waitForChunkLine(ctx, lineCh, time.Duration(ipc.DefaultChunkTimeoutMs)*time.Millisecond) + if err != nil { + return nil, err + } + + var frame ipc.SimulationStreamFrame + if err := json.Unmarshal(first, &frame); err == nil && frame.IsChunkFrame() { + return decodeChunkedSimulationResponse(ctx, lineCh, frame) + } + + var resp SimulationResponse + if err := json.Unmarshal(first, &resp); err != nil { + return nil, err + } + return &resp, nil +} + +func decodeChunkedSimulationResponse( + ctx context.Context, + lineCh <-chan lineReadResult, + start ipc.SimulationStreamFrame, +) (*SimulationResponse, error) { + if start.Kind != ipc.ChunkFrameStart { + return nil, fmt.Errorf("unexpected first chunk frame %q", start.Kind) + } + + timeoutMs := start.TimeoutMs + if timeoutMs <= 0 { + timeoutMs = ipc.DefaultChunkTimeoutMs + } + retryLimit := start.RetryLimit + if retryLimit < 0 { + retryLimit = 0 + } + + tempFile, err := os.CreateTemp("", "erst-sim-response-*.json") + if err != nil { + return nil, err + } + defer func() { + tempFile.Close() + _ = os.Remove(tempFile.Name()) + }() + + expectedIndex := 0 + for { + line, err := waitForChunkLine(ctx, lineCh, time.Duration(timeoutMs)*time.Millisecond) + if err != nil { + return nil, &chunkStreamError{ + retryLimit: retryLimit, + err: fmt.Errorf("chunk delivery failed for stream %s: %w", start.StreamID, err), + } + } + + var frame ipc.SimulationStreamFrame + if err := json.Unmarshal(line, &frame); err != nil { + return nil, &chunkStreamError{ + retryLimit: retryLimit, + err: fmt.Errorf("invalid chunk frame for stream %s: %w", start.StreamID, err), + } + } + + if frame.StreamID != start.StreamID { + return nil, &chunkStreamError{ + retryLimit: retryLimit, + err: fmt.Errorf("unexpected stream id %q while reading %q", frame.StreamID, start.StreamID), + } + } + + switch frame.Kind { + case ipc.ChunkFrameData: + if frame.Index != expectedIndex { + return nil, &chunkStreamError{ + retryLimit: retryLimit, + err: fmt.Errorf("missing chunk: expected %d, received %d", expectedIndex, frame.Index), + } + } + if _, err := io.Copy(tempFile, base64.NewDecoder(base64.StdEncoding, strings.NewReader(frame.DataBase64))); err != nil { + return nil, &chunkStreamError{ + retryLimit: retryLimit, + err: fmt.Errorf("failed to decode chunk %d: %w", frame.Index, err), + } + } + expectedIndex++ + case ipc.ChunkFrameEnd: + if frame.Total != start.Total { + return nil, &chunkStreamError{ + retryLimit: retryLimit, + err: fmt.Errorf("chunk total mismatch: start=%d end=%d", start.Total, frame.Total), + } + } + if expectedIndex != start.Total { + return nil, &chunkStreamError{ + retryLimit: retryLimit, + err: fmt.Errorf("incomplete chunk stream: received %d of %d", expectedIndex, start.Total), + } + } + if _, err := tempFile.Seek(0, io.SeekStart); err != nil { + return nil, err + } + var resp SimulationResponse + if err := json.NewDecoder(tempFile).Decode(&resp); err != nil { + return nil, err + } + return &resp, nil + default: + return nil, &chunkStreamError{ + retryLimit: retryLimit, + err: fmt.Errorf("unexpected chunk frame kind %q", frame.Kind), + } + } + } +} + +func readChunkLines(reader io.Reader, out chan<- lineReadResult) { + defer close(out) + + bufReader := bufio.NewReaderSize(reader, ipc.DefaultChunkMaxLineSize) + for { + line, err := bufReader.ReadBytes('\n') + if len(line) > 0 { + out <- lineReadResult{line: trimLineEndings(line)} + } + if err != nil { + if err != io.EOF { + out <- lineReadResult{err: err} + } + return + } + } +} + +func waitForChunkLine( + ctx context.Context, + lineCh <-chan lineReadResult, + timeout time.Duration, +) ([]byte, error) { + timer := time.NewTimer(timeout) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-timer.C: + return nil, fmt.Errorf("timed out waiting for next chunk after %v", timeout) + case res, ok := <-lineCh: + if !ok { + return nil, io.ErrUnexpectedEOF + } + if res.err != nil { + return nil, res.err + } + if len(res.line) == 0 { + continue + } + return res.line, nil + } + } +} + +func trimLineEndings(line []byte) []byte { + return []byte(strings.TrimRight(string(line), "\r\n")) +} diff --git a/internal/simulator/stream_test.go b/internal/simulator/stream_test.go new file mode 100644 index 00000000..40e57cd1 --- /dev/null +++ b/internal/simulator/stream_test.go @@ -0,0 +1,123 @@ +// Copyright 2026 Erst Users +// SPDX-License-Identifier: Apache-2.0 + +package simulator + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + "strings" + "testing" + + "github.com/dotandev/hintents/internal/ipc" +) + +func TestDecodeSimulationResponseStreamPlainJSON(t *testing.T) { + input := `{"status":"success","events":["a","b"]}` + "\n" + + resp, err := decodeSimulationResponseStream(context.Background(), strings.NewReader(input)) + if err != nil { + t.Fatalf("decode failed: %v", err) + } + if resp.Status != "success" { + t.Fatalf("expected success, got %s", resp.Status) + } + if len(resp.Events) != 2 { + t.Fatalf("expected 2 events, got %d", len(resp.Events)) + } +} + +func TestDecodeSimulationResponseStreamChunked(t *testing.T) { + original := SimulationResponse{ + Status: "success", + Events: []string{strings.Repeat("event-", 40000)}, + } + payload, err := json.Marshal(original) + if err != nil { + t.Fatalf("marshal failed: %v", err) + } + + chunkSize := 1024 + total := (len(payload) + chunkSize - 1) / chunkSize + var builder strings.Builder + writeFrame := func(frame ipc.SimulationStreamFrame) { + line, err := json.Marshal(frame) + if err != nil { + t.Fatalf("marshal frame failed: %v", err) + } + builder.Write(line) + builder.WriteByte('\n') + } + + writeFrame(ipc.SimulationStreamFrame{ + Kind: ipc.ChunkFrameStart, + StreamID: "stream-1", + Total: total, + ChunkBytes: chunkSize, + TimeoutMs: 100, + RetryLimit: 1, + }) + for index, offset := 0, 0; offset < len(payload); index, offset = index+1, offset+chunkSize { + end := offset + chunkSize + if end > len(payload) { + end = len(payload) + } + writeFrame(ipc.SimulationStreamFrame{ + Kind: ipc.ChunkFrameData, + StreamID: "stream-1", + Index: index, + DataBase64: base64.StdEncoding.EncodeToString(payload[offset:end]), + }) + } + writeFrame(ipc.SimulationStreamFrame{ + Kind: ipc.ChunkFrameEnd, + StreamID: "stream-1", + Total: total, + TotalBytes: int64(len(payload)), + }) + + resp, err := decodeSimulationResponseStream(context.Background(), strings.NewReader(builder.String())) + if err != nil { + t.Fatalf("decode failed: %v", err) + } + if resp.Status != original.Status { + t.Fatalf("expected status %s, got %s", original.Status, resp.Status) + } + if len(resp.Events) != 1 || resp.Events[0] != original.Events[0] { + t.Fatalf("chunked payload was not reassembled correctly") + } +} + +func TestDecodeSimulationResponseStreamMissingChunkIsRetryable(t *testing.T) { + start, _ := json.Marshal(ipc.SimulationStreamFrame{ + Kind: ipc.ChunkFrameStart, + StreamID: "stream-2", + Total: 2, + ChunkBytes: 16, + TimeoutMs: 100, + RetryLimit: 2, + }) + chunk, _ := json.Marshal(ipc.SimulationStreamFrame{ + Kind: ipc.ChunkFrameData, + StreamID: "stream-2", + Index: 1, + DataBase64: base64.StdEncoding.EncodeToString([]byte(`{"status":"success"}`)), + }) + + _, err := decodeSimulationResponseStream( + context.Background(), + strings.NewReader(fmt.Sprintf("%s\n%s\n", start, chunk)), + ) + if err == nil { + t.Fatal("expected error") + } + chunkErr, ok := err.(*chunkStreamError) + if !ok { + t.Fatalf("expected chunkStreamError, got %T", err) + } + if chunkErr.retryLimit != 2 { + t.Fatalf("expected retryLimit 2, got %d", chunkErr.retryLimit) + } +} diff --git a/simulator/src/context.rs b/simulator/src/context.rs new file mode 100644 index 00000000..e86e2327 --- /dev/null +++ b/simulator/src/context.rs @@ -0,0 +1,208 @@ +// Copyright 2026 Erst Users +// SPDX-License-Identifier: Apache-2.0 + +use std::collections::HashMap; + +use soroban_env_host::events::HostEvent; +use soroban_env_host::xdr::{LedgerEntry, LedgerKey}; + +use crate::runner::{SimHost, SimHostError}; +use crate::snapshot::LedgerSnapshot; + +#[derive(Debug, Clone)] +struct SnapshotState { + ledger: LedgerSnapshot, + event_count: usize, +} + +#[derive(Debug, thiserror::Error)] +pub enum SimulationContextError { + #[error("snapshot '{0}' not found")] + SnapshotNotFound(String), + #[error(transparent)] + Host(#[from] SimHostError), +} + +/// Owns the active simulator host and a rewindable history of snapshots. +pub struct SimulationContext { + host: SimHost, + snapshots: HashMap, + committed_events: Vec, + synced_host_event_count: usize, +} + +impl SimulationContext { + pub fn new(host: SimHost) -> Self { + Self { + host, + snapshots: HashMap::new(), + committed_events: Vec::new(), + synced_host_event_count: 0, + } + } + + pub fn host(&self) -> &SimHost { + &self.host + } + + pub fn host_mut(&mut self) -> &mut SimHost { + &mut self.host + } + + pub fn set_ledger_entry( + &mut self, + key: LedgerKey, + entry: LedgerEntry, + ) -> Result<(), SimulationContextError> { + self.sync_events()?; + self.host.set_ledger_entry(key, entry)?; + self.synced_host_event_count = 0; + Ok(()) + } + + pub fn capture_snapshot( + &mut self, + snapshot_id: impl Into, + ) -> Result<(), SimulationContextError> { + self.sync_events()?; + let snapshot = self.host.capture_snapshot()?; + self.snapshots.insert( + snapshot_id.into(), + SnapshotState { + ledger: snapshot, + event_count: self.committed_events.len(), + }, + ); + Ok(()) + } + + pub fn rollback_to(&mut self, snapshot_id: &str) -> Result<(), SimulationContextError> { + self.sync_events()?; + let snapshot = self + .snapshots + .get(snapshot_id) + .cloned() + .ok_or_else(|| SimulationContextError::SnapshotNotFound(snapshot_id.to_string()))?; + + self.host.restore_from_snapshot(&snapshot.ledger)?; + self.committed_events.truncate(snapshot.event_count); + self.synced_host_event_count = 0; + Ok(()) + } + + pub fn events(&self) -> Result, SimulationContextError> { + let mut events = self.committed_events.clone(); + let host_events = self.host.event_log()?; + events.extend(host_events.into_iter().skip(self.synced_host_event_count)); + Ok(events) + } + + fn sync_events(&mut self) -> Result<(), SimulationContextError> { + let host_events = self.host.event_log()?; + if self.synced_host_event_count < host_events.len() { + let host_event_count = host_events.len(); + self.committed_events + .extend(host_events.into_iter().skip(self.synced_host_event_count)); + self.synced_host_event_count = host_event_count; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use soroban_env_host::xdr::{ + ContractDataDurability, ContractDataEntry, ContractId, Hash, LedgerEntry, LedgerEntryData, + LedgerEntryExt, LedgerKey, LedgerKeyContractData, Limits, ScAddress, ScVal, WriteXdr, + }; + use soroban_env_host::EnvBase; + use std::rc::Rc; + + fn contract_data_key(id: u8, key: u32) -> Rc { + Rc::new(LedgerKey::ContractData(LedgerKeyContractData { + contract: ScAddress::Contract(ContractId(Hash([id; 32]))), + key: ScVal::U32(key), + durability: ContractDataDurability::Persistent, + })) + } + + fn contract_data_entry(id: u8, key: u32, value: u32, ledger_seq: u32) -> Rc { + Rc::new(LedgerEntry { + last_modified_ledger_seq: ledger_seq, + data: LedgerEntryData::ContractData(ContractDataEntry { + ext: soroban_env_host::xdr::ExtensionPoint::V0, + contract: ScAddress::Contract(ContractId(Hash([id; 32]))), + key: ScVal::U32(key), + durability: ContractDataDurability::Persistent, + val: ScVal::U32(value), + }), + ext: LedgerEntryExt::V0, + }) + } + + #[test] + fn rollback_to_restores_exact_snapshot_and_truncates_future_events() { + let host = SimHost::new(None, None, None); + let mut context = SimulationContext::new(host); + + let first_key = contract_data_key(7, 1); + let first_entry = contract_data_entry(7, 1, 11, 1); + context + .set_ledger_entry(first_key.as_ref().clone(), first_entry.as_ref().clone()) + .expect("initial state should load"); + context + .host() + .inner + .log_from_slice("snapshot boundary", &[]) + .expect("boundary event should be recorded"); + context + .capture_snapshot("snap-a") + .expect("snapshot should save"); + + let second_key = contract_data_key(8, 2); + let second_entry = contract_data_entry(8, 2, 22, 2); + context + .set_ledger_entry(second_key.as_ref().clone(), second_entry.as_ref().clone()) + .expect("later state should load"); + context + .host() + .inner + .log_from_slice("future event", &[]) + .expect("future event should be recorded"); + + let before_rollback_events = context.events().expect("events should load"); + assert_eq!(before_rollback_events.len(), 2); + + context + .rollback_to("snap-a") + .expect("rollback should succeed"); + + let restored_snapshot = context + .host() + .capture_snapshot() + .expect("restored snapshot should be readable"); + assert_eq!(restored_snapshot.len(), 1); + assert!(restored_snapshot + .get(&first_key.to_xdr(Limits::none()).unwrap()) + .is_some()); + assert!(restored_snapshot + .get(&second_key.to_xdr(Limits::none()).unwrap()) + .is_none()); + + let after_rollback_events = context.events().expect("events should load"); + assert_eq!(after_rollback_events.len(), 1); + + context + .set_ledger_entry(second_key.as_ref().clone(), second_entry.as_ref().clone()) + .expect("re-executing from rollback point should work"); + let replayed_snapshot = context + .host() + .capture_snapshot() + .expect("replayed snapshot should be readable"); + assert_eq!(replayed_snapshot.len(), 2); + assert!(replayed_snapshot + .get(&second_key.to_xdr(Limits::none()).unwrap()) + .is_some()); + } +} diff --git a/simulator/src/lib.rs b/simulator/src/lib.rs index 246edc8d..d4bd037a 100644 --- a/simulator/src/lib.rs +++ b/simulator/src/lib.rs @@ -3,9 +3,10 @@ #![allow(clippy::pedantic, clippy::nursery, dead_code)] +pub mod context; pub mod gas_optimizer; pub mod git_detector; -pub mod ipc; +pub mod runner; pub mod snapshot; pub mod source_map_cache; pub mod source_mapper; diff --git a/simulator/src/main.rs b/simulator/src/main.rs index f66dd22a..eb874c6c 100644 --- a/simulator/src/main.rs +++ b/simulator/src/main.rs @@ -9,6 +9,7 @@ mod gas_optimizer; mod git_detector; mod ipc; mod runner; +mod snapshot; mod source_map_cache; mod source_mapper; mod stack_trace; @@ -21,6 +22,7 @@ use crate::source_mapper::SourceMapper; use crate::stack_trace::WasmStackTrace; use crate::types::*; use base64::Engine as _; +use serde::Serialize; use soroban_env_host::xdr::{ReadXdr, WriteXdr}; use soroban_env_host::{ xdr::{Operation, OperationBody}, @@ -30,11 +32,41 @@ use std::collections::HashMap; use std::env; use std::fs; use std::io::{self, Read}; +use std::time::{SystemTime, UNIX_EPOCH}; use tracing_subscriber::{fmt, EnvFilter}; // Use types::SimulationRequest directly const ERR_MEMORY_LIMIT_EXCEEDED: &str = "ERR_MEMORY_LIMIT_EXCEEDED"; +const IPC_CHUNK_BYTES: usize = 1024 * 1024; +const IPC_CHUNK_TIMEOUT_MS: u64 = 5000; +const IPC_CHUNK_RETRY_LIMIT: u32 = 2; + +#[derive(Serialize)] +struct StreamStartFrame<'a> { + kind: &'a str, + stream_id: &'a str, + total: usize, + chunk_bytes: usize, + timeout_ms: u64, + retry_limit: u32, +} + +#[derive(Serialize)] +struct StreamDataFrame<'a> { + kind: &'a str, + stream_id: &'a str, + index: usize, + data_b64: String, +} + +#[derive(Serialize)] +struct StreamEndFrame<'a> { + kind: &'a str, + stream_id: &'a str, + total: usize, + total_bytes: usize, +} fn init_logger() { let use_json = env::var("ERST_LOG_FORMAT") @@ -74,15 +106,68 @@ fn send_error(msg: String) { wasm_offset: None, linear_memory_dump: None, }; - if let Ok(json) = serde_json::to_string(&res) { - println!("{}", json); - } else { + if emit_response(&res).is_err() { eprintln!("Failed to serialize error response"); println!("{{\"status\": \"error\", \"error\": \"Internal serialization error\"}}"); + } else { + std::process::exit(1); } std::process::exit(1); } +fn emit_response(response: &T) -> Result<(), serde_json::Error> { + let json = serde_json::to_vec(response)?; + if json.len() <= IPC_CHUNK_BYTES { + println!("{}", String::from_utf8_lossy(&json)); + return Ok(()); + } + + let stream_id = format!( + "sim-{}", + SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.as_nanos()) + .unwrap_or(0) + ); + let total_chunks = json.len().div_ceil(IPC_CHUNK_BYTES); + + println!( + "{}", + serde_json::to_string(&StreamStartFrame { + kind: "chunk_start", + stream_id: &stream_id, + total: total_chunks, + chunk_bytes: IPC_CHUNK_BYTES, + timeout_ms: IPC_CHUNK_TIMEOUT_MS, + retry_limit: IPC_CHUNK_RETRY_LIMIT, + })? + ); + + for (index, chunk) in json.chunks(IPC_CHUNK_BYTES).enumerate() { + println!( + "{}", + serde_json::to_string(&StreamDataFrame { + kind: "chunk_data", + stream_id: &stream_id, + index, + data_b64: base64::engine::general_purpose::STANDARD.encode(chunk), + })? + ); + } + + println!( + "{}", + serde_json::to_string(&StreamEndFrame { + kind: "chunk_end", + stream_id: &stream_id, + total: total_chunks, + total_bytes: json.len(), + })? + ); + + Ok(()) +} + #[derive(Default)] struct CoverageTracker { invoked_functions: HashMap, @@ -751,9 +836,7 @@ fn main() { linear_memory_dump: None, }; - if let Ok(json) = serde_json::to_string(&response) { - println!("{}", json); - } else { + if emit_response(&response).is_err() { eprintln!("Failed to serialize simulation response"); println!("{{\"status\": \"error\", \"error\": \"Internal serialization error\"}}"); } @@ -784,9 +867,7 @@ fn main() { linear_memory_dump: None, }; - if let Ok(json) = serde_json::to_string(&response) { - println!("{}", json); - } else { + if emit_response(&response).is_err() { eprintln!("Failed to serialize simulation response"); println!("{{\"status\": \"error\", \"error\": \"Internal serialization error\"}}"); } @@ -915,9 +996,7 @@ fn main() { wasm_offset, linear_memory_dump: None, }; - if let Ok(json) = serde_json::to_string(&response) { - println!("{}", json); - } else { + if emit_response(&response).is_err() { eprintln!("Failed to serialize host error response"); println!("{{\"status\": \"error\", \"error\": \"Internal serialization error\"}}"); } @@ -960,9 +1039,7 @@ fn main() { wasm_offset: None, linear_memory_dump: None, }; - if let Ok(json) = serde_json::to_string(&response) { - println!("{}", json); - } else { + if emit_response(&response).is_err() { eprintln!("Failed to serialize panic response"); println!("{{\"status\": \"error\", \"error\": \"Internal serialization error\"}}"); } diff --git a/simulator/src/runner.rs b/simulator/src/runner.rs index 4ce5c01a..02c387bf 100644 --- a/simulator/src/runner.rs +++ b/simulator/src/runner.rs @@ -1,18 +1,33 @@ // Copyright 2026 Erst Users // SPDX-License-Identifier: Apache-2.0 +use base64::Engine; use soroban_env_host::{ budget::Budget, - storage::Storage, - xdr::{Hash, ScErrorCode, ScErrorType}, + events::{Events, HostEvent}, + storage::{AccessType, Footprint, FootprintMap, Storage, StorageMap}, + xdr::{Hash, Limits, ScErrorCode, ScErrorType, WriteXdr}, DiagnosticLevel, Error as EnvError, Host, HostError, TryIntoVal, Val, }; +use std::rc::Rc; + +use crate::snapshot::{LedgerSnapshot, SnapshotError}; + +#[derive(Debug, thiserror::Error)] +pub enum SimHostError { + #[error(transparent)] + Host(#[from] HostError), + #[error(transparent)] + Snapshot(#[from] SnapshotError), +} /// Wrapper around the Soroban Host to manage initialization and execution context. pub struct SimHost { pub inner: Host, - /// Events buffered since the last call to `_drain_events_for_snapshot`. - _pending_events: Vec, + ledger_snapshot: LedgerSnapshot, + budget_limits: Option<(u64, u64)>, + calibration: Option, + memory_limit: Option, } impl SimHost { @@ -20,11 +35,11 @@ impl SimHost { pub fn new( budget_limits: Option<(u64, u64)>, calibration: Option, - _memory_limit: Option, + memory_limit: Option, ) -> Self { let budget = Budget::default(); - if let Some(_calib) = calibration { + if let Some(ref _calib) = calibration { // Note: In newer versions of soroban_env_host, the Budget interface // no longer uses set_model() or CostModel directly like this. // Resource calibration settings from the request are ignored @@ -44,10 +59,97 @@ impl SimHost { Self { inner: host, - _pending_events: Vec::new(), + ledger_snapshot: LedgerSnapshot::new(), + budget_limits, + calibration, + memory_limit, } } + /// Creates a new host initialized with the provided snapshot contents. + pub fn from_snapshot( + budget_limits: Option<(u64, u64)>, + calibration: Option, + memory_limit: Option, + snapshot: &LedgerSnapshot, + ) -> Result { + let budget = Budget::default(); + let storage = Self::storage_from_snapshot(snapshot, &budget)?; + let host = Host::with_storage_and_budget(storage, budget); + host.set_diagnostic_level(DiagnosticLevel::Debug)?; + + Ok(Self { + inner: host, + ledger_snapshot: snapshot.clone(), + budget_limits, + calibration, + memory_limit, + }) + } + + /// Replaces the current host with a freshly initialized host loaded from the snapshot. + pub fn restore_from_snapshot(&mut self, snapshot: &LedgerSnapshot) -> Result<(), SimHostError> { + let restored = Self::from_snapshot( + self.budget_limits, + self.calibration.clone(), + self.memory_limit, + snapshot, + )?; + *self = restored; + Ok(()) + } + + /// Captures the current host storage as a reusable ledger snapshot. + pub fn capture_snapshot(&self) -> Result { + Ok(self.ledger_snapshot.clone()) + } + + /// Returns the host events that have been emitted so far. + pub fn events(&self) -> Result { + Ok(self.inner.get_events()?) + } + + /// Returns the host events as a cloned vector for external history tracking. + pub fn event_log(&self) -> Result, SimHostError> { + Ok(self.events()?.0) + } + + /// Stores or replaces a ledger entry by rebuilding the host from the updated snapshot. + pub fn set_ledger_entry( + &mut self, + key: soroban_env_host::xdr::LedgerKey, + entry: soroban_env_host::xdr::LedgerEntry, + ) -> Result<(), SimHostError> { + let key_bytes = key + .to_xdr(Limits::none()) + .map_err(|e| SnapshotError::XdrEncoding(format!("Failed to encode key: {e}")))?; + self.ledger_snapshot.insert(key_bytes, entry); + let snapshot = self.ledger_snapshot.clone(); + self.restore_from_snapshot(&snapshot) + } + + fn storage_from_snapshot( + snapshot: &LedgerSnapshot, + budget: &Budget, + ) -> Result { + let mut footprint_map = FootprintMap::new(); + let mut storage_map = StorageMap::new(); + + for (key_bytes, entry) in snapshot.iter() { + let key = Rc::new(crate::snapshot::decode_ledger_key( + &base64::engine::general_purpose::STANDARD.encode(key_bytes), + )?); + let entry_rc: Rc = Rc::new(entry.clone()); + footprint_map = footprint_map.insert(Rc::clone(&key), AccessType::ReadWrite, budget)?; + storage_map = storage_map.insert(key, Some((entry_rc, None)), budget)?; + } + + Ok(Storage::with_enforcing_footprint_and_map( + Footprint(footprint_map), + storage_map, + )) + } + /// Set the contract ID for execution context. pub fn _set_contract_id(&mut self, _id: Hash) {} @@ -90,6 +192,11 @@ impl SimHost { #[cfg(test)] mod tests { use super::*; + use soroban_env_host::xdr::{ + ContractDataDurability, ContractDataEntry, ContractId, Hash, LedgerEntry, LedgerEntryData, + LedgerEntryExt, LedgerKey, LedgerKeyContractData, ScAddress, ScVal, + }; + use soroban_env_host::EnvBase; #[test] fn test_host_initialization() { @@ -123,29 +230,70 @@ mod tests { } #[test] - fn test_drain_events_for_snapshot_returns_buffered_events() { + fn test_restore_from_snapshot_replaces_mutated_storage_and_clears_host_events() { let mut host = SimHost::new(None, None, None); - host._push_event("event_a".to_string()); - host._push_event("event_b".to_string()); + let first_key = Rc::new(LedgerKey::ContractData(LedgerKeyContractData { + contract: ScAddress::Contract(ContractId(Hash([1u8; 32]))), + key: ScVal::U32(1), + durability: ContractDataDurability::Persistent, + })); + let first_entry = Rc::new(LedgerEntry { + last_modified_ledger_seq: 1, + data: LedgerEntryData::ContractData(ContractDataEntry { + ext: soroban_env_host::xdr::ExtensionPoint::V0, + contract: ScAddress::Contract(ContractId(Hash([1u8; 32]))), + key: ScVal::U32(1), + durability: ContractDataDurability::Persistent, + val: ScVal::U32(10), + }), + ext: LedgerEntryExt::V0, + }); + host.set_ledger_entry(first_key.as_ref().clone(), first_entry.as_ref().clone()) + .expect("initial entry should be stored"); + host.inner + .log_from_slice("before snapshot", &[]) + .expect("diagnostic event should be recorded"); - let drained = host._drain_events_for_snapshot(); - assert_eq!(drained, vec!["event_a", "event_b"]); - } + let snapshot = host.capture_snapshot().expect("snapshot should capture"); - #[test] - fn test_drain_events_for_snapshot_clears_buffer() { - let mut host = SimHost::new(None, None, None); - host._push_event("event_a".to_string()); - let _ = host._drain_events_for_snapshot(); + let second_key = Rc::new(LedgerKey::ContractData(LedgerKeyContractData { + contract: ScAddress::Contract(ContractId(Hash([2u8; 32]))), + key: ScVal::U32(2), + durability: ContractDataDurability::Persistent, + })); + let second_entry = Rc::new(LedgerEntry { + last_modified_ledger_seq: 2, + data: LedgerEntryData::ContractData(ContractDataEntry { + ext: soroban_env_host::xdr::ExtensionPoint::V0, + contract: ScAddress::Contract(ContractId(Hash([2u8; 32]))), + key: ScVal::U32(2), + durability: ContractDataDurability::Persistent, + val: ScVal::U32(20), + }), + ext: LedgerEntryExt::V0, + }); + host.set_ledger_entry(second_key.as_ref().clone(), second_entry.as_ref().clone()) + .expect("mutated entry should be stored"); + host.inner + .log_from_slice("after snapshot", &[]) + .expect("later event should be recorded"); - let second_drain = host._drain_events_for_snapshot(); - assert!(second_drain.is_empty()); - } + host.restore_from_snapshot(&snapshot) + .expect("restoring snapshot should succeed"); - #[test] - fn test_drain_events_for_snapshot_empty_buffer() { - let mut host = SimHost::new(None, None, None); - let drained = host._drain_events_for_snapshot(); - assert!(drained.is_empty()); + let restored = host + .capture_snapshot() + .expect("restored snapshot should capture"); + assert_eq!(restored.len(), 1); + assert!(restored + .get(&first_key.to_xdr(Limits::none()).unwrap()) + .is_some()); + assert!(restored + .get(&second_key.to_xdr(Limits::none()).unwrap()) + .is_none()); + assert!( + host.events().expect("events should read").0.is_empty(), + "fresh host should not retain post-rollback host events" + ); } }