diff --git a/.changeset/circuit-breaker.md b/.changeset/circuit-breaker.md new file mode 100644 index 000000000..070bda61c --- /dev/null +++ b/.changeset/circuit-breaker.md @@ -0,0 +1,11 @@ +--- +hive-router: patch +hive-router-plan-executor: patch +hive-router-config: patch +--- + +# Implement Circuit Breaker for Subgraph Requests + +This change introduces a circuit breaker mechanism for subgraph requests in the Hive Router. The circuit breaker will monitor the success and failure rates of requests to each subgraph and will prevent future requests if the failure rate exceeds a certain threshold. When the circuit breaker is opened, subsequent requests to that subgraph will fail immediately without attempting to send the request. + +This implementation helps improve the resilience and stability of the Hive Router when dealing with unreliable subgraphs. \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock index 70b3f3672..a7b7b2af5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2672,6 +2672,7 @@ dependencies = [ "futures", "futures-util", "graphql-tools", + "hive-console-sdk", "hive-router-config", "hive-router-internal", "hive-router-query-planner", @@ -2684,8 +2685,10 @@ dependencies = [ "indexmap 2.14.0", "insta", "itoa", + "lazy_static", "mockito", "ntex", + "recloser", "regex-automata", "rustls", "ryu", diff --git a/Cargo.toml b/Cargo.toml index dc1ffd3af..e408e05f9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -89,6 +89,7 @@ strum = { version = "0.28.0", features = ["derive"] } mockito = "1.7.0" futures-util = "0.3.31" axum = "0.8.4" +recloser = "1.3.1" notify = "8.2.0" ipnet = "2.12.0" diff --git a/docs/README.md b/docs/README.md index 17ecc9e09..82ea2c64c 100644 --- a/docs/README.md +++ b/docs/README.md @@ -23,7 +23,7 @@ |[**subscriptions**](#subscriptions)|`object`|Configuration for subscriptions.
Default: `{"broadcast_capacity":0,"enabled":false}`
|| |[**supergraph**](#supergraph)|`object`|Configuration for the Federation supergraph source. By default, the router will use a local file-based supergraph source (`./supergraph.graphql`).
|| |[**telemetry**](#telemetry)|`object`|Default: `{"client_identification":{"ip_header":null,"name_header":"graphql-client-name","version_header":"graphql-client-version"},"hive":null,"metrics":{"exporters":[],"instrumentation":{"common":{"histogram":{"aggregation":"explicit","bytes":{"buckets":[128,512,1024,2048,4096,8192,16384,32768,65536,131072,262144,524288,1048576,2097152,3145728,4194304,5242880],"record_min_max":false},"seconds":{"buckets":[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10],"record_min_max":false}}},"instruments":{}}},"resource":{"attributes":{}},"tracing":{"collect":{"max_attributes_per_event":16,"max_attributes_per_link":32,"max_attributes_per_span":128,"max_events_per_span":128,"parent_based_sampler":false,"sampling":1},"exporters":[],"instrumentation":{"spans":{"mode":"spec_compliant"}},"propagation":{"b3":false,"baggage":false,"jaeger":false,"trace_context":true}}}`
|| -|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"all":{"allow_only_http2":false,"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"},"max_connections_per_host":100,"router":{"dedupe":{"enabled":false,"headers":"all"},"max_long_lived_clients":128,"request_timeout":"1m"}}`
|| +|[**traffic\_shaping**](#traffic_shaping)|`object`|Configuration for the traffic-shaping of the executor. Use these configurations to control how requests are being executed to subgraphs.
Default: `{"all":{"allow_only_http2":false,"circuit_breaker":null,"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"},"max_connections_per_host":100,"router":{"dedupe":{"enabled":false,"headers":"all"},"max_long_lived_clients":128,"request_timeout":"1m"}}`
|| |[**websocket**](#websocket)|`object`|Configuration of router's WebSocket server.
Default: `{"enabled":false,"headers":{"persist":false,"source":"connection"},"path":null}`
|| **Additional Properties:** not allowed @@ -205,6 +205,7 @@ telemetry: traffic_shaping: all: allow_only_http2: false + circuit_breaker: null dedupe_enabled: true pool_idle_timeout: 50s request_timeout: 30s @@ -3391,7 +3392,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations |Name|Type|Description|Required| |----|----|-----------|--------| -|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
Default: `{"allow_only_http2":false,"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"}`
|| +|[**all**](#traffic_shapingall)|`object`|The default configuration that will be applied to all subgraphs, unless overridden by a specific subgraph configuration.
Default: `{"allow_only_http2":false,"circuit_breaker":null,"dedupe_enabled":true,"pool_idle_timeout":"50s","request_timeout":"30s"}`
|| |**max\_connections\_per\_host**|`integer`|Limits the concurrent amount of requests/connections per host/subgraph.
Default: `100`
Format: `"uint"`
Minimum: `0`
|| |[**router**](#traffic_shapingrouter)|`object`|Configuration for the router itself, e.g., for handling incoming requests, or other router-level traffic shaping configurations.
Default: `{"dedupe":{"enabled":false,"headers":"all"},"max_long_lived_clients":128,"request_timeout":"1m"}`
|| |[**subgraphs**](#traffic_shapingsubgraphs)|`object`|Optional per-subgraph configurations that will override the default configuration for specific subgraphs.
|| @@ -3402,6 +3403,7 @@ Configuration for the traffic-shaping of the executor. Use these configurations ```yaml all: allow_only_http2: false + circuit_breaker: null dedupe_enabled: true pool_idle_timeout: 50s request_timeout: 30s @@ -3426,6 +3428,7 @@ The default configuration that will be applied to all subgraphs, unless overridd |Name|Type|Description|Required| |----|----|-----------|--------| |**allow\_only\_http2**|`boolean`|Forces HTTP/2 for requests to subgraphs.

For plain HTTP, it will use HTTP/2 cleartext (h2c).
For HTTPS, it also requires HTTP/2.
This will make the subgraph requests never fall back to HTTP/1.1,
and will fail if the subgraph doesn't support HTTP/2.
Default: `false`
|| +|[**circuit\_breaker**](#traffic_shapingallcircuit_breaker)|`object`, `null`|Circuit Breaker configuration for all subgraphs.
|| |**dedupe\_enabled**|`boolean`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
Default: `true`
|| |**pool\_idle\_timeout**|`string`|Timeout for idle sockets being kept-alive.
Default: `"50s"`
|| |**request\_timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
"10s"
} else {
"15s"
}
```
Default: `"30s"`
|| @@ -3436,12 +3439,53 @@ The default configuration that will be applied to all subgraphs, unless overridd ```yaml allow_only_http2: false +circuit_breaker: null dedupe_enabled: true pool_idle_timeout: 50s request_timeout: 30s ``` + +#### traffic\_shaping\.all\.circuit\_breaker: object,null + +Circuit Breaker configuration for all subgraphs. +When the circuit breaker is open, requests to the subgraph will be +short-circuited and an error will be returned to the client. +The circuit breaker will be triggered based on the error rate of requests to the subgraph, and will attempt to reset after a certain timeout. + + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**enabled**|`boolean`, `null`|Enable or disable the circuit breaker for the subgraph.
Default: false (circuit breaker is disabled)

When unset on a subgraph-level configuration, the value falls back
to the value defined in the global (`all`) circuit breaker
configuration.
|| +|[**error\_status\_codes**](#traffic_shapingallcircuit_breakererror_status_codes)|`integer[]`|HTTP status codes returned by the subgraph that should be counted as
|| +|**error\_threshold**|`string`|Percentage after what the circuit breaker should kick in.
Default: 50%
|| +|**reset\_timeout**|`string`|The duration after which the circuit breaker will attempt to retry sending requests to the subgraph.
Default: 30s
|| +|**volume\_threshold**|`integer`, `null`|Count of requests before starting evaluating.
Default: 5
Format: `"uint"`
Minimum: `0`
|| + +**Additional Properties:** not allowed + +##### traffic\_shaping\.all\.circuit\_breaker\.error\_status\_codes\[\]: array,null + +HTTP status codes returned by the subgraph that should be counted as +failures by the circuit breaker. + +Only responses whose status code is contained in this list will be +recorded as failures. Responses with any other status code (including +other 5xx codes) are treated as successes from the circuit breaker's +point of view. + +Default: `[503]` + + +**Items** + +**Item Type:** `integer` +**Item Minimum:** `100` +**Item Maximum:** `599` +**Unique Items:** yes #### traffic\_shaping\.all\.tls: object,null @@ -3554,12 +3598,52 @@ Optional per-subgraph configurations that will override the default configuratio |Name|Type|Description|Required| |----|----|-----------|--------| |**allow\_only\_http2**|`boolean`, `null`|Forces HTTP/2 for requests to subgraphs.

For plain HTTP, it will use HTTP/2 cleartext (h2c).
For HTTPS, it also requires HTTP/2.
This will make the subgraph requests never fall back to HTTP/1.1,
and will fail if the subgraph doesn't support HTTP/2.
|| +|[**circuit\_breaker**](#traffic_shapingsubgraphsadditionalpropertiescircuit_breaker)|`object`, `null`|Circuit Breaker configuration for the subgraph.
|| |**dedupe\_enabled**|`boolean`, `null`|Enables/disables request deduplication to subgraphs.

When requests exactly matches the hashing mechanism (e.g., subgraph name, URL, headers, query, variables), and are executed at the same time, they will
be deduplicated by sharing the response of other in-flight requests.
|| |**pool\_idle\_timeout**|`string`, `null`|Timeout for idle sockets being kept-alive.
|| |**request\_timeout**||Optional timeout configuration for requests to subgraphs.

Example with a fixed duration:
```yaml
timeout:
duration: 5s
```

Or with a VRL expression that can return a duration based on the operation kind:
```yaml
timeout:
expression: \|
if (.request.operation.type == "mutation") {
"10s"
} else {
"15s"
}
```
|| |[**tls**](#traffic_shapingsubgraphsadditionalpropertiestls)|`object`, `null`||| **Additional Properties:** not allowed + +##### traffic\_shaping\.subgraphs\.additionalProperties\.circuit\_breaker: object,null + +Circuit Breaker configuration for the subgraph. +When the circuit breaker is open, requests to the subgraph will be short-circuited and an error will be returned to the client. +The circuit breaker will be triggered based on the error rate of requests to the subgraph, and will attempt to reset after a certain timeout. + + +**Properties** + +|Name|Type|Description|Required| +|----|----|-----------|--------| +|**enabled**|`boolean`, `null`|Enable or disable the circuit breaker for the subgraph.
Default: false (circuit breaker is disabled)

When unset on a subgraph-level configuration, the value falls back
to the value defined in the global (`all`) circuit breaker
configuration.
|| +|[**error\_status\_codes**](#traffic_shapingsubgraphsadditionalpropertiescircuit_breakererror_status_codes)|`integer[]`|HTTP status codes returned by the subgraph that should be counted as
|| +|**error\_threshold**|`string`|Percentage after what the circuit breaker should kick in.
Default: 50%
|| +|**reset\_timeout**|`string`|The duration after which the circuit breaker will attempt to retry sending requests to the subgraph.
Default: 30s
|| +|**volume\_threshold**|`integer`, `null`|Count of requests before starting evaluating.
Default: 5
Format: `"uint"`
Minimum: `0`
|| + +**Additional Properties:** not allowed + +###### traffic\_shaping\.subgraphs\.additionalProperties\.circuit\_breaker\.error\_status\_codes\[\]: array,null + +HTTP status codes returned by the subgraph that should be counted as +failures by the circuit breaker. + +Only responses whose status code is contained in this list will be +recorded as failures. Responses with any other status code (including +other 5xx codes) are treated as successes from the circuit breaker's +point of view. + +Default: `[503]` + + +**Items** + +**Item Type:** `integer` +**Item Minimum:** `100` +**Item Maximum:** `599` +**Unique Items:** yes ##### traffic\_shaping\.subgraphs\.additionalProperties\.tls: object,null diff --git a/e2e/src/circuit_breaker.rs b/e2e/src/circuit_breaker.rs new file mode 100644 index 000000000..6bfc129d7 --- /dev/null +++ b/e2e/src/circuit_breaker.rs @@ -0,0 +1,1331 @@ +#[cfg(test)] +mod circuit_breaker_e2e_tests { + use std::{thread::sleep, time::Duration}; + + use hive_router_internal::telemetry::metrics::catalog::{labels, names}; + + use crate::testkit::{otel::OtlpCollector, ClientResponseExt, TestRouter, TestSubgraphs}; + + #[ntex::test] + async fn should_open_circuit_breaker_after_slow_subgraph_timeouts() { + let subgraphs = TestSubgraphs::builder() + .with_on_request(|request| { + if request.path == "/accounts" { + sleep(Duration::from_millis(700)); + } + None + }) + .build() + .start() + .await; + + let router = TestRouter::builder() + .with_subgraphs(&subgraphs) + .inline_config( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + request_timeout: 200ms + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + "#, + ) + .build() + .start() + .await; + + for _ in 1..=3 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + assert_eq!(res.status(), ntex::http::StatusCode::OK); + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Request to subgraph timed out", + "extensions": { + "code": "SUBGRAPH_REQUEST_TIMEOUT", + "serviceName": "accounts" + } + } + ] +}"### + ); + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + assert_eq!(res.status(), ntex::http::StatusCode::OK); + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Rejected by the circuit breaker", + "extensions": { + "code": "SUBGRAPH_CIRCUIT_BREAKER_REJECTED", + "serviceName": "accounts" + } + } + ] +}"### + ); + } + + #[ntex::test] + async fn should_not_open_circuit_breaker_when_subgraph_timeout_override_allows_request() { + let subgraphs = TestSubgraphs::builder() + .with_on_request(|request| { + if request.path == "/accounts" { + sleep(Duration::from_millis(400)); + } + None + }) + .build() + .start() + .await; + + let router = TestRouter::builder() + .with_subgraphs(&subgraphs) + .inline_config( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + request_timeout: 100ms + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + subgraphs: + accounts: + request_timeout: 1s + "#, + ) + .build() + .start() + .await; + + for _ in 1..=4 { + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + assert_eq!(res.status(), ntex::http::StatusCode::OK); + } + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + assert_eq!(res.status(), ntex::http::StatusCode::OK); + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": [ + { + "id": "1" + }, + { + "id": "2" + }, + { + "id": "3" + }, + { + "id": "4" + }, + { + "id": "5" + }, + { + "id": "6" + } + ] + } +}"### + ); + } + + #[ntex::test] + async fn should_open_circuit_breaker_after_error_threshold() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .expect(4) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=3 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + insta::assert_snapshot!( + &res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Received empty response body from subgraph \"accounts\"", + "extensions": { + "code": "SUBGRAPH_RESPONSE_BODY_EMPTY", + "serviceName": "accounts" + } + } + ] +}"### + ); + + error_mock.assert_async().await; + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + assert_eq!(res.status(), ntex::http::StatusCode::OK); + + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Rejected by the circuit breaker", + "extensions": { + "code": "SUBGRAPH_CIRCUIT_BREAKER_REJECTED", + "serviceName": "accounts" + } + } + ] +}"### + ); + } + + #[ntex::test] + async fn should_close_circuit_breaker_after_reset_timeout() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .expect(4) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 2s + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=4 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + error_mock.assert_async().await; + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Rejected by the circuit breaker", + "extensions": { + "code": "SUBGRAPH_CIRCUIT_BREAKER_REJECTED", + "serviceName": "accounts" + } + } + ] +}"### + ); + + let success_mock = accounts_server + .mock("POST", "/accounts") + .with_status(200) + .with_body(r#"{"data":{"users":[{"id":"1"}]}}"#) + .expect_at_least(1) + .create_async() + .await; + + tokio::time::sleep(Duration::from_secs(3)).await; + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": [ + { + "id": "1" + } + ] + } +}"### + ); + + success_mock.assert_async().await; + } + + #[ntex::test] + async fn should_support_per_subgraph_circuit_breaker_config() { + let mut accounts_server = mockito::Server::new_async().await; + let accounts_host = accounts_server.host_with_port(); + + let mut products_server = mockito::Server::new_async().await; + let products_host = products_server.host_with_port(); + + let accounts_error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .expect(3) + .create_async() + .await; + + let products_success_mock = products_server + .mock("POST", "/products") + .with_status(200) + .with_body(r#"{"data":{"topProducts":[{"upc":"1"}]}}"#) + .expect_at_least(1) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: false + subgraphs: + accounts: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 2 + reset_timeout: 30s + override_subgraph_urls: + accounts: + url: "http://{accounts_host}/accounts" + products: + url: "http://{products_host}/products" + "# + )) + .build() + .start() + .await; + + for _ in 1..=3 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + accounts_error_mock.assert_async().await; + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Rejected by the circuit breaker", + "extensions": { + "code": "SUBGRAPH_CIRCUIT_BREAKER_REJECTED", + "serviceName": "accounts" + } + } + ] +}"### + ); + + let res = router + .send_graphql_request("{ topProducts { upc } }", None, None) + .await; + + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "topProducts": [ + { + "upc": "1" + } + ] + } +}"### + ); + + products_success_mock.assert_async().await; + } + + /// With `volume_threshold: 10` and `error_threshold: 90%`, the breaker + /// only starts evaluating after the ring buffer of size 10 fills up. + /// Sending exactly `volume_threshold` failing requests is therefore not + /// enough on its own to flip the breaker open — the next request still + /// reaches the subgraph. This guards against accidentally tripping the + /// breaker too eagerly when the configured volume is barely reached. + #[ntex::test] + async fn should_not_open_circuit_breaker_when_volume_threshold_just_reached() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + let mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .expect_at_least(1) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 90% + volume_threshold: 10 + reset_timeout: 30s + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=10 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + assert_eq!(res.status(), ntex::http::StatusCode::OK); + + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Received empty response body from subgraph \"accounts\"", + "extensions": { + "code": "SUBGRAPH_RESPONSE_BODY_EMPTY", + "serviceName": "accounts" + } + } + ] +}"### + ); + + mock.assert_async().await; + } + + #[ntex::test] + async fn should_not_activate_circuit_breaker_when_disabled() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .expect_at_least(10) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: false + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=9 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + insta::assert_snapshot!( + &res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Received empty response body from subgraph \"accounts\"", + "extensions": { + "code": "SUBGRAPH_RESPONSE_BODY_EMPTY", + "serviceName": "accounts" + } + } + ] +}"### + ); + + error_mock.assert_async().await; + } + + #[ntex::test] + async fn should_override_global_circuit_breaker_with_subgraph_config() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .expect(6) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + subgraphs: + accounts: + circuit_breaker: + enabled: true + volume_threshold: 5 + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=6 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + error_mock.assert_async().await; + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Rejected by the circuit breaker", + "extensions": { + "code": "SUBGRAPH_CIRCUIT_BREAKER_REJECTED", + "serviceName": "accounts" + } + } + ] +}"### + ); + + error_mock.assert_async().await; + } + + /// Requests to an unreachable upstream time out and surface as + /// `SUBGRAPH_REQUEST_TIMEOUT`. Those timeouts must be counted as failures + /// by the circuit breaker so that, after the threshold is reached, further + /// requests are short-circuited with `SUBGRAPH_CIRCUIT_BREAKER_REJECTED`. + #[ntex::test] + async fn should_open_circuit_breaker_on_subgraph_unreachable_timeouts() { + let non_existent_host = "192.0.2.1:9999"; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + request_timeout: 500ms + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + override_subgraph_urls: + accounts: + url: "http://{non_existent_host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=3 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + insta::assert_snapshot!( + &res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Request to subgraph timed out", + "extensions": { + "code": "SUBGRAPH_REQUEST_TIMEOUT", + "serviceName": "accounts" + } + } + ] +}"### + ); + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Rejected by the circuit breaker", + "extensions": { + "code": "SUBGRAPH_CIRCUIT_BREAKER_REJECTED", + "serviceName": "accounts" + } + } + ] +}"### + ); + } + + #[ntex::test] + async fn should_record_rejection_metrics() { + let otlp_collector = OtlpCollector::start() + .await + .expect("Failed to start OTLP collector"); + let otlp_endpoint = otlp_collector.http_metrics_endpoint(); + + let non_existent_host = "192.0.2.1:9999"; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + telemetry: + metrics: + exporters: + - kind: otlp + endpoint: {otlp_endpoint} + protocol: http + interval: 30ms + max_export_timeout: 50ms + traffic_shaping: + all: + request_timeout: 500ms + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + override_subgraph_urls: + accounts: + url: "http://{non_existent_host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=4 { + router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + for _ in 1..=3 { + router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + tokio::time::sleep(Duration::from_millis(100)).await; + + let metrics = otlp_collector.metrics_view().await; + let attrs = [(labels::SUBGRAPH_NAME, "accounts")]; + + let rejection_count = + metrics.latest_counter(names::CIRCUIT_BREAKER_REJECTED_REQUESTS, &attrs); + + assert!( + rejection_count >= 3.0, + "Expected at least 3 rejected requests, got {}", + rejection_count + ); + } + + /// When a subgraph defines a `circuit_breaker` block but does not explicitly + /// set `enabled`, the value should be inherited from the global `all` + /// configuration instead of silently defaulting to `false`. + #[ntex::test] + async fn should_inherit_enabled_from_global_when_subgraph_circuit_breaker_omits_enabled() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .expect(3) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 2 + reset_timeout: 30s + subgraphs: + accounts: + circuit_breaker: + volume_threshold: 2 + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=3 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + error_mock.assert_async().await; + + // The subgraph block exists without `enabled`, so it should inherit + // `enabled: true` from the global config and trip the breaker. + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Rejected by the circuit breaker", + "extensions": { + "code": "SUBGRAPH_CIRCUIT_BREAKER_REJECTED", + "serviceName": "accounts" + } + } + ] +}"### + ); + } + + /// A subgraph should be able to explicitly opt out of the circuit breaker + /// (with `enabled: false`) even when the global config has it enabled. + #[ntex::test] + async fn should_allow_subgraph_to_disable_circuit_breaker_when_global_is_enabled() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + // The breaker should never trip for `accounts`, so all 10 requests must + // reach the upstream subgraph. + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .expect(10) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + subgraphs: + accounts: + circuit_breaker: + enabled: false + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=10 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + error_mock.assert_async().await; + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + + // The breaker stays closed, so we keep getting the upstream error + // instead of `SUBGRAPH_CIRCUIT_BREAKER_REJECTED`. + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Received empty response body from subgraph \"accounts\"", + "extensions": { + "code": "SUBGRAPH_RESPONSE_BODY_EMPTY", + "serviceName": "accounts" + } + } + ] +}"### + ); + } + + /// When a subgraph responds with HTTP 503 but ships a valid GraphQL body + /// (data and/or errors), the router must surface that body to the client + /// instead of replacing it with an `SUBGRAPH_RESPONSE_BODY_EMPTY` error. + /// The 503 is still recorded internally so the circuit breaker can trip, + /// but a single failing request alone should not transform the upstream + /// response. + #[ntex::test] + async fn should_return_subgraph_503_response_body_to_client() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .with_header("content-type", "application/json") + .with_body( + r#"{"data":{"users":[{"id":"1"}]},"errors":[{"message":"upstream is unhappy","extensions":{"code":"UPSTREAM_FAILURE"}}]}"#, + ) + .expect_at_least(1) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: false + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + assert_eq!(res.status(), ntex::http::StatusCode::OK); + + // The upstream 503 body must be propagated to the client (with the + // partial `data` and upstream `errors`) instead of being replaced by + // an `SUBGRAPH_RESPONSE_BODY_EMPTY` error. The router augments the + // error extensions with the originating subgraph name. + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": [ + { + "id": "1" + } + ] + }, + "errors": [ + { + "message": "upstream is unhappy", + "extensions": { + "code": "UPSTREAM_FAILURE", + "serviceName": "accounts" + } + } + ] +}"### + ); + + error_mock.assert_async().await; + } + + /// 503 subgraph responses that carry a valid GraphQL body must still count + /// as failures for the circuit breaker (503 is the default tracked status + /// code). The first few requests should receive the upstream body, then + /// the breaker must open and short-circuit subsequent requests with + /// `SUBGRAPH_CIRCUIT_BREAKER_REJECTED`. + #[ntex::test] + async fn should_track_503_responses_with_body_in_circuit_breaker() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + // The breaker should open after 4 failures with these thresholds, so + // the upstream subgraph should be hit exactly 4 times. + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(503) + .with_header("content-type", "application/json") + .with_body( + r#"{"data":null,"errors":[{"message":"upstream is unhappy","extensions":{"code":"UPSTREAM_FAILURE"}}]}"#, + ) + .expect(4) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + // First four requests reach the subgraph and surface the upstream + // 503 body to the client. We collect the bodies first so the snapshot + // assertion can live outside the loop (insta forbids inline snapshots + // inside loops). + let mut bodies = Vec::with_capacity(4); + for _ in 1..=4 { + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + assert_eq!(res.status(), ntex::http::StatusCode::OK); + bodies.push(res.json_body_string_pretty().await); + } + // All four bodies should be identical upstream 503 payloads. + for body in &bodies[1..] { + assert_eq!(&bodies[0], body); + } + insta::assert_snapshot!( + bodies[0], + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "upstream is unhappy", + "extensions": { + "code": "UPSTREAM_FAILURE", + "serviceName": "accounts" + } + } + ] +}"### + ); + + error_mock.assert_async().await; + + // After the threshold is reached, the breaker opens and rejects new + // requests without ever calling the subgraph. + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + assert_eq!(res.status(), ntex::http::StatusCode::OK); + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Rejected by the circuit breaker", + "extensions": { + "code": "SUBGRAPH_CIRCUIT_BREAKER_REJECTED", + "serviceName": "accounts" + } + } + ] +}"### + ); + } + + /// By default, only HTTP 503 responses count as failures for the circuit + /// breaker. Other 5xx codes (like 500) must NOT trip the breaker, so the + /// subgraph keeps being called for every request. + #[ntex::test] + async fn should_not_open_circuit_breaker_on_500_by_default() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + // Subgraph returns a valid GraphQL body alongside the 500 status, + // so the executor surfaces it as a successful response (with the + // upstream status). Without a body the executor would return an + // error which the breaker counts regardless of status code. + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(500) + .with_header("content-type", "application/json") + .with_body(r#"{"data":null,"errors":[{"message":"oops"}]}"#) + .expect(6) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=6 { + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + // The breaker never opens, so the upstream body is always + // returned and we never see SUBGRAPH_CIRCUIT_BREAKER_REJECTED. + let body = res.json_body_string_pretty().await; + assert!( + !body.contains("SUBGRAPH_CIRCUIT_BREAKER_REJECTED"), + "breaker must not open for 500 by default, got: {body}" + ); + } + + error_mock.assert_async().await; + } + + /// When the user configures `error_status_codes` to include 500, the + /// circuit breaker must open for 500 responses just like it does for 503 + /// by default. + #[ntex::test] + async fn should_open_circuit_breaker_on_500_when_configured() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + // The breaker should open after 4 failures with these thresholds, so + // the subgraph should be hit exactly 4 times. + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(500) + .with_header("content-type", "application/json") + .with_body(r#"{"data":null,"errors":[{"message":"oops"}]}"#) + .expect(4) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + error_status_codes: [500, 502, 503, 504] + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=4 { + let _ = router + .send_graphql_request("{ users { id } }", None, None) + .await; + } + + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + assert_eq!(res.status(), ntex::http::StatusCode::OK); + insta::assert_snapshot!( + res.json_body_string_pretty().await, + @r###"{ + "data": { + "users": null + }, + "errors": [ + { + "message": "Rejected by the circuit breaker", + "extensions": { + "code": "SUBGRAPH_CIRCUIT_BREAKER_REJECTED", + "serviceName": "accounts" + } + } + ] +}"### + ); + + error_mock.assert_async().await; + } + + /// A subgraph-level `error_status_codes` setting must override the + /// global one for that specific subgraph. + #[ntex::test] + async fn should_use_subgraph_level_error_status_codes_override() { + let mut accounts_server = mockito::Server::new_async().await; + let host = accounts_server.host_with_port(); + + // Subgraph-level config restricts failures to 502 only, so even + // though the global config would treat 500 as a failure, 500 + // responses must not trip the breaker for `accounts`. + let error_mock = accounts_server + .mock("POST", "/accounts") + .with_status(500) + .with_header("content-type", "application/json") + .with_body(r#"{"data":null,"errors":[{"message":"oops"}]}"#) + .expect(6) + .create_async() + .await; + + let router = TestRouter::builder() + .inline_config(&format!( + r#" + supergraph: + source: file + path: supergraph.graphql + traffic_shaping: + all: + circuit_breaker: + enabled: true + error_threshold: 50% + volume_threshold: 3 + reset_timeout: 30s + error_status_codes: [500] + subgraphs: + accounts: + circuit_breaker: + error_status_codes: [502] + override_subgraph_urls: + accounts: + url: "http://{host}/accounts" + "# + )) + .build() + .start() + .await; + + for _ in 1..=6 { + let res = router + .send_graphql_request("{ users { id } }", None, None) + .await; + let body = res.json_body_string_pretty().await; + assert!( + !body.contains("SUBGRAPH_CIRCUIT_BREAKER_REJECTED"), + "subgraph-level override must prevent breaker from opening on 500, got: {body}" + ); + } + + error_mock.assert_async().await; + } +} diff --git a/e2e/src/lib.rs b/e2e/src/lib.rs index 6112d65aa..a665b318e 100644 --- a/e2e/src/lib.rs +++ b/e2e/src/lib.rs @@ -5,6 +5,8 @@ mod authorization_directives_reject; #[cfg(test)] mod body_limit; #[cfg(test)] +mod circuit_breaker; +#[cfg(test)] mod conditional_directives; #[cfg(test)] mod coprocessor; diff --git a/lib/executor/Cargo.toml b/lib/executor/Cargo.toml index 6c6b16188..a3a9da2d7 100644 --- a/lib/executor/Cargo.toml +++ b/lib/executor/Cargo.toml @@ -19,6 +19,7 @@ hive-router-query-planner = { path = "../query-planner", version = "2.8.0" } hive-router-config = { path = "../router-config", version = "0.0.33" } hive-router-internal = { path = "../internal", version = "0.0.20" } graphql-tools = { path = "../graphql-tools", version = "0.5.4" } +hive-console-sdk = { path = "../hive-console-sdk", version = "0.3.8" } async-trait = { workspace = true } futures = { workspace = true } @@ -36,6 +37,7 @@ xxhash-rust = { workspace = true } tokio = { workspace = true, features = ["sync"] } dashmap = { workspace = true } serde_json = { workspace = true } +recloser = { workspace = true } regex-automata = { workspace = true } bytes = { workspace = true } @@ -44,6 +46,7 @@ ahash = { workspace = true } ntex = { workspace = true } hyper-rustls = { workspace = true} rustls = { workspace = true } +lazy_static = { workspace = true } hyper-util = { version = "0.1.16", features = [ "client", diff --git a/lib/executor/src/executors/common.rs b/lib/executor/src/executors/common.rs index 667515074..e1a0cb44c 100644 --- a/lib/executor/src/executors/common.rs +++ b/lib/executor/src/executors/common.rs @@ -19,7 +19,7 @@ pub trait SubgraphExecutor { execution_request: SubgraphExecutionRequest<'a>, timeout: Option, plugin_req_state: Option<&'a PluginRequestState<'a>>, - ) -> Result, SubgraphExecutorError>; + ) -> Result, SubgraphExecutorError>; async fn subscribe<'a>( &self, diff --git a/lib/executor/src/executors/error.rs b/lib/executor/src/executors/error.rs index 148931ac9..e668a00e2 100644 --- a/lib/executor/src/executors/error.rs +++ b/lib/executor/src/executors/error.rs @@ -1,7 +1,10 @@ +use hive_console_sdk::circuit_breaker::CircuitBreakerError; use http::{uri::InvalidUri, StatusCode}; use rustls::server::VerifierBuilderError; use strum::IntoStaticStr; +use crate::response::subgraph_response::SubgraphResponse; + #[derive(thiserror::Error, Debug, IntoStaticStr)] pub enum SubgraphExecutorError { #[error("Failed to parse endpoint \"{0}\" as URI: {1}")] @@ -57,6 +60,12 @@ pub enum SubgraphExecutorError { #[error(transparent)] #[strum(serialize = "SUBGRAPH_HTTPS_CERTS_FAILURE")] TlsCertificatesError(#[from] TlsCertificatesError), + #[error("Unable to create circuit breaker: {0} for subgraph \"{1}\"")] + #[strum(serialize = "SUBGRAPH_CIRCUIT_BREAKER_CREATION_FAILURE")] + CircuitBreakerCreationError(CircuitBreakerError, String), + #[error("Rejected by the circuit breaker")] + #[strum(serialize = "SUBGRAPH_CIRCUIT_BREAKER_REJECTED")] + CircuitBreakerRejected, #[error("Unsupported content-type '{0}': expected 'multipart/mixed' or 'text/event-stream' for HTTP subscriptions")] #[strum(serialize = "SUBGRAPH_SUBSCRIPTION_UNSUPPORTED_CONTENT_TYPE")] UnsupportedContentTypeError(String), @@ -93,6 +102,9 @@ pub enum SubgraphExecutorError { #[error("HTTP Callback protocol configured for subgraph but no callback configuration provided for router")] #[strum(serialize = "SUBGRAPH_HTTP_CALLBACK_NOT_CONFIGURED")] HttpCallbackNotConfigured, + #[error("Subgraph internal server error")] + #[strum(serialize = "SUBGRAPH_INTERNAL_SERVER_ERROR")] + InternalServerError(Box>), } impl SubgraphExecutorError { diff --git a/lib/executor/src/executors/http.rs b/lib/executor/src/executors/http.rs index 0ef7c9813..748b0f40e 100644 --- a/lib/executor/src/executors/http.rs +++ b/lib/executor/src/executors/http.rs @@ -297,7 +297,7 @@ impl SubgraphExecutor for HTTPSubgraphExecutor { mut execution_request: SubgraphExecutionRequest<'a>, timeout: Option, plugin_req_state: Option<&'a PluginRequestState<'a>>, - ) -> Result, SubgraphExecutorError> { + ) -> Result, SubgraphExecutorError> { let mut body = build_request_body(&execution_request)?; self.header_map.iter().for_each(|(key, value)| { @@ -643,13 +643,11 @@ pub struct SubgraphHttpResponse { } impl SubgraphHttpResponse { - fn deserialize_http_response<'a>(self) -> Result, SubgraphExecutorError> { - SubgraphResponse::deserialize_from_bytes(self.body.clone()).map( - |mut resp: SubgraphResponse<'a>| { - // headers are under arc, zero cost clone - resp.headers = Some(self.headers.clone()); - resp - }, - ) + fn deserialize_http_response(self) -> Result, SubgraphExecutorError> { + SubgraphResponse::deserialize_from_bytes(self.body).map(|mut resp: SubgraphResponse| { + resp.headers = Some(self.headers); + resp.status = Some(self.status); + resp + }) } } diff --git a/lib/executor/src/executors/http_callback.rs b/lib/executor/src/executors/http_callback.rs index ff3907c24..e84494f9d 100644 --- a/lib/executor/src/executors/http_callback.rs +++ b/lib/executor/src/executors/http_callback.rs @@ -144,7 +144,7 @@ impl SubgraphExecutor for HttpCallbackSubgraphExecutor { _execution_request: SubgraphExecutionRequest<'a>, _timeout: Option, _plugin_req_state: Option<&'a PluginRequestState<'a>>, - ) -> Result, SubgraphExecutorError> { + ) -> Result, SubgraphExecutorError> { Err(SubgraphExecutorError::HttpCallbackNoSingle) } diff --git a/lib/executor/src/executors/map.rs b/lib/executor/src/executors/map.rs index 1d8b6585b..0ed48d299 100644 --- a/lib/executor/src/executors/map.rs +++ b/lib/executor/src/executors/map.rs @@ -1,11 +1,12 @@ use std::{ - collections::{BTreeMap, HashMap}, + collections::{BTreeMap, HashMap, HashSet}, sync::Arc, time::Duration, }; use dashmap::DashMap; -use futures::stream::BoxStream; +use futures::{stream::BoxStream, FutureExt}; +use hive_console_sdk::circuit_breaker::{CircuitBreakerBuilder, CircuitBreakerError}; use hive_router_config::{ override_subgraph_urls::UrlOrExpression, subscriptions::SubscriptionProtocol, traffic_shaping::DurationOrExpression, HiveRouterConfig, @@ -19,11 +20,12 @@ use hive_router_internal::{ expressions::vrl::compiler::Program as VrlProgram, inflight::InFlightMap, telemetry::TelemetryContext, }; -use http::Uri; +use http::{StatusCode, Uri}; use hyper_util::{ client::legacy::Client, rt::{TokioExecutor, TokioTimer}, }; +use recloser::AsyncRecloser; use tokio::sync::Semaphore; use crate::{ @@ -53,6 +55,24 @@ type StaticEndpointsBySubgraphMap = DashMap; type ExpressionEndpointsBySubgraphMap = HashMap; type TimeoutsBySubgraph = DashMap; +#[derive(Clone)] +struct SubgraphCircuitBreaker { + recloser: AsyncRecloser, + /// HTTP status codes that should be counted as failures by the + /// circuit breaker. Wrapped in `Arc` so the value is cheap to clone + /// out of the `DashMap`. + error_status_codes: Arc>, +} +type CircuitBreakersBySubgraph = DashMap; + +lazy_static::lazy_static! { + static ref DEFAULT_CIRCUIT_BREAKER_ERROR_STATUS_CODES: Arc> = Arc::new({ + let mut set = HashSet::new(); + set.insert(StatusCode::SERVICE_UNAVAILABLE); + set + }); +} + struct ResolvedSubgraphConfig<'a> { client: Arc, timeout_config: &'a DurationOrExpression, @@ -71,6 +91,7 @@ pub struct SubgraphExecutorMap { /// Only contains subgraphs with expression-based endpoint overrides expression_endpoints_by_subgraph: ExpressionEndpointsBySubgraphMap, timeouts_by_subgraph: TimeoutsBySubgraph, + circuit_breakers_by_subgraph: CircuitBreakersBySubgraph, global_timeout: DurationOrProgram, config: Arc, client: Arc, @@ -112,6 +133,7 @@ impl SubgraphExecutorMap { max_connections_per_host, in_flight_requests: InFlightMap::default(), timeouts_by_subgraph: Default::default(), + circuit_breakers_by_subgraph: Default::default(), global_timeout, telemetry_context, callback_subscriptions: Arc::new(DashMap::new()), @@ -154,6 +176,7 @@ impl SubgraphExecutorMap { subgraph_executor_map.register_static_endpoint(subgraph_name, &endpoint_str); subgraph_executor_map.register_executor(subgraph_name, &endpoint_str, false)?; subgraph_executor_map.register_subgraph_timeout(subgraph_name)?; + subgraph_executor_map.register_circuit_breaker(subgraph_name)?; } Ok(subgraph_executor_map) @@ -213,9 +236,61 @@ impl SubgraphExecutorMap { let mut execution_result = match execution_result { Some(execution_result) => execution_result, None => { - executor - .execute(execution_request, timeout, plugin_req_state) - .await? + let exec_fut = executor.execute(execution_request, timeout, plugin_req_state); + // Clone the circuit breaker out of the DashMap before awaiting to avoid + // holding the shard read-lock across an await point (potential deadlock). + let circuit_breaker = self + .circuit_breakers_by_subgraph + .get(subgraph_name) + .map(|r| r.value().clone()); + match circuit_breaker { + Some(circuit_breaker) => { + let SubgraphCircuitBreaker { + recloser, + error_status_codes, + } = circuit_breaker; + // Treat configured status codes as errors so the + // circuit breaker can track them. Default: only 503. + let exec_fut = exec_fut.map(move |exec_res| match exec_res { + Ok(succ_res) => { + if succ_res + .status + .is_some_and(|status| error_status_codes.contains(&status)) + { + // Save the original response in case the circuit breaker treats it as an error and returns it through the error variant + Err(SubgraphExecutorError::InternalServerError(succ_res.into())) + } else { + Ok(succ_res) + } + } + Err(err) => Err(err), + }); + recloser + .call(exec_fut) + .map(|exec_res| match exec_res { + Err(recloser::Error::Inner(e)) => match e { + // If it's an error we wrapped above, unwrap it and return the original successful response instead of treating it as a failure for the caller + // This allows the circuit breaker to track 5xx responses without impacting the actual response returned to the client, + // which is important for use cases where clients want to handle 5xx responses differently but still want the circuit breaker to be aware of them. + SubgraphExecutorError::InternalServerError(succ_ress) => { + Ok(*succ_ress) + } + other_err => Err(other_err), + }, + Err(recloser::Error::Rejected) => { + // Record circuit breaker rejection in metrics + self.telemetry_context + .metrics + .circuit_breaker + .record_rejected_request(subgraph_name); + Err(SubgraphExecutorError::CircuitBreakerRejected) + } + Ok(res) => Ok(res), + }) + .await? + } + None => exec_fut.await?, + } } }; @@ -632,6 +707,88 @@ impl SubgraphExecutorMap { Ok(()) } + + /// Registers a circuit breaker for a specific subgraph. + /// If the subgraph already has a circuit breaker registered, it will do nothing. + fn register_circuit_breaker(&self, subgraph_name: &str) -> Result<(), SubgraphExecutorError> { + if self + .circuit_breakers_by_subgraph + .contains_key(subgraph_name) + { + return Ok(()); + } + + let global_circuit_breaker_cfg = self.config.traffic_shaping.all.circuit_breaker.as_ref(); + let subgraph_circuit_breaker_cfg = self + .config + .traffic_shaping + .subgraphs + .get(subgraph_name) + .and_then(|s| s.circuit_breaker.as_ref()); + + let circuit_breaker_enabled = subgraph_circuit_breaker_cfg + .and_then(|c| c.enabled) + .or_else(|| global_circuit_breaker_cfg.and_then(|c| c.enabled)) + .unwrap_or(false); + + if circuit_breaker_enabled { + let mut builder = CircuitBreakerBuilder::default(); + + if let Some(error_threshold) = subgraph_circuit_breaker_cfg + .and_then(|c| c.error_threshold) + .or_else(|| global_circuit_breaker_cfg.and_then(|c| c.error_threshold)) + { + let error_threshold = error_threshold.as_f64() as f32; + if !error_threshold.is_finite() { + return Err(SubgraphExecutorError::CircuitBreakerCreationError( + CircuitBreakerError::InvalidErrorThreshold(error_threshold), + subgraph_name.to_string(), + )); + } + builder = builder.error_threshold(error_threshold); + } + + if let Some(volume_threshold) = subgraph_circuit_breaker_cfg + .and_then(|c| c.volume_threshold) + .or_else(|| global_circuit_breaker_cfg.and_then(|c| c.volume_threshold)) + { + builder = builder.volume_threshold(volume_threshold); + } + + if let Some(reset_timeout) = subgraph_circuit_breaker_cfg + .and_then(|c| c.reset_timeout) + .or_else(|| global_circuit_breaker_cfg.and_then(|c| c.reset_timeout)) + { + builder = builder.reset_timeout(reset_timeout); + } + + let recloser = builder.build_async().map_err(|e| { + SubgraphExecutorError::CircuitBreakerCreationError(e, subgraph_name.to_string()) + })?; + + let error_status_codes = subgraph_circuit_breaker_cfg + .and_then(|c| c.error_status_codes.as_ref()) + .or_else(|| global_circuit_breaker_cfg.and_then(|c| c.error_status_codes.as_ref())) + .map(|codes| { + codes + .iter() + .copied() + .collect::>() + .into() + }) + .unwrap_or_else(|| DEFAULT_CIRCUIT_BREAKER_ERROR_STATUS_CODES.clone()); + + self.circuit_breakers_by_subgraph.insert( + subgraph_name.to_string(), + SubgraphCircuitBreaker { + recloser, + error_status_codes, + }, + ); + } + + Ok(()) + } } /// Resolves a timeout DurationOrProgram to a concrete Duration. diff --git a/lib/executor/src/executors/websocket.rs b/lib/executor/src/executors/websocket.rs index f9754839d..9e37cb009 100644 --- a/lib/executor/src/executors/websocket.rs +++ b/lib/executor/src/executors/websocket.rs @@ -46,7 +46,7 @@ impl SubgraphExecutor for WsSubgraphExecutor { execution_request: SubgraphExecutionRequest<'a>, _timeout: Option, _plugin_req_state: Option<&'a crate::plugin_context::PluginRequestState<'a>>, - ) -> Result, SubgraphExecutorError> { + ) -> Result, SubgraphExecutorError> { let endpoint = self.endpoint.clone(); let subgraph_name = self.subgraph_name.clone(); let tls_config = self.tls_config.clone(); diff --git a/lib/executor/src/response/subgraph_response.rs b/lib/executor/src/response/subgraph_response.rs index 5bbd81821..1b1c4d38c 100644 --- a/lib/executor/src/response/subgraph_response.rs +++ b/lib/executor/src/response/subgraph_response.rs @@ -6,7 +6,7 @@ use crate::{ response::{graphql_error::GraphQLError, value::Value}, }; use bytes::Bytes; -use http::HeaderMap; +use http::{HeaderMap, StatusCode}; use serde::de::{self, Deserializer, MapAccess, Visitor}; #[derive(Debug, Default)] @@ -16,6 +16,7 @@ pub struct SubgraphResponse<'a> { pub extensions: Option>, pub headers: Option>, pub bytes: Option, + pub status: Option, } impl<'de> de::Deserialize<'de> for SubgraphResponse<'de> { @@ -80,6 +81,7 @@ impl<'de> de::Deserialize<'de> for SubgraphResponse<'de> { data, errors, extensions, + status: None, headers: None, bytes: None, }) @@ -92,7 +94,7 @@ impl<'de> de::Deserialize<'de> for SubgraphResponse<'de> { } } -impl<'a> SubgraphResponse<'a> { +impl SubgraphResponse<'_> { pub fn deserialize_from_bytes( bytes: Bytes, ) -> Result, SubgraphExecutorError> { diff --git a/lib/hive-console-sdk/Cargo.toml b/lib/hive-console-sdk/Cargo.toml index 6235ae7a7..7e3bcf16f 100644 --- a/lib/hive-console-sdk/Cargo.toml +++ b/lib/hive-console-sdk/Cargo.toml @@ -28,7 +28,7 @@ sha2 = { version = "0.10.8", features = ["std"] } tokio-util = { workspace = true } regex-automata = { workspace = true } retry-policies = { workspace = true } -recloser = "1.3.1" +recloser = { workspace = true } futures-util = { workspace = true } typify = "0.6.0" regress = "0.11.0" diff --git a/lib/internal/src/telemetry/metrics/catalog.rs b/lib/internal/src/telemetry/metrics/catalog.rs index 98f9141fb..c7d080c91 100644 --- a/lib/internal/src/telemetry/metrics/catalog.rs +++ b/lib/internal/src/telemetry/metrics/catalog.rs @@ -108,6 +108,8 @@ pub mod names { pub const PLAN_CACHE_REQUESTS_TOTAL: &str = "hive.router.plan_cache.requests_total"; pub const PLAN_CACHE_DURATION: &str = "hive.router.plan_cache.duration"; pub const PLAN_CACHE_SIZE: &str = "hive.router.plan_cache.size"; + pub const CIRCUIT_BREAKER_REJECTED_REQUESTS: &str = + "hive.router.circuit_breaker.rejected_requests_total"; pub const PERSISTED_DOCUMENTS_STORAGE_FAILURES_TOTAL: &str = "hive.router.persisted_documents.storage.failures_total"; pub const PERSISTED_DOCUMENTS_EXTRACT_MISSING_ID_TOTAL: &str = @@ -242,6 +244,10 @@ pub(crate) const METRIC_SPECS: &[(&str, &[&str])] = &[ (names::PLAN_CACHE_REQUESTS_TOTAL, &[labels::RESULT]), (names::PLAN_CACHE_DURATION, &[labels::RESULT]), (names::PLAN_CACHE_SIZE, &[]), + ( + names::CIRCUIT_BREAKER_REJECTED_REQUESTS, + &[labels::SUBGRAPH_NAME], + ), (names::PERSISTED_DOCUMENTS_STORAGE_FAILURES_TOTAL, &[]), (names::PERSISTED_DOCUMENTS_EXTRACT_MISSING_ID_TOTAL, &[]), ( diff --git a/lib/internal/src/telemetry/metrics/circuit_breaker_metrics.rs b/lib/internal/src/telemetry/metrics/circuit_breaker_metrics.rs new file mode 100644 index 000000000..970e427f2 --- /dev/null +++ b/lib/internal/src/telemetry/metrics/circuit_breaker_metrics.rs @@ -0,0 +1,39 @@ +use opentelemetry::{metrics::Counter, metrics::Meter, KeyValue}; + +use crate::telemetry::metrics::catalog::{labels, names}; + +#[cfg(debug_assertions)] +use crate::telemetry::metrics::catalog::debug_assert_attrs; + +pub struct CircuitBreakerMetrics { + rejected_requests: Option>, +} + +impl CircuitBreakerMetrics { + pub fn new(meter: Option<&Meter>) -> Self { + let rejected_requests = meter.map(|meter| { + meter + .u64_counter(names::CIRCUIT_BREAKER_REJECTED_REQUESTS) + .with_unit("{request}") + .with_description("Number of requests rejected by circuit breaker") + .build() + }); + + Self { rejected_requests } + } + + /// Records a rejected request due to circuit breaker being open + pub fn record_rejected_request(&self, subgraph_name: &str) { + if let Some(counter) = &self.rejected_requests { + let attributes = [KeyValue::new( + labels::SUBGRAPH_NAME, + subgraph_name.to_string(), + )]; + + #[cfg(debug_assertions)] + debug_assert_attrs(names::CIRCUIT_BREAKER_REJECTED_REQUESTS, &attributes); + + counter.add(1, &attributes); + } + } +} diff --git a/lib/internal/src/telemetry/metrics/mod.rs b/lib/internal/src/telemetry/metrics/mod.rs index 97e6a322f..cf1df1ba3 100644 --- a/lib/internal/src/telemetry/metrics/mod.rs +++ b/lib/internal/src/telemetry/metrics/mod.rs @@ -1,6 +1,7 @@ pub mod cache_metrics; mod capture; pub mod catalog; +pub mod circuit_breaker_metrics; pub mod coprocessor_metrics; pub mod graphql_metrics; pub mod http_client_metrics; @@ -14,6 +15,7 @@ pub use setup::{build_meter_provider_from_config, MetricsSetup, PrometheusRuntim use opentelemetry::metrics::Meter; use crate::telemetry::metrics::cache_metrics::CacheMetrics; +use crate::telemetry::metrics::circuit_breaker_metrics::CircuitBreakerMetrics; use crate::telemetry::metrics::coprocessor_metrics::CoprocessorMetrics; use crate::telemetry::metrics::graphql_metrics::GraphQLMetrics; use crate::telemetry::metrics::http_client_metrics::HttpClientMetrics; @@ -27,6 +29,7 @@ pub struct Metrics { pub graphql: GraphQLMetrics, pub supergraph: SupergraphMetrics, pub cache: CacheMetrics, + pub circuit_breaker: CircuitBreakerMetrics, pub persisted_documents: PersistedDocumentsMetrics, pub coprocessor: CoprocessorMetrics, } @@ -39,6 +42,7 @@ impl Metrics { graphql: GraphQLMetrics::new(meter), supergraph: SupergraphMetrics::new(meter), cache: CacheMetrics::new(meter), + circuit_breaker: CircuitBreakerMetrics::new(meter), persisted_documents: PersistedDocumentsMetrics::new(meter), coprocessor: CoprocessorMetrics::new(meter), } diff --git a/lib/router-config/src/primitives/mod.rs b/lib/router-config/src/primitives/mod.rs index 0608edae6..21eb5583a 100644 --- a/lib/router-config/src/primitives/mod.rs +++ b/lib/router-config/src/primitives/mod.rs @@ -2,6 +2,7 @@ pub mod absolute_path; pub mod file_path; pub mod http_header; pub mod ip_network; +pub mod percentage; pub mod retry_policy; pub mod single_or_multiple; pub mod toggle; diff --git a/lib/router-config/src/primitives/percentage.rs b/lib/router-config/src/primitives/percentage.rs new file mode 100644 index 000000000..5a591f379 --- /dev/null +++ b/lib/router-config/src/primitives/percentage.rs @@ -0,0 +1,72 @@ +use std::{fmt::Display, str::FromStr}; + +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Copy)] +pub struct Percentage { + value: f64, +} + +impl Percentage { + pub fn from_f64(value: f64) -> Result { + if !(0.0..=1.0).contains(&value) { + return Err(format!( + "Percentage value must be between 0 and 1, got: {}", + value + )); + } + Ok(Percentage { value }) + } + pub fn as_f64(&self) -> f64 { + self.value + } +} + +impl FromStr for Percentage { + type Err = String; + + fn from_str(s: &str) -> Result { + let s_trimmed = s.trim(); + if let Some(number_part) = s_trimmed.strip_suffix('%') { + let value: f64 = number_part.parse().map_err(|err| { + format!( + "Failed to parse percentage value '{}': {}", + number_part, err + ) + })?; + Ok(Percentage::from_f64(value / 100.0)?) + } else { + Err(format!( + "Percentage value must end with '%', got: '{}'", + s_trimmed + )) + } + } +} + +impl Display for Percentage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}%", self.value * 100.0) + } +} + +// Deserializer from `n%` string to `Percentage` struct +impl<'de> Deserialize<'de> for Percentage { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + Percentage::from_str(&s).map_err(serde::de::Error::custom) + } +} + +// Serializer from `Percentage` struct to `n%` string +impl Serialize for Percentage { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.to_string()) + } +} diff --git a/lib/router-config/src/traffic_shaping.rs b/lib/router-config/src/traffic_shaping.rs index 280c40eeb..bafcd871f 100644 --- a/lib/router-config/src/traffic_shaping.rs +++ b/lib/router-config/src/traffic_shaping.rs @@ -1,10 +1,15 @@ -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; -use schemars::JsonSchema; +use http::StatusCode; +use schemars::{JsonSchema, Schema, SchemaGenerator}; use serde::{Deserialize, Serialize}; use crate::primitives::{ - file_path::FilePath, http_header::HttpHeaderName, single_or_multiple::SingleOrMultiple, + file_path::FilePath, http_header::HttpHeaderName, percentage::Percentage, + single_or_multiple::SingleOrMultiple, }; #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] @@ -91,6 +96,11 @@ pub struct TrafficShapingExecutorSubgraphConfig { /// ``` pub request_timeout: Option, + /// Circuit Breaker configuration for the subgraph. + /// When the circuit breaker is open, requests to the subgraph will be short-circuited and an error will be returned to the client. + /// The circuit breaker will be triggered based on the error rate of requests to the subgraph, and will attempt to reset after a certain timeout. + pub circuit_breaker: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub tls: Option, @@ -143,6 +153,12 @@ pub struct TrafficShapingExecutorGlobalConfig { #[serde(default = "default_request_timeout")] pub request_timeout: DurationOrExpression, + /// Circuit Breaker configuration for all subgraphs. + /// When the circuit breaker is open, requests to the subgraph will be + /// short-circuited and an error will be returned to the client. + /// The circuit breaker will be triggered based on the error rate of requests to the subgraph, and will attempt to reset after a certain timeout. + pub circuit_breaker: Option, + #[serde(skip_serializing_if = "Option::is_none")] pub tls: Option, @@ -184,6 +200,7 @@ impl Default for TrafficShapingExecutorGlobalConfig { pool_idle_timeout: default_pool_idle_timeout(), dedupe_enabled: default_dedupe_enabled(), request_timeout: default_request_timeout(), + circuit_breaker: default_circuit_breaker_config(), tls: None, allow_only_http2: false, } @@ -321,6 +338,10 @@ impl Default for TrafficShapingRouterConfig { } } +fn default_circuit_breaker_config() -> Option { + None +} + #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(deny_unknown_fields)] pub struct ServerTLSConfig { @@ -329,6 +350,98 @@ pub struct ServerTLSConfig { pub client_auth: Option, } +#[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] +#[serde(deny_unknown_fields)] +pub struct TrafficShapingSubgraphCircuitBreakerConfig { + /// Enable or disable the circuit breaker for the subgraph. + /// Default: false (circuit breaker is disabled) + /// + /// When unset on a subgraph-level configuration, the value falls back + /// to the value defined in the global (`all`) circuit breaker + /// configuration. + #[serde(default)] + pub enabled: Option, + /// Percentage after what the circuit breaker should kick in. + /// Default: 50% + #[serde(default)] + #[schemars(with = "String")] + pub error_threshold: Option, + /// Count of requests before starting evaluating. + /// Default: 5 + #[serde(default)] + pub volume_threshold: Option, + /// The duration after which the circuit breaker will attempt to retry sending requests to the subgraph. + /// Default: 30s + #[serde( + default, + deserialize_with = "humantime_serde::deserialize", + serialize_with = "humantime_serde::serialize" + )] + #[schemars(with = "String")] + pub reset_timeout: Option, + /// HTTP status codes returned by the subgraph that should be counted as + /// failures by the circuit breaker. + /// + /// Only responses whose status code is contained in this list will be + /// recorded as failures. Responses with any other status code (including + /// other 5xx codes) are treated as successes from the circuit breaker's + /// point of view. + /// + /// Default: `[503]` + #[serde( + default, + deserialize_with = "deserialize_status_codes", + serialize_with = "serialize_status_codes", + skip_serializing_if = "Option::is_none" + )] + #[schemars(schema_with = "schema_status_codes")] + pub error_status_codes: Option>, +} + +pub fn serialize_status_codes( + status_codes: &Option>, + serializer: S, +) -> Result +where + S: serde::Serializer, +{ + match status_codes { + Some(codes) => { + let codes_as_u16: Vec = codes.iter().map(|code| code.as_u16()).collect(); + codes_as_u16.serialize(serializer) + } + None => serializer.serialize_none(), + } +} + +pub fn deserialize_status_codes<'de, D>( + deserializer: D, +) -> Result>, D::Error> +where + D: serde::Deserializer<'de>, +{ + let opt_vec = Option::>::deserialize(deserializer)?; + Ok(opt_vec.map(|vec| { + vec.into_iter() + .filter_map(|code| StatusCode::from_u16(code).ok()) + .collect() + })) +} + +pub fn schema_status_codes(_generator: &mut SchemaGenerator) -> Schema { + // Schema for `Option>` represented as an array of valid HTTP + // status codes (integers in the range 100-599) or null. + schemars::json_schema!({ + "type": ["array", "null"], + "items": { + "type": "integer", + "minimum": 100, + "maximum": 599 + }, + "uniqueItems": true + }) +} + #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(deny_unknown_fields)] pub struct ServerClientAuthConfig { diff --git a/lib/router-config/src/usage_reporting.rs b/lib/router-config/src/usage_reporting.rs index 474b77bbb..4e353fd32 100644 --- a/lib/router-config/src/usage_reporting.rs +++ b/lib/router-config/src/usage_reporting.rs @@ -1,8 +1,10 @@ -use std::{fmt::Display, str::FromStr, time::Duration}; +use std::time::Duration; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use crate::primitives::percentage::Percentage; + #[derive(Debug, Deserialize, Serialize, JsonSchema, Clone)] #[serde(untagged)] pub enum UsageReportingExclude { @@ -189,72 +191,3 @@ fn default_connect_timeout() -> Duration { fn default_flush_interval() -> Duration { Duration::from_secs(5) } - -#[derive(Debug, Clone, Copy)] -pub struct Percentage { - value: f64, -} - -impl Percentage { - pub fn from_f64(value: f64) -> Result { - if !(0.0..=1.0).contains(&value) { - return Err(format!( - "Percentage value must be between 0 and 1, got: {}", - value - )); - } - Ok(Percentage { value }) - } - pub fn as_f64(&self) -> f64 { - self.value - } -} - -impl FromStr for Percentage { - type Err = String; - - fn from_str(s: &str) -> Result { - let s_trimmed = s.trim(); - if let Some(number_part) = s_trimmed.strip_suffix('%') { - let value: f64 = number_part.parse().map_err(|err| { - format!( - "Failed to parse percentage value '{}': {}", - number_part, err - ) - })?; - Ok(Percentage::from_f64(value / 100.0)?) - } else { - Err(format!( - "Percentage value must end with '%', got: '{}'", - s_trimmed - )) - } - } -} - -impl Display for Percentage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}%", self.value * 100.0) - } -} - -// Deserializer from `n%` string to `Percentage` struct -impl<'de> Deserialize<'de> for Percentage { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let s = String::deserialize(deserializer)?; - Percentage::from_str(&s).map_err(serde::de::Error::custom) - } -} - -// Serializer from `Percentage` struct to `n%` string -impl Serialize for Percentage { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_str(&self.to_string()) - } -}