Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions internal/ipc/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@ import (
"github.com/dotandev/hintents/internal/errors"
)

const (
ChunkFrameStart = "chunk_start"
ChunkFrameData = "chunk_data"
ChunkFrameEnd = "chunk_end"

DefaultChunkBytes = 1024 * 1024
DefaultChunkTimeoutMs = 5000
DefaultChunkRetryLimit = 2
DefaultChunkMaxLineSize = 2 * 1024 * 1024
)

// ToErstError converts an IPC Error from the Rust simulator into the unified ErstError type.
// The original Code and Message strings are preserved in OrigErr.
// Note: the Rust simulator currently emits plain message strings without structured codes,
Expand Down Expand Up @@ -106,6 +117,22 @@ type SimulationResponseSchema struct {
Version string `json:"version"`
}

type SimulationStreamFrame struct {
Kind string `json:"kind"`
StreamID string `json:"stream_id,omitempty"`
Index int `json:"index,omitempty"`
Total int `json:"total,omitempty"`
ChunkBytes int `json:"chunk_bytes,omitempty"`
TimeoutMs int `json:"timeout_ms,omitempty"`
RetryLimit int `json:"retry_limit,omitempty"`
DataBase64 string `json:"data_b64,omitempty"`
TotalBytes int64 `json:"total_bytes,omitempty"`
}

func (f SimulationStreamFrame) IsChunkFrame() bool {
return f.Kind == ChunkFrameStart || f.Kind == ChunkFrameData || f.Kind == ChunkFrameEnd
}

type Error struct {
Code string `json:"code"`
Message string `json:"message"`
Expand Down
87 changes: 62 additions & 25 deletions internal/simulator/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,49 @@ func (r *Runner) Run(ctx context.Context, req *SimulationRequest) (*SimulationRe
req.Timestamp = r.MockTime
}

const maxChunkReadRetries = ipc.DefaultChunkRetryLimit

var (
resp *SimulationResponse
err error
)
for attempt := 0; attempt <= maxChunkReadRetries; attempt++ {
resp, err = r.runAttempt(ctx, req)
if err == nil {
break
}

chunkErr, ok := err.(*chunkStreamError)
if !ok || attempt >= chunkErr.retryLimit {
return nil, err
}

logger.Logger.Warn(
"Retrying simulator after chunk delivery failure",
"attempt", attempt+1,
"retry_limit", chunkErr.retryLimit,
"error", chunkErr.Error(),
)
}

// If the simulator returned a logical error inside the response payload,
// classify it into a unified ErstError before returning to the caller.
if resp.Error != "" {
classified := (&ipc.Error{Code: resp.ErrorCode, Message: resp.Error}).ToErstError()
logger.Logger.Error("Simulator returned error",
"code", classified.Code,
"original", classified.OrigErr,
)
return nil, classified
}

resp.ProtocolVersion = &proto.Version
success = true

return resp, nil
}

