diff --git a/cpp/src/centrality/betweenness_centrality_impl.cuh b/cpp/src/centrality/betweenness_centrality_impl.cuh index 0e82b8256a5..d3d5ad04547 100644 --- a/cpp/src/centrality/betweenness_centrality_impl.cuh +++ b/cpp/src/centrality/betweenness_centrality_impl.cuh @@ -32,11 +32,16 @@ #include #include #include +#include #include +#include #include #include +#include +#include + #include #include #include @@ -50,6 +55,16 @@ #include #include +#include + +#include +#include +#include +#include +#include +#include +#include + // // The formula for BC(v) is the sum over all (s,t) where s != v != t of // sigma_st(v) / sigma_st. Sigma_st(v) is the number of shortest paths // paths. namespace { +// Memory measurement utilities +struct MemoryInfo { + size_t free_memory; + size_t used_memory; + size_t total_memory; + + static MemoryInfo get_device_memory() + { + size_t free, total; + cudaMemGetInfo(&free, &total); + return {free, total - free, total}; + } + + void print(const char* label) const + { + double memory_usage = static_cast<double>(used_memory) / total_memory; + printf("[Memory] %s: Free: %.1fGB, Used: %.1fGB, Total: %.1fGB (%.1f%%)\n", + label, + free_memory / (1024.0 * 1024.0 * 1024.0), + used_memory / (1024.0 * 1024.0 * 1024.0), + total_memory / (1024.0 * 1024.0 * 1024.0), + memory_usage * 100.0); + } +}; + +// Memory cleanup utilities +struct MemoryCleanup { + static void cleanup_batch_memory(raft::handle_t const& handle) + { + handle.sync_stream(); + cudaDeviceSynchronize(); + } +}; + template struct brandes_e_op_t { template @@ -1184,6 +1233,30 @@ rmm::device_uvector<weight_t> betweenness_centrality( my_rank = handle.get_comms().get_rank(); } + // Check environment variables for batch mode configuration + std::string batch_mode = std::getenv("BATCH_MODE") ? std::getenv("BATCH_MODE") : "dynamic"; + size_t fixed_batch_size = + std::getenv("FIXED_BATCH_SIZE") ? 
std::stoul(std::getenv("FIXED_BATCH_SIZE")) : 1000; + + // Initialize memory management + MemoryInfo initial_mem = MemoryInfo::get_device_memory(); + + // Initialize batch size variables based on mode + size_t min_batch_size = 10; + size_t max_batch_size = 10000; + size_t current_batch_size = 500; // Start with reasonable size + size_t optimal_batch_size = 0; // Will store the best size found + + if (batch_mode == "fixed") { + // Fixed mode: use the specified batch size + current_batch_size = fixed_batch_size; + optimal_batch_size = fixed_batch_size; + } + // Dynamic mode: use binary search to find optimal size + + size_t processed_sources = 0; + size_t batch_number = 0; + if constexpr (multi_gpu) { // Multi-GPU: Use sequential version for (size_t source_idx = 0; source_idx < num_sources; ++source_idx) { @@ -1214,41 +1287,73 @@ rmm::device_uvector<weight_t> betweenness_centrality( do_expensive_check); } } else { - // Single-GPU: Use parallel version - // Process sources in batches to respect origin_t (uint16_t) limit - constexpr size_t max_sources_per_batch = std::numeric_limits<origin_t>::max(); - - size_t num_sources = cuda::std::distance(vertices_begin, vertices_end); - size_t num_batches = (num_sources + max_sources_per_batch - 1) / max_sources_per_batch; - - for (size_t batch_idx = 0; batch_idx < num_batches; ++batch_idx) { - size_t batch_start = batch_idx * max_sources_per_batch; - size_t batch_end = std::min(batch_start + max_sources_per_batch, num_sources); + // Single-GPU: Use adaptive batching + while (processed_sources < num_sources) { + size_t actual_batch_size = std::min(current_batch_size, num_sources - processed_sources); + + bool batch_success = true; + + try { + // Create iterators for current batch + auto batch_begin = vertices_begin + processed_sources; + auto batch_end = vertices_begin + processed_sources + actual_batch_size; + + auto [distances_2d, sigmas_2d] = detail::multisource_bfs( + handle, graph_view, edge_weight_view, batch_begin, batch_end, 
do_expensive_check); + + detail::multisource_backward_pass( + handle, + graph_view, + edge_weight_view, + raft::device_span<weight_t>{centralities.data(), centralities.size()}, + std::move(distances_2d), + std::move(sigmas_2d), + batch_begin, + batch_end, + include_endpoints, + do_expensive_check); + + // RAII objects (distances_2d, sigmas_2d) go out of scope here and are automatically freed + // Now run explicit cleanup to ensure any remaining memory is freed + MemoryCleanup::cleanup_batch_memory(handle); + + processed_sources += actual_batch_size; + batch_success = true; + batch_number++; // Only increment on success + + } catch (const std::exception& e) { + batch_success = false; + } - auto batch_vertices_begin = vertices_begin + batch_start; - auto batch_vertices_end = vertices_begin + batch_end; + // Record batch result and update binary search (only in dynamic mode) + if (batch_mode == "dynamic") { + if (batch_success) { + // This batch size worked, try larger + optimal_batch_size = current_batch_size; // Remember this working size + min_batch_size = current_batch_size + 1; // Search in upper half - auto [distances_2d, sigmas_2d] = detail::multisource_bfs(handle, - graph_view, - edge_weight_view, - batch_vertices_begin, - batch_vertices_end, - do_expensive_check); + if (min_batch_size <= max_batch_size) { + current_batch_size = (min_batch_size + max_batch_size) / 2; // Binary search + } else { + current_batch_size = optimal_batch_size; // Use the best size found + } + } else { + // This batch size failed, try smaller + max_batch_size = current_batch_size - 1; // Search in lower half - detail::multisource_backward_pass( - handle, - graph_view, - edge_weight_view, - raft::device_span<weight_t>{centralities.data(), centralities.size()}, - std::move(distances_2d), - std::move(sigmas_2d), - batch_vertices_begin, - batch_vertices_end, - include_endpoints, - do_expensive_check); + if (min_batch_size <= max_batch_size) { + current_batch_size = (min_batch_size + max_batch_size) / 2; // 
Binary search + } else { + current_batch_size = optimal_batch_size; // Use the best size found + } + } + } } } + // Final memory measurement + MemoryInfo final_mem = MemoryInfo::get_device_memory(); + std::optional<weight_t> scale_nonsource{std::nullopt}; std::optional<weight_t> scale_source{std::nullopt};