Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[TransferEngine] change: auto discover topology & install transport. #73

Open
wants to merge 3 commits into
base: main
Choose a base branch
from

Conversation

doujiang24
Copy link
Contributor

  1. Allow TransferEngine(false) to disable auto discovery, for internal testing.
  2. users do not need to install transport manually.
  3. kvcache-ai/vllm will be updated after this PR is merged, since VLLMAdaptor has changed.

fix #69

@alogfans
Copy link
Collaborator

I modified the example code to support auto discovery and fixed bugs in the previous submission. Please apply the following patch.

In addition, maybe we need a special storage type name such as "*", alongside "cpu:0", that includes all detected devices.

diff --git a/doc/zh/run-examples.md b/doc/zh/run-examples.md
index 639df39..51bbf96 100644
--- a/doc/zh/run-examples.md
+++ b/doc/zh/run-examples.md
@@ -106,6 +106,14 @@ Mooncake 支持在执行 `cmake` 命令期间添加下列高级编译选项:
       > 提示:高级用户还可通过 `--nic_priority_matrix` 传入网卡优先级矩阵 JSON 文件,详细参考 Transfer Engine 的开发者手册。
    - 在仅支持 TCP 的网络环境中,可使用 `--protocol=tcp` 参数,此时不需要指定 `--device_name` 参数。
 
