Skip to content
2 changes: 2 additions & 0 deletions .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[build]
target-dir = "target"
Comment thread
kotauskas marked this conversation as resolved.
Outdated
Empty file added rustfmt.toml
Empty file.
203 changes: 158 additions & 45 deletions src/recompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@
//! time, particularly for the xz outputs. In our infrastructure this runs on a 72 vCPU container to
//! finish in a reasonable amount of time.

/// The maximum XZ dictionary size we're willing to choose (128 MiB). Rustup
/// users will need at least this much free RAM to decompress the archive, and
/// compression will require even more memory.
///
/// `choose_xz_dictsize` clips its result to this value and checks at compile
/// time that it does not exceed 1.5 GiB.
const MAX_XZ_DICTSIZE: u32 = 128 * 1024 * 1024;

use crate::Context;
use anyhow::Context as _;
use std::convert::TryFrom;
use std::fmt::Write as FmtWrite;
use std::fs::{self, File};
use std::io::{self, Read, Write};
use std::io::{self, Read, Seek, Write};
use std::path::Path;
use std::time::{Duration, Instant};
use xz2::read::XzDecoder;
Expand All @@ -28,16 +35,26 @@ pub(crate) fn recompress_file(
let file_start = Instant::now();
let gz_path = xz_path.with_extension("gz");

let mut destinations: Vec<(&str, Box<dyn io::Write>)> = Vec::new();
let mut in_file = File::open(xz_path).with_context(|| "failed to open XZ-compressed input")?;
let mut dec_buf = vec![0u8; 4 * 1024 * 1024];
let mut compression_times = String::new();

let mut dec_measurements = None;

// Produce gzip if explicitly enabled or the destination file doesn't exist.
if recompress_gz || !gz_path.is_file() {
let gz = File::create(gz_path)?;
destinations.push((
"gz",
Box::new(flate2::write::GzEncoder::new(gz, gz_compression_level)),
));
}
let gz_out = File::create(gz_path)?;
let mut gz_encoder = flate2::write::GzEncoder::new(gz_out, gz_compression_level);
let mut gz_duration = [Duration::ZERO];
dec_measurements = Some(decompress_and_write(
&mut in_file,
&mut dec_buf,
&mut [("gz", &mut gz_encoder)],
&mut gz_duration,
)?);
let [gz_duration] = gz_duration;
format_compression_time(&mut compression_times, "gz", gz_duration, None)?;
};

