Skip to content

Commit 8e3838b

Browse files
committed
follow commonware
1 parent ca1fbbf commit 8e3838b

File tree

3 files changed

+93
-71
lines changed

3 files changed

+93
-71
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ op-alloy-consensus = { version = "0.20.0", features = ["k256"] }
3939

4040
tokio = { version = "1.47.1", features = ["full"] }
4141
tracing = "0.1.41"
42-
tracing-subscriber = { version = "0.3.20", features = ["env-filter"] }
42+
tracing-subscriber = { version = "0.3.20", features = ["env-filter", "fmt", "json"] }
4343
anyhow = "1.0.99"
4444
clap = { version = "4.5.47", features = ["derive", "env"] }
4545
url = "2.5.7"

crates/ingress-rpc/src/main.rs

Lines changed: 85 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,26 @@ use clap::Parser;
33
use jsonrpsee::server::Server;
44
use op_alloy_network::Optimism;
55
use opentelemetry::global;
6-
use opentelemetry::trace::Tracer;
7-
use opentelemetry::{InstrumentationScope, trace::TracerProvider};
8-
use opentelemetry_sdk::trace;
9-
use opentelemetry_sdk::trace::{RandomIdGenerator, Sampler};
10-
use opentelemetry_semantic_conventions as semcov;
6+
//use opentelemetry::trace::Tracer;
7+
//use opentelemetry::{InstrumentationScope, trace::TracerProvider};
8+
//use opentelemetry_sdk::trace;
9+
use opentelemetry::trace::TracerProvider;
10+
use opentelemetry_sdk::trace::BatchSpanProcessor;
11+
use opentelemetry_sdk::trace::Sampler;
12+
//use opentelemetry_semantic_conventions as semcov;
13+
use opentelemetry_sdk::Resource;
14+
use opentelemetry_sdk::trace::SdkTracerProvider;
1115
use rdkafka::ClientConfig;
1216
use rdkafka::producer::FutureProducer;
1317
use std::env;
1418
use std::fs;
1519
use std::net::IpAddr;
1620
use tracing::{info, warn};
17-
use tracing_subscriber::Layer;
18-
use tracing_subscriber::filter::{LevelFilter, Targets};
19-
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
21+
//use tracing_subscriber::Layer;
22+
//use tracing_subscriber::filter::{LevelFilter, Targets};
23+
use opentelemetry_otlp::{SpanExporter, WithExportConfig};
24+
use tracing_subscriber::layer::SubscriberExt;
25+
use tracing_subscriber::{Layer, Registry};
2026
use url::Url;
2127

2228
mod queue;
@@ -110,6 +116,8 @@ async fn main() -> anyhow::Result<()> {
110116
.with_target(env!("CARGO_PKG_NAME"), LevelFilter::TRACE);
111117
*/
112118

119+
/*global::set_text_map_propagator(opentelemetry_datadog::DatadogPropagator::default());
120+
113121
let log_filter = Targets::new()
114122
.with_default(LevelFilter::INFO)
115123
.with_target(env!("CARGO_PKG_NAME"), log_level);
@@ -121,48 +129,76 @@ async fn main() -> anyhow::Result<()> {
121129
trace_config.sampler = Box::new(Sampler::AlwaysOn);
122130
trace_config.id_generator = Box::new(RandomIdGenerator::default());
123131
124-
let provider = if config.tracing_enabled {
125-
Some(
126-
opentelemetry_datadog::new_pipeline()
127-
.with_service_name(env!("CARGO_PKG_NAME"))
128-
.with_api_version(opentelemetry_datadog::ApiVersion::Version05)
129-
.with_agent_endpoint(&otlp_endpoint)
130-
.with_trace_config(trace_config)
131-
.install_batch()
132-
.unwrap(),
133-
)
134-
} else {
135-
None
136-
};
132+
let provider = opentelemetry_datadog::new_pipeline()
133+
.with_service_name(env!("CARGO_PKG_NAME"))
134+
.with_api_version(opentelemetry_datadog::ApiVersion::Version05)
135+
.with_agent_endpoint(&otlp_endpoint)
136+
.with_trace_config(trace_config)
137+
.install_batch()?;
138+
139+
global::set_tracer_provider(provider.clone());
140+
141+
let scope = InstrumentationScope::builder(env!("CARGO_PKG_NAME"))
142+
.with_version(env!("CARGO_PKG_VERSION"))
143+
.with_schema_url(semcov::SCHEMA_URL)
144+
.with_attributes(None)
145+
.build();
146+
147+
let tracer = provider.tracer_with_scope(scope);
148+
tracer.in_span("span_main", |_span| {
149+
info!(
150+
message = "Tracing enabled",
151+
endpoint = %otlp_endpoint
152+
);
153+
});
154+
155+
tracing_subscriber::registry()
156+
.with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer))
157+
.with(tracing_subscriber::fmt::layer().with_filter(log_filter))
158+
.init();*/
159+
160+
let filter = tracing_subscriber::EnvFilter::new(log_level.to_string());
161+
162+
let log_layer = tracing_subscriber::fmt::layer()
163+
.with_line_number(true)
164+
.with_thread_ids(true)
165+
.with_file(true)
166+
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE)
167+
.json()
168+
.boxed();
137169

