From 5885d030c2669f5295895ff27f75ddcb30014233 Mon Sep 17 00:00:00 2001
From: Daniel Bevenius
Date: Mon, 5 Aug 2024 14:46:15 +0200
Subject: [PATCH] docs: add more multi-threading details to ggml.md

---
 notes/ggml.md | 352 +++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 348 insertions(+), 4 deletions(-)

diff --git a/notes/ggml.md b/notes/ggml.md
index b480f6f5..0c56b212 100644
--- a/notes/ggml.md
+++ b/notes/ggml.md
@@ -1460,6 +1460,7 @@ enum ggml_status ggml_graph_compute_with_ctx(struct ggml_context * ctx, struct g
 ```
 I've gone through `ggml_new_object` before so I won't go through it again.
 TODO: look into the context `mem_buffer` and how that works.
+
 So lets now take a look at `ggml_graph_compute`:
 ```c
 enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
@@ -1508,17 +1509,20 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
     ggml_graph_compute_thread(&worker);
 }
 ```
+
 ```
 #pragma omp parallel num_threads(n_threads)
 ```
 This is an OpenMP directive that specifies that the following block of code
-should be executed in parallel by the threads. So this will start 4 threads in
+should be executed in parallel by `n_threads` threads. So this will start 4 threads in
 our case.
+
 The next OMP directive is `#pragma omp single` which specifies that the block
 should be executed by a single thread. So one of those four threads will execute
-the block of code that follows, which in this case just gets a thread
+the block of code that follows, which in this case just gets the number of
+threads from OpenMP and sets `n_threads` to that value.
 
-If we set a breakpoint in the single block and the parallel block we can
+If we set a breakpoint in the single block and the parallel block we can
 inspect the threads that have been created:
 ```console
 18704 #pragma omp parallel num_threads(n_threads)
@@ -1905,4 +1909,344 @@ to make sure that different threads are not writing to the same cache line
 starts at the base address. This will allow each thread to operate on different
 cache lines.
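+
+To make the false-sharing point concrete, here is a small standalone sketch
+(my own example, not ggml code) of padding per-thread data to the cache-line
+size so that threads writing their own entry never touch the same cache line:
+```c
+#include <stdio.h>
+
+// assumed cache-line size for this example; 64 bytes is typical on x86_64
+#define CACHE_LINE_SIZE 64
+
+struct padded_sum {
+    float sum;
+    char  padding[CACHE_LINE_SIZE - sizeof(float)]; // pad to a full cache line
+};
+
+int main(void) {
+    // one entry per thread; each thread i would only ever write sums[i].sum,
+    // and since every entry occupies its own cache line the writes do not
+    // invalidate each other's cache lines
+    struct padded_sum sums[4] = {{0}};
+    sums[0].sum = 1.0f;
+    printf("entry size: %zu bytes\n", sizeof(struct padded_sum));
+    return 0;
+}
+```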
-_wip_
+Our `cplan` struct will then look like this:
+```console
+(gdb) p cplan
+$20 = {work_size = 2240, work_data = 0x0, n_threads = 4, abort_callback = 0x0, abort_callback_data = 0x0}
+```
+This will then be returned to `ggml_graph_compute_with_ctx`:
+```c
+enum ggml_status ggml_graph_compute_with_ctx(struct ggml_context * ctx, struct ggml_cgraph * cgraph, int n_threads) {
+    struct ggml_cplan cplan = ggml_graph_plan(cgraph, n_threads);
+
+    struct ggml_object * obj = ggml_new_object(ctx, GGML_OBJECT_TYPE_WORK_BUFFER, cplan.work_size);
+    cplan.work_data = (uint8_t *)ctx->mem_buffer + obj->offs;
+    return ggml_graph_compute(cgraph, &cplan);
+}
+```
+GGML currently has three types of objects and I think we have discussed the
+other two previously:
+```c
+    enum ggml_object_type {
+        GGML_OBJECT_TYPE_TENSOR,
+        GGML_OBJECT_TYPE_GRAPH,
+        GGML_OBJECT_TYPE_WORK_BUFFER
+    };
+```
+
+Let's take a closer look at `ggml_graph_compute`:
+```c
+enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cplan * cplan) {
+    GGML_ASSERT(cplan);
+    GGML_ASSERT(cplan->n_threads > 0);
+    GGML_ASSERT(cplan->work_size == 0 || cplan->work_data != NULL);
+
+    int n_threads = cplan->n_threads;
+
+    struct ggml_compute_state_shared state_shared = {
+        /*.cgraph                  =*/ cgraph,
+        /*.cgraph_plan             =*/ cplan,
+        /*.n_threads               =*/ n_threads,
+        /*.n_barrier               =*/ 0,
+        /*.n_barrier_passed        =*/ 0,
+        /*.abort_callback          =*/ NULL,
+        /*.abort_callback_data     =*/ NULL,
+        /*.current_chunk           =*/ 0,
+        /*.ec                      =*/ GGML_STATUS_SUCCESS,
+    };
+
+#ifdef GGML_USE_OPENMP
+    if (n_threads > 1) {
+        #pragma omp parallel num_threads(n_threads)
+        {
+            #pragma omp single
+            {
+                // update the number of threads from the actual number of threads that we got from OpenMP
+                n_threads = omp_get_num_threads();
+                state_shared.n_threads = n_threads;
+            }
+
+            struct ggml_compute_state worker = {
+                .thrd   = 0,
+                .ith    = omp_get_thread_num(),
+                .shared = &state_shared,
+            };
+            ggml_graph_compute_thread(&worker);
+        }
+    } else {
+        struct ggml_compute_state worker = {
+            .thrd   = 0,
+            .ith    = 0,
+            .shared = &state_shared,
+        };
+        ggml_graph_compute_thread(&worker);
+    }
+```
+The member `ec` in the `ggml_compute_state_shared` struct is the error code
+(ec), which for some reason was not obvious to me initially.
+
+```
+    #pragma omp parallel num_threads(n_threads)
+```
+This is an OpenMP directive that specifies that the following block of code
+should be executed in parallel by `n_threads` threads. So this will start 4
+threads in our case.
+
+The next OMP directive is `#pragma omp single` which specifies that the block
+should be executed by a single thread. So one of those four threads will execute
+the block of code that follows, which in this case just gets the number of
+threads from OpenMP and sets `n_threads` to that value.
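+
+To see what these two directives do in isolation, here is a minimal standalone
+OpenMP example (my own sketch, not ggml code) that mirrors the structure above:
+```c
+#include <omp.h>
+#include <stdio.h>
+
+int main(void) {
+    int n_threads = 4;
+
+    #pragma omp parallel num_threads(n_threads)
+    {
+        // executed by exactly one thread of the team; there is an implicit
+        // barrier at the end of the single block
+        #pragma omp single
+        {
+            printf("single block run by thread %d of %d\n",
+                   omp_get_thread_num(), omp_get_num_threads());
+        }
+
+        // executed by every thread of the team
+        printf("parallel block, thread %d\n", omp_get_thread_num());
+    }
+    return 0;
+}
+```
+Compiling with `gcc -fopenmp` and running this prints the single line once and
+the parallel line four times (in some thread order).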
+
+If we set a breakpoint in the single block and the parallel block we can
+inspect the threads that have been created:
+```console
+(gdb) br ggml.c:18715
+Breakpoint 3 at 0x5555555a8ef4: file /home/danbev/work/ai/learning-ai/fundamentals/ggml/ggml/src/ggml.c, line 18715.
+(gdb) continue
+Continuing.
+[New Thread 0x7ffff68ec640 (LWP 451923)]
+[New Thread 0x7ffff60eb640 (LWP 451924)]
+[New Thread 0x7ffff58ea640 (LWP 451925)]
+[Switching to Thread 0x7ffff58ea640 (LWP 451925)]
+
+Thread 4 "rope" hit Breakpoint 3, ggml_graph_compute._omp_fn.0 () at /home/danbev/work/ai/learning-ai/fundamentals/ggml/ggml/src/ggml.c:18715
+18715 n_threads = omp_get_num_threads();
+
+(gdb) info thread
+  Id   Target Id                                      Frame
+  1    Thread 0x7ffff7e64c00 (LWP 450569) "rope" 0x00007ffff7e8a0ca in ?? () from /lib/x86_64-linux-gnu/libgomp.so.1
+  2    Thread 0x7ffff68ec640 (LWP 451923) "rope" 0x00007ffff7e8a0ca in ?? () from /lib/x86_64-linux-gnu/libgomp.so.1
+  3    Thread 0x7ffff60eb640 (LWP 451924) "rope" 0x00007ffff7e8a0ca in ?? () from /lib/x86_64-linux-gnu/libgomp.so.1
+* 4    Thread 0x7ffff58ea640 (LWP 451925) "rope" ggml_graph_compute._omp_fn.0 ()
+    at /home/danbev/work/ai/learning-ai/fundamentals/ggml/ggml/src/ggml.c:18715
+```
+Now, let's enable thread locking:
+```console
+(gdb) set scheduler-locking on
+```
+This is the single thread block which will execute and set
+`state_shared.n_threads`:
+```c
+            #pragma omp single
+            {
+                // update the number of threads from the actual number of threads that we got from OpenMP
+                n_threads = omp_get_num_threads();
+                state_shared.n_threads = n_threads;
+            }
+```
+So let's also set a breakpoint after the single block to be able to step
+through the code that follows it:
+```c
+            struct ggml_compute_state worker = {
+                .thrd   = 0,
+                .ith    = omp_get_thread_num(),
+                .shared = &state_shared,
+            };
+            ggml_graph_compute_thread(&worker);
+```
+The above will get executed by each thread, and each will set the `ith` member
+of its `ggml_compute_state` struct to its thread number. For example the
+current thread will set it to 3:
+```console
+(gdb) p (int) omp_get_thread_num()
+$23 = 3
+```
+Note that `thrd` is short for `thread` and its type is `ggml_thread_t`:
+```console
+(gdb) ptype struct ggml_compute_state
+type = struct ggml_compute_state {
+    ggml_thread_t thrd;
+    int ith;
+    struct ggml_compute_state_shared *shared;
+}
+```
+And notice that all `ggml_compute_state` instances have a pointer to the shared
+compute state, which contains:
+```console
+(gdb) p state_shared
+$24 = {cgraph = 0x7ffff691d610,
+       cplan = 0x7fffffffdd90,
+       n_threads = 4,
+       n_barrier = 0,
+       n_barrier_passed = 0,
+       abort_callback = 0x0,
+       abort_callback_data = 0x0,
+       current_chunk = 0,
+       ec = GGML_STATUS_SUCCESS}
+```
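+
+This shared/per-thread split is worth a small sketch of its own (assumed names,
+not ggml's actual types): every thread gets a private worker struct on its own
+stack, and all of them point at the one shared struct:
+```c
+#include <omp.h>
+#include <stdio.h>
+
+struct shared_state {
+    int n_threads;                // visible to all workers
+};
+
+struct worker_state {
+    int ith;                      // this worker's thread index
+    struct shared_state * shared; // pointer to the state shared by all
+};
+
+static void compute(struct worker_state * state) {
+    printf("worker %d of %d\n", state->ith, state->shared->n_threads);
+}
+
+int main(void) {
+    struct shared_state shared = { .n_threads = 4 };
+
+    #pragma omp parallel num_threads(shared.n_threads)
+    {
+        // like the ggml_compute_state above: private per thread, but with
+        // a pointer back to the shared state
+        struct worker_state worker = {
+            .ith    = omp_get_thread_num(),
+            .shared = &shared,
+        };
+        compute(&worker);
+    }
+    return 0;
+}
+```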
+
+After that the worker struct will be passed to `ggml_graph_compute_thread`:
+```c
+static thread_ret_t ggml_graph_compute_thread(void * data) {
+    struct ggml_compute_state * state = (struct ggml_compute_state *) data;
+
+    const struct ggml_cgraph * cgraph = state->shared->cgraph;
+    const struct ggml_cplan  * cplan  = state->shared->cplan;
+
+    set_numa_thread_affinity(state->ith);
+
+    struct ggml_compute_params params = {
+        /*.ith   =*/ state->ith,
+        /*.nth   =*/ state->shared->n_threads,
+        /*.wsize =*/ cplan->work_size,
+        /*.wdata =*/ cplan->work_data,
+        /*.shared=*/ state->shared,
+    };
+
+    for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
+        struct ggml_tensor * node = cgraph->nodes[node_n];
+
+        ggml_compute_forward(&params, node);
+
+        if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
+            state->shared->ec = GGML_STATUS_ABORTED;
+        }
+
+        ggml_barrier(state->shared);
+
+        if (state->shared->ec != GGML_STATUS_SUCCESS) {
+            break;
+        }
+    }
+
+    return 0;
+}
+```
+A `ggml_compute_params` struct is created which contains the thread number (3
+in the current session/thread), the number of threads (4), the work size
+(2240), the work data, and also a pointer to the shared compute state.
+
+Now, recall that all of the threads will execute this function, not just one,
+and all of them will loop through the nodes in the compute graph.
+In this case the first node is the reshape tensor, which will be passed to
+`ggml_compute_forward`:
+```c
+static void ggml_compute_forward(struct ggml_compute_params * params, struct ggml_tensor * tensor) {
+    GGML_ASSERT(params);
+
+    if (tensor->op == GGML_OP_NONE || ggml_is_empty(tensor)) {
+        return;
+    }
+
+    switch (tensor->op) {
+        ...
+        case GGML_OP_RESHAPE:
+            {
+                ggml_compute_forward_reshape(params, tensor);
+            } break;
+        ...
+    }
+```
+And as we mentioned earlier, reshape is a no-operation in the forward pass:
+```c
+static void ggml_compute_forward_reshape(
+        const struct ggml_compute_params * params,
+        struct ggml_tensor * dst) {
+    // NOP
+    UNUSED(params);
+    UNUSED(dst);
+}
+```
+Back in `ggml_graph_compute_thread` we have the following:
+```c
+        if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
+            state->shared->ec = GGML_STATUS_ABORTED;
+        }
+
+        ggml_barrier(state->shared);
+```
+In our case/thread `state->ith` is 3, so that block will only be executed by
+thread 0. Next we have `ggml_barrier`, which is a synchronization construct
+that ensures that all threads in the parallel region (the OpenMP block we are
+currently in) reach this point before any of them continues. So all threads
+must wait for the others at this point. For us this means that nothing else
+will happen since we enabled thread locking earlier. So let's disable that and
+continue:
+```console
+(gdb) set scheduler-locking off
+(gdb) continue
+```
+When we hit the next breakpoint we can inspect the node/tensor to see that it
+is the `GGML_OP_ROPE` operation, and then again enable thread locking.
+What we are interested in is the usage of the params like `ith`:
+```c
+    const int ith = params->ith; // thread number
+    const int nth = params->nth; // number of threads
+```
+Next we have the calculation of the number of rows, and I'd like to see how it
+is implemented:
+```c
+    const int nr = ggml_nrows(dst);
+```
+```c
+GGML_CALL int64_t ggml_nrows(const struct ggml_tensor * tensor) {
+    static_assert(GGML_MAX_DIMS == 4, "GGML_MAX_DIMS is not 4 - update this function");
+
+    return tensor->ne[1]*tensor->ne[2]*tensor->ne[3];
+}
+```
+Our tensor has the following shape:
+```console
+(gdb) p tensor->ne
+$42 = {128, 32, 6, 1}
+```
+128 is the head dimension (the size of each row), 32 is the number of heads,
+6 is the number of tokens in the sequence, and 1 is the batch size. So this
+will become 32 * 6 * 1 = 192 rows.
+
+Next we have the calculation of the number of rows per thread:
+```c
+    // rows per thread
+    const int dr = (nr + nth - 1)/nth;
+```
+This will become (192 + 4 - 1)/4 = 48. So each thread will handle 48 rows.
+We can visualize this something like this:
+```
+       0      128
+  0  [      ]  thread 0 (rows 0-47)
+      ...
+ 47  [      ]
+ 48  [      ]  thread 1 (rows 48-95)
+      ...
+ 95  [      ]
+ 96  [      ]  thread 2 (rows 96-143)
+      ...
+143  [      ]
+144  [      ]  thread 3 (rows 144-191)
+      ...
+191  [      ]
+```
+
+```c
+    // row range for this thread
+    const int ir0 = dr*ith;
+    const int ir1 = MIN(ir0 + dr, nr);
+```
+So for this thread `ir0` (the first row index) will be 48 * 3 = 144, and `ir1`
+(one past the last row index) will be MIN(144 + 48, 192) = 192.
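+
+Here is a small standalone sketch (not ggml code) of the same partitioning
+arithmetic, printing the row range each of the four threads would get:
+```c
+#include <stdio.h>
+
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+
+int main(void) {
+    const int nr  = 192; // total number of rows, as in the session above
+    const int nth = 4;   // number of threads
+
+    const int dr = (nr + nth - 1)/nth; // rows per thread, rounded up
+
+    for (int ith = 0; ith < nth; ith++) {
+        const int ir0 = dr*ith;            // first row for this thread
+        const int ir1 = MIN(ir0 + dr, nr); // one past the last row
+        printf("thread %d: rows [%d, %d)\n", ith, ir0, ir1);
+    }
+    return 0;
+}
+```
+This prints `[0, 48)`, `[48, 96)`, `[96, 144)` and `[144, 192)`, matching the
+figure above. The rounding up matters when `nr` is not divisible by `nth`: the
+last thread then simply gets fewer rows because of the `MIN`.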
+
+Next we have a local variable named `ir`:
+```c
+    // row index used to determine which thread to use
+    int ir = 0;
+```
+I'm going to skip the details of the rope implementation as this has been
+covered in [ggml-rope.md](positional-encoding/ggml-rope.md) and focus on the
+code related to multi-threading.
+
+Below there is an outer loop over the number of batches (which is just one in
+this case), then a loop over the number of tokens in the sequence, which is 6.
+And then it loops over the number of heads:
+```c
+    for (int64_t i3 = 0; i3 < ne3; i3++) {
+        for (int64_t i2 = 0; i2 < ne2; i2++) {
+        ...
+            for (int64_t i1 = 0; i1 < ne1; i1++) {
+                if (ir++ < ir0) continue;
+                if (ir   > ir1) break;
+```
+This is the part that is interesting for this section. Recall that `ir` starts
+at 0, and that for this thread `ir0` is 144 (the first row this thread is
+going to process) and `ir1` is 192. The first check skips a row as long as
+`ir` has not yet reached this thread's range, and the second check breaks out
+of the loop once `ir` has gone past `ir1`. So the body of the loop is only
+executed for rows 144 to 191, which are exactly the 48 rows that belong to
+this thread.
+And I think this is how the multi-threading is implemented in GGML.
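+
+To tie the pieces together, here is a self-contained sketch (my own example
+using the shapes from above, not ggml code) of the whole pattern: every thread
+walks the full nested loop, but only executes the body for the rows in its own
+`[ir0, ir1)` range:
+```c
+#include <omp.h>
+#include <stdint.h>
+#include <stdio.h>
+
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+
+int main(void) {
+    const int64_t ne1 = 32, ne2 = 6, ne3 = 1; // heads, tokens, batches
+    const int     nr  = (int)(ne1*ne2*ne3);   // 192 rows in total
+
+    #pragma omp parallel num_threads(4)
+    {
+        const int ith = omp_get_thread_num();
+        const int nth = omp_get_num_threads();
+
+        const int dr  = (nr + nth - 1)/nth;   // rows per thread
+        const int ir0 = dr*ith;               // this thread's first row
+        const int ir1 = MIN(ir0 + dr, nr);    // one past its last row
+
+        int ir    = 0; // row counter, advanced by every thread
+        int count = 0; // rows actually processed by this thread
+
+        for (int64_t i3 = 0; i3 < ne3; i3++) {
+            for (int64_t i2 = 0; i2 < ne2; i2++) {
+                for (int64_t i1 = 0; i1 < ne1; i1++) {
+                    if (ir++ < ir0) continue; // row belongs to an earlier thread
+                    if (ir   > ir1) break;    // past this thread's range
+                    count++;                  // "process" row (i3, i2, i1)
+                }
+            }
+        }
+        printf("thread %d processed %d rows\n", ith, count);
+    }
+    return 0;
+}
+```
+Each thread prints that it processed 48 rows, and no row is processed twice,
+even though all four threads iterate over the same loop bounds.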