func (r *Runner) runAttempt(ctx context.Context, req *SimulationRequest) (*SimulationResponse, error) {
inputBytes, err := json.Marshal(req)
if err != nil {
logger.Logger.Error("Failed to marshal simulation request", "error", err)
Expand All @@ -261,11 +304,12 @@ func (r *Runner) Run(ctx context.Context, req *SimulationRequest) (*SimulationRe
cmd.Stdin = bytes.NewReader(inputBytes)
cmd.Env = simulatorEnv()

// Use limited-size buffers to prevent memory growth in daemon mode
// Set reasonable limits (10MB stdout, 1MB stderr) for typical simulation responses
stdout := limitedBuffer{Buffer: bytes.Buffer{}, limit: 10 * 1024 * 1024}
stdoutPipe, err := cmd.StdoutPipe()
if err != nil {
return nil, errors.WrapSimCrash(err, "failed to open simulator stdout")
}

stderr := limitedBuffer{Buffer: bytes.Buffer{}, limit: 1 * 1024 * 1024}
cmd.Stdout = &stdout
cmd.Stderr = &stderr

if err := cmd.Start(); err != nil {
Expand All @@ -284,6 +328,19 @@ func (r *Runner) Run(ctx context.Context, req *SimulationRequest) (*SimulationRe
waitCh <- cmd.Wait()
}()

resp, readErr := decodeSimulationResponseStream(ctx, stdoutPipe)
if readErr != nil {
_ = r.terminateProcessGroup(cmd, 1500*time.Millisecond)
<-waitCh
if ctx.Err() != nil {
return nil, ctx.Err()
}
if chunkErr, ok := readErr.(*chunkStreamError); ok {
return nil, chunkErr
}
return nil, errors.WrapUnmarshalFailed(readErr, stderr.String())
}

select {
case err := <-waitCh:
if err != nil {
Expand All @@ -299,27 +356,7 @@ func (r *Runner) Run(ctx context.Context, req *SimulationRequest) (*SimulationRe
return nil, ctx.Err()
}

var resp SimulationResponse
if err := json.Unmarshal(stdout.Bytes(), &resp); err != nil {
logger.Logger.Error("Failed to unmarshal response", "error", err)
return nil, errors.WrapUnmarshalFailed(err, stdout.String())
}

// If the simulator returned a logical error inside the response payload,
// classify it into a unified ErstError before returning to the caller.
if resp.Error != "" {
classified := (&ipc.Error{Code: resp.ErrorCode, Message: resp.Error}).ToErstError()
logger.Logger.Error("Simulator returned error",
"code", classified.Code,
"original", classified.OrigErr,
)
return nil, classified
}

resp.ProtocolVersion = &proto.Version
success = true

return &resp, nil
return resp, nil
}

// limitedBuffer wraps bytes.Buffer with a size limit to prevent memory leaks
Expand Down
208 changes: 208 additions & 0 deletions internal/simulator/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// Copyright 2026 Erst Users
// SPDX-License-Identifier: Apache-2.0

package simulator

import (
"bufio"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/dotandev/hintents/internal/ipc"
)

type chunkStreamError struct {
retryLimit int
err error
}

func (e *chunkStreamError) Error() string {
return e.err.Error()
}

func (e *chunkStreamError) Unwrap() error {
return e.err
}

type lineReadResult struct {
line []byte
err error
}

func decodeSimulationResponseStream(
ctx context.Context,
reader io.Reader,
) (*SimulationResponse, error) {
lineCh := make(chan lineReadResult, 4)
go readChunkLines(reader, lineCh)

first, err := waitForChunkLine(ctx, lineCh, time.Duration(ipc.DefaultChunkTimeoutMs)*time.Millisecond)
if err != nil {
return nil, err
}

var frame ipc.SimulationStreamFrame
if err := json.Unmarshal(first, &frame); err == nil && frame.IsChunkFrame() {
return decodeChunkedSimulationResponse(ctx, lineCh, frame)
}

var resp SimulationResponse
if err := json.Unmarshal(first, &resp); err != nil {
return nil, err
}
return &resp, nil
}

func decodeChunkedSimulationResponse(
ctx context.Context,
lineCh <-chan lineReadResult,
start ipc.SimulationStreamFrame,
) (*SimulationResponse, error) {
if start.Kind != ipc.ChunkFrameStart {
return nil, fmt.Errorf("unexpected first chunk frame %q", start.Kind)
}

timeoutMs := start.TimeoutMs
if timeoutMs <= 0 {
timeoutMs = ipc.DefaultChunkTimeoutMs
}
retryLimit := start.RetryLimit
if retryLimit < 0 {
retryLimit = 0
}

tempFile, err := os.CreateTemp("", "erst-sim-response-*.json")
if err != nil {
return nil, err
}
defer func() {
tempFile.Close()
_ = os.Remove(tempFile.Name())
}()

expectedIndex := 0
for {
line, err := waitForChunkLine(ctx, lineCh, time.Duration(timeoutMs)*time.Millisecond)
if err != nil {
return nil, &chunkStreamError{
retryLimit: retryLimit,
err: fmt.Errorf("chunk delivery failed for stream %s: %w", start.StreamID, err),
}
}

var frame ipc.SimulationStreamFrame
if err := json.Unmarshal(line, &frame); err != nil {
return nil, &chunkStreamError{
retryLimit: retryLimit,
err: fmt.Errorf("invalid chunk frame for stream %s: %w", start.StreamID, err),
}
}

if frame.StreamID != start.StreamID {
return nil, &chunkStreamError{
retryLimit: retryLimit,
err: fmt.Errorf("unexpected stream id %q while reading %q", frame.StreamID, start.StreamID),
}
}

switch frame.Kind {
case ipc.ChunkFrameData:
if frame.Index != expectedIndex {
return nil, &chunkStreamError{
retryLimit: retryLimit,
err: fmt.Errorf("missing chunk: expected %d, received %d", expectedIndex, frame.Index),
}
}
if _, err := io.Copy(tempFile, base64.NewDecoder(base64.StdEncoding, strings.NewReader(frame.DataBase64))); err != nil {
return nil, &chunkStreamError{
retryLimit: retryLimit,
err: fmt.Errorf("failed to decode chunk %d: %w", frame.Index, err),
}
}
expectedIndex++
case ipc.ChunkFrameEnd:
if frame.Total != start.Total {
return nil, &chunkStreamError{
retryLimit: retryLimit,
err: fmt.Errorf("chunk total mismatch: start=%d end=%d", start.Total, frame.Total),
}
}
if expectedIndex != start.Total {
return nil, &chunkStreamError{
retryLimit: retryLimit,
err: fmt.Errorf("incomplete chunk stream: received %d of %d", expectedIndex, start.Total),
}
}
if _, err := tempFile.Seek(0, io.SeekStart); err != nil {
return nil, err
}
var resp SimulationResponse
if err := json.NewDecoder(tempFile).Decode(&resp); err != nil {
return nil, err
}
return &resp, nil
default:
return nil, &chunkStreamError{
retryLimit: retryLimit,
err: fmt.Errorf("unexpected chunk frame kind %q", frame.Kind),
}
}
}
}

func readChunkLines(reader io.Reader, out chan<- lineReadResult) {
defer close(out)

bufReader := bufio.NewReaderSize(reader, ipc.DefaultChunkMaxLineSize)
for {
line, err := bufReader.ReadBytes('\n')
if len(line) > 0 {
out <- lineReadResult{line: trimLineEndings(line)}
}
if err != nil {
if err != io.EOF {
out <- lineReadResult{err: err}
}
return
}
}
}

func waitForChunkLine(
ctx context.Context,
lineCh <-chan lineReadResult,
timeout time.Duration,
) ([]byte, error) {
timer := time.NewTimer(timeout)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timer.C:
return nil, fmt.Errorf("timed out waiting for next chunk after %v", timeout)
case res, ok := <-lineCh:
if !ok {
return nil, io.ErrUnexpectedEOF
}
if res.err != nil {
return nil, res.err
}
if len(res.line) == 0 {
continue
}
return res.line, nil
}
}
}

func trimLineEndings(line []byte) []byte {
return []byte(strings.TrimRight(string(line), "\r\n"))
}
Loading
Loading