138-
if let Some(ref provider) = provider {
139-
global::set_tracer_provider(provider.clone());
140-
}
170+
let dd_host = env::var("DD_AGENT_HOST").unwrap_or_else(|_| "localhost".to_string());
171+
let otlp_endpoint = format!("http://{}:{}", dd_host, config.tracing_otlp_port);
141172

142-
if let Some(ref provider) = provider {
143-
let scope = InstrumentationScope::builder("opentelemetry-datadog")
144-
.with_version(env!("CARGO_PKG_VERSION"))
145-
.with_schema_url(semcov::SCHEMA_URL)
146-
.with_attributes(None)
147-
.build();
148-
149-
let tracer = provider.tracer_with_scope(scope);
150-
tracer.in_span("span_main", |_span| {
151-
info!(
152-
message = "Tracing enabled",
153-
endpoint = %otlp_endpoint
154-
);
155-
});
156-
157-
tracing_subscriber::registry()
158-
.with(tracing_opentelemetry::OpenTelemetryLayer::new(tracer))
159-
.with(tracing_subscriber::fmt::layer().with_filter(log_filter))
160-
.init();
161-
} else {
162-
tracing_subscriber::registry()
163-
.with(tracing_subscriber::fmt::layer().with_filter(log_filter))
164-
.init();
165-
}
173+
// https://github.com/commonwarexyz/monorepo/blob/27e6f73fce91fc46ef7170e928cbcf96cc635fea/runtime/src/tokio/tracing.rs#L10
174+
let exporter = SpanExporter::builder()
175+
.with_http()
176+
.with_endpoint(&otlp_endpoint)
177+
.build()?;
178+
179+
let batch_processor = BatchSpanProcessor::builder(exporter).build();
180+
181+
let resource = Resource::builder_empty()
182+
.with_service_name(env!("CARGO_PKG_NAME"))
183+
.build();
184+
185+
let tracer_provider = SdkTracerProvider::builder()
186+
.with_span_processor(batch_processor)
187+
.with_resource(resource)
188+
.with_sampler(Sampler::AlwaysOn)
189+
.build();
190+
191+
// Create the tracer and set it globally
192+
let tracer = tracer_provider.tracer(env!("CARGO_PKG_NAME"));
193+
global::set_tracer_provider(tracer_provider);
194+
195+
let trace_layer = tracing_opentelemetry::layer().with_tracer(tracer);
196+
197+
let register = Registry::default()
198+
.with(filter)
199+
.with(log_layer)
200+
.with(trace_layer);
201+
tracing::subscriber::set_global_default(register)?;
166202

167203
info!(
168204
message = "Starting ingress service",
@@ -200,28 +236,7 @@ async fn main() -> anyhow::Result<()> {
200236
address = %addr
201237
);
202238

203-
// Set up graceful shutdown
204-
tokio::select! {
205-
_ = handle.stopped() => {
206-
info!("Server stopped");
207-
}
208-
_ = tokio::signal::ctrl_c() => {
209-
info!("Received shutdown signal");
210-
}
211-
}
212-
213-
// Give time for any remaining spans to be processed
214-
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
215-
216-
// Shutdown tracer provider if it exists
217-
if let Some(provider) = provider {
218-
info!("Shutting down tracer provider");
219-
if let Err(e) = provider.shutdown() {
220-
warn!("Error shutting down tracer provider: {}", e);
221-
}
222-
}
223-
224-
info!("Ingress service shutdown complete");
239+
handle.stopped().await;
225240
Ok(())
226241
}
227242

crates/ingress-rpc/src/service.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,12 +94,18 @@ where
9494
.await?;
9595
validate_tx(account, &transaction, &data, &mut l1_block_info).await?;
9696

97+
let span = span!(tracing::Level::TRACE, "span_ethsendbundle", transaction = %transaction.tx_hash());
98+
let _enter = span.enter();
9799
let expiry_timestamp = SystemTime::now()
98100
.duration_since(UNIX_EPOCH)
99101
.unwrap()
100102
.as_secs()
101103
+ self.send_transaction_default_lifetime_seconds;
104+
drop(_enter);
102105

106+
let span =
107+
span!(tracing::Level::INFO, "span_ethsendbundle", transaction = %transaction.tx_hash());
108+
let _enter = span.enter();
103109
let bundle = EthSendBundle {
104110
txs: vec![data.clone()],
105111
block_number: 0,
@@ -108,6 +114,7 @@ where
108114
reverting_tx_hashes: vec![transaction.tx_hash()],
109115
..Default::default()
110116
};
117+
drop(_enter);
111118

112119
// queue the bundle
113120
let sender = transaction.signer();

0 commit comments

Comments
 (0)