Concurrent multisource backwardpass #5206
Conversation
… centrality values
…to HH-betweenness-centrality
…g statements that were removed
… input array of size vertex_list and outputs array of same size vertex_list + use scatter to map array of size vertex_list back into global indices --> this fixed indexing issue
… and delta updates on frontier vertices only
… upper bound for each vertex frontier
…reduce kernel launches
…iteration of the delta update loop
Please ping me in Slack if you have any questions.
@@ -48,6 +49,9 @@
#include <thrust/sort.h>
#include <thrust/transform.h>

// Add CUB include for better sorting performance
I think this comment is overkill.
printf("DEBUG: GPU has %d SMs, target chunk size: %zu vertices\n", | ||
handle.get_device_properties().multiProcessorCount, | ||
approx_vertices_to_sort_per_iteration); |
Delete this before merging the code
// Sort vertices by vertex ID within each distance level using chunked CUB segmented sort
if (total_vertices > 0) {
  // Calculate target chunk size based on GPU hardware (like manager's code)
manager's code? You aren't writing comments here for yourself, but for others reading this code later.
Just saying
// Calculate target chunk size based on GPU hardware
will be sufficient
We are basically setting the chunk size large enough to saturate a GPU but small enough to avoid memory allocation failure.
if (total_vertices > 0) {
  // Calculate target chunk size based on GPU hardware (like manager's code)
  auto approx_vertices_to_sort_per_iteration =
    static_cast<size_t>(handle.get_device_properties().multiProcessorCount) * (1 << 20);
Add /* tuning parameter */ after (1 << 20) to inform that this value can be modified without impacting code correctness.
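For example, the quoted line would become:

auto approx_vertices_to_sort_per_iteration =
  static_cast<size_t>(handle.get_device_properties().multiProcessorCount) *
  (1 << 20) /* tuning parameter */;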
if (host_distance_counts[d] > 0) {
  distance_buckets_vertices.emplace_back(host_distance_counts[d], handle.get_stream());
  distance_buckets_sources.emplace_back(host_distance_counts[d], handle.get_stream());
} else {
  distance_buckets_vertices.emplace_back(0, handle.get_stream());
  distance_buckets_sources.emplace_back(0, handle.get_stream());
}
Do you need this if-else statement? The if and the else paths execute the same code.
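A minimal sketch of the collapsed version (when the count is 0, the constructor simply creates an empty buffer):

distance_buckets_vertices.emplace_back(host_distance_counts[d], handle.get_stream());
distance_buckets_sources.emplace_back(host_distance_counts[d], handle.get_stream());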
printf("DEBUG: Processing chunk %zu: %zu vertices, %zu distance levels\n", | ||
chunk_i, | ||
chunk_size, | ||
num_segments_in_chunk); |
Delete.
// Gather data for this chunk from original bucket arrays
size_t write_offset = 0;
std::vector<size_t> chunk_segment_offsets;
chunk_segment_offsets.push_back(0);

// Map distance level indices to actual distance values
std::vector<vertex_t> distance_levels_in_chunk;
for (vertex_t d = 1; d <= global_max_distance; ++d) {
  if (d < distance_buckets_vertices.size() && distance_buckets_vertices[d].size() > 0) {
    distance_levels_in_chunk.push_back(d);
  }
}
This code is not necessary.
// Copy data for distance levels in this chunk
for (size_t level_idx = chunk_distance_start; level_idx < chunk_distance_end; ++level_idx) {
  vertex_t d = distance_levels_in_chunk[level_idx];
  size_t level_size = distance_buckets_vertices[d].size();

  // Copy vertices with int32_t conversion
  thrust::transform(handle.get_thrust_policy(),
                    distance_buckets_vertices[d].begin(),
                    distance_buckets_vertices[d].end(),
                    chunk_vertices_int32.begin() + write_offset,
                    [] __device__(vertex_t v) { return static_cast<int32_t>(v); });

  // Copy sources
  thrust::copy(handle.get_thrust_policy(),
               distance_buckets_sources[d].begin(),
               distance_buckets_sources[d].end(),
               chunk_sources.begin() + write_offset);

  write_offset += level_size;
  chunk_segment_offsets.push_back(write_offset);
}
This as well.
// Copy segment offsets to device
rmm::device_uvector<size_t> d_chunk_segment_offsets(chunk_segment_offsets.size(),
                                                    handle.get_stream());
raft::update_device(d_chunk_segment_offsets.data(),
                    chunk_segment_offsets.data(),
                    chunk_segment_offsets.size(),
                    handle.get_stream());
This too.
You can just use distance_level_offsets (just need to shift the offset values properly).
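A minimal sketch of that shift, assuming distance_level_offsets is a host vector indexed by distance level with a leading 0 (the exact container and indexing are assumptions):

std::vector<size_t> chunk_segment_offsets(chunk_distance_end - chunk_distance_start + 1);
size_t chunk_base = distance_level_offsets[chunk_distance_start];
for (size_t i = 0; i < chunk_segment_offsets.size(); ++i) {
  chunk_segment_offsets[i] = distance_level_offsets[chunk_distance_start + i] - chunk_base;
}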
// Scatter results back to original bucket arrays
size_t read_offset = 0;
for (size_t level_idx = chunk_distance_start; level_idx < chunk_distance_end;
     ++level_idx) {
  vertex_t d = distance_levels_in_chunk[level_idx];
  size_t level_size = distance_buckets_vertices[d].size();

  // Convert back to vertex_t and copy to original bucket
  thrust::transform(handle.get_thrust_policy(),
                    chunk_vertices_int32.begin() + read_offset,
                    chunk_vertices_int32.begin() + read_offset + level_size,
                    distance_buckets_vertices[d].begin(),
                    [] __device__(int32_t v32) { return static_cast<vertex_t>(v32); });

  thrust::copy(handle.get_thrust_policy(),
               chunk_sources.begin() + read_offset,
               chunk_sources.begin() + read_offset + level_size,
               distance_buckets_sources[d].begin());

  read_offset += level_size;
}
You don't need to copy in loops. Just copy for the entire chunk.
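A sketch of what that could look like, assuming the destination is a single consecutive (vertex, origin) array (bucket_vertices, bucket_origins, and chunk_base are hypothetical names) rather than per-distance buckets:

thrust::transform(handle.get_thrust_policy(),
                  chunk_vertices_int32.begin(),
                  chunk_vertices_int32.begin() + chunk_size,
                  bucket_vertices.begin() + chunk_base,
                  [] __device__(int32_t v32) { return static_cast<vertex_t>(v32); });
thrust::copy(handle.get_thrust_policy(),
             chunk_sources.begin(),
             chunk_sources.begin() + chunk_size,
             bucket_origins.begin() + chunk_base);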
// Check that number of sources doesn't overflow origin_t
CUGRAPH_EXPECTS(
  cuda::std::distance(vertex_first, vertex_last) <= std::numeric_limits<origin_t>::max(),
  "Number of sources exceeds maximum value for origin_t (uint32_t), would cause overflow");
You may set origin_t to uint16_t to cut memory capacity & bandwidth requirements.
I don't think speedup will be dramatic but it can be non-negligible...
uint16_t can cover up to 65535 sources, and I assume for most practical use cases (except for tiny graphs), limiting the maximum number of concurrent sources to this number will be sufficient.
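A sketch of the narrowed type, keeping the quoted overflow check (only the message text changes):

using origin_t = uint16_t;  // caps the number of concurrent sources at 65535

CUGRAPH_EXPECTS(
  cuda::std::distance(vertex_first, vertex_last) <=
    static_cast<std::ptrdiff_t>(std::numeric_limits<origin_t>::max()),
  "Number of sources exceeds maximum value for origin_t (uint16_t), would cause overflow");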
while (vertex_frontier.bucket(bucket_idx_cur).aggregate_size() > 0) {
  // Step 1: Extract ALL edges from frontier (filtered by unvisited vertices)
  using bfs_edge_tuple_t = thrust::tuple<vertex_t, origin_t, edge_t>;
We are migrating from thrust::tuple to cuda::std::tuple. Don't use thrust::tuple anymore.
[d_sigma_2d = sigmas_2d.begin(), num_vertices, vertex_partition] __device__(
    auto tagged_src, auto dst, auto, auto, auto) {
  auto src    = thrust::get<0>(tagged_src);
  auto origin = thrust::get<1>(tagged_src);
thrust::get => cuda::std::get
auto src_idx   = origin * num_vertices + src_offset;
auto src_sigma = static_cast<edge_t>(d_sigma_2d[src_idx]);

return thrust::make_tuple(dst, origin, src_sigma);
thrust::make_tuple => cuda::std::make_tuple
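Putting the three substitutions together, the quoted pieces would look roughly like this (parts not shown above are elided):

#include <cuda/std/tuple>

using bfs_edge_tuple_t = cuda::std::tuple<vertex_t, origin_t, edge_t>;

[d_sigma_2d = sigmas_2d.begin(), num_vertices, vertex_partition] __device__(
    auto tagged_src, auto dst, auto, auto, auto) {
  auto src    = cuda::std::get<0>(tagged_src);
  auto origin = cuda::std::get<1>(tagged_src);
  // ... src_offset computation unchanged ...
  auto src_idx   = origin * num_vertices + src_offset;
  auto src_sigma = static_cast<edge_t>(d_sigma_2d[src_idx]);
  return cuda::std::make_tuple(dst, origin, src_sigma);
}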
auto num_unique = thrust::unique_count(
  handle.get_thrust_policy(),
  thrust::make_zip_iterator(frontier_vertices.begin(), frontier_origins.begin()),
  thrust::make_zip_iterator(frontier_vertices.end(), frontier_origins.end()),
  [] __device__(auto const& a, auto const& b) { return a == b; });
I assume you don't need this. You are not using this anywhere.
size_t cumulative_vertices = 0;
for (vertex_t d = 0; d <= global_max_distance; ++d) {
  if (distance_buckets_vertices[d].size() > 0) {
distance_buckets_vertices[d].size() can't be 0, so this if statement is not necessary.
As long as total_vertices > 0, distance_buckets_vertices[global_max_distance].size() >= 1 and there should be at least one vertex in each d.
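A sketch of the simplified loop under that assumption (the loop body isn't quoted above, so the push_back line is an assumed placeholder); an empty level would simply contribute 0:

size_t cumulative_vertices = 0;
for (vertex_t d = 0; d <= global_max_distance; ++d) {
  cumulative_vertices += distance_buckets_vertices[d].size();
  distance_level_offsets.push_back(cumulative_vertices);  // assumed body
}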
  }
}

if (distance_level_offsets.size() > 1) {
Is this necessary? I assume distance_level_offsets.size() is always >= 2.
for (size_t level_idx = chunk_distance_start; level_idx < chunk_distance_end; ++level_idx) {
  vertex_t d = level_idx;
  size_t level_size = distance_buckets_vertices[d].size();

  // Copy vertices
  thrust::copy(handle.get_thrust_policy(),
               distance_buckets_vertices[d].begin(),
               distance_buckets_vertices[d].end(),
               chunk_vertices.begin() + write_offset);

  // Copy sources
  thrust::copy(handle.get_thrust_policy(),
               distance_buckets_sources[d].begin(),
               distance_buckets_sources[d].end(),
               chunk_sources.begin() + write_offset);

  write_offset += level_size;
  chunk_segment_offsets.push_back(write_offset);
}
You can skip this copy if you maintain (vertex, origin) pairs in a consecutive array (instead of creating separate vector pairs for every d).
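A sketch of that layout (names hypothetical): one vertex array, one origin array, and per-level offsets into them, so a chunk is just a contiguous sub-range and needs no gather copy:

rmm::device_uvector<vertex_t> bucket_vertices(total_vertices, handle.get_stream());
rmm::device_uvector<origin_t> bucket_origins(total_vertices, handle.get_stream());
std::vector<size_t> distance_level_offsets;  // one entry per level boundary, leading 0

// A chunk covering levels [chunk_distance_start, chunk_distance_end) then starts at
// bucket_vertices.begin() + distance_level_offsets[chunk_distance_start], and the
// segmented sort can operate on it in place.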
vertex_frontier_t<vertex_t, void, multi_gpu, true> vertex_frontier(handle, num_buckets);
if constexpr (multi_gpu) {
  // Multi-GPU: Use sequential brandes_bfs (more reliable for cross-GPU)
"more reliable" doesn't sound right here. Our multi-source code is not ready for multi-GPU yet. Once it is updated, it should be as reliable as the sequential version.
  include_endpoints,
  do_expensive_check);
auto [distances_2d, sigmas_2d] = detail::multisource_bfs(
  handle, graph_view, edge_weight_view, vertices_begin, vertices_end, do_expensive_check);
Where are you limiting the number of concurrent origins? You need to limit the number of origins within the size of origin_t. If the total number exceeds the limit, you need to call multisource_bfs (and backward_pass) in multiple rounds.
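A sketch of such rounds, reusing the quoted multisource_bfs call (the chunking loop itself is an assumption, not existing code):

size_t num_sources   = static_cast<size_t>(cuda::std::distance(vertices_begin, vertices_end));
size_t max_per_round = static_cast<size_t>(std::numeric_limits<origin_t>::max());

for (size_t first = 0; first < num_sources; first += max_per_round) {
  auto last = std::min(first + max_per_round, num_sources);
  auto [distances_2d, sigmas_2d] = detail::multisource_bfs(
    handle, graph_view, edge_weight_view, vertices_begin + first, vertices_begin + last,
    do_expensive_check);
  // ... run the backward pass for this round and accumulate into the centrality output ...
}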
…arrays per distance level
LGTM, and great job!!!
/ok to test 95764a4
/ok to test 642c4d0
/merge
/ok to test d398316
/merge
Update the Betweenness Centrality implementation to perform the multi-source backward pass concurrently.
Closes #5221