Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/actions/compile-library/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ runs:
if: inputs.os == 'Linux'
shell: bash
run: |
CXX=$(which g++-14) ./scripts/build.sh
CXX=$(which g++-14) GTEST_FILTER="-*Mitm*:*TestIncompleteIdentity*" ./scripts/build.sh

- name: Build and test C++ Components (Windows)
if: inputs.os == 'Windows'
Expand Down
10 changes: 2 additions & 8 deletions examples/task_capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,12 @@ def main():

# Submit a task that requires GPU capabilities, this will be redirected to the GPU worker.
gpu_future = client.submit_verbose(
gpu_task,
args=(16.0,),
kwargs={},
capabilities={"gpu": 1} # Requires a GPU capability
gpu_task, args=(16.0,), kwargs={}, capabilities={"gpu": 1} # Requires a GPU capability
)

# Submit a task that does not require GPU capabilities, this will be routed to any available worker.
cpu_future = client.submit_verbose(
cpu_task,
args=(16.0,),
kwargs={},
capabilities={} # No GPU capability required
cpu_task, args=(16.0,), kwargs={}, capabilities={} # No GPU capability required
)

# Waits for the tasks for finish
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dev = [
"flake8>=7.3.0",
"flake8-pyproject>=1.2.3",
"mypy>=1.17.1",
"scapy==2.*",
]

[tool.scikit-build.metadata.version]
Expand Down
11 changes: 5 additions & 6 deletions scaler/io/ymq/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <cstring>

// C++
#include <optional>
#include <string>

