Don't spin on the main mutex while waiting for new work (#8433)
* Don't spin on the main mutex while waiting for new work

Once they run out of work to do, Halide worker threads spin for a bit
checking if new work has been enqueued before calling cond_wait, which
puts them to sleep until signaled. Job owners also spin waiting for
their job to complete before going to sleep on a different condition
variable. I hate this, but all previous attempts I have made at removing
or reducing the spinning have made things slower.

One problem with this approach is that spinning is done by releasing the
work queue lock, yielding, reacquiring the work queue lock, and doing
the somewhat involved check to see if there's something useful for this
thread to do, either because new work was enqueued, the last item on a
job completed, or a semaphore was released. This hammering of the lock
by idle worker threads can starve the thread that actually completed the
last task, delaying its ability to tell the job owner the job is done,
and can also starve the job owner, causing it to take extra time to
realize the job is all done and return back into Halide code. So this
adds some wasted time at the end of every parallel for loop.

This PR gets these idle threads to spin off the main mutex. I did this
by combining a counter with a condition variable. Any time the condition
variable is signaled, the counter is atomically incremented. Before first
releasing the lock, the idlers atomically capture the value of this
counter. Then in cond_wait they spin for a bit doing atomic loads of the
counter in between yields until it changes, in which case they grab the
lock and return, or until they reach the spin count limit, in which case
they go to sleep. This improved performance quite a bit over main for
the blur app, which is a fast pipeline (~100us) with fine-grained
parallelism. The speed-up was 1.2x! Not much effect on the more complex
apps.
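
For illustration, here is a minimal standalone sketch of the same spin-then-sleep pattern, written with C++ standard-library primitives. The type name, the std:: types, and the spin limit of 40 are illustrative assumptions; the actual runtime code is halide_cond_with_spinning, shown in the diff below, which uses the Halide runtime's own mutex, condition variable, and atomics.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>

// Illustrative sketch only; not the Halide runtime implementation.
struct spinning_cond_sketch {
    std::condition_variable cond;
    std::atomic<uintptr_t> counter{0};

    // Called with `lock` held, like an ordinary condition variable wait.
    void wait(std::unique_lock<std::mutex> &lock) {
        // Capture the counter before releasing the lock, then spin on it.
        uintptr_t initial = counter.load(std::memory_order_relaxed);
        lock.unlock();
        for (int spin = 0; spin < 40; spin++) {
            std::this_thread::yield();
            if (counter.load(std::memory_order_relaxed) != initial) {
                lock.lock();
                return;
            }
        }
        // Give up on spinning and re-check under the lock. The counter is
        // only ever bumped with the lock held, so this check cannot miss a
        // wakeup that happened before we went to sleep.
        lock.lock();
        if (counter.load(std::memory_order_relaxed) != initial) {
            return;
        }
        cond.wait(lock);
    }

    // Called with the lock held: wake spinners by bumping the counter, then
    // wake any sleepers via the condition variable.
    void broadcast() {
        counter.fetch_add(1, std::memory_order_acq_rel);
        cond.notify_all();
    }
};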
abadams authored Dec 5, 2024
1 parent 53619a4 commit c70496d
Showing 2 changed files with 71 additions and 35 deletions.
2 changes: 2 additions & 0 deletions src/runtime/synchronization_common.h
@@ -834,6 +834,7 @@ class fast_cond {

ALWAYS_INLINE void broadcast() {
if_tsan_pre_signal(this);

uintptr_t val;
atomic_load_relaxed(&state, &val);
if (val == 0) {
@@ -846,6 +847,7 @@ }
}

ALWAYS_INLINE void wait(fast_mutex *mutex) {
// Go to sleep until signaled
wait_parking_control control(&state, mutex);
uintptr_t result = control.park((uintptr_t)this);
if (result != (uintptr_t)mutex) {
104 changes: 69 additions & 35 deletions src/runtime/thread_pool_common.h
@@ -30,6 +30,57 @@ namespace Halide {
namespace Runtime {
namespace Internal {

// A condition variable, augmented with a bit of spinning on an atomic counter
// before going to sleep for real. This helps reduce overhead at the end of a
// parallel for loop when idle worker threads are waiting for other threads to
// finish so that the next parallel for loop can begin.
struct halide_cond_with_spinning {
halide_cond cond;
uintptr_t counter;

void wait(halide_mutex *mutex) {
// First spin for a bit, checking the counter for another thread to bump
// it.
uintptr_t initial;
Synchronization::atomic_load_relaxed(&counter, &initial);
halide_mutex_unlock(mutex);
for (int spin = 0; spin < 40; spin++) {
halide_thread_yield();
uintptr_t current;
Synchronization::atomic_load_relaxed(&counter, &current);
if (current != initial) {
halide_mutex_lock(mutex);
return;
}
}

// Give up on spinning and relock the mutex preparing to sleep for real.
halide_mutex_lock(mutex);

// Check one final time with the lock held. This guarantees we won't
// miss an increment of the counter because it is only ever incremented
// with the lock held.
uintptr_t current;
Synchronization::atomic_load_relaxed(&counter, &current);
if (current != initial) {
return;
}

halide_cond_wait(&cond, mutex);
}

void broadcast() {
// Release any spinning waiters
Synchronization::atomic_fetch_add_acquire_release(&counter, (uintptr_t)1);

// Release any sleeping waiters
halide_cond_broadcast(&cond);
}

// Note that this cond var variant doesn't have signal(), because it always
// wakes all spinning waiters.
};

struct work {
halide_parallel_task_t task;

@@ -121,7 +172,7 @@ struct work_queue_t {
// may want to wake them up independently. Any code that may
// invalidate any of the reasons a worker or owner may have slept
// must signal or broadcast the appropriate condition variable.
- halide_cond wake_a_team, wake_b_team, wake_owners;
+ halide_cond_with_spinning wake_a_team, wake_b_team, wake_owners;

// The number of sleeping workers and owners. An over-estimate - a
// waking-up thread may not have decremented this yet.
@@ -203,9 +254,6 @@ WEAK void dump_job_state() {
WEAK void worker_thread(void *);

WEAK void worker_thread_already_locked(work *owned_job) {
- int spin_count = 0;
- const int max_spin_count = 40;
-
while (owned_job ? owned_job->running() : !work_queue.shutdown) {
work *job = work_queue.jobs;
work **prev_ptr = &work_queue.jobs;
@@ -226,7 +274,7 @@ WEAK void worker_thread_already_locked(work *owned_job) {
// The wakeup can likely be only done under certain conditions, but it is only happening
// when an error has already occurred and it seems more important to ensure reliable
// termination than to optimize this path.
- halide_cond_broadcast(&work_queue.wake_owners);
+ work_queue.wake_owners.broadcast();
continue;
}
}
@@ -283,38 +331,24 @@
if (!job) {
// There is no runnable job. Go to sleep.
if (owned_job) {
- if (spin_count++ < max_spin_count) {
-     // Give the workers a chance to finish up before sleeping
-     halide_mutex_unlock(&work_queue.mutex);
-     halide_thread_yield();
-     halide_mutex_lock(&work_queue.mutex);
- } else {
-     work_queue.owners_sleeping++;
-     owned_job->owner_is_sleeping = true;
-     halide_cond_wait(&work_queue.wake_owners, &work_queue.mutex);
-     owned_job->owner_is_sleeping = false;
-     work_queue.owners_sleeping--;
- }
+ work_queue.owners_sleeping++;
+ owned_job->owner_is_sleeping = true;
+ work_queue.wake_owners.wait(&work_queue.mutex);
+ owned_job->owner_is_sleeping = false;
+ work_queue.owners_sleeping--;
} else {
work_queue.workers_sleeping++;
if (work_queue.a_team_size > work_queue.target_a_team_size) {
// Transition to B team
work_queue.a_team_size--;
- halide_cond_wait(&work_queue.wake_b_team, &work_queue.mutex);
+ work_queue.wake_b_team.wait(&work_queue.mutex);
work_queue.a_team_size++;
- } else if (spin_count++ < max_spin_count) {
-     // Spin waiting for new work
-     halide_mutex_unlock(&work_queue.mutex);
-     halide_thread_yield();
-     halide_mutex_lock(&work_queue.mutex);
} else {
-     halide_cond_wait(&work_queue.wake_a_team, &work_queue.mutex);
+     work_queue.wake_a_team.wait(&work_queue.mutex);
}
work_queue.workers_sleeping--;
}
continue;
- } else {
-     spin_count = 0;
}

log_message("Working on job " << job->task.name);
@@ -432,7 +466,7 @@ WEAK void worker_thread_already_locked(work *owned_job) {
if (wake_owners ||
(job->active_workers == 0 && (job->task.extent == 0 || job->exit_status != halide_error_code_success) && job->owner_is_sleeping)) {
// The job is done or some owned job failed via sibling linkage. Wake up the owner.
- halide_cond_broadcast(&work_queue.wake_owners);
+ work_queue.wake_owners.broadcast();
}
}
}
@@ -554,11 +588,11 @@ WEAK void enqueue_work_already_locked(int num_jobs, work *jobs, work *task_paren
work_queue.target_a_team_size = workers_to_wake;
}

- halide_cond_broadcast(&work_queue.wake_a_team);
+ work_queue.wake_a_team.broadcast();
if (work_queue.target_a_team_size > work_queue.a_team_size) {
-     halide_cond_broadcast(&work_queue.wake_b_team);
+     work_queue.wake_b_team.broadcast();
if (stealable_jobs) {
-         halide_cond_broadcast(&work_queue.wake_owners);
+         work_queue.wake_owners.broadcast();
}
}

@@ -707,9 +741,9 @@ WEAK void halide_shutdown_thread_pool() {
halide_mutex_lock(&work_queue.mutex);

work_queue.shutdown = true;
- halide_cond_broadcast(&work_queue.wake_owners);
- halide_cond_broadcast(&work_queue.wake_a_team);
- halide_cond_broadcast(&work_queue.wake_b_team);
+ work_queue.wake_owners.broadcast();
+ work_queue.wake_a_team.broadcast();
+ work_queue.wake_b_team.broadcast();
halide_mutex_unlock(&work_queue.mutex);

// Wait until they leave
@@ -739,8 +773,8 @@ WEAK int halide_default_semaphore_release(halide_semaphore_t *s, int n) {
if (old_val == 0 && n != 0) { // Don't wake if nothing released.
// We may have just made a job runnable
halide_mutex_lock(&work_queue.mutex);
- halide_cond_broadcast(&work_queue.wake_a_team);
- halide_cond_broadcast(&work_queue.wake_owners);
+ work_queue.wake_a_team.broadcast();
+ work_queue.wake_owners.broadcast();
halide_mutex_unlock(&work_queue.mutex);
}
return old_val + n;