diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 2222751a..1b458a4e 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -28,7 +28,7 @@ jobs: steps: - uses: actions/checkout@v4 - name: Run clippy - run: rustup update; cargo clippy --all-targets -- -D warnings + run: sudo apt-get install protobuf-compiler; rustup update; cargo clippy --all-targets -- -D warnings fmt: name: Fmt diff --git a/Cargo.toml b/Cargo.toml index 399a0166..2a9ac7f0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,10 @@ egglog = "1.0.0" egglog-ast = "1.0.0" egraph-serialize = { version = "0.3.0", default-features = false, features = ["graphviz", "serde"]} tracing = "0.1.43" +tracing-appender = "0.2.4" +tracing-perfetto-sdk-layer = "0.13.0" +tracing-perfetto-sdk-schema = "0.13.0" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } paste = "1.0.15" pretty-duration = "0.1.1" anyhow = "1.0" diff --git a/crates/luminal_cuda/src/runtime.rs b/crates/luminal_cuda/src/runtime.rs index 8fc2bb96..2ed1bb1a 100644 --- a/crates/luminal_cuda/src/runtime.rs +++ b/crates/luminal_cuda/src/runtime.rs @@ -95,6 +95,7 @@ pub struct CudaRuntime { custom_state: FxHashMap, exec_graph: StableGraph, node_to_exec: FxHashMap, + timings: Vec<(Vec, u64)>, } impl CudaRuntime { @@ -229,6 +230,65 @@ impl CudaRuntime { } } } + + pub fn record_cuda_perfetto_trace(&self, file_path: impl AsRef) { + let ops = ::into_vec(); + let data = std::fs::read(&file_path).unwrap(); + let mut trace = tracing_perfetto_sdk_schema::Trace::decode(data.as_slice()).unwrap(); + + let host_start_times: Vec<(u64, u32)> = trace + .packet + .iter() + .filter_map(|p| match &p.data { + Some(tracing_perfetto_sdk_schema::trace_packet::Data::TrackEvent(TrackEvent { + name_field: Some(tracing_perfetto_sdk_schema::track_event::NameField::Name(s)), + r#type: ty, + .. + })) if s == "megakernel" + && *ty + == Some( + tracing_perfetto_sdk_schema::track_event::Type::SliceBegin as i32, + ) => + { + Some((p.timestamp?, p.timestamp_clock_id?)) + } + _ => None, + }) + .sorted_by_key(|i| *i) + .collect_vec(); + let mut extra_packets = Vec::new(); + for (run, (device_timings, device_start_time)) in self.timings.iter().enumerate() { + let (host_time, host_clock_id) = host_start_times[run]; + for (sm, sm_timings) in device_timings.chunks(1000).into_iter().enumerate() { + let mut builder = ManualTrackBuilder::new(sm as u32, host_time, host_clock_id); + for n_op in 0..sm_timings.len() - 1 { + let op = sm_timings[n_op].event as usize; + let op_label = if op == 0 { + "Issue".to_string() + } else if op == 1 { + "Wait".to_string() + } else { + ops[op - 2].term().0 + }; + if sm_timings[n_op + 1].start == 0 { + break; + } + builder.push_slice( + &op_label, + sm_timings[n_op].start - *device_start_time, + sm_timings[n_op + 1].start - *device_start_time, + host_time, + host_clock_id, + ); + } + extra_packets.extend(builder.into_packets()); + } + } + trace.packet.extend(extra_packets); + let mut buf = Vec::with_capacity(trace.encoded_len()); + trace.encode(&mut buf).unwrap(); + std::fs::write(file_path, buf).unwrap(); + } } pub trait ToCudaBuffer { @@ -263,7 +323,7 @@ impl Runtime for CudaRuntime { FxHashMap, ); type Data = Box; - type ExecReturn = Vec<(Vec, u64)>; + type ExecReturn = (); fn initialize((ctx, stream, custom_state): Self::CompileArg) -> Self { Self { @@ -274,6 +334,7 @@ impl Runtime for CudaRuntime { custom_state: custom_state, exec_graph: StableGraph::default(), node_to_exec: FxHashMap::default(), + timings: vec![], } } @@ -587,7 +648,7 @@ impl Runtime for CudaRuntime { } } } - timings + self.timings.extend(timings); } fn set_data(&mut self, id: impl ToId, data: Self::Data) { @@ -795,66 +856,6 @@ pub fn allocate_input_buffers( buffers } -pub fn record_exec_timings_to_file( - timings: &Vec<(Vec, u64)>, - ops: &Vec>>, - file_path: &str, -) { - let data = std::fs::read(file_path).unwrap(); - let mut trace = tracing_perfetto_sdk_schema::Trace::decode(data.as_slice()).unwrap(); - - let host_start_times: Vec<(u64, u32)> = trace - .packet - .iter() - .filter_map(|p| match &p.data { - Some(tracing_perfetto_sdk_schema::trace_packet::Data::TrackEvent(TrackEvent { - name_field: Some(tracing_perfetto_sdk_schema::track_event::NameField::Name(s)), - r#type: ty, - .. - })) if s == "megakernel" - && *ty - == Some(tracing_perfetto_sdk_schema::track_event::Type::SliceBegin as i32) => - { - Some((p.timestamp?, p.timestamp_clock_id?)) - } - _ => None, - }) - .sorted_by_key(|i| *i) - .collect_vec(); - let mut extra_packets = Vec::new(); - for (run, (device_timings, device_start_time)) in timings.iter().enumerate() { - let (host_time, host_clock_id) = host_start_times[run]; - for (sm, sm_timings) in device_timings.chunks(1000).into_iter().enumerate() { - let mut builder = ManualTrackBuilder::new(sm as u32, host_time, host_clock_id); - for n_op in 0..sm_timings.len() - 1 { - let op = sm_timings[n_op].event as usize; - let op_label = if op == 0 { - "Issue".to_string() - } else if op == 1 { - "Wait".to_string() - } else { - ops[op - 2].term().0 - }; - if sm_timings[n_op + 1].start == 0 { - break; - } - builder.push_slice( - &op_label, - sm_timings[n_op].start - *device_start_time, - sm_timings[n_op + 1].start - *device_start_time, - host_time, - host_clock_id, - ); - } - extra_packets.extend(builder.into_packets()); - } - } - trace.packet.extend(extra_packets); - let mut buf = Vec::with_capacity(trace.encoded_len()); - trace.encode(&mut buf).unwrap(); - std::fs::write(file_path, buf).unwrap(); -} - struct ManualTrackBuilder { packets: Vec, track_uuid: u64, diff --git a/examples/llama/Cargo.toml b/examples/llama/Cargo.toml index 8bf1809d..e729159d 100644 --- a/examples/llama/Cargo.toml +++ b/examples/llama/Cargo.toml @@ -11,9 +11,4 @@ luminal_nn = { path = "../../crates/luminal_nn" } luminal_cuda = { path = "../../crates/luminal_cuda" } itertools = "0.12.1" tokenizers = "0.15.2" -tracing = "0.1.43" -tracing-subscriber = {version="0.3", features=["env-filter"]} -tracing-perfetto-sdk-layer = "0.13.0" -tracing-perfetto-sdk-schema = "0.13.0" -tracing-perfetto-sdk-sys = "0.13.0" -tracing-appender = "0.2.4" +tracing = "0.1.43" \ No newline at end of file diff --git a/examples/llama/src/main.rs b/examples/llama/src/main.rs index 2da2aaa2..1e5bb3c6 100644 --- a/examples/llama/src/main.rs +++ b/examples/llama/src/main.rs @@ -8,33 +8,16 @@ use luminal::{ op::DType, prelude::FxHashMap, }; -use luminal_cuda::{ - block::IntoBlockOp, - runtime::{record_exec_timings_to_file, CudaRuntime, CustomState}, -}; +use luminal_cuda::runtime::{CudaRuntime, CustomState}; use model::*; -use std::{fs::File, io::Write, time::Duration}; +use std::io::Write; use tokenizers::Tokenizer; use tracing::{span, Level}; -use tracing_appender::non_blocking; -use tracing_perfetto_sdk_layer::NativeLayer; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; fn main() { - // Set up tracing - let file = File::create("trace.pftrace").unwrap(); - let (writer, _guard) = non_blocking(file); - let layer = NativeLayer::from_config(trace_config(), writer) - .build() - .unwrap(); - let filter = EnvFilter::builder() - .parse(format!("{}=trace,luminal=trace", env!("CARGO_PKG_NAME"))) - .unwrap(); - let layer_handle = layer.clone(); - tracing_subscriber::registry() - .with(filter) - .with(layer) + let trace_session = luminal::trace::new() + .perfetto("trace.pftrace") + .env_filter(format!("{}=trace,luminal=trace", env!("CARGO_PKG_NAME"))) .init(); let max_seq_len = 4096; @@ -88,7 +71,6 @@ fn main() { print!("{input_sentence}"); std::io::stdout().flush().unwrap(); - let mut timings = vec![]; let mut prev_seq = 0; let mut benchmarker = Benchmarker::new(756., 2_000.); // H100 specs for i in 0..gen_tokens { @@ -121,7 +103,7 @@ fn main() { } benchmarker.start_iteration(seq_len, prev_seq); - timings.extend(runtime.execute(&cx.dyn_map)); + runtime.execute(&cx.dyn_map); let logits_data = runtime.get_f32(logits); let sample_span = span!(Level::INFO, "sample"); @@ -134,18 +116,12 @@ fn main() { } println!(); + trace_session.stop(); benchmarker.report(); - - layer_handle - .flush(Duration::from_secs(5), Duration::from_secs(5)) - .unwrap(); - layer_handle.stop().unwrap(); - drop(_guard); - record_exec_timings_to_file( - &timings, - &::into_vec(), - "trace.pftrace", - ); + // Dump cuda trace to timeline + if let Some(path) = trace_session.perfetto_path { + runtime.record_cuda_perfetto_trace(path); + } } #[tracing::instrument(skip_all)] @@ -164,20 +140,3 @@ fn sample(logits: &[f32], vocab_size: usize) -> Vec { }) .collect() } - -fn trace_config() -> tracing_perfetto_sdk_schema::TraceConfig { - tracing_perfetto_sdk_schema::TraceConfig { - buffers: vec![tracing_perfetto_sdk_schema::trace_config::BufferConfig { - size_kb: Some(4096), - ..Default::default() - }], - data_sources: vec![tracing_perfetto_sdk_schema::trace_config::DataSource { - config: Some(tracing_perfetto_sdk_schema::DataSourceConfig { - name: Some("rust_tracing".into()), - ..Default::default() - }), - ..Default::default() - }], - ..Default::default() - } -} diff --git a/src/graph.rs b/src/graph.rs index 0afb4242..6b77f30d 100644 --- a/src/graph.rs +++ b/src/graph.rs @@ -17,6 +17,7 @@ use egraph_serialize::{ClassId, NodeId}; use itertools::Itertools; use petgraph::{Direction, stable_graph::StableGraph, visit::EdgeRef}; use rustc_hash::{FxHashMap, FxHashSet}; +use tracing::info; pub type LLIRGraph = StableGraph; pub type HLIRGraph = StableGraph, Dependency>; @@ -198,6 +199,7 @@ impl Graph { let print = std::env::var("SEARCH") .map(|s| s == "1") .unwrap_or_default(); + let limit_reached = llir_graphs.len() == limit; let start = std::time::Instant::now(); if print { println!( @@ -205,17 +207,21 @@ impl Graph { format!( "---- Searching through {}{} graphs ----", llir_graphs.len().to_string().bold(), - if llir_graphs.len() == limit { - "[limit]" - } else { - "" - } + if limit_reached { "[limit]" } else { "" } ) .cyan() ); } runtime.compile(llir_graphs.last().unwrap()); if print { + info!( + target: "luminal::search", + graphs = llir_graphs.len(), + limit, + limit_reached, + duration_ms = start.elapsed().as_millis() as u64, + "search completed" + ); println!( "{}", format!( @@ -392,17 +398,22 @@ fn run_egglog( .unwrap_or_default() { println!("{}", "---- Egglog Rule Matches ----".green()); - println!( - "{}", - egraph - .get_overall_run_report() - .num_matches_per_rule - .iter() - .filter(|(k, _)| !k.contains("(")) - .map(|(k, v)| format!("{k}: {v}")) - .join("\n") - .green() - ); + let mut rule_lines = Vec::new(); + for (rule, matches) in egraph + .get_overall_run_report() + .num_matches_per_rule + .iter() + .filter(|(k, _)| !k.contains("(")) + { + info!( + target: "luminal::egglog", + rule = %rule, + matches = *matches, + "rule matches" + ); + rule_lines.push(format!("{rule}: {matches}")); + } + println!("{}", rule_lines.join("\n").green()); println!( "{}", format!( @@ -411,6 +422,11 @@ fn run_egglog( ) .green() ); + info!( + target: "luminal::egglog", + duration_ms = start.elapsed().as_millis() as u64, + "egglog run completed" + ); } let (sort, value) = egraph.eval_expr(&var!(root)).unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 45b88114..1a4c67e6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,6 +5,7 @@ pub mod hl_ops; pub mod op; pub mod serialized_egraph; pub mod shape; +pub mod trace; pub mod utils; pub mod visualization; diff --git a/src/op.rs b/src/op.rs index dca6200d..37d4433e 100644 --- a/src/op.rs +++ b/src/op.rs @@ -17,6 +17,7 @@ use itertools::Itertools; use num_traits::Float; use petgraph::{Direction, algo::toposort, prelude::StableGraph, visit::EdgeRef}; use rustc_hash::FxHashMap; +use tracing::info_span; pub type Ops = ( Input, @@ -1510,6 +1511,8 @@ impl Runtime for NativeRuntime { continue; } + let span = info_span!("native_op", op = %format!("{:?}", self.graph[node])); + let _entered = span.enter(); let inputs = self .graph .edges_directed(node, Direction::Incoming) diff --git a/src/trace.rs b/src/trace.rs new file mode 100644 index 00000000..ab7d7dc3 --- /dev/null +++ b/src/trace.rs @@ -0,0 +1,121 @@ +use std::{ + fs::File, + path::{Path, PathBuf}, + time::Duration, +}; + +use tracing_appender::non_blocking::{NonBlocking, WorkerGuard}; +use tracing_perfetto_sdk_layer::NativeLayer; +use tracing_perfetto_sdk_schema::{ + DataSourceConfig, TraceConfig, + trace_config::{BufferConfig, DataSource}, +}; +use tracing_subscriber::{EnvFilter, layer::SubscriberExt, util::SubscriberInitExt}; + +pub enum TraceSink { + PerfettoFile { path: PathBuf }, + Stdout, +} + +/// Setup luminal default tracing. Any `tracing_subscriber`'s can also be used. +pub fn new() -> TraceOptions { + TraceOptions { + perfetto_file: None, + env_filter: "luminal=trace".to_string(), + } +} + +pub struct TraceOptions { + perfetto_file: Option, + pub env_filter: String, +} + +impl TraceOptions { + pub fn perfetto(mut self, path: impl AsRef) -> Self { + self.perfetto_file = Some(path.as_ref().to_path_buf()); + self + } + + pub fn env_filter(mut self, env_filter: impl ToString) -> Self { + self.env_filter = env_filter.to_string(); + self + } + + pub fn init(self) -> TraceSession { + let filter = EnvFilter::builder() + .parse(self.env_filter) + .expect("Invalid tracing env filter"); + + if let Some(file_path) = self.perfetto_file { + let file = File::create(&file_path).expect("Failed to create trace file"); + let (writer, guard) = tracing_appender::non_blocking(file); + let layer = NativeLayer::from_config(default_perfetto_config(), writer) + .build() + .expect("Failed to build perfetto layer"); + let handle = layer.clone(); + tracing_subscriber::registry() + .with(filter.clone()) + .with(layer) + .init(); + TraceSession { + perfetto_layer: Some(handle), + _guard: Some(guard), + perfetto_path: Some(file_path), + } + } else { + tracing_subscriber::registry() + .with(filter.clone()) + .with(tracing_subscriber::fmt::layer()) + .init(); + TraceSession { + perfetto_layer: None, + _guard: None, + perfetto_path: None, + } + } + } +} + +pub struct TraceSession { + perfetto_layer: Option>, + _guard: Option, + pub perfetto_path: Option, +} + +impl TraceSession { + pub fn flush(&self) { + if let Some(layer) = &self.perfetto_layer { + let _ = layer.flush(Duration::from_secs(5), Duration::from_secs(5)); + } + } + + pub fn stop(&self) { + self.flush(); + if let Some(layer) = &self.perfetto_layer { + let _ = layer.stop(); + } + } +} + +fn default_perfetto_config() -> TraceConfig { + TraceConfig { + buffers: vec![BufferConfig { + size_kb: Some(4096), + ..Default::default() + }], + data_sources: vec![DataSource { + config: Some(DataSourceConfig { + name: Some("rust_tracing".into()), + ..Default::default() + }), + ..Default::default() + }], + ..Default::default() + } +} + +pub fn trace_file_path(path: impl AsRef) -> TraceSink { + TraceSink::PerfettoFile { + path: path.as_ref().to_path_buf(), + } +}