// First-party
Expand All @@ -33,10 +34,9 @@ class Bytes {
public:
Bytes(char* data, size_t len): _data(datadup((uint8_t*)data, len)), _len(len) {}

Bytes(): _data {}, _len {} {}
Bytes(const std::string& s): _data(datadup((uint8_t*)s.data(), s.length())), _len(s.length()) {}

// For debug and convenience only
explicit Bytes(const std::string& str): Bytes((char*)str.c_str(), str.size()) {}
Bytes(): _data {}, _len {} {}

Bytes(const Bytes& other) noexcept
{
Expand Down Expand Up @@ -92,11 +92,10 @@ class Bytes {

[[nodiscard]] constexpr bool is_null() const noexcept { return !this->_data; }

// debugging utility
std::string as_string() const
std::optional<std::string> as_string() const
{
if (is_null())
return "[EMPTY]";
return std::nullopt;

return std::string((char*)_data, _len);
}
Expand Down
2 changes: 1 addition & 1 deletion scaler/io/ymq/examples/automated_echo_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ int main()
auto future = x.get_future();
Message msg = future.get().first;
if (msg.payload.as_string() != longStr) {
printf("Checksum failed, %s\n", msg.payload.as_string().c_str());
printf("Checksum failed, %s\n", msg.payload.as_string()->c_str());
exit(1);
}
}
Expand Down
1 change: 0 additions & 1 deletion scaler/io/ymq/message_connection_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ void MessageConnectionTCP::updateReadOperation()
_pendingRecvMessageCallbacks->pop();

recvMessageCallback({Message(std::move(address), std::move(payload)), {}});

} else {
assert(_pendingRecvMessageCallbacks->size());
break;
Expand Down
3 changes: 1 addition & 2 deletions scaler/io/ymq/pymod_ymq/async.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
#include "scaler/io/ymq/pymod_ymq/ymq.h"

// wraps an async callback that accepts a Python asyncio future
static PyObject* async_wrapper(PyObject* self, const std::function<void(YMQState* state, PyObject* future)>& callback)
static PyObject* async_wrapper(PyObject* self, const std::function<void(YMQState* state, PyObject* future)>&& callback)
{
auto state = YMQStateFromSelf(self);
if (!state)
Expand All @@ -25,7 +25,6 @@ static PyObject* async_wrapper(PyObject* self, const std::function<void(YMQState
}

OwnedPyObject future = PyObject_CallMethod(*loop, "create_future", nullptr);

if (!future) {
PyErr_SetString(PyExc_RuntimeError, "Failed to create future");
return nullptr;
Expand Down
2 changes: 1 addition & 1 deletion scaler/io/ymq/pymod_ymq/exception.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
#include <functional>

// First-party
#include "scaler/io/ymq/pymod_ymq/ymq.h"
#include "scaler/io/ymq/pymod_ymq/utils.h"
#include "scaler/io/ymq/pymod_ymq/ymq.h"

// the order of the members in the exception args tuple
const Py_ssize_t YMQException_errorCodeIndex = 0;
Expand Down
4 changes: 2 additions & 2 deletions scaler/io/ymq/pymod_ymq/io_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ static PyObject* PyIOContext_createIOSocket_(
using Identity = Configuration::IOSocketIdentity;

// note: references borrowed from args, so no need to manage their lifetime
PyObject* pyIdentity {};
PyObject* pySocketType {};
PyObject* pyIdentity = nullptr;
PyObject* pySocketType = nullptr;
if (nargs == 1) {
pyIdentity = args[0];
} else if (nargs == 2) {
Expand Down
10 changes: 1 addition & 9 deletions scaler/io/ymq/pymod_ymq/io_socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,6 @@ static PyObject* PyIOSocket_recv(PyIOSocket* self, PyObject* args)
if (!pyMessage)
return YMQ_GetRaisedException();

// TODO: why is leaking necessary?
address.forget();
payload.forget();

return (PyObject*)pyMessage.take();
});
} catch (...) {
Expand Down Expand Up @@ -221,10 +217,6 @@ static PyObject* PyIOSocket_recv_sync(PyIOSocket* self, PyObject* args)
if (!pyMessage)
return nullptr;

// TODO: why is leaking necessary?
address.forget();
payload.forget();

return (PyObject*)pyMessage.take();
}

Expand Down Expand Up @@ -378,7 +370,7 @@ static PyObject* PyIOSocket_socket_type_getter(PyIOSocket* self, void* closure)
if (!state)
return nullptr;

const IOSocketType socketType = self->socket->socketType();
const IOSocketType socketType = self->socket->socketType();
OwnedPyObject socketTypeIntObj = PyLong_FromLong((long)socketType);

if (!socketTypeIntObj)
Expand Down
2 changes: 1 addition & 1 deletion scaler/io/ymq/pymod_ymq/ymq.h
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ static int YMQ_createErrorCodeEnum(PyObject* pyModule, YMQState* state)
// docs and examples are unfortunately scarce for this
// for now this will work just fine
OwnedPyObject item {};
while (item = PyIter_Next(*iter)) {
while ((item = PyIter_Next(*iter))) {
OwnedPyObject fn = PyCMethod_New(&YMQErrorCode_explanation_def, *item, pyModule, nullptr);
if (!fn)
return -1;
Expand Down
28 changes: 18 additions & 10 deletions scaler/io/ymq/simple_interface.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@

#include "scaler/io/ymq/simple_interface.h"

#include <optional>

namespace scaler {
namespace ymq {

Expand Down Expand Up @@ -35,35 +37,41 @@ void syncConnectSocket(std::shared_ptr<IOSocket> socket, std::string address)
connect_future.wait();
}

std::pair<Message, Error> syncRecvMessage(std::shared_ptr<IOSocket> socket)
std::expected<Message, Error> syncRecvMessage(std::shared_ptr<IOSocket> socket)
{
auto fut = futureRecvMessage(std::move(socket));
return fut.get();
}

std::expected<void, Error> syncSendMessage(std::shared_ptr<IOSocket> socket, Message message)
std::optional<Error> syncSendMessage(std::shared_ptr<IOSocket> socket, Message message)
{
auto fut = futureSendMessage(std::move(socket), std::move(message));
return fut.get();
}

std::future<std::pair<Message, Error>> futureRecvMessage(std::shared_ptr<IOSocket> socket)
std::future<std::expected<Message, Error>> futureRecvMessage(std::shared_ptr<IOSocket> socket)
{
auto recv_promise_ptr = std::make_unique<std::promise<std::pair<Message, Error>>>();
auto recv_promise_ptr = std::make_unique<std::promise<std::expected<Message, Error>>>();
auto recv_future = recv_promise_ptr->get_future();
socket->recvMessage([recv_promise = std::move(recv_promise_ptr)](std::pair<Message, Error> msg) {
recv_promise->set_value(std::move(msg));
socket->recvMessage([recv_promise = std::move(recv_promise_ptr)](std::pair<Message, Error> result) {
if (result.second._errorCode == Error::ErrorCode::Uninit)
recv_promise->set_value(std::move(result.first));
else
recv_promise->set_value(std::unexpected {std::move(result.second)});
});
return recv_future;
}

std::future<std::expected<void, Error>> futureSendMessage(std::shared_ptr<IOSocket> socket, Message message)
std::future<std::optional<Error>> futureSendMessage(std::shared_ptr<IOSocket> socket, Message message)
{
auto send_promise_ptr = std::make_unique<std::promise<std::expected<void, Error>>>();
auto send_promise_ptr = std::make_unique<std::promise<std::optional<Error>>>();
auto send_future = send_promise_ptr->get_future();
socket->sendMessage(
std::move(message), [send_promise = std::move(send_promise_ptr)](std::expected<void, Error> msg) {
send_promise->set_value(std::move(msg));
std::move(message), [send_promise = std::move(send_promise_ptr)](std::expected<void, Error> result) {
if (result)
send_promise->set_value(std::nullopt);
else
send_promise->set_value(std::move(result.error()));
});
return send_future;
}
Expand Down
8 changes: 4 additions & 4 deletions scaler/io/ymq/simple_interface.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ std::shared_ptr<IOSocket> syncCreateSocket(IOContext& context, IOSocketType type
void syncBindSocket(std::shared_ptr<IOSocket> socket, std::string address);
void syncConnectSocket(std::shared_ptr<IOSocket> socket, std::string address);

std::pair<Message, Error> syncRecvMessage(std::shared_ptr<IOSocket> socket);
std::expected<void, Error> syncSendMessage(std::shared_ptr<IOSocket> socket, Message message);
std::expected<Message, Error> syncRecvMessage(std::shared_ptr<IOSocket> socket);
std::optional<Error> syncSendMessage(std::shared_ptr<IOSocket> socket, Message message);

std::future<std::pair<Message, Error>> futureRecvMessage(std::shared_ptr<IOSocket> socket);
std::future<std::expected<void, Error>> futureSendMessage(std::shared_ptr<IOSocket> socket, Message message);
std::future<std::expected<Message, Error>> futureRecvMessage(std::shared_ptr<IOSocket> socket);
std::future<std::optional<Error>> futureSendMessage(std::shared_ptr<IOSocket> socket, Message message);

} // namespace ymq
} // namespace scaler
14 changes: 9 additions & 5 deletions scaler/io/ymq/ymq.pyi
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# NOTE: NOT IMPLEMENTATION, TYPE INFORMATION ONLY
# This file contains type stubs for the Ymq Python C Extension module
import abc
import sys
from collections.abc import Awaitable
from enum import IntEnum
Expand All @@ -11,12 +10,17 @@ if sys.version_info >= (3, 12):
else:
Buffer = object

class Bytes(Buffer, metaclass=abc.ABCMeta):
data: bytes
class Bytes(Buffer):
data: bytes | None
len: int

def __init__(self, data: SupportsBytes | bytes) -> None: ...
def __init__(self, data: Buffer | None = None) -> None: ...
def __repr__(self) -> str: ...
def __len__(self) -> int: ...

# this type signature is not 100% accurate because it's implemented in C
# but this satisfies the type check and is good enough
def __buffer__(self, flags: int, /) -> memoryview: ...

class Message:
address: Bytes | None
Expand Down Expand Up @@ -99,7 +103,7 @@ class YMQException(Exception):
code: ErrorCode
message: str

def __init__(self, code: ErrorCode, message: str) -> None: ...
def __init__(self, /, code: ErrorCode, message: str) -> None: ...
def __repr__(self) -> str: ...
def __str__(self) -> str: ...

Expand Down
20 changes: 0 additions & 20 deletions scaler/io/ymq/ymq_test.py

This file was deleted.

14 changes: 8 additions & 6 deletions scaler/object_storage/object_storage_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,14 +129,15 @@ void ObjectStorageServer::processRequests()

std::ranges::for_each(_pendingSendMessageFuts, [](auto& fut) {
if (fut.wait_for(0s) == std::future_status::ready) {
auto res = fut.get();
assert(res);
auto error = fut.get();
assert(!error);
}
});

auto [message, error] = ymq::syncRecvMessage(_ioSocket);
auto maybeMessage = ymq::syncRecvMessage(_ioSocket);

if (error._errorCode != ymq::Error::ErrorCode::Uninit) {
if (!maybeMessage) {
auto error = maybeMessage.error();
if (error._errorCode == ymq::Error::ErrorCode::IOSocketStopRequested) {
auto n = std::ranges::count_if(_pendingSendMessageFuts, [](auto& x) {
return x.valid() && x.wait_for(0s) == std::future_status::timeout;
Expand All @@ -163,8 +164,9 @@ void ObjectStorageServer::processRequests()
}
}

const auto identity = lastMessageIdentity = message.address.as_string();
const auto headerOrPayload = std::move(message.payload);
const auto identity = *maybeMessage->address.as_string();
lastMessageIdentity = identity;
const auto headerOrPayload = std::move(maybeMessage->payload);

auto it = identityToFullRequest.find(identity);
if (it == identityToFullRequest.end()) {
Expand Down
3 changes: 2 additions & 1 deletion scaler/object_storage/object_storage_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include <expected>
#include <iostream>
#include <memory>
#include <optional>
#include <span>

#include "scaler/io/ymq/configuration.h"
Expand All @@ -22,7 +23,7 @@ namespace object_storage {
class ObjectStorageServer {
public:
using Identity = ymq::Configuration::IOSocketIdentity;
using SendMessageFuture = std::future<std::expected<void, ymq::Error>>;
using SendMessageFuture = std::future<std::optional<ymq::Error>>;

ObjectStorageServer();

Expand Down
2 changes: 1 addition & 1 deletion scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ cmake --build --preset $BUILD_PRESET
cmake --install $BUILD_DIR

# Tests
ctest --preset $BUILD_PRESET
ctest --preset $BUILD_PRESET -VV
5 changes: 4 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ set(BUILD_GMOCK OFF CACHE BOOL "" FORCE)
set(BUILD_GTEST ON CACHE BOOL "" FORCE)
FetchContent_MakeAvailable(googletest)

find_package(Python3 COMPONENTS Development REQUIRED)

# This function compiles, links, and adds a C++ test executable using Google Test.
# It is shared by all test subdirectories.
function(add_test_executable test_name source_file)
Expand All @@ -26,16 +28,17 @@ function(add_test_executable test_name source_file)
CapnProto::capnp
CapnProto::kj
GTest::gtest_main
Python3::Python
)

add_test(NAME ${test_name} COMMAND ${test_name})
endfunction()


if(LINUX OR APPLE)
# This directory fetches Google Test, so it must be included first.
add_subdirectory(object_storage)

# Add the new directory for io tests.
add_subdirectory(io/ymq)
add_subdirectory(cc_ymq)
endif()
1 change: 1 addition & 0 deletions tests/cc_ymq/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
add_test_executable(test_cc_ymq test_cc_ymq.cpp)
Loading
Loading