-
Notifications
You must be signed in to change notification settings - Fork 908
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add partitioning APIs to pylibcudf (#16781)
Contributes to #15162 Authors: - Matthew Roeschke (https://github.com/mroeschke) - Matthew Murray (https://github.com/Matt711) - Vyas Ramasubramani (https://github.com/vyasr) Approvers: - Matthew Murray (https://github.com/Matt711) - Vyas Ramasubramani (https://github.com/vyasr) URL: #16781
- Loading branch information
Showing
11 changed files
with
229 additions
and
54 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
6 changes: 6 additions & 0 deletions
6
docs/cudf/source/user_guide/api_docs/pylibcudf/partitioning.rst
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
============ | ||
partitioning | ||
============ | ||
|
||
.. automodule:: pylibcudf.partitioning | ||
:members: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# Copyright (c) 2024, NVIDIA CORPORATION. | ||
|
||
from .column cimport Column | ||
from .table cimport Table | ||
|
||
|
||
# Declarations of the cpdef partitioning functions.
# In a .pxd file `=*` marks an argument that has a default value; the value
# itself is given where the function is implemented.

cpdef tuple[Table, list] hash_partition(
    Table input, list columns_to_hash, int num_partitions
)


cpdef tuple[Table, list] partition(
    Table t,
    Column partition_map,
    int num_partitions
)


cpdef tuple[Table, list] round_robin_partition(
    Table input, int num_partitions, int start_partition=*
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
# Copyright (c) 2024, NVIDIA CORPORATION. | ||
|
||
cimport pylibcudf.libcudf.types as libcudf_types | ||
from libcpp.memory cimport unique_ptr | ||
from libcpp.pair cimport pair | ||
from libcpp.utility cimport move | ||
from libcpp.vector cimport vector | ||
from pylibcudf.libcudf cimport partitioning as cpp_partitioning | ||
from pylibcudf.libcudf.table.table cimport table | ||
|
||
from .column cimport Column | ||
from .table cimport Table | ||
|
||
|
||
cpdef tuple[Table, list] hash_partition(
    Table input,
    list columns_to_hash,
    int num_partitions
):
    """
    Partitions rows from the input table into multiple output tables.

    For details, see :cpp:func:`hash_partition`.

    Parameters
    ----------
    input : Table
        The table to partition
    columns_to_hash : list[int]
        Indices of input columns to hash
    num_partitions : int
        The number of partitions to use

    Returns
    -------
    tuple[Table, list[int]]
        An output table and a list of row offsets to each partition
    """
    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result
    # The Python list is converted to a C++ vector here, before the GIL is
    # released, so the nogil section below only touches C/C++ values.
    cdef vector[libcudf_types.size_type] c_columns_to_hash = columns_to_hash

    # `num_partitions` is already a C int argument, so it is passed directly.
    with nogil:
        c_result = move(
            cpp_partitioning.hash_partition(
                input.view(), c_columns_to_hash, num_partitions
            )
        )

    # Wrap the owned libcudf table and copy the offsets into a Python list.
    return Table.from_libcudf(move(c_result.first)), list(c_result.second)
|
||
cpdef tuple[Table, list] partition(Table t, Column partition_map, int num_partitions):
    """
    Partitions rows of `t` according to the mapping specified by `partition_map`.

    For details, see :cpp:func:`partition`.

    Parameters
    ----------
    t : Table
        The table to partition
    partition_map : Column
        Non-nullable column of integer values that map each row
        in `t` to its partition.
    num_partitions : int
        The total number of partitions

    Returns
    -------
    tuple[Table, list[int]]
        An output table and a list of row offsets to each partition
    """
    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result

    # `num_partitions` is already a C int argument, so it is passed directly.
    with nogil:
        c_result = move(
            cpp_partitioning.partition(t.view(), partition_map.view(), num_partitions)
        )

    # Wrap the owned libcudf table and copy the offsets into a Python list.
    return Table.from_libcudf(move(c_result.first)), list(c_result.second)
|
||
|
||
cpdef tuple[Table, list] round_robin_partition(
    Table input,
    int num_partitions,
    int start_partition=0
):
    """
    Round-robin partition.

    For details, see :cpp:func:`round_robin_partition`.

    Parameters
    ----------
    input : Table
        The input table to be round-robin partitioned
    num_partitions : int
        Number of partitions for the table
    start_partition : int, default 0
        Index of the 1st partition

    Returns
    -------
    tuple[Table, list[int]]
        The partitioned table and the partition offsets
        for each partition within the table.
    """
    cdef pair[unique_ptr[table], vector[libcudf_types.size_type]] c_result

    # Both integer arguments are already C ints, so they are passed directly.
    with nogil:
        c_result = move(
            cpp_partitioning.round_robin_partition(
                input.view(), num_partitions, start_partition
            )
        )

    # Wrap the owned libcudf table and copy the offsets into a Python list.
    return Table.from_libcudf(move(c_result.first)), list(c_result.second)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
# Copyright (c) 2024, NVIDIA CORPORATION. | ||
|
||
import pyarrow as pa | ||
import pylibcudf as plc | ||
import pytest | ||
from utils import assert_table_eq | ||
|
||
|
||
@pytest.fixture(scope="module")
def partitioning_data():
    """Return the raw column dict, its pylibcudf table and its pyarrow table."""
    columns = {"a": [1, 2, 3], "b": [1, 2, 5], "c": [1, 2, 10]}
    arrow_table = pa.table(columns)
    return columns, plc.interop.from_arrow(arrow_table), arrow_table
|
||
|
||
def test_partition(partitioning_data):
    """Map every row to partition 0 and compare against the input data."""
    raw_data, plc_table, _ = partitioning_data
    partition_map = plc.interop.from_arrow(pa.array([0, 0, 0]))

    result, result_offsets = plc.partitioning.partition(
        plc_table, partition_map, 1
    )

    # The expected table uses unnamed, non-nullable int64 columns.
    unnamed_int64 = pa.field("", pa.int64(), nullable=False)
    expected = pa.table(
        list(raw_data.values()),
        schema=pa.schema([unnamed_int64] * 3),
    )
    assert_table_eq(expected, result)
    assert result_offsets == [0, 3]
|
||
|
||
def test_hash_partition(partitioning_data):
    """Hash columns 0 and 1 into one partition and compare against the input."""
    raw_data, plc_table, _ = partitioning_data
    columns_to_hash = [0, 1]

    result, result_offsets = plc.partitioning.hash_partition(
        plc_table, columns_to_hash, 1
    )

    # The expected table uses unnamed, non-nullable int64 columns.
    unnamed_int64 = pa.field("", pa.int64(), nullable=False)
    expected = pa.table(
        list(raw_data.values()),
        schema=pa.schema([unnamed_int64] * 3),
    )
    assert_table_eq(expected, result)
    assert result_offsets == [0]
|
||
|
||
def test_round_robin_partition(partitioning_data):
    """Round-robin into one partition starting at 0 and compare to the input."""
    raw_data, plc_table, _ = partitioning_data

    partitioned = plc.partitioning.round_robin_partition(plc_table, 1, 0)
    result, result_offsets = partitioned

    # The expected table uses unnamed, non-nullable int64 columns.
    unnamed_int64 = pa.field("", pa.int64(), nullable=False)
    expected = pa.table(
        list(raw_data.values()),
        schema=pa.schema([unnamed_int64] * 3),
    )
    assert_table_eq(expected, result)
    assert result_offsets == [0]