From 9785330de9ae73bdefac012a43855b589d327272 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Thu, 24 Oct 2024 17:23:36 +0200 Subject: [PATCH 1/5] ExternalSource refactoring and fixing - refactor CachingList - use proper "item" wrapper type - remove "apprentice" and simplify lookahead logic - remove weird make_unique dependence - use r-value when recycling - refactor ExternalSource - store TensorList by value in list items - move all iteration data to one structure and queue - associate events directly with data items - use global event pool and obtain events for proper streams ---- Signed-off-by: Michal Zientkiewicz --- dali/pipeline/operator/builtin/caching_list.h | 84 ++++--- .../operator/builtin/caching_list_test.cc | 40 ++-- .../operator/builtin/input_operator.cc | 97 ++++---- .../operator/builtin/input_operator.h | 210 +++++++----------- dali/test/python/test_external_source_cupy.py | 31 +-- 5 files changed, 201 insertions(+), 261 deletions(-) diff --git a/dali/pipeline/operator/builtin/caching_list.h b/dali/pipeline/operator/builtin/caching_list.h index 0936fc249f6..edd212535ac 100644 --- a/dali/pipeline/operator/builtin/caching_list.h +++ b/dali/pipeline/operator/builtin/caching_list.h @@ -1,4 +1,4 @@ -// Copyright (c) 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -15,11 +15,13 @@ #ifndef DALI_PIPELINE_OPERATOR_BUILTIN_CACHING_LIST_H_ #define DALI_PIPELINE_OPERATOR_BUILTIN_CACHING_LIST_H_ -#include #include #include +#include +#include namespace dali { + /** * CachingList differs from std::List by the ability to recycle empty elements. When allocating * memory is expensive it is better to store already allocated but no longer needed element in the @@ -47,6 +49,19 @@ class CachingList { public: CachingList() : prophet_(full_data_.end()) {} + class Item { + public: + Item() = default; + T &operator*() const & noexcept { return l_.front(); } + T &&operator*() && noexcept { return l_.front(); } + + T *operator->() const & noexcept { return &l_.front(); } + private: + explicit Item(std::list &&l) : l_(std::move(l)) {} + mutable std::list l_; + friend class CachingList; + }; + bool IsEmpty() const { return full_data_.empty(); @@ -58,50 +73,43 @@ class CachingList { } - std::list PopFront() { - assert(!full_data_.empty()); // Can't pop from an empty list + Item PopFront() { + if (full_data_.empty()) + throw std::out_of_range("Cannot pop an item from an empty list"); std::list tmp; tmp.splice(tmp.begin(), full_data_, full_data_.begin()); if (tmp.begin() == prophet_) prophet_ = full_data_.begin(); - return tmp; + assert(tmp.size() == 1u); + return Item(std::move(tmp)); } - void Recycle(std::list &elm) { - empty_data_.splice(empty_data_.end(), elm, elm.begin()); + void Recycle(Item &&elm) { + empty_data_.splice(empty_data_.end(), elm.l_, elm.l_.begin(), elm.l_.end()); } - std::list GetEmpty() { + Item GetEmpty() { std::list tmp; if (empty_data_.empty()) { - tmp.emplace_back(std::make_unique()); + tmp.emplace_back(); } else { tmp.splice(tmp.begin(), empty_data_, empty_data_.begin()); } - return tmp; + return Item(std::move(tmp)); } - void PushBack(std::list &elm) { - full_data_.splice(full_data_.end(), elm, elm.begin()); - /* - * When the prophet is dead and needs to be resurrected, - * he shall be resurrected by the apprentice. 
- * In the special scenario, when prophet is dead and the data list is empty - * (hence the apprentice is dead too), the prophet will be resurrected - * from scratch, by assigning him to the element that was just added to the data list. - * Sic mundus creatus est. - */ - if (resurrect_prophet_) { - if (full_data_.size() == 1) { - prophet_ = full_data_.begin(); - } else { - prophet_ = std::next(apprentice_); - } - resurrect_prophet_ = false; - } + void PushBack(Item &&elm) { + if (elm.l_.empty()) + throw std::logic_error("The element is empty - has it been moved out?"); + + // If the "prophet" is at the end of the list, we'll need to restore it to point to the + // beginning of the newly appended item. + if (prophet_ == full_data_.end() || full_data_.empty()) + prophet_ = elm.l_.begin(); + full_data_.splice(full_data_.end(), elm.l_, elm.l_.begin(), elm.l_.end()); } @@ -119,8 +127,7 @@ class CachingList { throw std::out_of_range( "Attempted to move to the data batch that doesn't exist. Add more elements to" " the DALI input operator."); - apprentice_ = prophet_++; - resurrect_prophet_ = prophet_ == full_data_.end(); + ++prophet_; } @@ -132,20 +139,9 @@ class CachingList { std::list full_data_; std::list empty_data_; - /** - * Prophet dies when he hits the end() iterator of the list with the data. - * Prophet can be resurrected, iff there is a data record for him, i.e. - * when user calls PushBack and therefore inserts the data at the end - * of the CachingList - */ - bool resurrect_prophet_ = true; - - /** - * The apprentice follows the prophet and is always one step behind him. - * Apprentice is used to resurrect the prophet, so that the prophet might - * again point to the last actual element of the list. - */ - typename std::list::iterator prophet_, apprentice_; + // The "prophet" is a separate lookahead pointer into the list, used for peeking into + // future items without altering the contents of the list. + typename std::list::iterator prophet_; }; } // namespace dali diff --git a/dali/pipeline/operator/builtin/caching_list_test.cc b/dali/pipeline/operator/builtin/caching_list_test.cc index ee83c6c894d..e22b2ad084a 100644 --- a/dali/pipeline/operator/builtin/caching_list_test.cc +++ b/dali/pipeline/operator/builtin/caching_list_test.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -14,6 +14,7 @@ #include "dali/pipeline/operator/builtin/caching_list.h" #include +#include namespace dali::test { @@ -33,50 +34,49 @@ struct TestType { TEST(CachingListTest, ProphetTest) { - CachingList>> cl; + CachingList> cl; auto push = [&](int val) { auto elem = cl.GetEmpty(); - elem.emplace_back(std::make_unique>()); - elem.front()->val = val; - cl.PushBack(elem); + elem->val = val; + cl.PushBack(std::move(elem)); }; ASSERT_THROW(cl.PeekProphet(), std::out_of_range); push(6); - EXPECT_EQ(*cl.PeekProphet(), 6); + EXPECT_EQ(cl.PeekProphet(), 6); push(9); - EXPECT_EQ(*cl.PeekProphet(), 6); + EXPECT_EQ(cl.PeekProphet(), 6); cl.AdvanceProphet(); - EXPECT_EQ(*cl.PeekProphet(), 9); + EXPECT_EQ(cl.PeekProphet(), 9); push(13); - EXPECT_EQ(*cl.PeekProphet(), 9); + EXPECT_EQ(cl.PeekProphet(), 9); cl.AdvanceProphet(); - EXPECT_EQ(*cl.PeekProphet(), 13); + EXPECT_EQ(cl.PeekProphet(), 13); push(42); - EXPECT_EQ(*cl.PeekProphet(), 13); + EXPECT_EQ(cl.PeekProphet(), 13); push(69); - EXPECT_EQ(*cl.PeekProphet(), 13); + EXPECT_EQ(cl.PeekProphet(), 13); cl.AdvanceProphet(); - EXPECT_EQ(*cl.PeekProphet(), 42); + EXPECT_EQ(cl.PeekProphet(), 42); cl.AdvanceProphet(); - EXPECT_EQ(*cl.PeekProphet(), 69); + EXPECT_EQ(cl.PeekProphet(), 69); cl.AdvanceProphet(); ASSERT_THROW(cl.PeekProphet(), std::out_of_range); push(666); - EXPECT_EQ(*cl.PeekProphet(), 666); + EXPECT_EQ(cl.PeekProphet(), 666); push(1337); - EXPECT_EQ(*cl.PeekProphet(), 666); + EXPECT_EQ(cl.PeekProphet(), 666); cl.AdvanceProphet(); - EXPECT_EQ(*cl.PeekProphet(), 1337); + EXPECT_EQ(cl.PeekProphet(), 1337); cl.AdvanceProphet(); ASSERT_THROW(cl.PeekProphet(), std::out_of_range); push(1234); - EXPECT_EQ(*cl.PeekProphet(), 1234); + EXPECT_EQ(cl.PeekProphet(), 1234); push(4321); - EXPECT_EQ(*cl.PeekProphet(), 1234); + EXPECT_EQ(cl.PeekProphet(), 1234); cl.AdvanceProphet(); - EXPECT_EQ(*cl.PeekProphet(), 4321); + EXPECT_EQ(cl.PeekProphet(), 4321); cl.AdvanceProphet(); ASSERT_THROW(cl.PeekProphet(), std::out_of_range); ASSERT_THROW(cl.AdvanceProphet(), std::out_of_range); diff --git a/dali/pipeline/operator/builtin/input_operator.cc b/dali/pipeline/operator/builtin/input_operator.cc index 3f9b11eea65..ba2bcc1979e 100644 --- a/dali/pipeline/operator/builtin/input_operator.cc +++ b/dali/pipeline/operator/builtin/input_operator.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. 
@@ -51,28 +51,27 @@ template<> void InputOperator::ForwardCurrentData(TensorList &target, std::optional &target_data_id, ThreadPool &thread_pool) { - std::list tensor_list_elm; + queue_item_t tensor_list_elm; { - std::unique_lock busy_lock(busy_m_); + std::lock_guard busy_lock(busy_m_); tensor_list_elm = tl_data_.PopFront(); - state_.pop_front(); } - target_data_id = std::move(tensor_list_elm.front().data_id); - tensor_list_elm.front().data_id = std::nullopt; + target_data_id = std::move(tensor_list_elm->data_id); + tensor_list_elm->data_id = std::nullopt; // if the output is pinned and input not it needs to be copied - if (target.is_pinned() && !tensor_list_elm.front()->is_pinned()) { - const auto &shapes = tensor_list_elm.front()->shape(); + if (target.is_pinned() && !tensor_list_elm->data.is_pinned()) { + const auto &shapes = tensor_list_elm->data.shape(); auto curr_batch_size = shapes.num_samples(); - target.Resize(shapes, tensor_list_elm.front()->type()); + target.Resize(shapes, tensor_list_elm->data.type()); // as we copy element by element and the output is contiguous we need to set layout // for the whole output not each element(view) - target.SetLayout(tensor_list_elm.front()->GetLayout()); + target.SetLayout(tensor_list_elm->data.GetLayout()); for (int sample_id = 0; sample_id < curr_batch_size; ++sample_id) { thread_pool.AddWork( [&target, sample_id, &tensor_list_elm](int tid) { - target.CopySample(sample_id, *tensor_list_elm.front(), sample_id, + target.CopySample(sample_id, tensor_list_elm->data, sample_id, AccessOrder::host()); }, shapes.tensor_size(sample_id)); @@ -80,9 +79,9 @@ void InputOperator::ForwardCurrentData(TensorList &targe thread_pool.RunAll(); } else { // swap output with tensor_list_elm content - std::swap(target, *tensor_list_elm.front()); + std::swap(target, tensor_list_elm->data); } - RecycleBuffer(tensor_list_elm); + RecycleBuffer(std::move(tensor_list_elm)); } @@ -90,36 +89,24 @@ template<> void InputOperator::ForwardCurrentData(TensorList &target, std::optional &target_data_id, cudaStream_t stream) { - std::list tensor_list_elm; - std::list internal_copy_to_storage; - InputSourceState state_info; + queue_item_t tensor_list_elm; { - std::unique_lock busy_lock(busy_m_); + std::lock_guard busy_lock(busy_m_); tensor_list_elm = tl_data_.PopFront(); - state_info = state_.front(); - state_.pop_front(); - // even with no_copy we may have copied from TensorList to TensorList and we - // need to sync with that - if (!state_info.no_copy || state_info.copied_shared_data) { - internal_copy_to_storage = copy_to_storage_events_.PopFront(); - } } - if (!state_info.no_copy || state_info.copied_shared_data) { - CUDA_CALL(cudaStreamWaitEvent(stream, *internal_copy_to_storage.front(), 0)); + if (tensor_list_elm->copy_complete) { + CUDA_CALL(cudaStreamWaitEvent(stream, tensor_list_elm->copy_complete)); } - target_data_id = std::move(tensor_list_elm.front().data_id); - tensor_list_elm.front().data_id = std::nullopt; - std::swap(target, *tensor_list_elm.front()); + target_data_id = std::move(tensor_list_elm->data_id); + tensor_list_elm->data_id = std::nullopt; + + std::swap(target, tensor_list_elm->data); target.set_order(stream, false); - tensor_list_elm.front()->set_order(internal_copy_order_); + tensor_list_elm->data.set_order(internal_copy_order_); - if (!state_info.no_copy || state_info.copied_shared_data) { - RecycleBuffer(tensor_list_elm, &internal_copy_to_storage); - } else { - RecycleBuffer(tensor_list_elm); - } + RecycleBuffer(std::move(tensor_list_elm)); } @@ -127,22 
+114,19 @@ template<> void InputOperator::ForwardCurrentData(TensorList &target, std::optional &target_data_id, cudaStream_t stream) { - std::list tensor_list_elm; - InputSourceState state_info; + queue_item_t tensor_list_elm; { - std::unique_lock busy_lock(busy_m_); + std::lock_guard busy_lock(busy_m_); tensor_list_elm = tl_data_.PopFront(); - state_info = state_.front(); - state_.pop_front(); } - target_data_id = std::move(tensor_list_elm.front().data_id); - tensor_list_elm.front().data_id = std::nullopt; + target_data_id = std::move(tensor_list_elm->data_id); + tensor_list_elm->data_id = std::nullopt; - target.Copy(*tensor_list_elm.front(), stream); + target.Copy(tensor_list_elm->data, stream); - tensor_list_elm.front()->set_order(internal_copy_order_); + tensor_list_elm->data.set_order(internal_copy_order_); - RecycleBuffer(tensor_list_elm); + RecycleBuffer(std::move(tensor_list_elm)); } @@ -150,28 +134,27 @@ template<> void InputOperator::ForwardCurrentData(TensorList &target, std::optional &target_data_id, ThreadPool &thread_pool) { - std::list tensor_list_elm; + queue_item_t tensor_list_elm; { - std::unique_lock busy_lock(busy_m_); + std::lock_guard busy_lock(busy_m_); tensor_list_elm = tl_data_.PopFront(); - state_.pop_front(); } - target_data_id = std::move(tensor_list_elm.front().data_id); - tensor_list_elm.front().data_id = std::nullopt; + target_data_id = std::move(tensor_list_elm->data_id); + tensor_list_elm->data_id = std::nullopt; // if the output is pinned and input not it needs to be copied - if (target.is_pinned() && !tensor_list_elm.front()->is_pinned()) { - const auto &shapes = tensor_list_elm.front()->shape(); + if (target.is_pinned() && !tensor_list_elm->data.is_pinned()) { + const auto &shapes = tensor_list_elm->data.shape(); auto curr_batch_size = shapes.num_samples(); - target.Resize(shapes, tensor_list_elm.front()->type()); + target.Resize(shapes, tensor_list_elm->data.type()); // as we copy element by element and the output is contiguous we need to set layout // for the whole output not each element(view) - target.SetLayout(tensor_list_elm.front()->GetLayout()); + target.SetLayout(tensor_list_elm->data.GetLayout()); for (int sample_id = 0; sample_id < curr_batch_size; ++sample_id) { thread_pool.AddWork( [&target, sample_id, &tensor_list_elm](int tid) { - target.CopySample(sample_id, *tensor_list_elm.front(), sample_id, + target.CopySample(sample_id, tensor_list_elm->data, sample_id, AccessOrder::host()); }, shapes.tensor_size(sample_id)); @@ -179,9 +162,9 @@ void InputOperator::ForwardCurrentData(TensorList &tar thread_pool.RunAll(); } else { // swap output with tensor_list_elm content - std::swap(target, *tensor_list_elm.front()); + std::swap(target, tensor_list_elm->data); } - RecycleBuffer(tensor_list_elm); + RecycleBuffer(std::move(tensor_list_elm)); } diff --git a/dali/pipeline/operator/builtin/input_operator.h b/dali/pipeline/operator/builtin/input_operator.h index b5b01e50428..b8975694223 100644 --- a/dali/pipeline/operator/builtin/input_operator.h +++ b/dali/pipeline/operator/builtin/input_operator.h @@ -33,80 +33,48 @@ namespace dali { namespace detail { -struct CudaEventWrapper : CUDAEvent { - CudaEventWrapper() : CUDAEvent(CUDAEvent::Create()) {} -}; - - -/** - * Custom structure to hold the input data inside the CachingList. - * Apart from holding a pointer to the data, it can also contain other data associated with the - * input data, e.g. name, flavour, colour, etc. 
- * @tparam Backend - */ template -struct named_pointer_to_tensor_list_t { - using element_type = TensorList; - std::unique_ptr data = nullptr; - std::optional data_id = std::nullopt; - - - named_pointer_to_tensor_list_t(std::unique_ptr data) : // NOLINT - data(std::move(data)) {} - - ~named_pointer_to_tensor_list_t() = default; - - named_pointer_to_tensor_list_t(const named_pointer_to_tensor_list_t &) = delete; - - named_pointer_to_tensor_list_t &operator=(const named_pointer_to_tensor_list_t &) = delete; - - - named_pointer_to_tensor_list_t &operator=(named_pointer_to_tensor_list_t &&other) noexcept { - data = std::move(other.data); - data_id = std::move(other.data_id); - other.data = nullptr; - other.data_id = std::nullopt; - } - - - named_pointer_to_tensor_list_t(named_pointer_to_tensor_list_t &&other) noexcept { - *this = std::move(other); - } - - - auto &operator*() const noexcept { - return *data; - } - +struct InputQueueItem { + class EventLease { + int device_id_ = -1; + CUDAEvent event_; + public: + int device_id() const noexcept { return device_id_; } + + operator cudaEvent_t() const noexcept { return event_; } + explicit operator bool() const noexcept { return event_; } + + void Get(int device_id) { + if (device_id != device_id_) + Put(); + if (!event_) { + event_ = CUDAEventPool::instance().Get(device_id_); + device_id_ = device_id; + } + } - auto &operator->() const noexcept { - return data; - } + void Put() { + if (event_) + CUDAEventPool::instance().Put(std::move(event_), device_id_); + device_id_ = -1; + } + }; + TensorList data; + std::optional data_id = std::nullopt; + EventLease copy_complete; + bool copy_performed = false; + bool copy_requested = false; - void set_id(std::string id) noexcept { - data_id = std::move(id); + cudaEvent_t GetCompletionEvent(int device_id) { + copy_complete.Get(device_id); + return copy_complete; } }; } // namespace detail -struct InputSourceState { - /** - * True if the data that was shared as no_copy required copy regardless. - * Happens for non-contiguous TensorList with GPU memory. - */ - bool copied_shared_data = false; - - /** - * @brief Actual value of no_copy option used in this call. Always false for CopyUserData(...) - * and always true for ShareUserData(...) - */ - bool no_copy = false; -}; - - /** * @brief Option used to override the InputOperator ``no_copy`` parameter * @@ -170,8 +138,8 @@ class InputOperator : public Operator, virtual public BatchSizeProvider std::is_same_v, CPUBackend /* CPUBackend */, GPUBackend /* GPUBackend and MixedBackend */>; - using uptr_tl_type = detail::named_pointer_to_tensor_list_t; - using uptr_cuda_event_type = std::unique_ptr; + using InputQueue = CachingList>; + using queue_item_t = typename InputQueue::Item; public: explicit InputOperator(const OpSpec &spec) : @@ -274,7 +242,7 @@ class InputOperator : public Operator, virtual public BatchSizeProvider * Checks if there is more data in queue to be loaded. */ bool HasDataInQueue() const { - return !state_.empty(); + return !tl_data_.IsEmpty(); } @@ -288,9 +256,9 @@ class InputOperator : public Operator, virtual public BatchSizeProvider void HandleDataAvailability() { std::unique_lock busy_lock(busy_m_); if (blocking_) { - cv_.wait(busy_lock, [&data = state_] { return !data.empty(); }); + cv_.wait(busy_lock, [&] { return HasDataInQueue(); }); } else { - if (state_.empty()) { + if (!HasDataInQueue()) { DALI_FAIL("No data was provided to the InputOperator. 
Make sure to feed it properly."); } } @@ -324,7 +292,7 @@ class InputOperator : public Operator, virtual public BatchSizeProvider * Peeks the data that is next in line. */ const TensorList &PeekCurrentData() { - return *tl_data_.PeekFront(); + return tl_data_.PeekFront().data; } @@ -335,7 +303,7 @@ class InputOperator : public Operator, virtual public BatchSizeProvider return !running || data.CanProphetAdvance(); }); } - return tl_data_.PeekProphet()->num_samples(); + return tl_data_.PeekProphet().data.num_samples(); } @@ -370,18 +338,10 @@ class InputOperator : public Operator, virtual public BatchSizeProvider private: - // pass cuda_event by pointer to allow default, nullptr value, with the - // reference it is not that easy - template - void RecycleBuffer(DataType &data, - std::list *cuda_event = nullptr, - std::list *copy_to_gpu = nullptr) { - // No need to synchronize on copy_to_gpu - it was already synchronized before + void RecycleBuffer(queue_item_t &&data) { + data->copy_complete.Put(); std::lock_guard busy_lock(busy_m_); - tl_data_.Recycle(data); - if (copy_to_gpu) { - copy_to_storage_events_.Recycle(*copy_to_gpu); - } + tl_data_.Recycle(std::move(data)); } @@ -404,15 +364,16 @@ class InputOperator : public Operator, virtual public BatchSizeProvider ShareUserData(const TensorList &batch, std::optional data_id, AccessOrder /* order = {}*/, bool /*use_copy_kernel = false*/) { std::lock_guard busy_lock(busy_m_); - state_.push_back({false, true}); auto tl_elm = GetEmptyOutputBatch(std::move(data_id)); + tl_elm->copy_requested = false; + tl_elm->copy_performed = true; // set pinned if needed - if (batch.is_pinned() != tl_elm.front()->is_pinned()) { - tl_elm.front()->Reset(); - tl_elm.front()->set_pinned(batch.is_pinned()); + if (batch.is_pinned() != tl_elm->data.is_pinned()) { + tl_elm->data.Reset(); + tl_elm->data.set_pinned(batch.is_pinned()); } - tl_elm.front()->ShareData(const_cast &>(batch)); - tl_data_.PushBack(tl_elm); + tl_elm->data.ShareData(const_cast &>(batch)); + tl_data_.PushBack(std::move(tl_elm)); } @@ -439,18 +400,20 @@ class InputOperator : public Operator, virtual public BatchSizeProvider auto tl_elm = GetEmptyOutputBatch(std::move(data_id)); bool copied_shared_data = false; - if (batch.IsContiguous()) { + // We can share only contiguous tensor lists that are stored on the same device. + if (batch.IsContiguousInMemory() && batch.device_id() == device_id_) { auto &batch_owner = unsafe_sample_owner(const_cast &>(batch), 0); - tl_elm.front()->ShareData(batch_owner, batch.nbytes(), batch.is_pinned(), batch.shape(), + tl_elm->data.ShareData(batch_owner, batch.nbytes(), batch.is_pinned(), batch.shape(), batch.type(), batch.device_id(), batch.order()); zero_copy_noncontiguous_gpu_input_ = true; } else { - // it is not contiguous so we need to copy - tl_elm.front()->Copy(batch, order, use_copy_kernel); - std::list copy_to_storage_event; - copy_to_storage_event = copy_to_storage_events_.GetEmpty(); - CUDA_CALL(cudaEventRecord(*copy_to_storage_event.front(), order.stream())); - copy_to_storage_events_.PushBack(copy_to_storage_event); + // Do not overwrite the buffer it if shares data. + if (tl_elm->data.shares_data()) + tl_elm->data.Reset(); + tl_elm->data.Copy(batch, order, use_copy_kernel); + int device_id = order.is_device() ? 
order.device_id() : tl_elm->data.device_id(); + cudaEvent_t event = tl_elm->GetCompletionEvent(order.device_id()); + CUDA_CALL(cudaEventRecord(event, order.stream())); if (zero_copy_noncontiguous_gpu_input_) { DALI_WARN("ExternalSource operator should not mix contiguous and noncontiguous inputs. " @@ -459,8 +422,9 @@ class InputOperator : public Operator, virtual public BatchSizeProvider } copied_shared_data = true; } - state_.push_back({copied_shared_data, true}); - tl_data_.PushBack(tl_elm); + tl_elm->copy_performed = copied_shared_data; + tl_elm->copy_requested = false; + tl_data_.PushBack(std::move(tl_elm)); } @@ -468,26 +432,27 @@ class InputOperator : public Operator, virtual public BatchSizeProvider std::enable_if_t::value> CopyUserData(const TensorList &batch, std::optional data_id, AccessOrder order, bool /* sync */, bool /* use_copy_kernel */) { - std::list tl_elm; + queue_item_t tl_elm; { std::lock_guard busy_lock(busy_m_); tl_elm = GetEmptyOutputBatch(std::move(data_id)); } // set pinned if needed - tl_elm.front()->set_order(AccessOrder::host()); - if (batch.is_pinned() != tl_elm.front()->is_pinned()) { - tl_elm.front()->Reset(); - tl_elm.front()->set_pinned(batch.is_pinned()); + tl_elm->data.set_order(AccessOrder::host()); + if (batch.is_pinned() != tl_elm->data.is_pinned()) { + tl_elm->data.Reset(); + tl_elm->data.set_pinned(batch.is_pinned()); } AccessOrder copy_order = std::is_same::value ? AccessOrder::host() // do not use a device order for a host to host copy : order; - tl_elm.front()->Copy(batch, copy_order); + tl_elm->data.Copy(batch, copy_order); { std::lock_guard busy_lock(busy_m_); - tl_data_.PushBack(tl_elm); - state_.push_back({false, false}); + tl_elm->copy_requested = true; + tl_elm->copy_performed = true; + tl_data_.PushBack(std::move(tl_elm)); } } @@ -496,12 +461,10 @@ class InputOperator : public Operator, virtual public BatchSizeProvider std::enable_if_t::value> CopyUserData(const TensorList &batch, std::optional data_id, AccessOrder order, bool sync, bool use_copy_kernel) { - std::list copy_to_storage_event; - std::list tl_elm; + queue_item_t tl_elm; { std::lock_guard busy_lock(busy_m_); tl_elm = GetEmptyOutputBatch(std::move(data_id)); - copy_to_storage_event = copy_to_storage_events_.GetEmpty(); } // If we got a host order we most probably got it via FeedPipeline and we are trying to pass the // data from CPU to GPU. As we keep the order in tl_data_ as internal_copy_stream_, we will use @@ -509,19 +472,21 @@ class InputOperator : public Operator, virtual public BatchSizeProvider // asynchronous if it comes from pinned memory or happens on a device with integrated memory // (like Xavier) where CPU and GPU share the same memory. if (!order.is_device()) { - order = tl_elm.front()->order(); + order = tl_elm->data.order(); } - tl_elm.front()->Copy(batch, order, use_copy_kernel); - CUDA_CALL(cudaEventRecord(*copy_to_storage_event.front(), order.stream())); + tl_elm->data.Copy(batch, order, use_copy_kernel); + int copy_device = order.is_device() ? 
order.device_id() : tl_elm->data.device_id(); + auto event = tl_elm->GetCompletionEvent(copy_device); + CUDA_CALL(cudaEventRecord(event, order.stream())); if (sync) { - CUDA_CALL(cudaEventSynchronize(*copy_to_storage_event.front())); + CUDA_CALL(cudaEventSynchronize(event)); } { std::lock_guard busy_lock(busy_m_); - tl_data_.PushBack(tl_elm); - copy_to_storage_events_.PushBack(copy_to_storage_event); - state_.push_back({false, false}); + tl_elm->copy_requested = true; + tl_elm->copy_performed = true; + tl_data_.PushBack(std::move(tl_elm)); } } @@ -562,24 +527,19 @@ class InputOperator : public Operator, virtual public BatchSizeProvider * * @param data_id Arbitrary ID of the data. Can be any string. */ - std::list GetEmptyOutputBatch(std::optional data_id) { + queue_item_t GetEmptyOutputBatch(std::optional data_id) { auto result = tl_data_.GetEmpty(); - result.front()->set_order(internal_copy_order_); - if (data_id.has_value()) { - result.front().set_id(std::move(data_id.value())); - } + result->data.set_order(internal_copy_order_); + result->data_id = (std::move(data_id)); return result; } - CachingList tl_data_; - CachingList copy_to_storage_events_; + InputQueue tl_data_; std::mutex busy_m_; std::condition_variable cv_; - std::list state_; - /* * indicates that user provide noncontiguous GPU input with zero copy option so DALI needs * to create an internal copy, it is used to raise a warning when the user mixes contiguous and diff --git a/dali/test/python/test_external_source_cupy.py b/dali/test/python/test_external_source_cupy.py index 66ccaef1f06..cb4921b4389 100644 --- a/dali/test/python/test_external_source_cupy.py +++ b/dali/test/python/test_external_source_cupy.py @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -93,22 +93,23 @@ def _test_cross_device(src, dst, use_dali_tensor=False): iter = 0 - def get_data(): - nonlocal iter - with cp.cuda.Device(src): - data = cp.array([[1, 2, 3, 4], [5, 6, 7, 8]], dtype=cp.float32) + iter - iter += 1 - if use_dali_tensor: - return TensorGPU(data.toDlpack()) - return data + with cp.cuda.Device(src): + with cp.cuda.Stream(src): + def get_data(): + nonlocal iter + data = cp.array([[1, 2, 3, 4], [5, 6, 7, 8]], dtype=cp.float32) + iter + iter += 1 + if use_dali_tensor: + return TensorGPU(data.toDlpack()) + return data - with pipe: - pipe.set_outputs(fn.external_source(get_data, batch=False, device="gpu")) + with pipe: + pipe.set_outputs(fn.external_source(get_data, batch=False, device="gpu")) - pipe.build() - for i in range(10): - (out,) = pipe.run() - assert np.array_equal(np.array(out[0].as_cpu()), np.array([[1, 2, 3, 4], [5, 6, 7, 8]]) + i) + pipe.build() + for i in range(10): + (out,) = pipe.run() + assert np.array_equal(np.array(out[0].as_cpu()), np.array([[1, 2, 3, 4], [5, 6, 7, 8]]) + i) @attr("multigpu") From 6c3875c8f2a75267bd680dc618ace677bbdf01ce Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 25 Oct 2024 11:58:29 +0200 Subject: [PATCH 2/5] Fix stream semantics with default streams. 
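The default-stream handles (0, cudaStreamLegacy and cudaStreamPerThread) do not identify a device; they always refer to a stream on whichever device is current on the calling thread. When an AccessOrder holds such a handle for a different device, recording an event, waiting on an event or synchronizing must therefore be done with that device made current. A minimal sketch of the pattern this patch applies in AccessOrder::wait (the RecordOn helper and its parameters are illustrative only, not part of DALI; is_ambiguous_handle, DeviceGuard and CUDA_CALL are the ones used in the diff below):

    // Default-stream handles are only meaningful relative to the current device.
    constexpr bool is_ambiguous_handle(cudaStream_t stream) {
      return stream == 0 || stream == cudaStreamPerThread || stream == cudaStreamLegacy;
    }

    // Record `event` on `stream`, switching devices first when the handle is ambiguous
    // and belongs to a device other than the current one.
    void RecordOn(cudaEvent_t event, cudaStream_t stream, int stream_dev) {
      int current_dev = -1;
      CUDA_CALL(cudaGetDevice(&current_dev));
      if (is_ambiguous_handle(stream) && stream_dev != current_dev) {
        DeviceGuard dg(stream_dev);  // make the stream's device current for the call
        CUDA_CALL(cudaEventRecord(event, stream));
      } else {
        CUDA_CALL(cudaEventRecord(event, stream));
      }
    }

The same device switch is applied below around cudaStreamWaitEvent and cudaStreamSynchronize.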
Signed-off-by: Michal Zientkiewicz --- dali/core/access_order.cc | 53 +++++++++++++------ .../operator/builtin/input_operator.h | 26 ++++++--- 2 files changed, 58 insertions(+), 21 deletions(-) diff --git a/dali/core/access_order.cc b/dali/core/access_order.cc index 7e701230bb5..17c08e9bf1b 100644 --- a/dali/core/access_order.cc +++ b/dali/core/access_order.cc @@ -1,4 +1,4 @@ -// Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -25,6 +25,13 @@ AccessOrder::AccessOrder(cudaStream_t stream) : stream_(stream) { device_id_ = DeviceFromStream(stream); } +constexpr bool is_ambiguous_handle(cudaStream_t stream) { + return + stream == 0 || + stream == cudaStreamPerThread || + stream == cudaStreamLegacy; +} + void AccessOrder::wait(const AccessOrder &other) const { if (*this == other) return; @@ -33,36 +40,47 @@ void AccessOrder::wait(const AccessOrder &other) const { // always considered up-to-date. if (!has_value() || !other.is_device()) return; + + auto current_dev = []() { + int dev; + CUDA_CALL(cudaGetDevice(&dev)); + return dev; + }; + + auto need_device_switch = [&]() { + return is_ambiguous_handle(other.stream_) && other.device_id() != current_dev(); + }; + if (is_device()) { auto &pool = CUDAEventPool::instance(); int other_dev = other.device_id(); auto event = pool.Get(other_dev); // Record an event in the preceding stream - auto current_dev = []() { - int dev; - CUDA_CALL(cudaGetDevice(&dev)); - return dev; - }; - // If the stream handle has a special value, we can't refer to it directly - it is // inherently associated with the concept of "current device" and it must be switched - if (other_dev != device_id_ || - ((other.stream_ == 0 || - other.stream_ == cudaStreamPerThread || - other.stream_ == cudaStreamLegacy) && - other_dev != current_dev())) { + if (need_device_switch()) { DeviceGuard dg(other.device_id_); CUDA_CALL(cudaEventRecord(event, other.stream())); } else { CUDA_CALL(cudaEventRecord(event, other.stream())); } // and wait for it in this stream - CUDA_CALL(cudaStreamWaitEvent(stream(), event, 0)); + if (is_ambiguous_handle(stream())) { + DeviceGuard dg(device_id_); + CUDA_CALL(cudaStreamWaitEvent(stream(), event, 0)); + } else { + CUDA_CALL(cudaStreamWaitEvent(stream(), event, 0)); + } pool.Put(std::move(event), other_dev); } else { // host order - wait for the preceding stream on host - CUDA_CALL(cudaStreamSynchronize(other.stream())); + if (need_device_switch()) { + DeviceGuard dg(device_id_); + CUDA_CALL(cudaStreamSynchronize(other.stream())); + } else { + CUDA_CALL(cudaStreamSynchronize(other.stream())); + } } } @@ -70,7 +88,12 @@ void AccessOrder::wait(cudaEvent_t event) const { if (!has_value()) throw std::logic_error("A null AccessOrder cannot wait for an event."); if (is_device()) { - CUDA_DTOR_CALL(cudaStreamWaitEvent(stream(), event, 0)); + if (is_ambiguous_handle(stream())) { + DeviceGuard dg(device_id_); + CUDA_DTOR_CALL(cudaStreamWaitEvent(stream(), event, 0)); + } else { + CUDA_DTOR_CALL(cudaStreamWaitEvent(stream(), event, 0)); + } } else { CUDA_DTOR_CALL(cudaEventSynchronize(event)); } diff --git a/dali/pipeline/operator/builtin/input_operator.h b/dali/pipeline/operator/builtin/input_operator.h index b8975694223..2940bcdacde 100644 --- a/dali/pipeline/operator/builtin/input_operator.h +++ 
b/dali/pipeline/operator/builtin/input_operator.h @@ -48,7 +48,7 @@ struct InputQueueItem { if (device_id != device_id_) Put(); if (!event_) { - event_ = CUDAEventPool::instance().Get(device_id_); + event_ = CUDAEventPool::instance().Get(device_id); device_id_ = device_id; } } @@ -413,7 +413,17 @@ class InputOperator : public Operator, virtual public BatchSizeProvider tl_elm->data.Copy(batch, order, use_copy_kernel); int device_id = order.is_device() ? order.device_id() : tl_elm->data.device_id(); cudaEvent_t event = tl_elm->GetCompletionEvent(order.device_id()); - CUDA_CALL(cudaEventRecord(event, order.stream())); + + if (order.device_id() != device_id_ && + (order.stream() == 0 || + order.stream() == cudaStreamPerThread || + order.stream() == cudaStreamLegacy)) { + // In case of ambiguous stream handles, we need to swithch to the proper device + DeviceGuard dg; + CUDA_CALL(cudaEventRecord(event, order.stream())); + } else { + CUDA_CALL(cudaEventRecord(event, order.stream())); + } if (zero_copy_noncontiguous_gpu_input_) { DALI_WARN("ExternalSource operator should not mix contiguous and noncontiguous inputs. " @@ -476,10 +486,14 @@ class InputOperator : public Operator, virtual public BatchSizeProvider } tl_elm->data.Copy(batch, order, use_copy_kernel); int copy_device = order.is_device() ? order.device_id() : tl_elm->data.device_id(); - auto event = tl_elm->GetCompletionEvent(copy_device); - CUDA_CALL(cudaEventRecord(event, order.stream())); - if (sync) { - CUDA_CALL(cudaEventSynchronize(event)); + + { + DeviceGuard dg(copy_device); + auto event = tl_elm->GetCompletionEvent(copy_device); + CUDA_CALL(cudaEventRecord(event, order.stream())); + if (sync) { + CUDA_CALL(cudaEventSynchronize(event)); + } } { From acbb9d514456a72ecab090f9a47a3d1428e659e8 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Fri, 25 Oct 2024 14:17:06 +0200 Subject: [PATCH 3/5] Lint Signed-off-by: Michal Zientkiewicz --- dali/test/python/test_external_source_cupy.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/dali/test/python/test_external_source_cupy.py b/dali/test/python/test_external_source_cupy.py index cb4921b4389..d2176144090 100644 --- a/dali/test/python/test_external_source_cupy.py +++ b/dali/test/python/test_external_source_cupy.py @@ -95,6 +95,7 @@ def _test_cross_device(src, dst, use_dali_tensor=False): with cp.cuda.Device(src): with cp.cuda.Stream(src): + def get_data(): nonlocal iter data = cp.array([[1, 2, 3, 4], [5, 6, 7, 8]], dtype=cp.float32) + iter @@ -109,7 +110,9 @@ def get_data(): pipe.build() for i in range(10): (out,) = pipe.run() - assert np.array_equal(np.array(out[0].as_cpu()), np.array([[1, 2, 3, 4], [5, 6, 7, 8]]) + i) + assert np.array_equal( + np.array(out[0].as_cpu()), np.array([[1, 2, 3, 4], [5, 6, 7, 8]]) + i + ) @attr("multigpu") From afc953e40f38382bdb266a59b3ff7c5728014f40 Mon Sep 17 00:00:00 2001 From: Michal Zientkiewicz Date: Mon, 28 Oct 2024 17:06:19 +0100 Subject: [PATCH 4/5] Further simplification. 
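With this change, a no_copy GPU batch is shared only when it is contiguous in memory and already resides on the operator's device; otherwise the data is copied on the (possibly defaulted) access order and a completion event is recorded for that copy. A simplified sketch of the decision made in ShareUserData<GPUBackend> after this patch (ShareOrCopy, item and op_device_id are illustrative names, not DALI API; event recording is omitted here and shown in the hunk below):

    template <typename Item>
    void ShareOrCopy(Item &item, const TensorList<GPUBackend> &batch,
                     AccessOrder order, int op_device_id, bool use_copy_kernel) {
      if (!order.has_value())  // default to the batch's device order, or the item's order
        order = batch.order().is_device() ? batch.order() : item.data.order();

      if (batch.IsContiguousInMemory() && batch.device_id() == op_device_id) {
        item.data.ShareData(batch);                     // zero-copy: alias the user's buffer
      } else {
        if (item.data.shares_data())
          item.data.Reset();                            // don't overwrite a borrowed buffer
        item.data.Copy(batch, order, use_copy_kernel);  // fall back to a copy on `order`
      }
    }

GetEmptyOutputBatch now also stamps the recycled TensorList with the operator's device id before it is reused.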
Signed-off-by: Michal Zientkiewicz --- .../operator/builtin/input_operator.h | 24 ++++++++----------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/dali/pipeline/operator/builtin/input_operator.h b/dali/pipeline/operator/builtin/input_operator.h index 2940bcdacde..c5754460d84 100644 --- a/dali/pipeline/operator/builtin/input_operator.h +++ b/dali/pipeline/operator/builtin/input_operator.h @@ -331,7 +331,7 @@ class InputOperator : public Operator, virtual public BatchSizeProvider } - int device_id_; + int device_id_ = -1; bool blocking_ = true; bool no_copy_ = false; bool running_ = true; @@ -400,11 +400,12 @@ class InputOperator : public Operator, virtual public BatchSizeProvider auto tl_elm = GetEmptyOutputBatch(std::move(data_id)); bool copied_shared_data = false; + if (!order.has_value()) + order = batch.order().is_device() ? batch.order() : tl_elm->data.order(); + // We can share only contiguous tensor lists that are stored on the same device. if (batch.IsContiguousInMemory() && batch.device_id() == device_id_) { - auto &batch_owner = unsafe_sample_owner(const_cast &>(batch), 0); - tl_elm->data.ShareData(batch_owner, batch.nbytes(), batch.is_pinned(), batch.shape(), - batch.type(), batch.device_id(), batch.order()); + tl_elm->data.ShareData(batch); zero_copy_noncontiguous_gpu_input_ = true; } else { // Do not overwrite the buffer it if shares data. @@ -412,16 +413,10 @@ class InputOperator : public Operator, virtual public BatchSizeProvider tl_elm->data.Reset(); tl_elm->data.Copy(batch, order, use_copy_kernel); int device_id = order.is_device() ? order.device_id() : tl_elm->data.device_id(); - cudaEvent_t event = tl_elm->GetCompletionEvent(order.device_id()); - - if (order.device_id() != device_id_ && - (order.stream() == 0 || - order.stream() == cudaStreamPerThread || - order.stream() == cudaStreamLegacy)) { - // In case of ambiguous stream handles, we need to swithch to the proper device - DeviceGuard dg; - CUDA_CALL(cudaEventRecord(event, order.stream())); - } else { + cudaEvent_t event = tl_elm->GetCompletionEvent(device_id); + + { + DeviceGuard dg(order.device_id()); CUDA_CALL(cudaEventRecord(event, order.stream())); } @@ -543,6 +538,7 @@ class InputOperator : public Operator, virtual public BatchSizeProvider */ queue_item_t GetEmptyOutputBatch(std::optional data_id) { auto result = tl_data_.GetEmpty(); + result->data.set_device_id(device_id_); result->data.set_order(internal_copy_order_); result->data_id = (std::move(data_id)); return result; From a3ac0d81e04d98e1422a7480b8238467e65f1067 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20Zientkiewicz?= Date: Tue, 29 Oct 2024 12:03:34 +0100 Subject: [PATCH 5/5] Bug fixes. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: MichaƂ Zientkiewicz --- dali/pipeline/operator/builtin/input_operator.h | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dali/pipeline/operator/builtin/input_operator.h b/dali/pipeline/operator/builtin/input_operator.h index c5754460d84..1d15c323a9d 100644 --- a/dali/pipeline/operator/builtin/input_operator.h +++ b/dali/pipeline/operator/builtin/input_operator.h @@ -412,10 +412,9 @@ class InputOperator : public Operator, virtual public BatchSizeProvider if (tl_elm->data.shares_data()) tl_elm->data.Reset(); tl_elm->data.Copy(batch, order, use_copy_kernel); - int device_id = order.is_device() ? 
order.device_id() : tl_elm->data.device_id(); - cudaEvent_t event = tl_elm->GetCompletionEvent(device_id); - { + if (order.is_device()) { + cudaEvent_t event = tl_elm->GetCompletionEvent(order.device_id()); DeviceGuard dg(order.device_id()); CUDA_CALL(cudaEventRecord(event, order.stream())); } @@ -447,6 +446,8 @@ class InputOperator : public Operator, virtual public BatchSizeProvider if (batch.is_pinned() != tl_elm->data.is_pinned()) { tl_elm->data.Reset(); tl_elm->data.set_pinned(batch.is_pinned()); + if constexpr (std::is_same_v) + tl_elm->data.set_device_id(tl_elm->data.is_pinned() ? device_id_ : CPU_ONLY_DEVICE_ID); } AccessOrder copy_order = std::is_same::value @@ -538,7 +539,9 @@ class InputOperator : public Operator, virtual public BatchSizeProvider */ queue_item_t GetEmptyOutputBatch(std::optional data_id) { auto result = tl_data_.GetEmpty(); - result->data.set_device_id(device_id_); + int data_device_id = std::is_same_v || result->data.is_pinned() + ? device_id_ : CPU_ONLY_DEVICE_ID; + result->data.set_device_id(data_device_id); result->data.set_order(internal_copy_order_); result->data_id = (std::move(data_id)); return result;
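Taken together, the CachingList refactored in the first patch is used roughly as below. This is a standalone usage sketch distilled from caching_list_test.cc and the InputOperator changes above, not part of the patch series; std::string is used as the element type purely for illustration.

    #include <string>
    #include <utility>
    #include "dali/pipeline/operator/builtin/caching_list.h"

    void CachingListExample() {
      dali::CachingList<std::string> cl;

      auto elem = cl.GetEmpty();        // reuses a previously recycled node when available
      *elem = "first batch";            // Item dereferences to the stored element
      cl.PushBack(std::move(elem));     // Item is move-only; the list takes the node back

      const auto &ahead = cl.PeekProphet();  // look ahead without consuming anything
      (void)ahead;
      cl.AdvanceProphet();              // advance the lookahead ("prophet") pointer

      auto item = cl.PopFront();        // consume the oldest element
      std::string consumed = *item;     // read the value out (item-> also works)
      (void)consumed;
      cl.Recycle(std::move(item));      // return the node so its allocation can be reused
    }

Compared with the previous interface there is no std::list<std::unique_ptr<T>> juggling at the call sites: GetEmpty, PushBack, PopFront and Recycle all pass a single-element Item wrapper by move, and the element type no longer needs to be wrapped in std::make_unique.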