Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions NEWS
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
0.23.3 UNRELEASED

* Implement multi-threaded pack deltification with adaptive thread
selection. Automatically chooses optimal thread count based on
object count for best performance. (Jelmer Vernooij)

* Add Rust implementation of ``create_delta`` function for significant
performance improvements (up to 31x faster for large object sets).
Uses enum-based opcodes for better efficiency. (Jelmer Vernooij)

* Add support for ``pack.allowPackReuse`` configuration option.
``pack.allowPackReuse`` controls whether existing pack data can be
reused when generating new packs. This option is now properly
respected throughout the codebase. (Jelmer Vernooij)

* Add support for ``-a`` argument to
``dulwich.cli.commit``. (Jelmer Vernooij)

Expand Down
220 changes: 220 additions & 0 deletions crates/pack/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
use pyo3::exceptions::{PyTypeError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{PyBytes, PyList};
use std::cmp::min;
use std::collections::HashMap;

pyo3::import_exception!(dulwich.errors, ApplyDeltaError);

Expand Down Expand Up @@ -218,9 +220,227 @@ fn apply_delta(py: Python, py_src_buf: PyObject, py_delta: PyObject) -> PyResult
Ok(vec![PyBytes::new(py, &out).into()])
}

/// Maximum copy length for a single copy operation - 64KB
const MAX_COPY_LEN: usize = 0xFFFF;

/// Encodes a size using Git's variable-length encoding format
fn delta_encode_size(mut size: usize) -> Vec<u8> {
let mut result = Vec::new();
let mut c = (size & 0x7F) as u8;
size >>= 7;

while size > 0 {
result.push(c | 0x80);
c = (size & 0x7F) as u8;
size >>= 7;
}
result.push(c);
result
}

/// Encodes a copy operation as bytes
fn encode_copy_operation(start: usize, length: usize) -> Vec<u8> {
let mut result = vec![0x80u8];

// Encode start offset (up to 4 bytes)
for i in 0..4 {
let byte_val = (start >> (i * 8)) & 0xFF;
if byte_val != 0 {
result.push(byte_val as u8);
result[0] |= 1 << i;
}
}

// Encode length (up to 2 bytes)
for i in 0..2 {
let byte_val = (length >> (i * 8)) & 0xFF;
if byte_val != 0 {
result.push(byte_val as u8);
result[0] |= 1 << (4 + i);
}
}

result
}

/// Delta operation types
#[derive(Debug, Clone, Copy, PartialEq)]
enum DeltaOpcode {
Equal,
Insert,
}

/// Delta operation with positions
#[derive(Debug, Clone)]
struct DeltaOperation {
opcode: DeltaOpcode,
base_start: usize,
base_end: usize,
target_start: usize,
target_end: usize,
}

/// Sophisticated diff algorithm using hash-based matching
fn find_delta_operations(base: &[u8], target: &[u8]) -> Vec<DeltaOperation> {
let mut operations = Vec::new();

// Build a hash map of base buffer for efficient searching
let mut base_map: HashMap<&[u8], Vec<usize>> = HashMap::new();
let min_match_len = 4; // Minimum match length to consider

// Index base buffer with sliding windows
for i in 0..=base.len().saturating_sub(min_match_len) {
let end = std::cmp::min(i + 32, base.len()); // Limit window size
let window = &base[i..end];
base_map.entry(window).or_insert_with(Vec::new).push(i);
}

let mut target_pos = 0;

while target_pos < target.len() {
let mut best_match: Option<(usize, usize, usize)> = None;

// Try different window sizes to find the best match
for window_size in (min_match_len..=std::cmp::min(32, target.len() - target_pos)).rev() {
if target_pos + window_size <= target.len() {
let target_window = &target[target_pos..target_pos + window_size];

if let Some(base_positions) = base_map.get(target_window) {
for &base_start in base_positions {
// Try to extend the match
let mut match_len = 0;
let max_extend = std::cmp::min(base.len() - base_start, target.len() - target_pos);

while match_len < max_extend &&
base[base_start + match_len] == target[target_pos + match_len] {
match_len += 1;
}

if match_len >= min_match_len &&
(best_match.is_none() || match_len > best_match.unwrap().2) {
best_match = Some((base_start, target_pos, match_len));
}
}
}

if best_match.is_some() {
break; // Found a good match, don't try smaller windows
}
}
}

if let Some((base_start, target_start, match_len)) = best_match {
// Add copy operation
operations.push(DeltaOperation {
opcode: DeltaOpcode::Equal,
base_start,
base_end: base_start + match_len,
target_start,
target_end: target_start + match_len,
});
target_pos += match_len;
} else {
// No match found, find end of literal run
let literal_start = target_pos;
target_pos += 1;

// Continue until we find a potential match
while target_pos < target.len() {
let mut found_match = false;
for window_size in (min_match_len..=std::cmp::min(16, target.len() - target_pos)).rev() {
if target_pos + window_size <= target.len() {
let window = &target[target_pos..target_pos + window_size];
if base_map.contains_key(window) {
found_match = true;
break;
}
}
}
if found_match {
break;
}
target_pos += 1;
}

// Add insert operation for literal data
operations.push(DeltaOperation {
opcode: DeltaOpcode::Insert,
base_start: 0,
base_end: 0,
target_start: literal_start,
target_end: target_pos,
});
}
}

operations
}

