Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 38 additions & 19 deletions src/plugins/libfabric/libfabric_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include "libfabric_backend.h"
#include "serdes/serdes.h"
#include "common/nixl_log.h"
#include "libfabric/libfabric_topology.h"

#include <limits>
#include <cstring>
Expand Down Expand Up @@ -482,7 +481,7 @@ nixlLibfabricEngine::connect(const std::string &remote_agent) {
auto it = connections_.find(remote_agent);
if (it != connections_.end() && it->second->overall_state_ == ConnectionState::CONNECTED) {
NIXL_DEBUG << "Connection already established for " << remote_agent
<< ", fi_addr: " << it->second->rail_remote_addr_list_[0];
<< ", fi_addr: " << it->second->rail_remote_addr_list_[0][0];
return NIXL_SUCCESS;
}

Expand Down Expand Up @@ -520,7 +519,7 @@ nixlLibfabricEngine::disconnect(const std::string &remote_agent) {
// Connection exists - check if already disconnected
if (it->second->overall_state_ == ConnectionState::DISCONNECTED) {
NIXL_DEBUG << "Connection already established for " << remote_agent
<< ", fi_addr: " << it->second->rail_remote_addr_list_[0];
<< ", fi_addr: " << it->second->rail_remote_addr_list_[0][0];
return NIXL_SUCCESS;
}
// TODO: Implement disconnect logic to cleanup the AV Address Entries from both local and remote
Expand All @@ -543,11 +542,9 @@ nixlLibfabricEngine::createAgentConnection(

NIXL_DEBUG << "Creating connection for agent: " << agent_name;

// Validate input parameters
if (data_rail_endpoints.size() != rail_manager.getNumDataRails()) {
NIXL_ERROR << "Expected " << rail_manager.getNumDataRails() << " data rail endpoints, got "
<< data_rail_endpoints.size();
return NIXL_ERR_INVALID_PARAM;
NIXL_INFO << "Local " << rail_manager.getNumDataRails() << " data rail endpoints, remote "
<< data_rail_endpoints.size();
}

if (control_rail_endpoints.size() != rail_manager.getNumControlRails()) {
Expand Down Expand Up @@ -681,7 +678,7 @@ nixlLibfabricEngine::establishConnection(const std::string &remote_agent) const
nixl_status_t status = rail_manager.postControlMessage(
nixlLibfabricRailManager::ControlMessageType::CONNECTION_REQ,
control_request,
conn_info->control_rail_remote_addr_list_[0], // Always use control rail 0
conn_info->control_rail_remote_addr_list_[0][0], // Always use control rail 0
it->second->agent_index_ // agent_index is only used in the ACK back from remote,
// to match connection request
);
Expand Down Expand Up @@ -863,7 +860,10 @@ nixlLibfabricEngine::loadRemoteMD(const nixlBlobDesc &input,
std::vector<uint64_t> remote_keys;
uint64_t remote_addr;
nixl_status_t status =
rail_manager.deserializeMemoryKeys(input.metaInfo, remote_keys, remote_addr);
rail_manager.deserializeMemoryKeys(input.metaInfo,
conn_it->second->rail_remote_addr_list_.at(0).size(),
remote_keys,
remote_addr);
if (status != NIXL_SUCCESS) {
NIXL_ERROR << "Rail Manager deserializeMemoryKeys failed";
return status;
Expand All @@ -873,11 +873,12 @@ nixlLibfabricEngine::loadRemoteMD(const nixlBlobDesc &input,
auto pub_md = std::make_unique<nixlLibfabricPublicMetadata>();
pub_md->conn_ = conn_it->second;
pub_md->rail_remote_key_list_ = std::move(remote_keys);
pub_md->derive_remote_selected_endpoints();
pub_md->remote_buf_addr_ = remote_addr;
NIXL_DEBUG << "Remote metadata loaded with"
<< " Remote addr: " << (void *)pub_md->remote_buf_addr_ << " Remote keys for "
<< pub_md->rail_remote_key_list_.size() << " rails"
<< " Remote fi_addr: " << pub_md->conn_->rail_remote_addr_list_[0];
<< " Remote fi_addr: " << pub_md->conn_->rail_remote_addr_list_[0][0];

output = pub_md.release();
return NIXL_SUCCESS;
Expand All @@ -889,6 +890,23 @@ nixlLibfabricEngine::unloadMD(nixlBackendMD *input) {
return NIXL_SUCCESS;
}

/****************************************
* Public Metadata Methods
*****************************************/

// Rebuilds remote_selected_endpoints_ from rail_remote_key_list_.
//
// Every index whose remote key is non-zero is recorded, in ascending order.
// Indices whose key is 0 are left out and reported at debug level.
void
nixlLibfabricPublicMetadata::derive_remote_selected_endpoints() {
    // Drop any previous selection so repeated calls do not accumulate entries.
    remote_selected_endpoints_.clear();

    const size_t num_keys = rail_remote_key_list_.size();
    for (size_t rail_idx = 0; rail_idx != num_keys; ++rail_idx) {
        // A zero key marks an entry that must not be selected.
        if (rail_remote_key_list_[rail_idx] == 0) {
            NIXL_DEBUG << "Skipping remote endpoint " << rail_idx << " with key 0";
            continue;
        }

        remote_selected_endpoints_.push_back(rail_idx);
    }
}

/****************************************
* Data movement
*****************************************/
Expand Down Expand Up @@ -1061,6 +1079,7 @@ nixlLibfabricEngine::postXfer(const nixl_xfer_op_t &operation,
local_md->selected_rails_,
local_md->rail_mr_list_,
remote_md->rail_remote_key_list_,
remote_md->remote_selected_endpoints_,
conn_it->second->rail_remote_addr_list_,
conn_it->second->agent_index_,
[backend_handle]() {
Expand Down Expand Up @@ -1204,11 +1223,11 @@ nixlLibfabricEngine::notifSendPriv(const std::string &remote_agent,
NIXL_DEBUG << "Sending binary notification control request"
<< " Message: " << binary_notification.getMessage()
<< " expected_completions: " << binary_notification.expected_completions;
nixl_status_t status =
rail_manager.postControlMessage(nixlLibfabricRailManager::ControlMessageType::NOTIFICATION,
control_request,
connection->control_rail_remote_addr_list_[control_rail_id],
connection->agent_index_);
nixl_status_t status = rail_manager.postControlMessage(
nixlLibfabricRailManager::ControlMessageType::NOTIFICATION,
control_request,
connection->control_rail_remote_addr_list_[control_rail_id][0],
connection->agent_index_);

if (status != NIXL_SUCCESS) {
NIXL_ERROR << "postControlMessage failed on control rail " << control_rail_id;
Expand Down Expand Up @@ -1344,7 +1363,7 @@ nixlLibfabricEngine::postShutdownCompletion() {
nixl_status_t status = rail_manager.postControlMessage(
nixlLibfabricRailManager::ControlMessageType::DISCONNECT_REQ,
control_request,
self_conn_it->second->rail_remote_addr_list_[rail_id],
self_conn_it->second->rail_remote_addr_list_[rail_id][0],
self_conn_it->second->agent_index_);

if (status == NIXL_SUCCESS) {
Expand Down Expand Up @@ -1461,7 +1480,7 @@ nixlLibfabricEngine::processConnectionRequest(uint16_t agent_idx,
}

// Insert ALL data rail addresses at once
std::vector<fi_addr_t> data_fi_addrs;
std::unordered_map<size_t, std::vector<fi_addr_t>> data_fi_addrs;
std::vector<char *> data_ep_names;
status = rail_manager.insertAllAddresses(
nixlLibfabricRailManager::RailType::DATA, data_endpoints, data_fi_addrs, data_ep_names);
Expand All @@ -1471,7 +1490,7 @@ nixlLibfabricEngine::processConnectionRequest(uint16_t agent_idx,
}

// Insert ALL control rail addresses at once
std::vector<fi_addr_t> control_fi_addrs;
std::unordered_map<size_t, std::vector<fi_addr_t>> control_fi_addrs;
std::vector<char *> control_ep_names;
status = rail_manager.insertAllAddresses(nixlLibfabricRailManager::RailType::CONTROL,
control_endpoints,
Expand All @@ -1483,7 +1502,7 @@ nixlLibfabricEngine::processConnectionRequest(uint16_t agent_idx,
}

// Use the first control rail's fi_addr for ACK (same as before)
fi_addr_t initiator_control_fi_addr = control_fi_addrs[0];
fi_addr_t initiator_control_fi_addr = control_fi_addrs[0][0];

NIXL_DEBUG << "Successfully inserted addresses for " << data_fi_addrs.size()
<< " data rails and " << control_fi_addrs.size() << " control rails"
Expand Down
12 changes: 10 additions & 2 deletions src/plugins/libfabric/libfabric_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,15 @@ class nixlLibfabricPublicMetadata : public nixlBackendMD {
std::shared_ptr<nixlLibfabricConnection> conn_; // Connection to remote agent
std::vector<uint64_t> rail_remote_key_list_; // Remote access keys, one per rail
std::vector<char *> src_ep_names_; // Source endpoint names, one per rail
std::vector<size_t>
remote_selected_endpoints_; // Remote rails selected, derived from rail_remote_key_list_.

public:
nixlLibfabricPublicMetadata() : nixlBackendMD(false) {}

void
derive_remote_selected_endpoints();

friend class nixlLibfabricEngine;
};

Expand All @@ -107,8 +113,10 @@ class nixlLibfabricConnection : public nixlBackendConnMD {
private:
size_t agent_index_; // Unique agent identifier in agent_names vector
std::string remoteAgent_; // Remote agent name
std::vector<fi_addr_t> rail_remote_addr_list_; // Data rail libfabric addresses
std::vector<fi_addr_t> control_rail_remote_addr_list_; // Control rail libfabric addresses
std::unordered_map<size_t, std::vector<fi_addr_t>>
rail_remote_addr_list_; // Data rail libfabric addresses. Key: data rail id.
std::unordered_map<size_t, std::vector<fi_addr_t>>
control_rail_remote_addr_list_; // Control rail libfabric addresses. Key: control rail id.
std::vector<char *> src_ep_names_; // Data rail endpoint names
std::vector<char *> control_ep_names_; // Control rail endpoint names
ConnectionState overall_state_; // Current connection state
Expand Down
Loading