Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] PR with prior cugraph/raft build to debug CentOS CI failure #1794

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/cmake/thirdparty/get_raft.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,5 @@ set(CUGRAPH_BRANCH_VERSION_raft "${CUGRAPH_VERSION_MAJOR}.${CUGRAPH_VERSION_MINO
# RPM_raft_SOURCE=/path/to/local/raft
find_and_configure_raft(VERSION ${CUGRAPH_MIN_VERSION_raft}
FORK rapidsai
PINNED_TAG branch-${CUGRAPH_BRANCH_VERSION_raft}
PINNED_TAG aab9b958399fee343e6ac9d476fd18fba4df04f8
)
15 changes: 7 additions & 8 deletions cpp/include/cugraph/detail/graph_utils.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -78,20 +78,19 @@ rmm::device_uvector<edge_t> compute_major_degrees(
[(detail::num_sparse_segments_per_vertex_partition + 2) * i +
detail::num_sparse_segments_per_vertex_partition]
: major_last;
auto execution_policy = handle.get_thrust_policy();
thrust::transform(execution_policy,
thrust::transform(rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(major_hypersparse_first - major_first),
local_degrees.begin(),
[p_offsets] __device__(auto i) { return p_offsets[i + 1] - p_offsets[i]; });
if (use_dcs) {
auto p_dcs_nzd_vertices = (*adj_matrix_partition_dcs_nzd_vertices)[i];
auto dcs_nzd_vertex_count = (*adj_matrix_partition_dcs_nzd_vertex_counts)[i];
thrust::fill(execution_policy,
thrust::fill(rmm::exec_policy(handle.get_stream()),
local_degrees.begin() + (major_hypersparse_first - major_first),
local_degrees.begin() + (major_last - major_first),
edge_t{0});
thrust::for_each(execution_policy,
thrust::for_each(rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(dcs_nzd_vertex_count),
[p_offsets,
Expand Down Expand Up @@ -124,10 +123,10 @@ rmm::device_uvector<edge_t> compute_major_degrees(raft::handle_t const& handle,
vertex_t number_of_vertices)
{
rmm::device_uvector<edge_t> degrees(number_of_vertices, handle.get_stream());
thrust::tabulate(
handle.get_thrust_policy(), degrees.begin(), degrees.end(), [offsets] __device__(auto i) {
return offsets[i + 1] - offsets[i];
});
thrust::tabulate(rmm::exec_policy(handle.get_stream()),
degrees.begin(),
degrees.end(),
[offsets] __device__(auto i) { return offsets[i + 1] - offsets[i]; });
return degrees;
}

Expand Down
20 changes: 10 additions & 10 deletions cpp/include/cugraph/prims/copy_to_adj_matrix_row_col.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_cols()
: graph_view.get_number_of_local_adj_matrix_partition_rows());
thrust::copy(handle.get_thrust_policy(),
thrust::copy(rmm::exec_policy(handle.get_stream()),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
matrix_major_value_output_first);
Expand Down Expand Up @@ -169,7 +169,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
});
// FIXME: this gather (and temporary buffer) is unnecessary if NCCL directly takes a
// permutation iterator (and directly gathers to the internal buffer)
thrust::gather(handle.get_thrust_policy(),
thrust::gather(rmm::exec_policy(handle.get_stream()),
map_first,
map_first + thrust::distance(vertex_first, vertex_last),
vertex_value_input_first,
Expand All @@ -190,7 +190,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
// FIXME: this scatter is unnecessary if NCCL directly takes a permutation iterator (and
// directly scatters from the internal buffer)
thrust::scatter(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
rx_value_first,
rx_value_first + rx_counts[i],
map_first,
Expand All @@ -203,7 +203,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
// FIXME: this scatter is unnecessary if NCCL directly takes a permutation iterator (and
// directly scatters from the internal buffer)
thrust::scatter(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
rx_value_first,
rx_value_first + rx_counts[i],
map_first,
Expand All @@ -226,7 +226,7 @@ void copy_to_matrix_major(raft::handle_t const& handle,
? graph_view.get_number_of_local_adj_matrix_partition_cols()
: graph_view.get_number_of_local_adj_matrix_partition_rows());
auto val_first = thrust::make_permutation_iterator(vertex_value_input_first, vertex_first);
thrust::scatter(handle.get_thrust_policy(),
thrust::scatter(rmm::exec_policy(handle.get_stream()),
val_first,
val_first + thrust::distance(vertex_first, vertex_last),
vertex_first,
Expand Down Expand Up @@ -290,7 +290,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
assert(graph_view.get_number_of_local_vertices() == GraphViewType::is_adj_matrix_transposed
? graph_view.get_number_of_local_adj_matrix_partition_rows()
: graph_view.get_number_of_local_adj_matrix_partition_cols());
thrust::copy(handle.get_thrust_policy(),
thrust::copy(rmm::exec_policy(handle.get_stream()),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
matrix_minor_value_output_first);
Expand Down Expand Up @@ -360,7 +360,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
});
// FIXME: this gather (and temporary buffer) is unnecessary if NCCL directly takes a
// permutation iterator (and directly gathers to the internal buffer)
thrust::gather(handle.get_thrust_policy(),
thrust::gather(rmm::exec_policy(handle.get_stream()),
map_first,
map_first + thrust::distance(vertex_first, vertex_last),
vertex_value_input_first,
Expand All @@ -380,7 +380,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
});
// FIXME: this scatter is unnecessary if NCCL directly takes a permutation iterator (and
// directly scatters from the internal buffer)
thrust::scatter(handle.get_thrust_policy(),
thrust::scatter(rmm::exec_policy(handle.get_stream()),
rx_value_first,
rx_value_first + rx_counts[i],
map_first,
Expand All @@ -392,7 +392,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
});
// FIXME: this scatter is unnecessary if NCCL directly takes a permutation iterator (and
// directly scatters from the internal buffer)
thrust::scatter(handle.get_thrust_policy(),
thrust::scatter(rmm::exec_policy(handle.get_stream()),
rx_value_first,
rx_value_first + rx_counts[i],
map_first,
Expand All @@ -414,7 +414,7 @@ void copy_to_matrix_minor(raft::handle_t const& handle,
assert(graph_view.get_number_of_local_vertices() ==
graph_view.get_number_of_local_adj_matrix_partition_rows());
auto val_first = thrust::make_permutation_iterator(vertex_value_input_first, vertex_first);
thrust::scatter(handle.get_thrust_policy(),
thrust::scatter(rmm::exec_policy(handle.get_stream()),
val_first,
val_first + thrust::distance(vertex_first, vertex_last),
vertex_first,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,14 +438,13 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
minor_init = (row_comm_rank == 0) ? init : T{};
}

auto execution_policy = handle.get_thrust_policy();
if (GraphViewType::is_multi_gpu) {
thrust::fill(execution_policy,
thrust::fill(rmm::exec_policy(handle.get_stream()),
minor_buffer_first,
minor_buffer_first + minor_tmp_buffer_size,
minor_init);
} else {
thrust::fill(execution_policy,
thrust::fill(rmm::exec_policy(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),
minor_init);
Expand Down Expand Up @@ -547,7 +546,7 @@ void copy_v_transform_reduce_nbr(raft::handle_t const& handle,
if constexpr (update_major) { // this is necessary as we don't visit every vertex in the
// hypersparse segment in
// for_all_major_for_all_nbr_hypersparse
thrust::fill(handle.get_thrust_policy(),
thrust::fill(rmm::exec_policy(handle.get_stream()),
output_buffer_first + (*segment_offsets)[3],
output_buffer_first + (*segment_offsets)[4],
major_init);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ void decompress_matrix_partition_to_fill_edgelist_majors(
vertex_t* majors,
std::optional<std::vector<vertex_t>> const& segment_offsets)
{
auto execution_policy = handle.get_thrust_policy();
if (segment_offsets) {
// FIXME: we may further improve performance by 1) concurrently running kernels on different
// segments; 2) individually tuning block sizes for different segments; and 3) adding one more
Expand Down Expand Up @@ -154,7 +153,7 @@ void decompress_matrix_partition_to_fill_edgelist_majors(
}
if ((*segment_offsets)[3] - (*segment_offsets)[2] > 0) {
thrust::for_each(
execution_policy,
rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(matrix_partition.get_major_first()) + (*segment_offsets)[2],
thrust::make_counting_iterator(matrix_partition.get_major_first()) + (*segment_offsets)[3],
[matrix_partition, majors] __device__(auto major) {
Expand All @@ -168,7 +167,7 @@ void decompress_matrix_partition_to_fill_edgelist_majors(
if (matrix_partition.get_dcs_nzd_vertex_count() &&
(*(matrix_partition.get_dcs_nzd_vertex_count()) > 0)) {
thrust::for_each(
execution_policy,
rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(vertex_t{0}),
thrust::make_counting_iterator(*(matrix_partition.get_dcs_nzd_vertex_count())),
[matrix_partition, major_start_offset = (*segment_offsets)[3], majors] __device__(
Expand All @@ -184,7 +183,7 @@ void decompress_matrix_partition_to_fill_edgelist_majors(
}
} else {
thrust::for_each(
execution_policy,
rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(matrix_partition.get_major_first()),
thrust::make_counting_iterator(matrix_partition.get_major_first()) +
matrix_partition.get_major_size(),
Expand Down Expand Up @@ -341,13 +340,12 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
}
// FIXME: these copies are unnecessary, better fix RAFT comm's bcast to take separate input &
// output pointers
auto execution_policy = handle.get_thrust_policy();
thrust::copy(execution_policy,
thrust::copy(rmm::exec_policy(handle.get_stream()),
map_key_first,
map_key_last,
map_keys.begin() + map_displacements[row_comm_rank]);
thrust::copy(
execution_policy,
rmm::exec_policy(handle.get_stream()),
map_value_first,
map_value_first + thrust::distance(map_key_first, map_key_last),
get_dataframe_buffer_begin<value_t>(map_value_buffer) + map_displacements[row_comm_rank]);
Expand Down Expand Up @@ -422,13 +420,12 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
matrix_partition.get_indices(),
detail::minor_to_key_t<VertexIterator0>{adj_matrix_col_key_first,
matrix_partition.get_minor_first()});
auto execution_policy = handle.get_thrust_policy();
thrust::copy(execution_policy,
thrust::copy(rmm::exec_policy(handle.get_stream()),
minor_key_first,
minor_key_first + matrix_partition.get_number_of_edges(),
tmp_minor_keys.begin());
if (graph_view.is_weighted()) {
thrust::copy(execution_policy,
thrust::copy(rmm::exec_policy(handle.get_stream()),
*(matrix_partition.get_weights()),
*(matrix_partition.get_weights()) + matrix_partition.get_number_of_edges(),
tmp_key_aggregated_edge_weights.begin());
Expand All @@ -451,24 +448,25 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
auto output_key_first = thrust::make_zip_iterator(
thrust::make_tuple(reduced_major_vertices.begin(), reduced_minor_keys.begin()));
if (graph_view.is_weighted()) {
thrust::sort_by_key(execution_policy,
thrust::sort_by_key(rmm::exec_policy(handle.get_stream()),
input_key_first,
input_key_first + tmp_major_vertices.size(),
tmp_key_aggregated_edge_weights.begin());
reduced_size = thrust::distance(
output_key_first,
thrust::get<0>(thrust::reduce_by_key(execution_policy,
thrust::get<0>(thrust::reduce_by_key(rmm::exec_policy(handle.get_stream()),
input_key_first,
input_key_first + tmp_major_vertices.size(),
tmp_key_aggregated_edge_weights.begin(),
output_key_first,
reduced_key_aggregated_edge_weights.begin())));
} else {
thrust::sort(
execution_policy, input_key_first, input_key_first + tmp_major_vertices.size());
thrust::sort(rmm::exec_policy(handle.get_stream()),
input_key_first,
input_key_first + tmp_major_vertices.size());
reduced_size = thrust::distance(
output_key_first,
thrust::get<0>(thrust::reduce_by_key(execution_policy,
thrust::get<0>(thrust::reduce_by_key(rmm::exec_policy(handle.get_stream()),
input_key_first,
input_key_first + tmp_major_vertices.size(),
thrust::make_constant_iterator(weight_t{1.0}),
Expand Down Expand Up @@ -517,15 +515,14 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(

auto pair_first = thrust::make_zip_iterator(
thrust::make_tuple(rx_major_vertices.begin(), rx_minor_keys.begin()));
auto execution_policy = handle.get_thrust_policy();
thrust::sort_by_key(execution_policy,
thrust::sort_by_key(rmm::exec_policy(handle.get_stream()),
pair_first,
pair_first + rx_major_vertices.size(),
rx_key_aggregated_edge_weights.begin());
tmp_major_vertices.resize(rx_major_vertices.size(), handle.get_stream());
tmp_minor_keys.resize(tmp_major_vertices.size(), handle.get_stream());
tmp_key_aggregated_edge_weights.resize(tmp_major_vertices.size(), handle.get_stream());
auto pair_it = thrust::reduce_by_key(execution_policy,
auto pair_it = thrust::reduce_by_key(rmm::exec_policy(handle.get_stream()),
pair_first,
pair_first + rx_major_vertices.size(),
rx_key_aggregated_edge_weights.begin(),
Expand All @@ -549,7 +546,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
auto triplet_first = thrust::make_zip_iterator(thrust::make_tuple(
tmp_major_vertices.begin(), tmp_minor_keys.begin(), tmp_key_aggregated_edge_weights.begin()));
thrust::transform(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
triplet_first,
triplet_first + tmp_major_vertices.size(),
tmp_e_op_result_buffer_first,
Expand Down Expand Up @@ -635,18 +632,17 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
#endif
}

auto execution_policy = handle.get_thrust_policy();
thrust::fill(execution_policy,
thrust::fill(rmm::exec_policy(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),
T{});
thrust::sort_by_key(execution_policy,
thrust::sort_by_key(rmm::exec_policy(handle.get_stream()),
major_vertices.begin(),
major_vertices.end(),
get_dataframe_buffer_begin<T>(e_op_result_buffer));

auto num_uniques = thrust::count_if(
execution_policy,
rmm::exec_policy(handle.get_stream()),
thrust::make_counting_iterator(size_t{0}),
thrust::make_counting_iterator(major_vertices.size()),
[major_vertices = major_vertices.data()] __device__(auto i) {
Expand All @@ -662,13 +658,13 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
: invalid_vertex_id<vertex_t>::value;
});
thrust::copy_if(
execution_policy,
rmm::exec_policy(handle.get_stream()),
major_vertex_first,
major_vertex_first + major_vertices.size(),
unique_major_vertices.begin(),
[] __device__(auto major) { return major != invalid_vertex_id<vertex_t>::value; });
thrust::reduce_by_key(
execution_policy,
rmm::exec_policy(handle.get_stream()),
major_vertices.begin(),
major_vertices.end(),
get_dataframe_buffer_begin<T>(e_op_result_buffer),
Expand All @@ -684,7 +680,7 @@ void copy_v_transform_reduce_key_aggregated_out_nbr(
thrust::equal_to<vertex_t>{},
reduce_op);

thrust::transform(execution_policy,
thrust::transform(rmm::exec_policy(handle.get_stream()),
vertex_value_output_first,
vertex_value_output_first + graph_view.get_number_of_local_vertices(),
vertex_value_output_first,
Expand Down
5 changes: 3 additions & 2 deletions cpp/include/cugraph/prims/count_if_v.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ typename GraphViewType::vertex_type count_if_v(raft::handle_t const& handle,
VertexOp v_op)
{
auto count =
thrust::count_if(handle.get_thrust_policy(),
thrust::count_if(rmm::exec_policy(handle.get_stream()),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
v_op);
Expand Down Expand Up @@ -92,7 +92,8 @@ typename GraphViewType::vertex_type count_if_v(raft::handle_t const& handle,
InputIterator input_last,
VertexOp v_op)
{
auto count = thrust::count_if(handle.get_thrust_policy(), input_first, input_last, v_op);
auto count =
thrust::count_if(rmm::exec_policy(handle.get_stream()), input_first, input_last, v_op);
if (GraphViewType::is_multi_gpu) {
count = host_scalar_allreduce(handle.get_comms(), count, handle.get_stream());
}
Expand Down
4 changes: 2 additions & 2 deletions cpp/include/cugraph/prims/reduce_v.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ T reduce_v(raft::handle_t const& handle,
T init)
{
auto ret = thrust::reduce(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
vertex_value_input_first,
vertex_value_input_first + graph_view.get_number_of_local_vertices(),
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() == 0)) ? init : T{},
Expand Down Expand Up @@ -89,7 +89,7 @@ T reduce_v(raft::handle_t const& handle,
T init)
{
auto ret = thrust::reduce(
handle.get_thrust_policy(),
rmm::exec_policy(handle.get_stream()),
input_first,
input_last,
((GraphViewType::is_multi_gpu) && (handle.get_comms().get_rank() == 0)) ? init : T{},
Expand Down
Loading