+   用户也可通过拓扑自动发现功能基于操作系统配置自动生成网卡优先级矩阵,此时不需要指定传输过程使用的 RDMA 网卡名称。
+   ```
+   ./transfer_engine_bench --mode=target \
+                           --metadata_server=10.0.0.1:2379 \
+                           --local_server_name=10.0.0.2:12345 \
+                           --auto_discovery
+   ```
+
 1. **启动发起节点。**
     ```bash
     # This is 10.0.0.3
diff --git a/mooncake-transfer-engine/example/transfer_engine_bench.cpp b/mooncake-transfer-engine/example/transfer_engine_bench.cpp
index 347276f..fb7c8bc 100644
--- a/mooncake-transfer-engine/example/transfer_engine_bench.cpp
+++ b/mooncake-transfer-engine/example/transfer_engine_bench.cpp
@@ -41,7 +41,8 @@ static void checkCudaError(cudaError_t result, const char *message) {
 
 #endif
 
-#define NR_SOCKETS (2)
+const static int NR_SOCKETS =
+    numa_available() ? numa_num_configured_nodes() : 1;
 
 static std::string getHostname();
 
@@ -66,6 +67,7 @@ DEFINE_int32(batch_size, 128, "Batch size");
 DEFINE_uint64(block_size, 4096, "Block size for each transfer request");
 DEFINE_int32(duration, 10, "Test duration in seconds");
 DEFINE_int32(threads, 4, "Task submission threads");
+DEFINE_bool(auto_discovery, false, "Enable auto discovery");
 
 #ifdef USE_CUDA
 DEFINE_bool(use_vram, true, "Allocate memory from GPU VRAM");
@@ -228,27 +230,28 @@ std::string loadNicPriorityMatrix() {
 
 int initiator() {
     // disable topology auto discovery for testing.
-    auto engine = std::make_unique<TransferEngine>(false);
+    auto engine = std::make_unique<TransferEngine>(FLAGS_auto_discovery);
 
     auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name);
     engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(),
                  hostname_port.first.c_str(), hostname_port.second);
 
-    Transport *xport = nullptr;
-    if (FLAGS_protocol == "rdma") {
-        auto nic_priority_matrix = loadNicPriorityMatrix();
-        void **args = (void **)malloc(2 * sizeof(void *));
-        args[0] = (void *)nic_priority_matrix.c_str();
-        args[1] = nullptr;
-        xport = engine->installTransport("rdma", args);
-    } else if (FLAGS_protocol == "tcp") {
-        xport = engine->installTransport("tcp", nullptr);
-    } else {
-        LOG(ERROR) << "Unsupported protocol";
+    if (!FLAGS_auto_discovery) {
+        Transport *xport = nullptr;
+        if (FLAGS_protocol == "rdma") {
+            auto nic_priority_matrix = loadNicPriorityMatrix();
+            void **args = (void **)malloc(2 * sizeof(void *));
+            args[0] = (void *)nic_priority_matrix.c_str();
+            args[1] = nullptr;
+            xport = engine->installTransport("rdma", args);
+        } else if (FLAGS_protocol == "tcp") {
+            xport = engine->installTransport("tcp", nullptr);
+        } else {
+            LOG(ERROR) << "Unsupported protocol";
+        }
+        LOG_ASSERT(xport);
     }
 
-    LOG_ASSERT(xport);
-
     void *addr[NR_SOCKETS] = {nullptr};
     int buffer_num = NR_SOCKETS;
 
@@ -308,22 +311,24 @@ int initiator() {
 
 int target() {
     // disable topology auto discovery for testing.
-    auto engine = std::make_unique<TransferEngine>(false);
+    auto engine = std::make_unique<TransferEngine>(FLAGS_auto_discovery);
 
     auto hostname_port = parseHostNameWithPort(FLAGS_local_server_name);
     engine->init(FLAGS_metadata_server, FLAGS_local_server_name.c_str(),
                  hostname_port.first.c_str(), hostname_port.second);
 
-    if (FLAGS_protocol == "rdma") {
-        auto nic_priority_matrix = loadNicPriorityMatrix();
-        void **args = (void **)malloc(2 * sizeof(void *));
-        args[0] = (void *)nic_priority_matrix.c_str();
-        args[1] = nullptr;
-        engine->installTransport("rdma", args);
-    } else if (FLAGS_protocol == "tcp") {
-        engine->installTransport("tcp", nullptr);
-    } else {
-        LOG(ERROR) << "Unsupported protocol";
+    if (!FLAGS_auto_discovery) {
+        if (FLAGS_protocol == "rdma") {
+            auto nic_priority_matrix = loadNicPriorityMatrix();
+            void **args = (void **)malloc(2 * sizeof(void *));
+            args[0] = (void *)nic_priority_matrix.c_str();
+            args[1] = nullptr;
+            engine->installTransport("rdma", args);
+        } else if (FLAGS_protocol == "tcp") {
+            engine->installTransport("tcp", nullptr);
+        } else {
+            LOG(ERROR) << "Unsupported protocol";
+        }
     }
 
     void *addr[NR_SOCKETS] = {nullptr};
diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp
index 4a4d2de..69046f1 100644
--- a/mooncake-transfer-engine/src/transfer_engine.cpp
+++ b/mooncake-transfer-engine/src/transfer_engine.cpp
@@ -27,6 +27,12 @@ int TransferEngine::init(const std::string &metadata_conn_string,
     multi_transports_ =
         std::make_shared<MultiTransport>(metadata_, local_server_name_);
 
+    TransferMetadata::RpcMetaDesc desc;
+    desc.ip_or_host_name = ip_or_host_name;
+    desc.rpc_port = rpc_port;
+    int ret = metadata_->addRpcMetaEntry(local_server_name_, desc);
+    if (ret) return ret;
+
     if (auto_discover_) {
         // discover topology automatically
         local_topology_->discover();
@@ -40,10 +46,7 @@ int TransferEngine::init(const std::string &metadata_conn_string,
         // TODO: install other transports automatically
     }
 
-    TransferMetadata::RpcMetaDesc desc;
-    desc.ip_or_host_name = ip_or_host_name;
-    desc.rpc_port = rpc_port;
-    return metadata_->addRpcMetaEntry(local_server_name_, desc);
+    return 0;
 }
 
 int TransferEngine::freeEngine() {

@alogfans
Copy link
Collaborator

alogfans commented Jan 15, 2025

LGTM. Waiting for @ShangmingCai to check the vLLM integration.

@ShangmingCai ShangmingCai self-requested a review January 15, 2025 03:40
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

proposal: topology discover & install transport automatically
2 participants