Skip to content

Commit 7b13256

Browse files
authored
Initial implementation of new task system (#7269)
1 parent c0d13a0 commit 7b13256

33 files changed

+3363
-0
lines changed

CMakeLists.txt

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -280,6 +280,14 @@ add_ccf_static_library(
280280
LINK_LIBS qcbor t_cose http_parser ccfcrypto ccf_kv
281281
)
282282

283+
# CCF task system library
284+
add_ccf_static_library(
285+
ccf_tasks
286+
SRCS ${CCF_DIR}/src/tasks/task_system.cpp ${CCF_DIR}/src/tasks/job_board.cpp
287+
${CCF_DIR}/src/tasks/ordered_tasks.cpp
288+
${CCF_DIR}/src/tasks/fan_in_tasks.cpp
289+
)
290+
283291
# Common test args for Python scripts starting up CCF networks
284292
set(WORKER_THREADS
285293
0
@@ -529,6 +537,20 @@ if(BUILD_TESTS)
529537
)
530538
target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT})
531539

540+
add_unit_test(
541+
task_system_test
542+
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/basic_tasks.cpp
543+
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/ordered_tasks.cpp
544+
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/delayed_tasks.cpp
545+
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/fan_in_tasks.cpp
546+
)
547+
target_link_libraries(task_system_test PRIVATE ccf_tasks)
548+
549+
add_unit_test(
550+
task_system_demo ${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/demo/main.cpp
551+
)
552+
target_link_libraries(task_system_demo PRIVATE ccf_tasks)
553+
532554
add_unit_test(
533555
ledger_test ${CMAKE_CURRENT_SOURCE_DIR}/src/host/test/ledger.cpp
534556
)
@@ -775,6 +797,14 @@ if(BUILD_TESTS)
775797
)
776798
add_picobench(merkle_bench SRCS src/node/test/merkle_bench.cpp)
777799
add_picobench(hash_bench SRCS src/ds/test/hash_bench.cpp)
800+
801+
add_picobench(
802+
task_bench
803+
SRCS ${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/bench/merge_bench.cpp
804+
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/bench/sleep_bench.cpp
805+
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/bench/contention_bench.cpp
806+
LINK_LIBS ccf_tasks
807+
)
778808
endif()
779809

780810
if(LONG_TESTS)

src/tasks/basic_task.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the Apache 2.0 License.
3+
#pragma once
4+
5+
#include "tasks/task.h"
6+
7+
namespace ccf::tasks
8+
{
9+
struct BasicTask : public BaseTask
10+
{
11+
using Fn = std::function<void()>;
12+
13+
Fn fn;
14+
const std::string name;
15+
16+
BasicTask(const Fn& _fn, const std::string& s = "[Anon]") : fn(_fn), name(s)
17+
{}
18+
19+
void do_task_implementation() override
20+
{
21+
fn();
22+
}
23+
24+
std::string_view get_name() const override
25+
{
26+
return name;
27+
}
28+
};
29+
30+
template <typename... Ts>
31+
Task make_basic_task(Ts&&... ts)
32+
{
33+
return std::make_shared<BasicTask>(std::forward<Ts>(ts)...);
34+
}
35+
}

src/tasks/fan_in_tasks.cpp

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the Apache 2.0 License.
3+
4+
#include "tasks/fan_in_tasks.h"
5+
6+
#include <map>
7+
#include <mutex>
8+
#include <stdexcept>
9+
10+
#define FMT_HEADER_ONLY
11+
#include <fmt/format.h>
12+
13+
namespace ccf::tasks
14+
{
15+
struct FanInTasks::PImpl
16+
{
17+
std::string name;
18+
IJobBoard& job_board;
19+
20+
// Synchronise access to pending_tasks and next_expected_task_index
21+
std::mutex pending_tasks_mutex;
22+
std::map<size_t, Task> pending_tasks;
23+
size_t next_expected_task_index = 0;
24+
25+
std::atomic<bool> active = false;
26+
};
27+
28+
void FanInTasks::enqueue_on_board()
29+
{
30+
pimpl->job_board.add_task(shared_from_this());
31+
}
32+
33+
void FanInTasks::do_task_implementation()
34+
{
35+
std::vector<Task> current_batch;
36+
37+
{
38+
std::lock_guard<std::mutex> lock(pimpl->pending_tasks_mutex);
39+
pimpl->active.store(true);
40+
41+
auto it = pimpl->pending_tasks.find(pimpl->next_expected_task_index);
42+
while (it != pimpl->pending_tasks.end())
43+
{
44+
current_batch.push_back(it->second);
45+
pimpl->pending_tasks.erase(it);
46+
47+
++pimpl->next_expected_task_index;
48+
it = pimpl->pending_tasks.find(pimpl->next_expected_task_index);
49+
}
50+
}
51+
52+
for (auto& task : current_batch)
53+
{
54+
task->do_task();
55+
}
56+
57+
{
58+
std::lock_guard<std::mutex> lock(pimpl->pending_tasks_mutex);
59+
pimpl->active.store(false);
60+
61+
auto it = pimpl->pending_tasks.find(pimpl->next_expected_task_index);
62+
if (it != pimpl->pending_tasks.end())
63+
{
64+
// While we were executing the previous batch, a call to fan_in_task
65+
// provided the _next_ contiguous task. We're now responsible for
66+
// re-enqueuing this task
67+
enqueue_on_board();
68+
}
69+
}
70+
}
71+
72+
FanInTasks::FanInTasks(
73+
[[maybe_unused]] FanInTasks::Private force_private_constructor,
74+
IJobBoard& job_board_,
75+
const std::string& name_) :
76+
pimpl(std::make_unique<FanInTasks::PImpl>(name_, job_board_))
77+
{}
78+
79+
FanInTasks::~FanInTasks() = default;
80+
81+
std::string_view FanInTasks::get_name() const
82+
{
83+
return pimpl->name;
84+
}
85+
86+
void FanInTasks::add_task(size_t task_index, Task task)
87+
{
88+
{
89+
std::lock_guard<std::mutex> lock(pimpl->pending_tasks_mutex);
90+
91+
if (task_index < pimpl->next_expected_task_index)
92+
{
93+
throw std::runtime_error(fmt::format(
94+
"[{}] Received task {} ({}) out-of-order - already advanced next "
95+
"expected "
96+
"to {}",
97+
get_name(),
98+
task_index,
99+
task->get_name(),
100+
pimpl->next_expected_task_index));
101+
}
102+
103+
auto it = pimpl->pending_tasks.find(task_index);
104+
if (it != pimpl->pending_tasks.end())
105+
{
106+
throw std::runtime_error(fmt::format(
107+
"[{}] Received duplicate task {} ({}) - already have pending task {}",
108+
get_name(),
109+
task_index,
110+
task->get_name(),
111+
it->second == nullptr ? std::string("nullptr") :
112+
it->second->get_name()));
113+
}
114+
115+
pimpl->pending_tasks.emplace(task_index, task);
116+
117+
if (!pimpl->active.load())
118+
{
119+
if (task_index == pimpl->next_expected_task_index)
120+
{
121+
enqueue_on_board();
122+
}
123+
}
124+
}
125+
}
126+
127+
std::shared_ptr<FanInTasks> FanInTasks::create(
128+
IJobBoard& job_board_, const std::string& name_)
129+
{
130+
return std::make_shared<FanInTasks>(Private{}, job_board_, name_);
131+
}
132+
}

src/tasks/fan_in_tasks.h

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the Apache 2.0 License.
3+
#pragma once
4+
5+
#include "tasks/job_board_interface.h"
6+
#include "tasks/task.h"
7+
8+
#include <memory>
9+
10+
namespace ccf::tasks
11+
{
12+
class FanInTasks : public BaseTask,
13+
public std::enable_shared_from_this<FanInTasks>
14+
{
15+
protected:
16+
struct PImpl;
17+
std::unique_ptr<PImpl> pimpl = nullptr;
18+
19+
void enqueue_on_board();
20+
void do_task_implementation() override;
21+
22+
// Non-public constructor argument type, so this can only be constructed by
23+
// this class (ensuring shared ptr ownership)
24+
struct Private
25+
{
26+
explicit Private() = default;
27+
};
28+
29+
public:
30+
FanInTasks(Private, IJobBoard& job_board_, const std::string& name_);
31+
~FanInTasks();
32+
33+
static std::shared_ptr<FanInTasks> create(
34+
IJobBoard& job_board_, const std::string& name_ = "[FanIn]");
35+
36+
std::string_view get_name() const override;
37+
38+
void add_task(size_t task_index, Task task);
39+
};
40+
}

src/tasks/job_board.cpp

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the Apache 2.0 License.
3+
#include "tasks/job_board.h"
4+
5+
namespace ccf::tasks
6+
{
7+
void JobBoard::add_task(Task&& task)
8+
{
9+
{
10+
std::lock_guard<std::mutex> lock(mutex);
11+
queue.emplace(std::move(task));
12+
}
13+
work_beacon.notify_work_available();
14+
}
15+
16+
Task JobBoard::get_task()
17+
{
18+
std::lock_guard<std::mutex> lock(mutex);
19+
if (queue.empty())
20+
{
21+
return nullptr;
22+
}
23+
24+
Task task = queue.front();
25+
queue.pop();
26+
return task;
27+
}
28+
29+
bool JobBoard::empty()
30+
{
31+
std::lock_guard<std::mutex> lock(mutex);
32+
return queue.empty();
33+
}
34+
35+
Task JobBoard::wait_for_task(const std::chrono::milliseconds& timeout)
36+
{
37+
using TClock = std::chrono::system_clock;
38+
39+
const auto start = TClock::now();
40+
const auto until = start + timeout;
41+
42+
while (true)
43+
{
44+
auto task = get_task();
45+
if (task != nullptr || TClock::now() >= until)
46+
{
47+
return task;
48+
}
49+
50+
work_beacon.wait_for_work_with_timeout(timeout);
51+
}
52+
}
53+
}

src/tasks/job_board.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the Apache 2.0 License.
3+
#pragma once
4+
5+
#include "ds/work_beacon.h"
6+
#include "tasks/job_board_interface.h"
7+
8+
#include <mutex>
9+
#include <queue>
10+
11+
namespace ccf::tasks
12+
{
13+
struct JobBoard : public IJobBoard
14+
{
15+
std::mutex mutex;
16+
std::queue<Task> queue;
17+
ccf::ds::WorkBeacon work_beacon;
18+
19+
void add_task(Task&& t) override;
20+
Task get_task() override;
21+
bool empty() override;
22+
23+
Task wait_for_task(const std::chrono::milliseconds& timeout) override;
24+
};
25+
}

src/tasks/job_board_interface.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the Apache 2.0 License.
3+
#pragma once
4+
5+
#include "tasks/task.h"
6+
7+
#include <mutex>
8+
#include <queue>
9+
#include <thread>
10+
11+
namespace ccf::tasks
12+
{
13+
struct IJobBoard
14+
{
15+
virtual void add_task(Task&& t) = 0;
16+
virtual Task get_task() = 0;
17+
virtual bool empty() = 0;
18+
19+
virtual Task wait_for_task(const std::chrono::milliseconds& timeout) = 0;
20+
};
21+
}

0 commit comments

Comments
 (0)