diff --git a/Cargo.lock b/Cargo.lock index 29a1e633bc..392cee2055 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -297,6 +297,7 @@ dependencies = [ "libc", "libtest-mimic", "linkme", + "little-loadshedder", "lru", "maplit", "mediatype", @@ -3929,6 +3930,16 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "little-loadshedder" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edea14a875dba8659cb864480c27e8eb2d099542b657729a7567625a2c1d65a" +dependencies = [ + "tokio", + "tower", +] + [[package]] name = "lock_api" version = "0.4.12" diff --git a/apollo-router/Cargo.toml b/apollo-router/Cargo.toml index d2a714c9c8..00cf3918b6 100644 --- a/apollo-router/Cargo.toml +++ b/apollo-router/Cargo.toml @@ -116,6 +116,8 @@ jsonwebtoken = "9.3.0" lazy_static = "1.4.0" libc = "0.2.155" linkme = "0.3.27" +# little-loadshedder = { path = "../../little-loadshedder" } +little-loadshedder = { version = "0.2.0" } lru = "0.12.3" maplit = "1.0.2" mediatype = "0.19.18" diff --git a/apollo-router/src/axum_factory/axum_http_server_factory.rs b/apollo-router/src/axum_factory/axum_http_server_factory.rs index 08df933dc6..caafd407ac 100644 --- a/apollo-router/src/axum_factory/axum_http_server_factory.rs +++ b/apollo-router/src/axum_factory/axum_http_server_factory.rs @@ -1,5 +1,4 @@ //! Axum http server factory. Axum provides routing capability on top of Hyper HTTP. -use std::fmt::Display; use std::pin::Pin; use std::sync::atomic::AtomicBool; use std::sync::atomic::AtomicU64; @@ -33,6 +32,7 @@ use serde_json::json; use tokio::net::UnixListener; use tokio::sync::mpsc; use tokio_rustls::TlsAcceptor; +use tower::load_shed::error::Overloaded; use tower::service_fn; use tower::BoxError; use tower::ServiceBuilder; @@ -429,7 +429,7 @@ pub(crate) fn span_mode(configuration: &Configuration) -> SpanMode { .unwrap_or_default() } -async fn decompression_error(_error: BoxError) -> axum::response::Response { +async fn decompression_error(_error: BoxError) -> impl IntoResponse { (StatusCode::BAD_REQUEST, "cannot decompress request body").into_response() } @@ -657,10 +657,10 @@ async fn handle_graphql( let processing_seconds = dur.as_secs_f64(); f64_histogram!( - "apollo.router.processing.time", - "Time spent by the router actually working on the request, not waiting for its network calls or other queries being processed", - processing_seconds - ); + "apollo.router.processing.time", + "Time spent by the router actually working on the request, not waiting for its network calls or other queries being processed", + processing_seconds + ); match res { Err(err) => internal_server_error(err), @@ -689,23 +689,32 @@ async fn handle_graphql( fn internal_server_error(err: T) -> Response where - T: Display, + T: Into, { + let err: BoxError = err.into(); + + let code = if err.is::() { + StatusCode::SERVICE_UNAVAILABLE + } else { + StatusCode::INTERNAL_SERVER_ERROR + }; + tracing::error!( - code = "INTERNAL_SERVER_ERROR", + code = code.to_string(), %err, ); // This intentionally doesn't include an error message as this could represent leakage of internal information. // The error message is logged above. let error = graphql::Error::builder() - .message("internal server error") + .message(code.to_string()) + // Note: Decide exactly what this extension_code should be for SERVICE_UNAVAILABLE .extension_code("INTERNAL_SERVER_ERROR") .build(); let response = graphql::Response::builder().error(error).build(); - (StatusCode::INTERNAL_SERVER_ERROR, Json(json!(response))).into_response() + (code, Json(json!(response))).into_response() } struct CancelHandler<'a> { diff --git a/apollo-router/src/plugin/test/service.rs b/apollo-router/src/plugin/test/service.rs index fa1388ff1b..4f5c2dafc4 100644 --- a/apollo-router/src/plugin/test/service.rs +++ b/apollo-router/src/plugin/test/service.rs @@ -49,7 +49,7 @@ macro_rules! mock_service { std::task::Poll::Ready(Ok(())) } fn call(&mut self, req: $request_type) -> Self::Future { - let r = self.call(req); + let r = self.call(req); Box::pin(async move { r }) } } diff --git a/apollo-router/src/plugins/limits/layer.rs b/apollo-router/src/plugins/limits/layer.rs index 680a95c41b..337727c6ba 100644 --- a/apollo-router/src/plugins/limits/layer.rs +++ b/apollo-router/src/plugins/limits/layer.rs @@ -23,20 +23,29 @@ struct BodyLimitControlInner { current: AtomicUsize, } +impl Clone for BodyLimitControlInner { + fn clone(&self) -> Self { + Self { + limit: AtomicUsize::new(self.limit.load(std::sync::atomic::Ordering::SeqCst)), + current: AtomicUsize::new(0), + } + } +} + /// This structure allows the body limit to be updated dynamically. /// It also allows the error message to be updated #[derive(Clone)] pub(crate) struct BodyLimitControl { - inner: Arc, + inner: BodyLimitControlInner, } impl BodyLimitControl { pub(crate) fn new(limit: usize) -> Self { Self { - inner: Arc::new(BodyLimitControlInner { + inner: BodyLimitControlInner { limit: AtomicUsize::new(limit), current: AtomicUsize::new(0), - }), + }, } } diff --git a/apollo-router/src/plugins/test.rs b/apollo-router/src/plugins/test.rs index ec1d9c509e..501388dcce 100644 --- a/apollo-router/src/plugins/test.rs +++ b/apollo-router/src/plugins/test.rs @@ -8,6 +8,7 @@ use apollo_compiler::validation::Valid; use serde_json::Value; use tower::BoxError; use tower::ServiceBuilder; +use tower::ServiceExt; use tower_service::Service; use crate::introspection::IntrospectionCache; @@ -149,7 +150,7 @@ impl PluginTestHarness { .service_fn(move |req: router::Request| async move { (response_fn)(req).await }), ); - self.plugin.router_service(service).call(request).await + self.plugin.router_service(service).oneshot(request).await } pub(crate) async fn call_supergraph( @@ -162,7 +163,10 @@ impl PluginTestHarness { .service_fn(move |req: supergraph::Request| async move { Ok((response_fn)(req)) }), ); - self.plugin.supergraph_service(service).call(request).await + self.plugin + .supergraph_service(service) + .oneshot(request) + .await } #[allow(dead_code)] @@ -176,7 +180,10 @@ impl PluginTestHarness { .service_fn(move |req: execution::Request| async move { Ok((response_fn)(req)) }), ); - self.plugin.execution_service(service).call(request).await + self.plugin + .execution_service(service) + .oneshot(request) + .await } #[allow(dead_code)] diff --git a/apollo-router/src/plugins/traffic_shaping/deduplication.rs b/apollo-router/src/plugins/traffic_shaping/deduplication.rs index 639d0d12b9..1587170539 100644 --- a/apollo-router/src/plugins/traffic_shaping/deduplication.rs +++ b/apollo-router/src/plugins/traffic_shaping/deduplication.rs @@ -182,19 +182,20 @@ where type Error = BoxError; type Future = BoxFuture<'static, Result>; - fn poll_ready(&mut self, _cx: &mut std::task::Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.service.poll_ready(cx) } fn call(&mut self, request: SubgraphRequest) -> Self::Future { let service = self.service.clone(); + let inner = std::mem::replace(&mut self.service, service); if request.operation_kind == OperationKind::Query { let wait_map = self.wait_map.clone(); - Box::pin(async move { Self::dedup(service, wait_map, request).await }) + Box::pin(async move { Self::dedup(inner, wait_map, request).await }) } else { - Box::pin(async move { service.oneshot(request).await }) + Box::pin(async move { inner.oneshot(request).await }) } } } diff --git a/apollo-router/src/plugins/traffic_shaping/mod.rs b/apollo-router/src/plugins/traffic_shaping/mod.rs index e77322fffa..840572d03b 100644 --- a/apollo-router/src/plugins/traffic_shaping/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/mod.rs @@ -329,6 +329,7 @@ impl TrafficShaping { .boxed() }, ) + .load_shed() .layer(TimeoutLayer::new( self.config .router @@ -394,42 +395,42 @@ impl TrafficShaping { }); Either::A(ServiceBuilder::new() - .option_layer(config.shaping.deduplicate_query.unwrap_or_default().then( QueryDeduplicationLayer::default )) - .map_future_with_request_data( - |req: &subgraph::Request| req.context.clone(), - move |ctx, future| { - async { - let response: Result = future.await; - match response { - Err(error) if error.is::() => { - subgraph::Response::error_builder() - .status_code(StatusCode::GATEWAY_TIMEOUT) - .error::(Elapsed::new().into()) - .context(ctx) - .build() - } - Err(error) if error.is::() => { - subgraph::Response::error_builder() - .status_code(StatusCode::TOO_MANY_REQUESTS) - .error::(RateLimited::new().into()) - .context(ctx) - .build() - } - _ => response, + .map_future_with_request_data( + |req: &subgraph::Request| req.context.clone(), + move |ctx, future| { + async { + let response: Result = future.await; + match response { + Err(error) if error.is::() => { + subgraph::Response::error_builder() + .status_code(StatusCode::GATEWAY_TIMEOUT) + .error::(Elapsed::new().into()) + .context(ctx) + .build() } - }.boxed() - }, - ) - .layer(TimeoutLayer::new( - config.shaping - .timeout - .unwrap_or(DEFAULT_TIMEOUT), - )) - .option_layer(retry) - .option_layer(rate_limit) + Err(error) if error.is::() => { + subgraph::Response::error_builder() + .status_code(StatusCode::TOO_MANY_REQUESTS) + .error::(RateLimited::new().into()) + .context(ctx) + .build() + } + _ => response, + } + }.boxed() + }, + ) + .load_shed() + .layer(TimeoutLayer::new( + config.shaping + .timeout + .unwrap_or(DEFAULT_TIMEOUT), + )) + .option_layer(retry) + .option_layer(rate_limit) .service(service) .map_request(move |mut req: SubgraphRequest| { if let Some(compression) = config.shaping.compression { @@ -440,6 +441,7 @@ impl TrafficShaping { req })) } else { + // Note: Does this need more thinking about? Either::B(service) } } @@ -806,23 +808,17 @@ mod test { .body() .errors .is_empty()); - assert_eq!( - plugin - .as_any() - .downcast_ref::() - .unwrap() - .subgraph_service_internal("test", test_service.clone()) - .oneshot(SubgraphRequest::fake_builder().build()) - .await - .unwrap() - .response - .body() - .errors[0] - .extensions - .get("code") - .unwrap(), - "REQUEST_RATE_LIMITED" - ); + let err = plugin + .as_any() + .downcast_ref::() + .unwrap() + .subgraph_service_internal("test", test_service.clone()) + .oneshot(SubgraphRequest::fake_builder().build()) + .await + .unwrap_err(); + + assert!(err.is::()); + assert!(plugin .as_any() .downcast_ref::() @@ -868,17 +864,16 @@ mod test { mock_service.expect_clone().returning(|| { let mut mock_service = MockSupergraphService::new(); - mock_service.expect_clone().returning(|| { - let mut mock_service = MockSupergraphService::new(); - mock_service.expect_call().times(0..2).returning(move |_| { - Ok(SupergraphResponse::fake_builder() - .data(json!({ "test": 1234_u32 })) - .build() - .unwrap()) - }); - mock_service + mock_service.expect_call().times(0..2).returning(move |_| { + Ok(SupergraphResponse::fake_builder() + .data(json!({ "test": 1234_u32 })) + .build() + .unwrap()) }); mock_service + .expect_clone() + .returning(MockSupergraphService::new); + mock_service }); assert!(plugin @@ -895,24 +890,15 @@ mod test { .errors .is_empty()); - assert_eq!( - plugin - .as_any() - .downcast_ref::() - .unwrap() - .supergraph_service_internal(mock_service.clone()) - .oneshot(SupergraphRequest::fake_builder().build().unwrap()) - .await - .unwrap() - .next_response() - .await - .unwrap() - .errors[0] - .extensions - .get("code") - .unwrap(), - "REQUEST_RATE_LIMITED" - ); + let err = plugin + .as_any() + .downcast_ref::() + .unwrap() + .supergraph_service_internal(mock_service.clone()) + .oneshot(SupergraphRequest::fake_builder().build().unwrap()) + .await + .unwrap_err(); + assert!(err.is::()); tokio::time::sleep(Duration::from_millis(300)).await; assert!(plugin .as_any() diff --git a/apollo-router/src/plugins/traffic_shaping/rate/service.rs b/apollo-router/src/plugins/traffic_shaping/rate/service.rs index f826723b0a..74dd5916af 100644 --- a/apollo-router/src/plugins/traffic_shaping/rate/service.rs +++ b/apollo-router/src/plugins/traffic_shaping/rate/service.rs @@ -7,7 +7,6 @@ use std::task::Poll; use std::time::SystemTime; use std::time::UNIX_EPOCH; -use futures::ready; use tower::Service; use super::future::ResponseFuture; @@ -74,7 +73,7 @@ where self.current_nb_requests.fetch_add(1, Ordering::SeqCst); - Poll::Ready(ready!(self.inner.poll_ready(cx)).map_err(Into::into)) + self.inner.poll_ready(cx).map_err(Into::into) } fn call(&mut self, request: Request) -> Self::Future { diff --git a/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs b/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs index 6b7cb9abce..8b68fb7fde 100644 --- a/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs +++ b/apollo-router/src/plugins/traffic_shaping/timeout/mod.rs @@ -13,9 +13,8 @@ use std::task::Context; use std::task::Poll; use std::time::Duration; -use tower::util::Oneshot; +use futures::future::BoxFuture; use tower::Service; -use tower::ServiceExt; use self::future::ResponseFuture; pub(crate) use self::layer::TimeoutLayer; @@ -40,21 +39,26 @@ impl Timeout { impl Service for Timeout where S: Service + Clone, - S::Error: Into, + S::Error: Into + Send + Sync, + S::Future: Send + 'static, { type Response = S::Response; type Error = tower::BoxError; - type Future = ResponseFuture>; + type Future = BoxFuture<'static, Result>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { + self.inner.poll_ready(cx).map_err(Into::into) } fn call(&mut self, request: Request) -> Self::Future { - let service = self.inner.clone(); + let inner = self.inner.clone(); + let mut service = std::mem::replace(&mut self.inner, inner); - let response = service.oneshot(request); + let response = service.call(request); - ResponseFuture::new(response, Box::pin(tokio::time::sleep(self.timeout))) + Box::pin(ResponseFuture::new( + response, + Box::pin(tokio::time::sleep(self.timeout)), + )) } } diff --git a/apollo-router/src/query_planner/bridge_query_planner_pool.rs b/apollo-router/src/query_planner/bridge_query_planner_pool.rs index 722a965efd..2b8c5e59f3 100644 --- a/apollo-router/src/query_planner/bridge_query_planner_pool.rs +++ b/apollo-router/src/query_planner/bridge_query_planner_pool.rs @@ -31,8 +31,6 @@ use crate::services::QueryPlannerResponse; use crate::spec::Schema; use crate::Configuration; -static CHANNEL_SIZE: usize = 1_000; - #[derive(Clone)] pub(crate) struct BridgeQueryPlannerPool { js_planners: Vec>>, @@ -61,10 +59,12 @@ impl BridgeQueryPlannerPool { let mut join_set = JoinSet::new(); + // Note: To avoid excess queueing here, we limit our channels to exactly match the size of + // the pool. let (sender, receiver) = bounded::<( QueryPlannerRequest, oneshot::Sender>, - )>(CHANNEL_SIZE); + )>(size.into()); let mut old_js_planners_iterator = old_js_planners.into_iter(); diff --git a/apollo-router/src/services/hickory_dns_connector.rs b/apollo-router/src/services/hickory_dns_connector.rs index 70e26ab497..da5bc5a0c4 100644 --- a/apollo-router/src/services/hickory_dns_connector.rs +++ b/apollo-router/src/services/hickory_dns_connector.rs @@ -40,6 +40,7 @@ impl Service for AsyncHyperResolver { fn call(&mut self, name: Name) -> Self::Future { let resolver = self.0.clone(); + let resolver = std::mem::replace(&mut self.0, resolver); Box::pin(async move { Ok(resolver diff --git a/apollo-router/src/services/layers/persisted_queries/mod.rs b/apollo-router/src/services/layers/persisted_queries/mod.rs index 0444590958..6cf3460074 100644 --- a/apollo-router/src/services/layers/persisted_queries/mod.rs +++ b/apollo-router/src/services/layers/persisted_queries/mod.rs @@ -457,9 +457,8 @@ mod tests { assert!(incoming_request.supergraph_request.body().query.is_none()); let result = pq_layer.supergraph_request(incoming_request); - let request = result - .ok() - .expect("pq layer returned response instead of putting the query on the request"); + let request = + result.expect("pq layer returned response instead of putting the query on the request"); assert_eq!(request.supergraph_request.body().query, Some(body)); } @@ -490,9 +489,8 @@ mod tests { assert!(incoming_request.supergraph_request.body().query.is_none()); let result = pq_layer.supergraph_request(incoming_request); - let request = result - .ok() - .expect("pq layer returned response instead of continuing to APQ layer"); + let request = + result.expect("pq layer returned response instead of continuing to APQ layer"); assert!(request.supergraph_request.body().query.is_none()); } @@ -622,12 +620,10 @@ mod tests { // the operation. let updated_request = pq_layer .supergraph_request(incoming_request) - .ok() .expect("pq layer returned error response instead of returning a request"); query_analysis_layer .supergraph_request(updated_request) .await - .ok() .expect("QA layer returned error response instead of returning a request") } @@ -667,7 +663,6 @@ mod tests { pq_layer .supergraph_request_with_analyzed_query(request_with_analyzed_query) .await - .ok() .expect("pq layer second hook returned error response instead of returning a request"); } diff --git a/apollo-router/src/services/router/service.rs b/apollo-router/src/services/router/service.rs index 9e379ae3c4..6e26fcf38a 100644 --- a/apollo-router/src/services/router/service.rs +++ b/apollo-router/src/services/router/service.rs @@ -826,11 +826,7 @@ pub(crate) fn process_vary_header(headers: &mut HeaderMap) { #[derive(Clone)] pub(crate) struct RouterCreator { pub(crate) supergraph_creator: Arc, - static_page: StaticPageLayer, - apq_layer: APQLayer, - pub(crate) persisted_query_layer: Arc, - query_analysis_layer: QueryAnalysisLayer, - batching: Batching, + sb: Arc>, } impl ServiceFactory for RouterCreator { @@ -882,13 +878,53 @@ impl RouterCreator { // For now just call activate to make the gauges work on the happy path. apq_layer.activate(); - Ok(Self { - supergraph_creator, - static_page, + let router_service = content_negotiation::RouterLayer::default().layer(RouterService::new( + supergraph_creator.clone(), apq_layer, - query_analysis_layer, persisted_query_layer, - batching: configuration.batching.clone(), + query_analysis_layer, + configuration.batching.clone(), + )); + + // NOTE: This is very important code. This is where the client request load management + // ability of the router pipeline is provided. + let sb = ServiceBuilder::new() + .map_result(|result_arg| match result_arg { + Ok(arg) => match arg { + // Note: It might be interesting to look at the result and see if the bridge + // pool is full. + little_loadshedder::LoadShedResponse::Inner(inner) => Ok(inner), + little_loadshedder::LoadShedResponse::Overload => Err(BoxError::from( + tower::load_shed::error::Overloaded::default(), + )), + }, + Err(err) => Err(err), + }) + .layer(little_loadshedder::LoadShedLayer::new( + 0.90, + std::time::Duration::from_millis(2_500), + )) + // Note: Alternative solutions here. Either we use the little loadshedder for adaptive + // load shedding or we use timeout, concurrency limits and rate limits to achieve the + // same thing. + // .load_shed() + // .concurrency_limit(10_000) + // .timeout(std::time::Duration::from_secs(2)) + .buffer(50_000) + .layer(static_page.clone()) + // .rate_limit(50_000, std::time::Duration::from_secs(1)) + .service( + supergraph_creator + .plugins() + .iter() + .rev() + .fold(router_service.boxed(), |acc, (_, e)| e.router_service(acc)), + ) + .boxed_clone(); + + Ok(Self { + supergraph_creator, + sb: Arc::new(parking_lot::Mutex::new(sb)), }) } @@ -900,23 +936,7 @@ impl RouterCreator { Error = BoxError, Future = BoxFuture<'static, router::ServiceResult>, > + Send { - let router_service = content_negotiation::RouterLayer::default().layer(RouterService::new( - self.supergraph_creator.clone(), - self.apq_layer.clone(), - self.persisted_query_layer.clone(), - self.query_analysis_layer.clone(), - self.batching.clone(), - )); - - ServiceBuilder::new() - .layer(self.static_page.clone()) - .service( - self.supergraph_creator - .plugins() - .iter() - .rev() - .fold(router_service.boxed(), |acc, (_, e)| e.router_service(acc)), - ) + self.sb.lock().clone() } } diff --git a/apollo-router/src/services/supergraph.rs b/apollo-router/src/services/supergraph.rs index 3807cf859d..1cc3ed6e97 100644 --- a/apollo-router/src/services/supergraph.rs +++ b/apollo-router/src/services/supergraph.rs @@ -179,6 +179,14 @@ pub struct Response { pub context: Context, } +impl std::fmt::Debug for Response { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Response") + .field("context", &self.context) + .finish() + } +} + #[buildstructor::buildstructor] impl Response { /// This is the constructor (or builder) to use when constructing a real Response..