Skip to content

Commit

Permalink
support compressed data (#4506)
Browse files Browse the repository at this point in the history
* support compressed data

closes #3990

* return 415 for unsupported encoding

* manually set headers in test

* return 400 on corrupted data

---------

Co-authored-by: Paul Masurel <[email protected]>
  • Loading branch information
PSeitz and fulmicoton authored Feb 6, 2024
1 parent aad9b34 commit 03477e8
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 12 deletions.
2 changes: 2 additions & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ base64 = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
elasticsearch-dsl = "0.4.15"
flate2 = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
Expand Down Expand Up @@ -46,6 +47,7 @@ tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
utoipa = { workspace = true }
warp = { workspace = true }
zstd = { workspace = true }

quickwit-actors = { workspace = true }
quickwit-cluster = { workspace = true }
Expand Down
87 changes: 87 additions & 0 deletions quickwit/quickwit-serve/src/decompression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::io::Read;

use bytes::Bytes;
use flate2::read::GzDecoder;
use thiserror::Error;
use tokio::task;
use warp::reject::Reject;
use warp::Filter;

/// Decompresses the request body according to its `Content-Encoding` header.
///
/// There are two ways to decompress the body:
/// - Stream the body through an async decompressor
/// - Fetch the body and then decompress the bytes
///
/// The first approach lowers the latency, while the second approach is more CPU efficient.
/// Ingesting data is usually CPU bound and there is considerable latency until the data is
/// searchable, so the second approach is more suitable for this use case.
async fn decompress_body(encoding: Option<String>, body: Bytes) -> Result<Bytes, warp::Rejection> {
    // No `Content-Encoding` header: hand the body back untouched.
    let Some(encoding) = encoding else {
        return Ok(body);
    };
    match encoding.as_str() {
        "gzip" | "x-gzip" => {
            // Decompression is CPU bound: run it on the blocking thread pool so the
            // async runtime is not stalled.
            task::spawn_blocking(move || {
                let mut buffer = Vec::new();
                GzDecoder::new(body.as_ref())
                    .read_to_end(&mut buffer)
                    .map_err(|_| warp::reject::custom(CorruptedData))?;
                Ok::<_, warp::Rejection>(Bytes::from(buffer))
            })
            .await
            // A failed join (panicked/cancelled task) is reported the same way as
            // undecodable input.
            .map_err(|_| warp::reject::custom(CorruptedData))?
        }
        "zstd" => {
            task::spawn_blocking(move || {
                zstd::decode_all(body.as_ref())
                    .map(Bytes::from)
                    .map_err(|_| warp::reject::custom(CorruptedData))
            })
            .await
            .map_err(|_| warp::reject::custom(CorruptedData))?
        }
        // Any other encoding is rejected with 415 Unsupported Media Type.
        unsupported => Err(warp::reject::custom(UnsupportedEncoding(
            unsupported.to_string(),
        ))),
    }
}

/// Rejection raised when a compressed request body cannot be decoded
/// (or when the decompression task fails to complete).
///
/// The rejection handler maps this to HTTP 400 Bad Request.
#[derive(Debug, Error)]
#[error("Error while decompressing the data")]
pub(crate) struct CorruptedData;

impl Reject for CorruptedData {}

/// Rejection raised when the request carries a `Content-Encoding` value other
/// than the supported `gzip`/`x-gzip`/`zstd`; the offending encoding name is
/// kept for the error message.
///
/// The rejection handler maps this to HTTP 415 Unsupported Media Type.
#[derive(Debug, Error)]
#[error("Unsupported Content-Encoding {}. Supported encodings are 'gzip' and 'zstd'", self.0)]
pub(crate) struct UnsupportedEncoding(String);

impl Reject for UnsupportedEncoding {}

/// Custom filter for optional decompression
pub(crate) fn get_body_bytes() -> impl Filter<Extract = (Bytes,), Error = warp::Rejection> + Clone {
warp::header::optional("content-encoding")
.and(warp::body::bytes())
.and_then(|encoding: Option<String>, body: Bytes| async move {
decompress_body(encoding, body).await
})
}
3 changes: 2 additions & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use super::model::{
CatIndexQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
MultiSearchQueryParams, SearchQueryParamsCount,
};
use crate::decompression::get_body_bytes;
use crate::elasticsearch_api::model::{
ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams,
};
Expand Down Expand Up @@ -76,7 +77,7 @@ pub(crate) fn elastic_bulk_filter(
.and(warp::body::content_length_limit(
CONTENT_LENGTH_LIMIT.as_u64(),
))
.and(warp::body::bytes())
.and(get_body_bytes())
.and(serde_qs::warp::query(serde_qs::Config::default()))
}

Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use serde::Deserialize;
use thiserror::Error;
use warp::{Filter, Rejection};

use crate::decompression::get_body_bytes;
use crate::format::extract_format_from_qs;
use crate::rest_api_response::into_rest_api_response;
use crate::{with_arg, BodyFormat};
Expand Down Expand Up @@ -80,7 +81,7 @@ fn ingest_filter(
.and(warp::body::content_length_limit(
config.content_length_limit.as_u64(),
))
.and(warp::body::bytes())
.and(get_body_bytes())
.and(serde_qs::warp::query::<IngestOptions>(
serde_qs::Config::default(),
))
Expand All @@ -104,7 +105,7 @@ fn ingest_v2_filter(
.and(warp::body::content_length_limit(
config.content_length_limit.as_u64(),
))
.and(warp::body::bytes())
.and(get_body_bytes())
.and(serde_qs::warp::query::<IngestOptions>(
serde_qs::Config::default(),
))
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
mod build_info;
mod cluster_api;
mod debugging_api;
mod decompression;
mod delete_task_api;
mod elasticsearch_api;
mod format;
Expand Down
11 changes: 11 additions & 0 deletions quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use warp::{redirect, Filter, Rejection, Reply};

use crate::cluster_api::cluster_handler;
use crate::debugging_api::debugging_handler;
use crate::decompression::{CorruptedData, UnsupportedEncoding};
use crate::delete_task_api::delete_task_api_handlers;
use crate::elasticsearch_api::elastic_api_handlers;
use crate::health_check_api::health_check_handlers;
Expand Down Expand Up @@ -273,6 +274,16 @@ fn get_status_with_error(rejection: Rejection) -> RestApiError {
service_code: ServiceErrorCode::UnsupportedMediaType,
message: error.to_string(),
}
} else if let Some(error) = rejection.find::<UnsupportedEncoding>() {
RestApiError {
service_code: ServiceErrorCode::UnsupportedMediaType,
message: error.to_string(),
}
} else if let Some(error) = rejection.find::<CorruptedData>() {
RestApiError {
service_code: ServiceErrorCode::BadRequest,
message: error.to_string(),
}
} else if let Some(error) = rejection.find::<warp::reject::InvalidQuery>() {
RestApiError {
service_code: ServiceErrorCode::BadRequest,
Expand Down
9 changes: 2 additions & 7 deletions quickwit/rest-api-tests/run_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,6 @@ def run_step(step, previous_result):
time.sleep(step["sleep_after"])
return result

def load_data(path):
    """Read the full contents of the file at `path` as bytes.

    Paths ending in "gz" are transparently gzip-decompressed; any other
    file is returned verbatim.
    """
    # Use context managers so the file handles are closed deterministically
    # (the previous version leaked them until garbage collection).
    if path.endswith("gz"):
        with gzip.open(path, 'rb') as fp:
            return fp.read()
    with open(path, 'rb') as fp:
        return fp.read()

def run_request_with_retry(run_req, expected_status_code=None, num_retries=10, wait_time=0.5):
for try_number in range(num_retries + 1):
r = run_req()
Expand Down Expand Up @@ -100,7 +94,8 @@ def run_request_step(method, step, previous_result):
body_from_file = step.get("body_from_file", None)
if body_from_file is not None:
body_from_file = osp.join(step["cwd"], body_from_file)
kvargs["data"] = load_data(body_from_file)
kvargs["data"] = open(body_from_file, 'rb').read()

kvargs = resolve_previous_result(kvargs, previous_result)
ndjson = step.get("ndjson", None)
if ndjson is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,6 @@ method: POST
endpoint: _bulk
params:
refresh: "true"
headers: {"Content-Type": "application/json"}
headers: {"Content-Type": "application/json", "content-encoding": "gzip"}
body_from_file: gharchive-bulk.json.gz

Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,6 @@ endpoint: _bulk
num_retries: 10
params:
refresh: "true"
headers: {"Content-Type": "application/json"}
headers: {"Content-Type": "application/json", "content-encoding": "gzip"}
body_from_file: gharchive-bulk.json.gz
sleep_after: 3

0 comments on commit 03477e8

Please sign in to comment.