Skip to content

Commit 0728c23

Browse files
committed
rabbit feedback around discovery issues, but refactored at the same time
1 parent 68234fb commit 0728c23

File tree

5 files changed

+121
-78
lines changed

5 files changed

+121
-78
lines changed

internal/adapter/proxy/core/retry.go

Lines changed: 105 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -53,96 +53,136 @@ func (h *RetryHandler) ExecuteWithRetry(
5353
availableEndpoints := make([]*domain.Endpoint, len(endpoints))
5454
copy(availableEndpoints, endpoints)
5555

56+
// Preserve request body for potential retries
57+
bodyBytes, err := h.preserveRequestBody(r)
58+
if err != nil {
59+
return err
60+
}
61+
5662
var lastErr error
5763
maxRetries := len(endpoints)
5864
attemptCount := 0
5965

60-
// Preserve request body for potential retries
61-
var bodyBytes []byte
62-
if r.Body != nil && r.Body != http.NoBody {
63-
var err error
64-
bodyBytes, err = io.ReadAll(r.Body)
65-
if err != nil {
66-
h.logger.Error("Failed to read request body for retry preservation",
67-
"error", err)
68-
return fmt.Errorf("failed to read request body: %w", err)
69-
}
70-
71-
// Close the original body and handle any error
72-
if err := r.Body.Close(); err != nil {
73-
h.logger.Warn("Failed to close original request body",
74-
"error", err)
75-
// Continue as the body has been read successfully
76-
}
77-
78-
// Recreate the body for the first attempt
79-
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
80-
}
81-
8266
for attemptCount < maxRetries && len(availableEndpoints) > 0 {
83-
// Check for context cancellation before each attempt
84-
select {
85-
case <-ctx.Done():
86-
return fmt.Errorf("request cancelled: %w", ctx.Err())
87-
default:
67+
if err := h.checkContextCancellation(ctx); err != nil {
68+
return err
8869
}
8970

90-
// Reset body for retries (skip first iteration as body already set above)
91-
if bodyBytes != nil && attemptCount > 0 {
92-
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
93-
}
71+
h.resetRequestBodyForRetry(r, bodyBytes, attemptCount)
9472

9573
endpoint, err := selector.Select(ctx, availableEndpoints)
9674
if err != nil {
9775
return fmt.Errorf("endpoint selection failed: %w", err)
9876
}
9977

10078
attemptCount++
101-
err = proxyFunc(ctx, w, r, endpoint, stats)
79+
lastErr = h.executeProxyAttempt(ctx, w, r, endpoint, selector, stats, proxyFunc)
10280

103-
if err == nil {
81+
if lastErr == nil {
10482
return nil
10583
}
10684

107-
lastErr = err
108-
109-
if IsConnectionError(err) {
110-
h.logger.Warn("Connection failed to endpoint, marking as unhealthy",
111-
"endpoint", endpoint.Name,
112-
"error", err,
113-
"attempt", attemptCount,
114-
"remaining_endpoints", len(availableEndpoints)-1)
115-
116-
h.markEndpointUnhealthy(ctx, endpoint)
117-
118-
// Remove failed endpoint in-place to avoid allocation
119-
// Find and remove the failed endpoint by shifting elements
120-
for i := 0; i < len(availableEndpoints); i++ {
121-
if availableEndpoints[i].Name == endpoint.Name {
122-
// Remove element at index i by copying subsequent elements
123-
copy(availableEndpoints[i:], availableEndpoints[i+1:])
124-
availableEndpoints = availableEndpoints[:len(availableEndpoints)-1]
125-
break
126-
}
127-
}
128-
129-
if len(availableEndpoints) > 0 && attemptCount < maxRetries {
130-
h.logger.Info("Retrying request with different endpoint",
131-
"available_endpoints", len(availableEndpoints),
132-
"attempts_remaining", maxRetries-attemptCount)
133-
continue
134-
}
135-
} else {
85+
if !IsConnectionError(lastErr) {
13686
// Non-connection error warrants immediate failure
137-
return err
87+
return lastErr
88+
}
89+
90+
// Handle connection error and retry logic
91+
availableEndpoints = h.handleConnectionFailure(ctx, endpoint, lastErr, attemptCount, availableEndpoints, maxRetries)
92+
}
93+
94+
return h.buildFinalError(availableEndpoints, maxRetries, lastErr)
95+
}
96+
97+
// preserveRequestBody reads and preserves request body for potential retries
98+
func (h *RetryHandler) preserveRequestBody(r *http.Request) ([]byte, error) {
99+
if r.Body == nil || r.Body == http.NoBody {
100+
return nil, nil
101+
}
102+
103+
bodyBytes, err := io.ReadAll(r.Body)
104+
if err != nil {
105+
h.logger.Error("Failed to read request body for retry preservation", "error", err)
106+
return nil, fmt.Errorf("failed to read request body: %w", err)
107+
}
108+
109+
if err := r.Body.Close(); err != nil {
110+
h.logger.Warn("Failed to close original request body", "error", err)
111+
}
112+
113+
// Recreate the body for the first attempt
114+
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
115+
return bodyBytes, nil
116+
}
117+
118+
// checkContextCancellation verifies if the context has been cancelled
119+
func (h *RetryHandler) checkContextCancellation(ctx context.Context) error {
120+
select {
121+
case <-ctx.Done():
122+
return fmt.Errorf("request cancelled: %w", ctx.Err())
123+
default:
124+
return nil
125+
}
126+
}
127+
128+
// resetRequestBodyForRetry recreates request body for retry attempts
129+
func (h *RetryHandler) resetRequestBodyForRetry(r *http.Request, bodyBytes []byte, attemptCount int) {
130+
if bodyBytes != nil && attemptCount > 0 {
131+
r.Body = io.NopCloser(bytes.NewReader(bodyBytes))
132+
}
133+
}
134+
135+
// executeProxyAttempt executes a single proxy attempt with connection counting
136+
func (h *RetryHandler) executeProxyAttempt(ctx context.Context, w http.ResponseWriter, r *http.Request,
137+
endpoint *domain.Endpoint, selector domain.EndpointSelector, stats *ports.RequestStats, proxyFunc ProxyFunc) error {
138+
139+
selector.IncrementConnections(endpoint)
140+
defer selector.DecrementConnections(endpoint)
141+
142+
return proxyFunc(ctx, w, r, endpoint, stats)
143+
}
144+
145+
// handleConnectionFailure processes connection failures and manages endpoint removal
146+
func (h *RetryHandler) handleConnectionFailure(ctx context.Context, endpoint *domain.Endpoint,
147+
err error, attemptCount int, availableEndpoints []*domain.Endpoint, maxRetries int) []*domain.Endpoint {
148+
149+
h.logger.Warn("Connection failed to endpoint, marking as unhealthy",
150+
"endpoint", endpoint.Name,
151+
"error", err,
152+
"attempt", attemptCount,
153+
"remaining_endpoints", len(availableEndpoints)-1)
154+
155+
h.markEndpointUnhealthy(ctx, endpoint)
156+
157+
// Remove failed endpoint from available list
158+
updatedEndpoints := h.removeFailedEndpoint(availableEndpoints, endpoint)
159+
160+
if len(updatedEndpoints) > 0 && attemptCount < maxRetries {
161+
h.logger.Info("Retrying request with different endpoint",
162+
"available_endpoints", len(updatedEndpoints),
163+
"attempts_remaining", maxRetries-attemptCount)
164+
}
165+
166+
return updatedEndpoints
167+
}
168+
169+
// removeFailedEndpoint removes the failed endpoint from the available list
170+
func (h *RetryHandler) removeFailedEndpoint(endpoints []*domain.Endpoint, failedEndpoint *domain.Endpoint) []*domain.Endpoint {
171+
for i := 0; i < len(endpoints); i++ {
172+
if endpoints[i].Name == failedEndpoint.Name {
173+
// Remove element at index i by copying subsequent elements
174+
copy(endpoints[i:], endpoints[i+1:])
175+
return endpoints[:len(endpoints)-1]
138176
}
139177
}
178+
return endpoints
179+
}
140180

141-
// All endpoints exhausted or max attempts reached
181+
// buildFinalError constructs the appropriate error message for retry failure
182+
func (h *RetryHandler) buildFinalError(availableEndpoints []*domain.Endpoint, maxRetries int, lastErr error) error {
142183
if len(availableEndpoints) == 0 {
143184
return fmt.Errorf("all endpoints failed with connection errors: %w", lastErr)
144185
}
145-
146186
return fmt.Errorf("max attempts (%d) reached: %w", maxRetries, lastErr)
147187
}
148188

internal/adapter/registry/routing/discovery_strategy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ func (s *DiscoveryStrategy) GetRoutableEndpoints(
174174
return nil, ports.NewRoutingDecision(
175175
s.Name(),
176176
ports.RoutingActionRejected,
177-
"no_healthy_after_discovery",
177+
constants.RoutingReasonNoHealthyAfterDiscovery,
178178
), domain.NewModelRoutingError(
179179
modelName,
180180
s.Name(),

internal/adapter/registry/unified_memory_registry.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -418,14 +418,14 @@ func (a *discoveryServiceAdapter) RefreshEndpoints(ctx context.Context) error {
418418
}
419419

420420
func (a *discoveryServiceAdapter) GetEndpoints(ctx context.Context) ([]*domain.Endpoint, error) {
421-
// get all endpoints, not just healthy ones
422-
if provider, ok := a.discovery.(interface {
423-
GetEndpoints(context.Context) ([]*domain.Endpoint, error)
424-
}); ok {
425-
return provider.GetEndpoints(ctx)
421+
// Try to get all endpoints first
422+
endpoints, err := a.discovery.GetEndpoints(ctx)
423+
if err != nil {
424+
// Fallback to healthy endpoints for implementations that may fail
425+
// when retrieving all endpoints or only provide healthy endpoints
426+
return a.discovery.GetHealthyEndpoints(ctx)
426427
}
427-
// fall to healthy endpoints if GetEndpoints not available
428-
return a.discovery.GetHealthyEndpoints(ctx)
428+
return endpoints, nil
429429
}
430430

431431
func (a *discoveryServiceAdapter) GetHealthyEndpoints(ctx context.Context) ([]*domain.Endpoint, error) {

internal/config/types.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,11 @@ type ProxyConfig struct {
6161
ConnectionTimeout time.Duration `yaml:"connection_timeout"`
6262
ResponseTimeout time.Duration `yaml:"response_timeout"`
6363
ReadTimeout time.Duration `yaml:"read_timeout"`
64-
MaxRetries int `yaml:"max_retries"`
65-
RetryBackoff time.Duration `yaml:"retry_backoff"`
66-
StreamBufferSize int `yaml:"stream_buffer_size"`
64+
// Deprecated: Use model_registry.routing_strategy instead. Retained for backward compatibility. TODO: Removal: v0.1.0
65+
MaxRetries int `yaml:"max_retries"`
66+
// Deprecated: Use model_registry.routing_strategy instead. Retained for backward compatibility. TODO: Removal: v0.1.0
67+
RetryBackoff time.Duration `yaml:"retry_backoff"`
68+
StreamBufferSize int `yaml:"stream_buffer_size"`
6769
}
6870

6971
// DiscoveryConfig holds service discovery configuration

internal/core/constants/routing.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,9 @@ const (
99
RoutingReasonModelFoundNoRefresh = "model_found_no_refresh"
1010

1111
// Model not found scenarios (404 Not Found)
12-
RoutingReasonModelNotFound = "model_not_found"
13-
RoutingReasonModelNotFoundFallback = "model_not_found_fallback"
12+
RoutingReasonModelNotFound = "model_not_found"
13+
RoutingReasonModelNotFoundFallback = "model_not_found_fallback"
14+
RoutingReasonNoHealthyAfterDiscovery = "no_healthy_after_discovery"
1415

1516
// Model unavailable scenarios (503 Service Unavailable)
1617
RoutingReasonModelUnavailable = "model_unavailable"

0 commit comments

Comments
 (0)