Skip to content

Commit cdfdf6e

Browse files
committed
Thread/ScheduledEventDispatcher
1 parent 3797c96 commit cdfdf6e

File tree

10 files changed

+104
-7
lines changed

10 files changed

+104
-7
lines changed
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
export module CppUtils.Thread.ScheduledEventDispatcher;
2+
3+
import std;
4+
import CppUtils.Chrono.Concept;
5+
import CppUtils.Execution.EventDispatcher;
6+
import CppUtils.String.Hash;
7+
import CppUtils.Thread.Scheduler;
8+
9+
export namespace CppUtils::Thread
10+
{
11+
class ScheduledEventDispatcher final
12+
{
13+
public:
14+
using Clock = std::chrono::steady_clock;
15+
using TimePoint = Clock::time_point;
16+
17+
explicit ScheduledEventDispatcher(
18+
Scheduler::Clock::duration step = std::chrono::milliseconds(10),
19+
std::size_t numberThreads = std::thread::hardware_concurrency(),
20+
std::function<void(std::exception_ptr)> onError = nullptr,
21+
std::function<void()> finally = nullptr):
22+
m_scheduler{step, numberThreads, std::move(onError), std::move(finally)}
23+
{}
24+
25+
template<String::Hasher eventName = String::Hash{}>
26+
inline auto subscribe(auto&& function) -> void
27+
{
28+
m_eventDispatcher.subscribe<eventName>(std::forward<decltype(function)>(function));
29+
}
30+
31+
template<String::Hasher eventName = String::Hash{}, class Event = std::nullptr_t, Chrono::Duration Delay = std::chrono::milliseconds>
32+
inline auto emit(Event&& event = nullptr, Delay delay = std::chrono::milliseconds{0}) -> void
33+
{
34+
m_scheduler.schedule([this, event = std::forward<Event>(event)] {
35+
m_eventDispatcher.emit<eventName>(event);
36+
}, delay);
37+
}
38+
39+
template<String::Hasher eventName = String::Hash{}, class Event>
40+
inline auto emit(Event&& event, TimePoint when) -> void
41+
{
42+
m_scheduler.schedule([this, event = std::forward<Event>(event)] {
43+
m_eventDispatcher.emit<eventName>(event);
44+
}, when);
45+
}
46+
47+
inline auto waitUntilFinished() -> void
48+
{
49+
m_scheduler.waitUntilFinished();
50+
}
51+
52+
private:
53+
Scheduler m_scheduler;
54+
Execution::EventDispatcher m_eventDispatcher;
55+
};
56+
}

modules/Thread/Thread.mpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ export import CppUtils.Thread.ThreadLoop;
77
export import CppUtils.Thread.ThreadPool;
88
export import CppUtils.Thread.TryAsync;
99
export import CppUtils.Thread.UniqueLocker;
10+
export import CppUtils.Thread.ScheduledEventDispatcher;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
export module CppUtils.UnitTests.Thread.ScheduledEventDispatcher;
2+
3+
import std;
4+
import CppUtils;
5+
6+
export namespace CppUtils::UnitTest::Thread::ScheduledEventDispatcher
7+
{
8+
inline auto _ = TestSuite{"Thread/ScheduledEventDispatcher", {"Execution/EventDispatcher", "Thread/Scheduler"}, [](auto& suite) {
9+
using namespace std::chrono_literals;
10+
11+
suite.addTest("Immediate Emit", [&] {
12+
auto dispatcher = CppUtils::Thread::ScheduledEventDispatcher{};
13+
auto receivedValue = std::atomic_bool{};
14+
15+
dispatcher.subscribe<"Event">([&](bool value) {
16+
receivedValue = value;
17+
});
18+
dispatcher.emit<"Event">(true);
19+
20+
dispatcher.waitUntilFinished();
21+
suite.expectEqual(receivedValue.load(), true);
22+
});
23+
24+
suite.addTest("Delayed Emit", [&] {
25+
auto dispatcher = CppUtils::Thread::ScheduledEventDispatcher{};
26+
auto receivedValue = std::atomic_bool{};
27+
28+
dispatcher.subscribe<"Event">([&](bool value) {
29+
receivedValue = value;
30+
});
31+
dispatcher.emit<"Event">(true, 200ms);
32+
33+
std::this_thread::sleep_for(100ms);
34+
suite.expectEqual(receivedValue.load(), false);
35+
dispatcher.waitUntilFinished();
36+
suite.expectEqual(receivedValue.load(), true);
37+
});
38+
}};
39+
}

