Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 1 addition & 9 deletions ext/src/nassau.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ use sseq::coordinates::Bidegree;
use crate::{
chain_complex::{AugmentedChainComplex, ChainComplex, FiniteChainComplex, FreeChainComplex},
save::{SaveDirectory, SaveKind},
utils::LogWriter,
};

/// See [`resolution::SenderData`](../resolution/struct.SenderData.html). This differs by not having the `new` field.
Expand Down Expand Up @@ -476,7 +475,7 @@ impl<M: ZeroModule<Algebra = MilnorAlgebra>> Resolution<M> {
});
}

#[tracing::instrument(skip_all, fields(signature = ?signature, throughput))]
#[tracing::instrument(skip_all, fields(?signature))]
fn write_qi(
f: &mut Option<impl io::Write>,
scratch: &mut FpVector,
Expand All @@ -490,9 +489,6 @@ impl<M: ZeroModule<Algebra = MilnorAlgebra>> Resolution<M> {
None => return Ok(()),
};

let mut own_f = LogWriter::new(f);
let f = &mut own_f;

let pivots = &masked_matrix.pivots()[0..masked_matrix.end[0]];
if !pivots.iter().any(|&x| x >= 0) {
return Ok(());
Expand Down Expand Up @@ -522,10 +518,6 @@ impl<M: ZeroModule<Algebra = MilnorAlgebra>> Resolution<M> {
scratch.to_bytes(f)?;
}

tracing::Span::current().record(
"throughput",
tracing::field::display(own_f.into_throughput()),
);
Ok(())
}

Expand Down
32 changes: 23 additions & 9 deletions ext/src/save.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,29 +192,37 @@ impl SaveKind {
}
}

/// In addition to checking the checksum, we also keep track of which files are open, and we delete
/// the open files if the program is terminated halfway.
pub struct ChecksumWriter<T: io::Write> {
/// The writer that [`SaveFile::create_file`] returns. It provides several features:
/// - It keeps track of the amount of data written and the time taken, for logging.
/// - It writes a checksum before closing, so that `ChecksumReader` can check the integrity.
/// - On drop, it removes the file from the tracker returned by [`open_files`]. This ensures that
/// the ctrlc handler only deletes files that are actively being written.
pub struct SaveWriter<T: io::Write> {
writer: T,
path: PathBuf,
bytes_written: usize,
creation_time: std::time::Instant,
adler: adler::Adler32,
}

impl<T: io::Write> ChecksumWriter<T> {
impl<T: io::Write> SaveWriter<T> {
pub fn new(path: PathBuf, writer: T) -> Self {
Self {
path,
writer,
path,
bytes_written: 0,
creation_time: std::time::Instant::now(),
adler: adler::Adler32::new(),
}
}
}

/// We only implement the functions required and the ones we actually use.
impl<T: io::Write> io::Write for ChecksumWriter<T> {
impl<T: io::Write> io::Write for SaveWriter<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let bytes_written = self.writer.write(buf)?;
self.adler.write_slice(&buf[0..bytes_written]);
self.bytes_written += bytes_written;
Ok(bytes_written)
}

Expand All @@ -225,11 +233,12 @@ impl<T: io::Write> io::Write for ChecksumWriter<T> {
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
self.writer.write_all(buf)?;
self.adler.write_slice(buf);
self.bytes_written += buf.len();
Ok(())
}
}

impl<T: io::Write> std::ops::Drop for ChecksumWriter<T> {
impl<T: io::Write> std::ops::Drop for SaveWriter<T> {
fn drop(&mut self) {
if !std::thread::panicking() {
// We may not have finished writing, so the data is wrong. It should not be given a
Expand All @@ -244,7 +253,12 @@ impl<T: io::Write> std::ops::Drop for ChecksumWriter<T> {
self.path
);
}
tracing::info!(file = ?self.path, "closing");
tracing::info!(
file = ?self.path,
written = self.bytes_written,
elapsed = ?self.creation_time.elapsed(),
"closing"
);
}
}

Expand Down Expand Up @@ -479,7 +493,7 @@ impl<A: Algebra> SaveFile<A> {
.open(&p)
.with_context(|| format!("Failed to create save file {p:?}"))
.unwrap();
let mut f = ChecksumWriter::new(p, io::BufWriter::new(f));
let mut f = SaveWriter::new(p, io::BufWriter::new(f));
self.write_header(&mut f).unwrap();
f
}
Expand Down
100 changes: 24 additions & 76 deletions ext/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,89 +465,37 @@ pub fn get_unit(
Ok((is_unit, unit))
}

mod logging {
use std::io;
#[cfg(feature = "logging")]
pub fn ext_tracing_subscriber() -> impl tracing::Subscriber {
use std::io::IsTerminal;

pub struct LogWriter<T> {
writer: T,
bytes: u64,
start: std::time::Instant,
}

impl<T: io::Write> io::Write for LogWriter<T> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let written = self.writer.write(buf)?;
self.bytes += written as u64;
Ok(written)
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()
}
}

impl<T> LogWriter<T> {
pub fn new(writer: T) -> Self {
Self {
writer,
bytes: 0,
start: std::time::Instant::now(),
}
}
}

pub struct Throughput(f64);

impl std::fmt::Display for Throughput {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:.2} MiB/s", self.0)
}
}

impl<T: io::Write> LogWriter<T> {
/// Return the throughput in MiB/s
pub fn into_throughput(mut self) -> Throughput {
self.writer.flush().unwrap();
let duration = self.start.elapsed();
let mib = self.bytes as f64 / (1024 * 1024) as f64;
Throughput(mib / duration.as_secs_f64())
}
}

#[cfg(feature = "logging")]
pub fn ext_tracing_subscriber() -> impl tracing::Subscriber {
use std::io::IsTerminal;

use tracing_subscriber::{
filter::EnvFilter,
fmt::{format::FmtSpan, Subscriber},
};
use tracing_subscriber::{
filter::EnvFilter,
fmt::{format::FmtSpan, Subscriber},
};

Subscriber::builder()
.with_ansi(std::io::stderr().is_terminal())
.with_writer(std::io::stderr)
.with_max_level(tracing::Level::INFO)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_thread_ids(true)
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_default())
.finish()
}
Subscriber::builder()
.with_ansi(std::io::stderr().is_terminal())
.with_writer(std::io::stderr)
.with_max_level(tracing::Level::INFO)
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_thread_ids(true)
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_default())
.finish()
}

#[cfg(not(feature = "logging"))]
pub fn ext_tracing_subscriber() -> impl tracing::Subscriber {
tracing::subscriber::NoSubscriber::new()
}
#[cfg(not(feature = "logging"))]
pub fn ext_tracing_subscriber() -> impl tracing::Subscriber {
tracing::subscriber::NoSubscriber::new()
}

pub fn init_logging() {
tracing::subscriber::set_global_default(ext_tracing_subscriber())
.expect("Failed to enable logging");
pub fn init_logging() {
tracing::subscriber::set_global_default(ext_tracing_subscriber())
.expect("Failed to enable logging");

tracing::info!("Logging initialized");
}
tracing::info!("Logging initialized");
}

pub use logging::{ext_tracing_subscriber, init_logging, LogWriter};

/// The value of the SECONDARY_JOB environment variable.
///
/// This is used for distributing the `secondary`. If set, only data with `s = SECONDARY_JOB` will
Expand Down