mikerabat edited this page Jan 27, 2017 · 1 revision

Multithreading

Multithreading is a big part of the matrix library: it lets us combine the single-threaded, assembler-optimized routines with the use of multiple cores.

The threaded operations make use of a thread pool object that caches preallocated threads for further use.

To make use of the thread pool one has to perform 3 steps (defined in MtxThreadPool.pas):

  • Initialize the thread pool once by calling 'InitMtxThreadPool'. Note that this call must not be executed in a dll's initialization procedure (this also means the function may not be called in the initialization sections of a dll or bpl).
  • Create a task group by calling 'MtxInitTaskGroup'. Its main purpose is to group a bunch of calls together so we can synchronize and wait for all the running routines in the group.
  • Finally add tasks to the group and wait for them to finish by calling 'SyncAll'.
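
The three steps can be sketched as a small console program. This is only a sketch under assumptions: it presumes the library sources are on the unit search path, that 'IMtxAsyncCallGroup' is visible through MtxThreadPool, and that 'AddTask' accepts a function of the shape used by MatrixFuncFunc further down this page. 'TSumTask', 'SumRange' and 'partial' are hypothetical names made up for the illustration.

```pascal
program ThreadPoolSteps;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

uses MtxThreadPool;   // assumption: the library sources are on the unit search path

type
  // hypothetical data object: the range to sum and the slot for the result
  TSumTask = class(TObject)
  public
    idx, first, last : integer;
  end;

// results are written here because the group frees the data objects
var partial : array[0..1] of integer;

// task function; same shape as MatrixFuncFunc further down this page
function SumRange(obj : TObject) : integer;
var t : TSumTask;
    i : integer;
begin
     t := TSumTask(obj);
     partial[t.idx] := 0;
     for i := t.first to t.last do
         inc(partial[t.idx], i);
     Result := 0;
end;

var calls : IMtxAsyncCallGroup;
    task : TSumTask;
    i : integer;
begin
     InitMtxThreadPool;                 // step 1: once per process
     calls := MtxInitTaskGroup;         // step 2: one group per batch of tasks

     for i := 0 to 1 do                 // step 3: add the tasks ...
     begin
          task := TSumTask.Create;
          task.idx := i;
          task.first := i*50 + 1;
          task.last := (i + 1)*50;
          calls.AddTask(@SumRange, task);   // the group now owns 'task'
     end;
     calls.SyncAll;                     // ... and wait for all of them

     WriteLn(partial[0] + partial[1]);

     calls := nil;                      // precaution: drop the group before the pool
     FinalizeThreadPool;
end.
```

Each task writes to its own slot of 'partial', so no locking is needed, and the results are only read after 'SyncAll' has returned.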

Thread pool object

There are 3 thread pool implementations in the library: one for MacOS (MacOsThreadPool.pas) and two for Windows. By a simple define in MtxThreadPool.pas one can choose between a home-brewed thread pool in WinThreadPool.pas and the Windows internal thread pool implementation found in SimpleWinThreadPool.pas. The home-brewed implementation is a bit faster and constrains a task to a core by twiddling 'SetProcessAffinityMask' for each thread. The number of created threads equals the number of cores in the system, although at most 64 cores are used. If that is not enough for your purpose simply change the constant value 'cMaxNumCores' in MtxThreadPool.pas. The reason for the constant is that it defines the size of a stack array in the threaded routines, which spares quite a few memory allocations that dynamic arrays would require.
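
To illustrate the trade-off behind 'cMaxNumCores', here is a minimal stand-alone sketch that does not use the library. It contrasts a fixed-size array on the stack with a dynamic array; the functions 'SumFixed' and 'SumDynamic' are hypothetical and only the constant name and its value of 64 are taken from the text above.

```pascal
program StackArraySketch;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

const cMaxNumCores = 64;   // name and value as described above

// Variant 1: fixed-size array, lives on the stack, no heap allocation.
function SumFixed(numCores : integer) : integer;
var slots : array[0..cMaxNumCores - 1] of integer;
    i : integer;
begin
     // the array size bounds the number of usable cores
     if numCores > cMaxNumCores then
        numCores := cMaxNumCores;

     for i := 0 to numCores - 1 do
         slots[i] := i;

     Result := 0;
     for i := 0 to numCores - 1 do
         inc(Result, slots[i]);
end;

// Variant 2: dynamic array, no upper bound but one heap allocation per call.
function SumDynamic(numCores : integer) : integer;
var slots : array of integer;
    i : integer;
begin
     SetLength(slots, numCores);
     for i := 0 to numCores - 1 do
         slots[i] := i;

     Result := 0;
     for i := 0 to numCores - 1 do
         inc(Result, slots[i]);
end;

begin
     WriteLn(SumFixed(8), ' ', SumDynamic(8));
end.
```

The fixed-size variant has to clamp the core count to the constant, which is the price paid for avoiding the allocation in every threaded call.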

Call the function 'InitMtxThreadPool' from MtxThreadPool.pas to initialize the thread pool. If you use objects of the class 'TThreadedMatrix' this is done for you in the class constructor. In addition, call 'FinalizeThreadPool' to release the memory that has been allocated in the init call.

Splitting tasks

The threading model in this library is to create a group object and add tasks to this object. Afterwards you can either wait for single tasks or wait for all of them together, which is the most common usage. Each task references a function that has the signature of 'TMtxProc' and a data object that defines the parameters of the call. Note that when adding a task, the task then owns the data object and frees it!

You may find the function 'MtxInitTaskGroup' in MtxThreadPool useful; it creates such a group of tasks.

A typical example is applying a function to each element of the matrix in parallel:


// this function is executed in a thread and simply calls the
// MatrixFunc procedure that applies the function to each element in a
// defined subset of the matrix. The subset is defined by the object properties.
function MatrixFuncFunc(obj : TObject) : integer;
begin
     MatrixFunc(TAsyncMatrixFuncObj(obj).dest,
                TAsyncMatrixFuncObj(obj).destLineWidth,
                TAsyncMatrixFuncObj(obj).width,
                TAsyncMatrixFuncObj(obj).height,
                TAsyncMatrixFuncObj(obj).func);
     Result := 0;
end;



procedure ThrMatrixFunc(dest : PDouble; const destLineWidth : TASMNativeInt; 
  width, height : TASMNativeInt; func : TMatrixFunc);
var i: TASMNativeInt;
    obj : TAsyncMatrixFuncObj;
    calls : IMtxAsyncCallGroup;
    sizeFits : boolean;
    thrSize : TASMNativeInt;
    
    objs : Array[0..cMaxNumCores - 1] of TAsyncMatrixFuncObj;
    numUsed : integer;
begin
     // check if it is really necessary to thread the call:
     if (width < numCPUCores) and (height < numCPUCores) then
     begin
          MatrixFunc(dest, destLineWidth, width, height, func);
          exit;
     end;

     numUsed := 0;

     // ###################################################
     // #### Initialize data objects
     if width > height then
     begin
          sizeFits := (width mod numCPUCores) = 0;
          thrSize := width div numCPUCores + TASMNativeInt(not sizeFits);

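          for i := 0 to numCPUCores - 1 do
          begin
               // stop once the previous chunks already cover all columns;
               // otherwise the rounded-up thrSize could yield an empty or negative width
               if i*thrSize >= width then
                  break;

               obj := TAsyncMatrixFuncObj.Create;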
               obj.dest := dest;
               obj.destLineWidth := destLineWidth;
               obj.width := thrSize;
               obj.height := height;
               obj.func := func;

               inc(obj.dest, i*thrSize);
               if width < (i + 1)*thrSize then
                  obj.Width := width - i*thrSize;
                   
               objs[numUsed] := obj;
               inc(numUsed);
          end;
     end
     else
     begin
          sizeFits := (height mod numCPUCores) = 0;
          thrSize := height div numCPUCores + TASMNativeInt(not sizeFits);

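          for i := 0 to numCPUCores - 1 do
          begin
               // stop once the previous chunks already cover all rows;
               // otherwise the rounded-up thrSize could yield an empty or negative height
               if i*thrSize >= height then
                  break;

               obj := TAsyncMatrixFuncObj.Create;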
               obj.dest := dest;
               obj.destLineWidth := destLineWidth;
               obj.width := width;
               obj.height := thrSize;
               obj.func := func;

               inc(PByte(obj.dest), i*thrSize*destLineWidth);
               if height < (i + 1)*thrSize then
                  obj.Height := height - i*thrSize;

               objs[numUsed] := obj;
               inc(numUsed);
          end;
     end;

     // ###########################################
     // #### Run threads
     calls := MtxInitTaskGroup;

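     // hand all chunks but the last one to the pool; the group owns and frees them
     for i := 0 to numUsed - 2 do
         calls.AddTask(@MatrixFuncFunc, objs[i]);

     // run the last chunk in the calling thread; it was never added to the
     // group, so it has to be freed here
     MatrixFuncFunc(objs[numUsed  - 1]);
     objs[numUsed - 1].Free;

     // wait until all pooled tasks have finished
     calls.SyncAll;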
end;


// ################################################
// ### Typical call 
// -> the example simply calculates value = value*value

// this function is applied to each element in the matrix
procedure ElemWiseSqr(var elem: double);
begin
     elem := SQR(elem);
end;

procedure TestThreadFunc(mt : PDouble; lineWidthMT : TASMNativeInt; width, height : integer);
begin
     // apply elementwise sqr on each element in mt in parallel:
     ThrMatrixFunc(mt, lineWidthMT, width, height, {$IFDEF FPC}@{$ENDIF}ElemWiseSqr); 
end;

Check out the unit ThreadedMatrixOperations.pas to see how it's done in the library.
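
The chunk arithmetic used in ThrMatrixFunc can be checked in isolation. The following stand-alone sketch does not use the library; 'ChunkSize' and 'ChunkLen' are hypothetical helpers that reproduce the 'thrSize' computation and the shortening of the last chunk. When the dimension is only slightly larger than the number of cores, the rounded-up chunk size can leave nothing for the last chunks, so the sketch clamps their length to 0.

```pascal
program ChunkSplitSketch;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

// Chunk size handed to each task: total / numCores, rounded up.
function ChunkSize(total, numCores : integer) : integer;
begin
     Result := total div numCores;
     if (total mod numCores) <> 0 then
        inc(Result);
end;

// Length of chunk i. The last chunk is shortened so that all chunks
// add up to total; chunks that start beyond total get length 0.
function ChunkLen(total, numCores, i : integer) : integer;
var thrSize : integer;
begin
     thrSize := ChunkSize(total, numCores);
     Result := thrSize;
     if total < (i + 1)*thrSize then
        Result := total - i*thrSize;
     if Result < 0 then
        Result := 0;
end;

var i : integer;
begin
     // 10 columns on 4 cores: chunk size 3, chunk lengths 3, 3, 3 and 1
     for i := 0 to 3 do
         WriteLn('chunk ', i, ': offset ', i*ChunkSize(10, 4),
                 ', length ', ChunkLen(10, 4, i));
end.
```

For 5 columns on 4 cores the chunk size is 2, so the lengths are 2, 2, 1 and 0: the fourth task would have nothing to do.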

Creating your own thread pool

Now this is where it's getting a bit tricky: you need to change the package and a source file a bit and recompile.

To create your own thread pool add a unit to the package that implements 3 functions:

  • An Init function to initialize your thread pool.
  • A Finalize function to release memory from the pool.
  • A function that returns a "task group", i.e. an object that implements the interfaces 'IMtxAsyncCall' and 'IMtxAsyncCallGroup'.

Add the functions to the MtxThreadPool.pas unit and make sure that no other thread pool is instantiated there. Also override the 3 functions there that are used to create task groups throughout the library.
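
The shape of such a unit can be sketched with a deliberately trivial "pool" that runs every task in the calling thread. This is a stand-alone illustration and not the library's API: 'ITaskGroupSketch' is a hypothetical stand-in that models only the 'AddTask' and 'SyncAll' calls used on this page, and all other names are made up as well. A real implementation has to implement 'IMtxAsyncCall' and 'IMtxAsyncCallGroup' as they are declared in the library.

```pascal
program SerialPoolSketch;
{$IFDEF FPC}{$mode delphi}{$ENDIF}
{$IFNDEF FPC}{$APPTYPE CONSOLE}{$ENDIF}

type
  // same shape as the task functions shown earlier on this page
  TTaskProc = function(obj : TObject) : integer;

  // hypothetical stand-in for the library's group interface
  ITaskGroupSketch = interface
    procedure AddTask(proc : TTaskProc; obj : TObject);
    procedure SyncAll;
  end;

  // runs every task immediately in the calling thread
  TSerialTaskGroup = class(TInterfacedObject, ITaskGroupSketch)
  public
    procedure AddTask(proc : TTaskProc; obj : TObject);
    procedure SyncAll;
  end;

  // hypothetical task data
  TAddData = class(TObject)
  public
    amount : integer;
  end;

var gSum : integer = 0;

procedure TSerialTaskGroup.AddTask(proc : TTaskProc; obj : TObject);
begin
     try
        proc(obj);
     finally
        obj.Free;   // the task owns its data object
     end;
end;

procedure TSerialTaskGroup.SyncAll;
begin
     // nothing to wait for: all tasks already ran in AddTask
end;

// function 1: initialize the pool (nothing to allocate in this sketch)
procedure InitSketchPool;
begin
end;

// function 2: release the pool
procedure FinalizeSketchPool;
begin
end;

// function 3: hand out a task group
function InitSketchTaskGroup : ITaskGroupSketch;
begin
     Result := TSerialTaskGroup.Create;
end;

function AddAmount(obj : TObject) : integer;
begin
     inc(gSum, TAddData(obj).amount);
     Result := 0;
end;

var group : ITaskGroupSketch;
    data : TAddData;
    i : integer;
begin
     InitSketchPool;
     group := InitSketchTaskGroup;
     for i := 1 to 3 do
     begin
          data := TAddData.Create;
          data.amount := i;
          group.AddTask(AddAmount, data);
     end;
     group.SyncAll;
     group := nil;
     FinalizeSketchPool;
     WriteLn(gSum);   // 1 + 2 + 3
end.
```

A serial group like this can also serve as a fallback for debugging, since it keeps the calling code unchanged while removing all concurrency.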

It's best to check out the unit SimpleWinThreadPool.pas to see how a simple OS based thread pool works with the library.

I know that it is cumbersome to implement a new thread pool, and I need to revisit this section to provide a mechanism like the one for the memory manager...
