
Commit e1f4921

Fix race conditions in threadpool when dealing with dynamic/frequent n_threads changes (ggml-org#17748)
* tests: update barrier test to check for race condition in active threads
* cpu: combine n_graph and n_threads into a single atomic update
* tests: add multi-graph test for test_barrier
1 parent 4dff236 commit e1f4921

File tree

2 files changed (+190 -53 lines)


ggml/src/ggml-cpu/ggml-cpu.c

Lines changed: 34 additions & 39 deletions
@@ -187,6 +187,9 @@ typedef void * thread_ret_t;
 
 typedef pthread_t ggml_thread_t;
 
+#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
+#define GGML_THREADPOOL_N_THREADS_BITS (16)
+
 #if defined(__APPLE__)
 #include <unistd.h>
 #include <mach/mach.h>
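
For orientation: the two macros added above define a packed layout for the threadpool's n_graph atomic, with the active-thread count in the low 16 bits and a graph generation counter in the high bits. A minimal standalone sketch of that encoding (the tp_* helper names are illustrative, not part of the commit):

#include <assert.h>

#define GGML_THREADPOOL_N_THREADS_MASK (0xffffU)
#define GGML_THREADPOOL_N_THREADS_BITS (16)

// pack a graph generation and an active-thread count into one word,
// mirroring the update in ggml_graph_compute_kickoff() further down
static inline int tp_pack(int graph, int n_threads) {
    return (graph << GGML_THREADPOOL_N_THREADS_BITS) |
           (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
}

// unpack, mirroring the masked loads in ggml_barrier() and ggml_graph_compute_thread_ready()
static inline int tp_n_threads(int packed) { return packed & GGML_THREADPOOL_N_THREADS_MASK; }
static inline int tp_graph(int packed)     { return packed >> GGML_THREADPOOL_N_THREADS_BITS; }

int main(void) {
    int v = tp_pack(5, 8);           // graph #5 kicked off with 8 active threads
    assert(tp_n_threads(v) == 8);
    assert(tp_graph(v)     == 5);
    return 0;
}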
@@ -449,20 +452,18 @@ struct ggml_threadpool {
     struct ggml_cplan * cplan;
 
     // synchronization primitives
-    atomic_int n_graph;       // incremented when there is work to be done (i.e each graph)
+    atomic_int n_graph;       // updated when there is work to be done (i.e. each graph); holds graph and active thread counts
     atomic_int GGML_CACHE_ALIGN n_barrier;
     atomic_int GGML_CACHE_ALIGN n_barrier_passed;
     atomic_int GGML_CACHE_ALIGN current_chunk; // currently processing chunk during Mat_Mul, shared between all the threads.
 
     // these are atomic as an annotation for thread-sanitizer
     atomic_bool stop;         // Used for stopping the threadpool altogether
     atomic_bool pause;        // Used for pausing the threadpool or individual threads
-    atomic_int abort;         // Used for aborting processing of a graph
+    atomic_int  abort;        // Used for aborting processing of a graph
 
     struct ggml_compute_state * workers; // per thread state
-    int        n_threads_max; // number of threads in the pool
-    atomic_int n_threads_cur; // number of threads used in the current graph
-
+    int      n_threads;       // Number of threads in the pool
     int32_t  prio;            // Scheduling priority
     uint32_t poll;            // Polling level (0 - no polling)
 
@@ -539,7 +540,7 @@ struct ggml_state {
 static struct ggml_state g_state = {0};
 
 void ggml_barrier(struct ggml_threadpool * tp) {
-    int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
+    int n_threads = atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK;
     if (n_threads == 1) {
         return;
     }
@@ -556,7 +557,7 @@ void ggml_barrier(struct ggml_threadpool * tp) {
         // last thread
         atomic_store_explicit(&tp->n_barrier, 0, memory_order_relaxed);
 
-        // exit barrier (fill seq-cst fence)
+        // exit barrier (full seq-cst fence)
         atomic_fetch_add_explicit(&tp->n_barrier_passed, 1, memory_order_seq_cst);
         return;
     }
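
Why the masked load matters: previously the barrier read n_threads_cur while kickoff updated n_threads_cur and n_graph as two independent atomics, so with frequent thread-count changes a worker could pair one graph's id with another graph's thread count. Folding both into n_graph makes every relaxed load return a mutually consistent (graph, n_threads) pair. A hypothetical interleaving under the old two-atomic scheme (thread ids and values illustrative):

// old scheme, two independent stores per kickoff:
//   main:   n_threads_cur = 4; n_graph++;   // kick off graph N with 4 threads
//   main:   n_threads_cur = 2;              // next kickoff already starting ...
//   worker: enters ggml_barrier() for graph N and loads n_threads_cur == 2
//           -> the barrier counts to 2 while 4 threads participate in graph N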
@@ -2628,7 +2629,7 @@ static void ggml_thread_cpumask_next(const bool * global_mask, bool * local_mask
 void ggml_threadpool_free(struct ggml_threadpool* threadpool) {
     if (!threadpool) return;
 
-    const int n_threads = threadpool->n_threads_max;
+    const int n_threads = threadpool->n_threads;
 
 #ifndef GGML_USE_OPENMP
     struct ggml_compute_state* workers = threadpool->workers;
@@ -2704,7 +2705,7 @@ struct ggml_cplan ggml_graph_plan(
         //GGML_PRINT_DEBUG("Threadpool is not specified. Will create a disposable threadpool : n_threads %d\n", n_threads);
     }
     if (n_threads <= 0) {
-        n_threads = threadpool ? threadpool->n_threads_max : GGML_DEFAULT_N_THREADS;
+        n_threads = threadpool ? threadpool->n_threads : GGML_DEFAULT_N_THREADS;
     }
 
 #if defined(__EMSCRIPTEN__) && !defined(__EMSCRIPTEN_PTHREADS__)
@@ -2912,12 +2913,14 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
 
     struct ggml_compute_params params = {
         /*.ith       =*/ state->ith,
-        /*.nth       =*/ atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed),
+        /*.nth       =*/ atomic_load_explicit(&tp->n_graph, memory_order_relaxed) & GGML_THREADPOOL_N_THREADS_MASK,
         /*.wsize     =*/ cplan->work_size,
         /*.wdata     =*/ cplan->work_data,
        /*.threadpool=*/ tp,
     };
 
+    GGML_PRINT_DEBUG("thread #%d compute-start cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
     for (int node_n = 0; node_n < cgraph->n_nodes && atomic_load_explicit(&tp->abort, memory_order_relaxed) != node_n; node_n++) {
         struct ggml_tensor * node = cgraph->nodes[node_n];
 
@@ -2939,34 +2942,32 @@ static thread_ret_t ggml_graph_compute_thread(void * data) {
         }
     }
 
+    GGML_PRINT_DEBUG("thread #%d compute-done cplan %p last-graph %d \n", state->ith, cplan, state->last_graph);
+
     ggml_barrier(state->threadpool);
 
     return 0;
 }
 
 #ifndef GGML_USE_OPENMP
 
-// check if thread is active
-static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
-    struct ggml_threadpool * threadpool = state->threadpool;
-    int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
-    return (state->ith < n_threads);
-}
-
 // check if thread is ready to proceed (exit from polling or sleeping)
+// returns true if loops should exit, sets state->pending to indicate new work
 static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
     if (state->pending || threadpool->stop || threadpool->pause) { return true; }
 
     // check for new graph/work
-    int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
-    if (new_graph != state->last_graph) {
-        state->pending    = ggml_graph_compute_thread_active(state);
-        state->last_graph = new_graph;
+    int n_graph   = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
+    int n_threads = n_graph & GGML_THREADPOOL_N_THREADS_MASK;
+    if (n_graph != state->last_graph) {
+        state->pending    = (state->ith < n_threads);
+        state->last_graph = n_graph;
+        return true;
     }
 
-    return state->pending;
+    return false;
 }
 
 // sync thread state after polling
@@ -2983,11 +2984,6 @@ static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * st
 static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
     struct ggml_threadpool * threadpool = state->threadpool;
 
-    // Skip polling for unused threads
-    if (!ggml_graph_compute_thread_active(state)) {
-        return state->pending;
-    }
-
     // This seems to make 0 ... 100 a decent range for polling level across modern processors.
     // Perhaps, we can adjust it dynamically based on load and things.
     const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
@@ -3049,7 +3045,6 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
         ggml_graph_compute_check_for_work(state);
         if (state->pending) {
             state->pending = false;
-
             ggml_graph_compute_thread(state);
         }
     }
@@ -3064,14 +3059,15 @@ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int
 
     ggml_mutex_lock(&threadpool->mutex);
 
-    GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
+    // Update the number of active threads and the graph count
+    int n_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed) >> GGML_THREADPOOL_N_THREADS_BITS;
+    n_graph = ((n_graph + 1) << GGML_THREADPOOL_N_THREADS_BITS) | (n_threads & GGML_THREADPOOL_N_THREADS_MASK);
 
-    // Update the number of active threads
-    atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+    GGML_PRINT_DEBUG("compute-kickoff: n_threads %d n_graph %d\n", n_threads, n_graph);
 
     // Indicate the graph is ready to be processed
     // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
-    atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
+    atomic_store_explicit(&threadpool->n_graph, n_graph, memory_order_seq_cst);
 
     if (threadpool->pause) {
         // Update main thread prio and affinity to match the threadpool settings
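
As a quick sanity check of the packing arithmetic above (values illustrative): if the current word encodes graph 5 running with 8 threads and the next graph is kicked off with n_threads = 4, the single seq-cst store publishes both fields at once:

// old word:  (5 << 16) | 8  ==  0x50008
// >> 16                        ->  5         (graph counter)
// (5 + 1) << 16 | (4 & 0xffff) ->  0x60004   (graph 6, 4 active threads)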
@@ -3109,8 +3105,7 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
     threadpool->pause             = tpp->paused;
     threadpool->abort             = -1;
     threadpool->workers           = NULL;
-    threadpool->n_threads_max     = tpp->n_threads;
-    threadpool->n_threads_cur     = tpp->n_threads;
+    threadpool->n_threads         = tpp->n_threads;
     threadpool->poll              = tpp->poll;
     threadpool->prio              = tpp->prio;
     threadpool->ec                = GGML_STATUS_SUCCESS;
@@ -3205,7 +3200,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
         {
             // update the number of threads from the actual number of threads that we got from OpenMP
             n_threads = omp_get_num_threads();
-            atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
+            atomic_store_explicit(&threadpool->n_graph, n_threads, memory_order_relaxed);
         }
 
         // Apply thread CPU mask and priority
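
Note that the OpenMP branches here and in the next hunk store a bare thread count into n_graph, leaving the generation bits at zero. As far as this diff shows, that is sufficient: only the non-OpenMP polling/sleeping workers compare the graph-counter bits, while ggml_barrier() and the params.nth initializer read just the low GGML_THREADPOOL_N_THREADS_MASK bits.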
@@ -3218,13 +3213,13 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
             ggml_graph_compute_thread(&threadpool->workers[ith]);
         }
     } else {
-        atomic_store_explicit(&threadpool->n_threads_cur, 1, memory_order_relaxed);
+        atomic_store_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
         ggml_graph_compute_thread(&threadpool->workers[0]);
     }
 #else
-    if (n_threads > threadpool->n_threads_max) {
-        GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
-        n_threads = threadpool->n_threads_max;
+    if (n_threads > threadpool->n_threads) {
+        GGML_LOG_WARN("cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads);
+        n_threads = threadpool->n_threads;
     }
 
     // Kick all threads to start the new graph
tests/test-barrier.cpp

Lines changed: 156 additions & 14 deletions
@@ -11,19 +11,7 @@
 
 #define MAX_NARGS 2
 
-int main(int argc, char *argv[]) {
-
-    int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency()));
-    int n_rounds  = 100;
-
-    if (argc > 1) {
-        n_threads = std::atoi(argv[1]);
-    }
-
-    if (argc > 2) {
-        n_rounds = std::atoi(argv[2]);
-    }
-
+static void test_barrier(int n_threads, int n_rounds) {
     struct ggml_init_params params = {
         /* .mem_size   = */ 1024*1024*1024,
         /* .mem_buffer = */ NULL,
@@ -56,7 +44,7 @@ int main(int argc, char *argv[]) {
         exit(1);
     }
 
-    // Create compute plan
+    // The test runs with a constant number of threads
     struct ggml_cplan cplan = ggml_graph_plan(gf, n_threads, threadpool);
 
     std::vector<uint8_t> work_data(cplan.work_size);
@@ -89,6 +77,160 @@ int main(int argc, char *argv[]) {
 
     ggml_threadpool_free(threadpool);
     ggml_free(ctx);
+}
+
+static void test_active(int n_threads, int n_rounds) {
+    struct ggml_init_params params = {
+        /* .mem_size   = */ 1024*1024*1024,
+        /* .mem_buffer = */ NULL,
+        /* .no_alloc   = */ false,
+    };
+
+    struct ggml_context * ctx = ggml_init(params);
+
+    // Create graph
+    struct ggml_cgraph * gf = ggml_new_graph(ctx);
+
+    // Small graph with parallel ops with barriers
+    struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64);
+    for (int i = 0; i < 2; i++) {
+        struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
+        out = ggml_mul_mat(ctx, a, out);
+
+        struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
+        out = ggml_mul_mat(ctx, d, out);
+    }
+
+    ggml_build_forward_expand(gf, out);
+    int n_nodes = ggml_graph_n_nodes(gf);
+
+    // Create threadpool
+    struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads);
+    struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
+    if (!threadpool) {
+        fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
+        exit(1);
+    }
+
+    std::cerr << "graph-compute with"
+              << "\n n_threads: " << n_threads
+              << "\n n_nodes:   " << n_nodes
+              << "\n n_rounds:  " << n_rounds
+              << "\n";
+    // ggml_graph_print(gf);
+
+    // In this test we keep changing the number of threads every 4th iteration
+    // to test for race conditions in that path
+
+    for (int i=0; i < n_rounds; i++) {
+        struct ggml_cplan cplan = ggml_graph_plan(gf, (i % 4) == 0 ? 1 : n_threads, threadpool);
+
+        std::vector<uint8_t> work_data(cplan.work_size);
+        cplan.work_data = work_data.data();
+
+        ggml_graph_compute(gf, &cplan);
+    }
+
+    ggml_threadpool_free(threadpool);
+    ggml_free(ctx);
+}
+
+static void test_multi_graph(int n_threads, int n_rounds) {
+    struct ggml_init_params params = {
+        /* .mem_size   = */ 1024*1024*1024,
+        /* .mem_buffer = */ NULL,
+        /* .no_alloc   = */ false,
+    };
+
+    struct ggml_context * ctx = ggml_init(params);
+
+    // Create graphs
+    struct ggml_cgraph * gf0 = ggml_new_graph(ctx);
+    {
+        // Small graph with parallel ops with barriers
+        struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 64);
+        for (int i = 0; i < 2; i++) {
+            struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 64, 128);
+            out = ggml_mul_mat(ctx, a, out);
+
+            struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 64);
+            out = ggml_mul_mat(ctx, d, out);
+        }
+
+        ggml_build_forward_expand(gf0, out);
+    }
+
+    struct ggml_cgraph * gf1 = ggml_new_graph(ctx);
+    {
+        // Small graph with parallel ops with barriers
+        // Use larger tensors to make sure work_data size is larger than gf0
+        struct ggml_tensor * out = ggml_new_tensor_1d(ctx, GGML_TYPE_F32, 256);
+        for (int i = 0; i < 4; i++) {
+            struct ggml_tensor * a = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 256, 128);
+            out = ggml_mul_mat(ctx, a, out);
+
+            struct ggml_tensor * d = ggml_new_tensor_2d(ctx, GGML_TYPE_Q4_0, 128, 256);
+            out = ggml_mul_mat(ctx, d, out);
+        }
+
+        ggml_build_forward_expand(gf1, out);
+    }
+
+
+    // Create threadpool
+    struct ggml_threadpool_params tpp = ggml_threadpool_params_default(n_threads);
+    struct ggml_threadpool* threadpool = ggml_threadpool_new(&tpp);
+    if (!threadpool) {
+        fprintf(stderr, "threadpool create failed : n_threads %d\n", n_threads);
+        exit(1);
+    }
+
+    std::cerr << "graph-compute with"
+              << "\n gf0 n_nodes: " << ggml_graph_n_nodes(gf0)
+              << "\n gf1 n_nodes: " << ggml_graph_n_nodes(gf1)
+              << "\n n_threads:   " << n_threads
+              << "\n n_rounds:    " << n_rounds
+              << "\n";
+
+    // In this test we keep changing the number of threads every 4th iteration
+    // and we compute two graphs back to back to test frequent graph switching
+
+    for (int i=0; i < n_rounds; i++) {
+        struct ggml_cplan cplan0 = ggml_graph_plan(gf0, (i % 4) == 0 ? 1 : n_threads, threadpool);
+        std::vector<uint8_t> work_data0(cplan0.work_size);
+        cplan0.work_data = work_data0.data();
+
+        struct ggml_cplan cplan1 = ggml_graph_plan(gf1, (i % 4) == 0 ? 1 : n_threads, threadpool);
+        std::vector<uint8_t> work_data1(cplan1.work_size);
+        cplan1.work_data = work_data1.data();
+
+        ggml_graph_compute(gf0, &cplan0);
+        ggml_graph_compute(gf1, &cplan1);
+    }
+
+    ggml_threadpool_free(threadpool);
+    ggml_free(ctx);
+}
+
+
+int main(int argc, char *argv[]) {
+
+    int n_threads = std::max(1, std::min(4, (int) std::thread::hardware_concurrency()));
+    int n_rounds  = 100;
+
+    if (argc > 1) {
+        n_threads = std::atoi(argv[1]);
+    }
+
+    if (argc > 2) {
+        n_rounds = std::atoi(argv[2]);
+    }
+
+    test_barrier(n_threads, n_rounds);
+
+    test_active(n_threads, n_rounds * 100);
+
+    test_multi_graph(n_threads, n_rounds * 10);
 
     return 0;
 }
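
For local verification, the updated test takes the thread count and round count as positional arguments; the (i % 4) == 0 ? 1 : n_threads pattern inside the new tests is what exercises the dynamic thread-count churn the fix targets. A hypothetical invocation (binary name and path assumed from the file name; the build setup is not shown in this commit):

./build/bin/test-barrier 8 200    # argv[1] = n_threads, argv[2] = n_rounds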
