Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ jobs:
sudo ./scripts/download_install_dependencies.sh capnp install

- name: Build and test C++ Components

# Ignore man-in-the-middle tests in CI for now
run: |
CXX=$(which g++-14) ./scripts/build.sh
CXX=$(which g++-14) GTEST_FILTER="-*Mitm*" ./scripts/build.sh

- name: Install Python Dependent Packages
run: |
Expand Down
21 changes: 21 additions & 0 deletions scaler/io/ymq/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,27 @@ if(LINUX)

set_target_properties(ymq PROPERTIES PREFIX "")
set_target_properties(ymq PROPERTIES LINKER_LANGUAGE CXX)
find_package(Python3 COMPONENTS Development.Module REQUIRED)

target_sources(ymq PRIVATE pymod_ymq/async.h
pymod_ymq/bytes.h
pymod_ymq/exception.h
pymod_ymq/message.h
pymod_ymq/io_context.h
pymod_ymq/io_socket.h
pymod_ymq/utils.h
pymod_ymq/ymq.h
pymod_ymq/ymq.cpp
)
target_include_directories(ymq PRIVATE ${Python3_INCLUDE_DIRS})
target_link_libraries(ymq PRIVATE cc_ymq
PRIVATE ${Python3_LIBRARIES}
)

target_link_options(ymq PRIVATE "-Wl,-rpath,$ORIGIN")

install(TARGETS ymq
LIBRARY DESTINATION scaler/io/ymq)

