From 7a4c55e8c8c2e4d2a473088ec82114fb14b58473 Mon Sep 17 00:00:00 2001 From: magniloquency <197707854+magniloquency@users.noreply.github.com> Date: Wed, 1 Oct 2025 20:37:50 -0400 Subject: [PATCH 1/9] Add C++ YMQ Tests --- tests/CMakeLists.txt | 2 +- tests/cc_ymq/CMakeLists.txt | 1 + tests/cc_ymq/common.h | 410 +++++++++++++++++++++++++++ tests/cc_ymq/test_cc_ymq.cpp | 522 +++++++++++++++++++++++++++++++++++ 4 files changed, 934 insertions(+), 1 deletion(-) create mode 100644 tests/cc_ymq/CMakeLists.txt create mode 100644 tests/cc_ymq/common.h create mode 100644 tests/cc_ymq/test_cc_ymq.cpp diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ae6a3b116..e864595a7 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -31,11 +31,11 @@ function(add_test_executable test_name source_file) add_test(NAME ${test_name} COMMAND ${test_name}) endfunction() - if(LINUX OR APPLE) # This directory fetches Google Test, so it must be included first. add_subdirectory(object_storage) # Add the new directory for io tests. add_subdirectory(io/ymq) + add_subdirectory(cc_ymq) endif() diff --git a/tests/cc_ymq/CMakeLists.txt b/tests/cc_ymq/CMakeLists.txt new file mode 100644 index 000000000..9f6abe371 --- /dev/null +++ b/tests/cc_ymq/CMakeLists.txt @@ -0,0 +1 @@ +add_test_executable(test_cc_ymq test_cc_ymq.cpp) diff --git a/tests/cc_ymq/common.h b/tests/cc_ymq/common.h new file mode 100644 index 000000000..5fd9dad9f --- /dev/null +++ b/tests/cc_ymq/common.h @@ -0,0 +1,410 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define RETURN_FAILURE_IF_FALSE(condition) \ + if (!(condition)) { \ + return TestResult::Failure; \ + } + +using namespace std::chrono_literals; + +enum class TestResult : char { Success = 1, Failure = 2 }; + +inline const char* check_localhost(const char* host) +{ + return std::strcmp(host, "localhost") == 0 ? "127.0.0.1" : host; +} + +inline std::string format_address(std::string host, uint16_t port) +{ + return std::format("tcp://{}:{}", check_localhost(host.c_str()), port); +} + +class OwnedFd { +public: + int fd; + + OwnedFd(int fd): fd(fd) {} + + // move-only + OwnedFd(const OwnedFd&) = delete; + OwnedFd& operator=(const OwnedFd&) = delete; + OwnedFd(OwnedFd&& other) noexcept: fd(other.fd) { other.fd = 0; } + OwnedFd& operator=(OwnedFd&& other) noexcept + { + if (this != &other) { + this->fd = other.fd; + other.fd = 0; + } + return *this; + } + + ~OwnedFd() + { + if (fd > 0 && close(fd) < 0) + std::println(std::cerr, "failed to close fd!"); + } + + size_t write(const void* data, size_t len) + { + auto n = ::write(this->fd, data, len); + if (n < 0) + throw std::system_error(errno, std::generic_category(), "failed to write to socket"); + + return n; + } + + void write_all(const char* data, size_t len) + { + for (size_t cursor = 0; cursor < len;) + cursor += this->write(data + cursor, len - cursor); + } + + void write_all(std::string data) { this->write_all(data.data(), data.length()); } + + void write_all(std::vector data) { this->write_all(data.data(), data.size()); } + + size_t read(void* buffer, size_t len) + { + auto n = ::read(this->fd, buffer, len); + if (n < 0) + throw std::system_error(errno, std::generic_category(), "failed to read from socket"); + return n; + } + + void read_exact(char* buffer, size_t len) + { + for (size_t cursor = 0; cursor < len;) + cursor += this->read(buffer + cursor, len - cursor); + } + + operator int() { return fd; } +}; + +class Socket: public OwnedFd { +public: + Socket(int fd): OwnedFd(fd) {} + + void connect(const char* host, uint16_t port, bool nowait = false) + { + sockaddr_in addr { + .sin_family = AF_INET, + .sin_port = htons(port), + .sin_addr = {.s_addr = inet_addr(check_localhost(host))}, + .sin_zero = {0}}; + + connect: + if (::connect(this->fd, (sockaddr*)&addr, sizeof(addr)) < 0) { + if (errno == ECONNREFUSED && !nowait) { + std::this_thread::sleep_for(300ms); + goto connect; + } + + throw std::system_error(errno, std::generic_category(), "failed to connect"); + } + } + + void bind(const char* host, int port) + { + sockaddr_in addr { + .sin_family = AF_INET, + .sin_port = htons(port), + .sin_addr = {.s_addr = inet_addr(check_localhost(host))}, + .sin_zero = {0}}; + + auto status = ::bind(this->fd, (sockaddr*)&addr, sizeof(addr)); + if (status < 0) + throw std::system_error(errno, std::generic_category(), "failed to bind"); + } + + void listen(int n = 32) + { + auto status = ::listen(this->fd, n); + if (status < 0) + throw std::system_error(errno, std::generic_category(), "failed to listen on socket"); + } + + std::pair accept(int flags = 0) + { + sockaddr_in peer_addr {}; + socklen_t len = sizeof(peer_addr); + auto fd = ::accept4(this->fd, (sockaddr*)&peer_addr, &len, flags); + if (fd < 0) + throw std::system_error(errno, std::generic_category(), "failed to accept socket"); + + return std::make_pair(Socket(fd), peer_addr); + } + + void write_message(std::string message) + { + uint64_t header = message.length(); + this->write_all((char*)&header, 8); + this->write_all(message.data(), message.length()); + } + + std::string read_message() + { + uint64_t header = 0; + this->read_exact((char*)&header, 8); + std::vector buffer(header); + this->read_exact(buffer.data(), header); + return std::string(buffer.data(), header); + } +}; + +class TcpSocket: public Socket { +public: + TcpSocket(bool nodelay = true): Socket(0) + { + this->fd = ::socket(AF_INET, SOCK_STREAM, 0); + if (this->fd < 0) + throw std::system_error(errno, std::generic_category(), "failed to create socket"); + + int on = 1; + if (nodelay && setsockopt(this->fd, IPPROTO_TCP, TCP_NODELAY, (char*)&on, sizeof(on)) < 0) + throw std::system_error(errno, std::generic_category(), "failed to set nodelay"); + + if (setsockopt(this->fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0) + throw std::system_error(errno, std::generic_category(), "failed to set reuseaddr"); + } + + void flush() + { + int on = 1; + int off = 0; + + if (setsockopt(this->fd, IPPROTO_TCP, TCP_NODELAY, (char*)&off, sizeof(off)) < 0) + throw std::system_error(errno, std::generic_category(), "failed to set nodelay"); + + if (setsockopt(this->fd, IPPROTO_TCP, TCP_NODELAY, (char*)&on, sizeof(on)) < 0) + throw std::system_error(errno, std::generic_category(), "failed to set nodelay"); + + if (setsockopt(this->fd, IPPROTO_TCP, TCP_NODELAY, (char*)&off, sizeof(off)) < 0) + throw std::system_error(errno, std::generic_category(), "failed to set nodelay"); + + if (setsockopt(this->fd, IPPROTO_TCP, TCP_NODELAY, (char*)&on, sizeof(on)) < 0) + throw std::system_error(errno, std::generic_category(), "failed to set nodelay"); + } +}; + +inline void fork_wrapper(std::function fn, int timeout_secs, OwnedFd pipe_wr) +{ + TestResult result = TestResult::Failure; + try { + result = fn(); + } catch (const std::exception& e) { + std::println(stderr, "Exception: {}", e.what()); + result = TestResult::Failure; + } catch (...) { + std::println(stderr, "Unknown exception"); + result = TestResult::Failure; + } + + pipe_wr.write_all((char*)&result, sizeof(TestResult)); +} + +// run a test +// forks and runs each of the provided closures +inline TestResult test( + int timeout_secs, std::vector> closures) +{ + std::vector> pipes {}; + std::vector pids {}; + for (size_t i = 0; i < closures.size(); i++) { + int pipe[2] = {0}; + if (pipe2(pipe, O_NONBLOCK) < 0) { + std::for_each(pipes.begin(), pipes.end(), [](const auto& pipe) { + close(pipe.first); + close(pipe.second); + }); + + throw std::system_error(errno, std::generic_category(), "failed to create pipe: "); + } + pipes.push_back(std::make_pair(pipe[0], pipe[1])); + } + + for (size_t i = 0; i < closures.size(); i++) { + auto pid = fork(); + if (pid < 0) { + std::for_each(pipes.begin(), pipes.end(), [](const auto& pipe) { + close(pipe.first); + close(pipe.second); + }); + + std::for_each(pids.begin(), pids.end(), [](const auto& pid) { kill(pid, SIGKILL); }); + + throw std::system_error(errno, std::generic_category(), "failed to fork"); + } + + if (pid == 0) { + // close all pipes except our write half + for (size_t j = 0; j < pipes.size(); j++) { + if (i == j) + close(pipes[i].first); + else { + close(pipes[j].first); + close(pipes[j].second); + } + } + + fork_wrapper(closures[i], timeout_secs, pipes[i].second); + std::exit(EXIT_SUCCESS); + } + + pids.push_back(pid); + } + + // close all write halves of the pipes + for (auto pipe: pipes) + close(pipe.second); + + std::vector pfds {}; + + OwnedFd timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + if (timerfd < 0) { + std::for_each(pipes.begin(), pipes.end(), [](const auto& pipe) { close(pipe.first); }); + std::for_each(pids.begin(), pids.end(), [](const auto& pid) { kill(pid, SIGKILL); }); + + throw std::system_error(errno, std::generic_category(), "failed to create timerfd"); + } + + pfds.push_back({.fd = timerfd.fd, .events = POLL_IN, .revents = 0}); + for (auto pipe: pipes) + pfds.push_back({ + .fd = pipe.first, + .events = POLL_IN, + .revents = 0, + }); + + itimerspec spec { + .it_interval = + { + .tv_sec = 0, + .tv_nsec = 0, + }, + .it_value = { + .tv_sec = timeout_secs, + .tv_nsec = 0, + }}; + + if (timerfd_settime(timerfd, 0, &spec, nullptr) < 0) { + std::for_each(pipes.begin(), pipes.end(), [](const auto& pipe) { close(pipe.first); }); + std::for_each(pids.begin(), pids.end(), [](const auto& pid) { kill(pid, SIGKILL); }); + + throw std::system_error(errno, std::generic_category(), "failed to set timerfd"); + } + + std::vector> results(pids.size(), std::nullopt); + + for (;;) { + auto n = poll(pfds.data(), pfds.size(), -1); + if (n < 0) { + std::for_each(pipes.begin(), pipes.end(), [](const auto& pipe) { close(pipe.first); }); + std::for_each(pids.begin(), pids.end(), [](const auto& pid) { kill(pid, SIGKILL); }); + + throw std::system_error(errno, std::generic_category(), "failed to poll: "); + } + + for (auto& pfd: std::vector(pfds)) { + if (pfd.revents == 0) + continue; + + // timed out + if (pfd.fd == timerfd) { + std::println("Timed out!"); + + std::for_each(pipes.begin(), pipes.end(), [](const auto& pipe) { close(pipe.first); }); + std::for_each(pids.begin(), pids.end(), [](const auto& pid) { kill(pid, SIGKILL); }); + + return TestResult::Failure; + } + + auto elem = std::find_if(pipes.begin(), pipes.end(), [fd = pfd.fd](auto pipe) { return pipe.first == fd; }); + auto idx = elem - pipes.begin(); + + TestResult result = TestResult::Failure; + char buffer = 0; + auto n = read(pfd.fd, &buffer, sizeof(TestResult)); + if (n == 0) { + std::println("failed to read from pipe: pipe closed unexpectedly"); + result = TestResult::Failure; + } else if (n < 0) { + std::println("failed to read from pipe: {}", std::strerror(errno)); + result = TestResult::Failure; + } else + result = (TestResult)buffer; + + // the subprocess should have exited + // check its exit status + int status; + if (waitpid(pids[idx], &status, 0) < 0) + std::println("failed to wait on subprocess[{}]: {}", idx, std::strerror(errno)); + + auto exit_status = WEXITSTATUS(status); + if (WIFEXITED(status) && exit_status != EXIT_SUCCESS) { + std::println("subprocess[{}] exited with status {}", idx, exit_status); + } else if (WIFSIGNALED(status)) { + std::println("subprocess[{}] killed by signal {}", idx, WTERMSIG(status)); + } else + std::println( + "subprocess[{}] completed with {}", idx, result == TestResult::Success ? "Success" : "Failure"); + + // store the result + results[idx] = result; + + // this subprocess is done, remove its pipe from the poll fds + pfds.erase(std::remove_if(pfds.begin(), pfds.end(), [&](auto p) { return p.fd == pfd.fd; }), pfds.end()); + + auto done = std::all_of(results.begin(), results.end(), [](auto result) { return result.has_value(); }); + if (done) + goto end; // justification for goto: breaks out of two levels of loop + } + } + +end: + + std::for_each(pipes.begin(), pipes.end(), [](const auto& pipe) { close(pipe.first); }); + + if (std::ranges::any_of(results, [](auto x) { return x == TestResult::Failure; })) + return TestResult::Failure; + + return TestResult::Success; +} diff --git a/tests/cc_ymq/test_cc_ymq.cpp b/tests/cc_ymq/test_cc_ymq.cpp new file mode 100644 index 000000000..b55dda17a --- /dev/null +++ b/tests/cc_ymq/test_cc_ymq.cpp @@ -0,0 +1,522 @@ +// this file contains the tests for the C++ interface of YMQ +// each test case is comprised of at least one client and one server, and possibly a middleman +// the clients and servers used in these tests are defined in the first part of this file +// +// the test cases are at the bottom of this file, after the clients and servers +// the documentation for each case is found on the TEST() definition + +#include +#include + +#include +#include +#include +#include +#include +#include + +#include "common.h" +#include "scaler/io/ymq/bytes.h" +#include "scaler/io/ymq/io_context.h" +#include "scaler/io/ymq/simple_interface.h" +#include "tests/cc_ymq/common.h" + +using namespace scaler::ymq; +using namespace std::chrono_literals; + +// ━━━━━━━━━━━━━━━━━━━ +// clients and servers +// ━━━━━━━━━━━━━━━━━━━ + +TestResult basic_server_ymq(std::string host, uint16_t port) +{ + IOContext context(1); + + auto socket = syncCreateSocket(context, IOSocketType::Binder, "server"); + syncBindSocket(socket, format_address(host, port)); + auto result = syncRecvMessage(socket); + + RETURN_FAILURE_IF_FALSE(result.has_value()); + RETURN_FAILURE_IF_FALSE(result->payload.as_string() == "yi er san si wu liu"); + + context.removeIOSocket(socket); + + return TestResult::Success; +} + +TestResult basic_client_ymq(std::string host, uint16_t port) +{ + IOContext context(1); + + auto socket = syncCreateSocket(context, IOSocketType::Connector, "client"); + syncConnectSocket(socket, format_address(host, port)); + auto result = syncSendMessage(socket, {.address = Bytes("server"), .payload = Bytes("yi er san si wu liu")}); + + context.removeIOSocket(socket); + + return TestResult::Success; +} + +TestResult basic_server_raw(std::string host, uint16_t port) +{ + TcpSocket socket; + + socket.bind(host.c_str(), port); + socket.listen(); + auto [client, _] = socket.accept(); + client.write_message("server"); + auto client_identity = client.read_message(); + RETURN_FAILURE_IF_FALSE(client_identity == "client"); + auto msg = client.read_message(); + RETURN_FAILURE_IF_FALSE(msg == "yi er san si wu liu"); + + return TestResult::Success; +} + +TestResult basic_client_raw(std::string host, uint16_t port) +{ + TcpSocket socket; + + socket.connect(host.c_str(), port); + socket.write_message("client"); + auto server_identity = socket.read_message(); + RETURN_FAILURE_IF_FALSE(server_identity == "server"); + socket.write_message("yi er san si wu liu"); + + return TestResult::Success; +} + +TestResult server_receives_big_message(std::string host, uint16_t port) +{ + IOContext context(1); + + auto socket = syncCreateSocket(context, IOSocketType::Binder, "server"); + syncBindSocket(socket, format_address(host, port)); + auto result = syncRecvMessage(socket); + + RETURN_FAILURE_IF_FALSE(result.has_value()); + RETURN_FAILURE_IF_FALSE(result->payload.len() == 500'000'000); + + context.removeIOSocket(socket); + + return TestResult::Success; +} + +TestResult client_sends_big_message(std::string host, uint16_t port) +{ + TcpSocket socket; + + socket.connect(host.c_str(), port); + socket.write_message("client"); + auto remote_identity = socket.read_message(); + RETURN_FAILURE_IF_FALSE(remote_identity == "server"); + std::string msg(500'000'000, '.'); + socket.write_message(msg); + + return TestResult::Success; +} + +TestResult reconnect_server_main(std::string host, uint16_t port) +{ + IOContext context(1); + + auto socket = syncCreateSocket(context, IOSocketType::Binder, "server"); + syncBindSocket(socket, format_address(host, port)); + auto result = syncRecvMessage(socket); + + RETURN_FAILURE_IF_FALSE(result.has_value()); + RETURN_FAILURE_IF_FALSE(result->payload.as_string() == "hello!!"); + + auto error = syncSendMessage(socket, {.address = Bytes("client"), .payload = Bytes("world!!")}); + RETURN_FAILURE_IF_FALSE(!error); + + context.removeIOSocket(socket); + + return TestResult::Success; +} + +TestResult reconnect_client_main(std::string host, uint16_t port) +{ + IOContext context(1); + + auto socket = syncCreateSocket(context, IOSocketType::Connector, "client"); + syncConnectSocket(socket, format_address(host, port)); + auto result = syncSendMessage(socket, {.address = Bytes("server"), .payload = Bytes("hello!!")}); + auto msg = syncRecvMessage(socket); + RETURN_FAILURE_IF_FALSE(msg.has_value()); + RETURN_FAILURE_IF_FALSE(msg->payload.as_string() == "world!!"); + + context.removeIOSocket(socket); + + return TestResult::Success; +} + +TestResult client_simulated_slow_network(const char* host, uint16_t port) +{ + TcpSocket socket; + + socket.connect(host, port); + socket.write_message("client"); + auto remote_identity = socket.read_message(); + RETURN_FAILURE_IF_FALSE(remote_identity == "server"); + + std::string message = "yi er san si wu liu"; + uint64_t header = message.length(); + + socket.write_all((char*)&header, 4); + std::this_thread::sleep_for(2s); + socket.write_all((char*)&header + 4, 4); + std::this_thread::sleep_for(3s); + socket.write_all(message.data(), header / 2); + std::this_thread::sleep_for(2s); + socket.write_all(message.data() + header / 2, header - header / 2); + + return TestResult::Success; +} + +TestResult client_sends_incomplete_identity(const char* host, uint16_t port) +{ + // open a socket, write an incomplete identity and exit + { + TcpSocket socket; + + socket.connect(host, port); + + auto server_identity = socket.read_message(); + RETURN_FAILURE_IF_FALSE(server_identity == "server"); + + // write incomplete identity and exit + std::string identity = "client"; + uint64_t header = identity.length(); + socket.write_all((char*)&header, 8); + socket.write_all(identity.data(), identity.length() - 2); + } + + // connect again and try to send a message + { + TcpSocket socket; + socket.connect(host, port); + auto server_identity = socket.read_message(); + RETURN_FAILURE_IF_FALSE(server_identity == "server"); + socket.write_message("client"); + socket.write_message("yi er san si wu liu"); + } + + return TestResult::Success; +} + +TestResult server_receives_huge_header(const char* host, uint16_t port) +{ + IOContext context(1); + + auto socket = syncCreateSocket(context, IOSocketType::Binder, "server"); + syncBindSocket(socket, format_address(host, port)); + auto result = syncRecvMessage(socket); + + RETURN_FAILURE_IF_FALSE(result.has_value()); + RETURN_FAILURE_IF_FALSE(result->payload.as_string() == "yi er san si wu liu"); + + context.removeIOSocket(socket); + + return TestResult::Success; +} + +TestResult client_sends_huge_header(const char* host, uint16_t port) +{ + // ignore SIGPIPE so that write() returns EPIPE instead of crashing the program + signal(SIGPIPE, SIG_IGN); + + { + TcpSocket socket; + + socket.connect(host, port); + socket.write_message("client"); + auto server_identity = socket.read_message(); + RETURN_FAILURE_IF_FALSE(server_identity == "server"); + + // write the huge header + uint64_t header = std::numeric_limits::max(); + socket.write_all((char*)&header, 8); + + size_t i = 0; + for (; i < 10; i++) { + std::this_thread::sleep_for(1s); + + try { + socket.write_all("yi er san si wu liu"); + } catch (const std::system_error& e) { + if (e.code().value() == EPIPE) { + std::println("writing failed with EPIPE as expected after sending huge header, continuing.."); + break; // this is expected + } + + throw; // rethrow other errors + } + } + + if (i == 10) { + std::println("expected EPIPE after sending huge header"); + return TestResult::Failure; + } + } + + { + TcpSocket socket; + socket.connect(host, port); + socket.write_message("client"); + auto server_identity = socket.read_message(); + RETURN_FAILURE_IF_FALSE(server_identity == "server"); + socket.write_message("yi er san si wu liu"); + } + + return TestResult::Success; +} + +TestResult server_receives_empty_messages(const char* host, uint16_t port) +{ + IOContext context(1); + + auto socket = syncCreateSocket(context, IOSocketType::Binder, "server"); + syncBindSocket(socket, format_address(host, port)); + + auto result = syncRecvMessage(socket); + RETURN_FAILURE_IF_FALSE(result.has_value()); + RETURN_FAILURE_IF_FALSE(result->payload.as_string() == ""); + + auto result2 = syncRecvMessage(socket); + RETURN_FAILURE_IF_FALSE(result2.has_value()); + RETURN_FAILURE_IF_FALSE(result2->payload.as_string() == ""); + + context.removeIOSocket(socket); + + return TestResult::Success; +} + +TestResult client_sends_empty_messages(std::string host, uint16_t port) +{ + IOContext context(1); + + auto socket = syncCreateSocket(context, IOSocketType::Connector, "client"); + syncConnectSocket(socket, format_address(host, port)); + + auto error = syncSendMessage(socket, Message {.address = Bytes(), .payload = Bytes()}); + RETURN_FAILURE_IF_FALSE(!error); + + auto error2 = syncSendMessage(socket, Message {.address = Bytes(), .payload = Bytes("")}); + RETURN_FAILURE_IF_FALSE(!error2); + + context.removeIOSocket(socket); + + return TestResult::Success; +} + +TestResult pubsub_subscriber(std::string host, uint16_t port, std::string topic, int differentiator) +{ + IOContext context(1); + + auto socket = + syncCreateSocket(context, IOSocketType::Unicast, std::format("{}_subscriber_{}", topic, differentiator)); + syncConnectSocket(socket, format_address(host, port)); + auto msg = syncRecvMessage(socket); + RETURN_FAILURE_IF_FALSE(msg.has_value()); + RETURN_FAILURE_IF_FALSE(msg->payload.as_string() == "hello topic " + topic); + + context.removeIOSocket(socket); + return TestResult::Success; +} + +TestResult pubsub_publisher(std::string host, uint16_t port, std::string topic) +{ + IOContext context(1); + + auto socket = syncCreateSocket(context, IOSocketType::Multicast, "publisher"); + syncBindSocket(socket, format_address(host, port)); + + // wait a second to ensure that the subscribers are ready + std::this_thread::sleep_for(1s); + + // the topic is wrong, so no one should receive this + auto error = syncSendMessage( + socket, Message {.address = Bytes(std::format("x{}", topic)), .payload = Bytes("no one should get this")}); + RETURN_FAILURE_IF_FALSE(!error); + + // no one should receive this either + error = syncSendMessage( + socket, + Message {.address = Bytes(std::format("{}x", topic)), .payload = Bytes("no one should get this either")}); + RETURN_FAILURE_IF_FALSE(!error); + + error = syncSendMessage(socket, Message {.address = Bytes(topic), .payload = Bytes("hello topic " + topic)}); + RETURN_FAILURE_IF_FALSE(!error); + + context.removeIOSocket(socket); + return TestResult::Success; +} + +// ━━━━━━━━━━━━━ +// test cases +// ━━━━━━━━━━━━━ + +// this is a 'basic' test which sends a single message from a client to a server +// in this variant, both the client and server are implemented using YMQ +// +// this case includes a _delay_ +// this is a thread sleep that happens after the client sends the message, to delay the close() of the socket +// at the moment, if this delay is missing, YMQ will not shut down correctly +TEST(CcYmqTestSuite, TestBasicYMQClientYMQServer) +{ + auto host = "localhost"; + auto port = 2889; + + // this is the test harness, it accepts a timeout, and a list of functions to run + auto result = + test(10, {[=] { return basic_client_ymq(host, port); }, [=] { return basic_server_ymq(host, port); }}); + + // test() aggregates the results across all of the provided functions + EXPECT_EQ(result, TestResult::Success); +} + +// same as above, except YMQs protocol is directly implemented on top of a TCP socket +TEST(CcYmqTestSuite, TestBasicRawClientYMQServer) +{ + auto host = "localhost"; + auto port = 2890; + + auto result = + test(10, {[=] { return basic_client_raw(host, port); }, [=] { return basic_server_ymq(host, port); }}); + + // test() aggregates the results across all of the provided functions + EXPECT_EQ(result, TestResult::Success); +} + +TEST(CcYmqTestSuite, TestBasicRawClientRawServer) +{ + auto host = "localhost"; + auto port = 2891; + + auto result = + test(10, {[=] { return basic_client_raw(host, port); }, [=] { return basic_server_raw(host, port); }}); + + // test() aggregates the results across all of the provided functions + EXPECT_EQ(result, TestResult::Success); +} + +// this is the same as above, except that it has no delay before calling close() on the socket +TEST(CcYmqTestSuite, TestBasicRawClientRawServerNoDelay) +{ + auto host = "localhost"; + auto port = 2892; + + auto result = + test(10, {[=] { return basic_client_raw(host, port); }, [=] { return basic_server_ymq(host, port); }}); + EXPECT_EQ(result, TestResult::Success); +} + +TEST(CcYmqTestSuite, TestBasicDelayYMQClientRawServer) +{ + auto host = "localhost"; + auto port = 2893; + + auto result = + test(10, {[=] { return basic_client_ymq(host, port); }, [=] { return basic_server_raw(host, port); }}); + + // test() aggregates the results across all of the provided functions + EXPECT_EQ(result, TestResult::Success); +} + +// in this test case, the client sends a large message to the server +// YMQ should be able to handle this without issue +TEST(CcYmqTestSuite, TestClientSendBigMessageToServer) +{ + auto host = "localhost"; + auto port = 2894; + + auto result = test( + 10, + {[=] { return client_sends_big_message(host, port); }, + [=] { return server_receives_big_message(host, port); }}); + EXPECT_EQ(result, TestResult::Success); +} + +// in this test the client is sending a message to the server +// but we simulate a slow network connection by sending the message in segmented chunks +TEST(CcYmqTestSuite, TestSlowNetwork) +{ + auto host = "localhost"; + auto port = 2895; + + auto result = test( + 20, {[=] { return client_simulated_slow_network(host, port); }, [=] { return basic_server_ymq(host, port); }}); + EXPECT_EQ(result, TestResult::Success); +} + +// TODO: figure out why this test fails in ci sometimes, and re-enable +// +// in this test, a client connects to the YMQ server but only partially sends its identity and then disconnects +// then a new client connection is established, and this one sends a complete identity and message +// YMQ should be able to recover from a poorly-behaved client like this +TEST(CcYmqTestSuite, TestClientSendIncompleteIdentity) +{ + auto host = "localhost"; + auto port = 2896; + + auto result = test( + 20, + {[=] { return client_sends_incomplete_identity(host, port); }, [=] { return basic_server_ymq(host, port); }}); + EXPECT_EQ(result, TestResult::Success); +} + +// TODO: this should pass +// currently YMQ rejects the second connection, saying that the message is too large even when it isn't +// +// in this test, the client sends an unrealistically-large header +// it is important that YMQ checks the header size before allocating memory +// both for resilence against attacks and to guard against errors +TEST(CcYmqTestSuite, TestClientSendHugeHeader) +{ + auto host = "localhost"; + auto port = 2897; + + auto result = test( + 20, + {[=] { return client_sends_huge_header(host, port); }, + [=] { return server_receives_huge_header(host, port); }}); + EXPECT_EQ(result, TestResult::Success); +} + +// in this test, the client sends empty messages to the server +// there are in effect two kinds of empty messages: Bytes() and Bytes("") +// in the former case, the bytes contains a nullptr +// in the latter case, the bytes contains a zero-length allocation +// it's important that the behaviour of YMQ is known for both of these cases +TEST(CcYmqTestSuite, TestClientSendEmptyMessage) +{ + auto host = "localhost"; + auto port = 2898; + + auto result = test( + 20, + {[=] { return client_sends_empty_messages(host, port); }, + [=] { return server_receives_empty_messages(host, port); }}); + EXPECT_EQ(result, TestResult::Success); +} + +// this case tests the publish-subscribe pattern of YMQ +// we create one publisher and two subscribers with a common topic +// the publisher will send two messages to the wrong topic +// none of the subscribers should receive these +// and then the publisher will send a message to the correct topic +// both subscribers should receive this message +TEST(CcYmqTestSuite, TestPubSub) +{ + auto host = "localhost"; + auto port = 2900; + auto topic = "mytopic"; + + auto result = test( + 20, + {[=] { return pubsub_publisher(host, port, topic); }, + [=] { return pubsub_subscriber(host, port, topic, 0); }, + [=] { return pubsub_subscriber(host, port, topic, 1); }}); + EXPECT_EQ(result, TestResult::Success); +} From 2d3872c7020a6ec3f9684dfb8c2a61dea0ebe921 Mon Sep 17 00:00:00 2001 From: magniloquency <197707854+magniloquency@users.noreply.github.com> Date: Wed, 1 Oct 2025 21:48:35 -0400 Subject: [PATCH 2/9] Move tests to tests/cpp/ymq --- tests/CMakeLists.txt | 2 +- tests/cpp/CMakeLists.txt | 1 + tests/{cc_ymq => cpp/ymq}/CMakeLists.txt | 0 tests/{cc_ymq => cpp/ymq}/common.h | 0 tests/{cc_ymq => cpp/ymq}/test_cc_ymq.cpp | 2 +- 5 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 tests/cpp/CMakeLists.txt rename tests/{cc_ymq => cpp/ymq}/CMakeLists.txt (100%) rename tests/{cc_ymq => cpp/ymq}/common.h (100%) rename tests/{cc_ymq => cpp/ymq}/test_cc_ymq.cpp (99%) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index e864595a7..b62b86fd4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -37,5 +37,5 @@ if(LINUX OR APPLE) # Add the new directory for io tests. add_subdirectory(io/ymq) - add_subdirectory(cc_ymq) + add_subdirectory(cpp) endif() diff --git a/tests/cpp/CMakeLists.txt b/tests/cpp/CMakeLists.txt new file mode 100644 index 000000000..26c48e9b0 --- /dev/null +++ b/tests/cpp/CMakeLists.txt @@ -0,0 +1 @@ +add_subdirectory(ymq) \ No newline at end of file diff --git a/tests/cc_ymq/CMakeLists.txt b/tests/cpp/ymq/CMakeLists.txt similarity index 100% rename from tests/cc_ymq/CMakeLists.txt rename to tests/cpp/ymq/CMakeLists.txt diff --git a/tests/cc_ymq/common.h b/tests/cpp/ymq/common.h similarity index 100% rename from tests/cc_ymq/common.h rename to tests/cpp/ymq/common.h diff --git a/tests/cc_ymq/test_cc_ymq.cpp b/tests/cpp/ymq/test_cc_ymq.cpp similarity index 99% rename from tests/cc_ymq/test_cc_ymq.cpp rename to tests/cpp/ymq/test_cc_ymq.cpp index b55dda17a..f20321908 100644 --- a/tests/cc_ymq/test_cc_ymq.cpp +++ b/tests/cpp/ymq/test_cc_ymq.cpp @@ -19,7 +19,7 @@ #include "scaler/io/ymq/bytes.h" #include "scaler/io/ymq/io_context.h" #include "scaler/io/ymq/simple_interface.h" -#include "tests/cc_ymq/common.h" +#include "tests/cpp/ymq/common.h" using namespace scaler::ymq; using namespace std::chrono_literals; From daa98825cfe8a0b3fc6e49b81b84a84889daa424 Mon Sep 17 00:00:00 2001 From: magniloquency <197707854+magniloquency@users.noreply.github.com> Date: Wed, 1 Oct 2025 21:48:35 -0400 Subject: [PATCH 3/9] Move tests to tests/cpp/ymq --- scripts/build.sh | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/build.sh b/scripts/build.sh index 7256ea367..ec709a017 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -11,9 +11,9 @@ ARCH="$(uname -m)" # e.g. x86_64 or arm64 BUILD_DIR="build_${OS}_${ARCH}" BUILD_PRESET="${OS}-${ARCH}" -rm -rf $BUILD_DIR -rm -f scaler/protocol/capnp/*.c++ -rm -f scaler/protocol/capnp/*.h +# rm -rf $BUILD_DIR +# rm -f scaler/protocol/capnp/*.c++ +# rm -f scaler/protocol/capnp/*.h echo "Build directory: $BUILD_DIR" echo "Build preset: $BUILD_PRESET" From 637b692ad8c220eee6156e9f3bfab3b6ff988db6 Mon Sep 17 00:00:00 2001 From: magniloquency <197707854+magniloquency@users.noreply.github.com> Date: Wed, 1 Oct 2025 20:46:18 -0400 Subject: [PATCH 4/9] Add YMQ MITM Tests Move files Revert --- scripts/build.sh | 9 +- scripts/test.sh | 15 ++ tests/CMakeLists.txt | 3 + tests/cpp/ymq/common.h | 148 ++++++++++++++- tests/cpp/ymq/py_mitm/__init__.py | 0 tests/cpp/ymq/py_mitm/main.py | 175 ++++++++++++++++++ tests/cpp/ymq/py_mitm/passthrough.py | 25 +++ .../cpp/ymq/py_mitm/randomly_drop_packets.py | 29 +++ tests/cpp/ymq/py_mitm/send_rst_to_client.py | 50 +++++ tests/cpp/ymq/py_mitm/types.py | 54 ++++++ tests/cpp/ymq/test_cc_ymq.cpp | 119 +++++++++++- 11 files changed, 611 insertions(+), 16 deletions(-) create mode 100755 scripts/test.sh create mode 100644 tests/cpp/ymq/py_mitm/__init__.py create mode 100644 tests/cpp/ymq/py_mitm/main.py create mode 100644 tests/cpp/ymq/py_mitm/passthrough.py create mode 100644 tests/cpp/ymq/py_mitm/randomly_drop_packets.py create mode 100644 tests/cpp/ymq/py_mitm/send_rst_to_client.py create mode 100644 tests/cpp/ymq/py_mitm/types.py diff --git a/scripts/build.sh b/scripts/build.sh index ec709a017..4279683d0 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -11,9 +11,9 @@ ARCH="$(uname -m)" # e.g. x86_64 or arm64 BUILD_DIR="build_${OS}_${ARCH}" BUILD_PRESET="${OS}-${ARCH}" -# rm -rf $BUILD_DIR -# rm -f scaler/protocol/capnp/*.c++ -# rm -f scaler/protocol/capnp/*.h +rm -rf $BUILD_DIR +rm -f scaler/protocol/capnp/*.c++ +rm -f scaler/protocol/capnp/*.h echo "Build directory: $BUILD_DIR" echo "Build preset: $BUILD_PRESET" @@ -25,6 +25,3 @@ cmake --build --preset $BUILD_PRESET # Install cmake --install $BUILD_DIR - -# Tests -ctest --preset $BUILD_PRESET diff --git a/scripts/test.sh b/scripts/test.sh new file mode 100755 index 000000000..948090580 --- /dev/null +++ b/scripts/test.sh @@ -0,0 +1,15 @@ +#!/bin/bash -e +# +# This script tests the C++ components. +# +# Usage: +# ./scripts/test.sh + +OS="$(uname -s | tr '[:upper:]' '[:lower:]')" # e.g. linux or darwin +ARCH="$(uname -m)" # e.g. x86_64 or arm64 + +BUILD_DIR="build_${OS}_${ARCH}" +BUILD_PRESET="${OS}-${ARCH}" + +# Run tests +ctest --preset $BUILD_PRESET -VV diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index b62b86fd4..ca35dc2c2 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -11,6 +11,8 @@ set(BUILD_GMOCK OFF CACHE BOOL "" FORCE) set(BUILD_GTEST ON CACHE BOOL "" FORCE) FetchContent_MakeAvailable(googletest) +find_package(Python3 COMPONENTS Development REQUIRED) + # This function compiles, links, and adds a C++ test executable using Google Test. # It is shared by all test subdirectories. function(add_test_executable test_name source_file) @@ -26,6 +28,7 @@ function(add_test_executable test_name source_file) CapnProto::capnp CapnProto::kj GTest::gtest_main + Python3::Python ) add_test(NAME ${test_name} COMMAND ${test_name}) diff --git a/tests/cpp/ymq/common.h b/tests/cpp/ymq/common.h index 5fd9dad9f..a8a23de01 100644 --- a/tests/cpp/ymq/common.h +++ b/tests/cpp/ymq/common.h @@ -1,6 +1,8 @@ #pragma once #include +#define PY_SSIZE_T_CLEAN +#include #include #include #include @@ -241,10 +243,50 @@ inline void fork_wrapper(std::function fn, int timeout_secs, Owned pipe_wr.write_all((char*)&result, sizeof(TestResult)); } +// this function along with `wait_for_python_ready_sigwait()` +// work together to wait on a signal from the python process +// indicating that the tuntap interface has been created, and that the mitm is ready +inline void wait_for_python_ready_sigblock() +{ + sigset_t set {}; + + if (sigemptyset(&set) < 0) + throw std::system_error(errno, std::generic_category(), "failed to create empty signal set"); + + if (sigaddset(&set, SIGUSR1) < 0) + throw std::system_error(errno, std::generic_category(), "failed to add sigusr1 to the signal set"); + + if (sigprocmask(SIG_BLOCK, &set, nullptr) < 0) + throw std::system_error(errno, std::generic_category(), "failed to mask sigusr1"); + + std::println("blocked signal..."); +} + +inline void wait_for_python_ready_sigwait(int timeout_secs) +{ + sigset_t set {}; + siginfo_t sig {}; + + if (sigemptyset(&set) < 0) + throw std::system_error(errno, std::generic_category(), "failed to create empty signal set"); + + if (sigaddset(&set, SIGUSR1) < 0) + throw std::system_error(errno, std::generic_category(), "failed to add sigusr1 to the signal set"); + + std::println("waiting for python to be ready..."); + timespec ts {.tv_sec = timeout_secs, .tv_nsec = 0}; + if (sigtimedwait(&set, &sig, &ts) < 0) + throw std::system_error(errno, std::generic_category(), "failed to wait on sigusr1"); + + sigprocmask(SIG_UNBLOCK, &set, nullptr); + std::println("signal received; python is ready"); +} + // run a test // forks and runs each of the provided closures +// if `wait_for_python` is true, wait for SIGUSR1 after forking and executing the first closure inline TestResult test( - int timeout_secs, std::vector> closures) + int timeout_secs, std::vector> closures, bool wait_for_python = false) { std::vector> pipes {}; std::vector pids {}; @@ -262,6 +304,9 @@ inline TestResult test( } for (size_t i = 0; i < closures.size(); i++) { + if (wait_for_python && i == 0) + wait_for_python_ready_sigblock(); + auto pid = fork(); if (pid < 0) { std::for_each(pipes.begin(), pipes.end(), [](const auto& pipe) { @@ -290,6 +335,9 @@ inline TestResult test( } pids.push_back(pid); + + if (wait_for_python && i == 0) + wait_for_python_ready_sigwait(3); } // close all write halves of the pipes @@ -408,3 +456,101 @@ inline TestResult test( return TestResult::Success; } + +inline TestResult run_python(const char* path, std::vector argv = {}) +{ + // insert the pid at the start of the argv, this is important for signalling readiness + pid_t pid = getppid(); + auto pid_ws = std::to_wstring(pid); + argv.insert(argv.begin(), pid_ws.c_str()); + + PyStatus status; + PyConfig config; + PyConfig_InitPythonConfig(&config); + + status = PyConfig_SetBytesString(&config, &config.program_name, "mitm"); + if (PyStatus_Exception(status)) + goto exception; + + status = Py_InitializeFromConfig(&config); + if (PyStatus_Exception(status)) + goto exception; + PyConfig_Clear(&config); + + argv.insert(argv.begin(), L"mitm"); + PySys_SetArgv(argv.size(), (wchar_t**)argv.data()); + + { + auto file = fopen(path, "r"); + if (!file) + throw std::system_error(errno, std::generic_category(), "failed to open python file"); + + PyRun_SimpleFile(file, path); + fclose(file); + } + + if (Py_FinalizeEx() < 0) { + std::println("finalization failure"); + return TestResult::Failure; + } + + return TestResult::Success; + +exception: + PyConfig_Clear(&config); + Py_ExitStatusException(status); + + return TestResult::Failure; +} + +// change the current working directory to the project root +// this is important for finding the python mitm script +inline void chdir_to_project_root() +{ + auto cwd = std::filesystem::current_path(); + + // if pyproject.toml is in `path`, it's the project root + for (auto path = cwd; !path.empty(); path = path.parent_path()) { + if (std::filesystem::exists(path / "pyproject.toml")) { + // change to the project root + std::filesystem::current_path(path); + return; + } + } +} + +inline TestResult run_mitm( + std::string testcase, + std::string mitm_ip, + uint16_t mitm_port, + std::string remote_ip, + uint16_t remote_port, + std::vector extra_args = {}) +{ + auto cwd = std::filesystem::current_path(); + chdir_to_project_root(); + + // we build the args for the user to make calling the function more convenient + std::vector args { + testcase, mitm_ip, std::to_string(mitm_port), remote_ip, std::to_string(remote_port)}; + + for (auto arg: extra_args) + args.push_back(arg); + + // we need to convert to wide strings to pass to Python + std::vector wide_args_owned {}; + + // the strings are ascii so we can just make them into wstrings + for (const auto& str: args) + wide_args_owned.emplace_back(str.begin(), str.end()); + + std::vector wide_args {}; + for (const auto& wstr: wide_args_owned) + wide_args.push_back(wstr.c_str()); + + auto result = run_python("tests/cpp/ymq/py_mitm/main.py", wide_args); + + // change back to the original working directory + std::filesystem::current_path(cwd); + return result; +} diff --git a/tests/cpp/ymq/py_mitm/__init__.py b/tests/cpp/ymq/py_mitm/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/cpp/ymq/py_mitm/main.py b/tests/cpp/ymq/py_mitm/main.py new file mode 100644 index 000000000..15e8e8349 --- /dev/null +++ b/tests/cpp/ymq/py_mitm/main.py @@ -0,0 +1,175 @@ +# flake8: noqa: E402 + +""" +This script provides a framework for running MITM test cases +""" + +import argparse +import os +import sys +import importlib +import signal +import subprocess +from tests.cpp.ymq.py_mitm.types import AbstractMITM, TCPConnection +from scapy.all import IP, TCP, TunTapInterface # type: ignore +from typing import List + +from tests.cpp.ymq.py_mitm import passthrough, randomly_drop_packets, send_rst_to_client + + +def echo_call(cmd: List[str]): + print(f"+ {' '.join(cmd)}") + subprocess.check_call(cmd) + + +def create_tuntap_interface(iface_name: str, mitm_ip: str, remote_ip: str) -> TunTapInterface: + """ + Creates a TUNTAP interface and sets brings it up and adds ips using the `ip` program + + Args: + iface_name: The name of the TUNTAP interface, usually like `tun0`, `tun1`, etc. + mitm_ip: The desired ip address of the mitm. This is the ip that clients can use to connect to the mitm + remote_ip: The ip that routes to/from the tuntap interface. + packets sent to `mitm_ip` will appear to come from `remote_ip`,\ + and conversely the tuntap interface can connect/send packets + to `remote_ip`, making it a suitable ip for binding a server + + Returns: + The TUNTAP interface + """ + iface = TunTapInterface(iface_name, mode="tun") + + try: + echo_call(["sudo", "ip", "link", "set", iface_name, "up"]) + echo_call(["sudo", "ip", "addr", "add", remote_ip, "peer", mitm_ip, "dev", iface_name]) + print(f"[+] Interface {iface_name} up with IP {mitm_ip}") + except subprocess.CalledProcessError: + print("[!] Could not bring up interface. Run as root or set manually.") + raise + + return iface + + +def main(pid: int, mitm_ip: str, mitm_port: int, remote_ip: str, server_port: int, mitm: MITMProtocol): + """ + This function serves as a framework for man in the middle implementations + A client connects to the MITM, then the MITM connects to a remote server + The MITM sits inbetween the client and the server, manipulating the packets sent depending on the test case + This function: + 1. creates a TUNTAP interface and prepares it for MITM + 2. handles connecting clients and handling connection closes + 3. delegates additional logic to a pluggable callable, `mitm` + 4. returns when both connections have terminated (via ) + + Args: + pid: this is the pid of the test process, used for signaling readiness \ + we send SIGUSR1 to this process when the mitm is ready + mitm_ip: The desired ip address of the mitm server + mitm_port: The desired port of the mitm server. \ + This is the port used to connect to the server, but the client is free to connect on any port + remote_ip: The desired remote ip for the TUNTAP interface. This is the only ip address \ + reachable by the interface and is thus the src ip for clients, and the ip that the remote server \ + must be bound to + server_port: The port that the remote server is bound to + mitm: The core logic for a MITM test case. This callable may maintain its own state and is responsible \ + for sending packets over the TUNTAP interface (if it doesn't, nothing will happen) + """ + + tuntap = create_tuntap_interface("tun0", mitm_ip, remote_ip) + + # signal the caller that the tuntap interface has been created + if pid > 0: + os.kill(pid, signal.SIGUSR1) + + # these track information about our connections + # we already know what to expect for the server connection, we are the connector + client_conn = None + + # the port that the mitm uses to connect to the server + # we increment the port for each new connection to avoid collisions + mitm_server_port = mitm_port + server_conn = TCPConnection(mitm_ip, mitm_server_port, remote_ip, server_port) + + # tracks the state of each connection + client_sent_fin_ack = False + client_closed = False + server_sent_fin_ack = False + server_closed = False + + while True: + pkt = tuntap.recv() + if not pkt.haslayer(IP) or not pkt.haslayer(TCP): + continue + ip = pkt[IP] + tcp = pkt[TCP] + + # for a received packet, the destination ip and port are our local ip and port + # and the source ip and port will be the remote ip and port + sender = TCPConnection(pkt.dst, pkt.dport, pkt.src, pkt.sport) + + pretty = f"[{tcp.flags}]{(': ' + str(bytes(tcp.payload))) if tcp.payload else ''}" + + if not mitm.proxy(tuntap, pkt, sender, client_conn, server_conn): + if sender == client_conn: + print(f"[DROPPED]: -> {pretty}") + elif sender == server_conn: + print(f"[DROPPED]: <- {pretty}") + else: + print(f"[DROPPED]: ?? {pretty}") + + continue # the segment was not proxied, so we can't update our internal state + + if sender == client_conn: + print(f"-> {pretty}") + elif sender == server_conn: + print(f"<- {pretty}") + + if tcp.flags == "S": # SYN from client + print("-> [S]") + if sender != client_conn or client_conn is None: + print(f"[*] New connection from {ip.src}:{tcp.sport} to {ip.dst}:{tcp.dport}") + client_conn = sender + + server_conn = TCPConnection(mitm_ip, mitm_server_port, remote_ip, server_port) + + # increment the port so that the next client connection (if there is one) uses a different port + mitm_server_port += 1 + + if tcp.flags == "SA": # SYN-ACK from server + if sender == server_conn: + print(f"[*] Connection to server established: {ip.src}:{tcp.sport} to {ip.dst}:{tcp.dport}") + + if tcp.flags.F and tcp.flags.A: # FIN-ACK + if sender == client_conn: + client_sent_fin_ack = True + if sender == server_conn: + server_sent_fin_ack = True + + if tcp.flags.A: # ACK + if sender == client_conn and server_sent_fin_ack: + server_closed = True + if sender == server_conn and client_sent_fin_ack: + client_closed = True + + if client_closed and server_closed: + print("[*] Both connections closed") + return + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Man in the middle test framework") + parser.add_argument("pid", type=int, help="The pid of the test process, used for signaling") + parser.add_argument("testcase", type=str, help="The MITM test case module name") + parser.add_argument("mitm_ip", type=str, help="The desired ip address of the mitm server") + parser.add_argument("mitm_port", type=int, help="The desired port of the mitm server") + parser.add_argument("remote_ip", type=str, help="The desired remote ip for the TUNTAP interface") + parser.add_argument("server_port", type=int, help="The port that the remote server is bound to") + + args, unknown = parser.parse_known_args() + + # add the script's directory to path + sys.path.append(os.path.dirname(os.path.realpath(__file__))) + + # load the module dynamically + module = importlib.import_module(args.testcase) + main(args.pid, args.mitm_ip, args.mitm_port, args.remote_ip, args.server_port, module.MITM(*unknown)) diff --git a/tests/cpp/ymq/py_mitm/passthrough.py b/tests/cpp/ymq/py_mitm/passthrough.py new file mode 100644 index 000000000..04a17e6ee --- /dev/null +++ b/tests/cpp/ymq/py_mitm/passthrough.py @@ -0,0 +1,25 @@ +""" +This MITM acts as a transparent passthrough, it simply forwards packets as they are, +minus necessary header changes to retransmit +This MITM should have no effect on the client and server, +and they should behave as if the MITM is not present +""" + +from tests.cpp.ymq.py_mitm.types import AbstractMITM, TunTapInterface, IP, TCPConnection +from typing import Optional + + +class MITM(MITMProtocol): + def proxy( + self, + tuntap: TunTapInterface, + pkt: IP, + sender: TCPConnection, + client_conn: Optional[TCPConnection], + server_conn: TCPConnection, + ) -> bool: + if sender == client_conn or client_conn is None: + tuntap.send(server_conn.rewrite(pkt)) + elif sender == server_conn: + tuntap.send(client_conn.rewrite(pkt)) + return True diff --git a/tests/cpp/ymq/py_mitm/randomly_drop_packets.py b/tests/cpp/ymq/py_mitm/randomly_drop_packets.py new file mode 100644 index 000000000..6278c93c2 --- /dev/null +++ b/tests/cpp/ymq/py_mitm/randomly_drop_packets.py @@ -0,0 +1,29 @@ +""" +This MITM drops a % of packets +""" + +import random +from tests.cpp.ymq.py_mitm.types import AbstractMITM, TunTapInterface, IP, TCPConnection +from typing import Optional + + +class MITM(MITMProtocol): + def __init__(self, drop_pcent: str): + self.drop_pcent = float(drop_pcent) + + def proxy( + self, + tuntap: TunTapInterface, + pkt: IP, + sender: TCPConnection, + client_conn: Optional[TCPConnection], + server_conn: TCPConnection, + ) -> bool: + if random.random() < self.drop_pcent: + return False + + if sender == client_conn or client_conn is None: + tuntap.send(server_conn.rewrite(pkt)) + elif sender == server_conn: + tuntap.send(client_conn.rewrite(pkt)) + return True diff --git a/tests/cpp/ymq/py_mitm/send_rst_to_client.py b/tests/cpp/ymq/py_mitm/send_rst_to_client.py new file mode 100644 index 000000000..9864b6986 --- /dev/null +++ b/tests/cpp/ymq/py_mitm/send_rst_to_client.py @@ -0,0 +1,50 @@ +""" +This MITM inserts an unexpected TCP RST +""" + +from tests.cpp.ymq.py_mitm.types import IP, TCP, AbstractMITM, TCPConnection, TunTapInterface +from typing import Optional + + +class MITM(MITMProtocol): + def __init__(self): + # count the number of psh-acks sent by the client + self.client_pshack_counter = 0 + + def proxy( + self, + tuntap: TunTapInterface, + pkt: IP, + sender: TCPConnection, + client_conn: Optional[TCPConnection], + server_conn: TCPConnection, + ) -> bool: + if sender == client_conn or client_conn is None: + if pkt[TCP].flags == "PA": + self.client_pshack_counter += 1 + + # on the second psh-ack, send a rst instead + if self.client_pshack_counter == 2: + rst_pkt = IP(src=client_conn.local_ip, dst=client_conn.remote_ip) / TCP( + sport=client_conn.local_port, dport=client_conn.remote_port, flags="R", seq=pkt[TCP].ack + ) + print(f"<- [{rst_pkt[TCP].flags}] (simulated)") + tuntap.send(rst_pkt) + return False + + tuntap.send(server_conn.rewrite(pkt)) + elif sender == server_conn: + tuntap.send(client_conn.rewrite(pkt)) + return True + + +# client -> mitm -> server +# server -> mitm -> client + +# client: 127.0.0.1:8080 +# mitm: 127.0.0.1:8081 +# server: 127.0.0.1:8081 + + +# client -> mitm == src = client.ip, sport = client.port ;; dst = mitm.ip, dport = mitm.port +# mitm -> server == src = mitm.ip, sport = mitm.port ;; dst = server.ip, dport = server.port diff --git a/tests/cpp/ymq/py_mitm/types.py b/tests/cpp/ymq/py_mitm/types.py new file mode 100644 index 000000000..fd0ce3ddf --- /dev/null +++ b/tests/cpp/ymq/py_mitm/types.py @@ -0,0 +1,54 @@ +""" +This is the common code for implementing man in the middle in Python +""" + +import dataclasses +from typing import Protocol, Optional +from scapy.all import TunTapInterface, IP, TCP # type: ignore + + +@dataclasses.dataclass +class TCPConnection: + """ + Represents a TCP connection over the TUNTAP interface + local_ip and local_port are the mitm's ip and port, and + remote_ip and remote_port are the port for the remote peer + """ + + local_ip: str + local_port: int + remote_ip: str + remote_port: int + + def rewrite(self, pkt: IP, ack: Optional[int] = None, data=None): + """ + Rewrite a TCP/IP packet as a packet originating + from (local_ip, local_port) and going to (remote_ip, remote_port) + This function is useful for taking a packet received from one connection, and redirecting it to another + + Args: + pkt: A scapy TCP/IP packet to rewrite + ack: An optional ack number to use instead of the one found in `pkt` + data: An optional payload to use instead of the one found int `pkt` + + Returns: + The rewritten packet, suitable for sending over TUNTAP + """ + tcp = pkt[TCP] + + return ( + IP(src=self.local_ip, dst=self.remote_ip) + / TCP(sport=self.local_port, dport=self.remote_port, flags=tcp.flags, seq=tcp.seq, ack=ack or tcp.ack) + / bytes(data or tcp.payload) + ) + + +class MITMProtocol(Protocol): + def proxy( + self, + tuntap: TunTapInterface, + pkt: IP, + sender: TCPConnection, + client_conn: Optional[TCPConnection], + server_conn: TCPConnection, + ) -> bool: ... diff --git a/tests/cpp/ymq/test_cc_ymq.cpp b/tests/cpp/ymq/test_cc_ymq.cpp index f20321908..5432937a4 100644 --- a/tests/cpp/ymq/test_cc_ymq.cpp +++ b/tests/cpp/ymq/test_cc_ymq.cpp @@ -2,6 +2,10 @@ // each test case is comprised of at least one client and one server, and possibly a middleman // the clients and servers used in these tests are defined in the first part of this file // +// the men in the middle (mitm) are implemented using Python and are found in py_mitm/ +// in that directory, `main.py` is the entrypoint and framework for all the mitm, +// and the individual mitm implementations are found in their respective files +// // the test cases are at the bottom of this file, after the clients and servers // the documentation for each case is found on the TEST() definition @@ -10,6 +14,7 @@ #include #include +#include #include #include #include @@ -125,9 +130,9 @@ TestResult reconnect_server_main(std::string host, uint16_t port) auto result = syncRecvMessage(socket); RETURN_FAILURE_IF_FALSE(result.has_value()); - RETURN_FAILURE_IF_FALSE(result->payload.as_string() == "hello!!"); + RETURN_FAILURE_IF_FALSE(result->payload.as_string() == "sync"); - auto error = syncSendMessage(socket, {.address = Bytes("client"), .payload = Bytes("world!!")}); + auto error = syncSendMessage(socket, {.address = Bytes("client"), .payload = Bytes("acknowledge")}); RETURN_FAILURE_IF_FALSE(!error); context.removeIOSocket(socket); @@ -141,14 +146,39 @@ TestResult reconnect_client_main(std::string host, uint16_t port) auto socket = syncCreateSocket(context, IOSocketType::Connector, "client"); syncConnectSocket(socket, format_address(host, port)); - auto result = syncSendMessage(socket, {.address = Bytes("server"), .payload = Bytes("hello!!")}); - auto msg = syncRecvMessage(socket); - RETURN_FAILURE_IF_FALSE(msg.has_value()); - RETURN_FAILURE_IF_FALSE(msg->payload.as_string() == "world!!"); - context.removeIOSocket(socket); + // send "sync" and wait for "acknowledge" in a loop + // the mitm will send a RST after the first "sync" + // the "sync" message will be lost, but YMQ should automatically reconnect + // therefore the next "sync" message should succeed + for (size_t i = 0; i < 10; i++) { + auto error = syncSendMessage(socket, {.address = Bytes("server"), .payload = Bytes("sync")}); + RETURN_FAILURE_IF_FALSE(!error); + + auto future = futureRecvMessage(socket); + auto result = future.wait_for(1s); + if (result == std::future_status::ready) { + auto msg = future.get(); + if (!msg.has_value()) { + std::println("message error: {}", msg.error().what()); + } + RETURN_FAILURE_IF_FALSE(msg.has_value()); + std::println("received message: {}", *msg->payload.as_string()); + RETURN_FAILURE_IF_FALSE(msg->payload.as_string() == "acknowledge"); + context.removeIOSocket(socket); + + return TestResult::Success; + } else if (result == std::future_status::timeout) { + // timeout, try again + continue; + } else { + std::println("future status error"); + return TestResult::Failure; + } + } - return TestResult::Success; + std::println("failed to reconnect after 10 attempts"); + return TestResult::Failure; } TestResult client_simulated_slow_network(const char* host, uint16_t port) @@ -368,7 +398,8 @@ TEST(CcYmqTestSuite, TestBasicYMQClientYMQServer) auto host = "localhost"; auto port = 2889; - // this is the test harness, it accepts a timeout, and a list of functions to run + // this is the test harness, it accepts a timeout, a list of functions to run, + // and an optional third argument used to coordinate the execution of python (for mitm) auto result = test(10, {[=] { return basic_client_ymq(host, port); }, [=] { return basic_server_ymq(host, port); }}); @@ -382,6 +413,8 @@ TEST(CcYmqTestSuite, TestBasicRawClientYMQServer) auto host = "localhost"; auto port = 2890; + // this is the test harness, it accepts a timeout, a list of functions to run, + // and an optional third argument used to coordinate the execution of python (for mitm) auto result = test(10, {[=] { return basic_client_raw(host, port); }, [=] { return basic_server_ymq(host, port); }}); @@ -394,6 +427,8 @@ TEST(CcYmqTestSuite, TestBasicRawClientRawServer) auto host = "localhost"; auto port = 2891; + // this is the test harness, it accepts a timeout, a list of functions to run, + // and an optional third argument used to coordinate the execution of python (for mitm) auto result = test(10, {[=] { return basic_client_raw(host, port); }, [=] { return basic_server_raw(host, port); }}); @@ -417,6 +452,8 @@ TEST(CcYmqTestSuite, TestBasicDelayYMQClientRawServer) auto host = "localhost"; auto port = 2893; + // this is the test harness, it accepts a timeout, a list of functions to run, + // and an optional third argument used to coordinate the execution of python (for mitm) auto result = test(10, {[=] { return basic_client_ymq(host, port); }, [=] { return basic_server_raw(host, port); }}); @@ -438,6 +475,70 @@ TEST(CcYmqTestSuite, TestClientSendBigMessageToServer) EXPECT_EQ(result, TestResult::Success); } +// this is the no-op/passthrough man in the middle test +// for this test case we use YMQ on both the client side and the server side +// the client connects to the mitm, and the mitm connects to the server +// when the mitm receives packets from the client, it forwards it to the server without changing it +// and similarly when it receives packets from the server, it forwards them to the client +// +// the mitm is implemented in Python. we pass the name of the test case, which corresponds to the Python filename, +// and a list of arguments, which are: mitm ip, mitm port, remote ip, remote port +// this defines the address of the mitm, and the addresses that can connect to it +// for more, see the python mitm files +TEST(CcYmqTestSuite, TestMitmPassthrough) +{ + auto mitm_ip = "192.0.2.4"; + auto mitm_port = 2323; + auto remote_ip = "192.0.2.3"; + auto remote_port = 23571; + + // the Python program must be the first and only the first function passed to test() + // we must also pass `true` as the third argument to ensure that Python is fully started + // before beginning the test + auto result = test( + 20, + {[=] { return run_mitm("passthrough", mitm_ip, mitm_port, remote_ip, remote_port); }, + [=] { return basic_client_ymq(mitm_ip, mitm_port); }, + [=] { return basic_server_ymq(remote_ip, remote_port); }}, + true); + EXPECT_EQ(result, TestResult::Success); +} + +// this test uses the mitm to test the reconnect logic of YMQ by sending RST packets +TEST(CcYmqTestSuite, DISABLED_TestMitmReconnect) +{ + auto mitm_ip = "192.0.2.4"; + auto mitm_port = 2525; + auto remote_ip = "192.0.2.3"; + auto remote_port = 23575; + + auto result = test( + 10, + {[=] { return run_mitm("send_rst_to_client", mitm_ip, mitm_port, remote_ip, remote_port); }, + [=] { return reconnect_client_main(mitm_ip, mitm_port); }, + [=] { return reconnect_server_main(remote_ip, remote_port); }}, + true); + EXPECT_EQ(result, TestResult::Success); +} + +// TODO: Make this more reliable, and re-enable it +// in this test, the mitm drops a random % of packets arriving from the client and server +TEST(CcYmqTestSuite, TestMitmRandomlyDropPackets) +{ + auto mitm_ip = "192.0.2.4"; + auto mitm_port = 2828; + auto remote_ip = "192.0.2.3"; + auto remote_port = 23591; + + auto result = test( + 60, + {[=] { return run_mitm("randomly_drop_packets", mitm_ip, mitm_port, remote_ip, remote_port, {"0.3"}); }, + [=] { return basic_client_ymq(mitm_ip, mitm_port); }, + [=] { return basic_server_ymq(remote_ip, remote_port); }}, + true); + EXPECT_EQ(result, TestResult::Success); +} + // in this test the client is sending a message to the server // but we simulate a slow network connection by sending the message in segmented chunks TEST(CcYmqTestSuite, TestSlowNetwork) From eaf9201112e460c6dc77206e0ad467681eaca66f Mon Sep 17 00:00:00 2001 From: magniloquency <197707854+magniloquency@users.noreply.github.com> Date: Wed, 1 Oct 2025 20:46:47 -0400 Subject: [PATCH 5/9] Change protocol to abstract class --- tests/cpp/ymq/py_mitm/main.py | 2 +- tests/cpp/ymq/py_mitm/passthrough.py | 2 +- tests/cpp/ymq/py_mitm/randomly_drop_packets.py | 2 +- tests/cpp/ymq/py_mitm/send_rst_to_client.py | 2 +- tests/cpp/ymq/py_mitm/types.py | 6 ++++-- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/tests/cpp/ymq/py_mitm/main.py b/tests/cpp/ymq/py_mitm/main.py index 15e8e8349..546e90d67 100644 --- a/tests/cpp/ymq/py_mitm/main.py +++ b/tests/cpp/ymq/py_mitm/main.py @@ -50,7 +50,7 @@ def create_tuntap_interface(iface_name: str, mitm_ip: str, remote_ip: str) -> Tu return iface -def main(pid: int, mitm_ip: str, mitm_port: int, remote_ip: str, server_port: int, mitm: MITMProtocol): +def main(pid: int, mitm_ip: str, mitm_port: int, remote_ip: str, server_port: int, mitm: AbstractMITM): """ This function serves as a framework for man in the middle implementations A client connects to the MITM, then the MITM connects to a remote server diff --git a/tests/cpp/ymq/py_mitm/passthrough.py b/tests/cpp/ymq/py_mitm/passthrough.py index 04a17e6ee..17e099db1 100644 --- a/tests/cpp/ymq/py_mitm/passthrough.py +++ b/tests/cpp/ymq/py_mitm/passthrough.py @@ -9,7 +9,7 @@ from typing import Optional -class MITM(MITMProtocol): +class MITM(AbstractMITM): def proxy( self, tuntap: TunTapInterface, diff --git a/tests/cpp/ymq/py_mitm/randomly_drop_packets.py b/tests/cpp/ymq/py_mitm/randomly_drop_packets.py index 6278c93c2..6aa9cc7f5 100644 --- a/tests/cpp/ymq/py_mitm/randomly_drop_packets.py +++ b/tests/cpp/ymq/py_mitm/randomly_drop_packets.py @@ -7,7 +7,7 @@ from typing import Optional -class MITM(MITMProtocol): +class MITM(AbstractMITM): def __init__(self, drop_pcent: str): self.drop_pcent = float(drop_pcent) diff --git a/tests/cpp/ymq/py_mitm/send_rst_to_client.py b/tests/cpp/ymq/py_mitm/send_rst_to_client.py index 9864b6986..11608f117 100644 --- a/tests/cpp/ymq/py_mitm/send_rst_to_client.py +++ b/tests/cpp/ymq/py_mitm/send_rst_to_client.py @@ -6,7 +6,7 @@ from typing import Optional -class MITM(MITMProtocol): +class MITM(AbstractMITM): def __init__(self): # count the number of psh-acks sent by the client self.client_pshack_counter = 0 diff --git a/tests/cpp/ymq/py_mitm/types.py b/tests/cpp/ymq/py_mitm/types.py index fd0ce3ddf..03d94fbd8 100644 --- a/tests/cpp/ymq/py_mitm/types.py +++ b/tests/cpp/ymq/py_mitm/types.py @@ -2,8 +2,9 @@ This is the common code for implementing man in the middle in Python """ +from abc import ABC, abstractmethod import dataclasses -from typing import Protocol, Optional +from typing import Optional from scapy.all import TunTapInterface, IP, TCP # type: ignore @@ -43,7 +44,8 @@ def rewrite(self, pkt: IP, ack: Optional[int] = None, data=None): ) -class MITMProtocol(Protocol): +class AbstractMITM(ABC): + @abstractmethod def proxy( self, tuntap: TunTapInterface, From b7b5df120e14a293174514be29ff82628e066506 Mon Sep 17 00:00:00 2001 From: magniloquency <197707854+magniloquency@users.noreply.github.com> Date: Wed, 1 Oct 2025 20:48:35 -0400 Subject: [PATCH 6/9] Remove usage of importlib --- tests/cpp/ymq/py_mitm/main.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/tests/cpp/ymq/py_mitm/main.py b/tests/cpp/ymq/py_mitm/main.py index 546e90d67..7cec8ed5e 100644 --- a/tests/cpp/ymq/py_mitm/main.py +++ b/tests/cpp/ymq/py_mitm/main.py @@ -7,7 +7,6 @@ import argparse import os import sys -import importlib import signal import subprocess from tests.cpp.ymq.py_mitm.types import AbstractMITM, TCPConnection @@ -167,9 +166,14 @@ def main(pid: int, mitm_ip: str, mitm_port: int, remote_ip: str, server_port: in args, unknown = parser.parse_known_args() - # add the script's directory to path - sys.path.append(os.path.dirname(os.path.realpath(__file__))) + # TODO: use `match` in Python 3.10+ + if args.testcase == "passthrough": + module = passthrough + elif args.testcase == "randomly_drop_packets": + module = randomly_drop_packets + elif args.testcase == "send_rst_to_client": + module = send_rst_to_client + else: + raise ValueError(f"Unknown testcase: {args.testcase}") - # load the module dynamically - module = importlib.import_module(args.testcase) main(args.pid, args.mitm_ip, args.mitm_port, args.remote_ip, args.server_port, module.MITM(*unknown)) From c9b650aaee8c5fc4d2ce69dddc2897490955109c Mon Sep 17 00:00:00 2001 From: magniloquency <197707854+magniloquency@users.noreply.github.com> Date: Wed, 1 Oct 2025 22:03:37 -0400 Subject: [PATCH 7/9] Update action Run tests with sudo --- .github/actions/compile-library/action.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.github/actions/compile-library/action.yml b/.github/actions/compile-library/action.yml index 1036b1524..d6e055a12 100644 --- a/.github/actions/compile-library/action.yml +++ b/.github/actions/compile-library/action.yml @@ -9,11 +9,16 @@ inputs: runs: using: "composite" steps: + - name: Install dependencies for MITM tests + shell: bash + run: uv pip install --system scapy==2.* + - name: Build and test C++ Components (Linux) if: inputs.os == 'Linux' shell: bash run: | CXX=$(which g++-14) ./scripts/build.sh + sudo ./scripts/test.sh - name: Build and test C++ Components (Windows) if: inputs.os == 'Windows' From 18c2dc809e85f785aae500b390eb527ed734fb52 Mon Sep 17 00:00:00 2001 From: magniloquency <197707854+magniloquency@users.noreply.github.com> Date: Wed, 1 Oct 2025 22:24:56 -0400 Subject: [PATCH 8/9] Lint --- tests/cpp/ymq/py_mitm/main.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/cpp/ymq/py_mitm/main.py b/tests/cpp/ymq/py_mitm/main.py index 7cec8ed5e..ab20f933e 100644 --- a/tests/cpp/ymq/py_mitm/main.py +++ b/tests/cpp/ymq/py_mitm/main.py @@ -6,13 +6,13 @@ import argparse import os -import sys import signal import subprocess -from tests.cpp.ymq.py_mitm.types import AbstractMITM, TCPConnection -from scapy.all import IP, TCP, TunTapInterface # type: ignore +import types from typing import List +from scapy.all import IP, TCP, TunTapInterface # type: ignore +from tests.cpp.ymq.py_mitm.types import AbstractMITM, TCPConnection from tests.cpp.ymq.py_mitm import passthrough, randomly_drop_packets, send_rst_to_client @@ -167,6 +167,7 @@ def main(pid: int, mitm_ip: str, mitm_port: int, remote_ip: str, server_port: in args, unknown = parser.parse_known_args() # TODO: use `match` in Python 3.10+ + module: types.ModuleType if args.testcase == "passthrough": module = passthrough elif args.testcase == "randomly_drop_packets": From 7b477276e120cd550ef2dd1897bb5ab213d5d884 Mon Sep 17 00:00:00 2001 From: gxu Date: Fri, 3 Oct 2025 00:04:10 +0800 Subject: [PATCH 9/9] Fix test Signed-off-by: gxu --- tests/cpp/ymq/test_cc_ymq.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/cpp/ymq/test_cc_ymq.cpp b/tests/cpp/ymq/test_cc_ymq.cpp index 5432937a4..5c8e46ea1 100644 --- a/tests/cpp/ymq/test_cc_ymq.cpp +++ b/tests/cpp/ymq/test_cc_ymq.cpp @@ -151,11 +151,11 @@ TestResult reconnect_client_main(std::string host, uint16_t port) // the mitm will send a RST after the first "sync" // the "sync" message will be lost, but YMQ should automatically reconnect // therefore the next "sync" message should succeed + auto future = futureRecvMessage(socket); for (size_t i = 0; i < 10; i++) { auto error = syncSendMessage(socket, {.address = Bytes("server"), .payload = Bytes("sync")}); RETURN_FAILURE_IF_FALSE(!error); - auto future = futureRecvMessage(socket); auto result = future.wait_for(1s); if (result == std::future_status::ready) { auto msg = future.get(); @@ -505,7 +505,7 @@ TEST(CcYmqTestSuite, TestMitmPassthrough) } // this test uses the mitm to test the reconnect logic of YMQ by sending RST packets -TEST(CcYmqTestSuite, DISABLED_TestMitmReconnect) +TEST(CcYmqTestSuite, TestMitmReconnect) { auto mitm_ip = "192.0.2.4"; auto mitm_port = 2525;