Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
398 changes: 366 additions & 32 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,19 @@ criterion = { version = "0.5.1", features = ["async_tokio"] }
deepsize = { version = "0.2.0" }
futures = "0.3.30"
getrandom = "0.2.15"
hex = { version = "0.4.3", features = ["serde"] }
headers = "0.4"
hex = { version = "0.4.3", features = ["serde"] }
hpke-rs = "0.2.0"
hpke-rs-crypto = "0.2.0"
hpke-rs-rust-crypto = "0.2.0"
http = "1"
http-body-util = "0.1"
hyper = "0.14.29"
itertools = "0.12.1"
mappable-rc = "0.1.1"
matchit = "0.7.3"
opentelemetry = "0.24.0"
opentelemetry-http = "0.13.0"
p256 = { version = "0.13.2", features = ["ecdsa-core", "ecdsa", "pem"] }
paste = "1.0.15"
pin-project = "1.1.5"
Expand All @@ -83,6 +86,7 @@ tower = "0.4.13"
tower-service = "0.3"
tracing = "0.1.40"
tracing-core = "0.1.32"
tracing-opentelemetry = "0.25.0"
tracing-subscriber = "0.3.18"
url = { version = "2.5.2", features = ["serde"] }
webpki = "0.22.4"
Expand Down
15 changes: 11 additions & 4 deletions crates/daphne-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,33 @@ repository.workspace = true
description = "Workers backend for Daphne"

[dependencies]
axum = "0.6.0"
axum = "0.6.0" # held back to use http 0.2
daphne = { path = "../daphne" }
daphne-service-utils = { path = "../daphne-service-utils", features = ["durable_requests"] }
futures.workspace = true
hex.workspace = true
http = "0.2"
http = "0.2" # held back to use http 0.2
hyper.workspace = true
mappable-rc = "0.1.1"
mappable-rc.workspace = true
opentelemetry = "0.23.0" # held back to use http 0.2
opentelemetry-http = "0.12.0" # held back to use http 0.2
p256.workspace = true
prio.workspace = true
rayon.workspace = true
reqwest = { workspace = true, features = ["json"] }
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio.workspace = true
tower.workspace = true
tracing.workspace = true
tracing-opentelemetry = "0.24.0" # held back to use http 0.2
url.workspace = true

[dependencies.reqwest]
version = "0.11" # held back to use http 0.2
default-features = false
features = ["rustls-tls-native-roots", "json"]

[dev-dependencies]
anyhow.workspace = true
assert_matches.workspace = true
Expand Down
4 changes: 0 additions & 4 deletions crates/daphne-server/src/roles/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use daphne_service_utils::{auth::DaphneAuth, http_headers};
use tracing::{error, info};
use url::Url;

use crate::storage_proxy_connection::method_http_1_0_to_reqwest_0_11;

