From 0fd8555ffbd250b9a8608f66547220f82b67d323 Mon Sep 17 00:00:00 2001 From: Ivan Morozko Date: Thu, 12 Dec 2024 20:17:46 +0400 Subject: [PATCH] TMP: add pcaps, for now dealing with capacity of shm --- common/bufferring.h | 29 ++++++++++--- dataplane/action_dispatcher.h | 5 ++- dataplane/config.h | 2 +- dataplane/controlplane.cpp | 2 +- dataplane/dataplane.cpp | 26 +++++------- dataplane/meson.build | 1 + dataplane/sharedmemory.cpp | 65 +++++++++++++++------------- dataplane/sharedmemory.h | 79 +++++++++++++++++++++++++++-------- dataplane/worker.h | 3 +- 9 files changed, 139 insertions(+), 73 deletions(-) diff --git a/common/bufferring.h b/common/bufferring.h index 6c4e4aea..105a158e 100644 --- a/common/bufferring.h +++ b/common/bufferring.h @@ -24,20 +24,39 @@ struct PacketBufferRing { PacketBufferRing() = default; - PacketBufferRing(void* memory, size_t ring_size, size_t item_count) : - unit_size(sizeof(item_header_t) + ring_size), units_number(item_count) + // static function, helps to get capacity in DumpRingRaw + static size_t GetCapacity(size_t ring_size, size_t item_count, size_t unit_size = 0) { - if (unit_size % RTE_CACHE_LINE_SIZE != 0) + if (unit_size == 0) { - unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + unit_size = sizeof(item_header_t) + ring_size; + + if (unit_size % RTE_CACHE_LINE_SIZE != 0) + { + unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + } } - capacity = sizeof(ring_header_t) + unit_size * units_number; + size_t capacity = sizeof(ring_header_t) + unit_size * item_count; + if (capacity % RTE_CACHE_LINE_SIZE != 0) { capacity += RTE_CACHE_LINE_SIZE - capacity % RTE_CACHE_LINE_SIZE; /// round up } + return capacity; + } + + PacketBufferRing(void* memory, size_t ring_size, size_t item_count) : + unit_size(sizeof(item_header_t) + ring_size), units_number(item_count) + { + if (unit_size % RTE_CACHE_LINE_SIZE != 0) + { + unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up + } + + capacity = GetCapacity(ring_size, item_count, unit_size); + ring = (ring_t*)memory; } diff --git a/dataplane/action_dispatcher.h b/dataplane/action_dispatcher.h index 2b57f141..da1cf061 100644 --- a/dataplane/action_dispatcher.h +++ b/dataplane/action_dispatcher.h @@ -88,8 +88,9 @@ struct ActionDispatcher return; } - auto& ring = args.worker->dumpRings[ring_id]; - ring.write(args.mbuf, flow.type); + // polymorphic, will execute either DumpRingRaw or DumpRingPcap method, + // likely to be devirtualized + args.worker->dump_rings[ring_id]->write(args.mbuf, flow.type); } static void execute(const common::StateTimeoutAction& action, const Flow& flow, const ActionDispatcherArgs& args) diff --git a/dataplane/config.h b/dataplane/config.h index bd43f6ad..708e00e4 100644 --- a/dataplane/config.h +++ b/dataplane/config.h @@ -25,9 +25,9 @@ struct tDataPlaneConfig struct DumpConfig { + DumpFormat format; unsigned int size; unsigned int count; - DumpFormat format; }; static DumpFormat StringToDumpFormat(const std::string& format_str) diff --git a/dataplane/controlplane.cpp b/dataplane/controlplane.cpp index 22ee69f0..44a0e198 100644 --- a/dataplane/controlplane.cpp +++ b/dataplane/controlplane.cpp @@ -1050,7 +1050,7 @@ common::idp::hexdump_ring::response cControlPlane::hexdump_ring(const common::id for (cWorker* worker : dataPlane->workers_vector) { - auto ring = worker->dumpRings[ring_id]; + auto ring = worker->dump_rings[ring_id]; auto addr = reinterpret_cast(ring.buffer.ring); diff --git a/dataplane/dataplane.cpp b/dataplane/dataplane.cpp index ea73a6f6..5c1b02cc 100644 --- a/dataplane/dataplane.cpp +++ b/dataplane/dataplane.cpp @@ -1601,12 +1601,7 @@ static std::unordered_map calculate_shared_memory_size(const // Calculate sizes based on shared memory configuration for (const auto& ring_cfg : config.shared_memory) { - const auto& [dump_size, dump_count, format] = ring_cfg.second; - YANET_GCC_BUG_UNUSED(format); - - // Temporarily materialization will occur to create an object and get it's capacity. - // It's okay, because this object is lightweight - size_t size = common::PacketBufferRing(nullptr, dump_size, dump_count).capacity; + size_t size = sharedmemory::GetCapacity(ring_cfg.second); for (const auto& [socket_id, worker_count] : workers_per_socket) { @@ -1623,6 +1618,7 @@ static std::unordered_map calculate_shared_memory_size(const return shm_size_per_socket; } +//FIXME: why is this class not using class SharedMemory from common/shared_memory.h? eResult cDataPlane::allocateSharedMemory() { // shared memory size for each numa @@ -1681,8 +1677,7 @@ eResult cDataPlane::allocateSharedMemory() /// split memory per worker eResult cDataPlane::splitSharedMemoryPerWorkers() { - using sharedmemory::SharedMemoryDumpRing; - using utils::ShiftBuffer; + using namespace sharedmemory; for (cWorker* worker : workers_vector) { @@ -1701,16 +1696,15 @@ eResult cDataPlane::splitSharedMemoryPerWorkers() int ring_id = 0; for (const auto& [tag, ring_cfg] : config.shared_memory) { - const auto& [dump_size, dump_count, format] = ring_cfg; + const auto& [format, dump_size, dump_count] = ring_cfg; auto memaddr = utils::ShiftBuffer(shm, offset); + offset += GetCapacity(ring_cfg); - sharedmemory::SharedMemoryDumpRing ring(format, memaddr, dump_size, dump_count); - worker->dumpRings[ring_id] = ring; - - offset += ring.Capacity(); + worker->dump_rings[ring_id] = CreateSharedMemoryDumpRing(ring_cfg, memaddr); std::string name = "shm_" + std::to_string(core_id) + "_" + std::to_string(ring_id); + // TODO: add format here dumps_meta.emplace_back(name, tag, dump_size, dump_count, core_id, socket_id, key, offset); tag_to_id[tag] = ring_id; @@ -1719,10 +1713,10 @@ eResult cDataPlane::splitSharedMemoryPerWorkers() } auto memaddr = utils::ShiftBuffer(shm, offset); - worker->tsc_deltas = new (memaddr) dataplane::perf::tsc_deltas{}; - offset += sizeof(dataplane::perf::tsc_deltas); + worker->tsc_deltas = new (memaddr) dataplane::perf::tsc_deltas{}; + tscs_meta.emplace_back(core_id, socket_id, key, offset); } @@ -2214,7 +2208,7 @@ eResult cDataPlane::parseSharedMemory(const nlohmann::json& json) return eResult::invalidConfigurationFile; } - config.shared_memory[tag] = {size, count, tDataPlaneConfig::StringToDumpFormat(format_str)}; + config.shared_memory[tag] = {tDataPlaneConfig::StringToDumpFormat(format_str), size, count}; } return eResult::success; diff --git a/dataplane/meson.build b/dataplane/meson.build index 73962ecd..bf121d8f 100644 --- a/dataplane/meson.build +++ b/dataplane/meson.build @@ -3,6 +3,7 @@ dependencies += libdpdk.get_variable('dpdk_dep') dependencies += libjson.get_variable('nlohmann_json_dep') dependencies += dependency('libsystemd') dependencies += dependency('threads') +dependencies += pcapplusplus_deps sources = files('bus.cpp', 'controlplane.cpp', diff --git a/dataplane/sharedmemory.cpp b/dataplane/sharedmemory.cpp index 64104a30..d27719b3 100644 --- a/dataplane/sharedmemory.cpp +++ b/dataplane/sharedmemory.cpp @@ -1,58 +1,63 @@ #include "sharedmemory.h" #include "common/type.h" +#include "common/utils.h" #include "metadata.h" -using namespace sharedmemory; +namespace sharedmemory +{ -SharedMemoryDumpRing::SharedMemoryDumpRing(DumpFormat format, void* memory, size_t dump_size, size_t dump_count) : - format_(format) +DumpRingRaw::DumpRingRaw(void* memory, size_t max_pkt_size, size_t pkt_count) : + buffer_(memory, max_pkt_size, pkt_count), ring_(buffer_.ring) { - switch (format_) - { - case DumpFormat::kPcap: - // init somehow with pcaps - break; - - case DumpFormat::kRaw: - buffer = common::PacketBufferRing(memory, dump_size, dump_count); - capacity_ = buffer.capacity; - - buffer.ring->header.before = 0; - buffer.ring->header.after = 0; - - break; - default: - YANET_THROW("Wrong shared memory dump format"); - } + ring_->header.before = 0; + ring_->header.after = 0; } -void SharedMemoryDumpRing::write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type) +void DumpRingRaw::write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type) { // Each ring has its own header, the header contains absolute position // to which next packet should be written. Position has two state: // -- "before" increments immediately before of copying data to memory; // -- "after" increments after copying data. - uint64_t wpos = (buffer.ring->header.before) % buffer.units_number; - buffer.ring->header.before++; - auto* item = (item_t*)((uintptr_t)buffer.ring->memory + (wpos * buffer.unit_size)); + uint64_t wpos = (ring_->header.before) % buffer_.units_number; + ring_->header.before++; + auto* item = utils::ShiftBuffer(ring_->memory, wpos * buffer_.unit_size); dataplane::metadata* metadata = YADECAP_METADATA(mbuf); - uint64_t memory_size = buffer.unit_size - sizeof(ring_header_t); + uint64_t memory_size = buffer_.unit_size - sizeof(ring_header_t); uint64_t copy_size = RTE_MIN(memory_size, mbuf->data_len); item->header.size = copy_size; item->header.tag = metadata->hash; item->header.in_logicalport_id = metadata->in_logicalport_id; item->header.out_logicalport_id = metadata->out_logicalport_id; - item->header.flow_type = (uint8_t)flow_type; + item->header.flow_type = static_cast(flow_type); - memcpy(item->memory, - rte_pktmbuf_mtod(mbuf, void*), - copy_size); + memcpy(item->memory, rte_pktmbuf_mtod(mbuf, void*), copy_size); YANET_MEMORY_BARRIER_COMPILE; - buffer.ring->header.after++; + ring_->header.after++; +} + +size_t DumpRingRaw::GetCapacity(size_t max_pkt_size, size_t pkt_count) +{ + return PacketBufferRing::GetCapacity(max_pkt_size, pkt_count); +} + +DumpRingPcap::DumpRingPcap(void* memory, size_t max_pkt_size, size_t pkt_count) : dev_(memory, 12, 3) +{ +} + +void DumpRingPcap::write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type) +{ } + +size_t DumpRingPcap::GetCapacity(size_t max_pkt_size, size_t pkt_count) +{ + return 0; +} + +} // namespace sharedmemory diff --git a/dataplane/sharedmemory.h b/dataplane/sharedmemory.h index e1fd8d54..50e57aac 100644 --- a/dataplane/sharedmemory.h +++ b/dataplane/sharedmemory.h @@ -1,4 +1,5 @@ #pragma once +//TODO: RENAME TO dump_rings.h #include @@ -6,36 +7,80 @@ #include "common/type.h" #include "config.h" +#include "pcap_shm_device.h" namespace sharedmemory { - -using ring_header_t = common::PacketBufferRing::ring_header_t; -using ring_t = common::PacketBufferRing::ring_t; -using item_header_t = common::PacketBufferRing::item_header_t; -using item_t = common::PacketBufferRing::item_t; using DumpFormat = tDataPlaneConfig::DumpFormat; +using DumpConfig = tDataPlaneConfig::DumpConfig; + +struct DumpRingBase +{ + virtual ~DumpRingBase() = 0; -class SharedMemoryDumpRing + virtual void write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type) = 0; +}; + +class DumpRingRaw : public DumpRingBase { - DumpFormat format_; - size_t capacity_; + using PacketBufferRing = common::PacketBufferRing; + using ring_t = PacketBufferRing::ring_t; + using item_t = PacketBufferRing::item_t; + using ring_header_t = PacketBufferRing::ring_header_t; + + PacketBufferRing buffer_; + ring_t* ring_; public: - SharedMemoryDumpRing() : - format_(DumpFormat::kRaw), capacity_(0) {} + DumpRingRaw(void* memory, size_t max_pkt_size, size_t pkt_count); - SharedMemoryDumpRing(DumpFormat format, void* memory, size_t dump_size, size_t dump_count); + void write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type) override; - void write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type); + static size_t GetCapacity(size_t max_pkt_size, size_t pkt_count); +}; + +class DumpRingPcap : public DumpRingBase +{ + pcpp::PcapShmWriterDevice dev_; + +public: + DumpRingPcap(void* memory, size_t max_pkt_size, size_t pkt_count); - // FIXME: make it private. I've made it public to simplify hexdump code - common::PacketBufferRing buffer; + void write(rte_mbuf* mbuf, common::globalBase::eFlowType flow_type) override; - [[nodiscard]] size_t Capacity() const + static size_t GetCapacity(size_t max_pkt_size, size_t pkt_count); +}; + +inline size_t GetCapacity(const DumpConfig& config) +{ + auto& [format, max_pkt_size, pkt_count] = config; + + switch (format) { - return capacity_; + case DumpFormat::kRaw: + return DumpRingRaw::GetCapacity(max_pkt_size, pkt_count); + case DumpFormat::kPcap: + return DumpRingPcap::GetCapacity(max_pkt_size, pkt_count); + default: + YANET_THROW("Invalid dump format"); + std::abort(); } -}; +} + +inline std::unique_ptr CreateSharedMemoryDumpRing(const DumpConfig& config, void* memory) +{ + auto& [format, max_pkt_size, pkt_count] = config; + + switch (format) + { + case DumpFormat::kRaw: + return std::make_unique(memory, max_pkt_size, pkt_count); + case DumpFormat::kPcap: + return std::make_unique(memory, max_pkt_size, pkt_count); + default: + YANET_THROW("Invalid dump format"); + std::abort(); + } +} } // namespace sharedmemory diff --git a/dataplane/worker.h b/dataplane/worker.h index fee3012a..4486b1bf 100644 --- a/dataplane/worker.h +++ b/dataplane/worker.h @@ -356,7 +356,8 @@ class cWorker // will decrease with each new packet sent to slow worker, replenishes each N mseconds int32_t packetsToSWNPRemainder; - sharedmemory::SharedMemoryDumpRing dumpRings[YANET_CONFIG_SHARED_RINGS_NUMBER]; + using DumpRingBasePtr = std::unique_ptr; + std::array dump_rings; samples::Sampler sampler;