Change the use of rte_ring for dumping in/out/drop packets to a newer… #155

Open · wants to merge 1 commit into main
9 changes: 0 additions & 9 deletions dataplane/controlplane.cpp
@@ -1674,11 +1674,6 @@ void cControlPlane::mainThread()
break;
}
}
for (const auto& iter : dataPlane->workers)
{
cWorker* worker = iter.second;
ring_handle(worker->ring_toFreePackets, worker->ring_lowPriority);
}

for (auto& iter : kernel_interfaces)
{
@@ -1893,10 +1888,6 @@ unsigned cControlPlane::ring_handle(rte_ring* ring_to_free_mbuf,
{
handlePacket_farm(mbuf);
}
else if (metadata->flow.type == common::globalBase::eFlowType::slowWorker_dump)
{
handlePacket_dump(mbuf);
}
else if (metadata->flow.type == common::globalBase::eFlowType::slowWorker_repeat)
{
handlePacket_repeat(mbuf);
43 changes: 43 additions & 0 deletions dataplane/dataplane.cpp
@@ -39,6 +39,8 @@
#include "sock_dev.h"
#include "worker.h"

#define MAX_PACK_SIZE 16384
Collaborator:
As far as I know, we don't have any coding style document apart from .clang-format which should specify such things. However, if we strive to use "modern C++", we shouldn't use macros.

From C++ Core Guidelines, part ES.31:
Don’t use macros for constants or “functions”
Reason
Macros are a major source of bugs. Macros don’t obey the usual scope and type rules. Macros don’t obey the usual rules for argument passing. Macros ensure that the human reader sees something different from what the compiler sees. Macros complicate tool building.

Here we could use constexpr unsigned long max_pack_size = 16384; or maybe constexpr auto MY_CONSTANT{42}.
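For illustration, a minimal sketch of the constexpr form (the name, type and placement here are assumptions, not part of this PR):

    #include <cstddef>

    // Sketch only: a typed, scoped replacement for the macro. The exact name,
    // type and placement (this file vs. common/config.release.h) are assumptions.
    constexpr std::size_t max_pack_size = 16384;

    // Usage then reads, for example:
    // auto unit_size = sizeof(sharedmemory::item_header_t) + max_pack_size;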

Collaborator:
At least we should move the definition to common/config.release.h, where it will look more consistent.


common::log::LogPriority common::log::logPriority = common::log::TLOG_INFO;

cDataPlane::cDataPlane() :
@@ -1209,6 +1211,25 @@ eResult cDataPlane::allocateSharedMemory()
}
}

// init size for in/out/drop lowPriority ring
for (const auto& [socket_id, num] : number_of_workers_per_socket)
{
auto unit_size = sizeof(sharedmemory::item_header_t) + MAX_PACK_SIZE;
if (unit_size % RTE_CACHE_LINE_SIZE != 0)
{
unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up
}

auto size = sizeof(sharedmemory::ring_header_t) + unit_size * getConfigValues().ring_lowPriority_size;

auto it = shm_size_per_socket.find(socket_id);
if (it == shm_size_per_socket.end())
{
it = shm_size_per_socket.emplace_hint(it, socket_id, 0);
}
it->second += size * num;
}

for (const auto& [socket_id, num] : number_of_workers_per_socket)
{
auto it = shm_size_per_socket.find(socket_id);
@@ -1327,6 +1348,28 @@ eResult cDataPlane::splitSharedMemoryPerWorkers()

ring_id++;
}
// init lowPriority ring
{
auto name = "r_lp_" + std::to_string(core_id);
auto offset = offsets[shm];
auto memaddr = (void*)((intptr_t)shm + offset);
Collaborator:
Avoid C-style casts.
It's generally better to avoid casts altogether, but I understand that some things cannot be done without them (though I'm pretty sure we can avoid some of them by using templates and/or std::variant, but that's a topic for discussion).

Here we should use named casts: auto memaddr = reinterpret_cast<void*>(reinterpret_cast<std::uintptr_t>(shm) + offset);

Collaborator:
We use a C-style typecast for the sake of performance, and this code is on our most important hot path.

Also, this memory region will contain many data structures (and arrays of them) mapped dynamically with data offsets. Another requirement is that the region layout should be parseable by a third-party application written in C or Go, for example.

However, I think we could improve the approach, but let us do this in another piece of work.
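As a purely hypothetical illustration of the "parseable from C or Go" constraint (the real fields of sharedmemory::ring_header_t are not shown in this diff), the shared region essentially has to be described by fixed-layout structs:

    #include <cstdint>
    #include <type_traits>

    // Hypothetical header layout; field names and types are illustrative only.
    struct ring_header
    {
        uint64_t before; // write position, advanced by the producer
        uint64_t after;  // read position, advanced by the consumer
    };

    // A standard-layout, trivially copyable struct has a well-defined byte
    // layout that a C or Go reader can mirror field by field.
    static_assert(std::is_standard_layout_v<ring_header>);
    static_assert(std::is_trivially_copyable_v<ring_header>);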

Collaborator:
Okay, but I cannot resolve the conversation by myself; I guess I don't have the required permissions :)

Collaborator:
Unlike static_cast, but like const_cast, the reinterpret_cast expression does not compile to any CPU instructions (except when converting between integers and pointers, or between pointers on obscure architectures where pointer representation depends on its type). It is primarily a compile-time directive which instructs the compiler to treat expression as if it had the type target-type.

So there is no penalty in using reinterpret_cast. It also has the benefit of being easily grepped.
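A small self-contained sketch of the two spellings, just to show they express the same address arithmetic (the function and variable names are illustrative):

    #include <cstddef>
    #include <cstdint>

    void* advance(void* shm, std::size_t offset)
    {
        // C-style form currently on the hot path:
        //   return (void*)((intptr_t)shm + offset);

        // Named-cast form suggested in review; same generated code,
        // but explicit about the reinterpretation and easy to grep for.
        return reinterpret_cast<void*>(reinterpret_cast<std::uintptr_t>(shm) + offset);
    }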

sharedmemory::cSharedMemory ring;

auto unit_size = sizeof(sharedmemory::item_header_t) + MAX_PACK_SIZE;
if (unit_size % RTE_CACHE_LINE_SIZE != 0)
{
unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE; /// round up
}
const auto units_number = getConfigValues().ring_lowPriority_size;
const auto size = sizeof(sharedmemory::ring_header_t) + unit_size * units_number;
ring.init(memaddr, unit_size, units_number);
Collaborator:
Again, I don't know if we allow or prohibit this, but the C++ Core Guidelines tell us "ES.46: Avoid lossy (narrowing, truncating) arithmetic conversions".
This is a narrowing conversion from 'unsigned long' to the signed type 'int' for both unit_size and units_number.

Collaborator:
Good catch
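If the init() parameters stay signed, a minimal sketch of making the conversion explicit and checked could look like this (the real signature of cSharedMemory::init is not shown in this diff, so treat it as an assumption):

    #include <limits>

    // Sketch only: fail early instead of narrowing silently.
    if (unit_size > std::numeric_limits<int>::max() ||
        units_number > std::numeric_limits<int>::max())
    {
        return eResult::errorInitRing;
    }
    ring.init(memaddr,
              static_cast<int>(unit_size),
              static_cast<int>(units_number));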

offsets[shm] += size;

worker->lowPriorityRing = ring;

auto meta = common::idp::get_shm_info::dump_meta(name, "lp", unit_size, units_number, core_id, socket_id, key, offset);
dumps_meta.emplace_back(meta);
}
Comment on lines +1351 to +1372
Collaborator:
This looks almost the same as the previous block of code (the body of the for (const auto& [tag, ring_cfg] : config.shared_memory) loop).
I think we could extract the common functionality into a function,
something like:

  void foo(arguments)
  {
      auto name = base_name + std::to_string(core_id) + (is_low_priority ? "" : "_" + std::to_string(ring_id));
  
      auto unit_size = sizeof(sharedmemory::item_header_t) + dump_size;
      if (unit_size % RTE_CACHE_LINE_SIZE != 0)
      {
          unit_size += RTE_CACHE_LINE_SIZE - unit_size % RTE_CACHE_LINE_SIZE;
      }
  
      auto size = sizeof(sharedmemory::ring_header_t) + unit_size * units_number;
      auto offset = offsets[shm];
      auto memaddr = reinterpret_cast<void*>(reinterpret_cast<std::uintptr_t>(shm) + offset);
  
      sharedmemory::cSharedMemory ring;
      ring.init(memaddr, unit_size, units_number);
      offsets[shm] += size;
  
      auto meta = common::idp::get_shm_info::dump_meta(name, tag, unit_size, units_number, core_id, socket_id, key, offset);
      dumps_meta.emplace_back(meta);
  }

since we cannot interact with the worker's private fields from a regular function.

Then the "low priority ring" turns into:

foo(...);
worker->lowPriorityRing = ring;

and loop body turns into

foo();
worker->dumpRings[ring_id] = ring;                                                                                                                                  
tag_to_id[tag] = ring_id;                                          
ring_id++;  

}

for (auto& [core_id, worker] : workers)
104 changes: 15 additions & 89 deletions dataplane/worker.cpp
@@ -40,7 +40,6 @@ cWorker::cWorker(cDataPlane* dataPlane) :
translation_packet_id(0),
ring_highPriority(nullptr),
ring_normalPriority(nullptr),
ring_lowPriority(nullptr),
ring_toFreePackets(nullptr),
ring_log(nullptr),
packetsToSWNPRemainder(dataPlane->config.SWNormalPriorityRateLimitPerWorker)
@@ -66,11 +65,6 @@ cWorker::~cWorker()
rte_ring_free(ring_normalPriority);
}

if (ring_lowPriority)
{
rte_ring_free(ring_lowPriority);
}

if (ring_toFreePackets)
{
rte_ring_free(ring_toFreePackets);
@@ -140,15 +134,6 @@ eResult cWorker::init(const tCoreId& coreId,
return eResult::errorInitRing;
}

ring_lowPriority = rte_ring_create(("r_lp_" + std::to_string(coreId)).c_str(),
dataPlane->getConfigValues().ring_lowPriority_size,
socketId,
RING_F_SP_ENQ | RING_F_SC_DEQ);
if (!ring_lowPriority)
{
return eResult::errorInitRing;
}

ring_toFreePackets = rte_ring_create(("r_tfp_" + std::to_string(coreId)).c_str(),
dataPlane->getConfigValues().ring_toFreePackets_size,
socketId,
@@ -1062,26 +1047,11 @@ inline void cWorker::physicalPort_ingress_handle(const unsigned int& worker_port

if (basePermanently.globalBaseAtomic->physicalPort_flags[metadata->fromPortId] & YANET_PHYSICALPORT_FLAG_IN_DUMP)
{
if (!rte_ring_full(ring_lowPriority))
{
rte_mbuf* mbuf_clone = rte_pktmbuf_alloc(mempool);
if (mbuf_clone)
{
*YADECAP_METADATA(mbuf_clone) = *YADECAP_METADATA(mbuf);

rte_memcpy(rte_pktmbuf_mtod(mbuf_clone, char*),
rte_pktmbuf_mtod(mbuf, char*),
mbuf->data_len);

mbuf_clone->data_len = mbuf->data_len;
mbuf_clone->pkt_len = mbuf->pkt_len;

YADECAP_METADATA(mbuf_clone)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_ingress;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.id = metadata->fromPortId;
slowWorker_entry_lowPriority(mbuf_clone);
}
}
YADECAP_METADATA(mbuf)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_ingress;
YADECAP_METADATA(mbuf)->flow.data.dump.id = metadata->fromPortId;
auto& ring = lowPriorityRing;
ring.write(mbuf, common::globalBase::eFlowType::slowWorker_dump);
}
}

@@ -1327,26 +1297,12 @@ inline void cWorker::logicalPort_egress_handle()

if (basePermanently.globalBaseAtomic->physicalPort_flags[logicalPort.portId] & YANET_PHYSICALPORT_FLAG_OUT_DUMP)
{
if (!rte_ring_full(ring_lowPriority))
{
rte_mbuf* mbuf_clone = rte_pktmbuf_alloc(mempool);
if (mbuf_clone)
{
*YADECAP_METADATA(mbuf_clone) = *YADECAP_METADATA(mbuf);

rte_memcpy(rte_pktmbuf_mtod(mbuf_clone, char*),
rte_pktmbuf_mtod(mbuf, char*),
mbuf->data_len);

mbuf_clone->data_len = mbuf->data_len;
mbuf_clone->pkt_len = mbuf->pkt_len;

YADECAP_METADATA(mbuf_clone)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_egress;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.id = logicalPort.portId;
slowWorker_entry_lowPriority(mbuf_clone);
}
}
YADECAP_METADATA(mbuf)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_egress;
YADECAP_METADATA(mbuf)->flow.data.dump.id = logicalPort.portId;
auto& ring = lowPriorityRing;
ring.write(mbuf, common::globalBase::eFlowType::slowWorker_dump);
}
if (rte_mbuf_refcnt_read(mbuf) < 1)
{
@@ -5798,26 +5754,11 @@ inline void cWorker::drop(rte_mbuf* mbuf)

if (basePermanently.globalBaseAtomic->physicalPort_flags[metadata->fromPortId] & YANET_PHYSICALPORT_FLAG_DROP_DUMP)
{
if (!rte_ring_full(ring_lowPriority))
{
rte_mbuf* mbuf_clone = rte_pktmbuf_alloc(mempool);
if (mbuf_clone)
{
*YADECAP_METADATA(mbuf_clone) = *YADECAP_METADATA(mbuf);

rte_memcpy(rte_pktmbuf_mtod(mbuf_clone, char*),
rte_pktmbuf_mtod(mbuf, char*),
mbuf->data_len);

mbuf_clone->data_len = mbuf->data_len;
mbuf_clone->pkt_len = mbuf->pkt_len;

YADECAP_METADATA(mbuf_clone)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_drop;
YADECAP_METADATA(mbuf_clone)->flow.data.dump.id = metadata->fromPortId;
slowWorker_entry_lowPriority(mbuf_clone);
}
}
YADECAP_METADATA(mbuf)->flow.type = common::globalBase::eFlowType::slowWorker_dump;
YADECAP_METADATA(mbuf)->flow.data.dump.type = common::globalBase::dump_type_e::physicalPort_drop;
YADECAP_METADATA(mbuf)->flow.data.dump.id = metadata->fromPortId;
auto& ring = lowPriorityRing;
ring.write(mbuf, common::globalBase::eFlowType::slowWorker_dump);
}

rte_pktmbuf_free(mbuf);
@@ -5884,21 +5825,6 @@ inline void cWorker::slowWorker_entry_normalPriority(rte_mbuf* mbuf,
}
}

inline void cWorker::slowWorker_entry_lowPriority(rte_mbuf* mbuf)
{
/// @todo: worker::tStack

if (rte_ring_sp_enqueue(ring_lowPriority, (void*)mbuf))
{
stats.ring_lowPriority_drops++;
rte_pktmbuf_free(mbuf);
}
else
{
stats.ring_lowPriority_packets++;
}
}

YANET_NEVER_INLINE void cWorker::slowWorkerBeforeHandlePackets()
{
localBaseId = currentBaseId;
3 changes: 1 addition & 2 deletions dataplane/worker.h
@@ -205,7 +205,6 @@ class cWorker

inline void slowWorker_entry_highPriority(rte_mbuf* mbuf, const common::globalBase::eFlowType& flowType); ///< @todo: DELETE and OPT
inline void slowWorker_entry_normalPriority(rte_mbuf* mbuf, const common::globalBase::eFlowType& flowType); ///< @todo: DELETE and OPT
inline void slowWorker_entry_lowPriority(rte_mbuf* mbuf); ///< @todo: DELETE and OPT

inline uint32_t get_tcp_state_timeout(uint8_t flags, const dataplane::globalBase::state_timeout_config_t& state_timeout_config);
inline uint32_t get_state_timeout(rte_mbuf* mbuf, dataplane::metadata* metadata, const dataplane::globalBase::state_timeout_config_t& state_timeout_config);
@@ -321,7 +320,6 @@ class cWorker

rte_ring* ring_highPriority;
rte_ring* ring_normalPriority;
rte_ring* ring_lowPriority;
dataplane::perf::tsc_deltas* tsc_deltas;
rte_ring* ring_toFreePackets;

@@ -337,6 +335,7 @@
int32_t packetsToSWNPRemainder;

sharedmemory::cSharedMemory dumpRings[YANET_CONFIG_SHARED_RINGS_NUMBER];
sharedmemory::cSharedMemory lowPriorityRing;

samples::Sampler sampler;
