diff --git a/ext/src/nassau.rs b/ext/src/nassau.rs index aa71119d4..2a050f3f2 100644 --- a/ext/src/nassau.rs +++ b/ext/src/nassau.rs @@ -41,7 +41,6 @@ use sseq::coordinates::Bidegree; use crate::{ chain_complex::{AugmentedChainComplex, ChainComplex, FiniteChainComplex, FreeChainComplex}, save::{SaveDirectory, SaveKind}, - utils::LogWriter, }; /// See [`resolution::SenderData`](../resolution/struct.SenderData.html). This differs by not having the `new` field. @@ -476,7 +475,7 @@ impl> Resolution { }); } - #[tracing::instrument(skip_all, fields(signature = ?signature, throughput))] + #[tracing::instrument(skip_all, fields(?signature))] fn write_qi( f: &mut Option, scratch: &mut FpVector, @@ -490,9 +489,6 @@ impl> Resolution { None => return Ok(()), }; - let mut own_f = LogWriter::new(f); - let f = &mut own_f; - let pivots = &masked_matrix.pivots()[0..masked_matrix.end[0]]; if !pivots.iter().any(|&x| x >= 0) { return Ok(()); @@ -522,10 +518,6 @@ impl> Resolution { scratch.to_bytes(f)?; } - tracing::Span::current().record( - "throughput", - tracing::field::display(own_f.into_throughput()), - ); Ok(()) } diff --git a/ext/src/save.rs b/ext/src/save.rs index e8b1bf563..2f8e593b6 100644 --- a/ext/src/save.rs +++ b/ext/src/save.rs @@ -192,29 +192,37 @@ impl SaveKind { } } -/// In addition to checking the checksum, we also keep track of which files are open, and we delete -/// the open files if the program is terminated halfway. -pub struct ChecksumWriter { +/// The writer that [`SaveFile::create_file`] returns. It provides several features: +/// - It keeps track of the amount of data written and the time taken, for logging. +/// - It writes a checksum before closing, so that `ChecksumReader` can check the integrity. +/// - On drop, it removes the file from the tracker returned by [`open_files`]. This ensures that +/// the ctrlc handler only deletes files that are actively being written. +pub struct SaveWriter { writer: T, path: PathBuf, + bytes_written: usize, + creation_time: std::time::Instant, adler: adler::Adler32, } -impl ChecksumWriter { +impl SaveWriter { pub fn new(path: PathBuf, writer: T) -> Self { Self { - path, writer, + path, + bytes_written: 0, + creation_time: std::time::Instant::now(), adler: adler::Adler32::new(), } } } /// We only implement the functions required and the ones we actually use. -impl io::Write for ChecksumWriter { +impl io::Write for SaveWriter { fn write(&mut self, buf: &[u8]) -> io::Result { let bytes_written = self.writer.write(buf)?; self.adler.write_slice(&buf[0..bytes_written]); + self.bytes_written += bytes_written; Ok(bytes_written) } @@ -225,11 +233,12 @@ impl io::Write for ChecksumWriter { fn write_all(&mut self, buf: &[u8]) -> io::Result<()> { self.writer.write_all(buf)?; self.adler.write_slice(buf); + self.bytes_written += buf.len(); Ok(()) } } -impl std::ops::Drop for ChecksumWriter { +impl std::ops::Drop for SaveWriter { fn drop(&mut self) { if !std::thread::panicking() { // We may not have finished writing, so the data is wrong. It should not be given a @@ -244,7 +253,12 @@ impl std::ops::Drop for ChecksumWriter { self.path ); } - tracing::info!(file = ?self.path, "closing"); + tracing::info!( + file = ?self.path, + written = self.bytes_written, + elapsed = ?self.creation_time.elapsed(), + "closing" + ); } } @@ -479,7 +493,7 @@ impl SaveFile { .open(&p) .with_context(|| format!("Failed to create save file {p:?}")) .unwrap(); - let mut f = ChecksumWriter::new(p, io::BufWriter::new(f)); + let mut f = SaveWriter::new(p, io::BufWriter::new(f)); self.write_header(&mut f).unwrap(); f } diff --git a/ext/src/utils.rs b/ext/src/utils.rs index 4ba98c3c8..7b1d6a334 100644 --- a/ext/src/utils.rs +++ b/ext/src/utils.rs @@ -465,89 +465,37 @@ pub fn get_unit( Ok((is_unit, unit)) } -mod logging { - use std::io; +#[cfg(feature = "logging")] +pub fn ext_tracing_subscriber() -> impl tracing::Subscriber { + use std::io::IsTerminal; - pub struct LogWriter { - writer: T, - bytes: u64, - start: std::time::Instant, - } - - impl io::Write for LogWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - let written = self.writer.write(buf)?; - self.bytes += written as u64; - Ok(written) - } - - fn flush(&mut self) -> io::Result<()> { - self.writer.flush() - } - } - - impl LogWriter { - pub fn new(writer: T) -> Self { - Self { - writer, - bytes: 0, - start: std::time::Instant::now(), - } - } - } - - pub struct Throughput(f64); - - impl std::fmt::Display for Throughput { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:.2} MiB/s", self.0) - } - } - - impl LogWriter { - /// Return the throughput in MiB/s - pub fn into_throughput(mut self) -> Throughput { - self.writer.flush().unwrap(); - let duration = self.start.elapsed(); - let mib = self.bytes as f64 / (1024 * 1024) as f64; - Throughput(mib / duration.as_secs_f64()) - } - } - - #[cfg(feature = "logging")] - pub fn ext_tracing_subscriber() -> impl tracing::Subscriber { - use std::io::IsTerminal; - - use tracing_subscriber::{ - filter::EnvFilter, - fmt::{format::FmtSpan, Subscriber}, - }; + use tracing_subscriber::{ + filter::EnvFilter, + fmt::{format::FmtSpan, Subscriber}, + }; - Subscriber::builder() - .with_ansi(std::io::stderr().is_terminal()) - .with_writer(std::io::stderr) - .with_max_level(tracing::Level::INFO) - .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) - .with_thread_ids(true) - .with_env_filter(EnvFilter::try_from_default_env().unwrap_or_default()) - .finish() - } + Subscriber::builder() + .with_ansi(std::io::stderr().is_terminal()) + .with_writer(std::io::stderr) + .with_max_level(tracing::Level::INFO) + .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .with_thread_ids(true) + .with_env_filter(EnvFilter::try_from_default_env().unwrap_or_default()) + .finish() +} - #[cfg(not(feature = "logging"))] - pub fn ext_tracing_subscriber() -> impl tracing::Subscriber { - tracing::subscriber::NoSubscriber::new() - } +#[cfg(not(feature = "logging"))] +pub fn ext_tracing_subscriber() -> impl tracing::Subscriber { + tracing::subscriber::NoSubscriber::new() +} - pub fn init_logging() { - tracing::subscriber::set_global_default(ext_tracing_subscriber()) - .expect("Failed to enable logging"); +pub fn init_logging() { + tracing::subscriber::set_global_default(ext_tracing_subscriber()) + .expect("Failed to enable logging"); - tracing::info!("Logging initialized"); - } + tracing::info!("Logging initialized"); } -pub use logging::{ext_tracing_subscriber, init_logging, LogWriter}; - /// The value of the SECONDARY_JOB environment variable. /// /// This is used for distributing the `secondary`. If set, only data with `s = SECONDARY_JOB` will