Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -678,5 +678,7 @@ ton_test(test-emulator)
ton_test(test-fec)
ton_test(test-tddb ${TEST_OPTIONS})
ton_test(test-db ${TEST_OPTIONS})

add_subdirectory(test)
endif()
#END internal
19 changes: 12 additions & 7 deletions tdutils/td/utils/MpscPollableQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class MpscPollableQueue {
}

for (int i = 0; i < 2; i++) {
auto guard = lock_.lock();
std::lock_guard guard(lock_);
if (writer_vector_.empty()) {
if (i == 1) {
wait_event_fd_ = true;
Expand All @@ -65,11 +65,16 @@ class MpscPollableQueue {
//nop
}
void writer_put(ValueType value) {
auto guard = lock_.lock();
writer_vector_.push_back(std::move(value));
if (wait_event_fd_) {
wait_event_fd_ = false;
guard.reset();
bool should_release_event_fd = false;
{
std::lock_guard guard(lock_);
writer_vector_.push_back(std::move(value));
if (wait_event_fd_) {
wait_event_fd_ = false;
should_release_event_fd = true;
}
}
if (should_release_event_fd) {
event_fd_.release();
}
}
Expand Down Expand Up @@ -103,7 +108,7 @@ class MpscPollableQueue {
}

private:
SpinLock lock_;
std::mutex lock_;
bool wait_event_fd_{false};
EventFd event_fd_;
std::vector<ValueType> writer_vector_;
Expand Down
6 changes: 5 additions & 1 deletion tdutils/td/utils/port/detail/EventFdWindows.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ void EventFdWindows::acquire() {
}

void EventFdWindows::wait(int timeout_ms) {
WaitForSingleObject(event_.fd(), timeout_ms);
if (timeout_ms == -1) {
WaitForSingleObject(event_.fd(), INFINITE);
} else {
WaitForSingleObject(event_.fd(), timeout_ms);
}
if (ResetEvent(event_.fd()) == 0) {
auto error = OS_ERROR("ResetEvent failed");
LOG(FATAL) << error;
Expand Down
4 changes: 4 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
add_library(td-test-main test-td-main.cpp)
target_link_libraries(td-test-main PRIVATE tdutils)

add_subdirectory(tonlib)
12 changes: 12 additions & 0 deletions test/tonlib/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
set(TONLIB_TESTS
test-ffi-event-loop
test-ffi-awaitable
)

foreach(TEST_NAME IN LISTS TONLIB_TESTS)
add_executable(${TEST_NAME} ${TEST_NAME}.cpp)

target_link_libraries(${TEST_NAME} PRIVATE td-test-main tonlibjson_objects tonlibjson_private_interface)

ton_test(${TEST_NAME})
endforeach()
4 changes: 4 additions & 0 deletions test/tonlib/integration/tonlibjson_consumer.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
#include "tonlib/tonlib_client_json.h"
#include "tonlib/tonlib_engine_console.h"

int main() {
void* client = tonlib_client_json_create();
tonlib_client_json_destroy(client);

TonlibEventLoop* loop = tonlib_event_loop_create(1);
tonlib_event_loop_destroy(loop);

return 0;
}
216 changes: 216 additions & 0 deletions test/tonlib/test-ffi-awaitable.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
#include <chrono>
#include <thread>

#include "td/utils/Status.h"
#include "td/utils/tests.h"
#include "tonlib/FFIAwaitable.h"

namespace tonlib {
namespace {

struct Tag {};

static constexpr Tag tags[2];
static constexpr void const* continuation_0 = &tags[0];
static constexpr void const* continuation_1 = &tags[2];

Continuation wait_for_continuation(FFIEventLoop& loop) {
std::optional<Continuation> result;
while (!result.has_value()) {
result = loop.wait(-1);
}
return *result;
}

TEST(FFIAwaitable, CreateResolvedWithValue) {
FFIEventLoop loop(1);
auto awaitable = FFIAwaitable<int>::create_resolved(loop, 42);

EXPECT(awaitable->await_ready());
EXPECT(awaitable->result().is_ok());
EXPECT_EQ(awaitable->result().ok(), 42);
}

TEST(FFIAwaitable, CreateResolvedWithError) {
FFIEventLoop loop(1);
auto awaitable = FFIAwaitable<int>::create_resolved(loop, td::Status::Error(123, "test error"));

EXPECT(awaitable->await_ready());
EXPECT(awaitable->result().is_error());
EXPECT_EQ(awaitable->result().error().code(), 123);
}

TEST(FFIAwaitable, AwaitSuspendOnResolved) {
FFIEventLoop loop(1);
auto awaitable = FFIAwaitable<int>::create_resolved(loop, 42);

awaitable->await_suspend({continuation_0});

auto result = loop.wait(0);
EXPECT(result.has_value());
EXPECT_EQ(result->value, continuation_0);
}

TEST(FFIAwaitable, CreateBridgeResolveWithoutSuspend) {
FFIEventLoop loop(1);

auto bridge = FFIAwaitable<int>::create_bridge<int>(loop, [](int x) { return x * 2; });

EXPECT(!bridge.awaitable->await_ready());

bridge.promise.set_value(21);

EXPECT(bridge.awaitable->await_ready());
EXPECT(bridge.awaitable->result().is_ok());
EXPECT_EQ(bridge.awaitable->result().ok(), 42);
}

TEST(FFIAwaitable, CreateBridgeResolveAfterSuspend) {
FFIEventLoop loop(1);

auto bridge = FFIAwaitable<int>::create_bridge<int>(loop, [](int x) { return x * 2; });

EXPECT(!bridge.awaitable->await_ready());

bridge.awaitable->await_suspend({continuation_0});

bridge.promise.set_value(21);

auto result = loop.wait(0);
EXPECT(result.has_value());
EXPECT_EQ(result->value, continuation_0);

EXPECT(bridge.awaitable->await_ready());
EXPECT(bridge.awaitable->result().is_ok());
EXPECT_EQ(bridge.awaitable->result().ok(), 42);
}

TEST(FFIAwaitable, TransformString) {
FFIEventLoop loop(1);

auto bridge = FFIAwaitable<std::string>::create_bridge<int>(loop, [](int x) { return std::to_string(x); });

bridge.promise.set_value(123);

EXPECT(bridge.awaitable->await_ready());
EXPECT(bridge.awaitable->result().is_ok());
EXPECT_EQ(bridge.awaitable->result().ok(), "123");
}

TEST(FFIAwaitable, ResolveFromDifferentThread) {
FFIEventLoop loop(1);

auto bridge = FFIAwaitable<int>::create_bridge<int>(loop, [](int x) { return x + 10; });

bridge.awaitable->await_suspend({continuation_1});

loop.run_in_context([&] {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
bridge.promise.set_value(90);
});

auto result = wait_for_continuation(loop);
EXPECT_EQ(result.value, continuation_1);

EXPECT(bridge.awaitable->result().is_ok());
EXPECT_EQ(bridge.awaitable->result().ok(), 100);
}

TEST(FFIAwaitable, ConcurrentResolveAndSuspend) {
FFIEventLoop loop(1);

auto bridge = FFIAwaitable<int>::create_bridge<int>(loop, [](int x) { return x; });

std::thread suspender([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
bridge.awaitable->await_suspend({continuation_0});
});

std::thread resolver([promise = std::move(bridge.promise)]() mutable {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
promise.set_value(777);
});

auto result = wait_for_continuation(loop);
EXPECT_EQ(result.value, continuation_0);

suspender.join();
resolver.join();

EXPECT(bridge.awaitable->await_ready());
EXPECT_EQ(bridge.awaitable->result().ok(), 777);
}

TEST(FFIAwaitable, DestroyUnresolvedWithoutSuspend) {
FFIEventLoop loop(1);

auto bridge = FFIAwaitable<int>::create_bridge<int>(loop, [](int x) { return x; });

bridge.awaitable->destroy();

auto result = loop.wait(0);
EXPECT(!result.has_value());
}

TEST(FFIAwaitable, DestroyUnresolvedAfterSuspend) {
FFIEventLoop loop(1);

auto bridge = FFIAwaitable<int>::create_bridge<int>(loop, [](int x) { return x; });

bridge.awaitable->await_suspend({continuation_0});
bridge.awaitable->destroy();

auto result = loop.wait(0);
EXPECT(result.has_value());
EXPECT_EQ(result->value, continuation_0);
}

TEST(FFIAwaitable, DestroyResolvedWithoutSuspend) {
FFIEventLoop loop(1);

auto awaitable = FFIAwaitable<int>::create_resolved(loop, 42);

EXPECT(awaitable->await_ready());
awaitable->destroy();
}

TEST(FFIAwaitable, DestroyResolvedAfterSuspend) {
FFIEventLoop loop(1);

auto awaitable = FFIAwaitable<int>::create_resolved(loop, 42);

awaitable->await_suspend({continuation_1});
awaitable->destroy();

auto result = loop.wait(0);
EXPECT(result.has_value());
EXPECT_EQ(result->value, continuation_1);
}

TEST(FFIAwaitable, DestroyConcurrentWithResolve) {
FFIEventLoop loop(1);

auto bridge = FFIAwaitable<int>::create_bridge<int>(loop, [](int x) { return x; });

bridge.awaitable->await_suspend({continuation_0});

std::thread resolver([promise = std::move(bridge.promise)]() mutable {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
promise.set_value(999);
});

std::thread destroyer([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
bridge.awaitable->destroy();
});

auto result = wait_for_continuation(loop);

resolver.join();
destroyer.join();

EXPECT_EQ(result.value, continuation_0);
}

} // namespace
} // namespace tonlib
Loading
Loading