Skip to content
Merged
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
e1bf9a2
Initial commit of new task system + tests
eddyashton Sep 5, 2025
a80f015
Add some more basic unit tests
eddyashton Sep 5, 2025
cc68420
Merge branch 'main' of github.com:microsoft/CCF into task_system_impl
eddyashton Sep 8, 2025
158b0c7
Add benchmark of contended enqueue/dequeue
eddyashton Sep 8, 2025
ddbdbab
Merge branch 'main' of github.com:microsoft/CCF into task_system_impl
eddyashton Sep 8, 2025
8dc97d0
Oh. This sucks
eddyashton Sep 8, 2025
0d12ea6
More
eddyashton Sep 8, 2025
b0f386b
This does not mean what you think it means
eddyashton Sep 8, 2025
50148e1
Unit test fix
eddyashton Sep 8, 2025
c20b2eb
Remove unhelpful unit test
eddyashton Sep 9, 2025
c77fe58
Merge branch 'main' of github.com:microsoft/CCF into task_system_impl
eddyashton Sep 9, 2025
2ab05d3
Uhh
eddyashton Sep 9, 2025
3f80bdb
Add a FanInTasks container, to maintain keyed-order-execution of some…
eddyashton Sep 9, 2025
7658671
Merge branch 'main' of github.com:microsoft/CCF into task_system_impl
eddyashton Sep 9, 2025
d1802e8
Final test
eddyashton Sep 9, 2025
60a7916
Tidy
eddyashton Sep 10, 2025
115a1fd
Comments to explain test
eddyashton Sep 10, 2025
bf56944
TO REVERT: Verbose logging, force repro loop
eddyashton Sep 10, 2025
361bc6f
TO REVERT: Less aggressive yml changes, still verbose
eddyashton Sep 10, 2025
1d4b209
Revert previous debugging commits
eddyashton Sep 10, 2025
5e32d9f
At least nicer error message when this fails?
eddyashton Sep 10, 2025
365b248
Use deterministic time to simulate worker thread in unit test
eddyashton Sep 11, 2025
3e0fc19
Neater still
eddyashton Sep 11, 2025
a5a07c2
Verbose logging, just in case
eddyashton Sep 11, 2025
ae6ef24
Something subtle is broken with spans - fine, use owning vector
eddyashton Sep 18, 2025
1122163
Merge branch 'main' of github.com:microsoft/CCF into task_system_impl
eddyashton Sep 18, 2025
fb9fa4f
Pffft
eddyashton Sep 18, 2025
b682ba7
Return string_view from get_name
eddyashton Sep 22, 2025
6107b1a
Longer names in core task system
eddyashton Sep 22, 2025
ebd9581
Cleaner pattern to enforce shared ptr ownership
eddyashton Sep 22, 2025
62f16d3
Renames and mutex comments
eddyashton Sep 22, 2025
3c5424a
Merge branch 'main' of github.com:microsoft/CCF into task_system_impl
eddyashton Sep 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,14 @@ add_ccf_static_library(
LINK_LIBS qcbor t_cose http_parser ccfcrypto ccf_kv
)

# CCF task system library
add_ccf_static_library(
ccf_tasks
SRCS ${CCF_DIR}/src/tasks/task_system.cpp ${CCF_DIR}/src/tasks/job_board.cpp
${CCF_DIR}/src/tasks/ordered_tasks.cpp
${CCF_DIR}/src/tasks/fan_in_tasks.cpp
)

# Common test args for Python scripts starting up CCF networks
set(WORKER_THREADS
0
Expand Down Expand Up @@ -529,6 +537,20 @@ if(BUILD_TESTS)
)
target_link_libraries(ds_test PRIVATE ${CMAKE_THREAD_LIBS_INIT})

add_unit_test(
task_system_test
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/basic_tasks.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/ordered_tasks.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/delayed_tasks.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/fan_in_tasks.cpp
)
target_link_libraries(task_system_test PRIVATE ccf_tasks)

