Skip to content

Commit be099d6

Browse files
authored
Enable compression for gRPC and HTTP endpoint (#671)
1 parent 4553430 commit be099d6

File tree

14 files changed

+99
-25
lines changed

14 files changed

+99
-25
lines changed

Cargo.lock

Lines changed: 28 additions & 6 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ members = [
3131
resolver = "2"
3232

3333
[workspace.dependencies]
34-
tonic = { version = "0.11" }
34+
tonic = { version = "0.11", features = ["zstd"] }
3535
tonic-build = { version = "0.11" }
3636
tonic-web = { version = "0.11" }
3737
tonic-reflection = { version = "0.11" }

crates/arroyo-api/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ bincode = { version = "2.0.0-rc.3", features = ["serde"]}
4040
petgraph = {version = "0.6", features = ["serde-1"]}
4141

4242
http = "0.2"
43-
tower-http = {version = "0.4", features = ["trace", "fs", "cors", "validate-request", "auth"]}
43+
tower-http = {version = "0.4", features = ["trace", "fs", "cors", "validate-request", "auth", "compression-zstd"]}
4444
axum = {version = "0.6.20", features = ["headers", "tokio", "macros"]}
4545
axum-extra = "0.7.4"
4646
h2 = "0.3.26"

crates/arroyo-api/src/jobs.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,7 @@ pub async fn get_job_output(
482482
};
483483

484484
let output_data: OutputData = v.into();
485+
485486
let e = Ok(Event::default()
486487
.json_data(output_data)
487488
.unwrap()

crates/arroyo-api/src/lib.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};
66
use std::net::{SocketAddr, TcpListener};
77
use time::OffsetDateTime;
88
use tonic::transport::Channel;
9+
use tower_http::compression::predicate::NotForContentType;
10+
use tower_http::compression::{CompressionLayer, DefaultPredicate, Predicate};
911
use tracing::{error, info};
1012
use utoipa::OpenApi;
1113

@@ -125,7 +127,14 @@ pub fn start_server(database: DatabaseSource, guard: ShutdownGuard) -> anyhow::R
125127
let listener = TcpListener::bind(addr)?;
126128
let local_addr = listener.local_addr()?;
127129

128-
let app = rest::create_rest_app(database, &config.controller_endpoint());
130+
let app = rest::create_rest_app(database, &config.controller_endpoint()).layer(
131+
CompressionLayer::new().zstd(true).compress_when(
132+
DefaultPredicate::new()
133+
// compression doesn't work for server-sent events
134+
// (https://github.com/tokio-rs/axum/discussions/2034)
135+
.and(NotForContentType::new("text/event-stream")),
136+
),
137+
);
129138

130139
info!("Starting API server on {:?}", local_addr);
131140
guard.into_spawn_task(wrap_start(

crates/arroyo-api/src/metrics.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ use arroyo_rpc::api_types::metrics::OperatorMetricGroup;
88
use arroyo_rpc::api_types::OperatorMetricGroupCollection;
99
use arroyo_rpc::grpc::rpc::controller_grpc_client::ControllerGrpcClient;
1010
use arroyo_rpc::grpc::rpc::JobMetricsReq;
11+
use tonic::codec::CompressionEncoding;
12+
use tonic::transport::Channel;
1113
use tonic::Code;
1214

1315
/// Get a job's metrics
@@ -38,10 +40,15 @@ pub async fn get_operator_metric_groups(
3840
)
3941
.await?;
4042

41-
let mut controller = ControllerGrpcClient::connect(state.controller_addr)
43+
let channel = Channel::builder(state.controller_addr.parse().unwrap())
44+
.connect()
4245
.await
4346
.map_err(log_and_map)?;
4447

48+
let mut controller = ControllerGrpcClient::new(channel)
49+
.accept_compressed(CompressionEncoding::Zstd)
50+
.send_compressed(CompressionEncoding::Zstd);
51+
4552
let data = match controller
4653
.job_metrics(JobMetricsReq { job_id: job.id })
4754
.await

crates/arroyo-controller/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ prometheus = "0.13"
3333
async-trait = "0.1"
3434
lazy_static = "1.4.0"
3535
chrono = "0.4"
36+
zstd = "0.13"
3637

3738
arrow-schema = {workspace = true}
3839

crates/arroyo-controller/src/lib.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use tokio::sync::mpsc::error::TrySendError;
3636
use tokio::sync::mpsc::Sender;
3737
use tokio::sync::RwLock;
3838
use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream};
39+
use tonic::codec::CompressionEncoding;
3940
use tonic::{Request, Response, Status};
4041
use tracing::{debug, info, warn};
4142

@@ -387,7 +388,7 @@ impl ControllerGrpc for ControllerServer {
387388
remove.insert(i);
388389
}
389390
Err(TrySendError::Full(_)) => {
390-
warn!("queue full");
391+
debug!("queue full");
391392
}
392393
}
393394
}
@@ -455,6 +456,7 @@ impl ControllerGrpc for ControllerServer {
455456
.ok_or_else(|| Status::not_found("No metrics for job"))?
456457
.clone();
457458

459+
// TODO: send this over in a more efficient format like protobuf
458460
Ok(Response::new(JobMetricsResp {
459461
metrics: serde_json::to_string(&metrics.get_groups().await).unwrap(),
460462
}))
@@ -625,7 +627,11 @@ impl ControllerServer {
625627
local_addr,
626628
arroyo_server_common::grpc_server()
627629
.accept_http1(true)
628-
.add_service(ControllerGrpcServer::new(self.clone()))
630+
.add_service(
631+
ControllerGrpcServer::new(self.clone())
632+
.send_compressed(CompressionEncoding::Zstd)
633+
.accept_compressed(CompressionEncoding::Zstd),
634+
)
629635
.add_service(reflection)
630636
.serve_with_incoming(TcpListenerStream::new(listener)),
631637
));

crates/arroyo-node/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ impl NodeServer {
103103
.env(JOB_ID_ENV, req.job_id.clone())
104104
.env("ARROYO__WORKER__TASK_SLOTS", format!("{}", slots))
105105
.env(RUN_ID_ENV, format!("{}", req.run_id))
106+
.env("ARROYO__ADMIN__HTTP_PORT", "0")
106107
.kill_on_drop(true)
107108
.spawn()
108109
.map_err(|e| Status::internal(format!("Failed to start worker: {:?}", e)))?;

webui/src/components/OperatorDetail.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ const OperatorDetail: React.FC<OperatorDetailProps> = ({ pipelineId, jobId, oper
3030
return (
3131
<Alert status="warning">
3232
<AlertIcon />
33-
<AlertDescription>Failed to get job's metrics.</AlertDescription>
33+
<AlertDescription>Failed to get job metrics.</AlertDescription>
3434
</Alert>
3535
);
3636
}

0 commit comments

Comments
 (0)