Skip to content

Commit

Permalink
Simulate link prober offload
Browse files Browse the repository at this point in the history
Signed-off-by: Longxiang Lyu <[email protected]>
  • Loading branch information
lolyu committed Jan 19, 2024
1 parent f34cb09 commit ea58665
Show file tree
Hide file tree
Showing 54 changed files with 2,379 additions and 36 deletions.
3 changes: 2 additions & 1 deletion objects.mk
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ LIBS := \
-lboost_program_options \
-lboost_filesystem \
-lnl-3 \
-lnl-route-3
-lnl-route-3 \
-lhiredis

LIBS_TEST := \
-lgtest_main \
Expand Down
72 changes: 72 additions & 0 deletions src/DbInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
#include <boost/bind/bind.hpp>
#include <boost/date_time/gregorian/gregorian.hpp>
#include <boost/lexical_cast.hpp>
#include "swss/json.hpp"

#include "swss/netdispatcher.h"
#include "swss/netlink.h"
#include "swss/macaddress.h"
#include "swss/select.h"
#include "swss/json.hpp"

#include "DbInterface.h"
#include "MuxManager.h"
Expand All @@ -42,8 +44,11 @@

namespace mux
{
using json = nlohmann::json;

constexpr auto DEFAULT_TIMEOUT_MSEC = 1000;
std::vector<std::string> DbInterface::mMuxState = {"active", "standby", "unknown", "Error"};
std::vector<std::string> DbInterface::mLinkProberState = {"Active", "Standby", "Unknown", "Wait", "PeerWait", "PeerActive", "PeerUnknown", "Init", "Up", "Down", "Init", "Up", "Down"};
std::vector<std::string> DbInterface::mMuxLinkmgrState = {"uninitialized", "unhealthy", "healthy"};
std::vector<std::string> DbInterface::mMuxMetrics = {"start", "end"};
std::vector<std::string> DbInterface::mLinkProbeMetrics = {"link_prober_unknown_start", "link_prober_unknown_end", "link_prober_wait_start", "link_prober_active_start", "link_prober_standby_start"};
Expand Down Expand Up @@ -112,6 +117,18 @@ void DbInterface::setPeerMuxState(const std::string &portName, mux_state::MuxSta
));
}

void DbInterface::setLinkProberSessionState(const std::string portName, const std::string &sessionId, link_prober::LinkProberState::Label label)
{
MUXLOGDEBUG(boost::format("%s: setting link prober session %s state to %s") % portName % sessionId % mLinkProberState[label]);

boost::asio::post(mStrand, boost::bind(
&DbInterface::handleSetLinkProberSessionState,
this,
sessionId,
label
));
}


//
// ---> probeMuxState(const std::string &portName)
Expand Down Expand Up @@ -308,6 +325,7 @@ void DbInterface::initialize()
try {
mAppDbPtr = std::make_shared<swss::DBConnector> ("APPL_DB", 0);
mStateDbPtr = std::make_shared<swss::DBConnector> ("STATE_DB", 0);
mAsicDbPtr = std::make_shared<swss::DBConnector>("ASIC_DB", 0);

mAppDbMuxTablePtr = std::make_shared<swss::ProducerStateTable> (
mAppDbPtr.get(), APP_MUX_CABLE_TABLE_NAME
Expand All @@ -334,6 +352,9 @@ void DbInterface::initialize()
mStateDbPtr.get(), STATE_MUX_SWITCH_CAUSE_TABLE_NAME
);
mMuxStateTablePtr = std::make_shared<swss::Table> (mStateDbPtr.get(), STATE_MUX_CABLE_TABLE_NAME);
mAsicDbNotificationChannelPtr = std::make_shared<swss::NotificationProducer> (
mAsicDbPtr.get(), LINK_PROBER_SESSION_STATE_CHANGE_NOTIFICATION_CHANNEL
);

mSwssThreadPtr = std::make_shared<boost::thread> (&DbInterface::handleSwssNotification, this);
}
Expand Down Expand Up @@ -420,6 +441,25 @@ void DbInterface::handleSetPeerMuxState(const std::string portName, mux_state::M
}
}

