Commit 5b66e53

ToolHive Re-Attachment fix (#2118)

1 parent d367463 commit 5b66e53

2 files changed: +958 -12 lines


pkg/transport/stdio.go

Lines changed: 187 additions & 10 deletions
@@ -1,29 +1,53 @@
+// Package transport provides utilities for handling different transport modes
+// for communication between the client and MCP server, including stdio transport
+// with automatic re-attachment on Docker/container restarts.
 package transport
 
 import (
 	"bytes"
 	"context"
+	"errors"
 	"fmt"
 	"io"
+	"net"
 	"net/http"
 	"strings"
 	"sync"
 	"time"
 	"unicode"
 
+	"github.com/cenkalti/backoff/v5"
 	"golang.org/x/exp/jsonrpc2"
 
 	"github.com/stacklok/toolhive/pkg/container"
 	rt "github.com/stacklok/toolhive/pkg/container/runtime"
 	"github.com/stacklok/toolhive/pkg/ignore"
 	"github.com/stacklok/toolhive/pkg/logger"
 	"github.com/stacklok/toolhive/pkg/permissions"
-	"github.com/stacklok/toolhive/pkg/transport/errors"
+	transporterrors "github.com/stacklok/toolhive/pkg/transport/errors"
 	"github.com/stacklok/toolhive/pkg/transport/proxy/httpsse"
 	"github.com/stacklok/toolhive/pkg/transport/proxy/streamable"
 	"github.com/stacklok/toolhive/pkg/transport/types"
 )
 
+const (
+	// Retry configuration constants
+	// defaultMaxRetries is the maximum number of re-attachment attempts after a connection loss.
+	// Set to 10 to allow sufficient time for Docker/Rancher Desktop to restart (~5 minutes with backoff).
+	defaultMaxRetries = 10
+
+	// defaultInitialRetryDelay is the starting delay for exponential backoff.
+	// Starts at 2 seconds to quickly recover from transient issues without overwhelming the system.
+	defaultInitialRetryDelay = 2 * time.Second
+
+	// defaultMaxRetryDelay caps the maximum delay between retry attempts.
+	// Set to 30 seconds to balance between responsiveness and resource usage during extended outages.
+	defaultMaxRetryDelay = 30 * time.Second
+
+	// shutdownTimeout is the maximum time to wait for graceful shutdown operations.
+	shutdownTimeout = 30 * time.Second
+)
+
 // StdioTransport implements the Transport interface using standard input/output.
 // It acts as a proxy between the MCP client and the container's stdin/stdout.
 type StdioTransport struct {
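For a feel of the retry window these constants produce, here is a minimal standalone sketch (not part of this commit) that prints the nominal backoff schedule, assuming the cenkalti/backoff default multiplier of 1.5 and ignoring its randomization:

package main

import (
	"fmt"
	"time"
)

func main() {
	const (
		maxRetries   = 10
		initialDelay = 2 * time.Second
		maxDelay     = 30 * time.Second
		multiplier   = 1.5 // cenkalti/backoff default; jitter ignored for clarity
	)

	delay := initialDelay
	var total time.Duration
	for i := 1; i < maxRetries; i++ { // 10 tries means up to 9 waits between them
		fmt.Printf("wait %d: %v\n", i, delay)
		total += delay
		delay = time.Duration(float64(delay) * multiplier)
		if delay > maxDelay {
			delay = maxDelay
		}
	}
	fmt.Printf("nominal total wait: %v\n", total)
}

Nominally the waits sum to roughly two minutes; randomization plus the time each attempt spends in Docker API calls stretches the overall window toward the ~5 minutes cited in the comment.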
@@ -53,6 +77,25 @@ type StdioTransport struct {
 
 	// Container monitor
 	monitor rt.Monitor
+
+	// Retry configuration (for testing)
+	retryConfig *retryConfig
+}
+
+// retryConfig holds configuration for retry behavior
+type retryConfig struct {
+	maxRetries   int
+	initialDelay time.Duration
+	maxDelay     time.Duration
+}
+
+// defaultRetryConfig returns the default retry configuration
+func defaultRetryConfig() *retryConfig {
+	return &retryConfig{
+		maxRetries:   defaultMaxRetries,
+		initialDelay: defaultInitialRetryDelay,
+		maxDelay:     defaultMaxRetryDelay,
+	}
 }
 
 // NewStdioTransport creates a new stdio transport.
@@ -75,6 +118,7 @@ func NewStdioTransport(
 		prometheusHandler: prometheusHandler,
 		shutdownCh:        make(chan struct{}),
 		proxyMode:         types.ProxyModeSSE, // default to SSE for backward compatibility
+		retryConfig:       defaultRetryConfig(),
 	}
 }
 
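Injecting retryConfig as a struct field, rather than reading the constants directly at the call sites, is what makes the "(for testing)" note above work: a test in the same package can substitute a faster configuration. A hypothetical helper, assuming in-package access to the unexported field (values are illustrative, not part of this commit):

package transport

import "time"

// withFastRetries shrinks the retry window so the re-attachment path can be
// exercised quickly in unit tests.
func withFastRetries(tr *StdioTransport) {
	tr.retryConfig = &retryConfig{
		maxRetries:   2,
		initialDelay: 10 * time.Millisecond,
		maxDelay:     50 * time.Millisecond,
	}
}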
@@ -150,7 +194,7 @@ func (t *StdioTransport) Start(ctx context.Context) error {
 	defer t.mutex.Unlock()
 
 	if t.containerName == "" {
-		return errors.ErrContainerNameNotSet
+		return transporterrors.ErrContainerNameNotSet
 	}
 
 	if t.deployer == nil {
@@ -291,8 +335,34 @@ func (t *StdioTransport) IsRunning(_ context.Context) (bool, error) {
 	}
 }
 
+// isDockerSocketError checks if an error indicates Docker socket unavailability using typed error detection
+func isDockerSocketError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// Check for EOF errors
+	if errors.Is(err, io.EOF) {
+		return true
+	}
+
+	// Check for network-related errors
+	var netErr *net.OpError
+	if errors.As(err, &netErr) {
+		// Connection refused typically indicates the Docker daemon is not running
+		return true
+	}
+
+	// Fall back to string matching for errors that don't implement standard interfaces.
+	// This handles Docker SDK errors that may not wrap standard error types.
+	errStr := err.Error()
+	return strings.Contains(errStr, "EOF") ||
+		strings.Contains(errStr, "connection refused") ||
+		strings.Contains(errStr, "Cannot connect to the Docker daemon")
+}
+
 // processMessages handles the message exchange between the client and container.
-func (t *StdioTransport) processMessages(ctx context.Context, stdin io.WriteCloser, stdout io.ReadCloser) {
+func (t *StdioTransport) processMessages(ctx context.Context, _ io.WriteCloser, stdout io.ReadCloser) {
 	// Create a context that will be canceled when shutdown is signaled
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
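A hypothetical table test (same package, not part of this commit) illustrating which failures the classification above treats as a Docker socket problem:

package transport

import (
	"errors"
	"fmt"
	"io"
	"net"
	"syscall"
	"testing"
)

func TestIsDockerSocketErrorSketch(t *testing.T) {
	cases := []struct {
		err  error
		want bool
	}{
		{io.EOF, true},                         // daemon hung up mid-stream
		{fmt.Errorf("read: %w", io.EOF), true}, // wrapped EOF still matches via errors.Is
		{&net.OpError{Op: "dial", Net: "unix", Err: syscall.ECONNREFUSED}, true}, // typed net error
		{errors.New("Cannot connect to the Docker daemon"), true},               // string fallback
		{errors.New("workload not found"), false},                               // unrelated error
		{nil, false},
	}
	for _, c := range cases {
		if got := isDockerSocketError(c.err); got != c.want {
			t.Errorf("isDockerSocketError(%v) = %v, want %v", c.err, got, c.want)
		}
	}
}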
@@ -317,15 +387,113 @@ func (t *StdioTransport) processMessages(ctx context.Context, stdin io.WriteCloser, stdout io.ReadCloser) {
 		case <-ctx.Done():
 			return
 		case msg := <-messageCh:
-			logger.Info("Process incoming messages and sending message to container")
-			if err := t.sendMessageToContainer(ctx, stdin, msg); err != nil {
+			logger.Debug("Processing incoming message and sending to container")
+			// Use t.stdin instead of the parameter so the current stdin is used after re-attachment
+			t.mutex.Lock()
+			currentStdin := t.stdin
+			t.mutex.Unlock()
+			if err := t.sendMessageToContainer(ctx, currentStdin, msg); err != nil {
 				logger.Errorf("Error sending message to container: %v", err)
 			}
-			logger.Info("Messages processed")
+			logger.Debug("Message processed")
 		}
 	}
 }
 
+// attemptReattachment tries to re-attach to a container that has lost its stdout connection.
+// It returns true if re-attachment was successful, false otherwise.
+func (t *StdioTransport) attemptReattachment(ctx context.Context, stdout io.ReadCloser) bool {
+	if t.deployer == nil || t.containerName == "" {
+		return false
+	}
+
+	// Create an exponential backoff with the configured parameters
+	expBackoff := backoff.NewExponentialBackOff()
+	expBackoff.InitialInterval = t.retryConfig.initialDelay
+	expBackoff.MaxInterval = t.retryConfig.maxDelay
+	// Reset to allow unlimited elapsed time - we control retries via MaxTries
+	expBackoff.Reset()
+
+	var attemptCount int
+	maxRetries := t.retryConfig.maxRetries
+
+	operation := func() (any, error) {
+		attemptCount++
+
+		// Check if the context has been cancelled
+		select {
+		case <-ctx.Done():
+			return nil, backoff.Permanent(ctx.Err())
+		default:
+		}
+
+		running, checkErr := t.deployer.IsWorkloadRunning(ctx, t.containerName)
+		if checkErr != nil {
+			// Check if the error is due to Docker being unavailable
+			if isDockerSocketError(checkErr) {
+				logger.Warnf("Docker socket unavailable (attempt %d/%d), will retry: %v", attemptCount, maxRetries, checkErr)
+				return nil, checkErr // Retry
+			}
+			logger.Warnf("Error checking if container is running (attempt %d/%d): %v", attemptCount, maxRetries, checkErr)
+			return nil, checkErr // Retry
+		}
+
+		if !running {
+			logger.Infof("Container not running (attempt %d/%d)", attemptCount, maxRetries)
+			return nil, backoff.Permanent(fmt.Errorf("container not running"))
+		}
+
+		logger.Warn("Container is still running after stdout EOF - attempting to re-attach")
+
+		// Try to re-attach to the container
+		newStdin, newStdout, attachErr := t.deployer.AttachToWorkload(ctx, t.containerName)
+		if attachErr != nil {
+			logger.Errorf("Failed to re-attach to container (attempt %d/%d): %v", attemptCount, maxRetries, attachErr)
+			return nil, attachErr // Retry
+		}
+
+		logger.Info("Successfully re-attached to container - restarting message processing")
+
+		// Close the old stdout and log any errors
+		if closeErr := stdout.Close(); closeErr != nil {
+			logger.Warnf("Error closing old stdout during re-attachment: %v", closeErr)
+		}
+
+		// Update stdio references with proper synchronization
+		t.mutex.Lock()
+		t.stdin = newStdin
+		t.stdout = newStdout
+		t.mutex.Unlock()
+
+		// Start ONLY the stdout reader, not the full processMessages;
+		// the existing processMessages goroutine is still running and handling stdin
+		go t.processStdout(ctx, newStdout)
+		logger.Info("Restarted stdout processing with new pipe")
+		return nil, nil // Success
+	}
+
+	// Execute the operation with retry.
+	// Safe conversion: maxRetries is constrained by the defaultMaxRetries constant (10).
+	_, err := backoff.Retry(ctx, operation,
+		backoff.WithBackOff(expBackoff),
+		backoff.WithMaxTries(uint(maxRetries)), // #nosec G115
+		backoff.WithNotify(func(_ error, duration time.Duration) {
+			logger.Infof("Retry attempt %d/%d after %v", attemptCount+1, maxRetries, duration)
+		}),
+	)
+
+	if err != nil {
+		if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+			logger.Warnf("Re-attachment cancelled or timed out: %v", err)
+		} else {
+			logger.Warn("Failed to re-attach after all retry attempts")
+		}
+		return false
+	}
+
+	return true
+}
+
 // processStdout reads from the container's stdout and processes JSON-RPC messages.
 func (t *StdioTransport) processStdout(ctx context.Context, stdout io.ReadCloser) {
 	// Create a buffer for accumulating data
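The backoff/v5 API used here differs from v4: Retry takes the context first, the operation returns a value alongside the error, and the policy is supplied as options, with backoff.Permanent cutting retries short. A self-contained sketch of the same pattern with simulated failures (not part of this commit):

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/cenkalti/backoff/v5"
)

func main() {
	ctx := context.Background()
	attempt := 0

	operation := func() (string, error) {
		attempt++
		if attempt < 3 {
			// Transient: returned as-is, so Retry backs off and tries again.
			return "", fmt.Errorf("docker socket unavailable (attempt %d)", attempt)
		}
		// A non-recoverable state would instead short-circuit the loop:
		//   return "", backoff.Permanent(errors.New("container not running"))
		return "re-attached", nil
	}

	result, err := backoff.Retry(ctx, operation,
		backoff.WithBackOff(backoff.NewExponentialBackOff()),
		backoff.WithMaxTries(5),
		backoff.WithNotify(func(err error, d time.Duration) {
			fmt.Printf("retrying in %v after: %v\n", d, err)
		}),
	)
	if err != nil {
		fmt.Println("gave up:", err)
		return
	}
	fmt.Println(result) // "re-attached" on the third try
}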
@@ -343,7 +511,14 @@ func (t *StdioTransport) processStdout(ctx context.Context, stdout io.ReadCloser) {
 		n, err := stdout.Read(readBuffer)
 		if err != nil {
 			if err == io.EOF {
-				logger.Info("Container stdout closed")
+				logger.Warn("Container stdout closed - checking if container is still running")
+
+				// Try to re-attach to the container
+				if t.attemptReattachment(ctx, stdout) {
+					return
+				}
+
+				logger.Info("Container stdout closed - exiting read loop")
 			} else {
 				logger.Errorf("Error reading from container stdout: %v", err)
 			}
@@ -418,11 +593,13 @@ func sanitizeBinaryString(input string) string {
 }
 
 // isSpace reports whether r is a space character as defined by JSON.
-// These are the valid space characters in this implementation:
+// These are the valid space characters in JSON:
 // - ' ' (U+0020, SPACE)
+// - '\t' (U+0009, HORIZONTAL TAB)
 // - '\n' (U+000A, LINE FEED)
+// - '\r' (U+000D, CARRIAGE RETURN)
 func isSpace(r rune) bool {
-	return r == ' ' || r == '\n'
+	return r == ' ' || r == '\t' || r == '\n' || r == '\r'
 }
 
 // parseAndForwardJSONRPC parses a JSON-RPC message and forwards it.
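RFC 8259 limits insignificant JSON whitespace to exactly these four characters, which is what the widened set encodes; other whitespace-like characters (vertical tab, form feed, NBSP) are still rejected. A hypothetical table test (same package, not part of this commit) pinning that down:

package transport

import "testing"

func TestIsSpaceJSONWhitespace(t *testing.T) {
	for _, r := range " \t\n\r" { // the four JSON whitespace characters
		if !isSpace(r) {
			t.Errorf("isSpace(%q) = false, want true", r)
		}
	}
	for _, r := range "\v\f\u00a0x0" { // not JSON whitespace
		if isSpace(r) {
			t.Errorf("isSpace(%q) = true, want false", r)
		}
	}
}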
@@ -499,7 +676,7 @@ func (t *StdioTransport) handleContainerExit(ctx context.Context) {
 	default:
 		// Transport is still running, stop it
 		// Create a context with timeout for stopping the transport
-		stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+		stopCtx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
 		defer cancel()
 
 		if stopErr := t.Stop(stopCtx); stopErr != nil {
