diff --git a/CMakeLists.txt b/CMakeLists.txt index 2cf917f5c..7811cc570 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -678,5 +678,7 @@ ton_test(test-emulator) ton_test(test-fec) ton_test(test-tddb ${TEST_OPTIONS}) ton_test(test-db ${TEST_OPTIONS}) + +add_subdirectory(test) endif() #END internal diff --git a/tdutils/td/utils/MpscPollableQueue.h b/tdutils/td/utils/MpscPollableQueue.h index 1ad72da0c..4c02ca95f 100644 --- a/tdutils/td/utils/MpscPollableQueue.h +++ b/tdutils/td/utils/MpscPollableQueue.h @@ -42,7 +42,7 @@ class MpscPollableQueue { } for (int i = 0; i < 2; i++) { - auto guard = lock_.lock(); + std::lock_guard guard(lock_); if (writer_vector_.empty()) { if (i == 1) { wait_event_fd_ = true; @@ -65,11 +65,16 @@ class MpscPollableQueue { //nop } void writer_put(ValueType value) { - auto guard = lock_.lock(); - writer_vector_.push_back(std::move(value)); - if (wait_event_fd_) { - wait_event_fd_ = false; - guard.reset(); + bool should_release_event_fd = false; + { + std::lock_guard guard(lock_); + writer_vector_.push_back(std::move(value)); + if (wait_event_fd_) { + wait_event_fd_ = false; + should_release_event_fd = true; + } + } + if (should_release_event_fd) { event_fd_.release(); } } @@ -103,7 +108,7 @@ class MpscPollableQueue { } private: - SpinLock lock_; + std::mutex lock_; bool wait_event_fd_{false}; EventFd event_fd_; std::vector writer_vector_; diff --git a/tdutils/td/utils/port/detail/EventFdWindows.cpp b/tdutils/td/utils/port/detail/EventFdWindows.cpp index e4656b443..2fd16d5a6 100644 --- a/tdutils/td/utils/port/detail/EventFdWindows.cpp +++ b/tdutils/td/utils/port/detail/EventFdWindows.cpp @@ -67,7 +67,11 @@ void EventFdWindows::acquire() { } void EventFdWindows::wait(int timeout_ms) { - WaitForSingleObject(event_.fd(), timeout_ms); + if (timeout_ms == -1) { + WaitForSingleObject(event_.fd(), INFINITE); + } else { + WaitForSingleObject(event_.fd(), timeout_ms); + } if (ResetEvent(event_.fd()) == 0) { auto error = OS_ERROR("ResetEvent failed"); LOG(FATAL) << error; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt new file mode 100644 index 000000000..2bc48abbe --- /dev/null +++ b/test/CMakeLists.txt @@ -0,0 +1,4 @@ +add_library(td-test-main test-td-main.cpp) +target_link_libraries(td-test-main PRIVATE tdutils) + +add_subdirectory(tonlib) diff --git a/test/tonlib/CMakeLists.txt b/test/tonlib/CMakeLists.txt new file mode 100644 index 000000000..5f177dc8c --- /dev/null +++ b/test/tonlib/CMakeLists.txt @@ -0,0 +1,12 @@ +set(TONLIB_TESTS + test-ffi-event-loop + test-ffi-awaitable +) + +foreach(TEST_NAME IN LISTS TONLIB_TESTS) + add_executable(${TEST_NAME} ${TEST_NAME}.cpp) + + target_link_libraries(${TEST_NAME} PRIVATE td-test-main tonlibjson_objects tonlibjson_private_interface) + + ton_test(${TEST_NAME}) +endforeach() diff --git a/test/tonlib/integration/tonlibjson_consumer.cpp b/test/tonlib/integration/tonlibjson_consumer.cpp index ee8528e92..436b16180 100644 --- a/test/tonlib/integration/tonlibjson_consumer.cpp +++ b/test/tonlib/integration/tonlibjson_consumer.cpp @@ -1,8 +1,12 @@ #include "tonlib/tonlib_client_json.h" +#include "tonlib/tonlib_engine_console.h" int main() { void* client = tonlib_client_json_create(); tonlib_client_json_destroy(client); + TonlibEventLoop* loop = tonlib_event_loop_create(1); + tonlib_event_loop_destroy(loop); + return 0; } diff --git a/test/tonlib/test-ffi-awaitable.cpp b/test/tonlib/test-ffi-awaitable.cpp new file mode 100644 index 000000000..08209705f --- /dev/null +++ b/test/tonlib/test-ffi-awaitable.cpp @@ -0,0 +1,216 @@ +#include +#include + +#include "td/utils/Status.h" +#include "td/utils/tests.h" +#include "tonlib/FFIAwaitable.h" + +namespace tonlib { +namespace { + +struct Tag {}; + +static constexpr Tag tags[2]; +static constexpr void const* continuation_0 = &tags[0]; +static constexpr void const* continuation_1 = &tags[2]; + +Continuation wait_for_continuation(FFIEventLoop& loop) { + std::optional result; + while (!result.has_value()) { + result = loop.wait(-1); + } + return *result; +} + +TEST(FFIAwaitable, CreateResolvedWithValue) { + FFIEventLoop loop(1); + auto awaitable = FFIAwaitable::create_resolved(loop, 42); + + EXPECT(awaitable->await_ready()); + EXPECT(awaitable->result().is_ok()); + EXPECT_EQ(awaitable->result().ok(), 42); +} + +TEST(FFIAwaitable, CreateResolvedWithError) { + FFIEventLoop loop(1); + auto awaitable = FFIAwaitable::create_resolved(loop, td::Status::Error(123, "test error")); + + EXPECT(awaitable->await_ready()); + EXPECT(awaitable->result().is_error()); + EXPECT_EQ(awaitable->result().error().code(), 123); +} + +TEST(FFIAwaitable, AwaitSuspendOnResolved) { + FFIEventLoop loop(1); + auto awaitable = FFIAwaitable::create_resolved(loop, 42); + + awaitable->await_suspend({continuation_0}); + + auto result = loop.wait(0); + EXPECT(result.has_value()); + EXPECT_EQ(result->value, continuation_0); +} + +TEST(FFIAwaitable, CreateBridgeResolveWithoutSuspend) { + FFIEventLoop loop(1); + + auto bridge = FFIAwaitable::create_bridge(loop, [](int x) { return x * 2; }); + + EXPECT(!bridge.awaitable->await_ready()); + + bridge.promise.set_value(21); + + EXPECT(bridge.awaitable->await_ready()); + EXPECT(bridge.awaitable->result().is_ok()); + EXPECT_EQ(bridge.awaitable->result().ok(), 42); +} + +TEST(FFIAwaitable, CreateBridgeResolveAfterSuspend) { + FFIEventLoop loop(1); + + auto bridge = FFIAwaitable::create_bridge(loop, [](int x) { return x * 2; }); + + EXPECT(!bridge.awaitable->await_ready()); + + bridge.awaitable->await_suspend({continuation_0}); + + bridge.promise.set_value(21); + + auto result = loop.wait(0); + EXPECT(result.has_value()); + EXPECT_EQ(result->value, continuation_0); + + EXPECT(bridge.awaitable->await_ready()); + EXPECT(bridge.awaitable->result().is_ok()); + EXPECT_EQ(bridge.awaitable->result().ok(), 42); +} + +TEST(FFIAwaitable, TransformString) { + FFIEventLoop loop(1); + + auto bridge = FFIAwaitable::create_bridge(loop, [](int x) { return std::to_string(x); }); + + bridge.promise.set_value(123); + + EXPECT(bridge.awaitable->await_ready()); + EXPECT(bridge.awaitable->result().is_ok()); + EXPECT_EQ(bridge.awaitable->result().ok(), "123"); +} + +TEST(FFIAwaitable, ResolveFromDifferentThread) { + FFIEventLoop loop(1); + + auto bridge = FFIAwaitable::create_bridge(loop, [](int x) { return x + 10; }); + + bridge.awaitable->await_suspend({continuation_1}); + + loop.run_in_context([&] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + bridge.promise.set_value(90); + }); + + auto result = wait_for_continuation(loop); + EXPECT_EQ(result.value, continuation_1); + + EXPECT(bridge.awaitable->result().is_ok()); + EXPECT_EQ(bridge.awaitable->result().ok(), 100); +} + +TEST(FFIAwaitable, ConcurrentResolveAndSuspend) { + FFIEventLoop loop(1); + + auto bridge = FFIAwaitable::create_bridge(loop, [](int x) { return x; }); + + std::thread suspender([&]() { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + bridge.awaitable->await_suspend({continuation_0}); + }); + + std::thread resolver([promise = std::move(bridge.promise)]() mutable { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + promise.set_value(777); + }); + + auto result = wait_for_continuation(loop); + EXPECT_EQ(result.value, continuation_0); + + suspender.join(); + resolver.join(); + + EXPECT(bridge.awaitable->await_ready()); + EXPECT_EQ(bridge.awaitable->result().ok(), 777); +} + +TEST(FFIAwaitable, DestroyUnresolvedWithoutSuspend) { + FFIEventLoop loop(1); + + auto bridge = FFIAwaitable::create_bridge(loop, [](int x) { return x; }); + + bridge.awaitable->destroy(); + + auto result = loop.wait(0); + EXPECT(!result.has_value()); +} + +TEST(FFIAwaitable, DestroyUnresolvedAfterSuspend) { + FFIEventLoop loop(1); + + auto bridge = FFIAwaitable::create_bridge(loop, [](int x) { return x; }); + + bridge.awaitable->await_suspend({continuation_0}); + bridge.awaitable->destroy(); + + auto result = loop.wait(0); + EXPECT(result.has_value()); + EXPECT_EQ(result->value, continuation_0); +} + +TEST(FFIAwaitable, DestroyResolvedWithoutSuspend) { + FFIEventLoop loop(1); + + auto awaitable = FFIAwaitable::create_resolved(loop, 42); + + EXPECT(awaitable->await_ready()); + awaitable->destroy(); +} + +TEST(FFIAwaitable, DestroyResolvedAfterSuspend) { + FFIEventLoop loop(1); + + auto awaitable = FFIAwaitable::create_resolved(loop, 42); + + awaitable->await_suspend({continuation_1}); + awaitable->destroy(); + + auto result = loop.wait(0); + EXPECT(result.has_value()); + EXPECT_EQ(result->value, continuation_1); +} + +TEST(FFIAwaitable, DestroyConcurrentWithResolve) { + FFIEventLoop loop(1); + + auto bridge = FFIAwaitable::create_bridge(loop, [](int x) { return x; }); + + bridge.awaitable->await_suspend({continuation_0}); + + std::thread resolver([promise = std::move(bridge.promise)]() mutable { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + promise.set_value(999); + }); + + std::thread destroyer([&]() { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + bridge.awaitable->destroy(); + }); + + auto result = wait_for_continuation(loop); + + resolver.join(); + destroyer.join(); + + EXPECT_EQ(result.value, continuation_0); +} + +} // namespace +} // namespace tonlib diff --git a/test/tonlib/test-ffi-event-loop.cpp b/test/tonlib/test-ffi-event-loop.cpp new file mode 100644 index 000000000..aff9577fc --- /dev/null +++ b/test/tonlib/test-ffi-event-loop.cpp @@ -0,0 +1,315 @@ +#include +#include +#include + +#include "td/utils/tests.h" +#include "tonlib/FFIEventLoop.h" + +namespace tonlib { +namespace { + +#define EXPECT_APPROXIMATE_TIME(elapsed_ms, expected_ms, tolerance_ms) \ + EXPECT((elapsed_ms) >= (expected_ms) - (tolerance_ms) && (elapsed_ms) <= (expected_ms) + (tolerance_ms)) + +struct Tag {}; + +static constexpr Tag tags[3]; +static constexpr void const* continuation_0 = &tags[0]; +static constexpr void const* continuation_1 = &tags[1]; +static constexpr void const* continuation_2 = &tags[2]; + +auto measure_time(auto&& func) { + auto start = std::chrono::steady_clock::now(); + func(); + auto end = std::chrono::steady_clock::now(); + return std::chrono::duration_cast(end - start).count(); +} + +Continuation wait_for_continuation(FFIEventLoop& loop) { + std::optional result; + while (!result.has_value()) { + result = loop.wait(-1); + } + return *result; +} + +TEST(FFIEventLoop, WaitTimeout) { + FFIEventLoop loop(1); + auto elapsed = measure_time([&] { + auto result = loop.wait(0.02); + EXPECT(!result.has_value()); + }); + EXPECT_APPROXIMATE_TIME(elapsed, 20, 15); +} + +TEST(FFIEventLoop, PutBeforeWait) { + FFIEventLoop loop(1); + loop.put({continuation_0}); + auto result = loop.wait(0); + EXPECT(result.has_value()); + EXPECT_EQ(result->value, continuation_0); +} + +TEST(FFIEventLoop, WaitThenPut) { + FFIEventLoop loop(1); + + std::thread producer([&]() { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + loop.put({continuation_1}); + }); + + auto elapsed = measure_time([&] { + auto result = wait_for_continuation(loop); + EXPECT_EQ(result.value, continuation_1); + }); + + producer.join(); + EXPECT_APPROXIMATE_TIME(elapsed, 20, 15); +} + +TEST(FFIEventLoop, CancelBeforeWait) { + FFIEventLoop loop(1); + loop.cancel(); + + auto elapsed = measure_time([&] { + auto result = loop.wait(1.0); + EXPECT(!result.has_value()); + }); + + EXPECT(elapsed < 10); + + auto result = loop.wait(1.0); + EXPECT(!result.has_value()); +} + +TEST(FFIEventLoop, CancelDuringWait) { + FFIEventLoop loop(1); + + std::thread canceller([&]() { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + loop.cancel(); + }); + + auto elapsed = measure_time([&] { + auto result = loop.wait(2.0); + EXPECT(!result.has_value()); + }); + + canceller.join(); + EXPECT_APPROXIMATE_TIME(elapsed, 20, 15); +} + +TEST(FFIEventLoop, MultiplePuts) { + FFIEventLoop loop(1); + + loop.put({continuation_0}); + loop.put({continuation_1}); + loop.put({continuation_2}); + + auto result1 = loop.wait(0); + auto result2 = loop.wait(0); + auto result3 = loop.wait(0); + auto result4 = loop.wait(0); + + EXPECT(result1.has_value() && result1->value == continuation_0); + EXPECT(result2.has_value() && result2->value == continuation_1); + EXPECT(result3.has_value() && result3->value == continuation_2); + EXPECT(!result4.has_value()); +} + +TEST(FFIEventLoop, ActorCounterBlocksDestructor) { + bool guard_destroyed = false; + + std::thread actor_thread; + + auto elapsed = measure_time([&] { + FFIEventLoop loop(1); + + auto guard = loop.new_actor(); + + actor_thread = std::thread([guard = std::move(guard), &guard_destroyed]() mutable { + std::this_thread::sleep_for(std::chrono::milliseconds(20)); + guard_destroyed = true; + // The reset is supposed to happen before FFIEventLoop destructor exits, so TSAN won't not say + // that `EXPECT(guard_destroyed)` later is a data race. + guard.reset(); + }); + }); + + EXPECT(guard_destroyed); + EXPECT_APPROXIMATE_TIME(elapsed, 20, 15); + + actor_thread.join(); +} + +TEST(FFIEventLoop, MultipleActors) { + bool all_destroyed = false; + + std::thread destroyer; + + auto elapsed = measure_time([&] { + FFIEventLoop loop(1); + + std::vector> guards; + guards.push_back(loop.new_actor()); + guards.push_back(loop.new_actor()); + guards.push_back(loop.new_actor()); + + destroyer = std::thread([guards = std::move(guards), &all_destroyed]() mutable { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + guards[0].reset(); + guards[1].reset(); + all_destroyed = true; + guards[2].reset(); + }); + }); + + EXPECT(all_destroyed); + EXPECT_APPROXIMATE_TIME(elapsed, 10, 15); + + destroyer.join(); +} + +TEST(FFIEventLoop, RunInContext) { + bool executed{false}; + + { + FFIEventLoop loop(1); + + loop.run_in_context([&]() { + executed = true; + EXPECT(td::actor::SchedulerContext::get() != nullptr); + }); + } + + EXPECT(executed); +} + +TEST(FFIEventLoop, ConcurrentPuts) { + FFIEventLoop loop(1); + + const int num_threads = 5; + const int puts_per_thread = 10; + + Tag continuations[num_threads * puts_per_thread]; + + std::vector producers; + for (int i = 0; i < num_threads; ++i) { + producers.emplace_back([&, i]() { + for (int j = 0; j < puts_per_thread; ++j) { + loop.put({&continuations[i * puts_per_thread + j]}); + } + }); + } + + for (auto& t : producers) { + t.join(); + } + + std::set received; + for (int i = 0; i < num_threads * puts_per_thread; ++i) { + auto result = wait_for_continuation(loop); + received.insert(const_cast(result.value)); + } + + EXPECT_EQ(received.size(), static_cast(num_threads * puts_per_thread)); + for (int i = 0; i < num_threads * puts_per_thread; ++i) { + EXPECT(received.contains(&continuations[i])); + } + + auto result = loop.wait(0.01); + EXPECT(!result.has_value()); +} + +TEST(FFIEventLoop, BackgroundThreadFlow) { + FFIEventLoop loop(1); + + std::atomic background_running{true}; + std::vector received; + + std::thread background([&]() { + while (background_running) { + auto result = loop.wait(0.01); + if (result.has_value()) { + received.push_back(const_cast(result->value)); + } + } + }); + + loop.put({continuation_0}); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + loop.put({continuation_1}); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + loop.put({continuation_2}); + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + + background_running = false; + loop.cancel(); + background.join(); + + EXPECT_EQ(received.size(), 3u); + EXPECT_EQ(received[0], continuation_0); + EXPECT_EQ(received[1], continuation_1); + EXPECT_EQ(received[2], continuation_2); +} + +TEST(FFIEventLoop, PutFromSchedulerContext) { + FFIEventLoop loop(1); + + loop.run_in_context([&]() { loop.put({continuation_0}); }); + + auto result = wait_for_continuation(loop); + EXPECT_EQ(result.value, continuation_0); +} + +TEST(FFIEventLoop, InterleavedPutsAndWaits) { + FFIEventLoop loop(1); + + loop.put({continuation_0}); + auto result1 = loop.wait(0.01); + EXPECT(result1.has_value() && result1->value == continuation_0); + + loop.put({continuation_1}); + auto result2 = loop.wait(0.01); + EXPECT(result2.has_value() && result2->value == continuation_1); + + loop.put({continuation_2}); + auto result3 = loop.wait(0.01); + EXPECT(result3.has_value() && result3->value == continuation_2); + + auto result4 = loop.wait(0.01); + EXPECT(!result4.has_value()); +} + +TEST(FFIEventLoop, NullptrContinuation) { + FFIEventLoop loop(1); + loop.put({nullptr}); + + auto result = loop.wait(0.01); + EXPECT(result.has_value()); + EXPECT_EQ(result->value, nullptr); +} + +TEST(FFIEventLoop, CancelMultipleTimes) { + FFIEventLoop loop(1); + + loop.cancel(); + loop.cancel(); + loop.cancel(); + + auto result = loop.wait(0.01); + EXPECT(!result.has_value()); +} + +TEST(FFIEventLoop, PutAfterCancel) { + FFIEventLoop loop(1); + + loop.cancel(); + loop.put({continuation_0}); + + auto result = loop.wait(0.01); + EXPECT(!result.has_value()); +} + +} // namespace +} // namespace tonlib diff --git a/test/tontester/src/tonlib/__init__.py b/test/tontester/src/tonlib/__init__.py index 29a6dc619..054203fbc 100644 --- a/test/tontester/src/tonlib/__init__.py +++ b/test/tontester/src/tonlib/__init__.py @@ -1,7 +1,14 @@ from .client import TonlibClient -from .tonlibjson import TonlibError +from .engine_console import EngineConsoleClient +from .event_loop import TonlibEventLoop +from .tonlib_cdll import TonlibCDLL +from .tonlibjson import TonLib, TonlibError __all__ = [ + "EngineConsoleClient", + "TonLib", + "TonlibCDLL", "TonlibClient", "TonlibError", + "TonlibEventLoop", ] diff --git a/test/tontester/src/tonlib/engine_console.py b/test/tontester/src/tonlib/engine_console.py new file mode 100644 index 000000000..2a8bd7d00 --- /dev/null +++ b/test/tontester/src/tonlib/engine_console.py @@ -0,0 +1,100 @@ +import json +import traceback +from typing import cast, final + +from tontester.tl import ton_api + +from tl import JSONSerializable, TLRequest + +from .event_loop import TonlibEventLoop +from .tonlib_cdll import TonlibCDLL + + +class LocalError(Exception): + def __init__(self, code: int, message: str): + super().__init__(message) + self.code: int = code + self.message: str = message + + +class RemoteError(Exception): + def __init__(self, code: int, message: str): + super().__init__(message) + self.code: int = code + self.message: str = message + + +@final +class EngineConsoleClient: + def __init__( + self, + tonlib: TonlibCDLL, + event_loop: TonlibEventLoop, + config: ton_api.EngineConsoleClient_config, + ): + self._tonlib = tonlib + self._event_loop = event_loop + config_json = config.to_json().encode() + self._console = tonlib.engine_console_create(event_loop.loop, config_json) + + if tonlib.engine_console_is_error(self._console): + error_code = tonlib.engine_console_get_error_code(self._console) + error_message = tonlib.engine_console_get_error_message(self._console).decode() + tonlib.engine_console_destroy(self._console) + self._console = 0 + raise LocalError(error_code, error_message) + + def __del__(self): + assert self._console == 0, ( + "EngineConsoleClient not destroyed. Call 'aclose' before destroying the object." + ) + + async def aclose(self) -> None: + if self._console == 0: + return + + self._tonlib.engine_console_destroy(self._console) + self._console = 0 + + async def __aenter__(self): + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: traceback.TracebackException | None, + ): + await self.aclose() + + async def request(self, request: TLRequest) -> JSONSerializable: + response = self._tonlib.engine_console_request(self._console, request.to_json().encode()) + + try: + if not self._tonlib.response_await_ready(response): + continuation_id, future = self._event_loop.create_awaitable_future() + self._tonlib.response_await_suspend(response, continuation_id) + await future + + if self._tonlib.response_is_error(response): + error_code = self._tonlib.response_get_error_code(response) + error_message = self._tonlib.response_get_error_message(response).decode() + raise LocalError(error_code, error_message) + + response_json = self._tonlib.response_get_response(response).decode() + response_json = cast(JSONSerializable, json.loads(response_json)) + + if ( + isinstance(response_json, dict) + and response_json.get("@type", None) == "engine.validator.controlQueryError" + ): + error = ton_api.Engine_validator_controlQueryError.from_dict(response_json) + raise RemoteError(error.code, error.message) + + return response_json + finally: + self._tonlib.response_destroy(response) + + async def get_actor_stats(self) -> str: + query = ton_api.Engine_validator_getActorTextStatsRequest() + return query.parse_result(await self.request(query)).data diff --git a/test/tontester/src/tonlib/event_loop.py b/test/tontester/src/tonlib/event_loop.py new file mode 100644 index 000000000..f57e4ce47 --- /dev/null +++ b/test/tontester/src/tonlib/event_loop.py @@ -0,0 +1,101 @@ +import asyncio +import logging +import threading +import traceback +from typing import final + +from .tonlib_cdll import TonlibCDLL + +logger = logging.getLogger(__name__) + + +@final +class TonlibEventLoop: + def __init__(self, tonlib: TonlibCDLL, event_loop: asyncio.AbstractEventLoop, threads: int = 1): + self._tonlib = tonlib + self._loop = tonlib.event_loop_create(threads) + self._next_continuation_id = 1 + self._futures: dict[int, asyncio.Future[None]] = {} + self._py_event_loop = event_loop + self._cancelled = False + self._background_error: Exception | None = None + + self._background_thread = threading.Thread(target=self._poll_loop, daemon=True) + self._background_thread.start() + + def __del__(self): + assert self._loop == 0, ( + "EventLoop not destroyed. Call 'aclose' before destroying the object." + ) + + async def aclose(self) -> None: + if self._loop == 0: + return + + try: + self._cancelled = True + self._tonlib.event_loop_cancel(self._loop) + self._background_thread.join() + + for future in self._futures.values(): + if not future.done(): + future.set_exception(asyncio.CancelledError()) + self._futures.clear() + finally: + self._tonlib.event_loop_destroy(self._loop) + self._loop = 0 + + async def __aenter__(self): + return self + + async def __aexit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: traceback.TracebackException | None, + ): + await self.aclose() + + def _set_resolved(self, future: asyncio.Future[None]): + if not future.done(): + future.set_result(None) + + def _set_exception(self, future: asyncio.Future[None], e: Exception): + if not future.done(): + future.set_exception(e) + + def _poll_loop(self) -> None: + try: + while not self._cancelled: + continuation = self._tonlib.event_loop_wait(self._loop, 60.0) + + if continuation == 0: + continue + + future = self._futures.pop(continuation, None) + if future is not None: + _ = self._py_event_loop.call_soon_threadsafe(self._set_resolved, future) + except Exception as e: + logger.error("EventLoop background task failed", exc_info=True) + self._background_error = e + for future in self._futures.values(): + _ = self._py_event_loop.call_soon_threadsafe(self._set_exception, future, e) + + def create_awaitable_future(self) -> tuple[int, asyncio.Future[None]]: + if self._background_error is not None: + future = self._py_event_loop.create_future() + future.set_exception(self._background_error) + return 0, future + + continuation_id = self._next_continuation_id + self._next_continuation_id += 1 + + future = self._py_event_loop.create_future() + future.add_done_callback(lambda _: self._futures.pop(continuation_id, None)) + self._futures[continuation_id] = future + + return continuation_id, future + + @property + def loop(self): + return self._loop diff --git a/test/tontester/src/tonlib/tonlib_cdll.py b/test/tontester/src/tonlib/tonlib_cdll.py new file mode 100644 index 000000000..1d79fb96a --- /dev/null +++ b/test/tontester/src/tonlib/tonlib_cdll.py @@ -0,0 +1,98 @@ +import ctypes +from pathlib import Path +from typing import Callable, cast, final + + +@final +class TonlibCDLL: + def __init__(self, cdll_path: Path): + tonlib = ctypes.CDLL(cdll_path) + + event_loop_create = tonlib.tonlib_event_loop_create + event_loop_create.restype = ctypes.c_void_p + event_loop_create.argtypes = [ctypes.c_int] + self.event_loop_create = cast(Callable[[int], int], event_loop_create) + + event_loop_destroy = tonlib.tonlib_event_loop_destroy + event_loop_destroy.restype = None + event_loop_destroy.argtypes = [ctypes.c_void_p] + self.event_loop_destroy = cast(Callable[[int], None], event_loop_destroy) + + event_loop_cancel = tonlib.tonlib_event_loop_cancel + event_loop_cancel.restype = None + event_loop_cancel.argtypes = [ctypes.c_void_p] + self.event_loop_cancel = cast(Callable[[int], None], event_loop_cancel) + + event_loop_wait = tonlib.tonlib_event_loop_wait + event_loop_wait.restype = ctypes.c_void_p + event_loop_wait.argtypes = [ctypes.c_void_p, ctypes.c_double] + self.event_loop_wait = cast(Callable[[int, float], int], event_loop_wait) + + engine_console_create = tonlib.tonlib_engine_console_create + engine_console_create.restype = ctypes.c_void_p + engine_console_create.argtypes = [ctypes.c_void_p, ctypes.c_char_p] + self.engine_console_create = cast(Callable[[int, bytes], int], engine_console_create) + + engine_console_destroy = tonlib.tonlib_engine_console_destroy + engine_console_destroy.restype = None + engine_console_destroy.argtypes = [ctypes.c_void_p] + self.engine_console_destroy = cast(Callable[[int], None], engine_console_destroy) + + engine_console_is_error = tonlib.tonlib_engine_console_is_error + engine_console_is_error.restype = ctypes.c_bool + engine_console_is_error.argtypes = [ctypes.c_void_p] + self.engine_console_is_error = cast(Callable[[int], bool], engine_console_is_error) + + engine_console_get_error_code = tonlib.tonlib_engine_console_get_error_code + engine_console_get_error_code.restype = ctypes.c_int + engine_console_get_error_code.argtypes = [ctypes.c_void_p] + self.engine_console_get_error_code = cast( + Callable[[int], int], engine_console_get_error_code + ) + + engine_console_get_error_message = tonlib.tonlib_engine_console_get_error_message + engine_console_get_error_message.restype = ctypes.c_char_p + engine_console_get_error_message.argtypes = [ctypes.c_void_p] + self.engine_console_get_error_message = cast( + Callable[[int], bytes], engine_console_get_error_message + ) + + engine_console_request = tonlib.tonlib_engine_console_request + engine_console_request.restype = ctypes.c_void_p + engine_console_request.argtypes = [ctypes.c_void_p, ctypes.c_char_p] + self.engine_console_request = cast(Callable[[int, bytes], int], engine_console_request) + + response_destroy = tonlib.tonlib_response_destroy + response_destroy.restype = None + response_destroy.argtypes = [ctypes.c_void_p] + self.response_destroy = cast(Callable[[int], None], response_destroy) + + response_await_ready = tonlib.tonlib_response_await_ready + response_await_ready.restype = ctypes.c_bool + response_await_ready.argtypes = [ctypes.c_void_p] + self.response_await_ready = cast(Callable[[int], bool], response_await_ready) + + response_await_suspend = tonlib.tonlib_response_await_suspend + response_await_suspend.restype = None + response_await_suspend.argtypes = [ctypes.c_void_p, ctypes.c_void_p] + self.response_await_suspend = cast(Callable[[int, int], None], response_await_suspend) + + response_is_error = tonlib.tonlib_response_is_error + response_is_error.restype = ctypes.c_bool + response_is_error.argtypes = [ctypes.c_void_p] + self.response_is_error = cast(Callable[[int], bool], response_is_error) + + response_get_error_code = tonlib.tonlib_response_get_error_code + response_get_error_code.restype = ctypes.c_int + response_get_error_code.argtypes = [ctypes.c_void_p] + self.response_get_error_code = cast(Callable[[int], int], response_get_error_code) + + response_get_error_message = tonlib.tonlib_response_get_error_message + response_get_error_message.restype = ctypes.c_char_p + response_get_error_message.argtypes = [ctypes.c_void_p] + self.response_get_error_message = cast(Callable[[int], bytes], response_get_error_message) + + response_get_response = tonlib.tonlib_response_get_response + response_get_response.restype = ctypes.c_char_p + response_get_response.argtypes = [ctypes.c_void_p] + self.response_get_response = cast(Callable[[int], bytes], response_get_response) diff --git a/test/tontester/src/tontester/network.py b/test/tontester/src/tontester/network.py index 8a4c049de..c2d712227 100644 --- a/test/tontester/src/tontester/network.py +++ b/test/tontester/src/tontester/network.py @@ -11,7 +11,7 @@ from typing import Literal, final, override from tl import TLObject -from tonlib import TonlibClient, TonlibError +from tonlib import EngineConsoleClient, TonlibCDLL, TonlibClient, TonlibError, TonlibEventLoop from .install import Install from .key import Key @@ -27,6 +27,10 @@ class _IPv4AddressAndPort: ip: IPv4Address port: int + @property + def address(self): + return f"{str(self.ip)}:{self.port}" + class _Status(IntEnum): INITED = auto() @@ -84,6 +88,14 @@ def _ensure_no_zerostate_yet(self): def _get_or_generate_zerostate(self): return self._network._get_or_generate_zerostate() + @property + def _tonlib(self): + return self._network._tonlib + + @property + def _tonlib_event_loop(self): + return self._network._event_loop + async def _run( self, executable: Path, @@ -188,13 +200,23 @@ async def stop(self): await self.__process_watcher await self.__log_streamer.aclose() - def __init__(self, install: Install, directory: Path): + def __init__( + self, + install: Install, + directory: Path, + event_loop: asyncio.AbstractEventLoop | None = None, + ): self._install = install self._directory = directory.absolute() self._port = 2000 self._node_idx = 0 self._status = _Status.INITED + self._tonlib = TonlibCDLL(install.tonlibjson) + self._event_loop = TonlibEventLoop( + self._tonlib, event_loop if event_loop is not None else asyncio.get_running_loop() + ) + self.__nodes: list[Network.Node] = [] self.__full_nodes: list[FullNode] = [] self.__network_config: NetworkConfig = NetworkConfig() @@ -245,6 +267,8 @@ async def aclose(self): for node in self.__nodes: await node.stop() + await self._event_loop.aclose() + async def __aenter__(self): return self @@ -354,10 +378,13 @@ def __init__(self, network: "Network", name: str): self._addr = self._new_network_address() self._liteserver_addr = self._new_network_address() + self._engine_console_addr = self._new_network_address() self._fullnode_key, _ = self._new_key() self._validator_key, _ = self._new_key() self._liteserver_key, _ = self._new_key() + self._engine_console_server_key, _ = self._new_key() + self._engine_console_client_key, _ = self._new_key() self._local_config = ton_api.Engine_validator_config( addrs=[ @@ -400,11 +427,25 @@ def __init__(self, network: "Network", name: str): port=self._liteserver_addr.port, ) ], + control=[ + ton_api.Engine_controlInterface( + id=self._engine_console_server_key.id(), + # FIXME: IP? + port=self._engine_console_addr.port, + allowed=[ + ton_api.Engine_controlProcess( + id=self._engine_console_client_key.id(), + permissions=15, + ) + ], + ) + ], ) self._is_initial_validator = False self._client: TonlibClient | None = None + self._engine_console: EngineConsoleClient | None = None def make_initial_validator(self): self._ensure_no_zerostate_yet() @@ -460,8 +501,24 @@ async def tonlib_client(self) -> TonlibClient: return self._client + @property + def engine_console(self) -> EngineConsoleClient: + if self._engine_console is None: + self._engine_console = EngineConsoleClient( + self._tonlib, + self._tonlib_event_loop, + ton_api.EngineConsoleClient_config( + address=self._engine_console_addr.address, + public_key=self._engine_console_server_key.public_key, + private_key=self._engine_console_client_key.private_key, + ), + ) + return self._engine_console + @override async def stop(self): if self._client: await self._client.aclose() + if self._engine_console: + await self._engine_console.aclose() await super().stop() diff --git a/tl/generate/scheme/ton_api.tl b/tl/generate/scheme/ton_api.tl index 09ae4fa04..3afdd6ac5 100644 --- a/tl/generate/scheme/ton_api.tl +++ b/tl/generate/scheme/ton_api.tl @@ -1157,3 +1157,4 @@ storage.daemon.removeStorageProvider = storage.daemon.Success; ---types--- proxyLiteserver.config port:int id:PublicKey = proxyLiteserver.Config; +engineConsoleClient.config address:string server_public_key:PublicKey client_private_key:PrivateKey = engineConsoleClient.Config; diff --git a/tonlib/CMakeLists.txt b/tonlib/CMakeLists.txt index 62aa76f7c..1b8bce85f 100644 --- a/tonlib/CMakeLists.txt +++ b/tonlib/CMakeLists.txt @@ -3,6 +3,7 @@ option(TONLIBJSON_STATIC "Build tonlibjson as static library" OFF) set(TONLIB_SOURCE tonlib/Client.cpp tonlib/Config.cpp + tonlib/EngineConsoleClient.cpp tonlib/ExtClient.cpp tonlib/ExtClientOutbound.cpp tonlib/KeyStorage.cpp @@ -17,6 +18,7 @@ set(TONLIB_SOURCE tonlib/Client.h tonlib/Config.h + tonlib/EngineConsoleClient.h tonlib/ExtClient.h tonlib/ExtClientOutbound.h tonlib/KeyStorage.h @@ -54,8 +56,8 @@ target_include_directories(tonlib PUBLIC $/.. $ ) -target_link_libraries(tonlib PRIVATE tdactor adnllite tl_lite_api tl-lite-utils ton_crypto lite-client-common smc-envelope emulator_static) -target_link_libraries(tonlib PUBLIC tdutils tl_tonlib_api) +target_link_libraries(tonlib PRIVATE adnllite tl_lite_api tl-lite-utils ton_crypto lite-client-common smc-envelope emulator_static) +target_link_libraries(tonlib PUBLIC tdactor tdutils tl_tonlib_api) if (TONLIB_ENABLE_JNI AND NOT ANDROID) # jni is available by default on Android if (NOT JNI_FOUND) @@ -88,7 +90,10 @@ target_link_libraries(tonlibjson_private_interface INTERFACE add_library(tonlibjson_objects OBJECT tonlib/ClientJson.cpp + tonlib/FFIEngineConsoleClient.cpp + tonlib/FFIEventLoop.cpp tonlib/tonlib_client_json.cpp + tonlib/tonlib_engine_console.cpp ) target_link_libraries(tonlibjson_objects PRIVATE tonlibjson_private_interface @@ -175,6 +180,7 @@ endif() set(TONLIB_JSON_HEADERS tonlib/tonlib_client_json.h + tonlib/tonlib_engine_console.h ) install(FILES ${TONLIB_JSON_HEADERS} ${CMAKE_CURRENT_BINARY_DIR}/tonlib/tonlibjson_export.h DESTINATION include/tonlib/) diff --git a/tonlib/tonlib/EngineConsoleClient.cpp b/tonlib/tonlib/EngineConsoleClient.cpp new file mode 100644 index 000000000..a10437358 --- /dev/null +++ b/tonlib/tonlib/EngineConsoleClient.cpp @@ -0,0 +1,85 @@ +#include "adnl/adnl-ext-client.h" +#include "auto/tl/ton_api.hpp" + +#include "EngineConsoleClient.h" + +namespace tonlib { + +bool is_engine_console_query(ton::tl_object_ptr const& function) { + switch (function->get_id()) { + case ton::ton_api::engine_validator_getActorTextStats::ID: + return true; + default: + return false; + } +} + +class EngineConsoleClientCallback : public ton::adnl::AdnlExtClient::Callback { + public: + EngineConsoleClientCallback(td::actor::ActorId id) : id_(std::move(id)) { + } + + void on_ready() override { + td::actor::send_closure(id_, &EngineConsoleClient::on_ready); + } + + void on_stop_ready() override { + td::actor::send_closure(id_, &EngineConsoleClient::on_stop_ready); + } + + private: + td::actor::ActorId id_; +}; + +EngineConsoleClient::EngineConsoleClient(td::IPAddress address, ton::PublicKey server_public_key, + ton::PrivateKey client_private_key) + : address_(address) + , server_public_key_(std::move(server_public_key)) + , client_private_key_(std::move(client_private_key)) { +} + +void EngineConsoleClient::on_ready() { + ready_ = true; + for (auto& promise : pending_ready_promises_) { + promise.set_value(td::Unit()); + } + pending_ready_promises_.clear(); +} + +void EngineConsoleClient::on_stop_ready() { + for (auto& promise : pending_ready_promises_) { + promise.set_error(td::Status::Error("Connection closed")); + } + pending_ready_promises_.clear(); + ready_ = false; + client_ = {}; +} + +td::actor::Task> EngineConsoleClient::query( + ton::tl_object_ptr object) { + if (!ready_) { + if (client_.empty()) { + client_ = + ton::adnl::AdnlExtClient::create(ton::adnl::AdnlNodeIdFull{server_public_key_}, client_private_key_, address_, + std::make_unique(actor_id(this))); + } + + auto [ready_awaiter, ready_promise] = td::actor::StartedTask::make_bridge(); + pending_ready_promises_.push_back(std::move(ready_promise)); + co_await std::move(ready_awaiter); + } + + auto query_bytes = ton::serialize_tl_object(object, true); + auto wrapped_query = ton::serialize_tl_object( + ton::create_tl_object(std::move(query_bytes)), true); + + auto [response_awaiter, response_promise] = td::actor::StartedTask::make_bridge(); + td::actor::send_closure(client_, &ton::adnl::AdnlExtClient::send_query, "query", std::move(wrapped_query), + td::Timestamp::in(10.0), std::move(response_promise)); + auto response = co_await std::move(response_awaiter); + + auto result_obj = co_await ton::fetch_tl_object(response, true); + co_return std::move(result_obj); +} + +} // namespace tonlib diff --git a/tonlib/tonlib/EngineConsoleClient.h b/tonlib/tonlib/EngineConsoleClient.h new file mode 100644 index 000000000..3ce2bc72f --- /dev/null +++ b/tonlib/tonlib/EngineConsoleClient.h @@ -0,0 +1,31 @@ +#pragma once + +#include "adnl/adnl-ext-client.h" +#include "keys/keys.hpp" +#include "td/actor/actor.h" +#include "td/actor/coro_task.h" +#include "td/utils/port/IPAddress.h" + +namespace tonlib { + +bool is_engine_console_query(ton::tl_object_ptr const& function); + +class EngineConsoleClient : public td::actor::Actor { + public: + EngineConsoleClient(td::IPAddress address, ton::PublicKey server_public_key, ton::PrivateKey client_private_key); + + void on_ready(); + void on_stop_ready(); + + td::actor::Task> query(ton::tl_object_ptr function); + + private: + td::IPAddress address_; + ton::PublicKey server_public_key_; + ton::PrivateKey client_private_key_; + td::actor::ActorOwn client_; + bool ready_ = false; + std::vector> pending_ready_promises_; +}; + +} // namespace tonlib diff --git a/tonlib/tonlib/FFIAwaitable.h b/tonlib/tonlib/FFIAwaitable.h new file mode 100644 index 000000000..6863f5363 --- /dev/null +++ b/tonlib/tonlib/FFIAwaitable.h @@ -0,0 +1,100 @@ +#pragma once + +#include + +#include "FFIEventLoop.h" + +namespace tonlib { + +template +class FFIAwaitable { + public: + static FFIAwaitable *create_resolved(FFIEventLoop &loop, td::Result value) { + std::shared_ptr awaitable{new FFIAwaitable(loop, resolved_tag, std::move(value))}; + awaitable->self_ = awaitable; + return awaitable.get(); + } + + template + struct Bridge { + FFIAwaitable *awaitable; + td::Promise promise; + }; + + template + static Bridge create_bridge(FFIEventLoop &loop, F &&transform) + requires requires(U &u) { + { transform(std::move(u)) } -> std::convertible_to; + } + { + std::shared_ptr awaitable{new FFIAwaitable(loop, nullptr, {})}; + awaitable->self_ = awaitable; + + td::Promise promise = + td::lambda_promise([awaitable, transform = std::forward(transform)](td::Result result) mutable { + if (result.is_error()) { + awaitable->result_ = result.move_as_error(); + } else { + awaitable->result_ = transform(result.move_as_ok()); + } + auto maybe_continuation = awaitable->continuation_.exchange(resolved_tag); + if (maybe_continuation != nullptr && maybe_continuation != resolved_tag) { + awaitable->loop_.put({maybe_continuation}); + } + }); + return Bridge{.awaitable = awaitable.get(), .promise = std::move(promise)}; + } + + FFIAwaitable(const FFIAwaitable &) = delete; + FFIAwaitable(FFIAwaitable &&) = delete; + FFIAwaitable &operator=(const FFIAwaitable &) = delete; + FFIAwaitable &operator=(FFIAwaitable &&) = delete; + + ~FFIAwaitable() { + CHECK(continuation_ == resolved_tag); + } + + std::shared_ptr destroy() { + CHECK(self_); + auto maybe_continuation = continuation_.exchange(resolved_tag); + if (maybe_continuation != nullptr && maybe_continuation != resolved_tag) { + loop_.put({maybe_continuation}); + } + return std::move(self_); + } + + bool await_ready() { + return continuation_ == resolved_tag; + } + + void await_suspend(Continuation continuation) { + void const *expected = nullptr; + if (!continuation_.compare_exchange_strong(expected, continuation.value)) { + CHECK(expected == resolved_tag); + loop_.put(continuation); + } + } + + td::Result &result() & { + CHECK(continuation_ == resolved_tag); + return result_; + } + + private: + static constexpr struct ResolvedTag { + } resolved_tag_obj; + static constexpr void const *resolved_tag = &resolved_tag_obj; + + FFIAwaitable(FFIEventLoop &loop, void const *continuation, td::Result value) + : loop_(loop), continuation_(continuation), result_(std::move(value)) { + } + + FFIEventLoop &loop_; + std::atomic continuation_; + + td::Result result_; + + std::shared_ptr self_; +}; + +} // namespace tonlib diff --git a/tonlib/tonlib/FFIEngineConsoleClient.cpp b/tonlib/tonlib/FFIEngineConsoleClient.cpp new file mode 100644 index 000000000..b9a4f732c --- /dev/null +++ b/tonlib/tonlib/FFIEngineConsoleClient.cpp @@ -0,0 +1,43 @@ +#include + +#include "FFIEngineConsoleClient.h" + +namespace tonlib { + +namespace { + +class ClientWrapper : public EngineConsoleClient { + public: + ClientWrapper(td::IPAddress address, ton::PublicKey server_public_key, ton::PrivateKey client_private_key, + td::unique_ptr actor_counter) + : EngineConsoleClient(address, server_public_key, client_private_key), actor_counter_(std::move(actor_counter)) { + } + + private: + td::unique_ptr actor_counter_; +}; + +} // namespace + +FFIEngineConsoleClient::FFIEngineConsoleClient(FFIEventLoop& loop, td::IPAddress address, ton::PublicKey public_key, + ton::PrivateKey private_key) + : loop_(loop) { + std::promise> wrapper_promise; + + loop_.run_in_context([&] { + wrapper_promise.set_value(td::actor::create_actor("EngineConsoleClient", address, public_key, + private_key, loop.new_actor())); + }); + + client_ = wrapper_promise.get_future().get(); +} + +void FFIEngineConsoleClient::request(ton::tl_object_ptr query, + td::Promise> promise) { + loop_.run_in_context( + [client = this->client_.get(), query = std::move(query), promise = std::move(promise)]() mutable { + td::actor::send_closure(client, &EngineConsoleClient::query, std::move(query), std::move(promise)); + }); +} + +} // namespace tonlib diff --git a/tonlib/tonlib/FFIEngineConsoleClient.h b/tonlib/tonlib/FFIEngineConsoleClient.h new file mode 100644 index 000000000..8445f947c --- /dev/null +++ b/tonlib/tonlib/FFIEngineConsoleClient.h @@ -0,0 +1,37 @@ +#pragma once + +#include "auto/tl/ton_api.h" +#include "keys/keys.hpp" +#include "td/utils/port/IPAddress.h" + +#include "EngineConsoleClient.h" +#include "FFIEventLoop.h" + +namespace tonlib { + +class FFIEngineConsoleClient { + public: + FFIEngineConsoleClient(FFIEventLoop& loop, td::IPAddress address, ton::PublicKey server_public_key, + ton::PrivateKey client_private_key); + + FFIEngineConsoleClient(FFIEngineConsoleClient&&) = default; + + ~FFIEngineConsoleClient() { + if (!client_.empty()) { + loop_.run_in_context([client = std::move(client_)]() mutable { client.reset(); }); + } + } + + void request(ton::tl_object_ptr query, + td::Promise> promise); + + FFIEventLoop& loop() { + return loop_; + } + + private: + FFIEventLoop& loop_; + td::actor::ActorOwn client_; +}; + +} // namespace tonlib diff --git a/tonlib/tonlib/FFIEventLoop.cpp b/tonlib/tonlib/FFIEventLoop.cpp new file mode 100644 index 000000000..48aea4273 --- /dev/null +++ b/tonlib/tonlib/FFIEventLoop.cpp @@ -0,0 +1,56 @@ +#include "FFIEventLoop.h" + +namespace tonlib { + +FFIEventLoop::FFIEventLoop(int threads) : scheduler_(td::actor::Scheduler({threads})) { + queue_.init(); + scheduler_thread_ = std::thread([&] { scheduler_.run(); }); +} + +FFIEventLoop::~FFIEventLoop() { + actor_counter_.wait_zero(); + scheduler_.run_in_context([] { td::actor::SchedulerContext::get()->stop(); }); + scheduler_thread_.join(); +} + +void FFIEventLoop::cancel() { + is_cancelled_ = true; + queue_.writer_put({cancel_tag}); +} + +std::optional FFIEventLoop::wait(double timeout) { + if (is_cancelled_) { + return std::nullopt; + } + if (queue_size_ == 0) { + queue_size_ = queue_.reader_wait_nonblock(); + } + if (queue_size_ == 0 && static_cast(timeout * 1000) != 0) { + if (timeout < 0) { + queue_.reader_get_event_fd().wait(-1); + } else { + queue_.reader_get_event_fd().wait(static_cast(timeout * 1000)); + } + queue_size_ = queue_.reader_wait_nonblock(); + } + if (queue_size_ == 0) { + return std::nullopt; + } + auto entry = queue_.reader_get_unsafe(); + --queue_size_; + if (entry.value == cancel_tag) { + CHECK(is_cancelled_); + return std::nullopt; + } + return entry; +} + +td::unique_ptr FFIEventLoop::new_actor() { + return actor_counter_.new_actor(); +} + +void FFIEventLoop::put(Continuation continuation) { + queue_.writer_put(continuation); +} + +} // namespace tonlib diff --git a/tonlib/tonlib/FFIEventLoop.h b/tonlib/tonlib/FFIEventLoop.h new file mode 100644 index 000000000..8981257a7 --- /dev/null +++ b/tonlib/tonlib/FFIEventLoop.h @@ -0,0 +1,76 @@ +#pragma once + +#include +#include +#include + +#include "td/actor/common.h" +#include "td/utils/ScopeGuard.h" + +namespace tonlib { + +class ActorCounter { + public: + ActorCounter() = default; + + td::unique_ptr new_actor() { + ++this->count_; + return td::create_lambda_guard([this] { + size_t new_value = --count_; + if (new_value == 0) { + std::lock_guard lk(m_); + cv_.notify_all(); + } + }); + } + + void wait_zero() { + if (count_ == 0) { + return; + } + std::unique_lock lk(m_); + cv_.wait(lk, [&] { return count_ == 0; }); + } + + private: + std::atomic count_ = 0; + std::mutex m_; + std::condition_variable cv_; +}; + +struct Continuation { + void const* value; +}; + +class FFIEventLoop { + public: + FFIEventLoop(int threads); + ~FFIEventLoop(); + + void cancel(); + std::optional wait(double timeout); + td::unique_ptr new_actor(); + + void put(Continuation continuation); + + template + void run_in_context(Func&& func) { + scheduler_.run_in_context(std::forward(func)); + } + + private: + static constexpr struct CancelTag { + } cancel_tag_struct; + static constexpr void const* cancel_tag = &cancel_tag_struct; + + td::actor::Scheduler scheduler_; + std::thread scheduler_thread_{}; + + ActorCounter actor_counter_{}; + std::atomic is_cancelled_ = false; + + td::MpscPollableQueue queue_{}; + size_t queue_size_ = 0; +}; + +} // namespace tonlib diff --git a/tonlib/tonlib/tonlib_engine_console.cpp b/tonlib/tonlib/tonlib_engine_console.cpp new file mode 100644 index 000000000..6f3751d71 --- /dev/null +++ b/tonlib/tonlib/tonlib_engine_console.cpp @@ -0,0 +1,151 @@ +#include "auto/tl/ton_api_json.h" +#include "tl-utils/tl-utils.hpp" +#include "tl/tl_json.h" + +#include "FFIAwaitable.h" +#include "FFIEngineConsoleClient.h" +#include "FFIEventLoop.h" +#include "tonlib_engine_console.h" + +// ===== Event loop ===== +TonlibEventLoop *tonlib_event_loop_create(int threads) { + return new tonlib::FFIEventLoop{threads}; +} + +void tonlib_event_loop_destroy(TonlibEventLoop *loop) { + delete loop; +} + +void tonlib_event_loop_cancel(TonlibEventLoop *loop) { + loop->cancel(); +} + +void const *tonlib_event_loop_wait(TonlibEventLoop *loop, double timeout) { + auto result = loop->wait(timeout); + if (!result.has_value()) { + return nullptr; + } + return result->value; +} + +// ===== Response ===== +void tonlib_response_destroy(TonlibResponse *response) { + response->destroy(); +} + +bool tonlib_response_await_ready(TonlibResponse *response) { + return response->await_ready(); +} + +void tonlib_response_await_suspend(TonlibResponse *response, void const *continuation) { + response->await_suspend({continuation}); +} + +bool tonlib_response_is_error(TonlibResponse *response) { + return response->result().is_error(); +} + +int tonlib_response_get_error_code(TonlibResponse *response) { + return response->result().error().code(); +} + +char const *tonlib_response_get_error_message(TonlibResponse *response) { + return response->result().error().message().data(); +} + +char const *tonlib_response_get_response(TonlibResponse *response) { + return response->result().ok().data(); +} + +// ===== Engine Console ===== +namespace { + +td::Result create_ffi_client(TonlibEventLoop *loop, char const *config) { + std::string config_str = config; + TRY_RESULT(json, td::json_decode(config_str)); + if (json.type() != td::JsonValue::Type::Object) { + return td::Status::Error("Config must be a JSON object"); + } + + ton::ton_api::engineConsoleClient_config parsed_config; + TRY_STATUS(from_json(parsed_config, json.get_object())); + + td::IPAddress parsed_address; + TRY_STATUS(parsed_address.init_host_port(parsed_config.address_)); + + if (!parsed_config.server_public_key_) { + return td::Status::Error("server_public_key is required in config"); + } + auto server_public_key_slice = ton::serialize_tl_object(parsed_config.server_public_key_.get(), true); + TRY_RESULT(parsed_server_public_key, ton::PublicKey::import(server_public_key_slice)); + + if (!parsed_config.client_private_key_) { + return td::Status::Error("client_private_key is required in config"); + } + auto client_private_key_slice = ton::serialize_tl_object(parsed_config.client_private_key_.get(), true); + TRY_RESULT(parsed_client_private_key, ton::PrivateKey::import(client_private_key_slice)); + + return tonlib::FFIEngineConsoleClient{*loop, parsed_address, parsed_server_public_key, parsed_client_private_key}; +} + +td::Result> parse_query(char const *query) { + std::string query_str = query; + TRY_RESULT(json, td::json_decode(query_str)); + if (json.type() != td::JsonValue::Type::Object) { + return td::Status::Error("Query must be a JSON object"); + } + + ton::tl_object_ptr parsed_query; + TRY_STATUS(from_json(parsed_query, std::move(json))); + + if (!tonlib::is_engine_console_query(parsed_query)) { + return td::Status::Error("Query is not an engine console query"); + } + + return parsed_query; +} + +} // namespace + +struct TonlibEngineConsole { + td::Result client; +}; + +TonlibEngineConsole *tonlib_engine_console_create(TonlibEventLoop *loop, char const *config) { + return new TonlibEngineConsole{create_ffi_client(loop, config)}; +} + +void tonlib_engine_console_destroy(TonlibEngineConsole *console) { + delete console; +} + +bool tonlib_engine_console_is_error(TonlibEngineConsole *console) { + return console->client.is_error(); +} + +int tonlib_engine_console_get_error_code(TonlibEngineConsole *console) { + return console->client.error().code(); +} + +char const *tonlib_engine_console_get_error_message(TonlibEngineConsole *console) { + return console->client.error().message().data(); +} + +TonlibResponse *tonlib_engine_console_request(TonlibEngineConsole *console, char const *query) { + auto &client = console->client.ok_ref(); + + auto query_or_sync_error = parse_query(query); + + if (query_or_sync_error.is_error()) { + return TonlibResponse::create_resolved(client.loop(), query_or_sync_error.move_as_error()); + } + + auto transform = [](ton::tl_object_ptr object) -> std::string { + return td::json_encode(td::ToJson(object)); + }; + + auto [response, promise] = + TonlibResponse::create_bridge>(client.loop(), transform); + client.request(query_or_sync_error.move_as_ok(), std::move(promise)); + return response; +} diff --git a/tonlib/tonlib/tonlib_engine_console.h b/tonlib/tonlib/tonlib_engine_console.h new file mode 100644 index 000000000..69cb5e2f7 --- /dev/null +++ b/tonlib/tonlib/tonlib_engine_console.h @@ -0,0 +1,141 @@ +#pragma once + +#include +#include + +#include "tonlib/tonlibjson_export.h" + +#ifdef __cplusplus +#include + +namespace tonlib { + +class FFIEventLoop; +template +class FFIAwaitable; + +} // namespace tonlib + +using TonlibEventLoop = tonlib::FFIEventLoop; +using TonlibResponse = tonlib::FFIAwaitable; +#else +typedef struct TonlibEventLoop TonlibEventLoop; +typedef struct TonlibResponse TonlibResponse; +#endif + +typedef struct TonlibEngineConsole TonlibEngineConsole; + +#ifdef __cplusplus +extern "C" { +#endif + +// ===== Event Loop ===== +// An interaction with the engine console client starts by creating an TonlibEventLoop object that +// allows foreign caller to wait for asynchronous event completion in a mostly non-blocking +// manner. In particular, after suspending TonlibResponse, tonlib_event_loop_wait will return when +// the awaitable TonlibResponse is resolved. +// +// tonlib_event_loop_wait does nothing for `timeout` seconds if there is nothing to process, so a +// typical interaction flow goes as follows: +// +// 1. Create an event loop before first library usage. +// 2. Spawn a background thread that continuously polls tonlib_event_loop_wait and resumes +// continuations of resolved responses. +// 3. Call asynchronous functions, providing a continuation that the background thread knows how to +// resume. +// +// There must be a total happens-before order in which tonlib functions (except +// `tonlib_event_loop_wait`) bound to a single event loop are called (i. e. the event loop is +// thread-aware but not thread-safe). `tonlib_event_loop_wait` calls must be happens-before ordered +// with respect to each other as well but are not required to be ordered with the respect to other +// functions. However, last call to `tonlib_event_loop_wait` must happen before +// `tonlib_event_loop_destroy` call. To facilitate this, `tonlib_event_loop_cancel` can be used to +// cancel wait without destroying the loop. + +// Creates a new event loop instance. Never fails. +TONLIBJSON_EXPORT TonlibEventLoop *tonlib_event_loop_create(int threads); + +// Destroys the event loop. +// +// Non-destroyed instances of engine console client will deadlock the function. (Calling +// `tonlib_engine_console_destroy` during `tonlib_event_loop_destroy` is UB as it violates the +// global happens-before ordering requirement.) +TONLIBJSON_EXPORT void tonlib_event_loop_destroy(TonlibEventLoop *loop); + +// Puts event loop into the cancelled state. +TONLIBJSON_EXPORT void tonlib_event_loop_cancel(TonlibEventLoop *loop); + +// Waits for the next event for `timeout` seconds. If no event happens within the timeout, returns +// nullptr. If the event loop is cancelled on function entry, returns immediately with nullptr. If +// the event loop is cancelled during wait, the function eventually (as soon as scheduled) returns +// nullptr as well. timeout=-1.0 is no timeout. +TONLIBJSON_EXPORT void const *tonlib_event_loop_wait(TonlibEventLoop *loop, double timeout); + +// ===== Response ===== +// TonlibResponse is an awaitable that will resolve with the response of the connected validator +// engine. It can be obtained from `tonlib_engine_console_request`. + +// Destroys the response. If `await_suspend` was called on the response and response is not yet +// resolved, continuation will arrive as soon as scheduled. +TONLIBJSON_EXPORT void tonlib_response_destroy(TonlibResponse *response); + +// Returns true if the response is resolved. You can use this immediately after creation to check if +// synchronous result is available. +TONLIBJSON_EXPORT bool tonlib_response_await_ready(TonlibResponse *response); + +// Records continuation that will be returned by `tonlib_event_loop_wait` when response is resolved. +// `tonlib_event_loop_wait` will not return because of this response until this function is called. +// +// Can only be called one time on a particular response instance. Can be called on a resolved +// instance as well, in which case the continuation will be returned as soon as scheduled (this is +// allowed as `await_ready` + `await_suspend` sequence is obviously not atomic). +TONLIBJSON_EXPORT void tonlib_response_await_suspend(TonlibResponse *response, void const *continuation); + +// Returns true if the response is an error. Can only be called on resolved response. Only errors +// produced locally will be reported here; errors returned by the remote side are returned using a +// "success" path as an `engine.validator.controlQueryError` object. +TONLIBJSON_EXPORT bool tonlib_response_is_error(TonlibResponse *response); + +// Returns the error code. Can only be called on resolved error response. +TONLIBJSON_EXPORT int tonlib_response_get_error_code(TonlibResponse *response); + +// Returns the error message. Can only be called on resolved error response. +TONLIBJSON_EXPORT char const *tonlib_response_get_error_message(TonlibResponse *response); + +// Returns the JSON-encoded remote TL response. Can only be called on resolved response. Might be +// either a successful response with type determined by the TL scheme or an +// `engine.validator.controlQueryError` object if remote has encountered an error. +TONLIBJSON_EXPORT char const *tonlib_response_get_response(TonlibResponse *response); + +// ===== Engine Console ===== +// TonlibEngineConsole represents an instance of the engine console client. It allows sending +// control queries to the connected validator engine. + +// Creates a new engine console client instance. +// +// `config` should be a JSON-encoded `engineConsoleClient.config` object. If creation of the +// instance fails, the error can be obtained from `tonlib_engine_console_is_error` and related +// functions. +TONLIBJSON_EXPORT TonlibEngineConsole *tonlib_engine_console_create(TonlibEventLoop *loop, char const *config); + +// Destroys the engine console client instance. Error instances must be destroyed as well. +TONLIBJSON_EXPORT void tonlib_engine_console_destroy(TonlibEngineConsole *console); + +// Returns true if the engine console instance did not initialize properly. +TONLIBJSON_EXPORT bool tonlib_engine_console_is_error(TonlibEngineConsole *console); + +// Returns the error code. Can only be called if `tonlib_engine_console_is_error` returned true. +TONLIBJSON_EXPORT int tonlib_engine_console_get_error_code(TonlibEngineConsole *console); + +// Returns the error message. Can only be called if `tonlib_engine_console_is_error` returned true. +TONLIBJSON_EXPORT char const *tonlib_engine_console_get_error_message(TonlibEngineConsole *console); + +// Sends a control query to the connected validator engine. Can only be called if +// `tonlib_engine_console_is_error` returned false. +// +// `query` must be a JSON-encoded control query object. +TONLIBJSON_EXPORT TonlibResponse *tonlib_engine_console_request(TonlibEngineConsole *console, char const *query); + +#ifdef __cplusplus +} // extern "C" +#endif diff --git a/tonlib/tonlibclientjson_export_list b/tonlib/tonlibclientjson_export_list index 32735d33a..e495fadc3 100644 --- a/tonlib/tonlibclientjson_export_list +++ b/tonlib/tonlibclientjson_export_list @@ -4,3 +4,20 @@ _tonlib_client_json_send _tonlib_client_json_receive _tonlib_client_json_execute _tonlib_client_set_verbosity_level +_tonlib_event_loop_create +_tonlib_event_loop_destroy +_tonlib_event_loop_cancel +_tonlib_event_loop_wait +_tonlib_response_destroy +_tonlib_response_await_ready +_tonlib_response_await_suspend +_tonlib_response_is_error +_tonlib_response_get_error_code +_tonlib_response_get_error_message +_tonlib_response_get_response +_tonlib_engine_console_create +_tonlib_engine_console_destroy +_tonlib_engine_console_is_error +_tonlib_engine_console_get_error_code +_tonlib_engine_console_get_error_message +_tonlib_engine_console_request