#[async_trait]
impl DapAuthorizedSender<DaphneAuth> for crate::App {
async fn authorize(
Expand Down Expand Up @@ -149,8 +147,6 @@ impl crate::App {
) -> Result<DapResponse, DapError> {
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};

let method = method_http_1_0_to_reqwest_0_11(method);

let content_type = req
.media_type
.and_then(|mt| mt.as_str_for_version(req.version))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl Cache {
);
}

#[allow(dead_code)]
pub fn delete<P>(&mut self, key: &str) -> CacheResult<P::Value>
where
P: KvPrefix,
Expand Down
38 changes: 26 additions & 12 deletions crates/daphne-server/src/storage_proxy_connection/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{info_span, Instrument};

use crate::StorageProxyConfig;

use super::{status_http_1_0_to_reqwest_0_11, Error};
use super::Error;
pub(crate) use cache::Cache;
use daphne::messages::Time;
use daphne_service_utils::http_headers::STORAGE_PROXY_PUT_KV_EXPIRATION;
Expand Down Expand Up @@ -230,13 +230,17 @@ impl<'h> Kv<'h> {
prefix = std::any::type_name::<P>()
);
async {
let resp = self
let mut req = self
.http
.get(self.config.url.join(&key).unwrap())
.bearer_auth(&self.config.auth_token)
.send()
.await?;
if resp.status() == status_http_1_0_to_reqwest_0_11(StatusCode::NOT_FOUND) {
.build()?;

super::add_tracing_headers(&mut req);

let resp = self.http.execute(req).await?;

if resp.status() == StatusCode::NOT_FOUND {
if opt.cache_not_found {
self.cache.write().await.put::<P>(key, None);
}
Expand Down Expand Up @@ -276,13 +280,18 @@ impl<'h> Kv<'h> {
.http
.post(self.config.url.join(&key).unwrap())
.bearer_auth(&self.config.auth_token)
.body(serde_json::to_vec(&value).unwrap());
.body(serde_json::to_vec(&value).unwrap())
.build()?;

if let Some(expiration) = expiration {
request = request.header(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.to_string());
request
.headers_mut()
.insert(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.into());
}

request.send().await?.error_for_status()?;
super::add_tracing_headers(&mut request);

self.http.execute(request).await?.error_for_status()?;

self.cache.write().await.put::<P>(key, Some(value.into()));
Ok(())
Expand Down Expand Up @@ -337,15 +346,20 @@ impl<'h> Kv<'h> {
.http
.put(self.config.url.join(&key).unwrap())
.bearer_auth(&self.config.auth_token)
.body(serde_json::to_vec(&value).unwrap());
.body(serde_json::to_vec(&value).unwrap())
.build()?;

if let Some(expiration) = expiration {
request = request.header(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.to_string());
request
.headers_mut()
.insert(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.into());
}

let response = request.send().await?;
super::add_tracing_headers(&mut request);

let response = self.http.execute(request).await?;

if response.status() == status_http_1_0_to_reqwest_0_11(StatusCode::CONFLICT) {
if response.status() == StatusCode::CONFLICT {
Ok(Some(value))
} else {
response.error_for_status()?;
Expand Down
60 changes: 18 additions & 42 deletions crates/daphne-server/src/storage_proxy_connection/mod.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
// Copyright (c) 2024 Cloudflare, Inc. All rights reserved.
// SPDX-License-Identifier: BSD-3-Clause

#![allow(unused_variables)]
#![allow(clippy::unused_async)]
#![allow(dead_code)]

pub(crate) mod kv;

use std::fmt::Debug;

use axum::http::{Method, StatusCode};
use axum::http::StatusCode;
use daphne_service_utils::durable_requests::{
bindings::{DurableMethod, DurableRequestPayload, DurableRequestPayloadExt},
DurableRequest, ObjectIdFrom, DO_PATH_PREFIX,
};
use opentelemetry_http::HeaderInjector;
use serde::de::DeserializeOwned;

pub(crate) use kv::Kv;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::StorageProxyConfig;

Expand Down Expand Up @@ -46,6 +45,7 @@ impl<'h> Do<'h> {
}
}

#[allow(dead_code)]
pub fn with_retry(self) -> Self {
Self {
retry: true,
Expand Down Expand Up @@ -77,20 +77,23 @@ impl<'d, B: DurableMethod + Debug, P: AsRef<[u8]>> RequestBuilder<'d, B, P> {
.url
.join(&format!("{DO_PATH_PREFIX}{}", self.path.to_uri()))
.unwrap();
let resp = self
let mut req = self
.durable
.http
.post(url)
.body(self.request.into_bytes())
.bearer_auth(&self.durable.config.auth_token)
.send()
.await?;
.build()?;

add_tracing_headers(&mut req);

let resp = self.durable.http.execute(req).await?;

if resp.status().is_success() {
Ok(resp.json().await?)
} else {
Err(Error::Http {
status: status_reqwest_0_11_to_http_1_0(resp.status()),
status: resp.status(),
body: resp.text().await?,
})
}
Expand Down Expand Up @@ -129,6 +132,7 @@ impl<'w> Do<'w> {
}
}

#[allow(dead_code)]
pub fn request_with_id<B: DurableMethod + Copy>(
&self,
path: B,
Expand All @@ -147,37 +151,9 @@ impl<'w> Do<'w> {
}
}

/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't
/// completed.
///
/// This is because axum is using http 1.0 and reqwest is still in http 0.2
pub fn method_http_1_0_to_reqwest_0_11(method: Method) -> reqwest::Method {
match method {
Method::GET => reqwest::Method::GET,
Method::POST => reqwest::Method::POST,
Method::PUT => reqwest::Method::PUT,
Method::PATCH => reqwest::Method::PATCH,
Method::HEAD => reqwest::Method::HEAD,
Method::TRACE => reqwest::Method::TRACE,
Method::OPTIONS => reqwest::Method::OPTIONS,
Method::CONNECT => reqwest::Method::CONNECT,
Method::DELETE => reqwest::Method::DELETE,
_ => unreachable!(),
}
}

/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't
/// completed.
///
/// This is because axum is using http 1.0 and reqwest is still in http 0.2
pub fn status_http_1_0_to_reqwest_0_11(status: StatusCode) -> reqwest::StatusCode {
reqwest::StatusCode::from_u16(status.as_u16()).unwrap()
}

/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't
/// completed.
///
/// This is because axum is using http 1.0 and reqwest is still in http 0.2
pub fn status_reqwest_0_11_to_http_1_0(status: reqwest::StatusCode) -> StatusCode {
StatusCode::from_u16(status.as_u16()).unwrap()
pub fn add_tracing_headers(req: &mut reqwest::Request) {
let ctx = Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&ctx, &mut HeaderInjector(req.headers_mut()));
});
}
3 changes: 3 additions & 0 deletions crates/daphne-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ headers.workspace = true
hex.workspace = true
http-body-util.workspace = true
http.workspace = true
opentelemetry-http.workspace = true
opentelemetry.workspace = true
prio.workspace = true
prometheus.workspace = true
rand.workspace = true
Expand All @@ -39,6 +41,7 @@ serde-wasm-bindgen.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing-core.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter", "json"]}
tracing.workspace = true
url.workspace = true
Expand Down
10 changes: 6 additions & 4 deletions crates/daphne-worker/src/durable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ pub(crate) mod aggregate_store;
#[cfg(feature = "test-utils")]
pub(crate) mod test_state_cleaner;

use crate::tracing_utils::shorten_paths;
use daphne_service_utils::durable_requests::bindings::{
DurableMethod, DurableRequestPayload, DurableRequestPayloadExt,
};
use serde::{Deserialize, Serialize};
use tracing::info_span;
use tracing_opentelemetry::OpenTelemetrySpanExt as _;
use worker::{Env, Error, Request, Response, Result, ScheduledTime, State};

pub use aggregate_store::AggregateStore;
Expand Down Expand Up @@ -227,9 +227,11 @@ where
}

fn create_span_from_request(req: &Request) -> tracing::Span {
let path = req.path();
let span = info_span!("DO span", p = %shorten_paths(path.split('/')).display());
span.in_scope(|| tracing::info!(path, "DO handling new request"));
let extractor = crate::tracing_utils::HeaderExtractor::new(req);
let remote_context =
opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor));
let span = info_span!("durable object", path = req.path());
span.set_parent(remote_context);
span
}

Expand Down
Loading