Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
9632685
implement several classes
magniloquency Jun 5, 2025
4d2f1c4
uncomment method
magniloquency Jun 5, 2025
02517f8
clean
magniloquency Jun 5, 2025
54604a4
rework iocontext and eloop thread
magniloquency Jun 5, 2025
aca37a4
.
magniloquency Jun 5, 2025
cbc8b7d
rename fields
magniloquency Jun 5, 2025
b7de7e5
wip interruptive queue
magniloquency Jun 5, 2025
fc9854c
Merge pull request #140 from magniloquency/cpython-module-stub
magniloquency Jun 5, 2025
3596d28
Merge branch 'main' of https://github.com/Citi/scaler into cpython-work
magniloquency Jun 6, 2025
c7bac0c
implement cpython module
magniloquency Jun 6, 2025
6503de1
implement bytes
magniloquency Jun 6, 2025
a1a3d2b
fix build
magniloquency Jun 6, 2025
5c13db1
fix ymq free
magniloquency Jun 8, 2025
2395a25
extern c
magniloquency Jun 8, 2025
05c91de
remove unused pragmas, implement python bytes class
magniloquency Jun 8, 2025
c80a571
impl python message class, add pragma once, allow default constructin…
magniloquency Jun 8, 2025
bd65eae
begin writing createIOSocket() python method; work-in-progress
magniloquency Jun 8, 2025
ee12509
add comments and todo
magniloquency Jun 8, 2025
b283678
fix build, impl interrupt queue
magniloquency Jun 10, 2025
28e892f
change to using heap-allocated types and module state, closer to fixi…
magniloquency Jun 10, 2025
ab74c52
remove old file
magniloquency Jun 10, 2025
527f7aa
fix createIOSocket()
magniloquency Jun 10, 2025
1e74e34
stub send method
magniloquency Jun 10, 2025
248e839
clean up cmake file
magniloquency Jun 10, 2025
54364ff
migrate files to other PR
magniloquency Jun 10, 2025
747ffb7
Various setup (#144)
gxuu Jun 10, 2025
48a83f4
Merge branch 'main' of https://github.com/Citi/scaler into cpython-work
magniloquency Jun 11, 2025
6fb076f
add test file, type stubs, python module tester
magniloquency Jun 11, 2025
999e6d7
fix typo
magniloquency Jun 11, 2025
827c2d1
add comment
magniloquency Jun 11, 2025
6d27b58
Rename configuration and polling_context_t
magniloquency Jun 11, 2025
a8d1a7e
change m_ prefix to just _
magniloquency Jun 11, 2025
e6bc616
fix member var prefix
magniloquency Jun 11, 2025
34cb5db
delete timed concurrent queue file
magniloquency Jun 11, 2025
fa9cf2f
refactor ownership enum
magniloquency Jun 11, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/linter.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:

- name: Build Object Storage Component
run: |
CXX=$(which clang++) ./scripts/build.sh
CXX=$(which g++) ./scripts/build.sh

- name: Install Python Dependent Packages
run: |
Expand Down
11 changes: 11 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ else()
message(STATUS "${PROJECT_NAME} ${CMAKE_BUILD_TYPE} build")
endif()

# Compiler configuration
if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
message(STATUS "Using Clang: adding -stdlib=libc++")
add_compile_options(-stdlib=libc++)
add_link_options(-stdlib=libc++)

# needed for jthread in libc++
add_compile_options(-fexperimental-library)
add_link_options(-fexperimental-library)
endif()

find_package(CapnProto CONFIG REQUIRED)
get_target_property(CAPNP_INCLUDE_DIRS CapnProto::capnp INTERFACE_INCLUDE_DIRECTORIES)
message(STATUS "Found Capnp in ${CAPNP_INCLUDE_DIRS}")
Expand Down
54 changes: 40 additions & 14 deletions scaler/io/ymq/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,30 +1,45 @@
add_library(cc_ymq SHARED)
add_library(ymq SHARED)

target_sources(cc_ymq PRIVATE
bytes.h
common.h
configuration.h
defs.h
main.h

epoll_context.h
epoll_context.cpp

event_loop_backend.h
event_loop.h
event_loop_thread.cpp

event_loop_thread.h
event_loop_thread.cpp

event_manager.h
file_descriptor.h
# file_descriptor.h

message_connection.h
message_connection_tcp.h

third_party/concurrentqueue.h
interruptive_concurrent_queue.h
io_context.cpp

typedefs.h

io_context.h
io_socket.cpp
io_context.cpp

io_socket.h
main.h
message_connection.h
message_connection_tcp.h
tcp_client.h
io_socket.cpp

tcp_server.h
third_party/concurrentqueue.h
timed_concurrent_queue.h
tcp_server.cpp

tcp_client.h
tcp_client.cpp

timestamp.h
timed_queue.h
)

set(CMAKE_LIBRARY_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR}/scaler/io/ymq)
Expand All @@ -34,9 +49,18 @@ install(TARGETS cc_ymq

find_package(Python COMPONENTS Development)

add_library(ymq SHARED)

set_target_properties(ymq PROPERTIES PREFIX "")
set_target_properties(ymq PROPERTIES LINKER_LANGUAGE CXX)

target_sources(ymq PRIVATE pymod_ymq/ymq.cpp)
target_sources(ymq PRIVATE pymod_ymq/bytes.h
pymod_ymq/message.h
pymod_ymq/io_context.h
pymod_ymq/io_socket.h
pymod_ymq/ymq.h
pymod_ymq/ymq.cpp
)
target_include_directories(ymq PRIVATE ${Python_INCLUDE_DIRS})
target_link_libraries(ymq PRIVATE cc_ymq
PRIVATE ${Python_LIBRARIES}
Expand All @@ -45,4 +69,6 @@ target_link_libraries(ymq PRIVATE cc_ymq
target_link_options(ymq PRIVATE "-Wl,-rpath,$ORIGIN")

install(TARGETS ymq
LIBRARY DESTINATION ${CMAKE_SOURCE_DIR}/scaler/lib)
LIBRARY DESTINATION scaler/io/ymq)

add_subdirectory(examples)
52 changes: 25 additions & 27 deletions scaler/io/ymq/bytes.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,77 +10,74 @@

// First-party
#include "scaler/io/ymq/common.h"
#include "scaler/io/ymq/typedefs.h"

class Bytes {
uint8_t* m_data;
size_t m_len;

enum Ownership { Owned, Borrowed } tag;
uint8_t* _data;
size_t _len;
Ownership _tag;

void free() {
if (tag != Owned)
if (_tag != Owned)
return;

if (is_empty())
return;

delete[] m_data;
this->m_data = NULL;
delete[] _data;
this->_data = NULL;
}

Bytes(uint8_t* m_data, size_t m_len, Ownership tag): m_data(m_data), m_len(m_len), tag(tag) {
if (tag == Owned && m_data == NULL)
panic("tried to create owned bytes with NULL m_data");
}
Bytes(uint8_t* m_data, size_t m_len, Ownership tag): _data(m_data), _len(m_len), _tag(tag) {}

public:
// move-only
// TODO: make copyable
Bytes(const Bytes&) = delete;
Bytes& operator=(const Bytes&) = delete;
Bytes(Bytes&& other) noexcept: m_data(other.m_data), m_len(other.m_len), tag(other.tag) {
other.m_data = NULL;
other.m_len = 0;
Bytes(Bytes&& other) noexcept: _data(other._data), _len(other._len), _tag(other._tag) {
other._data = NULL;
other._len = 0;
}
Bytes& operator=(Bytes&& other) noexcept {
if (this != &other) {
this->free(); // free current data

m_data = other.m_data;
m_len = other.m_len;
tag = other.tag;
_data = other._data;
_len = other._len;
_tag = other._tag;

other.m_data = NULL;
other.m_len = 0;
other._data = NULL;
other._len = 0;
}
return *this;
}

~Bytes() { this->free(); }

bool operator==(const Bytes& other) const {
if (m_len != other.m_len)
if (_len != other._len)
return false;

if (m_data == other.m_data)
if (_data == other._data)
return true;

return std::memcmp(m_data, other.m_data, m_len) == 0;
return std::memcmp(_data, other._data, _len) == 0;
}

bool operator!() const { return is_empty(); }

bool is_empty() const { return this->m_data == NULL; }
bool is_empty() const { return this->_data == NULL; }

// debugging utility
std::string as_string() const {
if (is_empty())
return "[EMPTY]";

return std::string((char*)m_data, m_len);
return std::string((char*)_data, _len);
}

Bytes ref() { return Bytes {this->m_data, this->m_len, Borrowed}; }
Bytes ref() { return Bytes {this->_data, this->_len, Borrowed}; }

static Bytes alloc(size_t m_len) {
if (m_len == 0)
Expand All @@ -102,7 +99,7 @@ class Bytes {
if (bytes.is_empty())
panic("tried to clone empty bytes");

return Bytes {datadup(bytes.m_data, bytes.m_len), bytes.m_len, Owned};
return Bytes {datadup(bytes._data, bytes._len), bytes._len, Owned};
}

// static Bytes from_buffer(Buffer& buffer) { return buffer.into_bytes(); }
Expand All @@ -123,7 +120,8 @@ class Bytes {
// return buffer;
// }

size_t len() const { return m_len; }
size_t len() const { return _len; }
const uint8_t* data() const { return _data; }

friend class Buffer;
};
13 changes: 7 additions & 6 deletions scaler/io/ymq/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const size_t HEADER_SIZE = 4; // size of the message header in bytes

using Errno = int;

void print_trace(void) {
inline void print_trace(void) {
void* array[10];
char** strings;
int size, i;
Expand All @@ -32,7 +32,8 @@ void print_trace(void) {

// this is an unrecoverable error that exits the program
// prints a message plus the source location
[[noreturn]] void panic(std::string message, const std::source_location& location = std::source_location::current()) {
[[noreturn]] inline void panic(
std::string message, const std::source_location& location = std::source_location::current()) {
auto file_name = std::string(location.file_name());
file_name = file_name.substr(file_name.find_last_of("/") + 1);

Expand All @@ -44,7 +45,7 @@ void print_trace(void) {
std::abort();
}

[[noreturn]] void todo(
[[noreturn]] inline void todo(
std::optional<std::string> message = std::nullopt,
const std::source_location& location = std::source_location::current()) {
if (message) {
Expand All @@ -54,19 +55,19 @@ void print_trace(void) {
}
}

uint8_t* datadup(const uint8_t* data, size_t len) {
inline uint8_t* datadup(const uint8_t* data, size_t len) {
uint8_t* dup = new uint8_t[len];
std::memcpy(dup, data, len);
return dup;
}

void serialize_u32(uint32_t x, uint8_t buffer[4]) {
inline void serialize_u32(uint32_t x, uint8_t buffer[4]) {
buffer[0] = x & 0xFF;
buffer[1] = (x >> 8) & 0xFF;
buffer[2] = (x >> 16) & 0xFF;
buffer[3] = (x >> 24) & 0xFF;
}

void deserialize_u32(const uint8_t buffer[4], uint32_t* x) {
inline void deserialize_u32(const uint8_t buffer[4], uint32_t* x) {
*x = buffer[0] | buffer[1] << 8 | buffer[2] << 16 | buffer[3] << 24;
}
11 changes: 7 additions & 4 deletions scaler/io/ymq/configuration.h
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
#pragma once

// First-party
#include "scaler/io/ymq/epoll_context.h"
// C++
#include <string>

struct configuration {
using polling_context_t = EpollContext;
class EpollContext;

struct Configuration {
using PollingContext = EpollContext;
using Identity = std::string;
};
1 change: 0 additions & 1 deletion scaler/io/ymq/defs.h

This file was deleted.

77 changes: 77 additions & 0 deletions scaler/io/ymq/epoll_context.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#include "scaler/io/ymq/epoll_context.h"
#include <format>
#include "scaler/io/ymq/common.h"

void EpollContext::registerEventManager(EventManager& em) {
epoll_event ev {
.events = EPOLLOUT | EPOLLIN | EPOLLET, // Edge-triggered
.data = {.ptr = &em},
};

epoll_fd.epoll_ctl(EPOLL_CTL_ADD, em._fd, &ev);
}

void EpollContext::removeEventManager(EventManager& em) {
epoll_fd.epoll_ctl(EPOLL_CTL_DEL, em._fd, nullptr);
}

void EpollContext::executePendingFunctors() {}

void EpollContext::loop() {
std::array<epoll_event, 1024> events;
auto result = epoll_fd.epoll_wait(events.data(), 1024, -1);

if (!result) {
panic(std::format("Failed to epoll_wait(): {}", result.error()));
};

for (auto it = events.begin(); it != events.begin() + *result; ++it) {
epoll_event current_event = *it;
if (current_event.events & EPOLLERR) {
// ...
}

auto* event = (EventManager*)current_event.data.ptr;
// event->onEvents(current_event.events);
}
executePendingFunctors();
}

// EXAMPLE
// epoll_wait;
// for each event that is returned to the caller {
// cast the event back to EventManager
// if this event is an eventfd {
// func = queue front
// func()
// } else if this event is an timerfd {
// if it is oneshot then execute once
// if it is multishot then execute and rearm timer
// } else {
// eventmanager.revent = events return by epoll
// eventmanager.on_event() ,
// where as on_event is set differently for tcpserver, tcpclient, and tcpconn

// they are defined something like:
// tcpserver.on_event() {
// accept the socket and generate a new tcpConn or handle err
// this.ioSocket.addNewConn(tcpConn)
// }
// tcpclient.on_event() {
// connect the socket and generate a new tcpConn
// this.ioSocket.addNewConn(tcpConn)
// if there need retry {
// close this socket
// this.eventloop.executeAfter(the time you want from now)
// }
// }
// tcpConn.on_event() {
// read everything you can to the buffer
// write everything you can to write the remote end
// if tcpconn.ioSocket is something special, for example dealer
// tcpConn.ioSocket.route to corresponding tcpConn
// }
//
// }

// }
Loading
Loading