diff --git a/.gitmodules b/.gitmodules index 288c73af..5bee6416 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,6 @@ [submodule "subprojects/json"] path = subprojects/json url = https://github.com/nlohmann/json.git +[submodule "subprojects/pcap"] + path = subprojects/pcap + url = https://github.com/seladb/PcapPlusPlus.git diff --git a/autotest/autotest.cpp b/autotest/autotest.cpp index 5a3cef86..b5acddbb 100644 --- a/autotest/autotest.cpp +++ b/autotest/autotest.cpp @@ -16,10 +16,13 @@ #include +#include "PcapFileDevice.h" #include "autotest.h" #include "common.h" +#include "common/define.h" #include "common/sdpclient.h" +#include "common/sdpcommon.h" #include "common/utils.h" #define MAX_PACK_LEN 16384 @@ -175,7 +178,7 @@ eResult tAutotest::initSharedMemory() void* shm = shm_by_key[ipcKey]; auto memaddr = (void*)((intptr_t)shm + offset); - dumpRings[name] = common::bufferring(memaddr, unitSize, unitsNumber); + dumpRings[name] = common::PacketBufferRing(memaddr, unitSize, unitsNumber); } return eResult::success; @@ -235,8 +238,7 @@ void tAutotest::sendThread(std::string interfaceName, pcap_errbuf); if (!pcap) { - YANET_LOG_ERROR("error: pcap_open_offline(): %s\n", pcap_errbuf); - throw ""; + YANET_THROW("error: pcap_open_offline(): ", pcap_errbuf); } pcap_pkthdr* header = nullptr; @@ -270,8 +272,7 @@ void tAutotest::sendThread(std::string interfaceName, if (writeIovCount(iface, iov, iov_count) < 0) { - YANET_LOG_ERROR("error: write packet(): %s\n", strerror(errno)); - throw ""; + YANET_THROW("error: write packet(): ", strerror(errno)); } packetsCount++; @@ -326,8 +327,7 @@ static bool readPacket(int fd, pcap_pkthdr* header, u_char* data, Duration timel if (hdr.data_length == 0) { - YANET_LOG_ERROR("error: read size is 0\n"); - throw ""; + YANET_THROW("error: read size is 0"); } if (!readTimeLimited(fd, data, hdr.data_length, time_to_give_up)) @@ -416,16 +416,14 @@ class PcapDumper if (!pcap) { - YANET_LOG_ERROR("error: pcap_open_dead()\n"); - throw ""; + YANET_THROW("error: pcap_open_dead()"); } dumper = pcap_dump_open(pcap, tmpFilePath.data()); if (!dumper) { pcap_close(pcap); - YANET_LOG_ERROR("error: pcap_dump_open()\n"); - throw ""; + YANET_THROW("error: pcap_dump_open()"); } } @@ -484,8 +482,7 @@ class pcap_expectation pcap = pcap_open_offline(filename.c_str(), pcap_errbuf); if (!pcap) { - YANET_LOG_ERROR("error: pcap_open_offline(): %s\n", pcap_errbuf); - throw ""; + YANET_THROW("error: pcap_open_offline(): ", pcap_errbuf); } memset(&header, 0, sizeof(struct pcap_pkthdr)); advance(); @@ -609,8 +606,7 @@ void tAutotest::recvThread(std::string interfaceName, auto now = std::chrono::system_clock::now(); if (now > time_to_give_up) { - YANET_LOG_ERROR("error[%s]: step time limit exceeded\n", interfaceName.data()); - throw ""; + YANET_THROW("error[", interfaceName, "]: step time limit exceeded"); } if (!readPacket(iface, &tmp_pcap_packetHeader, buffer, time_to_give_up - now)) { @@ -633,11 +629,7 @@ void tAutotest::recvThread(std::string interfaceName, packetsCount + 1, buf.str().data()); - YANET_LOG_ERROR("pcap[%s]: %s\n", - interfaceName.data(), - pcapDumper.path().data()); - - throw ""; + YANET_LOG_ERROR("pcap[%s]: %s\n", interfaceName.data(), pcapDumper.path().data()); } if (dumpPackets) @@ -735,8 +727,6 @@ void tAutotest::recvThread(std::string interfaceName, YANET_LOG_ERROR("pcap[%s]: %s\n", interfaceName.data(), pcapDumper.path().data()); - - throw ""; } unlink(pcapDumper.path().data()); @@ -960,7 +950,7 @@ bool tAutotest::step_sendPackets(const YAML::Node& yamlStep, if (!success) { - throw ""; + YANET_THROW(""); } return true; @@ -1327,8 +1317,7 @@ void tAutotest::mainThread() const auto result = controlPlane.loadConfig(request); if (result != eResult::success) { - YANET_LOG_ERROR("invalid config: eResult %d\n", static_cast(result)); - throw ""; + YANET_THROW("invalid config: eResult ", common::result_to_c_str(result)); } controlPlane.rib_flush(); @@ -1483,13 +1472,12 @@ void tAutotest::mainThread() } else { - YANET_LOG_ERROR("unknown step\n"); - throw ""; + YANET_THROW("unknown step"); } if (!result) { - throw ""; + YANET_THROW(""); } } } @@ -1894,14 +1882,16 @@ bool tAutotest::step_cli_check(const YAML::Node& yamlStep) return true; } -common::bufferring::item_t* read_shm_packet(common::bufferring* buffer, uint64_t position) +common::PacketBufferRing::item_t* read_shm_packet(common::PacketBufferRing* buffer, uint64_t position) { - if (position >= buffer->ring->header.after) + common::PacketBufferRing::ring_t* ring = buffer->ring; + + if (position >= ring->header.after) { return nullptr; } - auto* item = (common::bufferring::item_t*)((uintptr_t)buffer->ring->memory + (position * buffer->unit_size)); - return item; + + return utils::ShiftBuffer(ring->memory, position * buffer->unit_size); } bool tAutotest::step_dumpPackets(const YAML::Node& yamlStep, @@ -1914,86 +1904,99 @@ bool tAutotest::step_dumpPackets(const YAML::Node& yamlStep, std::string expectFilePath = path + "/" + yamlDump["expect"].as(); bool success = true; - common::bufferring* ring = nullptr; + common::PacketBufferRing* ring = nullptr; { /// searching memory ring by tag auto it = dumpRings.find(tag); if (it == dumpRings.end()) { - YANET_LOG_ERROR("dump [%s]: error: dump ring not found\n", tag.data()); - throw ""; + YANET_THROW("dump [", tag, "]: error: dump ring not found"); } ring = &it->second; } - pcap_t* pcap = nullptr; - { /// open pcap file with expected data - char pcap_errbuf[PCAP_ERRBUF_SIZE]; - pcap = pcap_open_offline(expectFilePath.data(), pcap_errbuf); - if (!pcap) - { - YANET_LOG_ERROR("dump [%s]: error: pcap_open_offline(): %s\n", tag.data(), pcap_errbuf); - throw ""; - } + // Open pcap file using PcapPlusPlus + pcpp::IFileReaderDevice* reader = pcpp::IFileReaderDevice::getReader(expectFilePath); + if (reader == nullptr) + { + YANET_THROW("dump [", tag, "]: error: cannot determine reader for file ", expectFilePath); } - struct pcap_pkthdr header; - const u_char* pcap_packet = nullptr; - common::bufferring::item_t* shm_packet = nullptr; + if (!reader->open()) + { + YANET_THROW("dump [", tag, "]: error: cannot open pcap file", expectFilePath); + } + + pcpp::RawPacket rawPacket; + common::PacketBufferRing::item_t* shm_packet; uint64_t position = 0; /// read packets from pcap and compare them with packets from memory ring - while ((pcap_packet = pcap_next(pcap, &header))) + while (reader->getNextPacket(rawPacket)) { shm_packet = read_shm_packet(ring, position); position++; - if (shm_packet && header.len == shm_packet->header.size && - memcmp(shm_packet->memory, pcap_packet, header.len) == 0) - { /// packets are the same - continue; + if (!shm_packet) + { + success = false; + YANET_LOG_ERROR("dump [%s]: error: missing packet #%lu in shared memory\n", tag.data(), position); + break; } - /// packets are different, so... - success = false; - YANET_LOG_ERROR("dump [%s]: error: wrong packet #%lu (%s)\n", - tag.data(), - position, - expectFilePath.data()); - - if (dumpPackets && shm_packet) + // Compare the packet data + if (static_cast(rawPacket.getRawDataLen()) == shm_packet->header.size && + memcmp(rawPacket.getRawData(), shm_packet->memory, rawPacket.getRawDataLen()) == 0) { - YANET_LOG_DEBUG("dump [%s]: expected %u, got %u\n", tag.data(), header.len, shm_packet->header.size); - dumper.dump(pcap_packet, pcap_packet + shm_packet->header.size, shm_packet->memory, shm_packet->memory + header.len); + /// packets are the same + continue; } - } - - /// read the remaining packets from memory ring - for (;;) - { - shm_packet = read_shm_packet(ring, position); - if (!shm_packet) + else { + /// packets are different + success = false; + YANET_LOG_ERROR("dump [%s]: error: packet #%lu does not match (%s)\n", + tag.data(), + position, + expectFilePath.data()); + + if (dumpPackets) + { + YANET_LOG_DEBUG("dump [%s]: expected %u bytes, got %u bytes\n", + tag.data(), + rawPacket.getRawDataLen(), + shm_packet->header.size); + dumper.dump(rawPacket.getRawData(), + rawPacket.getRawData() + rawPacket.getRawDataLen(), + shm_packet->memory, + shm_packet->memory + shm_packet->header.size); + } break; } - position++; + } + /// Check for extra packets in shared memory + shm_packet = read_shm_packet(ring, position); + if (shm_packet) + { success = false; + YANET_LOG_ERROR("dump [%s]: error: extra packet #%lu in shared memory\n", tag.data(), position + 1); if (dumpPackets) { - YANET_LOG_DEBUG("dump [%s]: unexpected %u\n", tag.data(), shm_packet->header.size); - dumper.dump(nullptr, nullptr, shm_packet->memory, shm_packet->memory + header.len); + YANET_LOG_DEBUG("dump [%s]: unexpected packet size %u bytes\n", + tag.data(), + shm_packet->header.size); + dumper.dump(nullptr, nullptr, shm_packet->memory, shm_packet->memory + shm_packet->header.size); } } - YANET_LOG_DEBUG("dump [%s]: recv %lu packets\n", tag.data(), position); + YANET_LOG_DEBUG("dump [%s]: compared %lu packets\n", tag.data(), position); - pcap_close(pcap); + reader->close(); if (!success) { - YANET_LOG_ERROR("dump [%s]: error: unknown packet (%s)\n", tag.data(), expectFilePath.data()); - throw ""; + YANET_THROW("dump [", tag, "]: error: packet comparison failed", expectFilePath); } } diff --git a/autotest/autotest.h b/autotest/autotest.h index 1725a16c..18f977e8 100644 --- a/autotest/autotest.h +++ b/autotest/autotest.h @@ -107,7 +107,8 @@ class tAutotest pcaps; std::tuple rawShmInfo; - std::map dumpRings; + // TODO: this should be DumpRingBase instead of PacketBufferRing. + std::map dumpRings; std::vector threads; volatile bool flagStop; diff --git a/autotest/meson.build b/autotest/meson.build index ae20e0ec..552cf894 100644 --- a/autotest/meson.build +++ b/autotest/meson.build @@ -2,17 +2,20 @@ sources = files('autotest.cpp', 'main.cpp') dependencies = [] +dependencies += libdpdk.get_variable('dpdk_dep') dependencies += libjson.get_variable('nlohmann_json_dep') dependencies += dependency('libsystemd') dependencies += dependency('yaml-cpp', static: true) dependencies += dependency('libpcap', static: true) dependencies += dependency('gmock') +dependencies += pcapplusplus_deps executable('yanet-autotest', sources, include_directories: yanet_rootdir, dependencies: dependencies, link_args : ['-lstdc++fs'], + # override_options: 'b_lto=false', install: true) install_data('yanet-autotest-run.py', install_dir: get_option('bindir')) diff --git a/cli/bus.h b/cli/bus.h index 807c54ac..b5beea8c 100644 --- a/cli/bus.h +++ b/cli/bus.h @@ -47,6 +47,7 @@ inline std::vector get_bus_requests(common::sdp::DataPlaneInSh {common::idp::requestType::update_vip_vport_proto, "update_vip_vport_proto"}, {common::idp::requestType::version, "version"}, {common::idp::requestType::get_shm_info, "get_shm_info"}, + {common::idp::requestType::hexdump_ring, "hexdump_ring"}, {common::idp::requestType::get_shm_tsc_info, "get_shm_tsc_info"}, {common::idp::requestType::set_shm_tsc_state, "set_shm_tsc_state"}, {common::idp::requestType::dump_physical_port, "dump_physical_port"}, diff --git a/cli/main.cpp b/cli/main.cpp index 4d20e965..5d9832ce 100644 --- a/cli/main.cpp +++ b/cli/main.cpp @@ -110,6 +110,8 @@ std::vector", [](const auto& args) { Call(show::hexdump_ring, args); }}, + {}, {"samples show", "", [](const auto& args) { Call(show::samples, args); }}, {"samples dump", "", [](const auto& args) { Call(show::samples_dump, args); }}, {}, diff --git a/cli/show.h b/cli/show.h index 2a4325ef..53e80b41 100644 --- a/cli/show.h +++ b/cli/show.h @@ -854,6 +854,14 @@ inline void shm_info() response); } +inline void hexdump_ring(const std::string& ring) +{ + interface::dataPlane dataplane; + const auto& response = dataplane.hexdump_ring(ring); + std::cout << "Hexdump for the dump ring " << ring << ":\n" + << response.hexdumped_ring << std::endl; +} + void shm_tsc_info() { interface::dataPlane dataplane; diff --git a/cli/telegraf.h b/cli/telegraf.h index 577d5f0a..6169fe2a 100644 --- a/cli/telegraf.h +++ b/cli/telegraf.h @@ -710,8 +710,8 @@ void main_counters() for (const auto& [coreId, worker_info] : sdp_data.workers) { std::vector values; - auto* buffer = common::sdp::ShiftBuffer(worker_info.buffer, - sdp_data.metadata_worker.start_counters); + auto* buffer = utils::ShiftBuffer(worker_info.buffer, + sdp_data.metadata_worker.start_counters); for (const auto& [name, index] : sdp_data.metadata_worker.counter_positions) { values.emplace_back(name.data(), buffer[index]); @@ -722,8 +722,8 @@ void main_counters() for (const auto& [coreId, worker_info] : sdp_data.workers_gc) { std::vector values; - auto* buffer = common::sdp::ShiftBuffer(worker_info.buffer, - sdp_data.metadata_worker.start_counters); + auto* buffer = utils::ShiftBuffer(worker_info.buffer, + sdp_data.metadata_worker.start_counters); for (const auto& [name, index] : sdp_data.metadata_worker_gc.counter_positions) { values.emplace_back(name.data(), buffer[index]); diff --git a/common/bufferring.h b/common/bufferring.h index a27d2b98..1db01144 100644 --- a/common/bufferring.h +++ b/common/bufferring.h @@ -1,4 +1,8 @@ +#pragma once + +#include #include +#include namespace common { @@ -18,14 +22,44 @@ namespace common // ring_header_t: "b" -- before ... -- padding // "a" -- after // ... -- padding -class bufferring +struct PacketBufferRing { -public: - bufferring() = default; - bufferring(void* memory, int unit_size, int units_number) : - unit_size(unit_size), - units_number(units_number), ring((ring_t*)memory) + PacketBufferRing() = default; + + // static function, helps to get capacity in DumpRingRaw + static size_t GetCapacity(size_t ring_size, size_t item_count, size_t unit_size = 0) + { + if (unit_size == 0) + { + unit_size = sizeof(item_header_t) + ring_size; + + if (unit_size % RTE_CACHE_LINE_SIZE != 0) + { + unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + } + } + + size_t capacity = sizeof(ring_header_t) + unit_size * item_count; + + if (capacity % RTE_CACHE_LINE_SIZE != 0) + { + capacity += RTE_CACHE_LINE_SIZE - capacity % RTE_CACHE_LINE_SIZE; /// round up + } + + return capacity; + } + + PacketBufferRing(void* memory, size_t ring_size, size_t item_count) : + unit_size(sizeof(item_header_t) + ring_size), units_number(item_count) { + if (unit_size % RTE_CACHE_LINE_SIZE != 0) + { + unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + } + + capacity = GetCapacity(ring_size, item_count, unit_size); + + ring = (ring_t*)memory; } struct ring_header_t @@ -55,9 +89,9 @@ class bufferring uint8_t memory[]; }; - int unit_size; - int units_number; + size_t unit_size; + size_t units_number; + size_t capacity; ring_t* ring; }; - } diff --git a/common/define.h b/common/define.h index 47cc5fd0..4350cd53 100644 --- a/common/define.h +++ b/common/define.h @@ -7,6 +7,7 @@ #include #include #include +#include #include /** @@ -104,16 +105,20 @@ extern LogPriority logPriority; #define CALCULATE_LOGICALPORT_ID(portId, vlanId) ((portId << 13) | ((vlanId & 0xFFF) << 1) | 1) +template +void YANET_THROW(Args&&... args) +{ + std::ostringstream oss; + (oss << ... << std::forward(args)); + const std::string message = oss.str(); + #if __cpp_exceptions -#define YANET_THROW(message) throw std::runtime_error(std::string(message)) + throw std::runtime_error(message); #else // __cpp_exceptions -#define YANET_THROW(message) \ - do \ - { \ - YANET_LOG_ERROR("%s\n", std::string_view(message).data()); \ - std::abort(); \ - } while (0) + YANET_LOG_ERROR("%s\n", message.c_str()); + std::abort(); #endif // __cpp_exceptions +} #define YANET_RIB_PRIORITY_DEFAULT ((uint32_t)10000) #define YANET_RIB_PRIORITY_ROUTE_TUNNEL_FALLBACK ((uint32_t)11000) diff --git a/common/idataplane.h b/common/idataplane.h index dc64ba16..156224d8 100644 --- a/common/idataplane.h +++ b/common/idataplane.h @@ -185,6 +185,11 @@ class dataPlane return get(); } + auto hexdump_ring(const common::idp::hexdump_ring::request& request) const + { + return get(request); + } + auto get_shm_tsc_info() const { return get(); diff --git a/common/idp.h b/common/idp.h index fbf2920b..1bf7ebc6 100644 --- a/common/idp.h +++ b/common/idp.h @@ -67,6 +67,7 @@ enum class requestType : uint32_t update_vip_vport_proto, version, get_shm_info, + hexdump_ring, get_shm_tsc_info, set_shm_tsc_state, dump_physical_port, @@ -854,6 +855,26 @@ using dump_meta = std::tuple; } +namespace hexdump_ring +{ +using request = std::string; // ring tag + +struct response +{ + std::string hexdumped_ring; + + void pop(common::stream_in_t& stream) + { + stream.pop(hexdumped_ring); + } + + void push(common::stream_out_t& stream) const + { + stream.push(hexdumped_ring); + } +}; +} + namespace get_shm_tsc_info { using tsc_meta = std::tuple>; +// Oh no, this is so bad.. We can't have same types as a responces, i.e right now we can't have two +// commands with std::string as responce. Need to refactor this whole thing. +// We can just use structures, I guess..? Need to think about this more. using response = std::variant, updateGlobalBase::response, ///< + others which have eResult as response getGlobalBase::response, @@ -1025,6 +1050,7 @@ using response = std::variant, samples::response, hitcount_dump::response, get_shm_info::response, + hexdump_ring::response, get_shm_tsc_info::response, neighbor_show::response, neighbor_stats::response, diff --git a/common/sdpclient.h b/common/sdpclient.h index 990cb31a..0b2010c6 100644 --- a/common/sdpclient.h +++ b/common/sdpclient.h @@ -131,7 +131,7 @@ class SdpClient shift); return eResult::errorInitSharedMemory; } - iter.second.buffer = ShiftBuffer(buffer, shift); + iter.second.buffer = ShiftBuffer(buffer, shift); } for (auto& iter : sdp_data.workers_gc) { @@ -147,7 +147,7 @@ class SdpClient shift); return eResult::errorInitSharedMemory; } - iter.second.buffer = ShiftBuffer(buffer, shift); + iter.second.buffer = ShiftBuffer(buffer, shift); } return eResult::success; @@ -177,8 +177,8 @@ class SdpClient { if (!core_id.has_value() || worker_core_id == core_id) { - auto* counters = common::sdp::ShiftBuffer(worker_info.buffer, - sdp_data.metadata_worker.start_counters); + auto* counters = ShiftBuffer(worker_info.buffer, + sdp_data.metadata_worker.start_counters); result[worker_core_id] = counters[index]; } } @@ -193,8 +193,8 @@ class SdpClient { if (!core_id.has_value() || worker_core_id == core_id) { - auto* counters = common::sdp::ShiftBuffer(worker_info.buffer, - sdp_data.metadata_worker.start_counters); + auto* counters = ShiftBuffer(worker_info.buffer, + sdp_data.metadata_worker.start_counters); result[worker_core_id] = counters[index]; } } @@ -232,8 +232,8 @@ class SdpClient std::vector buffers; for (const auto& iter : sdp_data.workers) { - buffers.push_back(common::sdp::ShiftBuffer(iter.second.buffer, - sdp_data.metadata_worker.start_counters)); + buffers.push_back(ShiftBuffer(iter.second.buffer, + sdp_data.metadata_worker.start_counters)); } for (size_t i = 0; i < counter_ids.size(); i++) @@ -429,7 +429,7 @@ class SdpClient static uint64_t ReadValue(void* buffer, uint64_t index) { - auto* data = common::sdp::ShiftBuffer(buffer, index * sizeof(uint64_t)); + auto* data = ShiftBuffer(buffer, index * sizeof(uint64_t)); uint64_t result = 0; for (int i = 0; i < 8; i++) { @@ -443,9 +443,9 @@ class SdpClient values.clear(); for (uint64_t index = 0; index < count; index++) { - void* current = common::sdp::ShiftBuffer(buffer, shift + 128 * index); + void* current = ShiftBuffer(buffer, shift + 128 * index); uint64_t value = ReadValue(current, 0); - char* str = common::sdp::ShiftBuffer(current, 8); + char* str = ShiftBuffer(current, 8); if (str[119] != 0) { // 119 - index of last symbol diff --git a/common/sdpcommon.h b/common/sdpcommon.h index 0fda626d..721dd140 100644 --- a/common/sdpcommon.h +++ b/common/sdpcommon.h @@ -6,6 +6,7 @@ #include "define.h" #include "idp.h" +#include "utils.h" // #define YANET_USE_POSIX_SHARED_MEMORY @@ -87,6 +88,8 @@ Block for worker_gc namespace common::sdp { +using utils::ShiftBuffer; + #ifdef YANET_USE_POSIX_SHARED_MEMORY inline std::string FileNameWorkerOnNumaNode(tSocketId socket_id) { @@ -94,12 +97,6 @@ inline std::string FileNameWorkerOnNumaNode(tSocketId socket_id) } #endif -template -inline TResult ShiftBuffer(TBuffer buffer, uint64_t size) -{ - return reinterpret_cast((reinterpret_cast(buffer) + size)); -} - template bool MapsEqual(const std::map& left, const std::map& right) { @@ -224,9 +221,9 @@ struct DataPlaneInSharedMemory { auto count_errors = static_cast(common::idp::errorType::size); auto count_requests = static_cast(common::idp::requestType::size); - auto* requests = common::sdp::ShiftBuffer(dataplane_data, start_bus_section); - auto* errors = common::sdp::ShiftBuffer(dataplane_data, start_bus_section + count_requests * sizeof(uint64_t)); - auto* durations = common::sdp::ShiftBuffer(dataplane_data, start_bus_section + (count_requests + count_errors) * sizeof(uint64_t)); + auto* requests = ShiftBuffer(dataplane_data, start_bus_section); + auto* errors = ShiftBuffer(dataplane_data, start_bus_section + count_requests * sizeof(uint64_t)); + auto* durations = ShiftBuffer(dataplane_data, start_bus_section + (count_requests + count_errors) * sizeof(uint64_t)); return {requests, errors, durations}; } }; diff --git a/common/type.h b/common/type.h index 051ef803..29d7d792 100644 --- a/common/type.h +++ b/common/type.h @@ -269,9 +269,7 @@ class ipv4_address_t // i.e. 192.168.0.010 if (inet_aton(string.data(), (struct in_addr*)&address) != 1) { - std::ostringstream error; - error << "'" << string << "' is not a valid IPv4 address"; - YANET_THROW(error.str()); + YANET_THROW("'", string, "' is not a valid IPv4 address"); } address = ntohl(address); } @@ -383,9 +381,7 @@ class ipv6_address_t { if (inet_pton(AF_INET6, string.data(), address.data()) != 1) { - std::ostringstream error; - error << "'" << string << "' is not a valid IPv6 address"; - YANET_THROW(error.str()); + YANET_THROW("'", string, "' is not a valid IPv6 address"); } } @@ -839,24 +835,18 @@ class ipv4_prefix_with_announces_t ipv4_prefix_t announce{announceRaw.get()}; if (!announce.isValid()) { - std::ostringstream error; - error << "prefix has invalid announce: '" << announce.toString() - << "' that isn' t a subnet of prefix "; - YANET_THROW(error.str()); + YANET_THROW("prefix has invalid announce: '", announce.toString(), "' that isn' t a subnet of prefix"); } if (!announce.subnetOf(prefix)) { - std::ostringstream error; - error << "prefix: '" << prefix.toString() << "' has announce: '" - << announce.toString() << "' that isn' t a subnet of prefix "; - YANET_THROW(error.str()); + YANET_THROW("prefix: '", prefix.toString(), "' has announce: '", announce.toString(), "' that isn' t a subnet of prefix"); } announces.emplace_back(std::move(announce)); } } else { - YANET_THROW(std::string("prefix has invalid type")); + YANET_THROW("prefix has invalid type"); } } @@ -1089,24 +1079,18 @@ class ipv6_prefix_with_announces_t ipv6_prefix_t announce{announceRaw.get()}; if (!announce.isValid()) { - std::ostringstream error; - error << "prefix has invalid announce: '" << announce.toString() - << "' that isn' t a subnet of prefix "; - YANET_THROW(error.str()); + YANET_THROW("prefix has invalid announce: '", announce.toString(), "' that isn' t a subnet of prefix"); } if (!announce.subnetOf(prefix)) { - std::ostringstream error; - error << "prefix: '" << prefix.toString() << "' has announce: '" - << announce.toString() << "' that isn' t a subnet of prefix "; - YANET_THROW(error.str()); + YANET_THROW("prefix: '", prefix.toString(), "' has announce: '", announce.toString(), "' that isn' t a subnet of prefix"); } announces.emplace_back(std::move(announce)); } } else { - YANET_THROW(std::string("prefix has invalid type")); + YANET_THROW("prefix has invalid type"); } } diff --git a/common/utils.h b/common/utils.h index 20e20696..24213e99 100644 --- a/common/utils.h +++ b/common/utils.h @@ -1,11 +1,20 @@ #pragma once #include +#include +#include #include namespace utils { +template +TResult ShiftBuffer(void* buffer, size_t size) +{ + static_assert(std::is_pointer_v, "TResult must be a pointer type."); + return reinterpret_cast(static_cast(buffer) + size); +} + // Utility to calculate percentage // TODO C++20: use std::type_identity_t to establish non-deduced context // Will allow to do `to_percent(4.2, 1)` @@ -46,5 +55,56 @@ inline std::vector split(const std::string& str, char delimiter) return split(std::string_view(str), delimiter); } +inline std::string hexdump(std::string_view data) +{ + std::ostringstream oss; + oss << std::hex << std::setfill('0'); // Set hexadecimal formatting and fill character + + const size_t size = data.size(); + + for (size_t offset = 0; offset < size; offset += 16) + { + // Output the offset + oss << std::setw(8) << offset << " "; + + // Prepare ASCII representation + std::string ascii_representation; + ascii_representation.reserve(16); + + const size_t line_size = std::min(size - offset, size_t(16)); + + for (size_t i = 0; i < 16; ++i) + { + // Add extra space after 8 bytes + if (i == 8) + { + oss << " "; + } + else if (i != 0) + { + oss << ' '; + } + + if (i < line_size) + { + const auto byte = static_cast(data[offset + i]); + oss << std::setw(2) << static_cast(byte); + + ascii_representation += std::isprint(byte) ? byte : '.'; + } + else + { + // Fill in spaces for alignment if line is shorter than 16 bytes + oss << " "; + ascii_representation += ' '; + } + } + + // Append ASCII representation + oss << " |" << ascii_representation << "|\n"; + } + + return oss.str(); +} } // namespace utils diff --git a/controlplane/controlplane.cpp b/controlplane/controlplane.cpp index ae31b56d..b74dc7f0 100644 --- a/controlplane/controlplane.cpp +++ b/controlplane/controlplane.cpp @@ -1004,7 +1004,7 @@ std::vector cControlPlane::getAclCounters() uint64_t start_acl_counters = sdp_data.metadata_worker.start_acl_counters; for (const auto& iter : sdp_data.workers) { - auto* aclCounters = common::sdp::ShiftBuffer(iter.second.buffer, start_acl_counters); + auto* aclCounters = utils::ShiftBuffer(iter.second.buffer, start_acl_counters); for (size_t i = 0; i < YANET_CONFIG_ACL_COUNTERS_SIZE; i++) { response[i] += aclCounters[i]; diff --git a/controlplane/telegraf.cpp b/controlplane/telegraf.cpp index 05dc955f..3817148b 100644 --- a/controlplane/telegraf.cpp +++ b/controlplane/telegraf.cpp @@ -304,7 +304,7 @@ common::icp::telegraf_other::response telegraf_t::telegraf_other() { std::array bursts; auto* worker_bursts = - common::sdp::ShiftBuffer(worker_info.buffer, sdp_data->metadata_worker.start_bursts); + utils::ShiftBuffer(worker_info.buffer, sdp_data->metadata_worker.start_bursts); memcpy(&bursts[0], worker_bursts, sizeof(uint64_t) * (CONFIG_YADECAP_MBUFS_BURST_SIZE + 1)); currWorkers[coreId] = bursts; } diff --git a/dataplane/action_dispatcher.h b/dataplane/action_dispatcher.h index b3c3b771..87ef3053 100644 --- a/dataplane/action_dispatcher.h +++ b/dataplane/action_dispatcher.h @@ -88,8 +88,11 @@ struct ActionDispatcher return; } - auto& ring = args.worker->dumpRings[ring_id]; - ring.write(args.mbuf, flow.type); + cWorker* worker = args.worker; + + // polymorphic, will execute either DumpRingRaw or DumpRingPcap method, + // likely to be devirtualized + worker->dump_rings[ring_id]->Write(args.mbuf, flow.type, worker->CurrentTime()); } static void execute(const common::StateTimeoutAction& action, const Flow& flow, const ActionDispatcherArgs& args) diff --git a/dataplane/bus.cpp b/dataplane/bus.cpp index 081aae2a..c24ebdb2 100644 --- a/dataplane/bus.cpp +++ b/dataplane/bus.cpp @@ -337,6 +337,10 @@ void cBus::clientThread(int clientSocket) { response = callWithResponse(&cControlPlane::get_shm_info, request); } + else if (type == common::idp::requestType::hexdump_ring) + { + response = callWithResponse(&cControlPlane::hexdump_ring, request); + } else if (type == common::idp::requestType::get_shm_tsc_info) { response = callWithResponse(&cControlPlane::get_shm_tsc_info, request); diff --git a/dataplane/config.h b/dataplane/config.h new file mode 100644 index 00000000..c7fd37f6 --- /dev/null +++ b/dataplane/config.h @@ -0,0 +1,103 @@ +#pragma once + +#include "common/define.h" +#include "common/type.h" +#include "rte_ethdev.h" +#include +#include + +using InterfaceName = std::string; + +struct CPlaneWorkerConfig +{ + std::set interfaces; + std::set workers; + std::set gcs; +}; + +struct tDataPlaneConfig +{ + enum class DumpFormat + { + kRaw, + kPcap + }; + + // TODO: add here path, prefix, pcap files count? like std::variant if format == pcap? + struct DumpConfig + { + DumpFormat format; + unsigned int size; + unsigned int count; + }; + + static DumpFormat StringToDumpFormat(const std::string& format_str) + { + if (format_str == "raw") + return DumpFormat::kRaw; + else if (format_str == "pcap") + return DumpFormat::kPcap; + + YANET_LOG_WARNING("Invalid dump format %s, will use raw format", format_str.data()); + return DumpFormat::kRaw; + } + + /* + DPDK ports used by `dataplane`. + Each port has a name with which is exposed into host system + and an identifier (typically pci id) used to lookup the port within + DPDK. + */ + std::map> + ports; + + std::set workerGCs; + tCoreId controlPlaneCoreId; + std::map> controlplane_workers; + std::map> workers; + bool useHugeMem = true; + bool use_kernel_interface = true; + bool interfaces_required = true; + uint64_t rssFlags = RTE_ETH_RSS_IP; + uint32_t SWNormalPriorityRateLimitPerWorker = 0; + uint32_t SWICMPOutRateLimit = 0; + uint32_t rateLimitDivisor = 1; + std::string memory; + std::map shared_memory; + + std::vector ealArgs; + std::set WorkersInterfaces(std::set cores) + { + std::set ifaces; + for (auto core : cores) + { + auto worker = workers.at(core); + ifaces.insert(worker.begin(), worker.end()); + } + return ifaces; + } + std::map VdevQueues() + { + std::map total; + for (auto& [_, cores] : controlplane_workers) + { + YANET_GCC_BUG_UNUSED(_); + std::set ifaces; + for (auto core : cores) + { + const auto& w = workers.at(core); + ifaces.insert(w.begin(), w.end()); + } + for (auto& iface : ifaces) + { + ++total[iface]; + } + } + return total; + } +}; diff --git a/dataplane/controlplane.cpp b/dataplane/controlplane.cpp index 1171b975..56c47f87 100644 --- a/dataplane/controlplane.cpp +++ b/dataplane/controlplane.cpp @@ -10,6 +10,7 @@ #include #include "common.h" +#include "common/utils.h" #include "common/version.h" #include "dataplane.h" #include "dataplane/worker_gc.h" @@ -1025,6 +1026,7 @@ common::idp::version::response cControlPlane::version() version_custom()}; } +// FIXME: just return dataPlane->getShmInfo() common::idp::get_shm_info::response cControlPlane::get_shm_info() { common::idp::get_shm_info::response response; @@ -1036,6 +1038,76 @@ common::idp::get_shm_info::response cControlPlane::get_shm_info() return response; } +// I won't need this.. +common::idp::hexdump_ring::response cControlPlane::hexdump_ring(const common::idp::hexdump_ring::request& request) +{ + common::idp::hexdump_ring::response response; +#if 0 + const std::string& requested_tag = request; + + std::string combined_hexdump; + + auto ring_id_it = dataPlane->tag_to_id.find(requested_tag); + if (ring_id_it == dataPlane->tag_to_id.end()) + { + YANET_LOG_ERROR("Tag '%s' not found\n", requested_tag.c_str()); + return response; + } + uint64_t ring_id = ring_id_it->second; + + for (cWorker* worker : dataPlane->workers_vector) + { + auto ring = worker->dump_rings[ring_id]; + + auto addr = reinterpret_cast(ring.buffer.ring); + + // Get the `after` counter to determine the end of valid data + uint64_t after = ring.buffer.ring->header.after; + + // Calculate the size of the valid data to dump + size_t valid_data_size = sizeof(sharedmemory::ring_header_t) + after * ring.buffer.unit_size; + + // Include worker-specific details in the hexdump + combined_hexdump += "Worker Core ID: " + std::to_string(worker->coreId) + "\n"; + combined_hexdump += "After: " + std::to_string(after) + "\n"; + combined_hexdump += utils::hexdump(std::string_view(addr, valid_data_size)); + combined_hexdump += "\n"; + } + + response.hexdumped_ring = combined_hexdump; +#endif + +#if 0 + common::idp::get_shm_info::response shm_info = dataPlane->getShmInfo(); + + for (const auto& [name, tag, size, count, core_id, socket_id, shm_key, offset] : shm_info) + { + if (tag != requested_tag) + continue; + + int shmid = shmget(shm_key, 0, 0); + if (shmid == -1) + { + YANET_LOG_ERROR("Error on shmget(%d, 0, 0) = %d\n", shm_key, errno); + return {}; + } + + void* shmaddr = shmat(shmid, nullptr, 0); + if (shmaddr == reinterpret_cast(-1)) + { + YANET_LOG_ERROR("Error on shmat(%d, NULL, 0) = %d\n", shmid, errno); + return {}; + } + + auto addr = utils::ShiftBuffer(shmaddr, offset); + } + + response.hexdumped_ring = //hexdump addr, size (what size) here; +#endif + + return response; +} + common::idp::get_shm_tsc_info::response cControlPlane::get_shm_tsc_info() { common::idp::get_shm_tsc_info::response response; diff --git a/dataplane/controlplane.h b/dataplane/controlplane.h index 643830fa..49bc3478 100644 --- a/dataplane/controlplane.h +++ b/dataplane/controlplane.h @@ -80,6 +80,7 @@ class cControlPlane ///< @todo: move to cDataPlane common::idp::version::response version(); common::idp::nat64stateful_state::response nat64stateful_state(const common::idp::nat64stateful_state::request& request); common::idp::get_shm_info::response get_shm_info(); + common::idp::hexdump_ring::response hexdump_ring(const common::idp::hexdump_ring::request& request); common::idp::get_shm_tsc_info::response get_shm_tsc_info(); eResult dump_physical_port(const common::idp::dump_physical_port::request& request); eResult balancer_state_clear(); diff --git a/dataplane/dataplane.cpp b/dataplane/dataplane.cpp index 0a0df910..28a97fb7 100644 --- a/dataplane/dataplane.cpp +++ b/dataplane/dataplane.cpp @@ -37,6 +37,7 @@ #include "dataplane.h" #include "dataplane/sdpserver.h" #include "globalbase.h" +#include "sharedmemory.h" #include "sock_dev.h" #include "work_runner.h" #include "worker.h" @@ -700,7 +701,7 @@ eResult cDataPlane::init_kernel_interfaces() bool cDataPlane::KNIAddTxQueue(KniHandleBundle& bundle, tQueueId queue, tSocketId socket) { auto& [_, fwd, in, out, drop] = bundle; - (void)_; + YANET_GCC_BUG_UNUSED(_); return fwd.SetupTxQueue(queue, socket) && in.SetupTxQueue(queue, socket) && out.SetupTxQueue(queue, socket) && @@ -709,7 +710,7 @@ bool cDataPlane::KNIAddTxQueue(KniHandleBundle& bundle, tQueueId queue, tSocketI bool cDataPlane::KNIAddRxQueue(KniHandleBundle& bundle, tQueueId queue, tSocketId socket, rte_mempool* mempool) { auto& [_, fwd, in, out, drop] = bundle; - (void)_; + YANET_GCC_BUG_UNUSED(_); return fwd.SetupRxQueue(queue, socket, mempool) && in.SetupRxQueue(queue, socket, mempool) && out.SetupRxQueue(queue, socket, mempool) && @@ -837,7 +838,7 @@ eResult cDataPlane::initGlobalBases() /// slow worker for (const auto& [core, _] : config.controlplane_workers) { - (void)_; + YANET_GCC_BUG_UNUSED(_); tSocketId socketId = rte_lcore_to_socket_id(core); result = create_globalbase_atomics(socketId); @@ -1566,88 +1567,63 @@ eResult cDataPlane::initSharedMemory() return common::sdp::SdrSever::PrepareSharedMemoryData(sdp_data, workers_id, workers_gc_id, config.useHugeMem); } -eResult cDataPlane::allocateSharedMemory() +static int get_numa_node(unsigned int core_id) { - /// precalculation of shared memory size for each numa - std::map number_of_workers_per_socket; - for (const auto& worker : config.workers) + int socket_id = numa_node_of_cpu(static_cast(core_id)); + if (socket_id == -1) { - const int coreId = worker.first; + YADECAP_LOG_ERROR("numa_node_of_cpu error: %s\n", strerror(errno)); + return 0; // Default to socket 0 + } + return socket_id; +} - auto socket_id = numa_node_of_cpu(coreId); - if (socket_id == -1) - { - YADECAP_LOG_ERROR("numa_node_of_cpu err: %s\n", strerror(errno)); - socket_id = 0; - } +static std::unordered_map calculate_shared_memory_size(const tDataPlaneConfig& config) +{ + /// helper for calculation of shared memory size for each numa + std::unordered_map workers_per_socket; - if (number_of_workers_per_socket.find(socket_id) == number_of_workers_per_socket.end()) - { - number_of_workers_per_socket[socket_id] = 1; - } - else - { - number_of_workers_per_socket[socket_id]++; - } + for (const auto& [core_id, _] : config.workers) + { + YANET_GCC_BUG_UNUSED(_); + workers_per_socket[get_numa_node(core_id)]++; } /// slow worker - for (const auto& [coreId, _] : config.controlplane_workers) + for (const auto& [core_id, _] : config.controlplane_workers) { - (void)_; - - auto socket_id = numa_node_of_cpu(coreId); - if (socket_id == -1) - { - YADECAP_LOG_ERROR("numa_node_of_cpu err: %s\n", strerror(errno)); - socket_id = 0; - } - - if (number_of_workers_per_socket.find(socket_id) == number_of_workers_per_socket.end()) - { - number_of_workers_per_socket[socket_id] = 1; - } - else - { - number_of_workers_per_socket[socket_id]++; - } + YANET_GCC_BUG_UNUSED(_); + workers_per_socket[get_numa_node(core_id)]++; } - std::map shm_size_per_socket; + std::unordered_map shm_size_per_socket; + + // Calculate sizes based on shared memory configuration for (const auto& ring_cfg : config.shared_memory) { - const auto& [dump_size, dump_count] = ring_cfg.second; + size_t size = sharedmemory::GetCapacity(ring_cfg.second); - auto unit_size = sizeof(sharedmemory::item_header_t) + dump_size; - if (unit_size % RTE_CACHE_LINE_SIZE != 0) + for (const auto& [socket_id, worker_count] : workers_per_socket) { - unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up - } - - auto size = sizeof(sharedmemory::ring_header_t) + unit_size * dump_count; - - for (const auto& [socket_id, num] : number_of_workers_per_socket) - { - auto it = shm_size_per_socket.find(socket_id); - if (it == shm_size_per_socket.end()) - { - it = shm_size_per_socket.emplace_hint(it, socket_id, 0); - } - it->second += size * num; + shm_size_per_socket[socket_id] += size * worker_count; } } - for (const auto& [socket_id, num] : number_of_workers_per_socket) + // Add additional memory for performance-related data + for (const auto& [socket_id, worker_count] : workers_per_socket) { - auto it = shm_size_per_socket.find(socket_id); - if (it == shm_size_per_socket.end()) - { - it = shm_size_per_socket.emplace_hint(it, socket_id, 0); - } - - it->second += sizeof(dataplane::perf::tsc_deltas) * (num + 1); + shm_size_per_socket[socket_id] += sizeof(dataplane::perf::tsc_deltas) * (worker_count + 1); } + return shm_size_per_socket; +} + +// FIXME: why is this class not using class SharedMemory from common/shared_memory.h? +eResult cDataPlane::allocateSharedMemory() +{ + // shared memory size for each numa + std::unordered_map shm_size_per_socket = calculate_shared_memory_size(config); + /// allocating IPC shared memory key_t key = YANET_DEFAULT_IPC_SHMKEY; for (const auto& [socket_id, size] : shm_size_per_socket) @@ -1688,7 +1664,9 @@ eResult cDataPlane::allocateSharedMemory() return eResult::errorInitSharedMemory; } - shm_by_socket_id[socket_id] = std::make_tuple(key, shmaddr); + // TODO: mlock to lock shared memory to RAM to avoid page faults therefore increasing performance? + + shm_by_socket_id[socket_id] = {key, shmaddr, 0}; key++; } @@ -1696,85 +1674,50 @@ eResult cDataPlane::allocateSharedMemory() return eResult::success; } +/// split memory per worker eResult cDataPlane::splitSharedMemoryPerWorkers() { - std::map offsets; - for (const auto& it : shm_by_socket_id) - { - const auto& addr = std::get<1>(it.second); - offsets[addr] = 0; - } + using namespace sharedmemory; - /// split memory per worker for (cWorker* worker : workers_vector) { - const auto& socket_id = worker->socketId; + tSocketId socket_id = worker->socketId; + tCoreId core_id = worker->coreId; + const auto& it = shm_by_socket_id.find(socket_id); if (it == shm_by_socket_id.end()) { + // No shared memory allocated for this socket, skip this worker continue; } - const auto& [key, shm] = it->second; + auto& [key, shm, offset] = it->second; int ring_id = 0; for (const auto& [tag, ring_cfg] : config.shared_memory) { - const auto& [dump_size, units_number] = ring_cfg; - - auto unit_size = sizeof(sharedmemory::item_header_t) + dump_size; - if (unit_size % RTE_CACHE_LINE_SIZE != 0) - { - unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up - } - - auto size = sizeof(sharedmemory::ring_header_t) + unit_size * units_number; - if (size % RTE_CACHE_LINE_SIZE != 0) - { - size += RTE_CACHE_LINE_SIZE - size % RTE_CACHE_LINE_SIZE; /// round up - } - - auto name = "shm_" + std::to_string(worker->coreId) + "_" + std::to_string(ring_id); - - auto offset = offsets[shm]; + const auto& [format, dump_size, dump_count] = ring_cfg; - auto memaddr = (void*)((intptr_t)shm + offset); + auto memaddr = utils::ShiftBuffer(shm, offset); + offset += GetCapacity(ring_cfg); - sharedmemory::cSharedMemory ring; + worker->dump_rings[ring_id] = CreateSharedMemoryDumpRing(ring_cfg, memaddr); - ring.init(memaddr, unit_size, units_number); - - offsets[shm] += size; - - worker->dumpRings[ring_id] = ring; - - auto meta = common::idp::get_shm_info::dump_meta(name, tag, unit_size, units_number, worker->coreId, socket_id, key, offset); - dumps_meta.emplace_back(meta); + std::string name = "shm_" + std::to_string(core_id) + "_" + std::to_string(ring_id); + // TODO: add format here + dumps_meta.emplace_back(name, tag, dump_size, dump_count, core_id, socket_id, key, offset); tag_to_id[tag] = ring_id; ring_id++; } - } - for (cWorker* worker : workers_vector) - { - const auto& socket_id = worker->socketId; - const auto& it = shm_by_socket_id.find(socket_id); - if (it == shm_by_socket_id.end()) - { - continue; - } - const auto& [key, shm] = it->second; + auto memaddr = utils::ShiftBuffer(shm, offset); + offset += sizeof(dataplane::perf::tsc_deltas); - auto offset = offsets[shm]; - worker->tsc_deltas = reinterpret_cast(reinterpret_cast(shm) + offset); - // Use value-initialization to reset the object - *worker->tsc_deltas = {}; - offsets[shm] += sizeof(dataplane::perf::tsc_deltas); + worker->tsc_deltas = new (memaddr) dataplane::perf::tsc_deltas{}; - auto meta = common::idp::get_shm_tsc_info::tsc_meta(worker->coreId, socket_id, key, offset); - tscs_meta.emplace_back(meta); + tscs_meta.emplace_back(core_id, socket_id, key, offset); } return eResult::success; @@ -2260,6 +2203,7 @@ eResult cDataPlane::parseSharedMemory(const nlohmann::json& json) std::string tag = shmJson["tag"]; unsigned int size = shmJson["dump_size"]; unsigned int count = shmJson["dump_count"]; + std::string format_str = shmJson.value("dump_format", "raw"); if (exist(config.shared_memory, tag)) { @@ -2267,7 +2211,7 @@ eResult cDataPlane::parseSharedMemory(const nlohmann::json& json) return eResult::invalidConfigurationFile; } - config.shared_memory[tag] = {size, count}; + config.shared_memory[tag] = {tDataPlaneConfig::StringToDumpFormat(format_str), size, count}; } return eResult::success; @@ -2283,7 +2227,7 @@ eResult cDataPlane::checkConfig() for (auto& [core, _] : config.controlplane_workers) { - (void)_; + YANET_GCC_BUG_UNUSED(_); if (core >= std::thread::hardware_concurrency()) { YADECAP_LOG_ERROR("invalid coreId: '%u'\n", core); diff --git a/dataplane/dataplane.h b/dataplane/dataplane.h index 9e75d108..2e441dd4 100644 --- a/dataplane/dataplane.h +++ b/dataplane/dataplane.h @@ -19,6 +19,7 @@ #include "common/type.h" #include "bus.h" +#include "config.h" #include "config_values.h" #include "controlplane.h" #include "globalbase.h" @@ -29,70 +30,6 @@ #include "slow_worker.h" #include "type.h" -using InterfaceName = std::string; - -struct tDataPlaneConfig -{ - /* - DPDK ports used by `dataplane`. - Each port has a name with which is exposed into host system - and an identifier (typically pci id) used to lookup the port within - DPDK. - */ - std::map> - ports; - - std::set workerGCs; - tCoreId controlPlaneCoreId; - std::map> controlplane_workers; - std::map> workers; - bool useHugeMem = true; - bool use_kernel_interface = true; - bool interfaces_required = true; - uint64_t rssFlags = RTE_ETH_RSS_IP; - uint32_t SWNormalPriorityRateLimitPerWorker = 0; - uint32_t SWICMPOutRateLimit = 0; - uint32_t rateLimitDivisor = 1; - std::string memory; - std::map> shared_memory; - - std::vector ealArgs; - std::set WorkersInterfaces(std::set cores) - { - std::set ifaces; - for (auto core : cores) - { - auto worker = workers.at(core); - ifaces.insert(worker.begin(), worker.end()); - } - return ifaces; - } - std::map VdevQueues() - { - std::map total; - for (auto& [_, cores] : controlplane_workers) - { - (void)_; - std::set ifaces; - for (auto core : cores) - { - const auto& w = workers.at(core); - ifaces.insert(w.begin(), w.end()); - } - for (auto& iface : ifaces) - { - ++total[iface]; - } - } - return total; - } -}; - class hugepage_pointer { public: @@ -255,7 +192,13 @@ class cDataPlane common::idp::hitcount_dump::response hitcount_map_; - std::map> shm_by_socket_id; + struct ShmInfo + { + key_t key; + void* addr; + size_t offset; + }; + std::unordered_map shm_by_socket_id; std::set socket_ids; std::map socket_worker_gcs; diff --git a/dataplane/meson.build b/dataplane/meson.build index 73962ecd..7f9f648c 100644 --- a/dataplane/meson.build +++ b/dataplane/meson.build @@ -3,6 +3,7 @@ dependencies += libdpdk.get_variable('dpdk_dep') dependencies += libjson.get_variable('nlohmann_json_dep') dependencies += dependency('libsystemd') dependencies += dependency('threads') +dependencies += pcapplusplus_deps sources = files('bus.cpp', 'controlplane.cpp', @@ -20,6 +21,7 @@ sources = files('bus.cpp', 'neighbor.cpp', 'report.cpp', 'sharedmemory.cpp', + 'pcap_shm_device.cpp', 'slow_worker.cpp', 'sock_dev.cpp', 'worker.cpp', diff --git a/dataplane/pcap_shm_device.cpp b/dataplane/pcap_shm_device.cpp new file mode 100644 index 00000000..8e2969a4 --- /dev/null +++ b/dataplane/pcap_shm_device.cpp @@ -0,0 +1,311 @@ +#include +#include +#include +#include +#include + +#include "pcap_shm_device.h" + +// TODO: replace cerr with YANET_LOG +namespace pcpp +{ + +bool PcapShmWriterDevice::RotateToNextSegment() +{ + current_segment_index_ = (current_segment_index_ + 1) % pcap_files_; + FILE* file = segments_[current_segment_index_].file; + // Move file pointer to just after the global header in the new segment + return (fseek(file, kPcapFileHeaderSize, SEEK_SET) == 0); +} + +bool PcapShmWriterDevice::FillSegments() +{ + segments_.resize(pcap_files_); + + size_t base_size = shm_size_ / pcap_files_; + size_t remainder = shm_size_ % pcap_files_; + + size_t offset = 0; + for (size_t i = 0; i < pcap_files_; ++i) + { + size_t segment_size = base_size + (i == pcap_files_ - 1 ? remainder : 0); + segments_[i].start_ptr = static_cast(shm_ptr_) + offset; + segments_[i].size = segment_size; + offset += segment_size; + + FILE* file = fmemopen(segments_[i].start_ptr, segments_[i].size, "w+"); + if (!file) + { + std::cerr << "fmemopen failed for segment " << i << std::endl; + return false; + } + + pcap_dumper_t* dumper = pcap_dump_fopen(m_PcapDescriptor.get(), file); + if (!dumper) + { + std::cerr << "pcap_dump_fopen failed for segment " << i << std::endl; + fclose(file); + return false; + } + + segments_[i].file = file; + segments_[i].dumper = dumper; + } + + return true; +} + +PcapShmWriterDevice::PcapShmWriterDevice(void* shm_ptr, size_t shm_size, size_t pcap_files, LinkLayerType link_layer_type, bool nanoseconds_precision) : + IShmWriterDevice(shm_ptr, shm_size), + link_layer_type_(link_layer_type), + pcap_files_(pcap_files), + current_segment_index_(0) +{ +#if defined(PCAP_TSTAMP_PRECISION_NANO) + precision_ = nanoseconds_precision ? FileTimestampPrecision::Nanoseconds + : FileTimestampPrecision::Microseconds; +#else + if (nanosecondsPrecision) + { + std::cerr << "PcapPlusPlus was compiled without nano precision support which requires " + "libpcap > 1.5.1. Please " + "recompile PcapPlusPlus with nano precision support to use this feature. " + "Using " + "default microsecond precision.\n"; + } + m_Precision_ = FileTimestampPrecision::Microseconds; +#endif + + // TODO: we should add this assert + /* if (m_SegmentSize <= kPcapFileHeaderSize + PCPP_MAX_PACKET_SIZE - 1) { */ + /* TMP_LOG("Segment too small to hold at least one full packet"); */ + /* throw("something"); */ + /* } */ +} + +PcapShmWriterDevice::~PcapShmWriterDevice() +{ + PcapShmWriterDevice::close(); +} + +void PcapShmWriterDevice::DumpPcapFilesToDisk(std::string_view filename_prefix) +{ + Flush(); + + size_t file_index = 1; + std::string filename; + // Allocate space for prefix + index + ".pcap" + filename.reserve(filename_prefix.size() + 10); + + for (size_t i = 0; i < pcap_files_; ++i) + { + size_t segment_index = (current_segment_index_ + 1 + i) % pcap_files_; + FILE* file = segments_[segment_index].file; + + // Not opened or already closed + if (file == nullptr) + { + continue; + } + + long used = ftell(file); + if (used < 0) + { + std::cerr << "ftell failed on segment " << i << std::endl; + continue; + } + + // If only global header is present, no packets were written. + if (static_cast(used) <= kPcapFileHeaderSize) + { + continue; + } + + filename = filename_prefix; + filename += std::to_string(file_index++) + ".pcap"; + std::ofstream output_file(filename, std::ios::binary); + if (!output_file) + { + std::cerr << "Failed to open " << filename << " for writing" << std::endl; + continue; + } + + output_file.write(reinterpret_cast(segments_[segment_index].start_ptr), used); + if (output_file.bad()) + { + std::cerr << "Error writing to file " << filename << std::endl; + continue; + } + } +} + +bool PcapShmWriterDevice::open() +{ + if (m_DeviceOpened) + { + return true; + } + + switch (link_layer_type_) + { + case LINKTYPE_RAW: + case LINKTYPE_DLT_RAW2: + std::cerr << "The only Raw IP link type supported in libpcap/WinPcap/Npcap is " + "LINKTYPE_DLT_RAW1, please use that instead\n"; + return false; + default: + break; + } + +#if defined(PCAP_TSTAMP_PRECISION_NANO) + m_PcapDescriptor = internal::PcapHandle(pcap_open_dead_with_tstamp_precision( + link_layer_type_, PCPP_MAX_PACKET_SIZE - 1, static_cast(precision_))); +#else + m_PcapDescriptor = + internal::PcapHandle(pcap_open_dead(m_LinkLayerType_, PCPP_MAX_PACKET_SIZE - 1)); +#endif + if (m_PcapDescriptor == nullptr) + { + std::cerr << "Error opening pcap descriptor: pcap_open_dead returned nullptr" + << std::endl; + return false; + } + + if (!FillSegments()) + { + return false; + } + + current_segment_index_ = 0; + m_DeviceOpened = true; + return true; +} + +bool PcapShmWriterDevice::WritePacket(RawPacket const& packet) +{ + if (!m_DeviceOpened) + { + std::cerr << "Device not opened" << std::endl; + ++num_of_packets_not_written_; + return false; + } + + if (packet.getLinkLayerType() != link_layer_type_) + { + std::cerr << "Cannot write a packet with a different link layer type" << std::endl; + ++num_of_packets_not_written_; + return false; + } + + pcap_pkthdr pkt_hdr; + pkt_hdr.caplen = packet.getRawDataLen(); + pkt_hdr.len = packet.getFrameLength(); + + timespec packet_timestamp = packet.getPacketTimeStamp(); +#if defined(PCAP_TSTAMP_PRECISION_NANO) + if (precision_ != FileTimestampPrecision::Nanoseconds) + { + TIMESPEC_TO_TIMEVAL(&pkt_hdr.ts, &packet_timestamp); + } + else + { + pkt_hdr.ts.tv_sec = packet_timestamp.tv_sec; + pkt_hdr.ts.tv_usec = packet_timestamp.tv_nsec; + } +#else + TIMESPEC_TO_TIMEVAL(&pkt_hdr.ts, &packet_timestamp); +#endif + + // kPcapPacketHeaderSizeOnDisk is different from sizeof(pcap_pkthdr) + size_t needed = kPcapPacketHeaderSizeOnDisk + pkt_hdr.caplen; + + FILE* file = segments_[current_segment_index_].file; + long used = ftell(file); + if (used < 0) + { + std::cerr << "ftell failed on current segment" << std::endl; + ++num_of_packets_not_written_; + return false; + } + + size_t available = segments_[current_segment_index_].size - used; + if (needed > available) + { + if (!RotateToNextSegment()) + { + std::cerr << "fseek failed when rotating to next segment" << std::endl; + ++num_of_packets_not_written_; + return false; + } + file = segments_[current_segment_index_].file; + } + + pcap_dump(reinterpret_cast(segments_[current_segment_index_].dumper), &pkt_hdr, packet.getRawData()); + ++num_of_packets_written_; + return true; +} + +bool PcapShmWriterDevice::WritePackets(RawPacketVector const& packets) +{ + for (RawPacket const* packet : packets) + { + if (!WritePacket(*packet)) + return false; + } + return true; +} + +void PcapShmWriterDevice::Flush() +{ + if (!m_DeviceOpened) + return; + + for (auto& seg : segments_) + { + if (seg.dumper != nullptr && pcap_dump_flush(seg.dumper) == -1) + { + std::cerr << "Error while flushing the packets to shared memory" << std::endl; + } + } + + for (auto& seg : segments_) + { + if (seg.file != nullptr && fflush(seg.file) == EOF) + { + std::cerr << "Error while flushing the packets to file" << std::endl; + } + } +} + +void PcapShmWriterDevice::close() +{ + if (!m_DeviceOpened) + return; + + Flush(); + + for (auto& [ptr, size, file, dumper] : segments_) + { + if (dumper != nullptr) + { + // pcap_dump_close closes both the dumper and the FILE* + pcap_dump_close(dumper); + ptr = nullptr; + size = 0; + dumper = nullptr; + file = nullptr; + } + } + + m_PcapDescriptor.reset(); + m_DeviceOpened = false; +} + +void PcapShmWriterDevice::getStatistics(PcapStats& stats) const +{ + stats.packetsRecv = num_of_packets_written_; + stats.packetsDrop = num_of_packets_not_written_; + stats.packetsDropByInterface = 0; +} + +} // namespace pcpp diff --git a/dataplane/pcap_shm_device.h b/dataplane/pcap_shm_device.h new file mode 100644 index 00000000..595a8a7f --- /dev/null +++ b/dataplane/pcap_shm_device.h @@ -0,0 +1,233 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include "PcapDevice.h" +#include "PcapFileDevice.h" + +// The PcapPlus Plus namespace. +namespace pcpp +{ + +/** + * @brief An abstract class representing a shared memory device for pcap data. + * + * This device provides a pcap-compatible interface for reading/writing packets, + * but the underlying storage is a shared memory region rather than a file or a live network + * interface. + * + * Derived classes must implement device-specific logic for reading/writing packets. + */ +class IShmDevice : public IPcapDevice +{ +protected: + void* shm_ptr_; + size_t shm_size_; + + explicit IShmDevice(void* shm_ptr, size_t shm_size) : + IPcapDevice(), shm_ptr_(shm_ptr), shm_size_(shm_size) {} + + ~IShmDevice() override + { + close(); + } + +public: + /** + * @return Pointer to the underlying shared memory region. + */ + [[nodiscard]] void* GetShmPtr() const + { + return shm_ptr_; + } + + /** + * @return The size of the shared memory region in bytes. + */ + [[nodiscard]] size_t GetShmSize() const + { + return shm_size_; + } + + /** + * @brief Close the device. + * + * This will release any pcap resources associated with it. + */ + void close() override + { + if (m_PcapDescriptor != nullptr) + { + m_PcapDescriptor = nullptr; + } + m_DeviceOpened = false; + } +}; + +/** + * @brief An abstract class for shared memory writer devices. + * + * A writer device provides methods to write packets into the shared memory region. + * These packets can later be read or dumped to disk by other utilities. + */ +class IShmWriterDevice : public IShmDevice +{ +protected: + uint32_t num_of_packets_written_; + uint32_t num_of_packets_not_written_; + + IShmWriterDevice(void* shm_ptr, size_t shm_size) : + IShmDevice(shm_ptr, shm_size), + num_of_packets_written_(0), + num_of_packets_not_written_(0) {} + +public: + ~IShmWriterDevice() override = default; + + /** + * @brief Write a single RawPacket into the shared memory. + * + * @param[in] packet The packet to write. + * + * @return True if the packet was written successfully, false otherwise. + */ + [[nodiscard]] virtual bool WritePacket(RawPacket const& packet) = 0; + + /** + * @brief Write multiple RawPackets into the shared memory. + * + * @param[in] packets A vector of packet pointers to be written. + * + * @return True if all packets were written successfully, false otherwise. + */ + [[nodiscard]] virtual bool WritePackets(RawPacketVector const& packets) = 0; +}; + +/** + * @brief A class for writing packets to a shared memory region in pcap format, using a ring-buffer + * approach. + * + * The objective is to enable continuous packet capture while utilizing a limited amount of memory. + * The approach adopted here is inspired by Wireshark's "multiple files, ring buffer" feature: + * + * Multiple files, ring buffer: + * "Much like 'Multiple files continuous', reaching one of the multiple files switch conditions + * (one of the 'Next file every …' values) will switch to the next file. This will be a newly + * created file if the value of 'Ring buffer with n files' is not reached; otherwise, it will + * replace the oldest of the formerly used files (thus forming a 'ring'). + * + * This mode will limit the maximum disk usage, even for an unlimited amount of capture input data, + * only keeping the latest captured data." + * (Source: https://www.wireshark.org/docs/wsug_html_chunked/ChCapCaptureFiles.html) + * + * **Algorithm Behind Ring-Buffer Writing:** + * The shared memory region is divided into multiple segments (each representing a 'virtual pcap + * file'). Packets are written sequentially into the current segment. If there isn't enough space + * for a new packet, the writer 'rotates' to the next segment. + * - Suppose you have N segments. + * - You write packets into segment 1 until it's almost full. + * - If you can't fit a new packet, you move to segment 2, and continue writing there. + * - Once you reach segment N and still have more packets, you wrap around to segment 1 again, + * overwriting old data. + * + * After all packets are written, `DumpPcapFilesToDisk()` can be used to extract each segment + * into a standalone pcap file. + */ +class PcapShmWriterDevice : public IShmWriterDevice +{ + LinkLayerType link_layer_type_; + FileTimestampPrecision precision_; + + size_t pcap_files_; ///< Number of pcap segments + size_t current_segment_index_; ///< Current segment index we're writing to + + struct SegmentInfo + { + void* start_ptr; ///< Pointer to the start of this segment in shared memory + size_t size; ///< Size of the segment + FILE* file; ///< FILE stream for this pcap segment + pcap_dumper_t* dumper; ///< pcap dumper for this pcap segment + }; + + std::vector segments_; + + // Prevent copying + PcapShmWriterDevice(PcapShmWriterDevice const&) = delete; + PcapShmWriterDevice& operator=(PcapShmWriterDevice const&) = delete; + + /** + * @brief Rotate to the next segment if the current one doesn't have enough space. + * + * @return True if successful, false if fseek fails. + */ + bool RotateToNextSegment(); + + /** + * @brief Distribute the shared memory into multiple segments and initialize them as in-memory + * pcap 'files'. + * + * This method divides the shared memory region into pcap_files_ segments, + * ensuring all available memory is utilized. Each segment will have an equal base size, + * except for the last segment which includes any remainder bytes. It then opens each segment as + * an in-memory pcap 'file'. + * + * @return True if all segments were successfully initialized, false otherwise. + */ + bool FillSegments(); + +public: + static constexpr size_t kPcapPacketHeaderSizeOnDisk = 16; + static constexpr size_t kPcapFileHeaderSize = 24; + + /** + * @brief Constructor for PcapShmWriterDevice + * + * @param[in] shmPtr Pointer to the shared memory region. + * @param[in] shmSize Size of the shared memory region. + * @param[in] pcapFiles Number of 'pcap segments' to divide the shared memory into. + * @param[in] linkLayerType The link layer type all packets in this region will be based on. The + * default is Ethernet. + * @param[in] nanosecondsPrecision A boolean indicating whether to write timestamps in + * nano-precision. If set to false, timestamps will be written in micro-precision. + */ + PcapShmWriterDevice(void* shm_ptr, size_t shm_size, size_t pcap_files, LinkLayerType link_layer_type = LINKTYPE_ETHERNET, bool nanoseconds_precision = false); + + ~PcapShmWriterDevice() override; + + /** + * @brief Dump each pcap segment from shared memory to a file on disk. + * + * @param filenamePrefix The prefix for the output pcap files, e.g. "capture_" + * will produce "capture_1.pcap", "capture_2.pcap", etc. + */ + void DumpPcapFilesToDisk(std::string_view filename_prefix); + + bool open() override; + + bool WritePacket(RawPacket const& packet) override; + + bool WritePackets(RawPacketVector const& packets) override; + + /** + * @brief Flush all pending writes to the shared memory segments. + */ + void Flush(); + + /** + * @brief Close the device and free associated resources. + */ + void close() override; + + /** + * @brief Get statistics for packets written so far. + * + * @param[out] stats The PcapStats structure to fill. + */ + void getStatistics(PcapStats& stats) const override; +}; + +} // namespace pcpp diff --git a/dataplane/sharedmemory.cpp b/dataplane/sharedmemory.cpp index aec56f17..51efa43e 100644 --- a/dataplane/sharedmemory.cpp +++ b/dataplane/sharedmemory.cpp @@ -1,46 +1,109 @@ #include "sharedmemory.h" #include "common/type.h" +#include "common/utils.h" #include "metadata.h" -using namespace sharedmemory; +#include "MBufRawPacket.h" -eResult cSharedMemory::init(void* memory, int unit_size, int units_number) +namespace sharedmemory { - buffer = common::bufferring(memory, unit_size, units_number); - buffer.ring->header.before = 0; - buffer.ring->header.after = 0; - - return eResult::success; +DumpRingRaw::DumpRingRaw(void* memory, size_t max_pkt_size, size_t pkt_count) : + buffer_(memory, max_pkt_size, pkt_count), ring_(buffer_.ring) +{ + ring_->header.before = 0; + ring_->header.after = 0; } -void cSharedMemory::write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type) +void DumpRingRaw::Write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type, [[maybe_unused]] uint32_t time) { // Each ring has its own header, the header contains absolute position // to which next packet should be written. Position has two state: // -- "before" increments immediately before of copying data to memory; // -- "after" increments after copying data. - uint64_t wpos = (buffer.ring->header.before) % buffer.units_number; - buffer.ring->header.before++; - auto* item = (item_t*)((uintptr_t)buffer.ring->memory + (wpos * buffer.unit_size)); + uint64_t wpos = (ring_->header.before) % buffer_.units_number; + ring_->header.before++; + auto* item = utils::ShiftBuffer(ring_->memory, wpos * buffer_.unit_size); dataplane::metadata* metadata = YADECAP_METADATA(mbuf); - uint64_t memory_size = buffer.unit_size - sizeof(ring_header_t); + uint64_t memory_size = buffer_.unit_size - sizeof(ring_header_t); uint64_t copy_size = RTE_MIN(memory_size, mbuf->data_len); item->header.size = copy_size; item->header.tag = metadata->hash; item->header.in_logicalport_id = metadata->in_logicalport_id; item->header.out_logicalport_id = metadata->out_logicalport_id; - item->header.flow_type = (uint8_t)flow_type; + item->header.flow_type = static_cast(flow_type); - memcpy(item->memory, - rte_pktmbuf_mtod(mbuf, void*), - copy_size); + memcpy(item->memory, rte_pktmbuf_mtod(mbuf, void*), copy_size); YANET_MEMORY_BARRIER_COMPILE; - buffer.ring->header.after++; + ring_->header.after++; +} + +size_t DumpRingRaw::GetCapacity(size_t max_pkt_size, size_t pkt_count) +{ + return PacketBufferRing::GetCapacity(max_pkt_size, pkt_count); +} + +// TODO: use max_pkt_size as snaplen in pcap? +DumpRingPcap::DumpRingPcap(void* memory, size_t max_pkt_size, size_t pkt_count) : + dev_(memory, GetCapacity(max_pkt_size, pkt_count), 3) +{ + // TODO: Don't know how yet, but we need to pass files amount. Let's do three by now. +} + +/** + * @brief A complete copy of the PcapPlusPlus wrapper of the RawPacket class. + * + * This class allows initialization with an already-created mbuf, making it + * possible to safely pass the object to a Writer instance as the base class + * RawPacket. In the original `MBufRawPacket` class, the `setMBuf` method + * was protected, plus is requires to build PcapPlusPlus with DPDK support, + * which is unnecessary for such a small change. + */ +class MBufRawPacketCopy : public pcpp::RawPacket +{ + void SetMBuf(rte_mbuf* mbuf, timespec timestamp) + { + if (mbuf == nullptr) + { + std::cerr << "mbuf to set is nullptr" << std::endl; + return; + } + + setRawData(rte_pktmbuf_mtod(mbuf, const uint8_t*), rte_pktmbuf_pkt_len(mbuf), timestamp, pcpp::LINKTYPE_ETHERNET); + } + +public: + MBufRawPacketCopy(rte_mbuf* mbuf, const timespec& timestamp) : + RawPacket() + { + m_DeleteRawDataAtDestructor = false; + SetMBuf(mbuf, timestamp); + } +}; + +void DumpRingPcap::Write(rte_mbuf* mbuf, [[maybe_unused]] common::globalBase::eFlowType flow_type, uint32_t time) +{ + timespec ts = {.tv_sec = time, .tv_nsec = 0}; + MBufRawPacketCopy raw_packet(mbuf, ts); + + // TODO: can I do this, or should I use time obtained from basePermanently.globalBaseAtomic->currentTime like I do now? + /* timespec_get(&ts, TIME_UTC); */ + + dev_.WritePacket(raw_packet); } + +size_t DumpRingPcap::GetCapacity(size_t max_pkt_size, size_t pkt_count) +{ + auto& file_hdr_size = pcpp::PcapShmWriterDevice::kPcapFileHeaderSize; + auto& pkt_hdr_size = pcpp::PcapShmWriterDevice::kPcapPacketHeaderSizeOnDisk; + + return file_hdr_size + (pkt_hdr_size + max_pkt_size) * pkt_count; +} + +} // namespace sharedmemory diff --git a/dataplane/sharedmemory.h b/dataplane/sharedmemory.h index b2ea8751..9b49776b 100644 --- a/dataplane/sharedmemory.h +++ b/dataplane/sharedmemory.h @@ -1,24 +1,90 @@ +#pragma once +// TODO: RENAME TO dump_rings.h + #include #include "common/bufferring.h" -#include "common/result.h" #include "common/type.h" +#include "config.h" +#include "pcap_shm_device.h" + namespace sharedmemory { +using DumpFormat = tDataPlaneConfig::DumpFormat; +using DumpConfig = tDataPlaneConfig::DumpConfig; + +struct DumpRingBase +{ + virtual ~DumpRingBase() = default; -using ring_header_t = common::bufferring::ring_header_t; -using ring_t = common::bufferring::ring_t; -using item_header_t = common::bufferring::item_header_t; -using item_t = common::bufferring::item_t; + virtual void Write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type, uint32_t time) = 0; +}; -class cSharedMemory +class DumpRingRaw : public DumpRingBase { + using PacketBufferRing = common::PacketBufferRing; + using ring_t = PacketBufferRing::ring_t; + using item_t = PacketBufferRing::item_t; + using ring_header_t = PacketBufferRing::ring_header_t; + + PacketBufferRing buffer_; + ring_t* ring_; + public: - eResult init(void* memory, int unit_size, int units_number); - void write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type); + DumpRingRaw(void* memory, size_t max_pkt_size, size_t pkt_count); + + ~DumpRingRaw() override = default; + + void Write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type, uint32_t time) override; - common::bufferring buffer; + static size_t GetCapacity(size_t max_pkt_size, size_t pkt_count); }; +class DumpRingPcap : public DumpRingBase +{ + pcpp::PcapShmWriterDevice dev_; + +public: + DumpRingPcap(void* memory, size_t max_pkt_size, size_t pkt_count); + + ~DumpRingPcap() override = default; + + void Write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type, uint32_t time) override; + + static size_t GetCapacity(size_t max_pkt_size, size_t pkt_count); +}; + +inline size_t GetCapacity(const DumpConfig& config) +{ + auto& [format, max_pkt_size, pkt_count] = config; + + switch (format) + { + case DumpFormat::kRaw: + return DumpRingRaw::GetCapacity(max_pkt_size, pkt_count); + case DumpFormat::kPcap: + return DumpRingPcap::GetCapacity(max_pkt_size, pkt_count); + default: + YANET_THROW("Invalid dump format"); + std::abort(); + } +} + +inline std::unique_ptr CreateSharedMemoryDumpRing(const DumpConfig& config, void* memory) +{ + auto& [format, max_pkt_size, pkt_count] = config; + + switch (format) + { + case DumpFormat::kRaw: + return std::make_unique(memory, max_pkt_size, pkt_count); + case DumpFormat::kPcap: + return std::make_unique(memory, max_pkt_size, pkt_count); + default: + YANET_THROW("Invalid dump format"); + std::abort(); + } +} + } // namespace sharedmemory diff --git a/dataplane/unittest/sdp.cpp b/dataplane/unittest/sdp.cpp index 992cd7ff..407077c8 100644 --- a/dataplane/unittest/sdp.cpp +++ b/dataplane/unittest/sdp.cpp @@ -38,12 +38,12 @@ class TestBus void CompareWithClient(const common::sdp::DataPlaneInSharedMemory& sdp_data_client) { - void* buffer = common::sdp::ShiftBuffer(sdp_data_client.dataplane_data, sdp_data_client.start_bus_section); + void* buffer = utils::ShiftBuffer(sdp_data_client.dataplane_data, sdp_data_client.start_bus_section); auto count_errors = static_cast(common::idp::errorType::size); auto count_requests = static_cast(common::idp::requestType::size); - auto* requests = common::sdp::ShiftBuffer(buffer, 0); - auto* errors = common::sdp::ShiftBuffer(buffer, count_requests * sizeof(uint64_t)); - auto* durations = common::sdp::ShiftBuffer(buffer, (count_requests + count_errors) * sizeof(uint64_t)); + auto* requests = utils::ShiftBuffer(buffer, 0); + auto* errors = utils::ShiftBuffer(buffer, count_requests * sizeof(uint64_t)); + auto* durations = utils::ShiftBuffer(buffer, (count_requests + count_errors) * sizeof(uint64_t)); for (uint32_t index = 0; index < static_cast(common::idp::requestType::size); index++) { @@ -159,11 +159,11 @@ class TestWorker void SetBufferForCounters(void* buffer, const common::sdp::MetadataWorker& metadata) { - counters = common::sdp::ShiftBuffer(buffer, metadata.start_counters); - aclCounters = common::sdp::ShiftBuffer(buffer, metadata.start_acl_counters); - bursts = common::sdp::ShiftBuffer(buffer, metadata.start_bursts); - stats = common::sdp::ShiftBuffer(buffer, metadata.start_stats); - statsPorts = common::sdp::ShiftBuffer(buffer, metadata.start_stats_ports); + counters = utils::ShiftBuffer(buffer, metadata.start_counters); + aclCounters = utils::ShiftBuffer(buffer, metadata.start_acl_counters); + bursts = utils::ShiftBuffer(buffer, metadata.start_bursts); + stats = utils::ShiftBuffer(buffer, metadata.start_stats); + statsPorts = utils::ShiftBuffer(buffer, metadata.start_stats_ports); } void SetTestValues(tCoreId coreId) @@ -207,7 +207,7 @@ class TestWorker ASSERT_EQ(common::sdp::SdpClient::GetCounterByName(sdp_data_client, "dropPackets", coreId)[coreId], stats->dropPackets); // statsPorts - auto* bufStatsPorts = common::sdp::ShiftBuffer(buffer, sdp_data_client.metadata_worker.start_stats_ports); + auto* bufStatsPorts = utils::ShiftBuffer(buffer, sdp_data_client.metadata_worker.start_stats_ports); for (uint32_t index = 0; index < CONFIG_YADECAP_PORTS_SIZE + 1; index++) { ASSERT_EQ(statsPorts[index].controlPlane_drops, bufStatsPorts[index].controlPlane_drops); @@ -215,21 +215,21 @@ class TestWorker } // bursts - auto* bufBursts = common::sdp::ShiftBuffer(buffer, sdp_data_client.metadata_worker.start_bursts); + auto* bufBursts = utils::ShiftBuffer(buffer, sdp_data_client.metadata_worker.start_bursts); for (uint32_t index = 0; index < CONFIG_YADECAP_MBUFS_BURST_SIZE + 1; index++) { ASSERT_EQ(bursts[index], bufBursts[index]); } // counters - auto* bufCounters = common::sdp::ShiftBuffer(buffer, sdp_data_client.metadata_worker.start_counters); + auto* bufCounters = utils::ShiftBuffer(buffer, sdp_data_client.metadata_worker.start_counters); for (uint32_t index = 0; index < YANET_CONFIG_COUNTERS_SIZE; index++) { ASSERT_EQ(counters[index], bufCounters[index]); } // aclCounters - auto* bufAclCounters = common::sdp::ShiftBuffer(buffer, sdp_data_client.metadata_worker.start_acl_counters); + auto* bufAclCounters = utils::ShiftBuffer(buffer, sdp_data_client.metadata_worker.start_acl_counters); for (uint32_t index = 0; index < YANET_CONFIG_ACL_COUNTERS_SIZE; index++) { ASSERT_EQ(aclCounters[index], bufAclCounters[index]); @@ -276,8 +276,8 @@ class TestWorkerGc void SetBufferForCounters(void* buffer, const common::sdp::MetadataWorkerGc& metadata) { - counters = common::sdp::ShiftBuffer(buffer, metadata.start_counters); - stats = common::sdp::ShiftBuffer(buffer, metadata.start_stats); + counters = utils::ShiftBuffer(buffer, metadata.start_counters); + stats = utils::ShiftBuffer(buffer, metadata.start_stats); } void SetTestValues(tCoreId coreId) @@ -302,7 +302,7 @@ class TestWorkerGc ASSERT_EQ(common::sdp::SdpClient::GetCounterByName(sdp_data_client, "drop_samples", coreId)[coreId], stats->drop_samples); // counters - auto* bufCounters = common::sdp::ShiftBuffer(buffer, sdp_data_client.metadata_worker_gc.start_counters); + auto* bufCounters = utils::ShiftBuffer(buffer, sdp_data_client.metadata_worker_gc.start_counters); for (uint32_t index = 0; index < YANET_CONFIG_COUNTERS_SIZE; index++) { ASSERT_EQ(counters[index], bufCounters[index]); diff --git a/dataplane/worker.cpp b/dataplane/worker.cpp index 7a1ad8e5..22800ee7 100644 --- a/dataplane/worker.cpp +++ b/dataplane/worker.cpp @@ -21,6 +21,7 @@ #include "common/fallback.h" #include "common/nat46clat.h" +#include "common/utils.h" #include "dataplane/sdpserver.h" #include "action_dispatcher.h" @@ -344,11 +345,11 @@ void cWorker::FillMetadataWorkerCounters(common::sdp::MetadataWorker& metadata) void cWorker::SetBufferForCounters(void* buffer, const common::sdp::MetadataWorker& metadata) { - counters = common::sdp::ShiftBuffer(buffer, metadata.start_counters); - aclCounters = common::sdp::ShiftBuffer(buffer, metadata.start_acl_counters); - bursts = common::sdp::ShiftBuffer(buffer, metadata.start_bursts); - stats = common::sdp::ShiftBuffer(buffer, metadata.start_stats); - statsPorts = common::sdp::ShiftBuffer(buffer, metadata.start_stats_ports); + counters = utils::ShiftBuffer(buffer, metadata.start_counters); + aclCounters = utils::ShiftBuffer(buffer, metadata.start_acl_counters); + bursts = utils::ShiftBuffer(buffer, metadata.start_bursts); + stats = utils::ShiftBuffer(buffer, metadata.start_stats); + statsPorts = utils::ShiftBuffer(buffer, metadata.start_stats_ports); } eResult cWorker::sanityCheck() diff --git a/dataplane/worker.h b/dataplane/worker.h index 1ed5774f..fec58c90 100644 --- a/dataplane/worker.h +++ b/dataplane/worker.h @@ -358,7 +358,8 @@ class cWorker // will decrease with each new packet sent to slow worker, replenishes each N mseconds int32_t packetsToSWNPRemainder; - sharedmemory::cSharedMemory dumpRings[YANET_CONFIG_SHARED_RINGS_NUMBER]; + using DumpRingBasePtr = std::unique_ptr; + std::array dump_rings; samples::Sampler sampler; diff --git a/dataplane/worker_gc.cpp b/dataplane/worker_gc.cpp index 2a645aa4..66c20434 100644 --- a/dataplane/worker_gc.cpp +++ b/dataplane/worker_gc.cpp @@ -179,8 +179,8 @@ void worker_gc_t::FillMetadataWorkerCounters(common::sdp::MetadataWorkerGc& meta void worker_gc_t::SetBufferForCounters(void* buffer, const common::sdp::MetadataWorkerGc& metadata) { - counters = common::sdp::ShiftBuffer(buffer, metadata.start_counters); - stats = common::sdp::ShiftBuffer(buffer, metadata.start_stats); + counters = utils::ShiftBuffer(buffer, metadata.start_counters); + stats = utils::ShiftBuffer(buffer, metadata.start_stats); } YANET_INLINE_NEVER void worker_gc_t::thread() diff --git a/logger/meson.build b/logger/meson.build index c07d93eb..e37a4253 100644 --- a/logger/meson.build +++ b/logger/meson.build @@ -6,9 +6,7 @@ dependencies += dependency('threads') sources = files('main.cpp') -cpp_args = [] -cpp_args += '-fno-rtti' -cpp_args += '-march=corei7' +cpp_args = ['-fno-rtti', '-march=corei7'] executable('yanet-logger', sources, diff --git a/meson.build b/meson.build index 08c20a68..6a1d6cf0 100644 --- a/meson.build +++ b/meson.build @@ -47,6 +47,7 @@ if target_option.contains('librib') subdir_done() endif + libdpdk = subproject('dpdk', default_options: [ 'platform=generic', 'cpu_instruction_set=corei7', @@ -58,6 +59,44 @@ libdpdk = subproject('dpdk', default_options: [ libjson = subproject('json') +# # Run the fix_dpdk_pc.sh script immediately after configuring DPDK +# fix_dpdk_pc = run_command( +# find_program('sh'), +# 'fix_dpdk_pc.sh', +# meson.current_build_dir(), +# check: true +# ) + +# # Ensure the script ran successfully +# if fix_dpdk_pc.returncode() != 0 +# error('fix_dpdk_pc.sh failed to execute successfully.') +# endif + +cmake = import('cmake') + +pcapplusplus_options = cmake.subproject_options() +pcapplusplus_options.add_cmake_defines({ + 'PCAPPP_BUILD_EXAMPLES': 'OFF', + 'PCAPPP_BUILD_TESTS': 'OFF', + 'PCAPPP_INSTALL': 'OFF', + + # 'PCAPPP_USE_DPDK': 'ON', + # 'DPDK_ROOT': meson.current_build_dir() / 'meson-private', + + # 'PKG_CONFIG_PATH': meson.current_build_dir() / 'meson-private', + # 'DPDK_ROOT': meson.current_source_dir() / 'subprojects' / 'dpdk', + # 'DPDK_BUILD_DIR': meson.current_build_dir() / 'subprojects' / 'dpdk', + 'CMAKE_CXX_FLAGS': '-fexceptions', +}) + +libpcapplusplus = cmake.subproject('pcap', options: pcapplusplus_options) + +pcapplusplus_deps = [ + libpcapplusplus.dependency('Common++'), + libpcapplusplus.dependency('Packet++'), + libpcapplusplus.dependency('Pcap++'), +] + if target_option.contains('buildenv') subdir('libprotobuf') subdir('libfwparser') diff --git a/subprojects/pcap b/subprojects/pcap new file mode 160000 index 00000000..91160dc8 --- /dev/null +++ b/subprojects/pcap @@ -0,0 +1 @@ +Subproject commit 91160dc8e158027a86c9208ad7f4982d83338f5f