163 changes: 134 additions & 29 deletions cpp/src/centrality/betweenness_centrality_impl.cuh
@@ -32,11 +32,16 @@
#include <cugraph/algorithms.hpp>
#include <cugraph/detail/utility_wrappers.hpp>
#include <cugraph/edge_src_dst_property.hpp>
#include <cugraph/utilities/dataframe_buffer.hpp>
#include <cugraph/utilities/error.hpp>
#include <cugraph/utilities/host_scalar_comm.hpp>
#include <cugraph/vertex_partition_device_view.cuh>

#include <raft/core/handle.hpp>

#include <rmm/mr/device/cuda_memory_resource.hpp>
#include <rmm/mr/device/per_device_resource.hpp>

#include <cub/cub.cuh>
#include <cuda/atomic>
#include <cuda/std/iterator>
@@ -50,13 +55,57 @@
#include <thrust/sort.h>
#include <thrust/transform.h>

#include <cuda_runtime_api.h>

#include <algorithm>
#include <cstdio>
#include <cstdlib>
#include <numeric>
#include <stdexcept>
#include <string>
#include <vector>

//
// The formula for BC(v) is the sum over all pairs (s,t) with s != v != t of
// sigma_st(v) / sigma_st, where sigma_st(v) is the number of shortest paths from
// s to t that pass through vertex v and sigma_st is the total number of shortest
// paths from s to t.
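
For reference, the quantity described in the comment above is the standard betweenness centrality sum, which in LaTeX notation reads

BC(v) \;=\; \sum_{s \neq v \neq t} \frac{\sigma_{st}(v)}{\sigma_{st}}

with \sigma_{st}(v) the number of shortest s-t paths through v and \sigma_{st} the total number of shortest s-t paths.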
namespace {

// Memory measurement utilities
struct MemoryInfo {
size_t free_memory;
size_t used_memory;
size_t total_memory;

static MemoryInfo get_device_memory()
{
size_t free, total;
cudaMemGetInfo(&free, &total);
return {free, total - free, total};
}

void print(const char* label) const
{
double memory_usage = static_cast<double>(used_memory) / total_memory;
printf("[Memory] %s: Free: %.1fGB, Used: %.1fGB, Total: %.1fGB (%.1f%%)\n",
label,
free_memory / (1024.0 * 1024.0 * 1024.0),
used_memory / (1024.0 * 1024.0 * 1024.0),
total_memory / (1024.0 * 1024.0 * 1024.0),
memory_usage * 100.0);
}
};
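
A minimal usage sketch for the helper above (illustrative only, not part of this change; the labels and the placement around a batch are arbitrary):

  // Illustrative sketch: snapshot and report device memory around one batch.
  MemoryInfo before = MemoryInfo::get_device_memory();
  // ... run one batch (multi-source BFS + backward pass) ...
  MemoryInfo after = MemoryInfo::get_device_memory();
  before.print("before batch");
  after.print("after batch");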

// Memory cleanup utilities. Device buffers are released via RAII when they go out
// of scope; this helper only synchronizes the stream and the device so the next
// batch starts from a quiescent state.
struct MemoryCleanup {
static void cleanup_batch_memory(raft::handle_t const& handle)
{
handle.sync_stream();
cudaDeviceSynchronize();
}
};

template <typename vertex_t>
struct brandes_e_op_t {
template <typename value_t, typename ignore_t>
@@ -1184,6 +1233,30 @@ rmm::device_uvector<weight_t> betweenness_centrality(
my_rank = handle.get_comms().get_rank();
}

// Check environment variables for batch mode configuration
std::string batch_mode = std::getenv("BATCH_MODE") ? std::getenv("BATCH_MODE") : "dynamic";
size_t fixed_batch_size =
std::getenv("FIXED_BATCH_SIZE") ? std::stoul(std::getenv("FIXED_BATCH_SIZE")) : 1000;

// Initialize memory management
MemoryInfo initial_mem = MemoryInfo::get_device_memory();

// Initialize batch size variables based on mode
size_t min_batch_size = 10;
size_t max_batch_size = 10000;
size_t current_batch_size = 500; // Start with a reasonable size
size_t optimal_batch_size = 0; // Will store the best size found

if (batch_mode == "fixed") {
// Fixed mode: use the specified batch size
current_batch_size = fixed_batch_size;
optimal_batch_size = fixed_batch_size;
}
// Dynamic mode (default): the binary search below adapts the batch size to find
// the largest size that succeeds without exhausting device memory

size_t processed_sources = 0;
size_t batch_number = 0;
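
The single-GPU path below adapts the batch size at run time: BATCH_MODE=fixed together with FIXED_BATCH_SIZE pins the batch size, while the default dynamic mode binary-searches the range [min_batch_size, max_batch_size], raising the lower bound after a successful batch and lowering the upper bound after a failed one until it converges on the largest size that works. A condensed, self-contained sketch of that policy (hypothetical helper, not part of the diff):

  // Hypothetical sketch of the batch-size adaptation policy used below.
  #include <cstddef>

  struct batch_size_search {
    size_t lo{10};      // smallest size still under consideration
    size_t hi{10000};   // largest size still under consideration
    size_t best{0};     // largest size known to have succeeded

    // Returns the next batch size to try, given whether `tried` succeeded.
    size_t update(size_t tried, bool success)
    {
      if (success) {
        best = tried;
        lo   = tried + 1;                   // search the upper half
      } else {
        hi = (tried == 0) ? 0 : tried - 1;  // search the lower half
      }
      return (lo <= hi) ? (lo + hi) / 2 : best;
    }
  };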

if constexpr (multi_gpu) {
// Multi-GPU: Use sequential version
for (size_t source_idx = 0; source_idx < num_sources; ++source_idx) {
Expand Down Expand Up @@ -1214,41 +1287,73 @@ rmm::device_uvector<weight_t> betweenness_centrality(
do_expensive_check);
}
} else {
// Single-GPU: Use parallel version
// Process sources in batches to respect origin_t (uint16_t) limit
constexpr size_t max_sources_per_batch = std::numeric_limits<uint16_t>::max();

size_t num_sources = cuda::std::distance(vertices_begin, vertices_end);
size_t num_batches = (num_sources + max_sources_per_batch - 1) / max_sources_per_batch;

for (size_t batch_idx = 0; batch_idx < num_batches; ++batch_idx) {
size_t batch_start = batch_idx * max_sources_per_batch;
size_t batch_end = std::min(batch_start + max_sources_per_batch, num_sources);
// Single-GPU: Use adaptive batching
while (processed_sources < num_sources) {
size_t actual_batch_size = std::min(current_batch_size, num_sources - processed_sources);

bool batch_success = true;

try {
// Create iterators for current batch
auto batch_begin = vertices_begin + processed_sources;
auto batch_end = vertices_begin + processed_sources + actual_batch_size;

auto [distances_2d, sigmas_2d] = detail::multisource_bfs(
handle, graph_view, edge_weight_view, batch_begin, batch_end, do_expensive_check);

detail::multisource_backward_pass(
handle,
graph_view,
edge_weight_view,
raft::device_span<weight_t>{centralities.data(), centralities.size()},
std::move(distances_2d),
std::move(sigmas_2d),
batch_begin,
batch_end,
include_endpoints,
do_expensive_check);

// RAII objects (distances_2d, sigmas_2d) go out of scope here and are automatically freed
// Now run explicit cleanup to ensure any remaining memory is freed
MemoryCleanup::cleanup_batch_memory(handle);

processed_sources += actual_batch_size;
batch_success = true;
batch_number++; // Only increment on success

} catch (std::exception const&) {
batch_success = false;
// Fixed mode has no fallback batch size, so propagate the failure instead of
// retrying the same size indefinitely
if (batch_mode != "dynamic") { throw; }
}

auto batch_vertices_begin = vertices_begin + batch_start;
auto batch_vertices_end = vertices_begin + batch_end;
// Record batch result and update binary search (only in dynamic mode)
if (batch_mode == "dynamic") {
if (batch_success) {
// This batch size worked, try larger
optimal_batch_size = current_batch_size; // Remember this working size
min_batch_size = current_batch_size + 1; // Search in upper half

auto [distances_2d, sigmas_2d] = detail::multisource_bfs(handle,
graph_view,
edge_weight_view,
batch_vertices_begin,
batch_vertices_end,
do_expensive_check);
if (min_batch_size <= max_batch_size) {
current_batch_size = (min_batch_size + max_batch_size) / 2; // Binary search
} else {
current_batch_size = optimal_batch_size; // Use the best size found
}
} else {
// This batch size failed, try smaller
max_batch_size = current_batch_size - 1; // Search in lower half

detail::multisource_backward_pass(
handle,
graph_view,
edge_weight_view,
raft::device_span<weight_t>{centralities.data(), centralities.size()},
std::move(distances_2d),
std::move(sigmas_2d),
batch_vertices_begin,
batch_vertices_end,
include_endpoints,
do_expensive_check);
if (min_batch_size <= max_batch_size) {
current_batch_size = (min_batch_size + max_batch_size) / 2; // Binary search
} else if (optimal_batch_size > 0) {
current_batch_size = optimal_batch_size; // Use the best size found
} else {
// No batch size has succeeded yet; fail rather than retry the same size forever
CUGRAPH_FAIL("betweenness_centrality: unable to find a batch size that fits in device memory");
}
}
}
}
}

// Final memory measurement
MemoryInfo final_mem = MemoryInfo::get_device_memory();
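
initial_mem and final_mem are captured but not reported in this hunk; if logging is desired, the print helper defined earlier could be used, for example (illustrative only):

  // Illustrative only: report the device-memory snapshots captured above.
  initial_mem.print("betweenness_centrality: start");
  final_mem.print("betweenness_centrality: end");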

std::optional<weight_t> scale_nonsource{std::nullopt};
std::optional<weight_t> scale_source{std::nullopt};
