Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(injector): add an extend method to Nucleo's injector #74

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ exclude = ["/typos.toml", "/tarpaulin.toml"]

[dependencies]
nucleo-matcher = { version = "0.3.1", path = "matcher" }
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"]}
parking_lot = { version = "0.12.1", features = ["send_guard", "arc_lock"] }
rayon = "1.7.0"

[workspace]
members = [ "matcher", "bench" ]
members = ["matcher", "bench"]
77 changes: 77 additions & 0 deletions src/boxcar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,83 @@ impl<T> Vec<T> {
index
}

/// Extends the vector by appending multiple elements at once.
pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a non-trivial unsafe function I would like to see some unit tests (just of the boxcar in isolation)

where
I: IntoIterator<Item = T> + ExactSizeIterator,
{
let count: u32 = values
.len()
.try_into()
.expect("overflowed maximum capacity");
if count == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as said the reported len can be wrong so you want an assert inside this function like assert_eq!(values.next(), None, "values reproted incorrect len")

return;
}

// Reserve all indices at once
let start_index: u32 = self
.inflight
.fetch_add(u64::from(count), Ordering::Release)
.try_into()
.expect("overflowed maximum capacity");

// Compute first and last locations
let start_location = Location::of(start_index);
let end_location = Location::of(start_index + count - 1);

// Allocate necessary buckets upfront
if start_location.bucket != end_location.bucket {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a pessimisation. This is only supposed to be used for avoiding contention on allocating a new shard. That is only needed for the end_bucket and the bucket after it. For the other buckets it's not needed as they will all be allocated contention free from within this function.

The correct logic would look like this:

let alloc_entry = end_location.bucket_len - (end_location.bucket_len >> 3);
if end_location.entry >= alloc_entry && (start_location.bucket != end_location.bucket || start_location.entry <= alloc_entry) {
    if let Some(next_bucket) = self.buckets.get(end_location.bucket as usize + 1) {
        Vec::get_or_alloc(next_bucket, end_location.bucket_len << 1, self.columns);
    }
} 

if start_location.bucket != end_location.bucket {
    let bucket_ptr = self.buckets.get_unckecked(end_location.bucket as usize);
    Vec::get_or_alloc(bucket_ptr, end_location.bucket_len, self.columns);
}

we probably want to turn all_entry intoa function on Location since it's used in multiple places now

for bucket in start_location.bucket..=end_location.bucket {
if let Some(bucket_ptr) = self.buckets.get(bucket as usize) {
Vec::get_or_alloc(bucket_ptr, Location::bucket_len(bucket), self.columns);
}
}
}

let mut bucket = unsafe { self.buckets.get_unchecked(start_location.bucket as usize) };
let mut entries = bucket.entries.load(Ordering::Acquire);
if entries.is_null() {
entries = Vec::get_or_alloc(
bucket,
Location::bucket_len(start_location.bucket),
self.columns,
);
}
// Route each value to its corresponding bucket
let mut location;
for (i, v) in values.into_iter().enumerate() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ExactSizeItreator is a safe trait that can have bugs/lie about it's size. Unsafe code cannot rely on the reported length being correct. You need to track the index (with .enumrate()) and if we go past the claimed length you need to panic (less is fine since that would just mean a permanent gap in the vec)

location =
Location::of(start_index + u32::try_from(i).expect("overflowed maximum capacity"));

unsafe {
let entry = Bucket::get(entries, location.entry, self.columns);

// Initialize matcher columns
for col in Entry::matcher_cols_raw(entry, self.columns) {
col.get().write(MaybeUninit::new(Utf32String::default()));
}
fill_columns(&v, Entry::matcher_cols_mut(entry, self.columns));
(*entry).slot.get().write(MaybeUninit::new(v));
(*entry).active.store(true, Ordering::Release);
}

// if we are at the end of the bucket, move on to the next one
if location.entry == location.bucket_len - 1 {
// safety: `location.bucket + 1` is always in bounds
bucket = unsafe { self.buckets.get_unchecked((location.bucket + 1) as usize) };
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is true. end_location could be the last bucket (which would make this UB).

I think this check should be at the start of the function (and simply check wether the bucket changed compared to the previous location)

entries = bucket.entries.load(Ordering::Acquire);

if entries.is_null() {
entries = Vec::get_or_alloc(
bucket,
Location::bucket_len(location.bucket + 1),
self.columns,
);
}
}
}
}

/// race to initialize a bucket
fn get_or_alloc(bucket: &Bucket<T>, len: u32, cols: u32) -> *mut Entry<T> {
let entries = unsafe { Bucket::alloc(len, cols) };
Expand Down
17 changes: 17 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,23 @@ impl<T> Injector<T> {
idx
}

/// Appends multiple elements to the list of matched items.
/// This function is lock-free and wait-free.
///
/// You should favor this function over `push` if at least one of the following is true:
/// - the number of items you're adding can be computed beforehand and is typically larger
/// than 1k
/// - you're able to batch incoming items
/// - you're adding items from multiple threads concurrently (this function results in less
/// contention)
pub fn extend<I>(&self, values: I, fill_columns: impl Fn(&T, &mut [Utf32String]))
where
I: IntoIterator<Item = T> + ExactSizeIterator,
{
self.items.extend(values, fill_columns);
(self.notify)();
}

/// Returns the total number of items injected in the matcher. This might
/// not match the number of items in the match snapshot (if the matcher
/// is still running)
Expand Down