diff --git a/Cargo.lock b/Cargo.lock index 4d6f59f5..52a94c1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3568,6 +3568,15 @@ dependencies = [ "zeroize", ] +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "endian-type" version = "0.1.2" @@ -6568,6 +6577,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "pin-project-lite", + "thiserror 2.0.17", + "tracing", +] + [[package]] name = "opentelemetry-http" version = "0.28.0" @@ -6577,11 +6600,24 @@ dependencies = [ "async-trait", "bytes", "http 1.3.1", - "opentelemetry", + "opentelemetry 0.28.0", "reqwest", "tracing", ] +[[package]] +name = "opentelemetry-http" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" +dependencies = [ + "async-trait", + "bytes", + "http 1.3.1", + "opentelemetry 0.31.0", + "reqwest", +] + [[package]] name = "opentelemetry-otlp" version = "0.28.0" @@ -6591,16 +6627,36 @@ dependencies = [ "async-trait", "futures-core", "http 1.3.1", - "opentelemetry", - "opentelemetry-http", - "opentelemetry-proto", - "opentelemetry_sdk", - "prost", + "opentelemetry 0.28.0", + "opentelemetry-http 0.28.0", + "opentelemetry-proto 0.28.0", + "opentelemetry_sdk 0.28.0", + "prost 0.13.5", + "reqwest", + "serde_json", + "thiserror 2.0.17", + "tokio", + "tonic 0.12.3", + "tracing", +] + +[[package]] +name = "opentelemetry-otlp" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" +dependencies = [ + "http 1.3.1", + "opentelemetry 0.31.0", + "opentelemetry-http 0.31.0", + "opentelemetry-proto 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.1", "reqwest", "serde_json", "thiserror 2.0.17", "tokio", - "tonic", + "tonic 0.14.2", "tracing", ] @@ -6612,11 +6668,28 @@ checksum = "56f8870d3024727e99212eb3bb1762ec16e255e3e6f58eeb3dc8db1aa226746d" dependencies = [ "base64 0.22.1", "hex", - "opentelemetry", - "opentelemetry_sdk", - "prost", + "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", + "prost 0.13.5", "serde", - "tonic", + "tonic 0.12.3", +] + +[[package]] +name = "opentelemetry-proto" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" +dependencies = [ + "base64 0.22.1", + "const-hex", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", + "prost 0.14.1", + "serde", + "serde_json", + "tonic 0.14.2", + "tonic-prost", ] [[package]] @@ -6630,7 +6703,7 @@ dependencies = [ "futures-executor", "futures-util", "glob", - "opentelemetry", + "opentelemetry 0.28.0", "percent-encoding", "rand 0.8.5", "serde_json", @@ -6640,6 +6713,23 @@ dependencies = [ "tracing", ] +[[package]] +name = "opentelemetry_sdk" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" +dependencies = [ + "futures-channel", + "futures-executor", + "futures-util", + "opentelemetry 0.31.0", + "percent-encoding", + "rand 0.9.2", + "thiserror 2.0.17", + "tokio", + "tokio-stream", +] + [[package]] name = "option-ext" version = "0.2.0" @@ -7141,7 +7231,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" dependencies = [ "bytes", - "prost-derive", + "prost-derive 0.13.5", +] + +[[package]] +name = "prost" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7231bd9b3d3d33c86b58adbac74b5ec0ad9f496b19d22801d773636feaa95f3d" +dependencies = [ + "bytes", + "prost-derive 0.14.1", ] [[package]] @@ -7157,6 +7257,19 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "prost-derive" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9120690fafc389a67ba3803df527d0ec9cbbc9cc45e4cc20b332996dfb672425" +dependencies = [ + "anyhow", + "itertools 0.14.0", + "proc-macro2", + "quote", + "syn 2.0.106", +] + [[package]] name = "pulldown-cmark" version = "0.9.6" @@ -7564,9 +7677,11 @@ checksum = "d429f34c8092b2d42c7c93cec323bb4adeb7c67698f70839adec842ec10c7ceb" dependencies = [ "base64 0.22.1", "bytes", + "encoding_rs", "futures-channel", "futures-core", "futures-util", + "h2 0.4.12", "http 1.3.1", "http-body 1.0.1", "http-body-util", @@ -7576,6 +7691,7 @@ dependencies = [ "hyper-util", "js-sys", "log", + "mime", "native-tls", "percent-encoding", "pin-project-lite", @@ -10894,9 +11010,9 @@ dependencies = [ "metrics-util", "moka", "op-alloy-rpc-types-engine", - "opentelemetry", - "opentelemetry-otlp", - "opentelemetry_sdk", + "opentelemetry 0.28.0", + "opentelemetry-otlp 0.28.0", + "opentelemetry_sdk 0.28.0", "parking_lot", "paste", "reth-optimism-payload-builder", @@ -10912,7 +11028,7 @@ dependencies = [ "tower 0.5.2", "tower-http", "tracing", - "tracing-opentelemetry", + "tracing-opentelemetry 0.29.0", "tracing-subscriber 0.3.20", "url", "vergen", @@ -12529,13 +12645,18 @@ dependencies = [ "op-alloy-consensus", "op-alloy-network", "op-revm", + "opentelemetry 0.31.0", + "opentelemetry-otlp 0.31.0", + "opentelemetry_sdk 0.31.0", "rdkafka", + "reqwest", "reth-optimism-evm", "reth-rpc-eth-types", "revm-context-interface", "serde_json", "tokio", "tracing", + "tracing-opentelemetry 0.32.0", "tracing-subscriber 0.3.20", "url", ] @@ -12798,7 +12919,7 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "prost 0.13.5", "socket2 0.5.10", "tokio", "tokio-stream", @@ -12808,6 +12929,43 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" +dependencies = [ + "async-trait", + "base64 0.22.1", + "bytes", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.7.0", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-prost" +version = "0.14.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" +dependencies = [ + "bytes", + "prost 0.14.1", + "tonic 0.14.2", +] + [[package]] name = "tower" version = "0.4.13" @@ -12988,9 +13146,28 @@ checksum = "721f2d2569dce9f3dfbbddee5906941e953bfcdf736a62da3377f5751650cc36" dependencies = [ "js-sys", "once_cell", - "opentelemetry", - "opentelemetry_sdk", + "opentelemetry 0.28.0", + "opentelemetry_sdk 0.28.0", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber 0.3.20", + "web-time", +] + +[[package]] +name = "tracing-opentelemetry" +version = "0.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e6e5658463dd88089aba75c7791e1d3120633b1bfde22478b28f625a9bb1b8e" +dependencies = [ + "js-sys", + "opentelemetry 0.31.0", + "opentelemetry_sdk 0.31.0", + "rustversion", "smallvec", + "thiserror 2.0.17", "tracing", "tracing-core", "tracing-log", diff --git a/Cargo.toml b/Cargo.toml index 9114e9f3..b3bc52a8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ op-alloy-consensus = { version = "0.20.0", features = ["k256"] } tokio = { version = "1.47.1", features = ["full"] } tracing = "0.1.41" -tracing-subscriber = { version = "0.3.20", features = ["env-filter"] } +tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt", "json"] } anyhow = "1.0.99" clap = { version = "4.5.47", features = ["derive", "env"] } url = "2.5.7" @@ -72,3 +72,15 @@ backon = "1.5.2" op-revm = { version = "10.1.0", default-features = false } revm-context-interface = "10.2.0" alloy-signer-local = "1.0.36" + +opentelemetry = { version = "0.31.0", features = ["trace"] } +opentelemetry-otlp = { version = "0.31.0", features = [ + "http-proto", + "http-json", + "reqwest-client", + "trace", + "grpc-tonic", +] } +opentelemetry_sdk = { version = "0.31.0", features = ["rt-tokio"] } +tracing-opentelemetry = "0.32.0" +reqwest = { version = "0.12", features = ["json"] } diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 13a581b3..65686dd9 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -35,3 +35,8 @@ op-revm.workspace = true revm-context-interface.workspace = true alloy-signer-local.workspace = true reth-optimism-evm.workspace = true +opentelemetry.workspace = true +opentelemetry-otlp.workspace = true +opentelemetry_sdk.workspace = true +tracing-opentelemetry.workspace = true +reqwest.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6400bc02..3bbb4220 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -2,12 +2,30 @@ use alloy_provider::{ProviderBuilder, RootProvider}; use clap::Parser; use jsonrpsee::server::Server; use op_alloy_network::Optimism; +use opentelemetry::global; +//use opentelemetry::trace::Tracer; +//use opentelemetry::{InstrumentationScope, trace::TracerProvider}; +//use opentelemetry_sdk::trace; +use opentelemetry::trace::TracerProvider; +//use opentelemetry_otlp::WithHttpConfig; +use opentelemetry_sdk::trace::BatchSpanProcessor; +use opentelemetry_sdk::trace::Sampler; +//use opentelemetry_sdk::trace::SimpleSpanProcessor; +//use opentelemetry_semantic_conventions as semcov; +use opentelemetry_sdk::Resource; +use opentelemetry_sdk::trace::SdkTracerProvider; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; +use std::env; use std::fs; use std::net::IpAddr; -use tracing::{info, warn}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; +use tracing::{error, info, span, warn}; +//use tracing_subscriber::Layer; +//use tracing_subscriber::filter::{LevelFilter, Targets}; +use opentelemetry_otlp::{SpanExporter, WithExportConfig}; +use tracing_subscriber::Layer; +use tracing_subscriber::layer::SubscriberExt; +//use tracing_subscriber::util::SubscriberInitExt; use url::Url; mod queue; @@ -57,6 +75,14 @@ struct Config { default_value = "10800" )] send_transaction_default_lifetime_seconds: u64, + + /// Enable tracing + #[arg(long, env = "TIPS_INGRESS_TRACING_ENABLED", default_value = "false")] + tracing_enabled: bool, + + /// Port for the OTLP endpoint + #[arg(long, env = "TIPS_INGRESS_TRACING_OTLP_PORT", default_value = "4317")] + tracing_otlp_port: u16, } #[tokio::main] @@ -80,24 +106,126 @@ async fn main() -> anyhow::Result<()> { } }; - tracing_subscriber::registry() + /*tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new(log_level.to_string())), ) .with(tracing_subscriber::fmt::layer()) .init(); + + let global_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(env!("CARGO_PKG_NAME"), LevelFilter::TRACE); + */ + + /*global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::default()); + + let log_filter = Targets::new() + .with_default(LevelFilter::INFO) + .with_target(env!("CARGO_PKG_NAME"), log_level); + + let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); + let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); + + let mut trace_config = trace::Config::default(); + trace_config.sampler = Box::new(Sampler::AlwaysOn); + trace_config.id_generator = Box::new(RandomIdGenerator::default()); + + let provider = opentelemetry_datadog::new_pipeline() + .with_service_name(env!("CARGO_PKG_NAME")) + .with_api_version(opentelemetry_datadog::ApiVersion::Version05) + .with_agent_endpoint(&otlp_endpoint) + .with_trace_config(trace_config) + .install_batch()?; + + global::set_tracer_provider(provider.clone()); + + let scope = InstrumentationScope::builder(env!("CARGO_PKG_NAME")) + .with_version(env!("CARGO_PKG_VERSION")) + .with_schema_url(semcov::SCHEMA_URL) + .with_attributes(None) + .build(); + + let tracer = provider.tracer_with_scope(scope); + tracer.in_span("span_main", |_span| { + info!( + message = "Tracing enabled", + endpoint = %otlp_endpoint + ); + }); + + tracing_subscriber::registry() + .with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer)) + .with(tracing_subscriber::fmt::layer().with_filter(log_filter)) + .init();*/ + + let filter = tracing_subscriber::EnvFilter::new(log_level.to_string()); + + let log_layer = tracing_subscriber::fmt::layer() + .with_line_number(true) + .with_thread_ids(true) + .with_file(true) + .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE) + .json() + .boxed(); + + let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string()); + let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port); + + // https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10 + let exporter = SpanExporter::builder() + .with_http() + //.with_http_client(reqwest::blocking::Client::new()) + //.with_tonic() + .with_endpoint(&otlp_endpoint) + .build()?; + + let batch_processor = BatchSpanProcessor::builder(exporter).build(); + //let batch_processor = SimpleSpanProcessor::new(exporter); + + let resource = Resource::builder_empty() + .with_service_name(env!("CARGO_PKG_NAME")) + .build(); + + let tracer_provider = SdkTracerProvider::builder() + .with_span_processor(batch_processor) + .with_resource(resource) + .with_sampler(Sampler::AlwaysOn) + .build(); + + // Create the tracer and set it globally + let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME")); + global::set_tracer_provider(tracer_provider.clone()); + + let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer); + + let register = tracing_subscriber::registry() + .with(filter) + .with(log_layer) + .with(trace_layer); + /*tracing_subscriber::registry() + .with(filter) + .with(log_layer) + .with(trace_layer) + .init();*/ + tracing::subscriber::set_global_default(register)?; + info!( message = "Starting ingress service", address = %config.address, port = config.port, - mempool_url = %config.mempool_url + mempool_url = %config.mempool_url, + endpoint = %otlp_endpoint ); - let provider: RootProvider = ProviderBuilder::new() + let span = span!(tracing::Level::TRACE, "span_main"); + let _enter = span.enter(); + let op_provider: RootProvider = ProviderBuilder::new() .disable_recommended_fillers() .network::() .connect_http(config.mempool_url); + drop(_enter); let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties)?; @@ -106,7 +234,7 @@ async fn main() -> anyhow::Result<()> { let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic); let service = IngressService::new( - provider, + op_provider, config.dual_write_mempool, queue, config.send_transaction_default_lifetime_seconds, @@ -123,6 +251,10 @@ async fn main() -> anyhow::Result<()> { ); handle.stopped().await; + if let Err(e) = tracer_provider.shutdown() { + error!(error = %e, "Failed to shutdown tracer provider"); + return Err(e.into()); + }; Ok(()) } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index a48841d4..8eb8ae18 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -11,7 +11,7 @@ use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; -use tracing::{info, warn}; +use tracing::{Instrument, info, span, warn}; use crate::queue::QueuePublisher; @@ -94,12 +94,18 @@ where .await?; validate_tx(account, &transaction, &data, &mut l1_block_info).await?; + let span = span!(tracing::Level::TRACE, "span_ethsendbundle", transaction = %transaction.tx_hash()); + let _enter = span.enter(); let expiry_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs() + self.send_transaction_default_lifetime_seconds; + drop(_enter); + let span = + span!(tracing::Level::INFO, "span_ethsendbundle", transaction = %transaction.tx_hash()); + let _enter = span.enter(); let bundle = EthSendBundle { txs: vec![data.clone()], block_number: 0, @@ -108,10 +114,13 @@ where reverting_tx_hashes: vec![transaction.tx_hash()], ..Default::default() }; + drop(_enter); // queue the bundle let sender = transaction.signer(); - if let Err(e) = self.queue.publish(&bundle, sender).await { + let span = + span!(tracing::Level::INFO, "span_publish", transaction = %transaction.tx_hash()); + if let Err(e) = self.queue.publish(&bundle, sender).instrument(span).await { warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e); }