Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ endif

if WITH_DPDK
ipfixprobe_input_src+=\
input/dpdk/dpdkMbuf.hpp \
input/dpdk/dpdkMbuf.cpp \
input/dpdk/dpdkDevice.hpp \
input/dpdk/dpdkDevice.cpp \
input/dpdk.cpp \
input/dpdk.h \
input/dpdk-ring.cpp \
Expand Down
2 changes: 1 addition & 1 deletion init/ipfixprobed
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if [ -e "$CONFFILE" ]; then
dpdkinput=""
if [ "$USE_DPDK" = "1" ]; then
# set up DPDK interface(s)
dpdkinput=("-i" "dpdk;p=$DPDK_PORT;q=$DPDK_QUEUES_COUNT;e=-c $DPDK_CPUMASK -a $DPDK_DEVICE")
dpdkinput=("-i" "dpdk;p=$DPDK_PORT;q=$DPDK_QUEUES_COUNT;e=--lcores $DPDK_LCORES -a $DPDK_DEVICE")
for ((ifc=1; ifc<$DPDK_QUEUES_COUNT;ifc++)); do
dpdkinput+=("-i" "dpdk")
done
Expand Down
4 changes: 2 additions & 2 deletions init/link0.conf.example
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@

#DPDK_DEVICE=0000:43:00.0

# Set CPU mask:
# Set mapping of DPDK lcores to threads:

#DPDK_CPUMASK=0x1
#DPDK_LCORES="(0-7)@(0-7)"

# Set network device port:

Expand Down
278 changes: 27 additions & 251 deletions input/dpdk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
*
*/


#include <cstring>
#include <mutex>
#include <rte_ethdev.h>
Expand Down Expand Up @@ -121,9 +122,8 @@ DpdkCore& DpdkCore::getInstance()

// Tears down the DpdkCore instance: releases the DPDK resources it holds and
// resets m_instance to nullptr.
// NOTE(review): this body appears to combine two revisions of the destructor
// (per-port stop/close plus rte_eal_cleanup(), and clearing m_dpdkDevices with
// rte_eal_cleanup() commented out) — confirm which statements are current.
DpdkCore::~DpdkCore()
{
rte_eth_dev_stop(m_portId);
rte_eth_dev_close(m_portId);
rte_eal_cleanup();
m_dpdkDevices.clear();
//rte_eal_cleanup(); // segfault?
m_instance = nullptr;
}

Expand All @@ -135,103 +135,9 @@ void DpdkCore::deinit()
}
}

void DpdkCore::initInterface()
{
validatePort();
auto portConfig = createPortConfig();
configurePort(portConfig);
}

/**
 * Ensure that m_portId refers to a port known to DPDK.
 *
 * @throws PluginError when rte_eth_dev_is_valid_port() reports the port as invalid.
 */
void DpdkCore::validatePort()
{
    const bool portIsValid = rte_eth_dev_is_valid_port(m_portId) != 0;
    if (portIsValid) {
        return;
    }

    throw PluginError("Invalid DPDK port specified");
}

struct rte_eth_conf DpdkCore::createPortConfig()
{
if (m_rxQueueCount > 1 && !m_supportedRSS) {
std::cerr << "RSS is not supported by card, multiple queues will not work as expected." << std::endl;
throw PluginError("Required RSS for q>1 is not supported.");
}

#if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
rte_eth_conf portConfig {.rxmode = {.mtu = RTE_ETHER_MAX_LEN}};
#else
rte_eth_conf portConfig {.rxmode = {.max_rx_pkt_len = RTE_ETHER_MAX_LEN}};
#endif

if (m_supportedRSS) {
#if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
portConfig.rxmode.mq_mode = RTE_ETH_MQ_RX_RSS;
#else
portConfig.rxmode.mq_mode = ETH_MQ_RX_RSS;
#endif
} else {
portConfig.rxmode.mq_mode = RTE_ETH_MQ_RX_NONE;
}

if (m_supportedHWTimestamp) {
portConfig.rxmode.offloads |= RTE_ETH_RX_OFFLOAD_TIMESTAMP;
}
return portConfig;
}

/**
 * Apply the given configuration to the port with the stored RX/TX queue counts.
 *
 * @param portConfig Configuration handed to rte_eth_dev_configure().
 * @throws PluginError when rte_eth_dev_configure() returns a non-zero status.
 */
void DpdkCore::configurePort(const struct rte_eth_conf& portConfig)
{
    const int status = rte_eth_dev_configure(m_portId, m_rxQueueCount, m_txQueueCount, &portConfig);
    if (status != 0) {
        throw PluginError("Unable to configure interface");
    }
}

