From bf60d1ba2e571bcbc5c7b76bf316f29065dc4893 Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 13:55:16 -0400 Subject: [PATCH 01/10] spike tracing approach --- Cargo.lock | 233 +++++++++++++++++++++++++++--- Cargo.toml | 13 ++ crates/ingress-rpc/Cargo.toml | 6 + crates/ingress-rpc/src/main.rs | 66 ++++++++- crates/ingress-rpc/src/service.rs | 6 +- 5 files changed, 300 insertions(+), 24 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4d6f59f5..037f3f20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6568,6 +6568,41 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.17", + "tracing", +] + +[[package]] +name = "opentelemetry-datadog" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a6b2d4db32343691eb945e6153e5a4bd494dbf9d931d5bf7d1d7f59bee156d0" +dependencies = [ + "ahash", + "http 1.3.1", + "indexmap 2.11.4", + "itoa", + "opentelemetry 0.31.0", + "opentelemetry-http 0.31.0", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.31.0", + "reqwest", + "rmp", + "ryu", + "thiserror 2.0.17", + "url", +] + [[package]] name = "opentelemetry-http" version = "0.28.0" @@ -6577,11 +6612,24 @@ dependencies = [ "async-trait", "bytes", "http 1.3.1", - "opentelemetry", + "opentelemetry 0.28.0", "reqwest", "tracing", ] +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "opentelemetry 0.31.0", + "reqwest", +] + [[package]] name = "opentelemetry-otlp" version = "0.28.0" @@ -6591,16 +6639,36 @@ dependencies = [ "async-trait", "futures-core", "http 1.3.1", - "opentelemetry", - "opentelemetry-http", - "opentelemetry-proto", - "opentelemetry_sdk", - "prost", + "opentelemetry 0.28.0", + "opentelemetry-http 0.28.0", + "opentelemetry-proto 0.28.0", + "opentelemetry_sdk 0.28.0", + "prost 0.13.5", + "reqwest", + "serde_json", + "thiserror 2.0.17", + "tokio", + "tonic 0.12.3", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +dependencies = [ + "http 1.3.1", + "opentelemetry 0.31.0", + "opentelemetry-http 0.31.0", + "opentelemetry-proto 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.1", "reqwest", "serde_json", "thiserror 2.0.17", "tokio", - "tonic", + "tonic 0.14.2", "tracing", ] @@ -6612,13 +6680,36 @@ checksum = "56f8870d3024727e99212eb3bb1762ec16e255e3e6f58eeb3dc8db1aa226746d" dependencies = [ "base64 0.22.1", "hex", - "opentelemetry", - "opentelemetry_sdk", - "prost", + "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", + "prost 0.13.5", "serde", - "tonic", + "tonic 0.12.3", ] +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "base64 0.22.1", + "const-hex", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.1", + "serde", + "serde_json", + "tonic 0.14.2", + "tonic-prost", +] + +[[package]] +name = "opentelemetry-semantic-conventions" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" + [[package]] name = "opentelemetry_sdk" version = "0.28.0" @@ -6630,7 +6721,7 @@ dependencies = [ "futures-executor", "futures-util", "glob", - "opentelemetry", + "opentelemetry 0.28.0", "percent-encoding", "rand 0.8.5", "serde_json", @@ -6640,6 +6731,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry 0.31.0", + "percent-encoding", + "rand 0.9.2", + "thiserror 2.0.17", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -7141,7 +7249,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.5", +] + +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive 0.14.1", ] [[package]] @@ -7157,6 +7275,19 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "pulldown-cmark" version = "0.9.6" @@ -10894,9 +11025,9 @@ dependencies = [ "metrics-util", "moka", "op-alloy-rpc-types-engine", - "opentelemetry", - "opentelemetry-otlp", - "opentelemetry_sdk", + "opentelemetry 0.28.0", + "opentelemetry-otlp 0.28.0", + "opentelemetry_sdk 0.28.0", "parking_lot", "paste", "reth-optimism-payload-builder", @@ -10912,7 +11043,7 @@ dependencies = [ "tower 0.5.2", "tower-http", "tracing", - "tracing-opentelemetry", + "tracing-opentelemetry 0.29.0", "tracing-subscriber 0.3.20", "url", "vergen", @@ -12529,6 +12660,11 @@ dependencies = [ "op-alloy-consensus", "op-alloy-network", "op-revm", + "opentelemetry 0.31.0", + "opentelemetry-datadog", + "opentelemetry-otlp 0.31.0", + "opentelemetry-semantic-conventions", + "opentelemetry_sdk 0.31.0", "rdkafka", "reth-optimism-evm", "reth-rpc-eth-types", @@ -12536,6 +12672,7 @@ dependencies = [ "serde_json", "tokio", "tracing", + "tracing-opentelemetry 0.32.0", "tracing-subscriber 0.3.20", "url", ] @@ -12798,7 +12935,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "socket2 0.5.10", "tokio", "tokio-stream", @@ -12808,6 +12945,43 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.7.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost 0.14.1", + "tonic 0.14.2", +] + [[package]] name = "tower" version = "0.4.13" @@ -12988,8 +13162,8 @@ checksum = "721f2d2569dce9f3dfbbddee5906941e953bfcdf736a62da3377f5751650cc36" dependencies = [ "js-sys", "once_cell", - "opentelemetry", - "opentelemetry_sdk", + "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", "smallvec", "tracing", "tracing-core", @@ -12998,6 +13172,25 @@ dependencies = [ "web-time", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" +dependencies = [ + "js-sys", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", + "rustversion", + "smallvec", + "thiserror 2.0.17", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber 0.3.20", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.2.0" diff --git a/Cargo.toml b/Cargo.toml index 9114e9f3..f91d86ef 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -72,3 +72,16 @@ backon = "1.5.2" op-revm = { version = "10.1.0", default-features = false } revm-context-interface = "10.2.0" alloy-signer-local = "1.0.36" + +opentelemetry = { version = "0.31.0", features = ["trace"] } +opentelemetry-otlp = { version = "0.31.0", features = [ + "http-proto", + "http-json", + "reqwest-client", + "trace", + "grpc-tonic", +] } +opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } +tracing-opentelemetry = "0.32.0" +opentelemetry-datadog = { version = "0.19.0", features = ["reqwest-client"] } +opentelemetry-semantic-conventions = "0.31.0" diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 13a581b3..9364bb21 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -35,3 +35,9 @@ op-revm.workspace = true revm-context-interface.workspace = true alloy-signer-local.workspace = true reth-optimism-evm.workspace = true +opentelemetry.workspace = true +opentelemetry-otlp.workspace = true +opentelemetry_sdk.workspace = true +tracing-opentelemetry.workspace = true +opentelemetry-datadog.workspace = true +opentelemetry-semantic-conventions.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6400bc02..e180b0b4 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -2,11 +2,20 @@ use alloy_provider::{ProviderBuilder, RootProvider}; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; +use opentelemetry::global; +use opentelemetry::trace::Tracer; +use opentelemetry::{InstrumentationScope, trace::TracerProvider}; +use opentelemetry_sdk::trace; +use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler}; +use opentelemetry_semantic_conventions as semcov; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; +use std::env; use std::fs; use std::net::IpAddr; use tracing::{info, warn}; +use tracing_subscriber::Layer; +use tracing_subscriber::filter::{LevelFilter, Targets}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use url::Url; @@ -57,6 +66,14 @@ struct Config { default_value = "10800" )] send_transaction_default_lifetime_seconds: u64, + + /// Enable tracing + #[arg(long, env = "TIPS_INGRESS_TRACING_ENABLED", default_value = "false")] + tracing_enabled: bool, + + /// Port for the OTLP endpoint + #[arg(long, env = "TIPS_INGRESS_TRACING_OTLP_PORT", default_value = "4317")] + tracing_otlp_port: u16, } #[tokio::main] @@ -80,18 +97,63 @@ async fn main() -> anyhow::Result<()> { } }; - tracing_subscriber::registry() + /*tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level.to_string())), ) .with(tracing_subscriber::fmt::layer()) .init(); + + let global_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(env!("CARGO_PKG_NAME"), LevelFilter::TRACE); + */ + + let log_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(env!("CARGO_PKG_NAME"), log_level); + + let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); + let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); + + let mut trace_config = trace::Config::default(); + trace_config.sampler = Box::new(Sampler::AlwaysOn); + trace_config.id_generator = Box::new(RandomIdGenerator::default()); + + let provider = opentelemetry_datadog::new_pipeline() + .with_service_name(env!("CARGO_PKG_NAME")) + .with_api_version(opentelemetry_datadog::ApiVersion::Version05) + .with_agent_endpoint(&otlp_endpoint) + .with_trace_config(trace_config) + .install_batch() + .unwrap(); + global::set_tracer_provider(provider.clone()); + + let scope = InstrumentationScope::builder("opentelemetry-datadog") + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + + let tracer = provider.tracer_with_scope(scope); + tracer.in_span("span_main", |_span| { + info!( + message = "Tracing enabled", + endpoint = %otlp_endpoint + ); + }); + tracing_subscriber::registry() + .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) + .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) + .init(); + info!( message = "Starting ingress service", address = %config.address, port = config.port, - mempool_url = %config.mempool_url + mempool_url = %config.mempool_url, + endpoint = %otlp_endpoint ); let provider: RootProvider = ProviderBuilder::new() diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index a48841d4..e4e833d0 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -11,7 +11,7 @@ use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; -use tracing::{info, warn}; +use tracing::{Instrument, info, span, warn}; use crate::queue::QueuePublisher; @@ -111,7 +111,9 @@ where // queue the bundle let sender = transaction.signer(); - if let Err(e) = self.queue.publish(&bundle, sender).await { + let span = + span!(tracing::Level::INFO, "span_publish", transaction = %transaction.tx_hash()); + if let Err(e) = self.queue.publish(&bundle, sender).instrument(span).await { warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e); } From 27e9ad247da2d076f47ebabb0f1db7e8105eadb8 Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 14:21:23 -0400 Subject: [PATCH 02/10] shutdown span tracer --- crates/ingress-rpc/src/main.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index e180b0b4..476b944e 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -156,7 +156,7 @@ async fn main() -> anyhow::Result<()> { endpoint = %otlp_endpoint ); - let provider: RootProvider = ProviderBuilder::new() + let op_provider: RootProvider = ProviderBuilder::new() .disable_recommended_fillers() .network::() .connect_http(config.mempool_url); @@ -168,7 +168,7 @@ async fn main() -> anyhow::Result<()> { let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic); let service = IngressService::new( - provider, + op_provider, config.dual_write_mempool, queue, config.send_transaction_default_lifetime_seconds, @@ -185,6 +185,7 @@ async fn main() -> anyhow::Result<()> { ); handle.stopped().await; + let _ = provider.shutdown(); Ok(()) } From ca1fbbfc756b5bcae9ebbc56abd5cc40f06a581f Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 14:45:45 -0400 Subject: [PATCH 03/10] fix span exporter again --- crates/ingress-rpc/src/main.rs | 92 +++++++++++++++++++++++----------- 1 file changed, 64 insertions(+), 28 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 476b944e..ea5080fc 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -121,32 +121,48 @@ async fn main() -> anyhow::Result<()> { trace_config.sampler = Box::new(Sampler::AlwaysOn); trace_config.id_generator = Box::new(RandomIdGenerator::default()); - let provider = opentelemetry_datadog::new_pipeline() - .with_service_name(env!("CARGO_PKG_NAME")) - .with_api_version(opentelemetry_datadog::ApiVersion::Version05) - .with_agent_endpoint(&otlp_endpoint) - .with_trace_config(trace_config) - .install_batch() - .unwrap(); - global::set_tracer_provider(provider.clone()); - - let scope = InstrumentationScope::builder("opentelemetry-datadog") - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(semcov::SCHEMA_URL) - .with_attributes(None) - .build(); - - let tracer = provider.tracer_with_scope(scope); - tracer.in_span("span_main", |_span| { - info!( - message = "Tracing enabled", - endpoint = %otlp_endpoint - ); - }); - tracing_subscriber::registry() - .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) - .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) - .init(); + let provider = if config.tracing_enabled { + Some( + opentelemetry_datadog::new_pipeline() + .with_service_name(env!("CARGO_PKG_NAME")) + .with_api_version(opentelemetry_datadog::ApiVersion::Version05) + .with_agent_endpoint(&otlp_endpoint) + .with_trace_config(trace_config) + .install_batch() + .unwrap(), + ) + } else { + None + }; + + if let Some(ref provider) = provider { + global::set_tracer_provider(provider.clone()); + } + + if let Some(ref provider) = provider { + let scope = InstrumentationScope::builder("opentelemetry-datadog") + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + + let tracer = provider.tracer_with_scope(scope); + tracer.in_span("span_main", |_span| { + info!( + message = "Tracing enabled", + endpoint = %otlp_endpoint + ); + }); + + tracing_subscriber::registry() + .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) + .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) + .init(); + } else { + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) + .init(); + } info!( message = "Starting ingress service", @@ -184,8 +200,28 @@ async fn main() -> anyhow::Result<()> { address = %addr ); - handle.stopped().await; - let _ = provider.shutdown(); + // Set up graceful shutdown + tokio::select! { + _ = handle.stopped() => { + info!("Server stopped"); + } + _ = tokio::signal::ctrl_c() => { + info!("Received shutdown signal"); + } + } + + // Give time for any remaining spans to be processed + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + // Shutdown tracer provider if it exists + if let Some(provider) = provider { + info!("Shutting down tracer provider"); + if let Err(e) = provider.shutdown() { + warn!("Error shutting down tracer provider: {}", e); + } + } + + info!("Ingress service shutdown complete"); Ok(()) } From 8e3838b56383a36e3be2841932ade2e35faec927 Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 15:35:35 -0400 Subject: [PATCH 04/10] follow commonware --- Cargo.toml | 2 +- crates/ingress-rpc/src/main.rs | 155 ++++++++++++++++-------------- crates/ingress-rpc/src/service.rs | 7 ++ 3 files changed, 93 insertions(+), 71 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f91d86ef..dd288f8a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ op-alloy-consensus = { version = "0.20.0", features = ["k256"] } tokio = { version = "1.47.1", features = ["full"] } tracing = "0.1.41" -tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } +tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt", "json"] } anyhow = "1.0.99" clap = { version = "4.5.47", features = ["derive", "env"] } url = "2.5.7" diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index ea5080fc..b25835a1 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -3,20 +3,26 @@ use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; use opentelemetry::global; -use opentelemetry::trace::Tracer; -use opentelemetry::{InstrumentationScope, trace::TracerProvider}; -use opentelemetry_sdk::trace; -use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler}; -use opentelemetry_semantic_conventions as semcov; +//use opentelemetry::trace::Tracer; +//use opentelemetry::{InstrumentationScope, trace::TracerProvider}; +//use opentelemetry_sdk::trace; +use opentelemetry::trace::TracerProvider; +use opentelemetry_sdk::trace::BatchSpanProcessor; +use opentelemetry_sdk::trace::Sampler; +//use opentelemetry_semantic_conventions as semcov; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::trace::SdkTracerProvider; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; use std::env; use std::fs; use std::net::IpAddr; use tracing::{info, warn}; -use tracing_subscriber::Layer; -use tracing_subscriber::filter::{LevelFilter, Targets}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +//use tracing_subscriber::Layer; +//use tracing_subscriber::filter::{LevelFilter, Targets}; +use opentelemetry_otlp::{SpanExporter, WithExportConfig}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::{Layer, Registry}; use url::Url; mod queue; @@ -110,6 +116,8 @@ async fn main() -> anyhow::Result<()> { .with_target(env!("CARGO_PKG_NAME"), LevelFilter::TRACE); */ + /*global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::default()); + let log_filter = Targets::new() .with_default(LevelFilter::INFO) .with_target(env!("CARGO_PKG_NAME"), log_level); @@ -121,48 +129,76 @@ async fn main() -> anyhow::Result<()> { trace_config.sampler = Box::new(Sampler::AlwaysOn); trace_config.id_generator = Box::new(RandomIdGenerator::default()); - let provider = if config.tracing_enabled { - Some( - opentelemetry_datadog::new_pipeline() - .with_service_name(env!("CARGO_PKG_NAME")) - .with_api_version(opentelemetry_datadog::ApiVersion::Version05) - .with_agent_endpoint(&otlp_endpoint) - .with_trace_config(trace_config) - .install_batch() - .unwrap(), - ) - } else { - None - }; + let provider = opentelemetry_datadog::new_pipeline() + .with_service_name(env!("CARGO_PKG_NAME")) + .with_api_version(opentelemetry_datadog::ApiVersion::Version05) + .with_agent_endpoint(&otlp_endpoint) + .with_trace_config(trace_config) + .install_batch()?; + + global::set_tracer_provider(provider.clone()); + + let scope = InstrumentationScope::builder(env!("CARGO_PKG_NAME")) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + + let tracer = provider.tracer_with_scope(scope); + tracer.in_span("span_main", |_span| { + info!( + message = "Tracing enabled", + endpoint = %otlp_endpoint + ); + }); + + tracing_subscriber::registry() + .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) + .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) + .init();*/ + + let filter = tracing_subscriber::EnvFilter::new(log_level.to_string()); + + let log_layer = tracing_subscriber::fmt::layer() + .with_line_number(true) + .with_thread_ids(true) + .with_file(true) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) + .json() + .boxed(); - if let Some(ref provider) = provider { - global::set_tracer_provider(provider.clone()); - } + let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); + let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); - if let Some(ref provider) = provider { - let scope = InstrumentationScope::builder("opentelemetry-datadog") - .with_version(env!("CARGO_PKG_VERSION")) - .with_schema_url(semcov::SCHEMA_URL) - .with_attributes(None) - .build(); - - let tracer = provider.tracer_with_scope(scope); - tracer.in_span("span_main", |_span| { - info!( - message = "Tracing enabled", - endpoint = %otlp_endpoint - ); - }); - - tracing_subscriber::registry() - .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) - .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) - .init(); - } else { - tracing_subscriber::registry() - .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) - .init(); - } + // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 + let exporter = SpanExporter::builder() + .with_http() + .with_endpoint(&otlp_endpoint) + .build()?; + + let batch_processor = BatchSpanProcessor::builder(exporter).build(); + + let resource = Resource::builder_empty() + .with_service_name(env!("CARGO_PKG_NAME")) + .build(); + + let tracer_provider = SdkTracerProvider::builder() + .with_span_processor(batch_processor) + .with_resource(resource) + .with_sampler(Sampler::AlwaysOn) + .build(); + + // Create the tracer and set it globally + let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME")); + global::set_tracer_provider(tracer_provider); + + let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + let register = Registry::default() + .with(filter) + .with(log_layer) + .with(trace_layer); + tracing::subscriber::set_global_default(register)?; info!( message = "Starting ingress service", @@ -200,28 +236,7 @@ async fn main() -> anyhow::Result<()> { address = %addr ); - // Set up graceful shutdown - tokio::select! { - _ = handle.stopped() => { - info!("Server stopped"); - } - _ = tokio::signal::ctrl_c() => { - info!("Received shutdown signal"); - } - } - - // Give time for any remaining spans to be processed - tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; - - // Shutdown tracer provider if it exists - if let Some(provider) = provider { - info!("Shutting down tracer provider"); - if let Err(e) = provider.shutdown() { - warn!("Error shutting down tracer provider: {}", e); - } - } - - info!("Ingress service shutdown complete"); + handle.stopped().await; Ok(()) } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index e4e833d0..8eb8ae18 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -94,12 +94,18 @@ where .await?; validate_tx(account, &transaction, &data, &mut l1_block_info).await?; + let span = span!(tracing::Level::TRACE, "span_ethsendbundle", transaction = %transaction.tx_hash()); + let _enter = span.enter(); let expiry_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() + self.send_transaction_default_lifetime_seconds; + drop(_enter); + let span = + span!(tracing::Level::INFO, "span_ethsendbundle", transaction = %transaction.tx_hash()); + let _enter = span.enter(); let bundle = EthSendBundle { txs: vec![data.clone()], block_number: 0, @@ -108,6 +114,7 @@ where reverting_tx_hashes: vec![transaction.tx_hash()], ..Default::default() }; + drop(_enter); // queue the bundle let sender = transaction.signer(); From 33e20d537e35c40e902f1bbf7e49e945e1c19c9c Mon Sep 17 00:00:00 2001 From: William Law Date: Fri, 17 Oct 2025 17:47:30 -0400 Subject: [PATCH 05/10] try http client on commonware approach too --- Cargo.lock | 13 +++++++++++++ Cargo.toml | 1 + crates/ingress-rpc/Cargo.toml | 1 + crates/ingress-rpc/src/main.rs | 2 ++ 4 files changed, 17 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 037f3f20..b348ad91 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3568,6 +3568,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "endian-type" version = "0.1.2" @@ -7695,9 +7704,11 @@ checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -7707,6 +7718,7 @@ dependencies = [ "hyper-util", "js-sys", "log", + "mime", "native-tls", "percent-encoding", "pin-project-lite", @@ -12666,6 +12678,7 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry_sdk 0.31.0", "rdkafka", + "reqwest", "reth-optimism-evm", "reth-rpc-eth-types", "revm-context-interface", diff --git a/Cargo.toml b/Cargo.toml index dd288f8a..fc6aa435 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,3 +85,4 @@ opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } tracing-opentelemetry = "0.32.0" opentelemetry-datadog = { version = "0.19.0", features = ["reqwest-client"] } opentelemetry-semantic-conventions = "0.31.0" +reqwest = { version = "0.12", features = ["json"] } diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 9364bb21..851dc984 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -41,3 +41,4 @@ opentelemetry_sdk.workspace = true tracing-opentelemetry.workspace = true opentelemetry-datadog.workspace = true opentelemetry-semantic-conventions.workspace = true +reqwest.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index b25835a1..6e82c34e 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -7,6 +7,7 @@ use opentelemetry::global; //use opentelemetry::{InstrumentationScope, trace::TracerProvider}; //use opentelemetry_sdk::trace; use opentelemetry::trace::TracerProvider; +use opentelemetry_otlp::WithHttpConfig; use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::Sampler; //use opentelemetry_semantic_conventions as semcov; @@ -173,6 +174,7 @@ async fn main() -> anyhow::Result<()> { // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 let exporter = SpanExporter::builder() .with_http() + .with_http_client(reqwest::Client::new()) .with_endpoint(&otlp_endpoint) .build()?; From cd81a8dc82b9d6c34cbb1491e64ca0dd8983773e Mon Sep 17 00:00:00 2001 From: William Law Date: Sat, 18 Oct 2025 16:37:47 -0400 Subject: [PATCH 06/10] try simplespanexporter --- Cargo.lock | 29 ----------------------------- Cargo.toml | 2 -- crates/ingress-rpc/Cargo.toml | 2 -- crates/ingress-rpc/src/main.rs | 9 ++++++--- 4 files changed, 6 insertions(+), 36 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b348ad91..52a94c1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6591,27 +6591,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "opentelemetry-datadog" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a6b2d4db32343691eb945e6153e5a4bd494dbf9d931d5bf7d1d7f59bee156d0" -dependencies = [ - "ahash", - "http 1.3.1", - "indexmap 2.11.4", - "itoa", - "opentelemetry 0.31.0", - "opentelemetry-http 0.31.0", - "opentelemetry-semantic-conventions", - "opentelemetry_sdk 0.31.0", - "reqwest", - "rmp", - "ryu", - "thiserror 2.0.17", - "url", -] - [[package]] name = "opentelemetry-http" version = "0.28.0" @@ -6713,12 +6692,6 @@ dependencies = [ "tonic-prost", ] -[[package]] -name = "opentelemetry-semantic-conventions" -version = "0.31.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e62e29dfe041afb8ed2a6c9737ab57db4907285d999ef8ad3a59092a36bdc846" - [[package]] name = "opentelemetry_sdk" version = "0.28.0" @@ -12673,9 +12646,7 @@ dependencies = [ "op-alloy-network", "op-revm", "opentelemetry 0.31.0", - "opentelemetry-datadog", "opentelemetry-otlp 0.31.0", - "opentelemetry-semantic-conventions", "opentelemetry_sdk 0.31.0", "rdkafka", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index fc6aa435..b3bc52a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,4 @@ opentelemetry-otlp = { version = "0.31.0", features = [ ] } opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } tracing-opentelemetry = "0.32.0" -opentelemetry-datadog = { version = "0.19.0", features = ["reqwest-client"] } -opentelemetry-semantic-conventions = "0.31.0" reqwest = { version = "0.12", features = ["json"] } diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 851dc984..65686dd9 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -39,6 +39,4 @@ opentelemetry.workspace = true opentelemetry-otlp.workspace = true opentelemetry_sdk.workspace = true tracing-opentelemetry.workspace = true -opentelemetry-datadog.workspace = true -opentelemetry-semantic-conventions.workspace = true reqwest.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6e82c34e..513b40f4 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -8,8 +8,9 @@ use opentelemetry::global; //use opentelemetry_sdk::trace; use opentelemetry::trace::TracerProvider; use opentelemetry_otlp::WithHttpConfig; -use opentelemetry_sdk::trace::BatchSpanProcessor; +//use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::Sampler; +use opentelemetry_sdk::trace::SimpleSpanProcessor; //use opentelemetry_semantic_conventions as semcov; use opentelemetry_sdk::Resource; use opentelemetry_sdk::trace::SdkTracerProvider; @@ -174,11 +175,13 @@ async fn main() -> anyhow::Result<()> { // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 let exporter = SpanExporter::builder() .with_http() - .with_http_client(reqwest::Client::new()) + .with_http_client(reqwest::blocking::Client::new()) + //.with_tonic() .with_endpoint(&otlp_endpoint) .build()?; - let batch_processor = BatchSpanProcessor::builder(exporter).build(); + //let batch_processor = BatchSpanProcessor::builder(exporter).build(); + let batch_processor = SimpleSpanProcessor::new(exporter); let resource = Resource::builder_empty() .with_service_name(env!("CARGO_PKG_NAME")) From cb921ae306578cb6c1cfcf79e86f13601af9f3be Mon Sep 17 00:00:00 2001 From: William Law Date: Sat, 18 Oct 2025 16:58:44 -0400 Subject: [PATCH 07/10] try grpc --- crates/ingress-rpc/src/main.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 513b40f4..6f86211d 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -7,7 +7,7 @@ use opentelemetry::global; //use opentelemetry::{InstrumentationScope, trace::TracerProvider}; //use opentelemetry_sdk::trace; use opentelemetry::trace::TracerProvider; -use opentelemetry_otlp::WithHttpConfig; +//use opentelemetry_otlp::WithHttpConfig; //use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::Sampler; use opentelemetry_sdk::trace::SimpleSpanProcessor; @@ -174,9 +174,9 @@ async fn main() -> anyhow::Result<()> { // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 let exporter = SpanExporter::builder() - .with_http() - .with_http_client(reqwest::blocking::Client::new()) - //.with_tonic() + //.with_http() + //.with_http_client(reqwest::blocking::Client::new()) + .with_tonic() .with_endpoint(&otlp_endpoint) .build()?; From 6a482bdadc0443b119537c5e9847f550b0775ef2 Mon Sep 17 00:00:00 2001 From: William Law Date: Sun, 19 Oct 2025 11:15:25 -0400 Subject: [PATCH 08/10] more debugging --- crates/ingress-rpc/src/main.rs | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6f86211d..9fb0220c 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -19,12 +19,13 @@ use rdkafka::producer::FutureProducer; use std::env; use std::fs; use std::net::IpAddr; -use tracing::{info, warn}; +use tracing::{info, span, warn}; //use tracing_subscriber::Layer; //use tracing_subscriber::filter::{LevelFilter, Targets}; use opentelemetry_otlp::{SpanExporter, WithExportConfig}; +use tracing_subscriber::Layer; use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::{Layer, Registry}; +use tracing_subscriber::util::SubscriberInitExt; use url::Url; mod queue; @@ -199,11 +200,16 @@ async fn main() -> anyhow::Result<()> { let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); - let register = Registry::default() + /*let register = Registry::default() + .with(filter) + .with(log_layer) + .with(trace_layer);*/ + tracing_subscriber::registry() .with(filter) .with(log_layer) - .with(trace_layer); - tracing::subscriber::set_global_default(register)?; + .with(trace_layer) + .init(); + //tracing::subscriber::set_global_default(register)?; info!( message = "Starting ingress service", @@ -213,10 +219,13 @@ async fn main() -> anyhow::Result<()> { endpoint = %otlp_endpoint ); + let span = span!(tracing::Level::TRACE, "span_main"); + let _enter = span.enter(); let op_provider: RootProvider = ProviderBuilder::new() .disable_recommended_fillers() .network::() .connect_http(config.mempool_url); + drop(_enter); let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties)?; From 865564fb944c09ba1a000229109a971000227bfc Mon Sep 17 00:00:00 2001 From: William Law Date: Sun, 19 Oct 2025 13:45:05 -0400 Subject: [PATCH 09/10] shutdown again (one more time) --- crates/ingress-rpc/src/main.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 9fb0220c..906552a0 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -19,7 +19,7 @@ use rdkafka::producer::FutureProducer; use std::env; use std::fs; use std::net::IpAddr; -use tracing::{info, span, warn}; +use tracing::{error, info, span, warn}; //use tracing_subscriber::Layer; //use tracing_subscriber::filter::{LevelFilter, Targets}; use opentelemetry_otlp::{SpanExporter, WithExportConfig}; @@ -196,7 +196,7 @@ async fn main() -> anyhow::Result<()> { // Create the tracer and set it globally let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME")); - global::set_tracer_provider(tracer_provider); + global::set_tracer_provider(tracer_provider.clone()); let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); @@ -251,6 +251,10 @@ async fn main() -> anyhow::Result<()> { ); handle.stopped().await; + if let Err(e) = tracer_provider.shutdown() { + error!(error = %e, "Failed to shutdown tracer provider"); + return Err(e.into()); + }; Ok(()) } From ed9c1349a80505d0a4a43c5361ac8a218cd32713 Mon Sep 17 00:00:00 2001 From: William Law Date: Sun, 19 Oct 2025 14:08:38 -0400 Subject: [PATCH 10/10] with just http --- crates/ingress-rpc/src/main.rs | 30 +++++++++++++++--------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 906552a0..3bbb4220 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -8,9 +8,9 @@ use opentelemetry::global; //use opentelemetry_sdk::trace; use opentelemetry::trace::TracerProvider; //use opentelemetry_otlp::WithHttpConfig; -//use opentelemetry_sdk::trace::BatchSpanProcessor; +use opentelemetry_sdk::trace::BatchSpanProcessor; use opentelemetry_sdk::trace::Sampler; -use opentelemetry_sdk::trace::SimpleSpanProcessor; +//use opentelemetry_sdk::trace::SimpleSpanProcessor; //use opentelemetry_semantic_conventions as semcov; use opentelemetry_sdk::Resource; use opentelemetry_sdk::trace::SdkTracerProvider; @@ -25,7 +25,7 @@ use tracing::{error, info, span, warn}; use opentelemetry_otlp::{SpanExporter, WithExportConfig}; use tracing_subscriber::Layer; use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; +//use tracing_subscriber::util::SubscriberInitExt; use url::Url; mod queue; @@ -175,14 +175,14 @@ async fn main() -> anyhow::Result<()> { // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 let exporter = SpanExporter::builder() - //.with_http() + .with_http() //.with_http_client(reqwest::blocking::Client::new()) - .with_tonic() + //.with_tonic() .with_endpoint(&otlp_endpoint) .build()?; - //let batch_processor = BatchSpanProcessor::builder(exporter).build(); - let batch_processor = SimpleSpanProcessor::new(exporter); + let batch_processor = BatchSpanProcessor::builder(exporter).build(); + //let batch_processor = SimpleSpanProcessor::new(exporter); let resource = Resource::builder_empty() .with_service_name(env!("CARGO_PKG_NAME")) @@ -200,16 +200,16 @@ async fn main() -> anyhow::Result<()> { let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); - /*let register = Registry::default() - .with(filter) - .with(log_layer) - .with(trace_layer);*/ - tracing_subscriber::registry() + let register = tracing_subscriber::registry() .with(filter) .with(log_layer) - .with(trace_layer) - .init(); - //tracing::subscriber::set_global_default(register)?; + .with(trace_layer); + /*tracing_subscriber::registry() + .with(filter) + .with(log_layer) + .with(trace_layer) + .init();*/ + tracing::subscriber::set_global_default(register)?; info!( message = "Starting ingress service",