Skip to content

Commit da33087

Browse files
committed
Add YMQ Tests
1 parent 9b030ca commit da33087

31 files changed

Lines changed: 1822 additions & 71 deletions

.github/actions/compile-library/action.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ runs:
1414
shell: bash
1515
run: |
1616
CXX=$(which g++-14) ./scripts/build.sh
17+
sudo ./scripts/test.sh
1718
1819
- name: Build and test C++ Components (Windows)
1920
if: inputs.os == 'Windows'

.github/actions/setup-env/action.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,12 @@ runs:
2323
sudo chmod 755 ./scripts/download_install_libraries.sh
2424
sudo chmod 755 ./scripts/build.sh
2525
26-
- name: Install Python Base Packages
26+
- name: Install Python Packages
2727
shell: bash
2828
run: |
2929
pip install uv
3030
uv pip install --system --upgrade pip
31+
uv pip install --system 'scapy==2.*'
3132
3233
- name: Cache Library Install
3334
if: inputs.os == 'Linux'

examples/cpp/ymq/automated_echo_client.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ int main()
7070
auto future = x.get_future();
7171
Message msg = future.get().first;
7272
if (msg.payload.as_string() != longStr) {
73-
printf("Checksum failed, %s\n", msg.payload.as_string().c_str());
73+
printf("Checksum failed, %s\n", msg.payload.as_string()->c_str());
7474
exit(1);
7575
}
7676
}

examples/task_capabilities.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,18 +56,12 @@ def main():
5656

5757
# Submit a task that requires GPU capabilities, this will be redirected to the GPU worker.
5858
gpu_future = client.submit_verbose(
59-
gpu_task,
60-
args=(16.0,),
61-
kwargs={},
62-
capabilities={"gpu": 1} # Requires a GPU capability
59+
gpu_task, args=(16.0,), kwargs={}, capabilities={"gpu": 1} # Requires a GPU capability
6360
)
6461

6562
# Submit a task that does not require GPU capabilities, this will be routed to any available worker.
6663
cpu_future = client.submit_verbose(
67-
cpu_task,
68-
args=(16.0,),
69-
kwargs={},
70-
capabilities={} # No GPU capability required
64+
cpu_task, args=(16.0,), kwargs={}, capabilities={} # No GPU capability required
7165
)
7266

7367
# Waits for the tasks for finish

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,12 @@ all = [
5151

5252
[dependency-groups]
5353
dev = [
54-
"black>=25.1.0",
54+
"black>=24.8.0; python_version == '3.8'",
55+
"black>=25.1.0; python_version >= '3.8'",
5556
"flake8>=7.3.0",
5657
"flake8-pyproject>=1.2.3",
5758
"mypy>=1.17.1",
59+
"scapy==2.*",
5860
]
5961

6062
[tool.scikit-build.metadata.version]

scaler/io/ymq/bytes.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <cstring>
1111

1212
// C++
13+
#include <optional>
1314
#include <string>
1415

1516
// First-party
@@ -33,10 +34,9 @@ class Bytes {
3334
public:
3435
Bytes(char* data, size_t len): _data(datadup((uint8_t*)data, len)), _len(len) {}
3536

36-
Bytes(): _data {}, _len {} {}
37+
Bytes(const std::string& s): _data(datadup((uint8_t*)s.data(), s.length())), _len(s.length()) {}
3738

38-
// For debug and convenience only
39-
explicit Bytes(const std::string& str): Bytes((char*)str.c_str(), str.size()) {}
39+
Bytes(): _data {}, _len {} {}
4040

4141
Bytes(const Bytes& other) noexcept
4242
{
@@ -92,11 +92,10 @@ class Bytes {
9292

9393
[[nodiscard]] constexpr bool is_null() const noexcept { return !this->_data; }
9494

95-
// debugging utility
96-
std::string as_string() const
95+
std::optional<std::string> as_string() const
9796
{
9897
if (is_null())
99-
return "[EMPTY]";
98+
return std::nullopt;
10099

101100
return std::string((char*)_data, _len);
102101
}

scaler/io/ymq/message_connection_tcp.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#include "scaler/io/ymq/message_connection_tcp.h"
33

44
#include <future>
5+
#include <print>
56

67
#include "scaler/io/ymq/configuration.h"
78

@@ -269,7 +270,6 @@ void MessageConnectionTCP::updateReadOperation()
269270
_pendingRecvMessageCallbacks->pop();
270271

271272
recvMessageCallback({Message(std::move(address), std::move(payload)), {}});
272-
273273
} else {
274274
assert(_pendingRecvMessageCallbacks->size());
275275
break;

scaler/io/ymq/network_utils.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <arpa/inet.h>
55
#include <netinet/in.h>
66
#include <netinet/tcp.h>
7+
#include <unistd.h>
78
#endif // __linux__
89
#ifdef _WIN32
910
// clang-format off

scaler/io/ymq/pymod_ymq/async.h

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
#include "scaler/io/ymq/pymod_ymq/ymq.h"
1111

1212
// wraps an async callback that accepts a Python asyncio future
13-
static PyObject* async_wrapper(PyObject* self, const std::function<void(YMQState* state, PyObject* future)>& callback)
13+
static PyObject* async_wrapper(PyObject* self, const std::function<void(YMQState* state, PyObject* future)>&& callback)
1414
{
1515
auto state = YMQStateFromSelf(self);
1616
if (!state)
@@ -23,7 +23,6 @@ static PyObject* async_wrapper(PyObject* self, const std::function<void(YMQState
2323
}
2424

2525
OwnedPyObject future = PyObject_CallMethod(*loop, "create_future", nullptr);
26-
2726
if (!future) {
2827
PyErr_SetString(PyExc_RuntimeError, "Failed to create future");
2928
return nullptr;

scaler/io/ymq/pymod_ymq/io_socket.h

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -154,10 +154,6 @@ static PyObject* PyIOSocket_recv(PyIOSocket* self, PyObject* args)
154154
if (!pyMessage)
155155
return YMQ_GetRaisedException();
156156

157-
// TODO: why is leaking necessary?
158-
address.forget();
159-
payload.forget();
160-
161157
return (PyObject*)pyMessage.take();
162158
});
163159
} catch (...) {
@@ -219,10 +215,6 @@ static PyObject* PyIOSocket_recv_sync(PyIOSocket* self, PyObject* args)
219215
if (!pyMessage)
220216
return nullptr;
221217

222-
// TODO: why is leaking necessary?
223-
address.forget();
224-
payload.forget();
225-
226218
return (PyObject*)pyMessage.take();
227219
}
228220

0 commit comments

Comments
 (0)