/**
 * Program the RSS hash key and hash functions of the port.
 *
 * Does nothing (apart from logging) when the NIC does not support RSS.
 * A failure of rte_eth_dev_rss_hash_update() is not fatal: it is only
 * reported on stderr, and the message now states that the update failed
 * (it previously read like a progress message).
 */
void DpdkCore::configureRSS()
{
    if (!m_supportedRSS) {
        std::cerr << "SKipped RSS hash setting for port " << m_portId << "." << std::endl;
        return;
    }

    constexpr size_t RSS_KEY_LEN = 40;
    // biflow hash key (repeating 0x6D5A pattern)
    // Not const: rte_eth_rss_conf::rss_key is a non-const pointer.
    static uint8_t rssKey[RSS_KEY_LEN] = {
        0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A,
        0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A,
        0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A,
        0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A, 0x6D, 0x5A
    };

    struct rte_eth_rss_conf rssConfig = {
        .rss_key = rssKey,
        .rss_key_len = RSS_KEY_LEN,
#if RTE_VERSION >= RTE_VERSION_NUM(21, 11, 0, 0)
        .rss_hf = RTE_ETH_RSS_IP,
#else
        .rss_hf = ETH_RSS_IP,
#endif
    };

    // Non-zero return means the hash configuration was not applied.
    if (rte_eth_dev_rss_hash_update(m_portId, &rssConfig)) {
        std::cerr << "Setting RSS hash for port " << m_portId << " failed." << std::endl;
    }
}

/**
 * Start the port and switch it to promiscuous mode.
 *
 * @throws PluginError when the port cannot be started or promiscuous mode
 *         cannot be enabled.
 */
void DpdkCore::enablePort()
{
    const int startStatus = rte_eth_dev_start(m_portId);
    if (startStatus < 0) {
        throw PluginError("Unable to start DPDK port");
    }

    const int promiscuousStatus = rte_eth_promiscuous_enable(m_portId);
    if (promiscuousStatus != 0) {
        throw PluginError("Unable to set promiscuous mode");
    }
}

void DpdkCore::registerRxTimestamp()
uint16_t DpdkCore::getMbufsCount() const noexcept
{
if (rte_mbuf_dyn_rx_timestamp_register(&m_rxTimestampOffset, NULL)) {
throw PluginError("Unable to get Rx timestamp offset");
}
return m_mBufsCount;
}

void DpdkCore::configure(const char* params)
Expand All @@ -240,55 +146,24 @@ void DpdkCore::configure(const char* params)
return;
}


try {
parser.parse(params);
} catch (ParserError& e) {
throw PluginError(e.what());
}

m_portId = parser.port_num();
m_rxQueueCount = parser.rx_queues();
configureEal(parser.eal_params());

/* recognize NIC driver and check capabilities */
recognizeDriver();
registerRxTimestamp();
initInterface();
isConfigured = true;
}

void DpdkCore::recognizeDriver()
{
rte_eth_dev_info rteDevInfo;
if (rte_eth_dev_info_get(m_portId, &rteDevInfo)) {
throw PluginError("Unable to get rte dev info");
}

if (std::strcmp(rteDevInfo.driver_name, "net_nfb") == 0) {
m_isNfbDpdkDriver = true;
}

std::cerr << "Capabilities of the port " << m_portId << " with driver " << rteDevInfo.driver_name << ":" << std::endl;
std::cerr << "\tRX offload: " << rteDevInfo.rx_offload_capa << std::endl;
std::cerr << "\tflow type RSS offloads: " << rteDevInfo.flow_type_rss_offloads << std::endl;
uint16_t mempoolSize = parser.pkt_mempool_size();
uint16_t rxQueueCount = parser.rx_queues();
m_mBufsCount = parser.pkt_buffer_size();

/* Check if RSS hashing is supported in NIC */
m_supportedRSS = (rteDevInfo.flow_type_rss_offloads & RTE_ETH_RSS_IP) != 0;
std::cerr << "\tDetected RSS offload capability: " << (m_supportedRSS ? "yes" : "no") << std::endl;
configureEal(parser.eal_params());

/* Check if HW timestamps are supported, we support NFB cards only */
if (m_isNfbDpdkDriver) {
m_supportedHWTimestamp = (rteDevInfo.rx_offload_capa & RTE_ETH_RX_OFFLOAD_TIMESTAMP) != 0;
} else {
m_supportedHWTimestamp = false;
m_dpdkDevices.reserve(parser.port_numbers().size());
for (auto portID : parser.port_numbers()) {
m_dpdkDevices.emplace_back(portID, rxQueueCount, mempoolSize, m_mBufsCount);
}
std::cerr << "\tDetected HW timestamp capability: " << (m_supportedHWTimestamp ? "yes" : "no") << std::endl;
}

bool DpdkCore::isNfbDpdkDriver()
{
return m_isNfbDpdkDriver;
isConfigured = true;
}

std::vector<char *> DpdkCore::convertStringToArgvFormat(const std::string& ealParams)
Expand Down Expand Up @@ -316,37 +191,14 @@ void DpdkCore::configureEal(const std::string& ealParams)
}
}

