Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 157 additions & 132 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 7 additions & 4 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ skip = [

{ name = "hyper-timeout", version = "0.4.1" },

{ name = "tungstenite", version = "0.24.0" },
{ name = "tokio-tungstenite", version = "0.24.0" },
{ name = "tungstenite", version = "0.27.0" },
{ name = "tokio-tungstenite", version = "0.27.0" },

# `axum 0.7.5` depends on both `sync_wrapper 1.*` and `axum-core 0.4.3`.
# The latter depends on `sync_wrapper 0.1.*`.
Expand Down Expand Up @@ -168,8 +168,11 @@ skip = [
{ name = "phf_shared", version = "0.11.3" },
{ name = "phf_generator", version = "0.11.2" },
{ name = "phf_codegen", version = "0.11.3" },
# multer
{ name = "spin", version = "0.9.8" },
# axum
{ name = "matchit", version = "0.7.0" },
# console-subscriber
{ name = "axum", version = "0.7.9" },
{ name = "axum-core", version = "0.4.5" },
]

[[bans.deny]]
Expand Down
2 changes: 1 addition & 1 deletion src/balancerd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ workspace = true
[dependencies]
anyhow = "1.0.100"
async-trait = "0.1.89"
axum = "0.7.5"
axum = "0.8.6"
bytes = "1.10.1"
bytesize = "2.1.0"
chrono = { version = "0.4.39", default-features = false, features = ["std"] }
Expand Down
2 changes: 1 addition & 1 deletion src/clusterd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ workspace = true

[dependencies]
anyhow = "1.0.100"
axum = "0.7.5"
axum = "0.8.6"
clap = { version = "4.5.23", features = ["derive", "env"] }
fail = { version = "0.5.1", features = ["failpoints"] }
futures = "0.3.31"
Expand Down
6 changes: 3 additions & 3 deletions src/environmentd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ workspace = true
anyhow = "1.0.100"
askama = { version = "0.12.1", default-features = false, features = ["config", "serde-json"] }
async-trait = "0.1.89"
axum = { version = "0.7.5", features = ["ws"] }
axum-extra = { version = "0.9.3", features = ["typed-header"] }
axum = { version = "0.8.6", features = ["ws"] }
axum-extra = { version = "0.10.3", features = ["typed-header"] }
base64 = "0.22.1"
bytes = "1.10.1"
bytesize = "2.1.0"
Expand Down Expand Up @@ -124,7 +124,7 @@ tracing = "0.1.37"
tracing-capture = { version = "0.1.0", optional = true }
tracing-opentelemetry = { version = "0.25.0" }
tracing-subscriber = "0.3.19"
tungstenite = { version = "0.24.0" }
tungstenite = { version = "0.28.0" }
url = "2.3.1"
uuid = "1.18.1"
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }
Expand Down
8 changes: 3 additions & 5 deletions src/environmentd/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use std::sync::Arc;
use std::time::{Duration, SystemTime};

use anyhow::Context;
use async_trait::async_trait;
use axum::error_handling::HandleErrorLayer;
use axum::extract::ws::{Message, WebSocket};
use axum::extract::{ConnectInfo, DefaultBodyLimit, FromRequestParts, Query, Request, State};
Expand Down Expand Up @@ -207,7 +206,7 @@ impl HttpServer {
"/hierarchical-memory",
routing::get(memory::handle_hierarchical_memory),
)
.route("/static/*path", routing::get(root::handle_static));
.route("/static/{*path}", routing::get(root::handle_static));

