From 03477e87df19391ae8454fc865462d48aca92e79 Mon Sep 17 00:00:00 2001 From: PSeitz Date: Tue, 6 Feb 2024 22:05:23 +0100 Subject: [PATCH] support compressed data (#4506) * support compressed data closes #3990 * return 415 for unsupported encoding * manually set headers in test * return 400 on corrupted data --------- Co-authored-by: Paul Masurel --- quickwit/Cargo.lock | 2 + quickwit/quickwit-serve/Cargo.toml | 2 + quickwit/quickwit-serve/src/decompression.rs | 87 +++++++++++++++++++ .../src/elasticsearch_api/filter.rs | 3 +- .../src/ingest_api/rest_handler.rs | 5 +- quickwit/quickwit-serve/src/lib.rs | 1 + quickwit/quickwit-serve/src/rest.rs | 11 +++ quickwit/rest-api-tests/run_tests.py | 9 +- .../_setup.elasticsearch.yaml | 2 +- .../es_compatibility/_setup.quickwit.yaml | 2 +- 10 files changed, 112 insertions(+), 12 deletions(-) create mode 100644 quickwit/quickwit-serve/src/decompression.rs diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index d289332f6be..b8c48cd0db1 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6285,6 +6285,7 @@ dependencies = [ "bytes", "bytesize", "elasticsearch-dsl", + "flate2", "futures", "futures-util", "hex", @@ -6339,6 +6340,7 @@ dependencies = [ "tracing-opentelemetry", "utoipa", "warp", + "zstd 0.13.0", ] [[package]] diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 744fc648340..b59e73c3172 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -16,6 +16,7 @@ base64 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } elasticsearch-dsl = "0.4.15" +flate2 = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } hex = { workspace = true } @@ -46,6 +47,7 @@ tracing = { workspace = true } tracing-opentelemetry = { workspace = true } utoipa = { workspace = true } warp = { workspace = true } +zstd = { workspace = true } quickwit-actors = { workspace = true } quickwit-cluster = { workspace = true } diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs new file mode 100644 index 00000000000..c0be92cb214 --- /dev/null +++ b/quickwit/quickwit-serve/src/decompression.rs @@ -0,0 +1,87 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::io::Read; + +use bytes::Bytes; +use flate2::read::GzDecoder; +use thiserror::Error; +use tokio::task; +use warp::reject::Reject; +use warp::Filter; + +/// There are two ways to decompress the body: +/// - Stream the body through an async decompressor +/// - Fetch the body and then decompress the bytes +/// +/// The first approach lowers the latency, while the second approach is more CPU efficient. +/// Ingesting data is usually CPU bound and there is considerable latency until the data is +/// searchable, so the second approach is more suitable for this use case. +async fn decompress_body(encoding: Option, body: Bytes) -> Result { + match encoding.as_deref() { + Some("gzip" | "x-gzip") => { + let decompressed = task::spawn_blocking(move || { + let mut decompressed = Vec::new(); + let mut decoder = GzDecoder::new(body.as_ref()); + decoder + .read_to_end(&mut decompressed) + .map_err(|_| warp::reject::custom(CorruptedData))?; + Result::<_, warp::Rejection>::Ok(Bytes::from(decompressed)) + }) + .await + .map_err(|_| warp::reject::custom(CorruptedData))??; + Ok(decompressed) + } + Some("zstd") => { + let decompressed = task::spawn_blocking(move || { + zstd::decode_all(body.as_ref()) + .map(Bytes::from) + .map_err(|_| warp::reject::custom(CorruptedData)) + }) + .await + .map_err(|_| warp::reject::custom(CorruptedData))??; + Ok(decompressed) + } + Some(encoding) => Err(warp::reject::custom(UnsupportedEncoding( + encoding.to_string(), + ))), + _ => Ok(body), + } +} + +#[derive(Debug, Error)] +#[error("Error while decompressing the data")] +pub(crate) struct CorruptedData; + +impl Reject for CorruptedData {} + +#[derive(Debug, Error)] +#[error("Unsupported Content-Encoding {}. Supported encodings are 'gzip' and 'zstd'", self.0)] +pub(crate) struct UnsupportedEncoding(String); + +impl Reject for UnsupportedEncoding {} + +/// Custom filter for optional decompression +pub(crate) fn get_body_bytes() -> impl Filter + Clone { + warp::header::optional("content-encoding") + .and(warp::body::bytes()) + .and_then(|encoding: Option, body: Bytes| async move { + decompress_body(encoding, body).await + }) +} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs index 4ea8e12e757..3825f97939f 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs @@ -27,6 +27,7 @@ use super::model::{ CatIndexQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams, SearchQueryParamsCount, }; +use crate::decompression::get_body_bytes; use crate::elasticsearch_api::model::{ ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams, }; @@ -76,7 +77,7 @@ pub(crate) fn elastic_bulk_filter( .and(warp::body::content_length_limit( CONTENT_LENGTH_LIMIT.as_u64(), )) - .and(warp::body::bytes()) + .and(get_body_bytes()) .and(serde_qs::warp::query(serde_qs::Config::default())) } diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index fa958abae4b..828b3556ba3 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -32,6 +32,7 @@ use serde::Deserialize; use thiserror::Error; use warp::{Filter, Rejection}; +use crate::decompression::get_body_bytes; use crate::format::extract_format_from_qs; use crate::rest_api_response::into_rest_api_response; use crate::{with_arg, BodyFormat}; @@ -80,7 +81,7 @@ fn ingest_filter( .and(warp::body::content_length_limit( config.content_length_limit.as_u64(), )) - .and(warp::body::bytes()) + .and(get_body_bytes()) .and(serde_qs::warp::query::( serde_qs::Config::default(), )) @@ -104,7 +105,7 @@ fn ingest_v2_filter( .and(warp::body::content_length_limit( config.content_length_limit.as_u64(), )) - .and(warp::body::bytes()) + .and(get_body_bytes()) .and(serde_qs::warp::query::( serde_qs::Config::default(), )) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index a44495f9731..16d68d0f3bc 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -20,6 +20,7 @@ mod build_info; mod cluster_api; mod debugging_api; +mod decompression; mod delete_task_api; mod elasticsearch_api; mod format; diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index a4265167ba8..d80c87377eb 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -34,6 +34,7 @@ use warp::{redirect, Filter, Rejection, Reply}; use crate::cluster_api::cluster_handler; use crate::debugging_api::debugging_handler; +use crate::decompression::{CorruptedData, UnsupportedEncoding}; use crate::delete_task_api::delete_task_api_handlers; use crate::elasticsearch_api::elastic_api_handlers; use crate::health_check_api::health_check_handlers; @@ -273,6 +274,16 @@ fn get_status_with_error(rejection: Rejection) -> RestApiError { service_code: ServiceErrorCode::UnsupportedMediaType, message: error.to_string(), } + } else if let Some(error) = rejection.find::() { + RestApiError { + service_code: ServiceErrorCode::UnsupportedMediaType, + message: error.to_string(), + } + } else if let Some(error) = rejection.find::() { + RestApiError { + service_code: ServiceErrorCode::BadRequest, + message: error.to_string(), + } } else if let Some(error) = rejection.find::() { RestApiError { service_code: ServiceErrorCode::BadRequest, diff --git a/quickwit/rest-api-tests/run_tests.py b/quickwit/rest-api-tests/run_tests.py index 5b3fa39fab5..5e65e6668ff 100755 --- a/quickwit/rest-api-tests/run_tests.py +++ b/quickwit/rest-api-tests/run_tests.py @@ -52,12 +52,6 @@ def run_step(step, previous_result): time.sleep(step["sleep_after"]) return result -def load_data(path): - if path.endswith("gz"): - return gzip.open(path, 'rb').read() - else: - return open(path, 'rb').read() - def run_request_with_retry(run_req, expected_status_code=None, num_retries=10, wait_time=0.5): for try_number in range(num_retries + 1): r = run_req() @@ -100,7 +94,8 @@ def run_request_step(method, step, previous_result): body_from_file = step.get("body_from_file", None) if body_from_file is not None: body_from_file = osp.join(step["cwd"], body_from_file) - kvargs["data"] = load_data(body_from_file) + kvargs["data"] = open(body_from_file, 'rb').read() + kvargs = resolve_previous_result(kvargs, previous_result) ndjson = step.get("ndjson", None) if ndjson is not None: diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.elasticsearch.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.elasticsearch.yaml index 1e29d7ffead..12390f6bade 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.elasticsearch.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.elasticsearch.yaml @@ -102,6 +102,6 @@ method: POST endpoint: _bulk params: refresh: "true" -headers: {"Content-Type": "application/json"} +headers: {"Content-Type": "application/json", "content-encoding": "gzip"} body_from_file: gharchive-bulk.json.gz diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml index bdc10d1249b..2e227090a9c 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml @@ -56,6 +56,6 @@ endpoint: _bulk num_retries: 10 params: refresh: "true" -headers: {"Content-Type": "application/json"} +headers: {"Content-Type": "application/json", "content-encoding": "gzip"} body_from_file: gharchive-bulk.json.gz sleep_after: 3