uint16_t DpdkCore::getRxQueueId()
uint16_t DpdkCore::getRxQueueId() noexcept
{
return m_currentRxId++;
}

/**
 * Start the port once m_currentRxId has reached the configured RX queue count.
 *
 * While the two values differ the call is a no-op. When they match, RSS is
 * configured, the port is enabled and is_ifc_ready is set.
 */
void DpdkCore::startIfReady()
{
    if (m_currentRxId != m_rxQueueCount) {
        return;
    }

    configureRSS();
    enablePort();
    is_ifc_ready = true;

    std::cerr << "DPDK input at port " << m_portId << " started." << std::endl;
}

/**
 * @return The stored RX timestamp dynamic-field offset (m_rxTimestampOffset).
 */
int DpdkCore::getRxTimestampOffset()
{
    const int offset = m_rxTimestampOffset;
    return offset;
}

/**
 * Look up the mbuf dynamic flag that marks a valid RX timestamp.
 *
 * @return The flag as a bit mask, or 0 when the flag is not registered.
 *         A zero mask never matches any ol_flags bit.
 *
 * rte_mbuf_dynflag_lookup() returns a negative value when the flag is
 * unknown; shifting by that value would be undefined behaviour, so the
 * failure is handled explicitly.
 *
 * NOTE(review): the return type is int, so a flag bit position above 30
 * cannot be represented — consider widening the declaration to uint64_t.
 */
int DpdkCore::getRxTimestampDynflag()
{
    const int flagBit = rte_mbuf_dynflag_lookup(RTE_MBUF_DYNFLAG_RX_TIMESTAMP_NAME, nullptr);
    if (flagBit < 0) {
        return 0;
    }
    return RTE_BIT64(flagBit);
}

// Constructs a reader bound to the DpdkCore instance returned by
// DpdkCore::getInstance(); starts with no packets read and HW RX timestamps
// disabled.
// NOTE(review): the member initialisations in the body may belong to an older
// revision of this constructor — confirm they are still current.
DpdkReader::DpdkReader()
: m_dpdkCore(DpdkCore::getInstance())
{
pkts_read_ = 0;
m_useHwRxTimestamp = false;
}

DpdkReader::~DpdkReader()
Expand All @@ -358,104 +210,28 @@ void DpdkReader::init(const char* params)
{
m_dpdkCore.configure(params);
m_rxQueueId = m_dpdkCore.getRxQueueId();
m_portId = m_dpdkCore.parser.port_num();
m_rxTimestampOffset = m_dpdkCore.getRxTimestampOffset();
m_rxTimestampDynflag = m_dpdkCore.getRxTimestampDynflag();
m_useHwRxTimestamp = m_dpdkCore.isNfbDpdkDriver();

createRteMempool(m_dpdkCore.parser.pkt_mempool_size());
createRteMbufs(m_dpdkCore.parser.pkt_buffer_size());
setupRxQueue();

m_dpdkCore.startIfReady();
}

/**
 * Create the packet mbuf pool used by this reader's RX queue.
 *
 * The pool is named "mbuf_pool_<rx queue id>".
 *
 * @param mempoolSize Number of mbufs in the pool.
 * @throws PluginError with the DPDK error text when the pool cannot be created.
 */
void DpdkReader::createRteMempool(uint16_t mempoolSize)
{
    const std::string poolName = "mbuf_pool_" + std::to_string(m_rxQueueId);
    // NOTE(review): the RX queue id is passed where an lcore id is expected — confirm this is intended.
    const auto socketId = rte_lcore_to_socket_id(m_rxQueueId);

    rteMempool = rte_pktmbuf_pool_create(
        poolName.c_str(),
        mempoolSize,
        MEMPOOL_CACHE_SIZE,
        0,
        RTE_MBUF_DEFAULT_BUF_SIZE,
        socketId);

    if (rteMempool == nullptr) {
        std::string message = "Unable to create memory pool. ";
        message += std::string(rte_strerror(rte_errno));
        throw PluginError(message);
    }
}

/**
 * Size the mbuf pointer array used for receive bursts.
 *
 * @param mbufsSize Number of mbuf slots to allocate.
 * @throws PluginError carrying the original message when resizing fails.
 */
void DpdkReader::createRteMbufs(uint16_t mbufsSize)
{
    try {
        mbufs_.resize(mbufsSize);
    } catch (const std::exception& resizeFailure) {
        // Re-report standard-library failures as a plugin error.
        throw PluginError(resizeFailure.what());
    }
}