add_unit_test(
task_system_demo ${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/demo/main.cpp
)
target_link_libraries(task_system_demo PRIVATE ccf_tasks)

add_unit_test(
ledger_test ${CMAKE_CURRENT_SOURCE_DIR}/src/host/test/ledger.cpp
)
Expand Down Expand Up @@ -775,6 +797,14 @@ if(BUILD_TESTS)
)
add_picobench(merkle_bench SRCS src/node/test/merkle_bench.cpp)
add_picobench(hash_bench SRCS src/ds/test/hash_bench.cpp)

add_picobench(
task_bench
SRCS ${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/bench/merge_bench.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/bench/sleep_bench.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/tasks/test/bench/contention_bench.cpp
LINK_LIBS ccf_tasks
)
endif()

if(LONG_TESTS)
Expand Down
35 changes: 35 additions & 0 deletions src/tasks/basic_task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "tasks/task.h"

namespace ccf::tasks
{
struct BasicTask : public BaseTask
{
using Fn = std::function<void()>;

Fn fn;
const std::string name;

BasicTask(const Fn& _fn, const std::string& s = "[Anon]") : fn(_fn), name(s)
{}

void do_task_implementation() override
{
fn();
}

std::string get_name() const override
{
return name;
}
};

template <typename... Ts>
Task make_basic_task(Ts&&... ts)
{
return std::make_shared<BasicTask>(std::forward<Ts>(ts)...);
}
}
138 changes: 138 additions & 0 deletions src/tasks/fan_in_tasks.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.

#include "tasks/fan_in_tasks.h"

#include <map>
#include <mutex>
#include <stdexcept>

#define FMT_HEADER_ONLY
#include <fmt/format.h>

namespace ccf::tasks
{
struct FanInTasks::PImpl
{
std::string name;
IJobBoard& job_board;

std::mutex mutex;
std::map<size_t, Task> pending_tasks;
size_t next_expected_task_index = 0;
std::atomic<bool> active = false;
};

void FanInTasks::enqueue_on_board()
{
pimpl->job_board.add_task(shared_from_this());
}

void FanInTasks::do_task_implementation()
{
std::vector<Task> current_batch;

{
std::lock_guard<std::mutex> lock(pimpl->mutex);
pimpl->active.store(true);

auto it = pimpl->pending_tasks.find(pimpl->next_expected_task_index);
while (it != pimpl->pending_tasks.end())
{
current_batch.push_back(it->second);
pimpl->pending_tasks.erase(it);

++pimpl->next_expected_task_index;
it = pimpl->pending_tasks.find(pimpl->next_expected_task_index);
}
}

for (auto& task : current_batch)
{
task->do_task();
}

{
std::lock_guard<std::mutex> lock(pimpl->mutex);
pimpl->active.store(false);

auto it = pimpl->pending_tasks.find(pimpl->next_expected_task_index);
if (it != pimpl->pending_tasks.end())
{
// While we were executing the previous batch, a call to fan_in_task
// provided the _next_ contiguous task. We're now responsible for
// re-enqueuing this task
enqueue_on_board();
}
}
}

FanInTasks::FanInTasks(IJobBoard& jb, const std::string& s) :
pimpl(std::make_unique<FanInTasks::PImpl>(s, jb))
{}

FanInTasks::~FanInTasks() = default;

std::string FanInTasks::get_name() const
{
return pimpl->name;
}

void FanInTasks::add_task(size_t task_index, Task task)
{
{
std::lock_guard<std::mutex> lock(pimpl->mutex);

if (task_index < pimpl->next_expected_task_index)
{
throw std::runtime_error(fmt::format(
"[{}] Received task {} ({}) out-of-order - already advanced next "
"expected "
"to {}",
get_name(),
task_index,
task->get_name(),
pimpl->next_expected_task_index));
}

auto it = pimpl->pending_tasks.find(task_index);
if (it != pimpl->pending_tasks.end())
{
throw std::runtime_error(fmt::format(
"[{}] Received duplicate task {} ({}) - already have pending task {}",
get_name(),
task_index,
task->get_name(),
it->second == nullptr ? std::string("nullptr") :
it->second->get_name()));
}

pimpl->pending_tasks.emplace(task_index, task);

if (!pimpl->active.load())
{
if (task_index == pimpl->next_expected_task_index)
{
enqueue_on_board();
}
}
}
}

namespace
{
struct ConcreteFanInTasks : public FanInTasks
{
public:
ConcreteFanInTasks(IJobBoard& jb, const std::string& s) :
FanInTasks(jb, s)
{}
};
}

std::shared_ptr<FanInTasks> make_fan_in_tasks(
IJobBoard& jb, const std::string& s)
{
return std::make_shared<ConcreteFanInTasks>(jb, s);
}
}
37 changes: 37 additions & 0 deletions src/tasks/fan_in_tasks.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "tasks/job_board_interface.h"
#include "tasks/task.h"

#include <memory>

namespace ccf::tasks
{
class FanInTasks : public BaseTask,
public std::enable_shared_from_this<FanInTasks>
{
protected:
struct PImpl;
std::unique_ptr<PImpl> pimpl = nullptr;

void enqueue_on_board();
void do_task_implementation() override;

// Constructor is protected, to ensure this is only created via the
// make_fan_in_tasks factory function (ensuring this is always owned by
// a shared_ptr)
FanInTasks(IJobBoard& jb, const std::string& s);

public:
~FanInTasks();

std::string get_name() const override;

void add_task(size_t task_index, Task task);
};

std::shared_ptr<FanInTasks> make_fan_in_tasks(
IJobBoard& jb, const std::string& s = "[FanIn]");
}
53 changes: 53 additions & 0 deletions src/tasks/job_board.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#include "tasks/job_board.h"

namespace ccf::tasks
{
void JobBoard::add_task(Task&& t)
{
{
std::lock_guard<std::mutex> lock(mutex);
queue.emplace(std::move(t));
}
work_beacon.notify_work_available();
}

Task JobBoard::get_task()
{
std::lock_guard<std::mutex> lock(mutex);
if (queue.empty())
{
return nullptr;
}

Task t = queue.front();
queue.pop();
return t;
}

bool JobBoard::empty()
{
std::lock_guard<std::mutex> lock(mutex);
return queue.empty();
}

Task JobBoard::wait_for_task(const std::chrono::milliseconds& timeout)
{
using TClock = std::chrono::system_clock;

const auto start = TClock::now();
const auto until = start + timeout;

while (true)
{
auto task = get_task();
if (task != nullptr || TClock::now() >= until)
{
return task;
}

work_beacon.wait_for_work_with_timeout(timeout);
}
}
}
25 changes: 25 additions & 0 deletions src/tasks/job_board.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "ds/work_beacon.h"
#include "tasks/job_board_interface.h"

#include <mutex>
#include <queue>

namespace ccf::tasks
{
struct JobBoard : public IJobBoard
{
std::mutex mutex;
std::queue<Task> queue;
ccf::ds::WorkBeacon work_beacon;

void add_task(Task&& t) override;
Task get_task() override;
bool empty() override;

Task wait_for_task(const std::chrono::milliseconds& timeout) override;
};
}
21 changes: 21 additions & 0 deletions src/tasks/job_board_interface.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#pragma once

#include "tasks/task.h"

#include <mutex>
#include <queue>
#include <thread>

namespace ccf::tasks
{
struct IJobBoard
{
virtual void add_task(Task&& t) = 0;
virtual Task get_task() = 0;
virtual bool empty() = 0;

virtual Task wait_for_task(const std::chrono::milliseconds& timeout) = 0;
};
}
Loading