Skip to content

Commit

Permalink
Fix cross queue dependency when command buffer already dispatched.
Browse files Browse the repository at this point in the history
The UnitCL test USMBlockingFreeTest, MultipleQueueMultipleAlloc had
previously been disabled due to sporadic fails. Tsan build showed
that there was a dependency issue between the fill and copy commands.

This test did something like the following:

Q_A: Fill -> e[0]   BufA 4096
Q_B: Fill -> e[1]   BufB 4096
Q_C: e[0]->Memcpy   from BufA 2048
Q_C: e[1]->Memcpy   from BufB 2048

Much of the core dependencies for command buffers is done in
_cl_command_queue::getCommandBufferPending(). In order to capture event
dependencies, semaphores are waited on from pending or dispatched command
buffers. The code would check events and then check to see if they
were in pending command buffers and if so would add semaphores wait from
them. If it could not find a pending command buffer it would add the wait
on any running command buffers. This generally worked but for cross queues
it was not checking the running command buffers of that queue. Additionally
there is an early out for the case of a single pending command buffer but
if there is any cross queue events, we cannot use this.
  • Loading branch information
coldav committed Jan 4, 2024
1 parent 3f767d9 commit 192e231
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 33 deletions.
10 changes: 6 additions & 4 deletions source/cl/include/cl/command_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,16 +244,17 @@ struct _cl_command_queue final : public cl::base<_cl_command_queue> {
/// associated with a user command buffer, get the current command
/// buffer or the last pending dispatch.
/// 2. There are no wait events and the last pending dispatch is associated
/// with a user command buffer, get an unsed (new or reseted and cached)
/// with a user command buffer, get an unused (new or reset and cached)
/// command buffer.
/// 3. There are wait events associated with a single pending dispatch, get
/// the associated command buffer.
/// 4. There are wait events associated with multiple pending dispatches, get
/// an unused (new or reseted and cached) command buffer.
/// an unused (new or reset and cached) command buffer.
/// 5. There are wait events with no associated pending dispatches (already
/// dispatched), get an unused (new or reseted and cached) command buffer.
/// dispatched), get an unused (new or reset and cached) command buffer.
/// 6. There are wait events associated with a different queue.
///
/// When commands can't be batched into a single comand group, semaphores are
/// When commands can't be batched into a single command group, semaphores are
/// used to maintain submission order since the command queue is in-order.
/// Dispatches will have wait semaphores in the following cases:
///
Expand All @@ -263,6 +264,7 @@ struct _cl_command_queue final : public cl::base<_cl_command_queue> {
/// 3. There are wait events associated with multiple command buffers.
/// 4. There are wait events, no pending dispatches, and there is a running
/// command buffer.
/// 5. The wait event is cross queue.
///
/// A user event completion callback is registered with all user events, it
/// submits pending dispatches to the queue when the user event is in a
Expand Down
83 changes: 56 additions & 27 deletions source/cl/source/command_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,8 @@ _cl_command_queue::getCommandBufferPending(
// Storage for the pending dispatches on which this command buffer will
// depend.
using dispatch_pair = std::pair<const mux_command_buffer_t, dispatch_state_t>;
cargo::small_vector<dispatch_pair *, 8> dependent_dispatches;
using dispatch_dependency = std::pair<dispatch_pair *, cl_command_queue>;
cargo::small_vector<dispatch_dependency, 8> dependent_dispatches;

// Flag indicating whether it is safe to append to the last command buffer in
// the case that we only have one dependent command (which will always be the
Expand All @@ -526,7 +527,7 @@ _cl_command_queue::getCommandBufferPending(
OCL_ASSERT(pending_dispatch != std::end(pending_dispatches),
"The last pending command buffer has no entry in the "
"pending dispatches map.");
if (dependent_dispatches.push_back(&*pending_dispatch)) {
if (dependent_dispatches.push_back({&*pending_dispatch, this})) {
return cargo::make_unexpected(CL_OUT_OF_RESOURCES);
}

Expand All @@ -535,6 +536,7 @@ _cl_command_queue::getCommandBufferPending(
can_append_last_dispatch = !pending_dispatch->second.is_user_command_buffer;
}

cargo::small_vector<cl_command_queue, 2> dependent_dispatch_command_queues;
// Find all dependent dispatches in the event_wait_list.
for (auto wait_event : event_wait_list) {
if (cl::isUserEvent(wait_event) &&
Expand All @@ -557,7 +559,7 @@ _cl_command_queue::getCommandBufferPending(
// dependent_dispatches.
if (std::any_of(dispatch.signal_events.begin(),
dispatch.signal_events.end(), isWaitEvent)) {
if (dependent_dispatches.push_back(&pending)) {
if (dependent_dispatches.push_back({&pending, wait_event->queue})) {
return cargo::make_unexpected(CL_OUT_OF_RESOURCES);
}
}
Expand All @@ -567,15 +569,24 @@ _cl_command_queue::getCommandBufferPending(
// different from this one
if (wait_event->queue != this &&
CL_COMMAND_USER != wait_event->command_type) {
// Check if the cross queue does not exist in the queues
if (std::find(dependent_dispatch_command_queues.begin(),
dependent_dispatch_command_queues.end(),
wait_event->queue) ==
dependent_dispatch_command_queues.end()) {
if (dependent_dispatch_command_queues.push_back(wait_event->queue)) {
return cargo::make_unexpected(CL_OUT_OF_RESOURCES);
}
}
can_append_last_dispatch = false;

for (auto &pending : wait_event->queue->pending_dispatches) {
auto &dispatch = pending.second;
// Check if any signal events are the wait event and add them to
// dependent_dispatches.
if (std::any_of(dispatch.signal_events.begin(),
dispatch.signal_events.end(), isWaitEvent)) {
can_append_last_dispatch = false;

if (dependent_dispatches.push_back(&pending)) {
if (dependent_dispatches.push_back({&pending, wait_event->queue})) {
return cargo::make_unexpected(CL_OUT_OF_RESOURCES);
}
}
Expand All @@ -595,33 +606,51 @@ _cl_command_queue::getCommandBufferPending(
// There is only a single dependent dispatch so return its command buffer.
// Since there is only one it must be the most recent dispatch.
if (dependent_dispatches.size() == 1 && can_append_last_dispatch) {
return dependent_dispatches.front()->first;
return dependent_dispatches.front().first->first;
}

// Storage for wait semaphores to set on a pending command buffer.
cargo::small_vector<mux_shared_semaphore, 8> semaphores;

// There are one or more dependent dispatches we must create a new command
// group and wait on the their signal semaphores.
if (!dependent_dispatches.empty()) {
for (auto &dependent : dependent_dispatches) {
auto &dependent_dispatch = dependent->second;
// Append the signal semaphore to wait_semaphores of the current
// dispatch.
if (semaphores.push_back(dependent_dispatch.signal_semaphore)) {
return cargo::make_unexpected(CL_OUT_OF_RESOURCES);
// We always process the current queue
if (dependent_dispatch_command_queues.push_back(this)) {
return cargo::make_unexpected(CL_OUT_OF_RESOURCES);
}

for (auto &dispatch_queue : dependent_dispatch_command_queues) {
// There are one or more dependent dispatches we must create a new command
// group and wait on the their signal semaphores.
bool has_dependent_dispatches =
std::find_if(
dependent_dispatches.begin(), dependent_dispatches.end(),
[dispatch_queue](dispatch_dependency &dispatch_dependency) {
return dispatch_dependency.second == dispatch_queue;
}) != std::end(dependent_dispatches);

if (has_dependent_dispatches) {
for (auto &dependent_dispatch_info : dependent_dispatches) {
if (dependent_dispatch_info.second != dispatch_queue) {
continue;
}
auto &dependent_dispatch = dependent_dispatch_info.first->second;

// Append the signal semaphore to wait_semaphores of the current
// dispatch.
if (semaphores.push_back(dependent_dispatch.signal_semaphore)) {
return cargo::make_unexpected(CL_OUT_OF_RESOURCES);
}
}
}
} else {
// There are no dependent dispatches, this means the command buffer is
// running now or has already completed or there were never any wait events
// in the first place.
// Wait on all running dispatches to ensure ordering since the commands in
// running_command_buffers may be out of order with respect the
// container (ordering is still enforced via semaphore dependencies though).
for (auto &running_dispatch : running_command_buffers) {
if (semaphores.push_back(running_dispatch.signal_semaphore)) {
return cargo::make_unexpected(CL_OUT_OF_RESOURCES);
} else {
// There are no dependent dispatches, this means the command buffer is
// running now or has already completed or there were never any wait
// events in the first place. Wait on all running dispatches to ensure
// ordering since the commands in running_command_buffers may be out of
// order with respect the container (ordering is still enforced via
// semaphore dependencies though).
for (auto &running_dispatch : dispatch_queue->running_command_buffers) {
if (semaphores.push_back(running_dispatch.signal_semaphore)) {
return cargo::make_unexpected(CL_OUT_OF_RESOURCES);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,8 @@ TEST_F(USMBlockingFreeTest, SingleQueueMultipleAlloc) {
// Populates two device allocations, A & B, on their own queues using fill calls
// before copying them to a separate allocation C. The copy operation is
// enqueued on the queue C, so allocations A & B interact with multiple queues.
TEST_F(USMBlockingFreeTest, DISABLED_MultipleQueueMultipleAlloc) {
std::array<cl_event, 2> events;
TEST_F(USMBlockingFreeTest, MultipleQueueMultipleAlloc) {
std::array<cl_event, 2> events = {nullptr, nullptr};
const cl_uint pattern_A = 42;
auto &queue_A = fixture_queues[0];
auto &device_ptr_A = fixture_device_ptrs[0];
Expand Down

0 comments on commit 192e231

Please sign in to comment.