// Sets up this reader's RX queue on the port, using mbufs_.size() descriptors,
// the port's socket id, default queue configuration (nullptr) and rteMempool.
// Throws PluginError when rte_eth_rx_queue_setup() returns a negative value.
// NOTE(review): the last two statements (device count and mBufs resize) look
// unrelated to queue setup and may come from a different revision/function —
// confirm they belong here.
void DpdkReader::setupRxQueue()
{
int ret = rte_eth_rx_queue_setup(
m_portId,
m_rxQueueId,
mbufs_.size(),
rte_eth_dev_socket_id(m_portId),
nullptr,
rteMempool);
if (ret < 0) {
throw PluginError("Unable to set up RX queues");
}
m_dpdkDeviceCount = m_dpdkCore.getDpdkDeviceCount();
mBufs.resize(m_dpdkCore.getMbufsCount());
}

/**
 * Produce the timestamp for a received packet.
 *
 * When HW RX timestamps are in use and the mbuf carries the timestamp flag,
 * the value stored in the mbuf dynamic field is split into seconds and
 * microseconds (it is divided as a nanosecond count). Otherwise the current
 * system clock time is used.
 *
 * @param mbuf Received packet buffer.
 * @return Timestamp as seconds + microseconds.
 */
struct timeval DpdkReader::getTimestamp(rte_mbuf* mbuf)
{
    struct timeval tv;

    const bool hasHwTimestamp = m_useHwRxTimestamp && (mbuf->ol_flags & m_rxTimestampDynflag);
    if (!hasHwTimestamp) {
        // Software fallback: whole seconds plus the sub-second remainder in microseconds.
        const auto currentTime = std::chrono::system_clock::now();
        const auto wholeSeconds = std::chrono::system_clock::to_time_t(currentTime);
        const auto subSecond = currentTime - std::chrono::system_clock::from_time_t(wholeSeconds);

        tv.tv_sec = wholeSeconds;
        tv.tv_usec = std::chrono::duration_cast<std::chrono::microseconds>(subSecond).count();
        return tv;
    }

    static constexpr time_t nanosecInSec = 1000000000;
    static constexpr time_t nsecInUsec = 1000;

    const rte_mbuf_timestamp_t hwTimestamp = *RTE_MBUF_DYNFIELD(mbuf, m_rxTimestampOffset, rte_mbuf_timestamp_t *);
    tv.tv_sec = hwTimestamp / nanosecInSec;
    tv.tv_usec = (hwTimestamp - ((tv.tv_sec) * nanosecInSec)) / nsecInUsec;
    return tv;
}

InputPlugin::Result DpdkReader::get(PacketBlock& packets)
{
while (m_dpdkCore.is_ifc_ready == false) {
usleep(1000);
}

#ifndef WITH_FLEXPROBE
parser_opt_t opt {&packets, false, false, 0};
#endif

packets.cnt = 0;
for (auto i = 0; i < pkts_read_; i++) {
rte_pktmbuf_free(mbufs_[i]);
}
pkts_read_ = rte_eth_rx_burst(m_portId, m_rxQueueId, mbufs_.data(), mbufs_.size());
if (pkts_read_ == 0) {

DpdkDevice& dpdkDevice = m_dpdkCore.getDpdkDevice(m_dpdkDeviceIndex++ % m_dpdkDeviceCount);
uint16_t recivedPackets = dpdkDevice.receive(mBufs, m_rxQueueId);
if (!recivedPackets) {
return Result::TIMEOUT;
}

for (auto i = 0; i < pkts_read_; i++) {
for (auto packetID = 0; packetID < recivedPackets; packetID++) {
#ifdef WITH_FLEXPROBE
// Convert Flexprobe pre-parsed packet into IPFIXPROBE packet
auto conv_result = convert_from_flexprobe(mbufs_[i], packets.pkts[packets.cnt]);
auto conv_result = convert_from_flexprobe(mBufs[packetID], packets.pkts[packets.cnt]);
packets.bytes += packets.pkts[packets.cnt].packet_len_wire;
m_seen++;

Expand All @@ -466,10 +242,10 @@ InputPlugin::Result DpdkReader::get(PacketBlock& packets)
packets.cnt++;
#else
parse_packet(&opt,
getTimestamp(mbufs_[i]),
rte_pktmbuf_mtod(mbufs_[i], const std::uint8_t*),
rte_pktmbuf_data_len(mbufs_[i]),
rte_pktmbuf_data_len(mbufs_[i]));
dpdkDevice.getPacketTimestamp(mBufs[packetID]),
rte_pktmbuf_mtod(mBufs[packetID], const std::uint8_t*),
rte_pktmbuf_data_len(mBufs[packetID]),
rte_pktmbuf_data_len(mBufs[packetID]));
m_seen++;
m_parsed++;
#endif
Expand Down
Loading