diff --git a/.github/workflows/linter.yml b/.github/workflows/linter.yml index 4a65ae120..308a52d0a 100644 --- a/.github/workflows/linter.yml +++ b/.github/workflows/linter.yml @@ -74,7 +74,7 @@ jobs: - name: Build Object Storage Component run: | - CXX=$(which clang++) ./scripts/build.sh + CXX=$(which g++) ./scripts/build.sh - name: Install Python Dependent Packages run: | diff --git a/CMakeLists.txt b/CMakeLists.txt index 1dcbcacc3..94b1b3b4b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -47,6 +47,17 @@ else() message(STATUS "${PROJECT_NAME} ${CMAKE_BUILD_TYPE} build") endif() +# Compiler configuration +if(CMAKE_CXX_COMPILER_ID MATCHES "Clang") + message(STATUS "Using Clang: adding -stdlib=libc++") + add_compile_options(-stdlib=libc++) + add_link_options(-stdlib=libc++) + + # needed for jthread in libc++ + add_compile_options(-fexperimental-library) + add_link_options(-fexperimental-library) +endif() + find_package(CapnProto CONFIG REQUIRED) get_target_property(CAPNP_INCLUDE_DIRS CapnProto::capnp INTERFACE_INCLUDE_DIRECTORIES) message(STATUS "Found Capnp in ${CAPNP_INCLUDE_DIRS}") diff --git a/scaler/io/ymq/CMakeLists.txt b/scaler/io/ymq/CMakeLists.txt index f4d3d5572..a7f892883 100644 --- a/scaler/io/ymq/CMakeLists.txt +++ b/scaler/io/ymq/CMakeLists.txt @@ -1,30 +1,45 @@ add_library(cc_ymq SHARED) -add_library(ymq SHARED) target_sources(cc_ymq PRIVATE bytes.h common.h configuration.h - defs.h + main.h + epoll_context.h + epoll_context.cpp + event_loop_backend.h event_loop.h - event_loop_thread.cpp + event_loop_thread.h + event_loop_thread.cpp + event_manager.h - file_descriptor.h + # file_descriptor.h + + message_connection.h + message_connection_tcp.h + + third_party/concurrentqueue.h interruptive_concurrent_queue.h - io_context.cpp + + typedefs.h + io_context.h - io_socket.cpp + io_context.cpp + io_socket.h - main.h - message_connection.h - message_connection_tcp.h - tcp_client.h + io_socket.cpp + tcp_server.h - third_party/concurrentqueue.h - timed_concurrent_queue.h + tcp_server.cpp + + tcp_client.h + tcp_client.cpp + + timestamp.h + timed_queue.h ) set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/scaler/io/ymq) @@ -34,9 +49,18 @@ install(TARGETS cc_ymq find_package(Python COMPONENTS Development) +add_library(ymq SHARED) + set_target_properties(ymq PROPERTIES PREFIX "") +set_target_properties(ymq PROPERTIES LINKER_LANGUAGE CXX) -target_sources(ymq PRIVATE pymod_ymq/ymq.cpp) +target_sources(ymq PRIVATE pymod_ymq/bytes.h + pymod_ymq/message.h + pymod_ymq/io_context.h + pymod_ymq/io_socket.h + pymod_ymq/ymq.h + pymod_ymq/ymq.cpp +) target_include_directories(ymq PRIVATE ${Python_INCLUDE_DIRS}) target_link_libraries(ymq PRIVATE cc_ymq PRIVATE ${Python_LIBRARIES} @@ -45,4 +69,6 @@ target_link_libraries(ymq PRIVATE cc_ymq target_link_options(ymq PRIVATE "-Wl,-rpath,$ORIGIN") install(TARGETS ymq - LIBRARY DESTINATION ${CMAKE_SOURCE_DIR}/scaler/lib) + LIBRARY DESTINATION scaler/io/ymq) + +add_subdirectory(examples) diff --git a/scaler/io/ymq/bytes.h b/scaler/io/ymq/bytes.h index 077350daf..cfc3b0af1 100644 --- a/scaler/io/ymq/bytes.h +++ b/scaler/io/ymq/bytes.h @@ -10,48 +10,45 @@ // First-party #include "scaler/io/ymq/common.h" +#include "scaler/io/ymq/typedefs.h" class Bytes { - uint8_t* m_data; - size_t m_len; - - enum Ownership { Owned, Borrowed } tag; + uint8_t* _data; + size_t _len; + Ownership _tag; void free() { - if (tag != Owned) + if (_tag != Owned) return; if (is_empty()) return; - delete[] m_data; - this->m_data = NULL; + delete[] _data; + this->_data = NULL; } - Bytes(uint8_t* m_data, size_t m_len, Ownership tag): m_data(m_data), m_len(m_len), tag(tag) { - if (tag == Owned && m_data == NULL) - panic("tried to create owned bytes with NULL m_data"); - } + Bytes(uint8_t* m_data, size_t m_len, Ownership tag): _data(m_data), _len(m_len), _tag(tag) {} public: // move-only // TODO: make copyable Bytes(const Bytes&) = delete; Bytes& operator=(const Bytes&) = delete; - Bytes(Bytes&& other) noexcept: m_data(other.m_data), m_len(other.m_len), tag(other.tag) { - other.m_data = NULL; - other.m_len = 0; + Bytes(Bytes&& other) noexcept: _data(other._data), _len(other._len), _tag(other._tag) { + other._data = NULL; + other._len = 0; } Bytes& operator=(Bytes&& other) noexcept { if (this != &other) { this->free(); // free current data - m_data = other.m_data; - m_len = other.m_len; - tag = other.tag; + _data = other._data; + _len = other._len; + _tag = other._tag; - other.m_data = NULL; - other.m_len = 0; + other._data = NULL; + other._len = 0; } return *this; } @@ -59,28 +56,28 @@ class Bytes { ~Bytes() { this->free(); } bool operator==(const Bytes& other) const { - if (m_len != other.m_len) + if (_len != other._len) return false; - if (m_data == other.m_data) + if (_data == other._data) return true; - return std::memcmp(m_data, other.m_data, m_len) == 0; + return std::memcmp(_data, other._data, _len) == 0; } bool operator!() const { return is_empty(); } - bool is_empty() const { return this->m_data == NULL; } + bool is_empty() const { return this->_data == NULL; } // debugging utility std::string as_string() const { if (is_empty()) return "[EMPTY]"; - return std::string((char*)m_data, m_len); + return std::string((char*)_data, _len); } - Bytes ref() { return Bytes {this->m_data, this->m_len, Borrowed}; } + Bytes ref() { return Bytes {this->_data, this->_len, Borrowed}; } static Bytes alloc(size_t m_len) { if (m_len == 0) @@ -102,7 +99,7 @@ class Bytes { if (bytes.is_empty()) panic("tried to clone empty bytes"); - return Bytes {datadup(bytes.m_data, bytes.m_len), bytes.m_len, Owned}; + return Bytes {datadup(bytes._data, bytes._len), bytes._len, Owned}; } // static Bytes from_buffer(Buffer& buffer) { return buffer.into_bytes(); } @@ -123,7 +120,8 @@ class Bytes { // return buffer; // } - size_t len() const { return m_len; } + size_t len() const { return _len; } + const uint8_t* data() const { return _data; } friend class Buffer; }; diff --git a/scaler/io/ymq/common.h b/scaler/io/ymq/common.h index c99594a2d..9ae6ebf60 100644 --- a/scaler/io/ymq/common.h +++ b/scaler/io/ymq/common.h @@ -14,7 +14,7 @@ const size_t HEADER_SIZE = 4; // size of the message header in bytes using Errno = int; -void print_trace(void) { +inline void print_trace(void) { void* array[10]; char** strings; int size, i; @@ -32,7 +32,8 @@ void print_trace(void) { // this is an unrecoverable error that exits the program // prints a message plus the source location -[[noreturn]] void panic(std::string message, const std::source_location& location = std::source_location::current()) { +[[noreturn]] inline void panic( + std::string message, const std::source_location& location = std::source_location::current()) { auto file_name = std::string(location.file_name()); file_name = file_name.substr(file_name.find_last_of("/") + 1); @@ -44,7 +45,7 @@ void print_trace(void) { std::abort(); } -[[noreturn]] void todo( +[[noreturn]] inline void todo( std::optional message = std::nullopt, const std::source_location& location = std::source_location::current()) { if (message) { @@ -54,19 +55,19 @@ void print_trace(void) { } } -uint8_t* datadup(const uint8_t* data, size_t len) { +inline uint8_t* datadup(const uint8_t* data, size_t len) { uint8_t* dup = new uint8_t[len]; std::memcpy(dup, data, len); return dup; } -void serialize_u32(uint32_t x, uint8_t buffer[4]) { +inline void serialize_u32(uint32_t x, uint8_t buffer[4]) { buffer[0] = x & 0xFF; buffer[1] = (x >> 8) & 0xFF; buffer[2] = (x >> 16) & 0xFF; buffer[3] = (x >> 24) & 0xFF; } -void deserialize_u32(const uint8_t buffer[4], uint32_t* x) { +inline void deserialize_u32(const uint8_t buffer[4], uint32_t* x) { *x = buffer[0] | buffer[1] << 8 | buffer[2] << 16 | buffer[3] << 24; } diff --git a/scaler/io/ymq/configuration.h b/scaler/io/ymq/configuration.h index 980efc68b..acc64a43f 100644 --- a/scaler/io/ymq/configuration.h +++ b/scaler/io/ymq/configuration.h @@ -1,8 +1,11 @@ #pragma once -// First-party -#include "scaler/io/ymq/epoll_context.h" +// C++ +#include -struct configuration { - using polling_context_t = EpollContext; +class EpollContext; + +struct Configuration { + using PollingContext = EpollContext; + using Identity = std::string; }; diff --git a/scaler/io/ymq/defs.h b/scaler/io/ymq/defs.h deleted file mode 100644 index 5e5fbe4e9..000000000 --- a/scaler/io/ymq/defs.h +++ /dev/null @@ -1 +0,0 @@ -// TODO: C-API definitions diff --git a/scaler/io/ymq/epoll_context.cpp b/scaler/io/ymq/epoll_context.cpp new file mode 100644 index 000000000..25d86caf0 --- /dev/null +++ b/scaler/io/ymq/epoll_context.cpp @@ -0,0 +1,77 @@ +#include "scaler/io/ymq/epoll_context.h" +#include +#include "scaler/io/ymq/common.h" + +void EpollContext::registerEventManager(EventManager& em) { + epoll_event ev { + .events = EPOLLOUT | EPOLLIN | EPOLLET, // Edge-triggered + .data = {.ptr = &em}, + }; + + epoll_fd.epoll_ctl(EPOLL_CTL_ADD, em._fd, &ev); +} + +void EpollContext::removeEventManager(EventManager& em) { + epoll_fd.epoll_ctl(EPOLL_CTL_DEL, em._fd, nullptr); +} + +void EpollContext::executePendingFunctors() {} + +void EpollContext::loop() { + std::array events; + auto result = epoll_fd.epoll_wait(events.data(), 1024, -1); + + if (!result) { + panic(std::format("Failed to epoll_wait(): {}", result.error())); + }; + + for (auto it = events.begin(); it != events.begin() + *result; ++it) { + epoll_event current_event = *it; + if (current_event.events & EPOLLERR) { + // ... + } + + auto* event = (EventManager*)current_event.data.ptr; + // event->onEvents(current_event.events); + } + executePendingFunctors(); +} + +// EXAMPLE +// epoll_wait; +// for each event that is returned to the caller { +// cast the event back to EventManager +// if this event is an eventfd { +// func = queue front +// func() +// } else if this event is an timerfd { +// if it is oneshot then execute once +// if it is multishot then execute and rearm timer +// } else { +// eventmanager.revent = events return by epoll +// eventmanager.on_event() , +// where as on_event is set differently for tcpserver, tcpclient, and tcpconn + +// they are defined something like: +// tcpserver.on_event() { +// accept the socket and generate a new tcpConn or handle err +// this.ioSocket.addNewConn(tcpConn) +// } +// tcpclient.on_event() { +// connect the socket and generate a new tcpConn +// this.ioSocket.addNewConn(tcpConn) +// if there need retry { +// close this socket +// this.eventloop.executeAfter(the time you want from now) +// } +// } +// tcpConn.on_event() { +// read everything you can to the buffer +// write everything you can to write the remote end +// if tcpconn.ioSocket is something special, for example dealer +// tcpConn.ioSocket.route to corresponding tcpConn +// } +// +// } + +// } diff --git a/scaler/io/ymq/epoll_context.h b/scaler/io/ymq/epoll_context.h index fd1a65049..b77fc1934 100644 --- a/scaler/io/ymq/epoll_context.h +++ b/scaler/io/ymq/epoll_context.h @@ -1,72 +1,57 @@ #pragma once +// System +#include + // C++ #include +#include // First-party +#include "scaler/io/ymq/event_manager.h" +#include "scaler/io/ymq/file_descriptor.h" +#include "scaler/io/ymq/timestamp.h" class EventManager; + struct EpollContext { + FileDescriptor epoll_fd; + using Function = std::function; // TBD - using TimeStamp = int; // TBD using Identifier = int; // TBD void registerCallbackBeforeLoop(EventManager*); - void loop() { - for (;;) { - // EXAMPLE - // epoll_wait; - // for each event that is returned to the caller { - // cast the event back to EventManager - // if this event is an eventfd { - // func = queue front - // func() - // } else if this event is an timerfd { - // if it is oneshot then execute once - // if it is multishot then execute and rearm timer - // } else { - // eventmanager.revent = events return by epoll - // eventmanager.on_event() , - // where as on_event is set differently for tcpserver, tcpclient, and tcpconn - // they are defined something like: - // tcpserver.on_event() { - // accept the socket and generate a new tcpConn or handle err - // this.ioSocket.addNewConn(tcpConn) - // } - // tcpclient.on_event() { - // connect the socket and generate a new tcpConn - // this.ioSocket.addNewConn(tcpConn) - // if there need retry { - // close this socket - // this.eventloop.executeAfter(the time you want from now) - // } - // } - // tcpConn.on_event() { - // read everything you can to the buffer - // write everything you can to write the remote end - // if tcpconn.ioSocket is something special, for example dealer - // tcpConn.ioSocket.route to corresponding tcpConn - // } - // - // } + EpollContext() { + auto fd = FileDescriptor::epollfd(); - // } + if (!fd) { + throw std::system_error(fd.error(), std::system_category(), "Failed to create epoll fd"); } + + this->epoll_fd = std::move(*fd); } + + void loop(); + void registerEventManager(EventManager& em); + void removeEventManager(EventManager& em); + void stop(); - void executeNow(Function func); + void executeNow(Function func) { + // TODO: Implement this function + } + void executeLater(Function func, Identifier identifier); - void executeAt(TimeStamp, Function, Identifier identifier); + void executeAt(Timestamp, Function, Identifier identifier); void cancelExecution(Identifier identifier); - // int epoll_fd; + void executePendingFunctors(); + // int connect_timer_tfd; // std::map monitoringEvent; // bool timer_armed; // // NOTE: Utility functions, may be defined otherwise // void ensure_timer_armed(); - // void add_epoll(int fd, uint32_t flags, EpollType type, void* data); // void remove_epoll(int fd); // EpollData* epoll_by_fd(int fd); }; diff --git a/scaler/io/ymq/event_loop.h b/scaler/io/ymq/event_loop.h index 0723e9d6a..a23178414 100644 --- a/scaler/io/ymq/event_loop.h +++ b/scaler/io/ymq/event_loop.h @@ -1,10 +1,13 @@ #pragma once // C++ +#include // uint64_t #include // First-party -// #include "scaler/io/ymq/event_manager.hpp" +#include "scaler/io/ymq/event_manager.h" +#include "scaler/io/ymq/epoll_context.h" +#include "scaler/io/ymq/event_loop_backend.h" // #include "scaler/io/ymq/interruptive_concurrent_queue.hpp" // #include "scaler/io/ymq/timed_concurrent_queue.hpp" @@ -12,24 +15,29 @@ // #include "scaler/io/ymq/third_party/concurrentqueue.h" // #include "scaler/io/ymq/event_loop_backend.hpp" -#include "scaler/io/ymq/epoll_context.h" +struct Timestamp; +class EventManager; + template struct EventLoop { using Function = std::function; // TBD - using TimeStamp = int; // TBD using Identifier = int; // TBD - void loop(); + void loop() { eventLoopBackend->loop(); } void stop(); - void executeNow(Function func); + void executeNow(Function func) { eventLoopBackend->executeNow(func); } void executeLater(Function func, Identifier identifier); - void executeAt(TimeStamp, Function, Identifier identifier); + void executeAt(Timestamp, Function, Identifier identifier); void cancelExecution(Identifier identifier); void registerCallbackBeforeLoop(EventManager*); + void registerEventManager(EventManager &em) { + eventLoopBackend->registerEventManager(em); + } + // InterruptiveConcurrentQueue immediateExecutionQueue; // TimedConcurrentQueue timedExecutionQueue; // ConcurrentQueue delayedExecutionQueue; - // EventLoopBackend eventLoopBackend; + EventLoopBackend* eventLoopBackend; }; diff --git a/scaler/io/ymq/event_loop_thread.cpp b/scaler/io/ymq/event_loop_thread.cpp index cdfb38f7d..315d7a54b 100644 --- a/scaler/io/ymq/event_loop_thread.cpp +++ b/scaler/io/ymq/event_loop_thread.cpp @@ -1,10 +1,30 @@ #include "scaler/io/ymq/event_loop_thread.h" -IOSocket* EventLoopThread::addIOSocket(std::string identity, std::string socketType) { - return nullptr; +#include + +#include "scaler/io/ymq/io_socket.h" + +IOSocket* EventLoopThread::createIOSocket(std::string identity, IOSocketType socketType) { + if (thread.get_id() == std::thread::id()) { + thread = std::jthread([this](std::stop_token token) { + while (!token.stop_requested()) { + this->eventLoop.loop(); + } + }); + } + + auto [iterator, inserted] = identityToIOSocket.try_emplace(identity, shared_from_this(), identity, socketType); + assert(inserted); + auto ptr = &iterator->second; + + // TODO: Something happen with the running thread + eventLoop.executeNow([ptr] { ptr->onCreated(); }); + return ptr; } -bool EventLoopThread::removeIOSocket(IOSocket*) { - return false; +// TODO: Think about non null pointer +void EventLoopThread::removeIOSocket(IOSocket* target) { + // TODO: Something happen with the running thread + identityToIOSocket.erase(target->identity()); } diff --git a/scaler/io/ymq/event_loop_thread.h b/scaler/io/ymq/event_loop_thread.h index c2f2f08fc..b3e49e412 100644 --- a/scaler/io/ymq/event_loop_thread.h +++ b/scaler/io/ymq/event_loop_thread.h @@ -1,34 +1,35 @@ #pragma once -// C++ #include +#include #include -// First-party #include "scaler/io/ymq/configuration.h" #include "scaler/io/ymq/event_loop.h" -// #include "scaler/io/ymq/io_socket.hpp" +#include "scaler/io/ymq/typedefs.h" class IOSocket; -class EventLoopThread { - using PollingContext = configuration::polling_context_t; - std::thread thread; - // std::map identityToIOSocket; - EventLoop eventLoop; +class EventLoopThread: public std::enable_shared_from_this { + using PollingContext = Configuration::PollingContext; + std::jthread thread; + std::map identityToIOSocket; public: + EventLoop eventLoop; // Why not make the class a friend class of IOContext? // Because the removeIOSocket method is a bit trickier than addIOSocket, - // the IOSocket that is being removed will first remove every MessageConnectionTCP - // managed by it from the EventLoop, before it removes it self from ioSockets. - // return eventLoop.executeNow(createIOSocket()); - IOSocket* addIOSocket(std::string identity, std::string socketType); + // the IOSocket that is being removed will first remove every + // MessageConnectionTCP managed by it from the EventLoop, before it removes + // it self from ioSockets. return eventLoop.executeNow(createIOSocket()); + IOSocket* createIOSocket(std::string identity, IOSocketType socketType); - bool removeIOSocket(IOSocket*); + void removeIOSocket(IOSocket* target); // EventLoop& getEventLoop(); // IOSocket* getIOSocketByIdentity(size_t identity); - // EventLoopThread(const EventLoopThread&) = delete; - // EventLoopThread& operator=(const EventLoopThread&) = delete; + EventLoopThread(const EventLoopThread&) = delete; + EventLoopThread& operator=(const EventLoopThread&) = delete; + // TODO: Revisit the default ctor + EventLoopThread() = default; }; diff --git a/scaler/io/ymq/event_manager.h b/scaler/io/ymq/event_manager.h index 95e8af795..097480ecd 100644 --- a/scaler/io/ymq/event_manager.h +++ b/scaler/io/ymq/event_manager.h @@ -1,22 +1,26 @@ #pragma once // C++ +#include // uint64_t #include +#include // First-party #include "scaler/io/ymq/event_loop_thread.h" +#include "scaler/io/ymq/file_descriptor.h" + +class EventLoopThread; class EventManager { - EventLoopThread& eventLoop; - const int fd; - // Implementation defined method, will call onRead, onWrite etc based on events - void onEvents(); + std::shared_ptr eventLoop; + FileDescriptor _fd; public: int events; int revents; void updateEvents(); + void onEvents(uint64_t events) {} // User that registered them should have everything they need // In the future, we might add more onXX() methods, for now these are all we need. using OnEventCallback = std::function; @@ -24,4 +28,7 @@ class EventManager { OnEventCallback onWrite; OnEventCallback onClose; OnEventCallback onError; + EventManager(): _fd {} {} + + friend class EpollContext; }; diff --git a/scaler/io/ymq/examples/CMakeLists.txt b/scaler/io/ymq/examples/CMakeLists.txt index d44970716..6b0e4cca5 100644 --- a/scaler/io/ymq/examples/CMakeLists.txt +++ b/scaler/io/ymq/examples/CMakeLists.txt @@ -1,4 +1,8 @@ add_executable(echo_server echo_server.cpp) +add_executable(timestamp timestamp.cpp) +add_executable(timed_queue timed_queue.cpp) -target_link_libraries(echo_server ymq) +target_link_libraries(echo_server cc_ymq) +target_link_libraries(timestamp cc_ymq) +target_link_libraries(timed_queue cc_ymq) diff --git a/scaler/io/ymq/examples/echo_server.cpp b/scaler/io/ymq/examples/echo_server.cpp index 7fc66b105..747362c8b 100644 --- a/scaler/io/ymq/examples/echo_server.cpp +++ b/scaler/io/ymq/examples/echo_server.cpp @@ -4,6 +4,7 @@ // First-party #include "scaler/io/ymq/io_context.h" #include "scaler/io/ymq/io_socket.h" +#include "scaler/io/ymq/typedefs.h" // Goal: // Make sure we can write an echo server with ymq in C++, pretend there is a language barrier, to mimic @@ -14,7 +15,7 @@ int main() { printf("Hello, world!\n"); IOContext context; - IOSocket* socket = context.addIOSocket("ServerSocket", "Dealer"); + std::shared_ptr socket = context.createIOSocket("ServerSocket", IOSocketType::Dealer); // char buf[8]; // while (true) { // socket.read("any_identity", buf, []() { printf("read completed\n"); }); diff --git a/scaler/io/ymq/examples/timed_queue.cpp b/scaler/io/ymq/examples/timed_queue.cpp new file mode 100644 index 000000000..5cedf1c70 --- /dev/null +++ b/scaler/io/ymq/examples/timed_queue.cpp @@ -0,0 +1,8 @@ +#include "scaler/io/ymq/timed_queue.h" + +int main() { + TimedQueue tq; + Timestamp ts; + + tq.push(ts, [] { printf("in timer\n"); }); +} diff --git a/scaler/io/ymq/examples/timestamp.cpp b/scaler/io/ymq/examples/timestamp.cpp new file mode 100644 index 000000000..08f3808b6 --- /dev/null +++ b/scaler/io/ymq/examples/timestamp.cpp @@ -0,0 +1,17 @@ +#include "scaler/io/ymq/timestamp.h" + +#include + +using namespace std::chrono_literals; + +int main() { + Timestamp ts; + std::cout << ts.timestamp << std::endl; + Timestamp three_seconds_later_than_ts = ts.createTimestampByOffsetDuration(3s); + std::cout << three_seconds_later_than_ts.timestamp << std::endl; + printf("%s\n", stringifyTimestamp(ts).c_str()); + // a timestamp is smaller iff it is closer to the beginning of the world + if (ts < three_seconds_later_than_ts) { + printf("ts happen before than three_seconds_later_than_ts\n"); + } +} diff --git a/scaler/io/ymq/file_descriptor.h b/scaler/io/ymq/file_descriptor.h index d3488bf3b..7be424683 100644 --- a/scaler/io/ymq/file_descriptor.h +++ b/scaler/io/ymq/file_descriptor.h @@ -23,13 +23,15 @@ class FileDescriptor { FileDescriptor(int fd): fd(fd) {} public: - ~FileDescriptor() { + ~FileDescriptor() noexcept(false) { if (auto code = close(fd) < 0) throw std::system_error(errno, std::system_category(), "Failed to close file descriptor"); this->fd = -1; } + FileDescriptor(): fd(-1) {} + // move-only FileDescriptor(const FileDescriptor&) = delete; FileDescriptor& operator=(const FileDescriptor&) = delete; @@ -164,19 +166,19 @@ class FileDescriptor { } } - std::optional epoll_ctl(int op, FileDescriptor& other, epoll_event& event) { - if (::epoll_ctl(fd, op, other.fd, &event) < 0) { + std::optional epoll_ctl(int op, FileDescriptor& other, epoll_event* event) { + if (::epoll_ctl(fd, op, other.fd, event) < 0) { return errno; } else { return std::nullopt; } } - std::optional epoll_wait(epoll_event* events, int maxevents, int timeout) { - if (::epoll_wait(fd, events, maxevents, timeout) < 0) { + std::expected epoll_wait(epoll_event* events, int maxevents, int timeout) { + if (auto n = ::epoll_wait(fd, events, maxevents, timeout) < 0) { return errno; } else { - return std::nullopt; + return n; } } }; diff --git a/scaler/io/ymq/io_context.cpp b/scaler/io/ymq/io_context.cpp index 1910ddcae..bb87ffca1 100644 --- a/scaler/io/ymq/io_context.cpp +++ b/scaler/io/ymq/io_context.cpp @@ -1,14 +1,29 @@ -#include "scaler/io/ymq/event_loop_thread.h" + #include "scaler/io/ymq/io_context.h" -IOSocket* IOContext::addIOSocket(std::string identity, std::string socketType) { - std::lock_guard guard {_threadsMu}; - static size_t threadsHead {0}; - auto res = _threads[threadsHead].addIOSocket(identity, socketType); - ++threadsHead %= _threads.size(); - return res; +#include // std::ranges::generate +#include // assert + +#include "scaler/io/ymq/event_loop_thread.h" +#include "scaler/io/ymq/io_socket.h" +#include "scaler/io/ymq/typedefs.h" + +IOContext::IOContext(size_t threadCount): _threads(threadCount) { + assert(threadCount > 0); + std::ranges::generate(_threads, std::make_shared); +} + +std::shared_ptr IOContext::createIOSocket(Identity identity, IOSocketType socketType) { + static size_t threadsRoundRobin = 0; + auto& thread = _threads[threadsRoundRobin]; + ++threadsRoundRobin %= _threads.size(); + + auto socket = std::make_shared(thread, identity, socketType); + // todo + // thread.addIOSocket(socket); + return socket; } -bool IOContext::removeIOSocket(IOSocket*) { - return false; +bool IOContext::removeIOSocket(std::shared_ptr socket) { + return false; // todo: implement this } diff --git a/scaler/io/ymq/io_context.h b/scaler/io/ymq/io_context.h index 2a199711b..2f998eb6e 100644 --- a/scaler/io/ymq/io_context.h +++ b/scaler/io/ymq/io_context.h @@ -1,30 +1,34 @@ #pragma once // C++ +#include #include #include #include // First-party #include "scaler/io/ymq/event_loop_thread.h" -#include "scaler/io/ymq/io_socket.h" +#include "scaler/io/ymq/typedefs.h" + +using Identity = Configuration::Identity; class IOSocket; class IOContext { // This is a pointer, just for now - std::vector _threads; - std::mutex _threadsMu; + std::vector> _threads; public: - IOContext() = default; + IOContext(size_t threadCount = 1); + IOContext(const IOContext&) = delete; IOContext& operator=(const IOContext&) = delete; IOContext(IOContext&&) = delete; IOContext& operator=(IOContext&&) = delete; // These methods need to be thread-safe. - IOSocket* addIOSocket(std::string identity, std::string socketType); - // ioSocket.getEventLoop().removeIOSocket(&ioSocket); - bool removeIOSocket(IOSocket*); + std::shared_ptr createIOSocket(Identity identity, IOSocketType socketType); + bool removeIOSocket(std::shared_ptr); + + size_t numThreads() const { return _threads.size(); } }; diff --git a/scaler/io/ymq/io_socket.cpp b/scaler/io/ymq/io_socket.cpp index e69de29bb..444580e4d 100644 --- a/scaler/io/ymq/io_socket.cpp +++ b/scaler/io/ymq/io_socket.cpp @@ -0,0 +1,18 @@ +#include "scaler/io/ymq/io_socket.h" + +// NOTE: We need it after we put impl +#include "scaler/io/ymq/event_loop_thread.h" + +void IOSocket::onCreated() { + // Detect if we need to initialize tcpClient and/or tcpServer + // If so, initialize it, and then call their onAdd(); + if (_socketType == IOSocketType::Router) { + // assert(!tcpClient); + _tcpClient.emplace(_eventLoopThread); + // assert(!tcpServer); + _tcpServer.emplace(_eventLoopThread); + _tcpClient->onCreated(); + _tcpServer->onCreated(); + } + // Different SocketType might have different rules +} diff --git a/scaler/io/ymq/io_socket.h b/scaler/io/ymq/io_socket.h index b1360386d..aed8363fd 100644 --- a/scaler/io/ymq/io_socket.h +++ b/scaler/io/ymq/io_socket.h @@ -3,31 +3,45 @@ // C++ // #include // #include -#include +#include +#include // First-party +#include "scaler/io/ymq/configuration.h" #include "scaler/io/ymq/event_loop_thread.h" -// #include "scaler/io/ymq/message_connection_tcp.hpp" -// #include "scaler/io/ymq/tcp_client.hpp" -// #include "scaler/io/ymq/tcp_server.hpp" +#include "scaler/io/ymq/tcp_client.h" +#include "scaler/io/ymq/tcp_server.h" +#include "scaler/io/ymq/typedefs.h" + +using Identity = Configuration::Identity; + +class TCPClient; +class TCPServer; + +class EventLoopThread; class IOSocket { - EventLoopThread& eventLoopThread; - enum SocketTypes { Binder, Sub, Pub, Dealer, Router, Pair /* etc. */ }; - SocketTypes socketTypes; + std::shared_ptr _eventLoopThread; + Identity _identity; + IOSocketType _socketType; - // std::optional tcpServer; - // std::optional tcpClient; + std::optional _tcpClient; + std::optional _tcpServer; // std::map fdToConnection; // std::map identityToConnection; public: + IOSocket(std::shared_ptr eventLoopThread, Identity identity, IOSocketType socketType) + : _eventLoopThread(eventLoopThread), _identity(identity), _socketType(socketType) {} + IOSocket(const IOSocket&) = delete; IOSocket& operator=(const IOSocket&) = delete; IOSocket(IOSocket&&) = delete; IOSocket& operator=(IOSocket&&) = delete; - const std::string identity; + Identity identity() const { return _identity; } + IOSocketType socketType() const { return _socketType; } + // string -> connection mapping // and connection->string mapping @@ -47,5 +61,7 @@ class IOSocket { // ) // } + void onCreated(); + // void recvMessage(Message* msg); }; diff --git a/scaler/io/ymq/message_connection_tcp.h b/scaler/io/ymq/message_connection_tcp.h index 4d904445f..c7e87aa80 100644 --- a/scaler/io/ymq/message_connection_tcp.h +++ b/scaler/io/ymq/message_connection_tcp.h @@ -16,6 +16,6 @@ class MessageConnectionTCP: public MessageConnection { TcpReadOperation read_op; public: - void send(Bytes data, SendMessageContinuation k) { todo(); } - void recv(RecvMessageContinuation k) { todo(); } + void send(Bytes data, SendMessageContinuation k); + void recv(RecvMessageContinuation k); }; diff --git a/scaler/io/ymq/note b/scaler/io/ymq/note new file mode 100644 index 000000000..e69de29bb diff --git a/scaler/io/ymq/pymod_ymq/bytes.cpp b/scaler/io/ymq/pymod_ymq/bytes.cpp deleted file mode 100644 index 681cd30ad..000000000 --- a/scaler/io/ymq/pymod_ymq/bytes.cpp +++ /dev/null @@ -1,64 +0,0 @@ -// allows us to define PyTypeObjects in the canonical way without warnings -#pragma clang diagnostic ignored "-Wreorder-init-list" -#pragma clang diagnostic ignored "-Wc99-designator" - -// Python -#define PY_SSIZE_T_CLEAN -#include -#include - -struct PyBytesYmq { - PyObject_HEAD; -}; - -static int PyBytesYmq_init(PyBytesYmq* self, PyObject* args, PyObject* kwds) { - return 0; // todo -} - -static void PyBytesYmq_dealloc(PyBytesYmq* self) { - // todo -} - -static PyObject* PyBytesYmq_repr(PyBytesYmq* self) { - Py_RETURN_NONE; // todo -} - -static PyObject* PyBytesYmq_data_getter(PyBytesYmq* self) { - Py_RETURN_NONE; // todo -} - -static PyObject* PyBytesYmq_len_getter(PyBytesYmq* self) { - Py_RETURN_NONE; // todo -} - -static int PyBytesYmq_getbuffer(PyBytesYmq* self, Py_buffer* view, int flags) { - return 0; // todo -} - -static void PyBytesYmq_releasebuffer(PyBytesYmq* self, Py_buffer* view) { - // todo -} - -static PyGetSetDef PyBytesYmq_properties[] = {{nullptr, nullptr, nullptr, nullptr, nullptr}}; - -static PyBufferProcs PyBytesYmqBufferProcs = { - .bf_getbuffer = (getbufferproc)PyBytesYmq_getbuffer, - .bf_releasebuffer = (releasebufferproc)PyBytesYmq_releasebuffer, -}; - -// clang-format off -static PyTypeObject PyBytesYmqType = { - PyVarObject_HEAD_INIT(NULL, 0) - .tp_name = "ymq.Bytes", - .tp_doc = PyDoc_STR("Bytes"), - .tp_basicsize = sizeof(PyBytesYmq), - .tp_itemsize = 0, - .tp_flags = Py_TPFLAGS_DEFAULT, - .tp_new = PyType_GenericNew, - .tp_init = (initproc)PyBytesYmq_init, - .tp_repr = (reprfunc)PyBytesYmq_repr, - .tp_dealloc = (destructor)PyBytesYmq_dealloc, - .tp_getset = PyBytesYmq_properties, - .tp_as_buffer = &PyBytesYmqBufferProcs, -}; -// clang-format on diff --git a/scaler/io/ymq/pymod_ymq/bytes.h b/scaler/io/ymq/pymod_ymq/bytes.h new file mode 100644 index 000000000..b898ca305 --- /dev/null +++ b/scaler/io/ymq/pymod_ymq/bytes.h @@ -0,0 +1,107 @@ +#pragma once + +// Python +#define PY_SSIZE_T_CLEAN +#include +#include + +// First-party +#include "scaler/io/ymq/bytes.h" + +struct PyBytesYmq { + PyObject_HEAD; + Bytes bytes; +}; + +extern "C" { + +static int PyBytesYmq_init(PyBytesYmq* self, PyObject* args, PyObject* kwds) { + PyObject* bytes = nullptr; + const char* keywords[] = {"bytes", nullptr}; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char**)keywords, &bytes)) { + return -1; // Error parsing arguments + } + + if (!bytes) { + // If no bytes were provided, initialize with an empty Bytes object + self->bytes = Bytes::empty(); + return 0; + } + + if (!PyBytes_Check(bytes)) { + bytes = PyObject_Bytes(bytes); + + if (!bytes) { + PyErr_SetString(PyExc_TypeError, "Expected bytes or bytes-like object"); + return -1; + } + } + + char* data; + Py_ssize_t len; + + if (PyBytes_AsStringAndSize(bytes, &data, &len) < 0) { + PyErr_SetString(PyExc_TypeError, "Failed to get bytes data"); + return -1; + } + + // copy the data into the Bytes object + // it might be possible to make this zero-copy in the future + self->bytes = Bytes::copy((uint8_t*)data, len); + + return 0; +} + +static void PyBytesYmq_dealloc(PyBytesYmq* self) { + self->bytes.~Bytes(); // Call the destructor of Bytes + Py_TYPE(self)->tp_free(self); +} + +static PyObject* PyBytesYmq_repr(PyBytesYmq* self) { + if (self->bytes.is_empty()) { + return PyUnicode_FromString(""); + } else { + return PyUnicode_FromFormat("", self->bytes.len()); + } +} + +static PyObject* PyBytesYmq_data_getter(PyBytesYmq* self) { + return PyBytes_FromStringAndSize((const char*)self->bytes.data(), self->bytes.len()); +} + +static PyObject* PyBytesYmq_len_getter(PyBytesYmq* self) { + return PyLong_FromSize_t(self->bytes.len()); +} + +static int PyBytesYmq_getbuffer(PyBytesYmq* self, Py_buffer* view, int flags) { + return PyBuffer_FillInfo(view, (PyObject*)self, (void*)self->bytes.data(), self->bytes.len(), true, flags); +} +} + +static PyGetSetDef PyBytesYmq_properties[] = { + {"data", (getter)PyBytesYmq_data_getter, nullptr, PyDoc_STR("Data of the Bytes object"), nullptr}, + {"len", (getter)PyBytesYmq_len_getter, nullptr, PyDoc_STR("Length of the Bytes object"), nullptr}, + {nullptr, nullptr, nullptr, nullptr, nullptr} // Sentinel +}; + +static PyBufferProcs PyBytesYmqBufferProcs = { + .bf_getbuffer = (getbufferproc)PyBytesYmq_getbuffer, + .bf_releasebuffer = (releasebufferproc) nullptr, +}; + +static PyType_Slot PyBytesYmq_slots[] = { + {Py_tp_init, (void*)PyBytesYmq_init}, + {Py_tp_dealloc, (void*)PyBytesYmq_dealloc}, + {Py_tp_repr, (void*)PyBytesYmq_repr}, + {Py_tp_getset, (void*)PyBytesYmq_properties}, + {Py_bf_getbuffer, (void*)&PyBytesYmqBufferProcs}, + {0, nullptr}, +}; + +static PyType_Spec PyBytesYmq_spec = { + .name = "ymq.Bytes", + .basicsize = sizeof(PyBytesYmq), + .itemsize = 0, + .flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_IMMUTABLETYPE, + .slots = PyBytesYmq_slots, +}; diff --git a/scaler/io/ymq/pymod_ymq/io_context.cpp b/scaler/io/ymq/pymod_ymq/io_context.cpp deleted file mode 100644 index e6675ef19..000000000 --- a/scaler/io/ymq/pymod_ymq/io_context.cpp +++ /dev/null @@ -1,42 +0,0 @@ -// allows us to define PyTypeObjects in the canonical way without warnings -#pragma clang diagnostic ignored "-Wreorder-init-list" -#pragma clang diagnostic ignored "-Wc99-designator" - -// Python -#define PY_SSIZE_T_CLEAN -#include -#include - -struct PyIOContext { - PyObject_HEAD; -}; - -static int PyIOContext_init(PyIOContext* self, PyObject* args, PyObject* kwds) { - return 0; // todo -} - -static void PyIOContext_dealloc(PyIOContext* self) { - // todo -} - -static PyObject* PyIOContext_repr(PyIOContext* self) { - Py_RETURN_NONE; // todo -} - -static PyMethodDef PyIOContext_methods[] = {{nullptr, nullptr, 0, nullptr}}; - -// clang-format off -static PyTypeObject PyIOContextType = { - PyVarObject_HEAD_INIT(NULL, 0) - .tp_name = "ymq.IOContext", - .tp_doc = PyDoc_STR("IOContext"), - .tp_basicsize = sizeof(PyIOContext), - .tp_itemsize = 0, - .tp_flags = Py_TPFLAGS_DEFAULT, - .tp_new = PyType_GenericNew, - .tp_init = (initproc)PyIOContext_init, - .tp_repr = (reprfunc)PyIOContext_repr, - .tp_dealloc = (destructor)PyIOContext_dealloc, - .tp_methods = PyIOContext_methods, -}; -// clang-format on diff --git a/scaler/io/ymq/pymod_ymq/io_context.h b/scaler/io/ymq/pymod_ymq/io_context.h new file mode 100644 index 000000000..7374885ff --- /dev/null +++ b/scaler/io/ymq/pymod_ymq/io_context.h @@ -0,0 +1,172 @@ +#pragma once + +// Python +#define PY_SSIZE_T_CLEAN +#include +#include + +// C++ +#include + +// First-party +#include "scaler/io/ymq/io_context.h" +#include "scaler/io/ymq/pymod_ymq/io_socket.h" +#include "scaler/io/ymq/pymod_ymq/ymq.h" + +struct PyIOContext { + PyObject_HEAD; + std::shared_ptr ioContext; +}; + +extern "C" { + +static int PyIOContext_init(PyIOContext* self, PyObject* args, PyObject* kwds) { + PyObject* numThreadsObj = nullptr; + const char* kwlist[] = {"num_threads", nullptr}; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "|O", (char**)kwlist, &numThreadsObj)) { + return -1; // Error parsing arguments + } + + size_t numThreads = 1; // Default to 1 thread if not specified + + if (numThreadsObj) { + if (!PyLong_Check(numThreadsObj)) { + PyErr_SetString(PyExc_TypeError, "num_threads must be an integer"); + return -1; + } + numThreads = PyLong_AsSize_t(numThreadsObj); + if (numThreads == static_cast(-1) && PyErr_Occurred()) { + PyErr_SetString(PyExc_RuntimeError, "Failed to convert num_threads to size_t"); + return -1; + } + if (numThreads <= 0) { + PyErr_SetString(PyExc_ValueError, "num_threads must be greater than 0"); + return -1; + } + } + + self->ioContext = std::make_shared(numThreads); + return 0; +} + +static void PyIOContext_dealloc(PyIOContext* self) { + // Python knows nothing about C++, so we need to manually call destructors + self->ioContext.~shared_ptr(); // Call the destructor of shared_ptr + Py_TYPE(self)->tp_free((PyObject*)self); // Free the PyObject +} + +static PyObject* PyIOContext_repr(PyIOContext* self) { + return PyUnicode_FromFormat("", (void*)self->ioContext.get()); +} + +// todo: how to parse keyword arguments? +// https://docs.python.org/3/c-api/structures.html#c.METH_METHOD +// https://docs.python.org/3.10/c-api/call.html#vectorcall +// https://peps.python.org/pep-0590/ +static PyObject* PyIOContext_createIOSocket( + PyIOContext* self, PyTypeObject* clazz, PyObject* const* args, Py_ssize_t nargs, PyObject* kwnames) { + if (nargs != 2) { + PyErr_SetString(PyExc_TypeError, "createIOSocket() requires exactly two arguments: identity and socket_type"); + return nullptr; + } + + PyObject *pyIdentity = args[0], *pySocketType = args[1]; + + if (!PyUnicode_Check(pyIdentity)) { + PyErr_SetString(PyExc_TypeError, "Expected identity to be a string"); + return nullptr; + } + + // get the module state from the class + YmqState* state = (YmqState*)PyType_GetModuleState(clazz); + + if (!state) { + // PyErr_SetString(PyExc_RuntimeError, "Failed to get module state"); + return nullptr; + } + + if (!PyObject_IsInstance(pySocketType, state->ioSocketTypeEnum)) { + PyErr_SetString(PyExc_TypeError, "Expected socket_type to be an instance of IOSocketType"); + return nullptr; + } + + Py_ssize_t identitySize; + const char* identityCStr = PyUnicode_AsUTF8AndSize(pyIdentity, &identitySize); + + if (!identityCStr) { + PyErr_SetString(PyExc_TypeError, "Failed to convert identity to string"); + return nullptr; + } + + PyObject* value = PyObject_GetAttrString(pySocketType, "value"); + + if (!value) { + PyErr_SetString(PyExc_TypeError, "Failed to get value from socket_type"); + return nullptr; + } + + if (!PyLong_Check(value)) { + PyErr_SetString(PyExc_TypeError, "Expected socket_type to be an integer"); + Py_DECREF(value); + return nullptr; + } + + long socketTypeValue = PyLong_AsLong(value); + + if (socketTypeValue < 0 && PyErr_Occurred()) { + PyErr_SetString(PyExc_TypeError, "Failed to convert socket_type to integer"); + Py_DECREF(value); + return nullptr; + } + + Py_DECREF(value); + + Identity identity(identityCStr, identitySize); + IOSocketType socketType = static_cast(socketTypeValue); + + PyIOSocket* ioSocket = (PyIOSocket*)PyObject_CallObject((PyObject*)state->PyIOSocketType, nullptr); + if (!ioSocket) { + PyErr_SetString(PyExc_RuntimeError, "Failed to create IOSocket instance"); + return nullptr; + } + + ioSocket->socket = self->ioContext->createIOSocket(identity, socketType); + return (PyObject*)ioSocket; +} + +static PyObject* PyIOContext_numThreads_getter(PyIOContext* self, void* /*closure*/) { + return PyLong_FromSize_t(self->ioContext->numThreads()); +} +} + +static PyMethodDef PyIOContext_methods[] = { + {"createIOSocket", + (PyCFunction)PyIOContext_createIOSocket, + METH_METHOD | METH_FASTCALL | METH_KEYWORDS, + PyDoc_STR("Create a new IOSocket")}, + {nullptr, nullptr, 0, nullptr}}; + +static PyGetSetDef PyIOContext_properties[] = { + {"num_threads", + (getter)PyIOContext_numThreads_getter, + nullptr, + PyDoc_STR("Get the number of threads in the IOContext"), + nullptr}, + {nullptr, nullptr, nullptr, nullptr, nullptr}}; + +static PyType_Slot PyIOContext_slots[] = { + {Py_tp_init, (void*)PyIOContext_init}, + {Py_tp_dealloc, (void*)PyIOContext_dealloc}, + {Py_tp_repr, (void*)PyIOContext_repr}, + {Py_tp_methods, (void*)PyIOContext_methods}, + {Py_tp_getset, (void*)PyIOContext_properties}, + {0, nullptr}, +}; + +static PyType_Spec PyIOContext_spec = { + .name = "ymq.IOContext", + .basicsize = sizeof(PyIOContext), + .itemsize = 0, + .flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_IMMUTABLETYPE, + .slots = PyIOContext_slots, +}; diff --git a/scaler/io/ymq/pymod_ymq/io_socket.cpp b/scaler/io/ymq/pymod_ymq/io_socket.cpp deleted file mode 100644 index 4e9974abc..000000000 --- a/scaler/io/ymq/pymod_ymq/io_socket.cpp +++ /dev/null @@ -1,49 +0,0 @@ -// allows us to define PyTypeObjects in the canonical way without warnings -#pragma clang diagnostic ignored "-Wreorder-init-list" -#pragma clang diagnostic ignored "-Wc99-designator" - -// Python -#define PY_SSIZE_T_CLEAN -#include -#include - -struct PyIOSocket { - PyObject_HEAD; -}; - -static int PyIOSocket_init(PyIOSocket* self, PyObject* args, PyObject* kwds) { - return 0; // todo -} - -static void PyIOSocket_dealloc(PyIOSocket* self) { - // todo -} - -static PyObject* PyIOSocket_repr(PyIOSocket* self) { - Py_RETURN_NONE; // todo -} - -static PyObject* PyIOSocket_identity_getter(PyIOSocket* self, void* closure) { - Py_RETURN_NONE; // todo -} - -static PyGetSetDef PyIOSocket_properties[] = {{nullptr, nullptr, nullptr, nullptr, nullptr}}; - -static PyMethodDef PyIOSocket_methods[] = {{nullptr, nullptr, 0, nullptr}}; - -// clang-format off -static PyTypeObject PyIOSocketType = { - PyVarObject_HEAD_INIT(NULL, 0) - .tp_name = "ymq.IOSocket", - .tp_doc = PyDoc_STR("IOSocket"), - .tp_basicsize = sizeof(PyIOSocket), - .tp_itemsize = 0, - .tp_flags = Py_TPFLAGS_DEFAULT, - .tp_new = PyType_GenericNew, - .tp_init = (initproc)PyIOSocket_init, - .tp_repr = (reprfunc)PyIOSocket_repr, - .tp_dealloc = (destructor)PyIOSocket_dealloc, - .tp_getset = PyIOSocket_properties, - .tp_methods = PyIOSocket_methods, -}; -// clang-format on diff --git a/scaler/io/ymq/pymod_ymq/io_socket.h b/scaler/io/ymq/pymod_ymq/io_socket.h new file mode 100644 index 000000000..17b893e4a --- /dev/null +++ b/scaler/io/ymq/pymod_ymq/io_socket.h @@ -0,0 +1,104 @@ +#pragma once + +// Python +#define PY_SSIZE_T_CLEAN +#include +#include + +// C++ +#include + +// First-party +#include "scaler/io/ymq/io_socket.h" +#include "scaler/io/ymq/pymod_ymq/ymq.h" + +struct PyIOSocket { + PyObject_HEAD; + std::shared_ptr socket; +}; + +extern "C" { + +static int PyIOSocket_init(PyIOSocket* self, PyObject* args, PyObject* kwds) { + return 0; +} + +static void PyIOSocket_dealloc(PyIOSocket* self) { + self->socket.~shared_ptr(); // Call the destructor of shared_ptr + Py_TYPE(self)->tp_free((PyObject*)self); // Free the PyObject +} + +static PyObject* PyIOSocket_send(PyIOSocket* self, PyObject* args, PyObject* kwargs) { + PyErr_SetString(PyExc_NotImplementedError, "send() is not implemented yet"); + return nullptr; + + // in this function we need to: + // 1. create an asyncio future (easy) + // - should be pretty easy, just call standard methods + // 2. create a handle to that future that can be completed in a C++ callback (harder) + // - how do we call Python from non-Python threads? where is the handle stored? + // 3. await the future (very hard) + // - how do you await in a C extension module? is it even possible? + // - might need to call into Python code to make this work +} + +static PyObject* PyIOSocket_repr(PyIOSocket* self) { + return PyUnicode_FromFormat("", (void*)self->socket.get()); +} + +static PyObject* PyIOSocket_identity_getter(PyIOSocket* self, void* closure) { + return PyUnicode_FromStringAndSize(self->socket->identity().data(), self->socket->identity().size()); +} + +static PyObject* PyIOSocket_socket_type_getter(PyIOSocket* self, void* closure) { + // replace with PyType_GetModuleByDef(Py_TYPE(self), &ymq_module) in a newer Python version + // https://docs.python.org/3/c-api/type.html#c.PyType_GetModuleByDef + PyObject* module = PyType_GetModule(Py_TYPE(self)); + if (!module) { + PyErr_SetString(PyExc_RuntimeError, "Failed to get module for Message type"); + return nullptr; + } + + auto state = (YmqState*)PyModule_GetState(module); + if (!state) { + PyErr_SetString(PyExc_RuntimeError, "Failed to get module state"); + return nullptr; + } + + IOSocketType socketType = self->socket->socketType(); + PyObject* socketTypeObj = PyLong_FromLong((long)socketType); + + if (!socketTypeObj) { + PyErr_SetString(PyExc_RuntimeError, "Failed to convert socket type to a Python integer"); + return nullptr; + } + + return socketTypeObj; +} +} + +static PyGetSetDef PyIOSocket_properties[] = { + {"identity", (getter)PyIOSocket_identity_getter, nullptr, PyDoc_STR("Get the identity of the IOSocket"), nullptr}, + {"socket_type", (getter)PyIOSocket_socket_type_getter, nullptr, PyDoc_STR("Get the type of the IOSocket"), nullptr}, + {nullptr, nullptr, nullptr, nullptr, nullptr}}; + +static PyMethodDef PyIOSocket_methods[] = { + {"send", (PyCFunction)PyIOSocket_send, METH_VARARGS | METH_KEYWORDS, PyDoc_STR("Send data through the IOSocket")}, + {nullptr, nullptr, 0, nullptr}}; + +static PyType_Slot PyIOSocket_slots[] = { + {Py_tp_init, (void*)PyIOSocket_init}, + {Py_tp_dealloc, (void*)PyIOSocket_dealloc}, + {Py_tp_repr, (void*)PyIOSocket_repr}, + {Py_tp_getset, (void*)PyIOSocket_properties}, + {Py_tp_methods, (void*)PyIOSocket_methods}, + {0, nullptr}, +}; + +static PyType_Spec PyIOSocket_spec = { + .name = "ymq.IOSocket", + .basicsize = sizeof(PyIOSocket), + .itemsize = 0, + .flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_IMMUTABLETYPE, + .slots = PyIOSocket_slots, +}; diff --git a/scaler/io/ymq/pymod_ymq/message.cpp b/scaler/io/ymq/pymod_ymq/message.cpp deleted file mode 100644 index 7b9ab6184..000000000 --- a/scaler/io/ymq/pymod_ymq/message.cpp +++ /dev/null @@ -1,42 +0,0 @@ -// allows us to define PyTypeObjects in the canonical way without warnings -#pragma clang diagnostic ignored "-Wreorder-init-list" -#pragma clang diagnostic ignored "-Wc99-designator" - -// Python -#define PY_SSIZE_T_CLEAN -#include -#include - -struct PyMessage { - PyObject_HEAD; -}; - -static int PyMessage_init(PyMessage* self, PyObject* args, PyObject* kwds) { - return 0; // todo -} - -static void PyMessage_dealloc(PyMessage* self) { - // todo -} - -static PyObject* PyMessage_repr(PyMessage* self) { - Py_RETURN_NONE; // todo -} - -static PyMemberDef PyMessage_members[] = {{nullptr}}; - -// clang-format off -static PyTypeObject PyMessageType = { - PyVarObject_HEAD_INIT(NULL, 0) - .tp_name = "ymq.Message", - .tp_doc = PyDoc_STR("Message"), - .tp_basicsize = sizeof(PyMessage), - .tp_itemsize = 0, - .tp_flags = Py_TPFLAGS_DEFAULT, - .tp_new = PyType_GenericNew, - .tp_init = (initproc)PyMessage_init, - .tp_dealloc = (destructor)PyMessage_dealloc, - .tp_repr = (reprfunc)PyMessage_repr, - .tp_members = PyMessage_members, -}; -// clang-format on diff --git a/scaler/io/ymq/pymod_ymq/message.h b/scaler/io/ymq/pymod_ymq/message.h new file mode 100644 index 000000000..c0d589a0a --- /dev/null +++ b/scaler/io/ymq/pymod_ymq/message.h @@ -0,0 +1,96 @@ +#pragma once + +// Python +#define PY_SSIZE_T_CLEAN +#include +#include + +// First-party +#include "scaler/io/ymq/pymod_ymq/bytes.h" +#include "scaler/io/ymq/pymod_ymq/ymq.h" + +struct PyMessage { + PyObject_HEAD; + PyBytesYmq* address; // Address of the message + PyBytesYmq* payload; // Payload of the message +}; + +extern "C" { + +static int PyMessage_init(PyMessage* self, PyObject* args, PyObject* kwds) { + // replace with PyType_GetModuleByDef(Py_TYPE(self), &ymq_module) in a newer Python version + // https://docs.python.org/3/c-api/type.html#c.PyType_GetModuleByDef + PyObject* module = PyType_GetModule(Py_TYPE(self)); + if (!module) { + PyErr_SetString(PyExc_RuntimeError, "Failed to get module for Message type"); + return -1; + } + + auto state = (YmqState*)PyModule_GetState(module); + if (!state) { + PyErr_SetString(PyExc_RuntimeError, "Failed to get module state"); + return -1; + } + + PyObject *address = nullptr, *payload = nullptr; + const char* keywords[] = {"address", "payload", nullptr}; + if (!PyArg_ParseTupleAndKeywords(args, kwds, "OO", (char**)keywords, &address, &payload)) { + PyErr_SetString(PyExc_TypeError, "Expected two Bytes objects: address and payload"); + return -1; + } + + // check if the address and payload are of type PyBytesYmq + if (!PyObject_IsInstance(address, state->PyBytesYmqType)) { + PyObject* args = PyTuple_Pack(1, address); + address = PyObject_CallObject(state->PyBytesYmqType, args); + Py_DECREF(args); + + if (!address) { + return -1; + } + } + + if (!PyObject_IsInstance(payload, state->PyBytesYmqType)) { + PyObject* args = PyTuple_Pack(1, payload); + payload = PyObject_CallObject(state->PyBytesYmqType, args); + Py_DECREF(args); + + if (!payload) { + return -1; + } + } + + self->address = (PyBytesYmq*)address; + self->payload = (PyBytesYmq*)payload; + + return 0; // todo +} + +static void PyMessage_dealloc(PyMessage* self) { + Py_XDECREF(self->address); + Py_XDECREF(self->payload); + Py_TYPE(self)->tp_free(self); +} + +static PyObject* PyMessage_repr(PyMessage* self) { + return PyUnicode_FromFormat("", self->address, self->payload); +} +} + +static PyMemberDef PyMessage_members[] = {{nullptr}}; + +static PyType_Slot PyMessage_slots[] = { + {Py_tp_init, (void*)PyMessage_init}, + {Py_tp_dealloc, (void*)PyMessage_dealloc}, + {Py_tp_repr, (void*)PyMessage_repr}, + {Py_tp_members, (void*)PyMessage_members}, + {0, nullptr}, +}; + +static PyType_Spec PyMessage_spec = { + .name = "ymq.Message", + .basicsize = sizeof(PyMessage), + .itemsize = 0, + .flags = Py_TPFLAGS_DEFAULT | Py_TPFLAGS_IMMUTABLETYPE, + .slots = PyMessage_slots, +}; diff --git a/scaler/io/ymq/pymod_ymq/ymq.cpp b/scaler/io/ymq/pymod_ymq/ymq.cpp index 3f493e9e4..30b8f07a9 100644 --- a/scaler/io/ymq/pymod_ymq/ymq.cpp +++ b/scaler/io/ymq/pymod_ymq/ymq.cpp @@ -1,65 +1,4 @@ -// Python -#define PY_SSIZE_T_CLEAN -#include -#include - -// First-Party -#include "scaler/io/ymq/pymod_ymq/bytes.cpp" -#include "scaler/io/ymq/pymod_ymq/io_context.cpp" -#include "scaler/io/ymq/pymod_ymq/io_socket.cpp" -#include "scaler/io/ymq/pymod_ymq/message.cpp" - -struct YmqState {}; - -static int ymq_exec(PyObject* module) { - if (PyType_Ready(&PyBytesYmqType) < 0) - return -1; - - if (PyModule_AddObjectRef(module, "Bytes", (PyObject*)&PyBytesYmqType) < 0) - return -1; - - if (PyType_Ready(&PyMessageType) < 0) - return -1; - - if (PyModule_AddObjectRef(module, "Message", (PyObject*)&PyMessageType) < 0) - return -1; - - if (PyType_Ready(&PyIOSocketType) < 0) - return -1; - - if (PyModule_AddObjectRef(module, "IoSocket", (PyObject*)&PyIOSocketType) < 0) - return -1; - - if (PyType_Ready(&PyIOContextType) < 0) - return -1; - - if (PyModule_AddObjectRef(module, "IoContext", (PyObject*)&PyIOContextType) < 0) - return -1; - - auto state = (YmqState*)PyModule_GetState(module); - - // todo! - - return 0; -} - -static PyMethodDef ymq_methods[] = {{NULL, NULL, 0, NULL}}; - -static PyModuleDef_Slot ymq_slots[] = {{Py_mod_exec, (void*)ymq_exec}, {0, NULL}}; - -void ymq_free(YmqState* state) { - // todo -} - -static PyModuleDef ymq_module = { - .m_base = PyModuleDef_HEAD_INIT, - .m_name = "ymq", - .m_doc = PyDoc_STR("YMQ Python bindings"), - .m_size = sizeof(YmqState), - .m_methods = ymq_methods, - .m_slots = ymq_slots, - .m_free = (freefunc)ymq_free, -}; +#include "ymq.h" PyMODINIT_FUNC PyInit_ymq(void) { return PyModuleDef_Init(&ymq_module); diff --git a/scaler/io/ymq/pymod_ymq/ymq.h b/scaler/io/ymq/pymod_ymq/ymq.h new file mode 100644 index 000000000..79d3a7e44 --- /dev/null +++ b/scaler/io/ymq/pymod_ymq/ymq.h @@ -0,0 +1,181 @@ +#pragma once + +// Python +#define PY_SSIZE_T_CLEAN +#include +#include + +struct YmqState { + PyObject* enumModule; // Reference to the enum module + PyObject* ioSocketTypeEnum; // Reference to the IOSocketType enum + PyObject* PyBytesYmqType; // Reference to the BytesYmq type + PyObject* PyMessageType; // Reference to the Message type + PyObject* PyIOSocketType; // Reference to the IOSocket type + PyObject* PyIOContextType; // Reference to the IOContext type +}; + +// C++ +#include +#include + +// First-Party +#include "scaler/io/ymq/io_socket.h" +#include "scaler/io/ymq/pymod_ymq/bytes.h" +#include "scaler/io/ymq/pymod_ymq/io_context.h" +#include "scaler/io/ymq/pymod_ymq/io_socket.h" +#include "scaler/io/ymq/pymod_ymq/message.h" + +extern "C" { + +static void ymq_free(YmqState* state) { + Py_XDECREF(state->enumModule); + Py_XDECREF(state->ioSocketTypeEnum); + Py_XDECREF(state->PyBytesYmqType); + Py_XDECREF(state->PyMessageType); + Py_XDECREF(state->PyIOSocketType); + Py_XDECREF(state->PyIOContextType); + + state->enumModule = nullptr; + state->ioSocketTypeEnum = nullptr; + state->PyBytesYmqType = nullptr; + state->PyMessageType = nullptr; + state->PyIOSocketType = nullptr; + state->PyIOContextType = nullptr; +} + +static int ymq_createIntEnum(PyObject* module, std::string enumName, std::vector> entries) { + // create a python dictionary to hold the entries + auto enumDict = PyDict_New(); + if (!enumDict) { + PyErr_SetString(PyExc_RuntimeError, "Failed to create enum dictionary"); + return -1; + } + + // add each entry to the dictionary + for (const auto& entry: entries) { + PyObject* value = PyLong_FromLong(entry.second); + if (!value) { + PyErr_SetString(PyExc_RuntimeError, "Failed to create enum value"); + Py_DECREF(enumDict); + return -1; + } + + if (PyDict_SetItemString(enumDict, entry.first.c_str(), value) < 0) { + Py_DECREF(value); + Py_DECREF(enumDict); + PyErr_SetString(PyExc_RuntimeError, "Failed to set item in enum dictionary"); + return -1; + } + Py_DECREF(value); + } + + auto state = (YmqState*)PyModule_GetState(module); + + if (!state) { + PyErr_SetString(PyExc_RuntimeError, "Failed to get module state"); + Py_DECREF(enumDict); + return -1; + } + + // create our class by calling enum.IntEnum(enumName, enumDict) + auto ioSocketTypeEnum = PyObject_CallMethod(state->enumModule, "IntEnum", "sO", enumName.c_str(), enumDict); + Py_DECREF(enumDict); + + if (!ioSocketTypeEnum) { + PyErr_SetString(PyExc_RuntimeError, "Failed to create IntEnum class"); + return -1; + } + + state->ioSocketTypeEnum = ioSocketTypeEnum; + + // add the class to the module + // this increments the reference count of enumClass + if (PyModule_AddObjectRef(module, enumName.c_str(), ioSocketTypeEnum) < 0) { + PyErr_SetString(PyExc_RuntimeError, "Failed to add IntEnum class to module"); + Py_DECREF(ioSocketTypeEnum); + return -1; + } + + return 0; +} + +static int ymq_createIOSocketTypeEnum(PyObject* module) { + std::vector> ioSocketTypes = { + {"Binder", (int)IOSocketType::Binder}, + {"Sub", (int)IOSocketType::Sub}, + {"Pub", (int)IOSocketType::Pub}, + {"Dealer", (int)IOSocketType::Dealer}, + {"Router", (int)IOSocketType::Router}, + {"Pair", (int)IOSocketType::Pair}}; + + if (ymq_createIntEnum(module, "IOSocketType", ioSocketTypes) < 0) { + PyErr_SetString(PyExc_RuntimeError, "Failed to create IOSocketType enum"); + return -1; + } + + return 0; +} + +static int ymq_createType(PyObject* module, PyObject** storage, PyType_Spec* spec, const char* name) { + *storage = PyType_FromModuleAndSpec(module, spec, nullptr); + + if (!*storage) { + PyErr_SetString(PyExc_RuntimeError, "Failed to create type from spec"); + return -1; + } + + if (PyModule_AddObjectRef(module, name, *storage) < 0) { + PyErr_SetString(PyExc_RuntimeError, "Failed to add type to module"); + Py_DECREF(*storage); + return -1; + } + + return 0; +} + +static int ymq_exec(PyObject* module) { + auto state = (YmqState*)PyModule_GetState(module); + + if (!state) { + PyErr_SetString(PyExc_RuntimeError, "Failed to get module state"); + return -1; + } + + state->enumModule = PyImport_ImportModule("enum"); + + if (!state->enumModule) { + PyErr_SetString(PyExc_RuntimeError, "Failed to import enum module"); + return -1; + } + + if (ymq_createIOSocketTypeEnum(module) < 0) + return -1; + + if (ymq_createType(module, &state->PyBytesYmqType, &PyBytesYmq_spec, "Bytes") < 0) + return -1; + + if (ymq_createType(module, &state->PyMessageType, &PyMessage_spec, "Message") < 0) + return -1; + + if (ymq_createType(module, &state->PyIOSocketType, &PyIOSocket_spec, "IOSocket") < 0) + return -1; + + if (ymq_createType(module, &state->PyIOContextType, &PyIOContext_spec, "IOContext") < 0) + return -1; + + return 0; +} +} + +static PyModuleDef_Slot ymq_slots[] = {{Py_mod_exec, (void*)ymq_exec}, {0, NULL}}; + +static PyModuleDef ymq_module = { + .m_base = PyModuleDef_HEAD_INIT, + .m_name = "ymq", + .m_doc = PyDoc_STR("YMQ Python bindings"), + .m_size = sizeof(YmqState), + .m_slots = ymq_slots, + .m_free = (freefunc)ymq_free, +}; + +PyMODINIT_FUNC PyInit_ymq(void); diff --git a/scaler/io/ymq/readme.md b/scaler/io/ymq/readme.md index 74c2d74bd..8e55b461a 100644 --- a/scaler/io/ymq/readme.md +++ b/scaler/io/ymq/readme.md @@ -5,24 +5,33 @@ Welcome. This file contains schedule for each day, for each person. Each person maintains a todo and done list. +## gxu +### DONE -# gxu +- CMake integration, generate C++ stuff in build dir +- Basic coroutine API +- develop the so-called coro_context and their utility DEAD +- write up interfaces(not impl) that uses coroutine DEAD ## DONE: - CMake integration, generate C++ stuff in build dir - Basic coroutine API - develop the so-called coro_context and their utility DEAD - write up interfaces(not impl) that uses coroutine DEAD + - Use unified file path (only include dir is project dir) ## TODO: - Add example usage of this lib to scaler/tests - Start organize files that they can be compiled in one go + - Implement the detail of IOSocket, especially `onAdded` behavior +## magniloquency -# PUT NAME HERE +### DONE -## DONE: +- CPython module stub -## TODO: +### TODO +- ? diff --git a/scaler/io/ymq/tcp_client.cpp b/scaler/io/ymq/tcp_client.cpp new file mode 100644 index 000000000..c3340b22d --- /dev/null +++ b/scaler/io/ymq/tcp_client.cpp @@ -0,0 +1,5 @@ +#include "scaler/io/ymq/tcp_client.h" + +void TcpClient::onCreated() { + printf("tcpClient onAdded\n"); +} diff --git a/scaler/io/ymq/tcp_client.h b/scaler/io/ymq/tcp_client.h index 0ee11009a..b34d9565b 100644 --- a/scaler/io/ymq/tcp_client.h +++ b/scaler/io/ymq/tcp_client.h @@ -5,13 +5,19 @@ #include // First-party -#include "scaler/io/ymq/event_loop_thread.h" #include "scaler/io/ymq/event_manager.h" #include "scaler/io/ymq/file_descriptor.h" +// #include "event_manager.hpp" +// #include "scaler/io/ymq/event_loop_thread.h" + +class EventLoopThread; + +class EventLoopThread; +class EventManager; class TcpClient { - EventLoopThread& eventLoop; /* shared ownership */ - std::unique_ptr eventManager; + std::shared_ptr eventLoop; /* shared ownership */ + // std::unique_ptr eventManager; // Implementation defined method. connect(3) should happen here. // This function will call user defined onConnectReturn() // It will handle error it can handle. If it is unreasonable to @@ -21,9 +27,13 @@ class TcpClient { public: TcpClient(const TcpClient&) = delete; TcpClient& operator=(const TcpClient&) = delete; + // TODO: Modify this behavior + TcpClient(std::shared_ptr eventLoop): eventLoop(eventLoop) {} using ConnectReturnCallback = std::function; ConnectReturnCallback onConnectReturn; + void onCreated(); + void retry(/* Arguments */); }; diff --git a/scaler/io/ymq/tcp_server.cpp b/scaler/io/ymq/tcp_server.cpp new file mode 100644 index 000000000..c7410edf9 --- /dev/null +++ b/scaler/io/ymq/tcp_server.cpp @@ -0,0 +1,47 @@ +#include "scaler/io/ymq/tcp_server.h" + +#include +#include +#include +#include + +#include + +#include "scaler/io/ymq/event_manager.h" + +static int create_and_bind_socket() { + int server_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + if (server_fd == -1) { + perror("socket"); + return -1; + } + + sockaddr_in addr {}; + addr.sin_family = AF_INET; + addr.sin_port = htons(8080); + addr.sin_addr.s_addr = INADDR_ANY; + + if (bind(server_fd, (struct sockaddr*)&addr, sizeof(addr)) == -1) { + perror("bind"); + close(server_fd); + return -1; + } + + if (listen(server_fd, SOMAXCONN) == -1) { + perror("listen"); + close(server_fd); + return -1; + } + + return server_fd; +} + +TcpServer::TcpServer(std::shared_ptr eventLoop): eventLoop(eventLoop) { + eventManager = std::make_unique(EventManager()); + serverFd = create_and_bind_socket(); +} + +void TcpServer::onCreated() { + printf("TcpServer::onAdded()\n"); + eventLoop->eventLoop.registerEventManager(*this->eventManager.get()); +} diff --git a/scaler/io/ymq/tcp_server.h b/scaler/io/ymq/tcp_server.h index c97b44c02..6df5659d5 100644 --- a/scaler/io/ymq/tcp_server.h +++ b/scaler/io/ymq/tcp_server.h @@ -5,14 +5,18 @@ #include // First-party -#include "scaler/io/ymq/event_loop_thread.h" -#include "scaler/io/ymq/event_manager.h" #include "scaler/io/ymq/file_descriptor.h" +// #include "event_loop_thread.hpp" +// #include "event_manager.hpp" + +class EventLoopThread; +class EventManager; class TcpServer { - EventLoopThread& eventLoop; // eventLoop thread will call onRead that is associated w/ the eventManager + // eventLoop thread will call onRead that is associated w/ the eventManager + std::shared_ptr eventLoop; std::unique_ptr eventManager; // will copy the `onRead()` to itself - FileDescriptor fd; + int serverFd; // Implementation defined method. accept(3) should happen here. // This function will call user defined onAcceptReturn() // It will handle error it can handle. If it is unreasonable to @@ -23,6 +27,10 @@ class TcpServer { TcpServer(const TcpServer&) = delete; TcpServer& operator=(const TcpServer&) = delete; + // TODO: Modify the behavior of default ctor + TcpServer(std::shared_ptr eventLoop); + using AcceptReturnCallback = std::function; AcceptReturnCallback onAcceptReturn; + void onCreated(); }; diff --git a/scaler/io/ymq/timed_concurrent_queue.h b/scaler/io/ymq/timed_concurrent_queue.h deleted file mode 100644 index 4db695719..000000000 --- a/scaler/io/ymq/timed_concurrent_queue.h +++ /dev/null @@ -1 +0,0 @@ -class TimedConcurrentQueue {}; diff --git a/scaler/io/ymq/timed_queue.h b/scaler/io/ymq/timed_queue.h new file mode 100644 index 000000000..bd47e780a --- /dev/null +++ b/scaler/io/ymq/timed_queue.h @@ -0,0 +1,53 @@ +#pragma once + +#include +#include + +#include +#include + +#include "scaler/io/ymq/timestamp.h" + +inline int createTimerfd() { + int timerfd = ::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); + if (timerfd < 0) { + exit(1); + } + return timerfd; +} + +// TODO: HANDLE ERRS +struct TimedQueue { + int timer_fd; + using callback_t = std::function; + using timed_fn = std::pair; + using cmp = decltype([](const auto& x, const auto& y) { return x.first < y.first; }); + + std::priority_queue, cmp> pq; + + TimedQueue(): timer_fd(createTimerfd()) {} + + void push(Timestamp timestamp, callback_t cb) { + if (pq.size() && timestamp < pq.top().first) { + auto ts = convertToItimerspec(timestamp); + int ret = timerfd_settime(timer_fd, 0, &ts, nullptr); + } + + pq.push({timestamp, cb}); + } + + void onRead() { + uint64_t numItems; + ssize_t n = read(timer_fd, &numItems, sizeof numItems); + + Timestamp now; + while (pq.size()) { + auto [ts, cb] = pq.top(); + if (ts < now) { + cb(); + pq.pop(); + } else + break; + } + } +}; diff --git a/scaler/io/ymq/timestamp.h b/scaler/io/ymq/timestamp.h new file mode 100644 index 000000000..25f268737 --- /dev/null +++ b/scaler/io/ymq/timestamp.h @@ -0,0 +1,45 @@ +#pragma once + +#include // itimerspec + +#include +#include // stringify + +// Simple timestamp utility +struct Timestamp { + std::chrono::time_point timestamp; + + friend std::strong_ordering operator<=>(Timestamp x, Timestamp y) { return x.timestamp <=> y.timestamp; } + + Timestamp(): timestamp(std::chrono::system_clock::now()) {} + Timestamp(std::chrono::time_point t) { timestamp = std::move(t); } + + template > + Timestamp createTimestampByOffsetDuration(std::chrono::duration offset) { + return {timestamp + offset}; + } +}; + +// For possibly logging purposes +inline std::string stringifyTimestamp(Timestamp ts) { + std::ostringstream oss; + oss << ts.timestamp; + return oss.str(); +} + +// For timerfd +inline itimerspec convertToItimerspec(Timestamp ts) { + using namespace std::chrono; + + itimerspec timerspec {}; + const auto duration = ts.timestamp - std::chrono::system_clock::now(); + if (duration.count() < 0) { + return timerspec; + } + + const auto secs = duration_cast(duration); + const auto nanosecs = duration_cast(duration - secs); + timerspec.it_value.tv_sec = secs.count(); + timerspec.it_value.tv_nsec = nanosecs.count(); + return timerspec; +} diff --git a/scaler/io/ymq/typedefs.h b/scaler/io/ymq/typedefs.h new file mode 100644 index 000000000..7b60a8e5c --- /dev/null +++ b/scaler/io/ymq/typedefs.h @@ -0,0 +1,6 @@ +#pragma once + +#include + +enum IOSocketType : uint8_t { Uninit, Binder, Sub, Pub, Dealer, Router, Pair /* etc. */ }; +enum Ownership { Owned, Borrowed }; diff --git a/scaler/io/ymq/ymq.pyi b/scaler/io/ymq/ymq.pyi new file mode 100644 index 000000000..564544438 --- /dev/null +++ b/scaler/io/ymq/ymq.pyi @@ -0,0 +1,24 @@ +# NOTE: NOT IMPLEMENTATION, TYPE INFORMATION ONLY +# This file contains type stubs for the Ymq Python C Extension module + +from enum import IntEnum + +class IOSocketType(IntEnum): + Uninit = 0 + Binder = 1 + Sub = 2 + Pub = 3 + Dealer = 4 + Router = 5 + Pair = 6 + +class IOContext: + num_threads: int + + def __init__(self, num_threads: int = 1) -> None: ... + + def createIOSocket(self, /, identity: str, socket_type: IOSocketType) -> IOSocket: ... + +class IOSocket: + identity: str + socket_type: IOSocketType diff --git a/scaler/io/ymq/ymq_test.py b/scaler/io/ymq/ymq_test.py new file mode 100644 index 000000000..5986b532f --- /dev/null +++ b/scaler/io/ymq/ymq_test.py @@ -0,0 +1,10 @@ +import ymq + +context = ymq.IOContext(num_threads=2) +socket = context.createIOSocket("my-socket", ymq.IOSocketType.Dealer) + +print(context, ";", socket) + +assert context.num_threads == 2 +assert socket.identity == "my-socket" +assert socket.socket_type == ymq.IOSocketType.Dealer diff --git a/scaler/object_storage/io_helper.h b/scaler/object_storage/io_helper.h index a95dc8fc9..00c6bd88f 100644 --- a/scaler/object_storage/io_helper.h +++ b/scaler/object_storage/io_helper.h @@ -1,5 +1,7 @@ #pragma once +#include + #include #include #include