let mut ws_router = Router::new()
.route("/api/experimental/sql", routing::get(sql::handle_sql_ws))
Expand All @@ -230,7 +229,7 @@ impl HttpServer {
if routes_enabled.webhook {
let webhook_router = Router::new()
.route(
"/api/webhook/:database/:schema/:id",
"/api/webhook/{:database}/{:schema}/{:id}",
routing::post(webhook::handle_webhook),
)
.with_state(WebhookState {
Expand Down Expand Up @@ -315,7 +314,7 @@ impl HttpServer {
routing::get(|| async { Redirect::temporary("/internal-console/") }),
)
.route(
"/internal-console/*path",
"/internal-console/{*path}",
routing::get(console::handle_internal_console),
)
.route(
Expand Down Expand Up @@ -606,7 +605,6 @@ impl AuthedClient {
}
}

#[async_trait]
impl<S> FromRequestParts<S> for AuthedClient
where
S: Send + Sync,
Expand Down
9 changes: 4 additions & 5 deletions src/environmentd/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::borrow::Cow;
use std::collections::BTreeMap;
use std::net::{IpAddr, SocketAddr};
use std::pin::pin;
Expand Down Expand Up @@ -354,7 +353,7 @@ async fn run_ws(state: &WsState, user: Option<AuthedUser>, peer_addr: IpAddr, mu
// are safe to return because they're generated after authentication.
debug!("WS request failed init: {}", e);
let reason = match e.downcast_ref::<AdapterError>() {
Some(error) => Cow::Owned(error.to_string()),
Some(error) => error.to_string().into(),
None => "unauthorized".into(),
};
let _ = ws
Expand Down Expand Up @@ -385,7 +384,7 @@ async fn run_ws(state: &WsState, user: Option<AuthedUser>, peer_addr: IpAddr, mu
));
for msg in msgs {
let _ = ws
.send(Message::Text(
.send(Message::text(
serde_json::to_string(&msg).expect("must serialize"),
))
.await;
Expand Down Expand Up @@ -491,7 +490,7 @@ async fn run_ws_request(
/// Sends a single [`WebSocketResponse`] over the provided [`WebSocket`].
async fn send_ws_response(ws: &mut WebSocket, resp: WebSocketResponse) -> Result<(), Error> {
let msg = serde_json::to_string(&resp).unwrap();
let msg = Message::Text(msg);
let msg = Message::text(msg);
ws.send(msg).await?;

Ok(())
Expand Down Expand Up @@ -1104,7 +1103,7 @@ impl ResultSender for WebSocket {
tick.tick().await;
loop {
tick.tick().await;
if let Err(err) = self.send(Message::Ping(Vec::new())).await {
if let Err(err) = self.send(Message::Ping(Default::default())).await {
return err.into();
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/environmentd/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1503,7 +1503,7 @@ pub fn auth_with_ws(
}
auth_with_ws_impl(
ws,
Message::Text(
Message::text(
serde_json::to_string(&WebSocketAuth::Basic {
user: "materialize".into(),
password: "".into(),
Expand Down
12 changes: 6 additions & 6 deletions src/environmentd/tests/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,11 @@ async fn run_tests<'a>(header: &str, server: &test_util::TestServer, tests: &[Te
let stream = make_ws_tls(&uri, configure);
let (mut ws, _resp) = tungstenite::client(uri, stream).unwrap();

ws.send(Message::Text(serde_json::to_string(&auth).unwrap()))
ws.send(Message::text(serde_json::to_string(&auth).unwrap()))
.unwrap();

ws.send(Message::Text(
r#"{"query": "SELECT pg_catalog.current_user()"}"#.into(),
ws.send(Message::text(
r#"{"query": "SELECT pg_catalog.current_user()"}"#,
))
.unwrap();

Expand Down Expand Up @@ -3482,7 +3482,7 @@ async fn test_password_auth_http() {
.unwrap();

let query = r#"{"query":"SELECT current_user"}"#;
let ws_options_msg = Message::Text(r#"{"options": {}}"#.to_owned());
let ws_options_msg = Message::text(r#"{"options": {}}"#);

let http_client = hyper_util::client::legacy::Client::builder(TokioExecutor::new())
.pool_idle_timeout(Duration::from_secs(10))
Expand Down Expand Up @@ -3511,7 +3511,7 @@ async fn test_password_auth_http() {
ws.read().unwrap(),
Message::Close(Some(CloseFrame {
code: CloseCode::Protocol,
reason: Cow::Borrowed("unauthorized"),
reason: "unauthorized".into(),
})),
);

Expand Down Expand Up @@ -3577,7 +3577,7 @@ async fn test_password_auth_http() {
let msg: WebSocketResponse = serde_json::from_str(&msg).unwrap();
match msg {
WebSocketResponse::ReadyForQuery(_) => {
ws.send(Message::Text(query.to_owned())).unwrap();
ws.send(Message::text(query)).unwrap();
}
WebSocketResponse::Row(rows) => {
assert_eq!(&rows, &[serde_json::Value::from("mz_system".to_owned())]);
Expand Down
49 changes: 26 additions & 23 deletions src/environmentd/tests/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,8 @@ fn test_http_sql() {

f.run(|tc| {
let msg = match tc.directive.as_str() {
"ws-text" => Message::Text(tc.input.clone()),
"ws-binary" => Message::Binary(tc.input.as_bytes().to_vec()),
"ws-text" => Message::text(&tc.input),
"ws-binary" => Message::Binary(tc.input.clone().into()),
"http" => {
let json: serde_json::Value = serde_json::from_str(&tc.input).unwrap();
let res = Client::new()
Expand All @@ -938,12 +938,12 @@ fn test_http_sql() {
loop {
let resp = ws.read().unwrap();
match resp {
Message::Text(mut msg) => {
if fixtimestamp {
msg = fixtimestamp_re
.replace_all(&msg, fixtimestamp_replace)
.into();
}
Message::Text(msg) => {
let msg = if fixtimestamp {
fixtimestamp_re.replace_all(&msg, fixtimestamp_replace)
} else {
msg.as_str().into()
};
let msg: WebSocketResponse = serde_json::from_str(&msg).unwrap();
write!(&mut responses, "{}\n", serde_json::to_string(&msg).unwrap())
.unwrap();
Expand Down Expand Up @@ -1742,7 +1742,7 @@ fn test_max_request_size() {
let json =
format!("{{\"queries\":[{{\"query\":\"{statement}\",\"params\":[\"{param}\"]}}]}}");
let json: serde_json::Value = serde_json::from_str(&json).unwrap();
ws.send(Message::Text(json.to_string())).unwrap();
ws.send(Message::text(json.to_string())).unwrap();

// The specific error isn't forwarded to the client, the connection is just closed.
let err = ws.read().unwrap_err();
Expand Down Expand Up @@ -1819,7 +1819,7 @@ fn test_max_statement_batch_size() {
test_util::auth_with_ws(&mut ws, BTreeMap::default()).unwrap();
let json = format!("{{\"query\":\"{statements}\"}}");
let json: serde_json::Value = serde_json::from_str(&json).unwrap();
ws.send(Message::Text(json.to_string())).unwrap();
ws.send(Message::text(json.to_string())).unwrap();

// Discard the CommandStarting message
let _ = ws.read().unwrap();
Expand Down Expand Up @@ -1875,7 +1875,7 @@ fn test_ws_passes_options() {
// set from the options map we passed with the auth.
let json = "{\"query\":\"SHOW application_name;\"}";
let json: serde_json::Value = serde_json::from_str(json).unwrap();
ws.send(Message::Text(json.to_string())).unwrap();
ws.send(Message::text(json.to_string())).unwrap();

let mut read_msg = || -> WebSocketResponse {
let msg = ws.read().unwrap();
Expand Down Expand Up @@ -1930,7 +1930,7 @@ fn test_ws_subscribe_no_crash() {
let query = "SUBSCRIBE (SELECT 1)";
let json = format!("{{\"query\":\"{query}\"}}");
let json: serde_json::Value = serde_json::from_str(&json).unwrap();
ws.send(Message::Text(json.to_string())).unwrap();
ws.send(Message::text(json.to_string())).unwrap();

// Give the server time to crash, if it's going to.
std::thread::sleep(Duration::from_secs(1))
Expand Down Expand Up @@ -2143,7 +2143,7 @@ fn test_max_connections_on_all_interfaces() {
test_util::auth_with_ws(&mut ws, BTreeMap::default()).unwrap();
let json = format!("{{\"query\":\"{query}\"}}");
let json: serde_json::Value = serde_json::from_str(&json).unwrap();
ws.send(Message::Text(json.to_string())).unwrap();
ws.send(Message::text(json.to_string())).unwrap();

// The specific error isn't forwarded to the client, the connection is just closed.
match ws.read() {
Expand All @@ -2154,13 +2154,13 @@ fn test_max_connections_on_all_interfaces() {
);
assert_eq!(
ws.read().unwrap(),
Message::Text(format!(
Message::text(format!(
r#"{{"type":"Rows","payload":{{"columns":[{{"name":"{UNKNOWN_COLUMN_NAME}","type_oid":23,"type_len":4,"type_mod":-1}}]}}}}"#
))
);
assert_eq!(
ws.read().unwrap(),
Message::Text("{\"type\":\"Row\",\"payload\":[\"1\"]}".to_string())
Message::text("{\"type\":\"Row\",\"payload\":[\"1\"]}")
);
tracing::info!("data: {:?}", ws.read().unwrap());
}
Expand Down Expand Up @@ -2593,15 +2593,15 @@ fn test_internal_ws_auth() {
// Auth with OptionsOnly
test_util::auth_with_ws_impl(
&mut ws,
Message::Text(serde_json::to_string(&WebSocketAuth::OptionsOnly { options }).unwrap()),
Message::text(serde_json::to_string(&WebSocketAuth::OptionsOnly { options }).unwrap()),
)
.unwrap();

// Query to make sure we get back the correct user, which should be
// set from the headers passed with the websocket request.
let json = "{\"query\":\"SELECT current_user;\"}";
let json: serde_json::Value = serde_json::from_str(json).unwrap();
ws.send(Message::Text(json.to_string())).unwrap();
ws.send(Message::text(json.to_string())).unwrap();

let mut read_msg = || -> WebSocketResponse {
let msg = ws.read().unwrap();
Expand Down Expand Up @@ -2768,7 +2768,7 @@ fn test_cancel_ws() {
test_util::auth_with_ws(&mut ws, BTreeMap::default()).unwrap();
let json = r#"{"queries":[{"query":"SUBSCRIBE t"}]}"#;
let json: serde_json::Value = serde_json::from_str(json).unwrap();
ws.send(Message::Text(json.to_string())).unwrap();
ws.send(Message::text(json.to_string())).unwrap();

loop {
let msg = ws.read().unwrap();
Expand Down Expand Up @@ -2861,7 +2861,10 @@ async fn smoketest_webhook_source() {
assert_eq!(total_requests_metric.get_counter().get_value(), 100.0);

let path_label = &total_requests_metric.get_label()[0];
assert_eq!(path_label.value(), "/api/webhook/:database/:schema/:id");
assert_eq!(
path_label.value(),
"/api/webhook/{:database}/{:schema}/{:id}"
);

let status_label = &total_requests_metric.get_label()[2];
assert_eq!(status_label.value(), "200");
Expand Down Expand Up @@ -3057,10 +3060,10 @@ fn test_github_20262() {

let (mut ws, _resp) = tungstenite::connect(server.ws_addr()).unwrap();
test_util::auth_with_ws(&mut ws, BTreeMap::default()).unwrap();
ws.send(Message::Text(subscribe)).unwrap();
ws.send(Message::text(subscribe)).unwrap();
cancel();
ws.send(Message::Text(commit)).unwrap();
ws.send(Message::Text(select)).unwrap();
ws.send(Message::text(commit)).unwrap();
ws.send(Message::text(select)).unwrap();

let mut expect = VecDeque::from([
r#"{"type":"CommandStarting","payload":{"has_rows":true,"is_streaming":true}}"#.to_string(),
Expand Down Expand Up @@ -4353,7 +4356,7 @@ async fn test_double_encoded_json() {

let json = "{\"query\":\"SELECT a FROM t1 ORDER BY a;\"}";
let json: serde_json::Value = serde_json::from_str(json).unwrap();
ws.send(Message::Text(json.to_string())).unwrap();
ws.send(Message::text(json.to_string())).unwrap();

let mut read_msg = || -> WebSocketResponse {
let msg = ws.read().unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/environmentd/tests/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ impl MockHttpServer {
async fn new() -> MockHttpServer {
let (conn_tx, conn_rx) = mpsc::unbounded_channel();
let router = Router::new().route(
"/*path",
"/{*path}",
routing::get(|| async move {
let (response_tx, response_rx) = oneshot::channel();
conn_tx
Expand Down
2 changes: 1 addition & 1 deletion src/frontegg-auth/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ uuid = { version = "1.18.1", features = ["serde"] }
workspace-hack = { version = "0.0.0", path = "../workspace-hack", optional = true }

[dev-dependencies]
axum = "0.7.5"
axum = "0.8.6"
mz-ore = { path = "../ore", features = ["network", "test"] }
tokio = { version = "1.44.1", features = ["macros", "rt-multi-thread"] }

Expand Down
2 changes: 1 addition & 1 deletion src/frontegg-auth/src/client/tokens.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ mod tests {

// Fake server that returns the provided status code a few times before returning success.
let app = Router::new().route(
"/:status_code",
"/{:status_code}",
post(
|axum::extract::Path(code): axum::extract::Path<u16>| async move {
let cnt = count_.fetch_add(1, Ordering::Relaxed);
Expand Down
4 changes: 2 additions & 2 deletions src/frontegg-mock/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ workspace = true

[dependencies]
anyhow = "1.0.100"
axum = "0.7.5"
axum-extra = { version = "0.9.3", features = ["typed-header"] }
axum = "0.8.6"
axum-extra = { version = "0.10.3", features = ["typed-header"] }
base64 = "0.22.1"
chrono = { version = "0.4.39", default-features = false, features = ["serde"] }
clap = { version = "4.5.23", features = ["derive", "env"] }
Expand Down
Loading