tests/Thread/Scheduler.mpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,7 @@ import CppUtils;
55

66
export namespace CppUtils::UnitTest::Thread::Scheduler
77
{
8-
inline auto _ = TestSuite{
9-
"Thread/Scheduler", {"UnitTest"}, []([[maybe_unused]] auto& suite) {
8+
inline auto _ = TestSuite{"Thread/Scheduler", {"UnitTest"}, []([[maybe_unused]] auto& suite) {
109
using namespace std::chrono_literals;
1110

1211
suite.addTest("Initialize Scheduler", [&] {

tests/Thread/SharedLocker.mpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
export module CppUtils.UnitTests.Thread.SharedLocker;
22

3+
import std;
34
import CppUtils;
45

56
export namespace CppUtils::UnitTest::Thread::SharedLocker
67
{
7-
inline auto _ = CppUtils::UnitTest::TestSuite{"Thread/SharedLocker", {"UnitTest", "Thread/UniqueLocker"}, [](auto& suite) {
8+
inline auto _ = TestSuite{"Thread/SharedLocker", {"UnitTest", "Thread/UniqueLocker"}, [](auto& suite) {
89
suite.addTest("Unique access", [&] {
910
auto lockedString = CppUtils::Thread::SharedLocker<std::string>{"Foo"};
1011
{

tests/Thread/ThreadLoop.mpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import CppUtils;
55

66
export namespace CppUtils::UnitTest::Thread::ThreadLoop
77
{
8-
inline auto _ = CppUtils::UnitTest::TestSuite{"Thread/ThreadLoop", {"UnitTest"}, [](auto& suite) {
8+
inline auto _ = TestSuite{"Thread/ThreadLoop", {"UnitTest"}, [](auto& suite) {
99
using namespace std::chrono_literals;
1010
using Logger = CppUtils::Logger<"CppUtils">;
1111

tests/Thread/ThreadPool.mpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import CppUtils;
55

66
export namespace CppUtils::UnitTest::Thread::ThreadPool
77
{
8-
inline auto _ = CppUtils::UnitTest::TestSuite{"Thread/ThreadPool", {"UnitTest"}, [](auto& suite) {
8+
inline auto _ = TestSuite{"Thread/ThreadPool", {"UnitTest"}, [](auto& suite) {
99
using namespace std::literals;
1010
using namespace std::chrono_literals;
1111
using Logger = CppUtils::Logger<"CppUtils">;

tests/Thread/TryAsync.mpp

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ export module CppUtils.UnitTests.Thread.TryAsync;
22

33
import std;
44
import CppUtils;
5-
import CppUtils.Thread.TryAsync;
65

76
export namespace CppUtils::UnitTest::Thread
87
{

tests/Thread/UniqueLocker.mpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
export module CppUtils.UnitTests.Thread.UniqueLocker;
22

3+
import std;
34
import CppUtils;
45

56
export namespace CppUtils::UnitTest::Thread::UniqueLocker
67
{
7-
inline auto _ = CppUtils::UnitTest::TestSuite{"Thread/UniqueLocker", {"UnitTest"}, [](auto& suite) {
8+
inline auto _ = TestSuite{"Thread/UniqueLocker", {"UnitTest"}, [](auto& suite) {
89
suite.addTest("Access", [&] {
910
auto lockedString = CppUtils::Thread::UniqueLocker<std::string>{"Test"};
1011
auto accessor = lockedString.access();

tests/UnitTests.mpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export import CppUtils.UnitTests.System.Error;
3535
export import CppUtils.UnitTests.Terminal.Canvas;
3636
export import CppUtils.UnitTests.Terminal;
3737
export import CppUtils.UnitTests.Thread.Scheduler;
38+
export import CppUtils.UnitTests.Thread.ScheduledEventDispatcher;
3839
export import CppUtils.UnitTests.Thread.SharedLocker;
3940
export import CppUtils.UnitTests.Thread.ThreadLoop;
4041
export import CppUtils.UnitTests.Thread.ThreadPool;

0 commit comments

Comments
 (0)