// xz recompression with more aggressive settings than we want to take the time
// for in rust-lang/rust CI. This cuts 5-15% off of the produced tarballs.
Expand All @@ -51,11 +68,17 @@ pub(crate) fn recompress_file(
// parallel.
let xz_recompressed = xz_path.with_extension("xz_recompressed");
if recompress_xz {
let in_size = match dec_measurements {
Some((_, size)) => size,
None => measure_compressed_file(&mut in_file, &mut dec_buf)?.1,
};
let dictsize = choose_xz_dictsize(u32::try_from(in_size).unwrap_or(u32::MAX));

let mut filters = xz2::stream::Filters::new();
let mut lzma_ops = xz2::stream::LzmaOptions::new_preset(9).unwrap();
// This sets the overall dictionary size, which is also how much memory (baseline)
// is needed for decompression.
lzma_ops.dict_size(64 * 1024 * 1024);
lzma_ops.dict_size(dictsize);
// Use the best match finder for compression ratio.
lzma_ops.match_finder(xz2::stream::MatchFinder::BinaryTree4);
lzma_ops.mode(xz2::stream::Mode::Normal);
Expand All @@ -76,61 +99,148 @@ pub(crate) fn recompress_file(
// FIXME: Do we want a checksum as part of compression?
let stream =
xz2::stream::Stream::new_stream_encoder(&filters, xz2::stream::Check::None).unwrap();

let xz_out = File::create(&xz_recompressed)?;
destinations.push((
"xz",
Box::new(xz2::write::XzEncoder::new_stream(
std::io::BufWriter::new(xz_out),
stream,
)),
));
let mut xz_encoder = xz2::write::XzEncoder::new_stream(io::BufWriter::new(xz_out), stream);
let mut xz_duration = [Duration::ZERO];
dec_measurements = Some(decompress_and_write(
&mut in_file,
&mut dec_buf,
&mut [("xz", &mut xz_encoder)],
&mut xz_duration,
)?);
let [xz_duration] = xz_duration;
format_compression_time(&mut compression_times, "xz", xz_duration, Some(dictsize))?;
}

// We only decompress once and then write into each of the compressors before
// moving on.
//
// This code assumes that compression with `write_all` will never fail (i.e., we
// can take arbitrary amounts of data as input). That seems like a reasonable
// assumption though.
let mut decompressor = XzDecoder::new(File::open(xz_path)?);
let mut buffer = vec![0u8; 4 * 1024 * 1024];
drop(in_file);

print!(
"recompressed {}: {:.2?} total",
xz_path.display(),
file_start.elapsed()
);
if let Some((decompress_time, _)) = dec_measurements {
print!(" {:.2?} decompression", decompress_time);
}
println!("{}", compression_times);

if recompress_xz {
fs::rename(&xz_recompressed, xz_path)?;
}

Ok(())
}

/// Decompresses the given XZ stream and sends it to the given set of destinations.
/// Writes the time taken by each individual destination to the given slice of
/// durations and returns the total time taken by the decompressor and the total
/// size of the decompressed stream.
fn decompress_and_write(
src: &mut (impl Read + Seek),
buf: &mut [u8],
destinations: &mut [(&str, &mut dyn Write)],
Comment thread
kotauskas marked this conversation as resolved.
Outdated
time_by_dst: &mut [Duration],
) -> anyhow::Result<(Duration, u64)> {
src.rewind().with_context(|| "input file seek failed")?;
let mut decompressor = XzDecoder::new(src);
let mut decompress_time = Duration::ZERO;
let mut time_by_dest = vec![Duration::ZERO; destinations.len()];
let mut total_length = 0_u64;
loop {
let start = Instant::now();
let length = decompressor.read(&mut buffer)?;
let length = decompressor
.read(buf)
.with_context(|| "XZ decompression failed")?;
decompress_time += start.elapsed();
total_length += length as u64;
if length == 0 {
break;
}
for (idx, (_, destination)) in destinations.iter_mut().enumerate() {
// This code assumes that compression with `write_all` will never fail (i.e.,
// we can take arbitrary amounts of data as input). That seems like a
// reasonable assumption though.
for (idx, (compname, destination)) in destinations.iter_mut().enumerate() {
let start = std::time::Instant::now();
destination.write_all(&buffer[..length])?;
time_by_dest[idx] += start.elapsed();
destination
.write_all(&buf[..length])
.with_context(|| format!("{compname} compression failed"))?;
time_by_dst[idx] += start.elapsed();
}
}
Ok((decompress_time, total_length))
}

let mut compression_times = String::new();
for (idx, (name, _)) in destinations.iter().enumerate() {
/// Calls `decompress_and_write` solely to measure the file's uncompressed size
/// and the time taken by decompression.
fn measure_compressed_file(
src: &mut (impl Read + Seek),
buf: &mut [u8],
) -> anyhow::Result<(Duration, u64)> {
decompress_and_write(src, buf, &mut [], &mut [])
}

/// Appends a `", <duration> <name> compression"` fragment to `out`, followed
/// by `" with <size> <unit> dictionary"` when `dictsize` is given.
///
/// The dictionary size is printed in the largest binary unit (B, KiB, MiB or
/// GiB) that divides it exactly, so the number shown is always an integer.
/// A size of zero is printed as `0 B`.
///
/// # Errors
///
/// Propagates any error reported while formatting into `out`.
fn format_compression_time(
    out: &mut String,
    name: &str,
    duration: Duration,
    dictsize: Option<u32>,
) -> std::fmt::Result {
    // Binary prefixes, smallest first; a `u32` never needs more than "Gi".
    const PREFIXES: [&str; 4] = ["", "Ki", "Mi", "Gi"];

    write!(out, ", {:.2?} {} compression", duration, name)?;
    if let Some(mut dictsize) = dictsize {
        let mut iprefix = 0;
        // Step up one unit for as long as the size stays an exact multiple.
        // Zero is a multiple of everything, so stop there explicitly.
        while iprefix + 1 < PREFIXES.len() && dictsize != 0 && dictsize % 1024 == 0 {
            iprefix += 1;
            dictsize /= 1024;
        }
        write!(out, " with {dictsize} {}B dictionary", PREFIXES[iprefix])?;
    }
    Ok(())
}

if recompress_xz {
fs::rename(&xz_recompressed, xz_path)?;
/// Chooses the smallest XZ dictionary size that is at least as large as the
/// file and will not be rounded by XZ, clipping it to the range of acceptable
/// dictionary sizes.
Comment thread
Mark-Simulacrum marked this conversation as resolved.
fn choose_xz_dictsize(sz: u32) -> u32 {
/// XZ's minimum dictionary size, which is 4 KiB.
const MIN_XZ_DICTSIZE: u32 = 4096;
const {
// This check is to prevent overflow further down the line
// regardless of the value of MAX_XZ_DICTSIZE.
assert!(
MAX_XZ_DICTSIZE <= (1024 + 512) * 1024 * 1024,
"XZ dictionary size only goes up to 1.5 GiB"
);
};
// If the size is beyond the extremes, clip and
// don't bother with any further calculations.
if sz <= MIN_XZ_DICTSIZE {
return MIN_XZ_DICTSIZE;
}
if sz >= MAX_XZ_DICTSIZE {
return MAX_XZ_DICTSIZE;
}
if sz.is_power_of_two() {
return sz;
}

Ok(())
// Copypasted from .isolate_highest_one().
Comment thread
kotauskas marked this conversation as resolved.
Outdated
let hi_one = sz & (1_u32 << 31).wrapping_shr(sz.leading_zeros());

// For a bitstring of the form 01x…, check if 011… is greater or equal.
let twinbit_form = hi_one | (hi_one >> 1);
if twinbit_form >= sz {
Comment thread
Mark-Simulacrum marked this conversation as resolved.
return twinbit_form;
}

// Otherwise, we go for the next power of two.
if hi_one << 1 >= MAX_XZ_DICTSIZE {
return MAX_XZ_DICTSIZE;
}
hi_one << 1
}

impl Context {
Expand Down Expand Up @@ -192,7 +302,10 @@ impl Context {
let path = to_recompress.lock().unwrap().pop();
path
} {
recompress_file(&xz_path, recompress_gz, compression_level, recompress_xz)?;
recompress_file(&xz_path, recompress_gz, compression_level, recompress_xz)
.with_context(|| {
format!("failed to recompress {}", xz_path.display())
})?;
}

Ok::<_, anyhow::Error>(())
Expand Down