void DbInterface::handleSetLinkProberSessionState(const std::string &sessionId, link_prober::LinkProberState::Label label)
{
std::string notification;
std::vector<swss::FieldValueTuple> entries;

json j = json::array();
json item;
item["link_prober_session_id"] = sessionId;
item["session_state"] = mLinkProberState[label];
j.push_back(item);
notification = j.dump();

mAsicDbNotificationChannelPtr->send(
LINK_PROBER_SESSION_STATE_CHANGE_NOTIFICATION,
notification,
entries
);
}

//
// ---> handleProbeMuxState(const std::string portName)
//
Expand Down Expand Up @@ -1337,6 +1377,30 @@ void DbInterface::processMuxStateNotifiction(std::deque<swss::KeyOpFieldsValuesT
}
}

void DbInterface::handleLinkProberSessionStateNotification(swss::NotificationConsumer &linkProberNotificationChannel)
{
std::deque<swss::KeyOpFieldsValuesTuple> entries;

linkProberNotificationChannel.pops(entries);
processPeerMuxNotification(entries);
}

void DbInterface::processLinkProberSessionStateNotification(std::deque<swss::KeyOpFieldsValuesTuple> &entries)
{
for (auto &entry : entries) {
std::string key = kfvKey(entry);
std::string op = kfvOp(entry);

json j = json::parse(op);
for (uint32_t i = 0; i < j.size(); ++i) {
mMuxManagerPtr->handleLinkProberSessionStateNotification(
j[i]["link_prober_session_id"],
j[i]["session_state"]
);
}
}
}

//
// ---> handleMuxStateNotifiction(swss::SubscriberStateTable &statedbPortTable);
//
Expand Down Expand Up @@ -1460,6 +1524,7 @@ void DbInterface::handleSwssNotification()
std::shared_ptr<swss::DBConnector> configDbPtr = std::make_shared<swss::DBConnector> ("CONFIG_DB", 0);
std::shared_ptr<swss::DBConnector> appDbPtr = std::make_shared<swss::DBConnector> ("APPL_DB", 0);
std::shared_ptr<swss::DBConnector> stateDbPtr = std::make_shared<swss::DBConnector> ("STATE_DB", 0);
std::shared_ptr<swss::DBConnector> asicDbPtr = std::make_shared<swss::DBConnector> ("ASIC_DB", 0);

// For reading Link Prober configurations from the MUX linkmgr table name
swss::SubscriberStateTable configDbMuxLinkmgrTable(configDbPtr.get(), CFG_MUX_LINKMGR_TABLE_NAME);
Expand All @@ -1482,6 +1547,8 @@ void DbInterface::handleSwssNotification()
swss::SubscriberStateTable stateDbMuxInfoTable(stateDbPtr.get(), MUX_CABLE_INFO_TABLE);
// for getting peer's admin forwarding state
swss::SubscriberStateTable stateDbPeerMuxTable(stateDbPtr.get(), STATE_PEER_HW_FORWARDING_STATE_TABLE_NAME);
// for getting simulated link prober notifications
swss::NotificationConsumer linkProberNotificationChannel(asicDbPtr.get(), LINK_PROBER_SESSION_STATE_CHANGE_NOTIFICATION_CHANNEL);

getTorMacAddress(configDbPtr);
getVlanNames(configDbPtr);
Expand Down Expand Up @@ -1510,6 +1577,9 @@ void DbInterface::handleSwssNotification()
swssSelect.addSelectable(&stateDbMuxInfoTable);
swssSelect.addSelectable(&stateDbPeerMuxTable);
swssSelect.addSelectable(&netlinkNeighbor);
if (mMuxManagerPtr->getMuxConfig().getIfEnableSimulateLfdOffload()) {
swssSelect.addSelectable(&linkProberNotificationChannel);
}

