diff --git a/Cargo.lock b/Cargo.lock index a11121c34..6bcf422e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -588,7 +588,7 @@ dependencies = [ "js-sys", "num-traits", "wasm-bindgen", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -906,9 +906,9 @@ dependencies = [ "prometheus", "rand", "rayon", - "reqwest", + "reqwest 0.12.7", "rustls 0.23.12", - "rustls-native-certs", + "rustls-native-certs 0.7.2", "sentry", "serde_json", "tokio", @@ -968,6 +968,8 @@ dependencies = [ "http 0.2.12", "hyper 0.14.30", "mappable-rc", + "opentelemetry 0.23.0", + "opentelemetry-http 0.12.0", "p256", "paste", "prio", @@ -975,13 +977,14 @@ dependencies = [ "rand", "rayon", "rcgen", - "reqwest", + "reqwest 0.11.27", "serde", "serde_json", "thiserror", "tokio", "tower", "tracing", + "tracing-opentelemetry 0.24.0", "tracing-subscriber", "url", "webpki", @@ -1029,11 +1032,13 @@ dependencies = [ "hex", "http 1.1.0", "http-body-util", + "opentelemetry 0.24.0", + "opentelemetry-http 0.13.0", "paste", "prio", "prometheus", "rand", - "reqwest", + "reqwest 0.12.7", "reqwest-wasm", "ring", "serde", @@ -1042,6 +1047,7 @@ dependencies = [ "tower-service", "tracing", "tracing-core", + "tracing-opentelemetry 0.25.0", "tracing-subscriber", "url", "wasm-streams", @@ -1758,6 +1764,20 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" +dependencies = [ + "futures-util", + "http 0.2.12", + "hyper 0.14.30", + "rustls 0.21.12", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-rustls" version = "0.27.2" @@ -1769,10 +1789,10 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "rustls 0.23.12", - "rustls-native-certs", + "rustls-native-certs 0.7.2", "rustls-pki-types", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.0", "tower-service", "webpki-roots 0.26.3", ] @@ -1972,7 +1992,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4979f22fdb869068da03c9f7528f8297c6fd2606bc3a4affe42e6a823fdb8da4" dependencies = [ "cfg-if", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -2245,6 +2265,105 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c365a63eec4f55b7efeceb724f1336f26a9cf3427b70e59e2cd2a5b947fba96" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-http" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0ba633e55c5ea6f431875ba55e71664f2fa5d3a90bd34ec9302eecc41c865dd" +dependencies = [ + "async-trait", + "bytes", + "http 0.2.12", + "opentelemetry 0.23.0", +] + +[[package]] +name = "opentelemetry-http" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad31e9de44ee3538fb9d64fe3376c1362f406162434609e79aea2a41a0af78ab" +dependencies = [ + "async-trait", + "bytes", + "http 1.1.0", + "opentelemetry 0.24.0", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "lazy_static", + "once_cell", + "opentelemetry 0.23.0", + "ordered-float", + "percent-encoding", + "rand", + "thiserror", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "692eac490ec80f24a17828d49b40b60f5aeaccdfe6a503f939713afd22bc28df" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry 0.24.0", + "percent-encoding", + "rand", + "thiserror", +] + +[[package]] +name = "ordered-float" +version = "4.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a91171844676f8c7990ce64959210cd2eaef32c2612c50f9fae9f8aaa6065a6" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.4.3" @@ -2314,7 +2433,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -2771,6 +2890,47 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "reqwest" +version = "0.11.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dd67538700a17451e7cba03ac727fb961abb7607553461627b97de0b89cf4a62" +dependencies = [ + "base64 0.21.7", + "bytes", + "encoding_rs", + "futures-core", + "futures-util", + "h2", + "http 0.2.12", + "http-body 0.4.6", + "hyper 0.14.30", + "hyper-rustls 0.24.2", + "ipnet", + "js-sys", + "log", + "mime", + "once_cell", + "percent-encoding", + "pin-project-lite", + "rustls 0.21.12", + "rustls-native-certs 0.6.3", + "rustls-pemfile 1.0.4", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper 0.1.2", + "system-configuration", + "tokio", + "tokio-rustls 0.24.1", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", + "winreg 0.50.0", +] + [[package]] name = "reqwest" version = "0.12.7" @@ -2786,7 +2946,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.4.1", - "hyper-rustls", + "hyper-rustls 0.27.2", "hyper-util", "ipnet", "js-sys", @@ -2797,15 +2957,15 @@ dependencies = [ "pin-project-lite", "quinn", "rustls 0.23.12", - "rustls-native-certs", - "rustls-pemfile", + "rustls-native-certs 0.7.2", + "rustls-pemfile 2.1.3", "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", "sync_wrapper 1.0.1", "tokio", - "tokio-rustls", + "tokio-rustls 0.26.0", "tower-service", "url", "wasm-bindgen", @@ -2850,7 +3010,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "winreg", + "winreg 0.10.1", ] [[package]] @@ -2937,9 +3097,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.34" +version = "0.38.35" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "70dc5ec042f7a43c4a73241207cecc9873a06d45debb38b329f8541d85c2730f" +checksum = "a85d50532239da68e9addb745ba38ff4612a242c1c7ceea689c4bc7c2f43c36f" dependencies = [ "bitflags 2.6.0", "errno", @@ -2976,6 +3136,18 @@ dependencies = [ "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +dependencies = [ + "openssl-probe", + "rustls-pemfile 1.0.4", + "schannel", + "security-framework", +] + [[package]] name = "rustls-native-certs" version = "0.7.2" @@ -2983,12 +3155,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04182dffc9091a404e0fc069ea5cd60e5b866c3adf881eff99a32d048242dffa" dependencies = [ "openssl-probe", - "rustls-pemfile", + "rustls-pemfile 2.1.3", "rustls-pki-types", "schannel", "security-framework", ] +[[package]] +name = "rustls-pemfile" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c74cae0a4cf6ccbbf5f359f08efdf8ee7e1dc532573bf0db71968cb56b1448c" +dependencies = [ + "base64 0.21.7", +] + [[package]] name = "rustls-pemfile" version = "2.1.3" @@ -3123,7 +3304,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "00421ed8fa0c995f07cde48ba6c89e80f2b312f74ff637326f392fbfd23abe02" dependencies = [ "httpdate", - "reqwest", + "reqwest 0.12.7", "rustls 0.21.12", "sentry-backtrace", "sentry-contexts", @@ -3482,6 +3663,27 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tap" version = "1.0.1" @@ -3646,6 +3848,16 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.12", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.0" @@ -3751,6 +3963,42 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68803492bf28ab40aeccaecc7021096bd256baf7ca77c3d425d89b35a7be4e4" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.23.0", + "opentelemetry_sdk 0.23.0", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9784ed4da7d921bc8df6963f8c80a0e4ce34ba6ba76668acadd3edbd985ff3b" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry 0.24.0", + "opentelemetry_sdk 0.24.1", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -4027,6 +4275,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki" version = "0.22.4" @@ -4102,7 +4360,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e48a53791691ab099e5e2ad123536d0fff50652600abaf43bbf952894110d0be" dependencies = [ "windows-core", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -4111,7 +4369,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -4122,7 +4380,7 @@ checksum = "e400001bb720a623c1c69032f8e3e4cf09984deec740f007dd2b03ec864804b0" dependencies = [ "windows-result", "windows-strings", - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -4131,7 +4389,7 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1d1043d8214f791817bab27572aaa8af63732e11bf84aa21a45a78d6c317ae0e" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -4141,7 +4399,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4cd9b125c486025df0eabcb585e62173c6c9eddcec5d117d3b6e8c30e2ee4d10" dependencies = [ "windows-result", - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-sys" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" +dependencies = [ + "windows-targets 0.48.5", ] [[package]] @@ -4150,7 +4417,7 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", ] [[package]] @@ -4159,7 +4426,22 @@ version = "0.59.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e38bc4d79ed67fd075bcc251a1c39b32a1776bbe92e5bef1f0bf1f8c531853b" dependencies = [ - "windows-targets", + "windows-targets 0.52.6", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", ] [[package]] @@ -4168,28 +4450,46 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" dependencies = [ - "windows_aarch64_gnullvm", - "windows_aarch64_msvc", - "windows_i686_gnu", + "windows_aarch64_gnullvm 0.52.6", + "windows_aarch64_msvc 0.52.6", + "windows_i686_gnu 0.52.6", "windows_i686_gnullvm", - "windows_i686_msvc", - "windows_x86_64_gnu", - "windows_x86_64_gnullvm", - "windows_x86_64_msvc", + "windows_i686_msvc 0.52.6", + "windows_x86_64_gnu 0.52.6", + "windows_x86_64_gnullvm 0.52.6", + "windows_x86_64_msvc 0.52.6", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -4202,24 +4502,48 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -4235,6 +4559,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "winreg" +version = "0.50.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "524e57b2c537c0f9b1e69f1965311ec12182b4122e45035b1508cd24d2adadb1" +dependencies = [ + "cfg-if", + "windows-sys 0.48.0", +] + [[package]] name = "worker" version = "0.3.4" diff --git a/Cargo.toml b/Cargo.toml index a845880f3..129f99b72 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,8 +49,8 @@ criterion = { version = "0.5.1", features = ["async_tokio"] } deepsize = { version = "0.2.0" } futures = "0.3.30" getrandom = "0.2.15" -hex = { version = "0.4.3", features = ["serde"] } headers = "0.4" +hex = { version = "0.4.3", features = ["serde"] } hpke-rs = "0.2.0" hpke-rs-crypto = "0.2.0" hpke-rs-rust-crypto = "0.2.0" @@ -58,7 +58,10 @@ http = "1" http-body-util = "0.1" hyper = "0.14.29" itertools = "0.12.1" +mappable-rc = "0.1.1" matchit = "0.7.3" +opentelemetry = "0.24.0" +opentelemetry-http = "0.13.0" p256 = { version = "0.13.2", features = ["ecdsa-core", "ecdsa", "pem"] } paste = "1.0.15" pin-project = "1.1.5" @@ -83,6 +86,7 @@ tower = "0.4.13" tower-service = "0.3" tracing = "0.1.40" tracing-core = "0.1.32" +tracing-opentelemetry = "0.25.0" tracing-subscriber = "0.3.18" url = { version = "2.5.2", features = ["serde"] } webpki = "0.22.4" diff --git a/crates/daphne-server/Cargo.toml b/crates/daphne-server/Cargo.toml index 4e0067e4a..cfc942fb1 100644 --- a/crates/daphne-server/Cargo.toml +++ b/crates/daphne-server/Cargo.toml @@ -13,26 +13,33 @@ repository.workspace = true description = "Workers backend for Daphne" [dependencies] -axum = "0.6.0" +axum = "0.6.0" # held back to use http 0.2 daphne = { path = "../daphne" } daphne-service-utils = { path = "../daphne-service-utils", features = ["durable_requests"] } futures.workspace = true hex.workspace = true -http = "0.2" +http = "0.2" # held back to use http 0.2 hyper.workspace = true -mappable-rc = "0.1.1" +mappable-rc.workspace = true +opentelemetry = "0.23.0" # held back to use http 0.2 +opentelemetry-http = "0.12.0" # held back to use http 0.2 p256.workspace = true prio.workspace = true rayon.workspace = true -reqwest = { workspace = true, features = ["json"] } serde.workspace = true serde_json.workspace = true thiserror.workspace = true tokio.workspace = true tower.workspace = true tracing.workspace = true +tracing-opentelemetry = "0.24.0" # held back to use http 0.2 url.workspace = true +[dependencies.reqwest] +version = "0.11" # held back to use http 0.2 +default-features = false +features = ["rustls-tls-native-roots", "json"] + [dev-dependencies] anyhow.workspace = true assert_matches.workspace = true diff --git a/crates/daphne-server/src/roles/leader.rs b/crates/daphne-server/src/roles/leader.rs index 9cda28933..66b58b983 100644 --- a/crates/daphne-server/src/roles/leader.rs +++ b/crates/daphne-server/src/roles/leader.rs @@ -19,8 +19,6 @@ use daphne_service_utils::{auth::DaphneAuth, http_headers}; use tracing::{error, info}; use url::Url; -use crate::storage_proxy_connection::method_http_1_0_to_reqwest_0_11; - #[async_trait] impl DapAuthorizedSender for crate::App { async fn authorize( @@ -149,8 +147,6 @@ impl crate::App { ) -> Result { use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; - let method = method_http_1_0_to_reqwest_0_11(method); - let content_type = req .media_type .and_then(|mt| mt.as_str_for_version(req.version)) diff --git a/crates/daphne-server/src/storage_proxy_connection/kv/cache.rs b/crates/daphne-server/src/storage_proxy_connection/kv/cache.rs index 96b5457f5..c17c384dd 100644 --- a/crates/daphne-server/src/storage_proxy_connection/kv/cache.rs +++ b/crates/daphne-server/src/storage_proxy_connection/kv/cache.rs @@ -76,6 +76,7 @@ impl Cache { ); } + #[allow(dead_code)] pub fn delete

(&mut self, key: &str) -> CacheResult where P: KvPrefix, diff --git a/crates/daphne-server/src/storage_proxy_connection/kv/mod.rs b/crates/daphne-server/src/storage_proxy_connection/kv/mod.rs index 4b7669798..f2e5f75a7 100644 --- a/crates/daphne-server/src/storage_proxy_connection/kv/mod.rs +++ b/crates/daphne-server/src/storage_proxy_connection/kv/mod.rs @@ -14,7 +14,7 @@ use tracing::{info_span, Instrument}; use crate::StorageProxyConfig; -use super::{status_http_1_0_to_reqwest_0_11, Error}; +use super::Error; pub(crate) use cache::Cache; use daphne::messages::Time; use daphne_service_utils::http_headers::STORAGE_PROXY_PUT_KV_EXPIRATION; @@ -230,13 +230,17 @@ impl<'h> Kv<'h> { prefix = std::any::type_name::

() ); async { - let resp = self + let mut req = self .http .get(self.config.url.join(&key).unwrap()) .bearer_auth(&self.config.auth_token) - .send() - .await?; - if resp.status() == status_http_1_0_to_reqwest_0_11(StatusCode::NOT_FOUND) { + .build()?; + + super::add_tracing_headers(&mut req); + + let resp = self.http.execute(req).await?; + + if resp.status() == StatusCode::NOT_FOUND { if opt.cache_not_found { self.cache.write().await.put::

(key, None); } @@ -276,13 +280,18 @@ impl<'h> Kv<'h> { .http .post(self.config.url.join(&key).unwrap()) .bearer_auth(&self.config.auth_token) - .body(serde_json::to_vec(&value).unwrap()); + .body(serde_json::to_vec(&value).unwrap()) + .build()?; if let Some(expiration) = expiration { - request = request.header(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.to_string()); + request + .headers_mut() + .insert(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.into()); } - request.send().await?.error_for_status()?; + super::add_tracing_headers(&mut request); + + self.http.execute(request).await?.error_for_status()?; self.cache.write().await.put::

(key, Some(value.into())); Ok(()) @@ -337,15 +346,20 @@ impl<'h> Kv<'h> { .http .put(self.config.url.join(&key).unwrap()) .bearer_auth(&self.config.auth_token) - .body(serde_json::to_vec(&value).unwrap()); + .body(serde_json::to_vec(&value).unwrap()) + .build()?; if let Some(expiration) = expiration { - request = request.header(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.to_string()); + request + .headers_mut() + .insert(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.into()); } - let response = request.send().await?; + super::add_tracing_headers(&mut request); + + let response = self.http.execute(request).await?; - if response.status() == status_http_1_0_to_reqwest_0_11(StatusCode::CONFLICT) { + if response.status() == StatusCode::CONFLICT { Ok(Some(value)) } else { response.error_for_status()?; diff --git a/crates/daphne-server/src/storage_proxy_connection/mod.rs b/crates/daphne-server/src/storage_proxy_connection/mod.rs index f526b381d..67b1a25a8 100644 --- a/crates/daphne-server/src/storage_proxy_connection/mod.rs +++ b/crates/daphne-server/src/storage_proxy_connection/mod.rs @@ -1,22 +1,21 @@ // Copyright (c) 2024 Cloudflare, Inc. All rights reserved. // SPDX-License-Identifier: BSD-3-Clause -#![allow(unused_variables)] -#![allow(clippy::unused_async)] -#![allow(dead_code)] - pub(crate) mod kv; use std::fmt::Debug; -use axum::http::{Method, StatusCode}; +use axum::http::StatusCode; use daphne_service_utils::durable_requests::{ bindings::{DurableMethod, DurableRequestPayload, DurableRequestPayloadExt}, DurableRequest, ObjectIdFrom, DO_PATH_PREFIX, }; +use opentelemetry_http::HeaderInjector; use serde::de::DeserializeOwned; pub(crate) use kv::Kv; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::StorageProxyConfig; @@ -46,6 +45,7 @@ impl<'h> Do<'h> { } } + #[allow(dead_code)] pub fn with_retry(self) -> Self { Self { retry: true, @@ -77,20 +77,23 @@ impl<'d, B: DurableMethod + Debug, P: AsRef<[u8]>> RequestBuilder<'d, B, P> { .url .join(&format!("{DO_PATH_PREFIX}{}", self.path.to_uri())) .unwrap(); - let resp = self + let mut req = self .durable .http .post(url) .body(self.request.into_bytes()) .bearer_auth(&self.durable.config.auth_token) - .send() - .await?; + .build()?; + + add_tracing_headers(&mut req); + + let resp = self.durable.http.execute(req).await?; if resp.status().is_success() { Ok(resp.json().await?) } else { Err(Error::Http { - status: status_reqwest_0_11_to_http_1_0(resp.status()), + status: resp.status(), body: resp.text().await?, }) } @@ -129,6 +132,7 @@ impl<'w> Do<'w> { } } + #[allow(dead_code)] pub fn request_with_id( &self, path: B, @@ -147,37 +151,9 @@ impl<'w> Do<'w> { } } -/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't -/// completed. -/// -/// This is because axum is using http 1.0 and reqwest is still in http 0.2 -pub fn method_http_1_0_to_reqwest_0_11(method: Method) -> reqwest::Method { - match method { - Method::GET => reqwest::Method::GET, - Method::POST => reqwest::Method::POST, - Method::PUT => reqwest::Method::PUT, - Method::PATCH => reqwest::Method::PATCH, - Method::HEAD => reqwest::Method::HEAD, - Method::TRACE => reqwest::Method::TRACE, - Method::OPTIONS => reqwest::Method::OPTIONS, - Method::CONNECT => reqwest::Method::CONNECT, - Method::DELETE => reqwest::Method::DELETE, - _ => unreachable!(), - } -} - -/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't -/// completed. -/// -/// This is because axum is using http 1.0 and reqwest is still in http 0.2 -pub fn status_http_1_0_to_reqwest_0_11(status: StatusCode) -> reqwest::StatusCode { - reqwest::StatusCode::from_u16(status.as_u16()).unwrap() -} - -/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't -/// completed. -/// -/// This is because axum is using http 1.0 and reqwest is still in http 0.2 -pub fn status_reqwest_0_11_to_http_1_0(status: reqwest::StatusCode) -> StatusCode { - StatusCode::from_u16(status.as_u16()).unwrap() +pub fn add_tracing_headers(req: &mut reqwest::Request) { + let ctx = Span::current().context(); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&ctx, &mut HeaderInjector(req.headers_mut())); + }); } diff --git a/crates/daphne-worker/Cargo.toml b/crates/daphne-worker/Cargo.toml index bb7a3406f..17e0c56a6 100644 --- a/crates/daphne-worker/Cargo.toml +++ b/crates/daphne-worker/Cargo.toml @@ -30,6 +30,8 @@ headers.workspace = true hex.workspace = true http-body-util.workspace = true http.workspace = true +opentelemetry-http.workspace = true +opentelemetry.workspace = true prio.workspace = true prometheus.workspace = true rand.workspace = true @@ -39,6 +41,7 @@ serde-wasm-bindgen.workspace = true serde.workspace = true serde_json.workspace = true tracing-core.workspace = true +tracing-opentelemetry.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter", "json"]} tracing.workspace = true url.workspace = true diff --git a/crates/daphne-worker/src/durable/mod.rs b/crates/daphne-worker/src/durable/mod.rs index cd3844457..4cf7601a5 100644 --- a/crates/daphne-worker/src/durable/mod.rs +++ b/crates/daphne-worker/src/durable/mod.rs @@ -23,12 +23,12 @@ pub(crate) mod aggregate_store; #[cfg(feature = "test-utils")] pub(crate) mod test_state_cleaner; -use crate::tracing_utils::shorten_paths; use daphne_service_utils::durable_requests::bindings::{ DurableMethod, DurableRequestPayload, DurableRequestPayloadExt, }; use serde::{Deserialize, Serialize}; use tracing::info_span; +use tracing_opentelemetry::OpenTelemetrySpanExt as _; use worker::{Env, Error, Request, Response, Result, ScheduledTime, State}; pub use aggregate_store::AggregateStore; @@ -227,9 +227,11 @@ where } fn create_span_from_request(req: &Request) -> tracing::Span { - let path = req.path(); - let span = info_span!("DO span", p = %shorten_paths(path.split('/')).display()); - span.in_scope(|| tracing::info!(path, "DO handling new request")); + let extractor = crate::tracing_utils::HeaderExtractor::new(req); + let remote_context = + opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor)); + let span = info_span!("durable object", path = req.path()); + span.set_parent(remote_context); span } diff --git a/crates/daphne-worker/src/durable/test_state_cleaner.rs b/crates/daphne-worker/src/durable/test_state_cleaner.rs index af1ae060a..77fb1fa14 100644 --- a/crates/daphne-worker/src/durable/test_state_cleaner.rs +++ b/crates/daphne-worker/src/durable/test_state_cleaner.rs @@ -3,15 +3,15 @@ use std::{cmp::min, ops::ControlFlow}; -use crate::{durable::create_span_from_request, initialize_tracing, int_err}; +use crate::{durable::create_span_from_request, int_err}; use daphne::messages::TaskId; use daphne_service_utils::durable_requests::bindings::{self, DurableMethod}; use rand::{thread_rng, Rng}; use serde::{Deserialize, Serialize}; -use tracing::{error, trace, Instrument}; +use tracing::Instrument; use worker::{ - async_trait, durable_object, wasm_bindgen, wasm_bindgen_futures, Date, Env, ListOptions, - Method, Request, Response, Result, State, Stub, + async_trait, console_debug, console_error, durable_object, wasm_bindgen, wasm_bindgen_futures, + Date, Env, ListOptions, Method, Request, Response, Result, State, Stub, }; use super::GcDurableObject; @@ -27,7 +27,6 @@ pub struct TestStateCleaner { #[durable_object] impl DurableObject for TestStateCleaner { fn new(state: State, env: Env) -> Self { - initialize_tracing(&env); Self { state, env } } @@ -49,17 +48,17 @@ impl TestStateCleaner { bindings::AggregateStore::BINDING => (), s => { let message = format!("GarbageCollector: unrecognized binding: {s}"); - error!("{}", message); + console_error!("{}", message); return Err(int_err(message)); } }; let queued = DurableOrdered::new_roughly_ordered(durable_ref, "object"); queued.put(&self.state).await?; - trace!( + console_debug!( + "registered DO instance for deletion. binding: {binding}, instance: {instance}", binding = queued.as_ref().binding, instance = queued.as_ref().id_hex, - "registered DO instance for deletion", ); Response::from_json(&()) } @@ -87,10 +86,10 @@ impl TestStateCleaner { &(), ) .await?; - trace!( + console_debug!( + "deleted instance. binding: {binding}. instance: {instance}", binding = durable_ref.binding, instance = durable_ref.id_hex, - "deleted instance", ); } @@ -104,7 +103,7 @@ impl TestStateCleaner { req.method(), req.path() ); - error!("{}", message); + console_error!("{}", message); Err(int_err(message)) } } diff --git a/crates/daphne-worker/src/storage_proxy/mod.rs b/crates/daphne-worker/src/storage_proxy/mod.rs index f41eafbb2..6dbea357f 100644 --- a/crates/daphne-worker/src/storage_proxy/mod.rs +++ b/crates/daphne-worker/src/storage_proxy/mod.rs @@ -90,9 +90,11 @@ use daphne_service_utils::durable_requests::{ use daphne_service_utils::http_headers::STORAGE_PROXY_PUT_KV_EXPIRATION; use headers::Header; use http::{HeaderMap, StatusCode}; +use opentelemetry_http::HeaderExtractor; use prometheus::Registry; use tower_service::Service; -use tracing::warn; +use tracing::{info_span, warn}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use url::Url; use worker::{ js_sys::Uint8Array, Date, Delay, Env, HttpRequest, HttpResponse, Request, RequestInit, @@ -127,6 +129,15 @@ impl IntoResponse for Error { /// Handle a proxy request. This is the entry point of the Worker. pub async fn handle_request(req: HttpRequest, env: Env, registry: &Registry) -> Response { + let span = info_span!("handle_request", path = req.uri().path(), method = ?req.method()); + { + let extractor = HeaderExtractor(req.headers()); + let remote_context = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor) + }); + span.set_parent(remote_context); + } + let ctx = Arc::new(RequestContext { metrics: Metrics::new(registry), env, @@ -186,11 +197,13 @@ async fn storage_purge(ctx: State>) -> impl IntoResponse + ' }; let do_delete = async { - let req = Request::new_with_init( + let mut req = Request::new_with_init( &format!("https://fake-host{}", TestStateCleaner::DeleteAll.to_uri()), RequestInit::new().with_method(worker::Method::Post), )?; + crate::tracing_utils::add_tracing_headers(&mut req); + ctx.env .durable_object(TestStateCleaner::BINDING)? .id_from_name(TestStateCleaner::NAME_STR)? @@ -393,7 +406,7 @@ async fn handle_do_request( let durable_request = DurableRequest::try_from(body.as_ref()) .map_err(|e| worker::Error::RustError(format!("invalid format: {e:?}")))?; - let http_request = { + let mut http_request = { let mut do_req = RequestInit::new(); do_req.with_method(worker::Method::Post); do_req.with_headers(headers.into()); @@ -414,6 +427,7 @@ async fn handle_do_request( .unwrap(); Request::new_with_init(url.as_str(), &do_req)? }; + crate::tracing_utils::add_tracing_headers(&mut http_request); let binding = ctx.env.durable_object(&durable_request.binding)?; let obj = match &durable_request.id { diff --git a/crates/daphne-worker/src/tracing_utils/mod.rs b/crates/daphne-worker/src/tracing_utils/mod.rs index 4da95a478..9476ef176 100644 --- a/crates/daphne-worker/src/tracing_utils/mod.rs +++ b/crates/daphne-worker/src/tracing_utils/mod.rs @@ -6,8 +6,9 @@ mod workers_json_layer; use chrono::{SecondsFormat, Utc}; use std::{collections::HashMap, fmt::Result as FmtResult, io, path::PathBuf, str, sync::Once}; -use tracing::field::Visit; +use tracing::{field::Visit, Span}; use tracing_core::Field; +use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_subscriber::{ fmt, fmt::{format::Writer, time::FormatTime}, @@ -232,6 +233,49 @@ pub fn initialize_tracing(env: &Env) { }); } +/// Helper for extracting headers from HTTP Requests. This is used for OpenTelemetry context +/// propagation over HTTP. +/// See [this](https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/tracing-http-propagator/README.md) +/// for example usage. +pub struct HeaderExtractor(HashMap); + +impl HeaderExtractor { + pub fn new(req: &worker::Request) -> Self { + Self(req.headers().into_iter().collect()) + } +} + +impl opentelemetry::propagation::Extractor for HeaderExtractor { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).map(|s| s.as_str()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|k| k.as_str()).collect::>() + } +} + +pub struct HeaderInjector<'a>(pub &'a mut worker::Headers); + +impl<'a> opentelemetry::propagation::Injector for HeaderInjector<'a> { + fn set(&mut self, key: &str, value: String) { + self.0.set(key, &value).expect("header name is invalid"); + } +} + +pub fn add_tracing_headers(req: &mut worker::Request) { + let ctx = Span::current().context(); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context( + &ctx, + &mut HeaderInjector( + req.headers_mut() + .expect("this request was received and can't be sent"), + ), + ); + }); +} + #[cfg(test)] mod test { use std::path::PathBuf;