/// Creates a Git-compatible delta between base and target buffers
fn create_delta_internal(base_buf: &[u8], target_buf: &[u8]) -> Vec<u8> {
let mut result = Vec::new();

// Write delta header - encoded sizes of base and target
result.extend(delta_encode_size(base_buf.len()));
result.extend(delta_encode_size(target_buf.len()));

// Generate diff operations
let operations = find_delta_operations(base_buf, target_buf);

for operation in operations {
match operation.opcode {
DeltaOpcode::Equal => {
// Copy operation - reference data from base buffer
let mut copy_start = operation.base_start;
let mut copy_len = operation.base_end - operation.base_start;

while copy_len > 0 {
let to_copy = min(copy_len, MAX_COPY_LEN);
result.extend(encode_copy_operation(copy_start, to_copy));
copy_start += to_copy;
copy_len -= to_copy;
}
}
DeltaOpcode::Insert => {
// Insert operation - include literal data from target
let mut remaining = operation.target_end - operation.target_start;
let mut offset = operation.target_start;

while remaining > 127 {
result.push(127);
result.extend_from_slice(&target_buf[offset..offset + 127]);
remaining -= 127;
offset += 127;
}

if remaining > 0 {
result.push(remaining as u8);
result.extend_from_slice(&target_buf[offset..offset + remaining]);
}
}
}
}

result
}

#[pyfunction]
fn create_delta(py: Python, py_base_buf: PyObject, py_target_buf: PyObject) -> PyResult<PyObject> {
let base_buf = py_chunked_as_string(py, &py_base_buf)?;
let target_buf = py_chunked_as_string(py, &py_target_buf)?;

let delta = create_delta_internal(base_buf.as_ref(), target_buf.as_ref());

// Return a list with the delta as a single chunk to maintain compatibility
let py_list = PyList::empty(py);
py_list.append(PyBytes::new(py, &delta))?;
Ok(py_list.into())
}

#[pymodule]
fn _pack(_py: Python, m: &Bound<PyModule>) -> PyResult<()> {
m.add_function(wrap_pyfunction!(bisect_find_sha, m)?)?;
m.add_function(wrap_pyfunction!(apply_delta, m)?)?;
m.add_function(wrap_pyfunction!(create_delta, m)?)?;
Ok(())
}
13 changes: 13 additions & 0 deletions dulwich/object_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ def __init__(
pack_depth=None,
pack_threads=None,
pack_big_file_threshold=None,
pack_allow_pack_reuse=None,
) -> None:
self._pack_cache: dict[str, Pack] = {}
self.pack_compression_level = pack_compression_level
Expand All @@ -528,6 +529,7 @@ def __init__(
self.pack_depth = pack_depth
self.pack_threads = pack_threads
self.pack_big_file_threshold = pack_big_file_threshold
self.pack_allow_pack_reuse = pack_allow_pack_reuse

def add_pack(self) -> tuple[BytesIO, Callable[[], None], Callable[[], None]]:
"""Add a new pack to this object store."""
Expand Down Expand Up @@ -619,6 +621,9 @@ def generate_pack_data(
progress=progress,
ofs_delta=ofs_delta,
other_haves=remote_has,
reuse_deltas=self.pack_allow_pack_reuse
if self.pack_allow_pack_reuse is not None
else True,
)

def _clear_cached_packs(self) -> None:
Expand Down Expand Up @@ -937,6 +942,7 @@ def __init__(
pack_depth=None,
pack_threads=None,
pack_big_file_threshold=None,
pack_allow_pack_reuse=None,
) -> None:
"""Open an object store.

Expand All @@ -951,6 +957,7 @@ def __init__(
pack_depth: maximum delta chain depth
pack_threads: number of threads for pack operations
pack_big_file_threshold: threshold for treating files as big
pack_allow_pack_reuse: whether to allow reusing existing pack data
"""
super().__init__(
pack_compression_level=pack_compression_level,
Expand All @@ -961,6 +968,7 @@ def __init__(
pack_depth=pack_depth,
pack_threads=pack_threads,
pack_big_file_threshold=pack_big_file_threshold,
pack_allow_pack_reuse=pack_allow_pack_reuse,
)
self.path = path
self.pack_dir = os.path.join(self.path, PACKDIR)
Expand Down Expand Up @@ -1033,6 +1041,9 @@ def from_config(cls, path: Union[str, os.PathLike], config):
except KeyError:
pack_big_file_threshold = None

# Read new pack configuration options
pack_allow_pack_reuse = config.get_boolean((b"pack",), b"allowPackReuse", True)

# Read core.commitGraph setting
use_commit_graph = config.get_boolean((b"core",), b"commitGraph", True)

Expand All @@ -1047,6 +1058,7 @@ def from_config(cls, path: Union[str, os.PathLike], config):
pack_depth,
pack_threads,
pack_big_file_threshold,
pack_allow_pack_reuse,
)
instance._use_commit_graph = use_commit_graph
return instance
Expand Down Expand Up @@ -1568,6 +1580,7 @@ def __init__(self) -> None:
super().__init__()
self._data: dict[str, ShaFile] = {}
self.pack_compression_level = -1
self.pack_allow_pack_reuse = None

def _to_hexsha(self, sha):
if len(sha) == 40:
Expand Down
Loading
Loading