while (mPollSwssNotifcation) {
swss::Selectable *selectable;
Expand Down Expand Up @@ -1545,6 +1615,8 @@ void DbInterface::handleSwssNotification()
handlePeerLinkStateNotification(stateDbMuxInfoTable);
} else if (selectable == static_cast<swss::Selectable *> (&stateDbPeerMuxTable)) {
handlePeerMuxStateNotification(stateDbPeerMuxTable);
} else if (selectable == static_cast<swss::Selectable *> (&linkProberNotificationChannel)) {
handleLinkProberSessionStateNotification(linkProberNotificationChannel);
} else if (selectable == static_cast<swss::Selectable *> (&netlinkNeighbor)) {
continue;
} else {
Expand Down
16 changes: 16 additions & 0 deletions src/DbInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
#include "swss/dbconnector.h"
#include "swss/producerstatetable.h"
#include "swss/subscriberstatetable.h"
#include "swss/notificationproducer.h"
#include "swss/notificationconsumer.h"
#include "swss/warm_restart.h"

#include "link_manager/LinkManagerStateMachineActiveStandby.h"
Expand All @@ -55,6 +57,8 @@ namespace mux
#define STATE_PEER_HW_FORWARDING_STATE_TABLE_NAME "HW_MUX_CABLE_TABLE_PEER"

#define STATE_MUX_SWITCH_CAUSE_TABLE_NAME "MUX_SWITCH_CAUSE"
#define LINK_PROBER_SESSION_STATE_CHANGE_NOTIFICATION_CHANNEL "LINK_PROBER_NOTIFICATIONS"
#define LINK_PROBER_SESSION_STATE_CHANGE_NOTIFICATION "link_prober_session_state_change"

class MuxManager;
using ServerIpPortMap = std::map<boost::asio::ip::address, std::string>;
Expand Down Expand Up @@ -166,6 +170,8 @@ class DbInterface
*/
void setPeerMuxState(const std::string &portName, mux_state::MuxState::Label label);

void setLinkProberSessionState(const std::string portName, const std::string &sessionId, link_prober::LinkProberState::Label label);

/**
*@method probeMuxState
*
Expand Down Expand Up @@ -414,6 +420,8 @@ class DbInterface
*/
virtual void handleSetPeerMuxState(const std::string portName, mux_state::MuxState::Label label);

void handleSetLinkProberSessionState(const std::string &sessionId, link_prober::LinkProberState::Label label);

/**
*@method handleProbeMuxState
*
Expand Down Expand Up @@ -807,6 +815,10 @@ class DbInterface
*/
void handleMuxStateNotifiction(swss::SubscriberStateTable &statedbPortTable);

inline void processLinkProberSessionStateNotification(std::deque<swss::KeyOpFieldsValuesTuple> &entries);

void handleLinkProberSessionStateNotification(swss::NotificationConsumer &linkProberNotificationChannel);

/**
*@method handleSwssNotification
*
Expand Down Expand Up @@ -858,6 +870,7 @@ class DbInterface

private:
static std::vector<std::string> mMuxState;
static std::vector<std::string> mLinkProberState;
static std::vector<std::string> mMuxLinkmgrState;
static std::vector<std::string> mMuxMetrics;
static std::vector<std::string> mLinkProbeMetrics;
Expand All @@ -869,6 +882,7 @@ class DbInterface

std::shared_ptr<swss::DBConnector> mAppDbPtr;
std::shared_ptr<swss::DBConnector> mStateDbPtr;
std::shared_ptr<swss::DBConnector> mAsicDbPtr;
std::shared_ptr<swss::Table> mMuxStateTablePtr;

// for communicating with orchagent
Expand All @@ -888,6 +902,8 @@ class DbInterface
// for writing mux switch reason to state db
std::shared_ptr<swss::Table> mStateDbSwitchCauseTablePtr;

std::shared_ptr<swss::NotificationProducer> mAsicDbNotificationChannelPtr;

std::shared_ptr<boost::thread> mSwssThreadPtr;

boost::barrier mBarrier;
Expand Down
7 changes: 6 additions & 1 deletion src/LinkMgrdMain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ int main(int argc, const char* argv[])
bool measureSwitchover = false;
bool defaultRoute = false;
bool linkToSwssLogger = false;
bool simulateLfdOffload = false;

program_options::options_description description("linkmgrd options");
description.add_options()
Expand All @@ -95,6 +96,10 @@ int main(int argc, const char* argv[])
"Link to swss logger instead of using native boost syslog support, this will"
"set the boost logging level to TRACE and option verbosity is ignored"
)
("simulate_lfd_offload,s",
program_options::bool_switch(&simulateLfdOffload)->default_value(false),
"Simulate LFD offload by posting link prober state change notification to Redis"
)
;

//
Expand Down Expand Up @@ -139,7 +144,7 @@ int main(int argc, const char* argv[])
}

std::shared_ptr<mux::MuxManager> muxManagerPtr = std::make_shared<mux::MuxManager> ();
muxManagerPtr->initialize(measureSwitchover, defaultRoute);
muxManagerPtr->initialize(measureSwitchover, defaultRoute, simulateLfdOffload);
muxManagerPtr->run();
muxManagerPtr->deinitialize();
}
Expand Down
29 changes: 28 additions & 1 deletion src/MuxManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void MuxManager::setUseWellKnownMacActiveActive(bool useWellKnownMac)
//
// initialize MuxManager class and creates DbInterface instance that reads/listen from/to Redis db
//
void MuxManager::initialize(bool enable_feature_measurement, bool enable_feature_default_route)
void MuxManager::initialize(bool enable_feature_measurement, bool enable_feature_default_route, bool enable_simulate_lfd_offload)
{
for (uint8_t i = 0; (mMuxConfig.getNumberOfThreads() > 2) &&
(i < mMuxConfig.getNumberOfThreads() - 2); i++) {
Expand All @@ -94,6 +94,7 @@ void MuxManager::initialize(bool enable_feature_measurement, bool enable_feature

mMuxConfig.enableSwitchoverMeasurement(enable_feature_measurement);
mMuxConfig.enableDefaultRouteFeature(enable_feature_default_route);
mMuxConfig.enableSimulateLfdOffload(enable_simulate_lfd_offload);
}

//
Expand Down Expand Up @@ -445,6 +446,9 @@ std::shared_ptr<MuxPort> MuxManager::getMuxPortPtrOrThrow(const std::string &por
muxPortPtr->setServerMacAddress(address);
}
}
if (mMuxConfig.getIfEnableSimulateLfdOffload()) {
// register the dispatch table
}
mPortMap.insert({portName, muxPortPtr});
}
else {
Expand Down Expand Up @@ -591,4 +595,27 @@ void MuxManager::handleTsaEnableNotification(bool enable)
}
}

inline std::shared_ptr<MuxPort> MuxManager::getMuxPortPtr(const std::string &sessionId)
{
auto pos = sessionId.find('|');
if (pos != std::string::npos) {
std::string portName = sessionId.substr(0, pos);

PortMapIterator portMapIterator = mPortMap.find(portName);
if (portMapIterator != mPortMap.end()) {
return portMapIterator->second;
}
}
return nullptr;
}


void MuxManager::handleLinkProberSessionStateNotification(const std::string &sessionId, const std::string &sessionState)
{
auto muxPortPtr = getMuxPortPtr(sessionId);
if (muxPortPtr) {
muxPortPtr->handleLinkProberSessionStateNotification(sessionId, sessionState);
}
}

} /* namespace mux */
8 changes: 7 additions & 1 deletion src/MuxManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ class MuxManager
*
* @return none
*/
void initialize(bool enable_feature_measurement, bool enable_feature_default_route);
void initialize(bool enable_feature_measurement, bool enable_feature_default_route, bool enable_simulate_lfd_offload);

/**
*@method deinitialize
Expand Down Expand Up @@ -473,6 +473,10 @@ class MuxManager
*/
void handleTsaEnableNotification(bool enable);

void handleLinkProberSessionStateNotification(const std::string &sessionId, const std::string &sessionState);

common::MuxConfig &getMuxConfig() {return mMuxConfig;};

private:
/**
*@method getMuxPortCableType
Expand All @@ -485,6 +489,8 @@ class MuxManager
*/
inline common::MuxPortConfig::PortCableType getMuxPortCableType(const std::string &portName);

inline std::shared_ptr<MuxPort> getMuxPortPtr(const std::string &sessionId);

/**
*@method getMuxPortPtrOrThrow
*
Expand Down
Loading

0 comments on commit ea58665

Please sign in to comment.