diff --git a/source/cl/include/cl/command_queue.h b/source/cl/include/cl/command_queue.h index 0ecc6ab2d..902f90835 100644 --- a/source/cl/include/cl/command_queue.h +++ b/source/cl/include/cl/command_queue.h @@ -244,16 +244,17 @@ struct _cl_command_queue final : public cl::base<_cl_command_queue> { /// associated with a user command buffer, get the current command /// buffer or the last pending dispatch. /// 2. There are no wait events and the last pending dispatch is associated - /// with a user command buffer, get an unsed (new or reseted and cached) + /// with a user command buffer, get an unused (new or reset and cached) /// command buffer. /// 3. There are wait events associated with a single pending dispatch, get /// the associated command buffer. /// 4. There are wait events associated with multiple pending dispatches, get - /// an unused (new or reseted and cached) command buffer. + /// an unused (new or reset and cached) command buffer. /// 5. There are wait events with no associated pending dispatches (already - /// dispatched), get an unused (new or reseted and cached) command buffer. + /// dispatched), get an unused (new or reset and cached) command buffer. + /// 6. There are wait events associated with a different queue. /// - /// When commands can't be batched into a single comand group, semaphores are + /// When commands can't be batched into a single command group, semaphores are /// used to maintain submission order since the command queue is in-order. /// Dispatches will have wait semaphores in the following cases: /// @@ -263,6 +264,7 @@ struct _cl_command_queue final : public cl::base<_cl_command_queue> { /// 3. There are wait events associated with multiple command buffers. /// 4. There are wait events, no pending dispatches, and there is a running /// command buffer. + /// 5. The wait event is cross queue. /// /// A user event completion callback is registered with all user events, it /// submits pending dispatches to the queue when the user event is in a diff --git a/source/cl/source/command_queue.cpp b/source/cl/source/command_queue.cpp index 2ec7e46f5..d63ecb37e 100644 --- a/source/cl/source/command_queue.cpp +++ b/source/cl/source/command_queue.cpp @@ -512,7 +512,8 @@ _cl_command_queue::getCommandBufferPending( // Storage for the pending dispatches on which this command buffer will // depend. using dispatch_pair = std::pair; - cargo::small_vector dependent_dispatches; + using dispatch_dependency = std::pair; + cargo::small_vector dependent_dispatches; // Flag indicating whether it is safe to append to the last command buffer in // the case that we only have one dependent command (which will always be the @@ -526,7 +527,7 @@ _cl_command_queue::getCommandBufferPending( OCL_ASSERT(pending_dispatch != std::end(pending_dispatches), "The last pending command buffer has no entry in the " "pending dispatches map."); - if (dependent_dispatches.push_back(&*pending_dispatch)) { + if (dependent_dispatches.push_back({&*pending_dispatch, this})) { return cargo::make_unexpected(CL_OUT_OF_RESOURCES); } @@ -535,6 +536,7 @@ _cl_command_queue::getCommandBufferPending( can_append_last_dispatch = !pending_dispatch->second.is_user_command_buffer; } + cargo::small_vector dependent_dispatch_command_queues; // Find all dependent dispatches in the event_wait_list. for (auto wait_event : event_wait_list) { if (cl::isUserEvent(wait_event) && @@ -557,7 +559,7 @@ _cl_command_queue::getCommandBufferPending( // dependent_dispatches. if (std::any_of(dispatch.signal_events.begin(), dispatch.signal_events.end(), isWaitEvent)) { - if (dependent_dispatches.push_back(&pending)) { + if (dependent_dispatches.push_back({&pending, wait_event->queue})) { return cargo::make_unexpected(CL_OUT_OF_RESOURCES); } } @@ -567,15 +569,24 @@ _cl_command_queue::getCommandBufferPending( // different from this one if (wait_event->queue != this && CL_COMMAND_USER != wait_event->command_type) { + // Check if the cross queue does not exist in the queues + if (std::find(dependent_dispatch_command_queues.begin(), + dependent_dispatch_command_queues.end(), + wait_event->queue) == + dependent_dispatch_command_queues.end()) { + if (dependent_dispatch_command_queues.push_back(wait_event->queue)) { + return cargo::make_unexpected(CL_OUT_OF_RESOURCES); + } + } + can_append_last_dispatch = false; + for (auto &pending : wait_event->queue->pending_dispatches) { auto &dispatch = pending.second; // Check if any signal events are the wait event and add them to // dependent_dispatches. if (std::any_of(dispatch.signal_events.begin(), dispatch.signal_events.end(), isWaitEvent)) { - can_append_last_dispatch = false; - - if (dependent_dispatches.push_back(&pending)) { + if (dependent_dispatches.push_back({&pending, wait_event->queue})) { return cargo::make_unexpected(CL_OUT_OF_RESOURCES); } } @@ -595,33 +606,51 @@ _cl_command_queue::getCommandBufferPending( // There is only a single dependent dispatch so return its command buffer. // Since there is only one it must be the most recent dispatch. if (dependent_dispatches.size() == 1 && can_append_last_dispatch) { - return dependent_dispatches.front()->first; + return dependent_dispatches.front().first->first; } // Storage for wait semaphores to set on a pending command buffer. cargo::small_vector semaphores; - // There are one or more dependent dispatches we must create a new command - // group and wait on the their signal semaphores. - if (!dependent_dispatches.empty()) { - for (auto &dependent : dependent_dispatches) { - auto &dependent_dispatch = dependent->second; - // Append the signal semaphore to wait_semaphores of the current - // dispatch. - if (semaphores.push_back(dependent_dispatch.signal_semaphore)) { - return cargo::make_unexpected(CL_OUT_OF_RESOURCES); + // We always process the current queue + if (dependent_dispatch_command_queues.push_back(this)) { + return cargo::make_unexpected(CL_OUT_OF_RESOURCES); + } + + for (auto &dispatch_queue : dependent_dispatch_command_queues) { + // There are one or more dependent dispatches we must create a new command + // group and wait on the their signal semaphores. + bool has_dependent_dispatches = + std::find_if( + dependent_dispatches.begin(), dependent_dispatches.end(), + [dispatch_queue](dispatch_dependency &dispatch_dependency) { + return dispatch_dependency.second == dispatch_queue; + }) != std::end(dependent_dispatches); + + if (has_dependent_dispatches) { + for (auto &dependent_dispatch_info : dependent_dispatches) { + if (dependent_dispatch_info.second != dispatch_queue) { + continue; + } + auto &dependent_dispatch = dependent_dispatch_info.first->second; + + // Append the signal semaphore to wait_semaphores of the current + // dispatch. + if (semaphores.push_back(dependent_dispatch.signal_semaphore)) { + return cargo::make_unexpected(CL_OUT_OF_RESOURCES); + } } - } - } else { - // There are no dependent dispatches, this means the command buffer is - // running now or has already completed or there were never any wait events - // in the first place. - // Wait on all running dispatches to ensure ordering since the commands in - // running_command_buffers may be out of order with respect the - // container (ordering is still enforced via semaphore dependencies though). - for (auto &running_dispatch : running_command_buffers) { - if (semaphores.push_back(running_dispatch.signal_semaphore)) { - return cargo::make_unexpected(CL_OUT_OF_RESOURCES); + } else { + // There are no dependent dispatches, this means the command buffer is + // running now or has already completed or there were never any wait + // events in the first place. Wait on all running dispatches to ensure + // ordering since the commands in running_command_buffers may be out of + // order with respect the container (ordering is still enforced via + // semaphore dependencies though). + for (auto &running_dispatch : dispatch_queue->running_command_buffers) { + if (semaphores.push_back(running_dispatch.signal_semaphore)) { + return cargo::make_unexpected(CL_OUT_OF_RESOURCES); + } } } } diff --git a/source/cl/test/UnitCL/source/cl_intel_unified_shared_memory/usm_free.cpp b/source/cl/test/UnitCL/source/cl_intel_unified_shared_memory/usm_free.cpp index 7de80c174..ba0cad2a5 100644 --- a/source/cl/test/UnitCL/source/cl_intel_unified_shared_memory/usm_free.cpp +++ b/source/cl/test/UnitCL/source/cl_intel_unified_shared_memory/usm_free.cpp @@ -297,8 +297,8 @@ TEST_F(USMBlockingFreeTest, SingleQueueMultipleAlloc) { // Populates two device allocations, A & B, on their own queues using fill calls // before copying them to a separate allocation C. The copy operation is // enqueued on the queue C, so allocations A & B interact with multiple queues. -TEST_F(USMBlockingFreeTest, DISABLED_MultipleQueueMultipleAlloc) { - std::array events; +TEST_F(USMBlockingFreeTest, MultipleQueueMultipleAlloc) { + std::array events = {nullptr, nullptr}; const cl_uint pattern_A = 42; auto &queue_A = fixture_queues[0]; auto &device_ptr_A = fixture_device_ptrs[0];