Skip to content

Commit 4226b54

Browse files
committed
Thread/AsyncEventDispatcher
1 parent cdfdf6e commit 4226b54

File tree

4 files changed

+86
-1
lines changed

4 files changed

+86
-1
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
export module CppUtils.Thread.AsyncEventDispatcher;
2+
3+
import std;
4+
import CppUtils.Execution.EventDispatcher;
5+
import CppUtils.String.Hash;
6+
import CppUtils.Thread.ThreadPool;
7+
8+
export namespace CppUtils::Thread
9+
{
10+
class AsyncEventDispatcher final
11+
{
12+
public:
13+
explicit AsyncEventDispatcher(
14+
std::size_t numberThreads = std::thread::hardware_concurrency(),
15+
std::function<void(std::exception_ptr)> onError = nullptr,
16+
std::function<void()> finally = nullptr):
17+
m_threadPool{numberThreads, std::move(onError), std::move(finally)}
18+
{}
19+
20+
template<String::Hasher eventName = String::Hash{}>
21+
inline auto subscribe(auto&& function) -> void
22+
{
23+
m_eventDispatcher.subscribe<eventName>(std::forward<decltype(function)>(function));
24+
}
25+
26+
template<String::Hasher eventName = String::Hash{}, class Event = std::nullptr_t>
27+
inline auto emit(Event&& event = nullptr) -> void
28+
{
29+
m_threadPool.call([this, event = std::forward<Event>(event)] {
30+
m_eventDispatcher.emit<eventName>(event);
31+
});
32+
}
33+
34+
inline auto waitUntilFinished() -> void
35+
{
36+
m_threadPool.waitUntilFinished();
37+
}
38+
39+
private:
40+
ThreadPool m_threadPool;
41+
Execution::EventDispatcher m_eventDispatcher;
42+
};
43+
}

modules/Thread/Thread.mpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,4 +7,6 @@ export import CppUtils.Thread.ThreadLoop;
77
export import CppUtils.Thread.ThreadPool;
88
export import CppUtils.Thread.TryAsync;
99
export import CppUtils.Thread.UniqueLocker;
10+
export import CppUtils.Thread.AsyncEventDispatcher;
1011
export import CppUtils.Thread.ScheduledEventDispatcher;
12+
export import CppUtils.Thread.Scheduler;
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
export module CppUtils.UnitTests.Thread.AsyncEventDispatcher;
2+
3+
import std;
4+
import CppUtils;
5+
6+
export namespace CppUtils::UnitTest::Thread::AsyncEventDispatcher
7+
{
8+
inline auto _ = TestSuite{"Thread/AsyncEventDispatcher", {"Thread/ThreadPool", "Execution/EventDispatcher"}, [](auto& suite) {
9+
suite.addTest("Emit and Wait", [&] {
10+
auto dispatcher = CppUtils::Thread::AsyncEventDispatcher{};
11+
std::atomic_bool received = false;
12+
13+
dispatcher.subscribe<"Event">([&](bool value) {
14+
suite.expectEqual(value, true);
15+
received = true;
16+
});
17+
dispatcher.emit<"Event">(true);
18+
19+
dispatcher.waitUntilFinished();
20+
suite.expect(received);
21+
});
22+
23+
suite.addTest("Error Handling", [&] {
24+
std::atomic_bool errorCaught = false;
25+
auto onError = [&](std::exception_ptr) {
26+
errorCaught = true;
27+
};
28+
auto dispatcher = CppUtils::Thread::AsyncEventDispatcher{1, onError};
29+
30+
dispatcher.subscribe<"Event">([&](std::nullptr_t) {
31+
throw std::runtime_error{"Test Error"};
32+
});
33+
dispatcher.emit<"Event">(nullptr);
34+
35+
dispatcher.waitUntilFinished();
36+
suite.expect(errorCaught);
37+
});
38+
}};
39+
}

tests/UnitTests.mpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,9 @@ export import CppUtils.UnitTests.String.Utility;
3434
export import CppUtils.UnitTests.System.Error;
3535
export import CppUtils.UnitTests.Terminal.Canvas;
3636
export import CppUtils.UnitTests.Terminal;
37-
export import CppUtils.UnitTests.Thread.Scheduler;
37+
export import CppUtils.UnitTests.Thread.AsyncEventDispatcher;
3838
export import CppUtils.UnitTests.Thread.ScheduledEventDispatcher;
39+
export import CppUtils.UnitTests.Thread.Scheduler;
3940
export import CppUtils.UnitTests.Thread.SharedLocker;
4041
export import CppUtils.UnitTests.Thread.ThreadLoop;
4142
export import CppUtils.UnitTests.Thread.ThreadPool;

0 commit comments

Comments
 (0)