target_sources(ymq PRIVATE pymod_ymq/async.h
pymod_ymq/bytes.h
Expand Down
16 changes: 9 additions & 7 deletions scaler/io/ymq/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cstring>

// C++
#include <optional>
#include <string>

// First-party
Expand All @@ -21,7 +22,7 @@ class Bytes {

void free()
{
if (is_empty())
if (is_null())
return;
delete[] _data;
_data = nullptr;
Expand All @@ -32,6 +33,8 @@ class Bytes {
public:
Bytes(char* data, size_t len): _data(datadup((uint8_t*)data, len)), _len(len) {}

Bytes(std::string s): _data(datadup((uint8_t*)s.data(), s.length())), _len(s.length()) {}

Bytes(): _data {}, _len {} {}

Bytes(const Bytes& other) noexcept
Expand Down Expand Up @@ -81,15 +84,14 @@ class Bytes {

~Bytes() { this->free(); }

[[nodiscard]] constexpr bool operator!() const noexcept { return is_empty(); }
[[nodiscard]] constexpr bool operator!() const noexcept { return is_null(); }

[[nodiscard]] constexpr bool is_empty() const noexcept { return !this->_data; }
[[nodiscard]] constexpr bool is_null() const noexcept { return !this->_data; }

// debugging utility
std::string as_string() const
std::optional<std::string> as_string() const
{
if (is_empty())
return "[EMPTY]";
if (is_null())
return std::nullopt;

return std::string((char*)_data, _len);
}
Expand Down
17 changes: 1 addition & 16 deletions scaler/io/ymq/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
// C++
#include <cstdlib>
#include <cstring>
#include <format>
#include <iostream>
#include <source_location>
#include <string>
Expand All @@ -33,22 +34,6 @@ inline void print_trace(void)
#endif // __linux__
}

// this is an unrecoverable error that exits the program
// prints a message plus the source location
[[noreturn]] inline void panic(
std::string message, const std::source_location& location = std::source_location::current())
{
auto file_name = std::string(location.file_name());
file_name = file_name.substr(file_name.find_last_of("/") + 1);

std::cout << "panic at " << file_name << ":" << location.line() << ":" << location.column() << " in function ["
<< location.function_name() << "]: " << message << std::endl;

print_trace();

std::abort();
}

[[nodiscard("Memory is allocated but not used, likely causing a memory leak")]]
inline uint8_t* datadup(const uint8_t* data, size_t len) noexcept
{
Expand Down
18 changes: 11 additions & 7 deletions scaler/io/ymq/epoll_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,17 @@ void EpollContext::loop()
const int myErrno = errno;
switch (myErrno) {
case EINTR:
unrecoverableError({
Error::ErrorCode::SignalNotSupported,
"Originated from",
"epoll_wait(2)",
"Errno is",
strerror(errno),
});
// unrecoverableError({
// Error::ErrorCode::SignalNotSupported,
// "Originated from",
// "epoll_wait(2)",
// "Errno is",
// strerror(errno),
// });

// todo: investigate better error handling
// the epoll thread is not expected to receive signals(?)
// but occasionally does (e.g. sigwinch) and we shouldn't stop the thread in that case
break;
case EBADF:
case EFAULT:
Expand Down
2 changes: 1 addition & 1 deletion scaler/io/ymq/examples/automated_echo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ int main()
auto future = x.get_future();
Message msg = future.get().first;
if (msg.payload.as_string() != longStr) {
printf("Checksum failed, %s\n", msg.payload.as_string().c_str());
printf("Checksum failed, %s\n", msg.payload.as_string()->c_str());
exit(1);
}
}
Expand Down
37 changes: 37 additions & 0 deletions scaler/io/ymq/examples/common.h
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
#pragma once

#include <unistd.h>

#include <expected>
#include <future>
#include <memory>
#include <optional>

#include "scaler/io/ymq/bytes.h"
#include "scaler/io/ymq/error.h"
#include "scaler/io/ymq/io_context.h"
#include "scaler/io/ymq/io_socket.h"
#include "scaler/io/ymq/message.h"

// We should not be using namespace in header file, but this is example, so we are good
using namespace scaler::ymq;
Expand Down Expand Up @@ -39,3 +45,34 @@ inline void syncConnectSocket(std::shared_ptr<IOSocket> socket, std::string addr

connect_future.wait();
}

inline std::expected<Message, Error> syncRecvMessage(std::shared_ptr<IOSocket> socket)
{
auto promise = std::promise<std::pair<Message, Error>>();
auto future = promise.get_future();

socket->recvMessage([&promise](auto result) { promise.set_value(result); });

auto result = future.get();

if (result.second._errorCode == Error::ErrorCode::Uninit) {
return result.first;
} else {
return std::unexpected {result.second};
}
}

inline std::optional<Error> syncSendMessage(std::shared_ptr<IOSocket> socket, Message message)
{
auto promise = std::promise<std::expected<void, Error>>();
auto future = promise.get_future();

socket->sendMessage(message, [&promise](auto result) { promise.set_value(result); });

auto result = future.get();

if (result)
return std::nullopt;
else
return result.error();
}
4 changes: 2 additions & 2 deletions scaler/io/ymq/message_connection_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,10 +249,10 @@ void MessageConnectionTCP::updateReadOperation()
Bytes address(_remoteIOSocketIdentity->data(), _remoteIOSocketIdentity->size());
Bytes payload(std::move(_receivedReadOperations.front()._payload));
_receivedReadOperations.pop();

auto recvMessageCallback = std::move(_pendingRecvMessageCallbacks->front());
_pendingRecvMessageCallbacks->pop();

recvMessageCallback({Message(std::move(address), std::move(payload)), {}});

} else {
Expand Down
65 changes: 31 additions & 34 deletions scaler/io/ymq/pymod_ymq/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,58 +12,47 @@
#include "scaler/io/ymq/pymod_ymq/ymq.h"

// wraps an async callback that accepts a Python asyncio future
static PyObject* async_wrapper(PyObject* self, const std::function<void(YMQState* state, PyObject* future)>& callback)
static PyObject* async_wrapper(PyObject* self, const std::function<void(YMQState* state, PyObject* future)>&& callback)
{
// replace with PyType_GetModuleByDef(Py_TYPE(self), &ymq_module) in a newer Python version
// https://docs.python.org/3/c-api/type.html#c.PyType_GetModuleByDef
PyObject* pyModule = PyType_GetModule(Py_TYPE(self));
if (!pyModule) {
PyErr_SetString(PyExc_RuntimeError, "Failed to get module for Message type");
auto state = YMQStateFromSelf(self);
if (!state)
return nullptr;
}

auto state = (YMQState*)PyModule_GetState(pyModule);
if (!state) {
PyErr_SetString(PyExc_RuntimeError, "Failed to get module state");
return nullptr;
}

PyObject* loop = PyObject_CallMethod(state->asyncioModule, "get_event_loop", nullptr);

if (!loop) {
PyErr_SetString(PyExc_RuntimeError, "Failed to get event loop");
OwnedPyObject loop = PyObject_CallMethod(*state->asyncioModule, "get_event_loop", nullptr);
if (!loop)
return nullptr;
}

PyObject* future = PyObject_CallMethod(loop, "create_future", nullptr);

if (!future) {
PyErr_SetString(PyExc_RuntimeError, "Failed to create future");
OwnedPyObject future = PyObject_CallMethod(*loop, "create_future", nullptr);
if (!future)
return nullptr;
}

// borrow the future, we'll decref this after the C++ thread is done
Py_INCREF(future);
// create the awaitable before calling the callback
// this ensures that we create a new strong reference to the future before the callback decrefs it
auto awaitable = PyObject_CallFunction(*state->PyAwaitableType, "O", *future);

// async
callback(state, future);
// we transfer ownership of the future to the callback
// TODO: investigate having the callback take an OwnedPyObject, and just std::move()
callback(state, future.take());

return PyObject_CallFunction(state->PyAwaitableType, "O", future);
return awaitable;
}

struct Awaitable {
PyObject_HEAD;
PyObject* future;
OwnedPyObject<> future;
};

extern "C" {

static int Awaitable_init(Awaitable* self, PyObject* args, PyObject* kwds)
{
if (!PyArg_ParseTuple(args, "O", &self->future)) {
PyErr_SetString(PyExc_RuntimeError, "Failed to parse arguments for Iterable");
PyObject* future = nullptr;
if (!PyArg_ParseTuple(args, "O", &future))
return -1;
}

new (&self->future) OwnedPyObject<>();
self->future = OwnedPyObject<>::fromBorrowed(future);

return 0;
}
Expand All @@ -72,13 +61,21 @@ static PyObject* Awaitable_await(Awaitable* self)
{
// Easy: coroutines are just iterators and we don't need anything fancy
// so we can just return the future's iterator!
return PyObject_GetIter(self->future);
return PyObject_GetIter(*self->future);
}

static void Awaitable_dealloc(Awaitable* self)
{
Py_DECREF(self->future);
Py_TYPE(self)->tp_free((PyObject*)self);
try {
self->future.~OwnedPyObject();
} catch (...) {
PyErr_SetString(PyExc_RuntimeError, "Failed to deallocate Awaitable");
PyErr_WriteUnraisable((PyObject*)self);
}

auto* tp = Py_TYPE(self);
tp->tp_free(self);
Py_DECREF(tp);
}
}

Expand Down
Loading