Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions cpp/include/kvikio/file_handle_rangelock.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Modified FileHandle with range-based locking support
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#pragma once

#include <kvikio/file_handle.hpp>
#include <kvikio/range_lock.hpp>
#include <memory>

namespace kvikio {

class FileHandleWithRangeLock : public FileHandle {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the class needs to be commented in detail too, to explain the purpose of this class and also key implementation details. For example, when multiple write requests contend on a common range, what would the behavior be?

private:
mutable RangeLockManager range_lock_manager_;

public:
using FileHandle::FileHandle; // Inherit constructors

/**
* @brief Write with range-based locking
*
* This version acquires a lock only for the specific range being written,
* allowing non-overlapping writes to proceed in parallel.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function parameters and return values should be commented in Doxygen format.

std::future<std::size_t> pwrite_rangelock(void const* buf,
std::size_t size,
std::size_t file_offset = 0,
std::size_t task_size = defaults::task_size(),
std::size_t gds_threshold = defaults::gds_threshold(),
bool sync_default_stream = true) {

// Acquire range lock for this write
auto range_lock = range_lock_manager_.lock_range(file_offset, file_offset + size);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KvikIO has transitioned from a header-only library to a shared library. So all the implementations should go to corresponding .cpp files.


// Perform the write using the base class implementation
auto future = this->pwrite(buf, size, file_offset, task_size, gds_threshold, sync_default_stream);

// Create a wrapper future that releases the lock when done
return std::async(std::launch::deferred, [future = std::move(future),
lock = std::move(range_lock)]() mutable {
auto result = future.get();
// Lock will be automatically released when this lambda exits
return result;
Comment on lines +41 to +45
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For parallel I/O, the step to wait for the chunked tasks is performed in the thread pool (https://github.com/rapidsai/kvikio/blob/branch-25.12/cpp/include/kvikio/parallel_operation.hpp#L175-L184). Would it be possible that this unlock step here follows suit and gets pushed to the thread pool's task queue as well?

});
Comment on lines +41 to +46
Copy link

Copilot AI Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using std::launch::deferred prevents the operation from running in parallel. The lambda will only execute when .get() is called, defeating the purpose of parallel I/O. Consider using std::launch::async or a different approach to maintain parallelism while ensuring proper lock cleanup.

Copilot uses AI. Check for mistakes.
}

/**
* @brief Read with range-based locking (optional, for consistency)
*/
std::future<std::size_t> pread_rangelock(void* buf,
std::size_t size,
std::size_t file_offset = 0,
std::size_t task_size = defaults::task_size(),
std::size_t gds_threshold = defaults::gds_threshold(),
bool sync_default_stream = true) {

// For reads, we could use shared locks if needed
// For now, using exclusive locks for simplicity
auto range_lock = range_lock_manager_.lock_range(file_offset, file_offset + size);

auto future = this->pread(buf, size, file_offset, task_size, gds_threshold, sync_default_stream);

return std::async(std::launch::deferred, [future = std::move(future),
lock = std::move(range_lock)]() mutable {
Comment on lines +65 to +66
Copy link

Copilot AI Sep 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same deferred execution issue as in pwrite_rangelock. This prevents the read operation from running in parallel, contradicting the parallel I/O goals of the feature.

Suggested change
return std::async(std::launch::deferred, [future = std::move(future),
lock = std::move(range_lock)]() mutable {
return std::async(std::launch::async, [future = std::move(future),
lock = std::move(range_lock)]() mutable {

Copilot uses AI. Check for mistakes.
auto result = future.get();
return result;
});
}

/**
* @brief Check if a range is currently locked
*/
bool is_range_locked(std::size_t start, std::size_t end) const {
return range_lock_manager_.is_range_locked(start, end);
}

/**
* @brief Get statistics about locked ranges
*/
std::size_t num_locked_ranges() const {
return range_lock_manager_.num_locked_ranges();
}
};

} // namespace kvikio
119 changes: 119 additions & 0 deletions cpp/include/kvikio/range_lock.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Range-based locking for parallel file I/O
* This allows non-overlapping ranges to be written in parallel
*/
#pragma once

#include <map>
#include <mutex>
#include <condition_variable>
#include <set>
#include <memory>

namespace kvikio {

class RangeLockManager {
public:
struct Range {
std::size_t start;
std::size_t end;

bool overlaps(const Range& other) const {
return !(end <= other.start || start >= other.end);
}

bool operator<(const Range& other) const {
return start < other.start;
}
};

private:
mutable std::mutex mutex_;
std::condition_variable cv_;
std::set<Range> locked_ranges_;

public:
class RangeLock {
private:
RangeLockManager* manager_;
Range range_;
bool locked_;

public:
RangeLock(RangeLockManager* manager, std::size_t start, std::size_t end)
: manager_(manager), range_{start, end}, locked_(false) {
lock();
}

~RangeLock() {
if (locked_) {
unlock();
}
}

// Move only
RangeLock(const RangeLock&) = delete;
RangeLock& operator=(const RangeLock&) = delete;
RangeLock(RangeLock&& other) noexcept
: manager_(other.manager_), range_(other.range_), locked_(other.locked_) {
other.locked_ = false;
}

void lock() {
if (locked_) return;

std::unique_lock<std::mutex> lock(manager_->mutex_);

// Wait until no overlapping ranges are locked
manager_->cv_.wait(lock, [this]() {
for (const auto& locked_range : manager_->locked_ranges_) {
if (range_.overlaps(locked_range)) {
return false;
}
}
return true;
});

// Lock this range
manager_->locked_ranges_.insert(range_);
locked_ = true;
}

void unlock() {
if (!locked_) return;

std::unique_lock<std::mutex> lock(manager_->mutex_);
manager_->locked_ranges_.erase(range_);
locked_ = false;

// Notify waiting threads
manager_->cv_.notify_all();
}
};

std::unique_ptr<RangeLock> lock_range(std::size_t start, std::size_t end) {
return std::make_unique<RangeLock>(this, start, end);
}

// Check if a range is currently locked
bool is_range_locked(std::size_t start, std::size_t end) const {
std::unique_lock<std::mutex> lock(mutex_);
Range query{start, end};
for (const auto& locked_range : locked_ranges_) {
if (query.overlaps(locked_range)) {
return true;
}
}
return false;
}

// Get number of currently locked ranges
std::size_t num_locked_ranges() const {
std::unique_lock<std::mutex> lock(mutex_);
return locked_ranges_.size();
}
};

} // namespace kvikio
2 changes: 2 additions & 0 deletions cpp/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ kvikio_add_test(NAME ERROR_TEST SOURCES test_error.cpp)

kvikio_add_test(NAME MMAP_TEST SOURCES test_mmap.cpp)

kvikio_add_test(NAME RANGE_LOCK_TEST SOURCES test_range_lock.cpp)

if(KvikIO_REMOTE_SUPPORT)
kvikio_add_test(NAME REMOTE_HANDLE_TEST SOURCES test_remote_handle.cpp utils/env.cpp)
kvikio_add_test(NAME HDFS_TEST SOURCES test_hdfs.cpp utils/hdfs_helper.cpp)
Expand Down
Loading
Loading