Skip to content

Commit 758d3f2

Browse files
committed
api: Add all in-flight requests /reverse_proxy/upstreams (#7277)
1 parent 65e0ddc commit 758d3f2

File tree

3 files changed

+64
-0
lines changed

3 files changed

+64
-0
lines changed

modules/caddyhttp/reverseproxy/admin.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,34 @@ func (adminUpstreams) handleUpstreams(w http.ResponseWriter, r *http.Request) er
102102
})
103103
return true
104104
})
105+
// Iterate over the inflight hosts
106+
inflightHosts.Range(func(key, val any) bool {
107+
address, ok := key.(string)
108+
if !ok {
109+
rangeErr = caddy.APIError{
110+
HTTPStatus: http.StatusInternalServerError,
111+
Err: fmt.Errorf("could not type assert upstream address"),
112+
}
113+
return false
114+
}
115+
116+
upstream, ok := val.(*Host)
117+
if !ok {
118+
rangeErr = caddy.APIError{
119+
HTTPStatus: http.StatusInternalServerError,
120+
Err: fmt.Errorf("could not type assert upstream struct"),
121+
}
122+
return false
123+
}
124+
125+
results = append(results, upstreamStatus{
126+
Address: address,
127+
NumRequests: upstream.NumRequests(),
128+
Fails: upstream.Fails(),
129+
})
130+
return true
131+
})
132+
105133

106134
// If an error happened during the range, return it
107135
if rangeErr != nil {

modules/caddyhttp/reverseproxy/hosts.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,15 @@ func (u *Upstream) fillHost() {
132132
u.Host = host
133133
}
134134

135+
func (u *Upstream) fillInfilghtHost() {
136+
host := new(Host)
137+
existingHost, loaded := inflightHosts.LoadOrStore(u.String(), host)
138+
if loaded {
139+
host = existingHost.(*Host)
140+
}
141+
u.Host = host
142+
}
143+
135144
// Host is the basic, in-memory representation of the state of a remote host.
136145
// Its fields are accessed atomically and Host values must not be copied.
137146
type Host struct {
@@ -268,6 +277,10 @@ func GetDialInfo(ctx context.Context) (DialInfo, bool) {
268277
// through config reloads.
269278
var hosts = caddy.NewUsagePool()
270279

280+
// inflightHosts is the global repository for hosts that are
281+
// currently in use by inflight upstream request.
282+
var inflightHosts = caddy.NewUsagePool()
283+
271284
// dialInfoVarKey is the key used for the variable that holds
272285
// the dial info for the upstream connection.
273286
const dialInfoVarKey = "reverse_proxy.dial_info"

modules/caddyhttp/reverseproxy/reverseproxy.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,9 @@ func (h *Handler) Cleanup() error {
394394

395395
// remove hosts from our config from the pool
396396
for _, upstream := range h.Upstreams {
397+
if upstream.NumRequests() > 0 {
398+
upstream.fillInfilghtHost()
399+
}
397400
_, _ = hosts.Delete(upstream.String())
398401
}
399402

@@ -460,6 +463,26 @@ func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request, next caddyht
460463
var done bool
461464
done, proxyErr = h.proxyLoopIteration(clonedReq, r, w, proxyErr, start, retries, repl, reqHeader, reqHost, next)
462465
if done {
466+
// Iterate over the upstream pool (needs to be fast)
467+
var rangeErr error
468+
inflightHosts.Range(func(key, val any) bool {
469+
host, ok := val.(*Host)
470+
if !ok {
471+
rangeErr = caddy.APIError{
472+
HTTPStatus: http.StatusInternalServerError,
473+
Err: fmt.Errorf("could not type assert upstream struct"),
474+
}
475+
return false
476+
}
477+
if host.NumRequests() == 0 {
478+
inflightHosts.Delete(reqHost)
479+
}
480+
return true
481+
})
482+
// If an error happened during the range, return it
483+
if rangeErr != nil {
484+
return statusError(rangeErr)
485+
}
463486
break
464487
}
465488
if h.VerboseLogs {

0 commit comments

Comments
 (0)