diff --git a/cpp/include/kvikio/file_handle_rangelock.hpp b/cpp/include/kvikio/file_handle_rangelock.hpp
new file mode 100644
index 0000000000..06adf05337
--- /dev/null
+++ b/cpp/include/kvikio/file_handle_rangelock.hpp
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Modified FileHandle with range-based locking support
+ */
+#pragma once
+
+#include <future>
+#include <kvikio/file_handle.hpp>
+#include <kvikio/range_lock.hpp>
+
+namespace kvikio {
+
+class FileHandleWithRangeLock : public FileHandle {
+ private:
+  mutable RangeLockManager range_lock_manager_;
+
+ public:
+  using FileHandle::FileHandle;  // Inherit constructors
+
+  /**
+   * @brief Write with range-based locking
+   *
+   * This version acquires a lock only for the specific range being written,
+   * allowing non-overlapping writes to proceed in parallel.
+   */
+  std::future<std::size_t> pwrite_rangelock(void const* buf,
+                                            std::size_t size,
+                                            std::size_t file_offset      = 0,
+                                            std::size_t task_size        = defaults::task_size(),
+                                            std::size_t gds_threshold    = defaults::gds_threshold(),
+                                            bool sync_default_stream     = true)
+  {
+    // Acquire range lock for this write
+    auto range_lock = range_lock_manager_.lock_range(file_offset, file_offset + size);
+
+    // Perform the write using the base class implementation
+    auto future = this->pwrite(buf, size, file_offset, task_size, gds_threshold, sync_default_stream);
+
+    // Create a wrapper future that releases the lock when done
+    return std::async(std::launch::deferred,
+                      [future = std::move(future), lock = std::move(range_lock)]() mutable {
+                        auto result = future.get();
+                        // The lock lives in the wrapper future's shared state and is
+                        // destroyed when get() releases that state
+                        return result;
+                      });
+  }
+
+  /**
+   * @brief Read with range-based locking (optional, for consistency)
+   */
+  std::future<std::size_t> pread_rangelock(void* buf,
+                                           std::size_t size,
+                                           std::size_t file_offset      = 0,
+                                           std::size_t task_size        = defaults::task_size(),
+                                           std::size_t gds_threshold    = defaults::gds_threshold(),
+                                           bool sync_default_stream     = true)
+  {
+    // For reads, we could use shared locks if needed.
+    // For now, use exclusive locks for simplicity.
+    auto range_lock = range_lock_manager_.lock_range(file_offset, file_offset + size);
+
+    auto future = this->pread(buf, size, file_offset, task_size, gds_threshold, sync_default_stream);
+
+    return std::async(std::launch::deferred,
+                      [future = std::move(future), lock = std::move(range_lock)]() mutable {
+                        auto result = future.get();
+                        return result;
+                      });
+  }
+
+  /**
+   * @brief Check if a range is currently locked
+   */
+  bool is_range_locked(std::size_t start, std::size_t end) const
+  {
+    return range_lock_manager_.is_range_locked(start, end);
+  }
+
+  /**
+   * @brief Get statistics about locked ranges
+   */
+  std::size_t num_locked_ranges() const { return range_lock_manager_.num_locked_ranges(); }
+};
+
+}  // namespace kvikio
\ No newline at end of file
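For context, a minimal usage sketch of the class above (illustrative only, not part of the patch; the file path and buffer sizes are placeholders). Two threads write disjoint ranges, which may proceed concurrently; overlapping ranges would serialize instead:

    #include <kvikio/file_handle_rangelock.hpp>

    #include <thread>
    #include <vector>

    int main()
    {
      std::vector<char> a(1024, 'a');
      std::vector<char> b(1024, 'b');
      kvikio::FileHandleWithRangeLock f("/tmp/example.bin", "w+");

      // Ranges [0, 1024) and [1024, 2048) are disjoint, so both writes
      // can hold their range locks at the same time.
      std::thread t1([&] { f.pwrite_rangelock(a.data(), a.size(), 0).get(); });
      std::thread t2([&] { f.pwrite_rangelock(b.data(), b.size(), 1024).get(); });
      t1.join();
      t2.join();
      return 0;
    }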
diff --git a/cpp/include/kvikio/range_lock.hpp b/cpp/include/kvikio/range_lock.hpp
new file mode 100644
index 0000000000..9d9935a743
--- /dev/null
+++ b/cpp/include/kvikio/range_lock.hpp
@@ -0,0 +1,119 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Range-based locking for parallel file I/O
+ * This allows non-overlapping ranges to be written in parallel
+ */
+#pragma once
+
+#include <condition_variable>
+#include <cstddef>
+#include <memory>
+#include <mutex>
+#include <set>
+
+namespace kvikio {
+
+class RangeLockManager {
+ public:
+  struct Range {
+    std::size_t start;
+    std::size_t end;
+
+    // Half-open interval [start, end): adjacent ranges do not overlap
+    bool overlaps(const Range& other) const
+    {
+      return !(end <= other.start || start >= other.end);
+    }
+
+    // Order by (start, end) so that distinct ranges never compare
+    // equivalent in the std::set below
+    bool operator<(const Range& other) const
+    {
+      return start < other.start || (start == other.start && end < other.end);
+    }
+  };
+
+ private:
+  mutable std::mutex mutex_;
+  std::condition_variable cv_;
+  std::set<Range> locked_ranges_;
+
+ public:
+  class RangeLock {
+   private:
+    RangeLockManager* manager_;
+    Range range_;
+    bool locked_;
+
+   public:
+    RangeLock(RangeLockManager* manager, std::size_t start, std::size_t end)
+      : manager_(manager), range_{start, end}, locked_(false)
+    {
+      lock();
+    }
+
+    ~RangeLock()
+    {
+      if (locked_) { unlock(); }
+    }
+
+    // Move only
+    RangeLock(const RangeLock&)            = delete;
+    RangeLock& operator=(const RangeLock&) = delete;
+    RangeLock(RangeLock&& other) noexcept
+      : manager_(other.manager_), range_(other.range_), locked_(other.locked_)
+    {
+      other.locked_ = false;
+    }
+
+    void lock()
+    {
+      if (locked_) return;
+
+      std::unique_lock<std::mutex> lock(manager_->mutex_);
+
+      // Wait until no overlapping ranges are locked
+      manager_->cv_.wait(lock, [this]() {
+        for (const auto& locked_range : manager_->locked_ranges_) {
+          if (range_.overlaps(locked_range)) { return false; }
+        }
+        return true;
+      });
+
+      // Lock this range
+      manager_->locked_ranges_.insert(range_);
+      locked_ = true;
+    }
+
+    void unlock()
+    {
+      if (!locked_) return;
+
+      std::unique_lock<std::mutex> lock(manager_->mutex_);
+      manager_->locked_ranges_.erase(range_);
+      locked_ = false;
+
+      // Notify waiting threads
+      manager_->cv_.notify_all();
+    }
+  };
+
+  std::unique_ptr<RangeLock> lock_range(std::size_t start, std::size_t end)
+  {
+    return std::make_unique<RangeLock>(this, start, end);
+  }
+
+  // Check if a range is currently locked
+  bool is_range_locked(std::size_t start, std::size_t end) const
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    Range query{start, end};
+    for (const auto& locked_range : locked_ranges_) {
+      if (query.overlaps(locked_range)) { return true; }
+    }
+    return false;
+  }
+
+  // Get number of currently locked ranges
+  std::size_t num_locked_ranges() const
+  {
+    std::unique_lock<std::mutex> lock(mutex_);
+    return locked_ranges_.size();
+  }
+};
+
+}  // namespace kvikio
\ No newline at end of file
diff --git a/cpp/tests/CMakeLists.txt b/cpp/tests/CMakeLists.txt
index afa7e8d97b..825be489ac 100644
--- a/cpp/tests/CMakeLists.txt
+++ b/cpp/tests/CMakeLists.txt
@@ -76,6 +76,8 @@ kvikio_add_test(NAME ERROR_TEST SOURCES test_error.cpp)
 
 kvikio_add_test(NAME MMAP_TEST SOURCES test_mmap.cpp)
 
+kvikio_add_test(NAME RANGE_LOCK_TEST SOURCES test_range_lock.cpp)
+
 if(KvikIO_REMOTE_SUPPORT)
   kvikio_add_test(NAME REMOTE_HANDLE_TEST SOURCES test_remote_handle.cpp utils/env.cpp)
   kvikio_add_test(NAME HDFS_TEST SOURCES test_hdfs.cpp utils/hdfs_helper.cpp)
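The Range intervals above are half-open, which is why adjacent ranges such as [0, 100) and [100, 200) can be held simultaneously while any genuine overlap blocks. A small standalone check of those semantics (illustrative, not part of the patch):

    #include <kvikio/range_lock.hpp>

    #include <cassert>

    int main()
    {
      using Range = kvikio::RangeLockManager::Range;
      Range a{0, 100};    // [0, 100)
      Range b{100, 200};  // [100, 200): adjacent to a, not overlapping
      Range c{50, 150};   // [50, 150): overlaps both a and b
      assert(!a.overlaps(b) && !b.overlaps(a));
      assert(a.overlaps(c) && b.overlaps(c));
      return 0;
    }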
diff --git a/cpp/tests/test_range_lock.cpp b/cpp/tests/test_range_lock.cpp
new file mode 100644
index 0000000000..02b5cb1bb6
--- /dev/null
+++ b/cpp/tests/test_range_lock.cpp
@@ -0,0 +1,195 @@
+/*
+ * Copyright (c) 2025, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gtest/gtest.h>
+#include <kvikio/file_handle_rangelock.hpp>
+#include <kvikio/range_lock.hpp>
+#include "utils/utils.hpp"
+
+#include <atomic>
+#include <chrono>
+#include <cstdint>
+#include <thread>
+#include <vector>
+
+using namespace kvikio::test;
+
+class RangeLockTest : public testing::Test {
+ protected:
+  void SetUp() override { _filepath = _tmp_dir.path() / "test_rangelock"; }
+
+  void TearDown() override {}
+
+  // The temporary directory must outlive SetUp(); a local variable there
+  // would be removed as soon as SetUp() returns, invalidating _filepath.
+  TempDir _tmp_dir{false};
+  std::filesystem::path _filepath;
+};
+
+TEST_F(RangeLockTest, non_overlapping_ranges)
+{
+  kvikio::RangeLockManager lock_manager;
+
+  // Test that non-overlapping ranges can be locked simultaneously
+  auto lock1 = lock_manager.lock_range(0, 100);
+  auto lock2 = lock_manager.lock_range(100, 200);
+
+  EXPECT_TRUE(lock_manager.is_range_locked(0, 100));
+  EXPECT_TRUE(lock_manager.is_range_locked(100, 200));
+  EXPECT_FALSE(lock_manager.is_range_locked(200, 300));
+
+  EXPECT_EQ(lock_manager.num_locked_ranges(), 2);
+}
+
+TEST_F(RangeLockTest, overlapping_ranges_serialize)
+{
+  kvikio::RangeLockManager lock_manager;
+  std::atomic<int> counter{0};
+  std::atomic<bool> first_completed{false};
+
+  std::thread t1([&]() {
+    auto lock = lock_manager.lock_range(0, 100);
+    counter++;
+    std::this_thread::sleep_for(std::chrono::milliseconds(50));
+    first_completed = true;
+  });
+
+  std::thread t2([&]() {
+    std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    auto lock = lock_manager.lock_range(50, 150);  // Overlaps with [0, 100)
+    counter++;
+    // Should only acquire after t1 releases
+    EXPECT_TRUE(first_completed.load());
+  });
+
+  t1.join();
+  t2.join();
+
+  EXPECT_EQ(counter.load(), 2);
+}
+
+TEST_F(RangeLockTest, file_handle_parallel_writes)
+{
+  const size_t chunk_size = 1024;
+  const size_t num_chunks = 10;
+
+  // Create test data
+  std::vector<uint8_t> data_a(chunk_size, 0xAA);
+  std::vector<uint8_t> data_b(chunk_size, 0xBB);
+
+  {
+    kvikio::FileHandleWithRangeLock f(_filepath, "w+");
+
+    std::vector<std::thread> threads;
+    auto start = std::chrono::steady_clock::now();
+
+    // Thread A writes even chunks
+    threads.emplace_back([&]() {
+      for (size_t i = 0; i < num_chunks; i += 2) {
+        auto future = f.pwrite_rangelock(data_a.data(), chunk_size, i * chunk_size);
+        future.get();
+      }
+    });
+
+    // Thread B writes odd chunks
+    threads.emplace_back([&]() {
+      for (size_t i = 1; i < num_chunks; i += 2) {
+        auto future = f.pwrite_rangelock(data_b.data(), chunk_size, i * chunk_size);
+        future.get();
+      }
+    });
+
+    for (auto& t : threads) {
+      t.join();
+    }
+
+    auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+                      std::chrono::steady_clock::now() - start)
+                      .count();
+
+    // Verify no ranges remain locked
+    EXPECT_EQ(f.num_locked_ranges(), 0);
+
+    // Verify written data
+    std::vector<uint8_t> verify(chunk_size);
+    for (size_t i = 0; i < num_chunks; i++) {
+      auto future = f.pread(verify.data(), chunk_size, i * chunk_size);
+      future.get();
+
+      uint8_t expected = (i % 2 == 0) ? 0xAA : 0xBB;
+      EXPECT_EQ(verify[0], expected);
+      EXPECT_EQ(verify[chunk_size - 1], expected);
+    }
+  }
+}
+
+TEST_F(RangeLockTest, range_lock_move_semantics)
+{
+  kvikio::RangeLockManager lock_manager;
+
+  {
+    auto lock1 = lock_manager.lock_range(0, 100);
+    EXPECT_EQ(lock_manager.num_locked_ranges(), 1);
+
+    // Move construction: lock1 is invalidated, lock2 still holds the lock
+    auto lock2 = std::move(lock1);
+    EXPECT_EQ(lock_manager.num_locked_ranges(), 1);
+  }
+
+  // Lock should be released when lock2 goes out of scope
+  EXPECT_EQ(lock_manager.num_locked_ranges(), 0);
+}
+
+TEST_F(RangeLockTest, concurrent_non_overlapping_performance)
+{
+  kvikio::RangeLockManager lock_manager;
+  const int num_threads    = 4;
+  const int ops_per_thread = 100;
+
+  auto worker = [&](int thread_id) {
+    for (int i = 0; i < ops_per_thread; i++) {
+      size_t start = thread_id * 1000 + i * 10;
+      size_t end   = start + 5;
+      auto lock    = lock_manager.lock_range(start, end);
+      // Simulate some work
+      std::this_thread::sleep_for(std::chrono::microseconds(10));
+    }
+  };
+
+  std::vector<std::thread> threads;
+  auto start = std::chrono::steady_clock::now();
+
+  for (int i = 0; i < num_threads; i++) {
+    threads.emplace_back(worker, i);
+  }
+
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
+                    std::chrono::steady_clock::now() - start)
+                    .count();
+
+  // All ranges are non-overlapping, so the threads should execute in parallel.
+  // Serial execution would take at least num_threads * ops_per_thread * 10us = 4000us;
+  // parallel execution should be close to ops_per_thread * 10us = 1000us.
+  // Allow generous overhead, but stay well below pathological serialization.
+  EXPECT_LT(duration, 2000);  // Should complete in less than 2 seconds
+  EXPECT_EQ(lock_manager.num_locked_ranges(), 0);
+}
\ No newline at end of file
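These tests rely on the wrapper returned by pwrite_rangelock()/pread_rangelock() keeping the range lock alive inside the deferred future's shared state, which std::future::get() releases. A standalone sketch of that lifetime pattern (illustrative; FakeLock stands in for RangeLock and mimics its move-only semantics):

    #include <future>
    #include <iostream>

    struct FakeLock {
      bool active{true};
      FakeLock() = default;
      FakeLock(FakeLock&& o) noexcept : active(o.active) { o.active = false; }
      ~FakeLock()
      {
        if (active) { std::cout << "lock released\n"; }
      }
    };

    int main()
    {
      // The lock rides inside the deferred future's shared state
      auto fut = std::async(std::launch::deferred, [lock = FakeLock{}]() { return 42; });
      std::cout << "before get\n";
      // get() runs the lambda, then releases the shared state, destroying the
      // lock: "lock released" prints here, not at end of scope. This is why the
      // tests can expect num_locked_ranges() == 0 once all futures are consumed.
      std::cout << fut.get() << "\n";
      std::cout << "after get\n";
      return 0;
    }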
diff --git a/python/kvikio/tests/test_range_lock.py b/python/kvikio/tests/test_range_lock.py
new file mode 100644
index 0000000000..4ade1eaced
--- /dev/null
+++ b/python/kvikio/tests/test_range_lock.py
@@ -0,0 +1,229 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+# See file LICENSE for terms.
+
+import threading
+import time
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+import pytest
+
+import kvikio
+
+cupy = pytest.importorskip("cupy")
+numpy = pytest.importorskip("numpy")
+
+
+@pytest.fixture
+def range_lock_file(tmp_path):
+    """Create a temporary file for range lock testing"""
+    filename = tmp_path / "test-rangelock"
+    # Pre-allocate file
+    with open(filename, "wb") as f:
+        f.seek(10 * 1024 * 1024 - 1)  # 10MB file
+        f.write(b"\0")
+    return filename
+
+
+def test_parallel_non_overlapping_writes(range_lock_file):
+    """Test that non-overlapping range writes can execute in parallel"""
+    chunk_size = 1024 * 1024  # 1MB chunks
+    num_chunks = 8
+
+    # Create distinct data for each chunk
+    chunks = {}
+    for i in range(num_chunks):
+        data = numpy.full(chunk_size // 4, i, dtype=numpy.int32)
+        chunks[i] = cupy.asarray(data)
+
+    def write_chunk(chunk_id):
+        """Write a specific chunk using range lock"""
+        offset = chunk_id * chunk_size
+        with kvikio.CuFile(range_lock_file, "r+") as f:
+            # Write with range locking (when implemented).
+            # For now, this tests the basic parallel write capability.
+            f.pwrite(chunks[chunk_id], file_offset=offset).get()
+        return chunk_id
+
+    # Write chunks in parallel
+    start_time = time.time()
+    with ThreadPoolExecutor(max_workers=4) as executor:
+        futures = [executor.submit(write_chunk, i) for i in range(num_chunks)]
+        completed = [f.result() for f in as_completed(futures)]
+    parallel_time = time.time() - start_time
+
+    # Verify all chunks were written
+    assert len(completed) == num_chunks
+
+    # Verify data integrity
+    with kvikio.CuFile(range_lock_file, "r") as f:
+        for i in range(num_chunks):
+            offset = i * chunk_size
+            data = cupy.empty(chunk_size // 4, dtype=cupy.int32)
+            f.pread(data, file_offset=offset).get()
+            expected = cupy.full(chunk_size // 4, i, dtype=cupy.int32)
+            cupy.testing.assert_array_equal(data, expected)
+
+    print(f"Parallel write time: {parallel_time:.3f}s")
+
+
+def test_overlapping_range_serialization(range_lock_file):
+    """Test that overlapping ranges are properly serialized"""
+    chunk_size = 1024 * 1024  # 1MB
+    overlap_size = chunk_size // 2
+
+    execution_order = []
+    lock = threading.Lock()
+
+    def write_with_overlap(writer_id, offset):
+        """Write data that potentially overlaps with other writers"""
+        data = numpy.full(chunk_size // 4, writer_id, dtype=numpy.int32)
+        gpu_data = cupy.asarray(data)
+
+        with kvikio.CuFile(range_lock_file, "r+") as f:
+            with lock:
+                execution_order.append((writer_id, "start"))
+            f.pwrite(gpu_data, file_offset=offset).get()
+            with lock:
+                execution_order.append((writer_id, "end"))
+        return writer_id
+
+    # Create overlapping writes:
+    # Writer 0: offset 0, size 1MB
+    # Writer 1: offset 512KB, size 1MB (overlaps with writer 0)
+    # Writer 2: offset 1MB, size 1MB (overlaps with writer 1)
+    with ThreadPoolExecutor(max_workers=3) as executor:
+        futures = [
+            executor.submit(write_with_overlap, 0, 0),
+            executor.submit(write_with_overlap, 1, overlap_size),
+            executor.submit(write_with_overlap, 2, chunk_size),
+        ]
+        results = [f.result() for f in as_completed(futures)]
+
+    assert len(results) == 3
+
+    # Check that execution was properly ordered.
+    # With proper range locking, overlapping ranges should serialize.
+    print(f"Execution order: {execution_order}")
+
+
+def test_range_lock_performance_benefit(range_lock_file):
+    """Compare performance of range-locked vs serialized writes"""
+    chunk_size = 512 * 1024  # 512KB chunks
+    num_operations = 16
+
+    # Prepare data
+    data_chunks = []
+    for i in range(num_operations):
+        data = numpy.full(chunk_size // 4, i, dtype=numpy.int32)
+        data_chunks.append(cupy.asarray(data))
+
+    # Test 1: Interleaved non-overlapping writes (should benefit from range lock)
+    def write_interleaved(op_id):
+        # Even ops write to the first half, odd ops to the second half
+        base_offset = (op_id % 2) * (chunk_size * num_operations // 2)
+        offset = base_offset + (op_id // 2) * chunk_size
+
+        with kvikio.CuFile(range_lock_file, "r+") as f:
+            f.pwrite(data_chunks[op_id], file_offset=offset).get()
+        return op_id
+
+    start = time.time()
+    with ThreadPoolExecutor(max_workers=4) as executor:
+        futures = [executor.submit(write_interleaved, i) for i in range(num_operations)]
+        results = [f.result() for f in as_completed(futures)]
+    interleaved_time = time.time() - start
+
+    # Test 2: Sequential overlapping writes (no benefit from range lock)
+    def write_sequential(op_id):
+        # All writes target overlapping regions
+        offset = op_id * (chunk_size // 2)  # 50% overlap
+
+        with kvikio.CuFile(range_lock_file, "r+") as f:
+            f.pwrite(data_chunks[op_id], file_offset=offset).get()
+        return op_id
+
+    start = time.time()
+    with ThreadPoolExecutor(max_workers=4) as executor:
+        futures = [executor.submit(write_sequential, i) for i in range(num_operations)]
+        results = [f.result() for f in as_completed(futures)]
+    sequential_time = time.time() - start
+
+    print(f"Interleaved (non-overlapping) time: {interleaved_time:.3f}s")
+    print(f"Sequential (overlapping) time: {sequential_time:.3f}s")
+
+    # With proper range locking, interleaved should be faster.
+    # Without range locking, times might be similar.
+    assert len(results) == num_operations
+
+
+@pytest.mark.parametrize("num_threads", [2, 4, 8])
+def test_concurrent_range_locks(range_lock_file, num_threads):
+    """Test concurrent acquisition and release of range locks"""
+    operations_per_thread = 10
+    chunk_size = 128 * 1024  # 128KB
+
+    success_counter = threading.Semaphore(0)
+
+    def worker(thread_id):
+        """Worker that performs multiple range-locked operations"""
+        for op in range(operations_per_thread):
+            # Each thread writes to its own range
+            offset = thread_id * operations_per_thread * chunk_size + op * chunk_size
+            data = numpy.full(chunk_size // 4, thread_id * 100 + op, dtype=numpy.int32)
+            gpu_data = cupy.asarray(data)
+
+            with kvikio.CuFile(range_lock_file, "r+") as f:
+                f.pwrite(gpu_data, file_offset=offset).get()
+
+            success_counter.release()
+        return thread_id
+
+    start = time.time()
+    with ThreadPoolExecutor(max_workers=num_threads) as executor:
+        futures = [executor.submit(worker, i) for i in range(num_threads)]
+        results = [f.result() for f in as_completed(futures)]
+    duration = time.time() - start
+
+    # Verify all operations completed
+    total_ops = num_threads * operations_per_thread
+    for _ in range(total_ops):
+        assert success_counter.acquire(timeout=0.001)
+
+    print(f"Completed {total_ops} operations with {num_threads} threads in {duration:.3f}s")
+    assert len(results) == num_threads
+
+
+def test_range_lock_with_different_sizes(range_lock_file):
+    """Test range locks with varying data sizes"""
+    sizes = [64 * 1024, 256 * 1024, 1024 * 1024]  # 64KB, 256KB, 1MB
+
+    def write_variable_size(op_id, size):
+        """Write data of variable size"""
+        offset = sum(sizes[:op_id])  # Non-overlapping offsets
+        data = numpy.full(size // 4, op_id, dtype=numpy.int32)
+        gpu_data = cupy.asarray(data)
+
+        with kvikio.CuFile(range_lock_file, "r+") as f:
+            bytes_written = f.pwrite(gpu_data, file_offset=offset).get()
+            assert bytes_written == size
+        return (op_id, size)
+
+    with ThreadPoolExecutor(max_workers=len(sizes)) as executor:
+        futures = [
+            executor.submit(write_variable_size, i, size)
+            for i, size in enumerate(sizes)
+        ]
+        results = [f.result() for f in as_completed(futures)]
+
+    assert len(results) == len(sizes)
+
+    # Verify data
+    with kvikio.CuFile(range_lock_file, "r") as f:
+        offset = 0
+        for op_id, size in enumerate(sizes):
+            data = cupy.empty(size // 4, dtype=cupy.int32)
+            f.pread(data, file_offset=offset).get()
+            expected = cupy.full(size // 4, op_id, dtype=cupy.int32)
+            cupy.testing.assert_array_equal(data, expected)
+            offset += size
\ No newline at end of file