diff --git a/conda/recipes/libcudf/build.sh b/conda/recipes/libcudf/build.sh index fef3dabd733..a3a0415575b 100644 --- a/conda/recipes/libcudf/build.sh +++ b/conda/recipes/libcudf/build.sh @@ -5,5 +5,5 @@ export cudf_ROOT="$(realpath ./cpp/build)" ./build.sh -n -v \ libcudf libcudf_kafka benchmarks tests \ - --build_metrics --incl_cache_stats \ + --build_metrics --incl_cache_stats --allgpuarch \ --cmake-args=\"-DCMAKE_INSTALL_LIBDIR=lib -DCUDF_ENABLE_ARROW_S3=ON\" diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index f637db66c2c..ca85996b990 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -439,7 +439,6 @@ add_library( src/io/utilities/data_sink.cpp src/io/utilities/datasource.cpp src/io/utilities/file_io_utilities.cpp - src/io/utilities/parsing_utils.cu src/io/utilities/row_selection.cpp src/io/utilities/type_inference.cu src/io/utilities/trie.cu diff --git a/cpp/benchmarks/text/tokenize.cpp b/cpp/benchmarks/text/tokenize.cpp index 2151b28d637..e83310e0343 100644 --- a/cpp/benchmarks/text/tokenize.cpp +++ b/cpp/benchmarks/text/tokenize.cpp @@ -39,8 +39,10 @@ static void bench_tokenize(nvbench::state& state) state.skip("Skip benchmarks greater than size_type limit"); } - data_profile const profile = data_profile_builder().distribution( - cudf::type_id::STRING, distribution_id::NORMAL, 0, row_width); + data_profile const profile = + data_profile_builder() + .distribution(cudf::type_id::STRING, distribution_id::NORMAL, 0, row_width) + .no_validity(); auto const column = create_random_column(cudf::type_id::STRING, row_count{num_rows}, profile); cudf::strings_column_view input(column->view()); diff --git a/cpp/include/cudf/io/parquet.hpp b/cpp/include/cudf/io/parquet.hpp index b2f949cdcee..51eeed5b721 100644 --- a/cpp/include/cudf/io/parquet.hpp +++ b/cpp/include/cudf/io/parquet.hpp @@ -29,6 +29,7 @@ #include #include #include +#include #include namespace cudf::io { @@ -576,22 +577,16 @@ struct sorting_column { bool is_nulls_first{true}; //!< true if nulls come before non-null values }; -class parquet_writer_options_builder; - /** - * @brief Settings for `write_parquet()`. + * @brief Base settings for `write_parquet()` and `parquet_chunked_writer`. */ -class parquet_writer_options { +class parquet_writer_options_base { // Specify the sink to use for writer output sink_info _sink; // Specify the compression format to use compression_type _compression = compression_type::SNAPPY; // Specify the level of statistics in the output file statistics_freq _stats_level = statistics_freq::STATISTICS_ROWGROUP; - // Sets of columns to output - table_view _table; - // Partitions described as {start_row, num_rows} pairs - std::vector _partitions; // Optional associated metadata std::optional _metadata; // Optional footer key_value_metadata @@ -602,8 +597,6 @@ class parquet_writer_options { // Parquet writer can write timestamps as UTC // Defaults to true because libcudf timestamps are implicitly UTC bool _write_timestamps_as_UTC = true; - // Column chunks file paths to be set in the raw output metadata. One per output file - std::vector _column_chunks_file_paths; // Maximum size of each row group (unless smaller than a single page) size_t _row_group_size_bytes = default_row_group_size_bytes; // Maximum number of rows in row group (unless smaller than a single page) @@ -627,18 +620,13 @@ class parquet_writer_options { // Which columns in _table are used for sorting std::optional> _sorting_columns; + protected: /** - * @brief Constructor from sink and table. + * @brief Constructor from sink. * * @param sink The sink used for writer output - * @param table Table to be written to output */ - explicit parquet_writer_options(sink_info const& sink, table_view const& table) - : _sink(sink), _table(table) - { - } - - friend parquet_writer_options_builder; + explicit parquet_writer_options_base(sink_info const& sink) : _sink(sink) {} public: /** @@ -646,24 +634,7 @@ class parquet_writer_options { * * This has been added since Cython requires a default constructor to create objects on stack. */ - parquet_writer_options() = default; - - /** - * @brief Create builder to create `parquet_writer_options`. - * - * @param sink The sink used for writer output - * @param table Table to be written to output - * - * @return Builder to build parquet_writer_options - */ - static parquet_writer_options_builder builder(sink_info const& sink, table_view const& table); - - /** - * @brief Create builder to create `parquet_writer_options`. - * - * @return parquet_writer_options_builder - */ - static parquet_writer_options_builder builder(); + parquet_writer_options_base() = default; /** * @brief Returns sink info. @@ -686,20 +657,6 @@ class parquet_writer_options { */ [[nodiscard]] statistics_freq get_stats_level() const { return _stats_level; } - /** - * @brief Returns table_view. - * - * @return Table view - */ - [[nodiscard]] table_view get_table() const { return _table; } - - /** - * @brief Returns partitions. - * - * @return Partitions - */ - [[nodiscard]] std::vector const& get_partitions() const { return _partitions; } - /** * @brief Returns associated metadata. * @@ -712,7 +669,8 @@ class parquet_writer_options { * * @return Key-Value footer metadata information */ - std::vector> const& get_key_value_metadata() const + [[nodiscard]] std::vector> const& get_key_value_metadata() + const { return _user_data; } @@ -722,7 +680,7 @@ class parquet_writer_options { * * @return `true` if timestamps will be written as INT96 */ - bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } + [[nodiscard]] bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } /** * @brief Returns `true` if timestamps will be written as UTC @@ -731,29 +689,19 @@ class parquet_writer_options { */ [[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; } - /** - * @brief Returns Column chunks file paths to be set in the raw output metadata. - * - * @return Column chunks file paths to be set in the raw output metadata - */ - std::vector const& get_column_chunks_file_paths() const - { - return _column_chunks_file_paths; - } - /** * @brief Returns maximum row group size, in bytes. * * @return Maximum row group size, in bytes */ - auto get_row_group_size_bytes() const { return _row_group_size_bytes; } + [[nodiscard]] auto get_row_group_size_bytes() const { return _row_group_size_bytes; } /** * @brief Returns maximum row group size, in rows. * * @return Maximum row group size, in rows */ - auto get_row_group_size_rows() const { return _row_group_size_rows; } + [[nodiscard]] auto get_row_group_size_rows() const { return _row_group_size_rows; } /** * @brief Returns the maximum uncompressed page size, in bytes. @@ -762,7 +710,7 @@ class parquet_writer_options { * * @return Maximum uncompressed page size, in bytes */ - auto get_max_page_size_bytes() const + [[nodiscard]] auto get_max_page_size_bytes() const { return std::min(_max_page_size_bytes, get_row_group_size_bytes()); } @@ -774,7 +722,7 @@ class parquet_writer_options { * * @return Maximum page size, in rows */ - auto get_max_page_size_rows() const + [[nodiscard]] auto get_max_page_size_rows() const { return std::min(_max_page_size_rows, get_row_group_size_rows()); } @@ -784,7 +732,10 @@ class parquet_writer_options { * * @return length min/max will be truncated to */ - auto get_column_index_truncate_length() const { return _column_index_truncate_length; } + [[nodiscard]] auto get_column_index_truncate_length() const + { + return _column_index_truncate_length; + } /** * @brief Returns policy for dictionary use. @@ -831,20 +782,12 @@ class parquet_writer_options { */ [[nodiscard]] auto const& get_sorting_columns() const { return _sorting_columns; } - /** - * @brief Sets partitions. - * - * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must - * be same size as number of sinks in sink_info - */ - void set_partitions(std::vector partitions); - /** * @brief Sets metadata. * * @param metadata Associated metadata */ - void set_metadata(table_input_metadata metadata) { _metadata = std::move(metadata); } + void set_metadata(table_input_metadata metadata); /** * @brief Sets metadata. @@ -858,14 +801,13 @@ class parquet_writer_options { * * @param sf Level of statistics requested in the output file */ - void set_stats_level(statistics_freq sf) { _stats_level = sf; } - + void set_stats_level(statistics_freq sf); /** * @brief Sets compression type. * * @param compression The compression type to use */ - void set_compression(compression_type compression) { _compression = compression; } + void set_compression(compression_type compression); /** * @brief Sets timestamp writing preferences. INT96 timestamps will be written @@ -873,22 +815,14 @@ class parquet_writer_options { * * @param req Boolean value to enable/disable writing of INT96 timestamps */ - void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } + void enable_int96_timestamps(bool req); /** * @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`. * * @param val Boolean value to enable/disable writing of timestamps as UTC. */ - void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; } - - /** - * @brief Sets column chunks file path to be set in the raw output metadata. - * - * @param file_paths Vector of Strings which indicates file path. Must be same size as number of - * data sinks in sink info - */ - void set_column_chunks_file_paths(std::vector file_paths); + void enable_utc_timestamps(bool val); /** * @brief Sets the maximum row group size, in bytes. @@ -951,116 +885,84 @@ class parquet_writer_options { * * @param comp_stats Pointer to compression statistics to be updated after writing */ - void set_compression_statistics(std::shared_ptr comp_stats) - { - _compression_stats = std::move(comp_stats); - } + void set_compression_statistics(std::shared_ptr comp_stats); /** * @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`. * * @param val Boolean value to enable/disable writing of V2 page headers. */ - void enable_write_v2_headers(bool val) { _v2_page_headers = val; } + void enable_write_v2_headers(bool val); /** * @brief Sets sorting columns. * * @param sorting_columns Column sort order metadata */ - void set_sorting_columns(std::vector sorting_columns) - { - _sorting_columns = std::move(sorting_columns); - } + void set_sorting_columns(std::vector sorting_columns); }; /** - * @brief Class to build `parquet_writer_options`. + * @brief Base class for Parquet options builders. */ -class parquet_writer_options_builder { - parquet_writer_options options; +template +class parquet_writer_options_builder_base { + OptionsT _options; - public: + protected: /** - * @brief Default constructor. + * @brief Return reference to the options object being built * - * This has been added since Cython requires a default constructor to create objects on stack. + * @return the options object */ - explicit parquet_writer_options_builder() = default; + inline OptionsT& get_options() { return _options; } /** - * @brief Constructor from sink and table. + * @brief Constructor from options. * - * @param sink The sink used for writer output - * @param table Table to be written to output + * @param options Options object to build */ - explicit parquet_writer_options_builder(sink_info const& sink, table_view const& table) - : options(sink, table) - { - } + explicit parquet_writer_options_builder_base(OptionsT options); + public: /** - * @brief Sets partitions in parquet_writer_options. + * @brief Default constructor. * - * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must - * be same size as number of sinks in sink_info - * @return this for chaining + * This has been added since Cython requires a default constructor to create objects on stack. */ - parquet_writer_options_builder& partitions(std::vector partitions); + explicit parquet_writer_options_builder_base() = default; /** - * @brief Sets metadata in parquet_writer_options. + * @brief Sets metadata. * * @param metadata Associated metadata * @return this for chaining */ - parquet_writer_options_builder& metadata(table_input_metadata metadata) - { - options._metadata = std::move(metadata); - return *this; - } + BuilderT& metadata(table_input_metadata metadata); /** - * @brief Sets Key-Value footer metadata in parquet_writer_options. + * @brief Sets Key-Value footer metadata. * * @param metadata Key-Value footer metadata * @return this for chaining */ - parquet_writer_options_builder& key_value_metadata( - std::vector> metadata); + BuilderT& key_value_metadata(std::vector> metadata); /** - * @brief Sets the level of statistics in parquet_writer_options. + * @brief Sets the level of statistics. * * @param sf Level of statistics requested in the output file * @return this for chaining */ - parquet_writer_options_builder& stats_level(statistics_freq sf) - { - options._stats_level = sf; - return *this; - } + BuilderT& stats_level(statistics_freq sf); /** - * @brief Sets compression type in parquet_writer_options. + * @brief Sets compression type. * * @param compression The compression type to use * @return this for chaining */ - parquet_writer_options_builder& compression(compression_type compression) - { - options._compression = compression; - return *this; - } - - /** - * @brief Sets column chunks file path to be set in the raw output metadata. - * - * @param file_paths Vector of Strings which indicates file path. Must be same size as number of - * data sinks - * @return this for chaining - */ - parquet_writer_options_builder& column_chunks_file_paths(std::vector file_paths); + BuilderT& compression(compression_type compression); /** * @brief Sets the maximum row group size, in bytes. @@ -1068,11 +970,7 @@ class parquet_writer_options_builder { * @param val maximum row group size * @return this for chaining */ - parquet_writer_options_builder& row_group_size_bytes(size_t val) - { - options.set_row_group_size_bytes(val); - return *this; - } + BuilderT& row_group_size_bytes(size_t val); /** * @brief Sets the maximum number of rows in output row groups. @@ -1080,11 +978,7 @@ class parquet_writer_options_builder { * @param val maximum number or rows * @return this for chaining */ - parquet_writer_options_builder& row_group_size_rows(size_type val) - { - options.set_row_group_size_rows(val); - return *this; - } + BuilderT& row_group_size_rows(size_type val); /** * @brief Sets the maximum uncompressed page size, in bytes. @@ -1096,11 +990,7 @@ class parquet_writer_options_builder { * @param val maximum page size * @return this for chaining */ - parquet_writer_options_builder& max_page_size_bytes(size_t val) - { - options.set_max_page_size_bytes(val); - return *this; - } + BuilderT& max_page_size_bytes(size_t val); /** * @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting. @@ -1109,11 +999,7 @@ class parquet_writer_options_builder { * @param val maximum rows per page * @return this for chaining */ - parquet_writer_options_builder& max_page_size_rows(size_type val) - { - options.set_max_page_size_rows(val); - return *this; - } + BuilderT& max_page_size_rows(size_type val); /** * @brief Sets the desired maximum size in bytes for min and max values in the column index. @@ -1128,11 +1014,7 @@ class parquet_writer_options_builder { * @param val length min/max will be truncated to, with 0 indicating no truncation * @return this for chaining */ - parquet_writer_options_builder& column_index_truncate_length(int32_t val) - { - options.set_column_index_truncate_length(val); - return *this; - } + BuilderT& column_index_truncate_length(int32_t val); /** * @brief Sets the policy for dictionary use. @@ -1151,7 +1033,7 @@ class parquet_writer_options_builder { * @param val policy for dictionary use * @return this for chaining */ - parquet_writer_options_builder& dictionary_policy(enum dictionary_policy val); + BuilderT& dictionary_policy(enum dictionary_policy val); /** * @brief Sets the maximum dictionary size, in bytes. @@ -1164,7 +1046,7 @@ class parquet_writer_options_builder { * @param val maximum dictionary size * @return this for chaining */ - parquet_writer_options_builder& max_dictionary_size(size_t val); + BuilderT& max_dictionary_size(size_t val); /** * @brief Sets the maximum page fragment size, in rows. @@ -1176,7 +1058,7 @@ class parquet_writer_options_builder { * @param val maximum page fragment size * @return this for chaining */ - parquet_writer_options_builder& max_page_fragment_size(size_type val); + BuilderT& max_page_fragment_size(size_type val); /** * @brief Sets the pointer to the output compression statistics. @@ -1184,24 +1066,16 @@ class parquet_writer_options_builder { * @param comp_stats Pointer to compression statistics to be filled once writer is done * @return this for chaining */ - parquet_writer_options_builder& compression_statistics( - std::shared_ptr const& comp_stats) - { - options._compression_stats = comp_stats; - return *this; - } + BuilderT& compression_statistics( + std::shared_ptr const& comp_stats); /** - * @brief Sets whether int96 timestamps are written or not in parquet_writer_options. + * @brief Sets whether int96 timestamps are written or not. * * @param enabled Boolean value to enable/disable int96 timestamps * @return this for chaining */ - parquet_writer_options_builder& int96_timestamps(bool enabled) - { - options._write_timestamps_as_int96 = enabled; - return *this; - } + BuilderT& int96_timestamps(bool enabled); /** * @brief Set to true if timestamps are to be written as UTC. @@ -1209,126 +1083,60 @@ class parquet_writer_options_builder { * @param enabled Boolean value to enable/disable writing of timestamps as UTC. * @return this for chaining */ - parquet_writer_options_builder& utc_timestamps(bool enabled) - { - options._write_timestamps_as_UTC = enabled; - return *this; - } - + BuilderT& utc_timestamps(bool enabled); /** * @brief Set to true if V2 page headers are to be written. * * @param enabled Boolean value to enable/disable writing of V2 page headers. * @return this for chaining */ - parquet_writer_options_builder& write_v2_headers(bool enabled); + BuilderT& write_v2_headers(bool enabled); /** - * @brief Sets column sorting metadata to chunked_parquet_writer_options. + * @brief Sets column sorting metadata. * * @param sorting_columns Column sort order metadata * @return this for chaining */ - parquet_writer_options_builder& sorting_columns(std::vector sorting_columns); + BuilderT& sorting_columns(std::vector sorting_columns); /** - * @brief move parquet_writer_options member once it's built. + * @brief move options member once it's built. */ - operator parquet_writer_options&&() { return std::move(options); } + operator OptionsT&&(); /** - * @brief move parquet_writer_options member once it's built. + * @brief move options member once it's built. * * This has been added since Cython does not support overloading of conversion operators. * * @return Built `parquet_writer_options` object's r-value reference */ - parquet_writer_options&& build() { return std::move(options); } + OptionsT&& build(); }; -/** - * @brief Writes a set of columns to parquet format. - * - * The following code snippet demonstrates how to write columns to a file: - * @code - * auto destination = cudf::io::sink_info("dataset.parquet"); - * auto options = cudf::io::parquet_writer_options::builder(destination, table->view()); - * cudf::io::write_parquet(options); - * @endcode - * - * @param options Settings for controlling writing behavior - * @param stream CUDA stream used for device memory operations and kernel launches - * @return A blob that contains the file metadata (parquet FileMetadata thrift message) if - * requested in parquet_writer_options (empty blob otherwise). - */ - -std::unique_ptr> write_parquet( - parquet_writer_options const& options, rmm::cuda_stream_view stream = cudf::get_default_stream()); +class parquet_writer_options_builder; /** - * @brief Merges multiple raw metadata blobs that were previously created by write_parquet - * into a single metadata blob. - * - * @ingroup io_writers - * - * @param[in] metadata_list List of input file metadata - * @return A parquet-compatible blob that contains the data for all row groups in the list + * @brief Settings for `write_parquet()`. */ -std::unique_ptr> merge_row_group_metadata( - std::vector>> const& metadata_list); - -class chunked_parquet_writer_options_builder; +class parquet_writer_options : public parquet_writer_options_base { + // Sets of columns to output + table_view _table; + // Partitions described as {start_row, num_rows} pairs + std::vector _partitions; + // Column chunks file paths to be set in the raw output metadata. One per output file + std::vector _column_chunks_file_paths; -/** - * @brief Settings for `write_parquet_chunked()`. - */ -class chunked_parquet_writer_options { - // Specify the sink to use for writer output - sink_info _sink; - // Specify the compression format to use - compression_type _compression = compression_type::AUTO; - // Specify the level of statistics in the output file - statistics_freq _stats_level = statistics_freq::STATISTICS_ROWGROUP; - // Optional associated metadata. - std::optional _metadata; - // Optional footer key_value_metadata - std::vector> _user_data; - // Parquet writer can write INT96 or TIMESTAMP_MICROS. Defaults to TIMESTAMP_MICROS. - // If true then overrides any per-column setting in _metadata. - bool _write_timestamps_as_int96 = false; - // Parquet writer can write timestamps as UTC. Defaults to true. - bool _write_timestamps_as_UTC = true; - // Maximum size of each row group (unless smaller than a single page) - size_t _row_group_size_bytes = default_row_group_size_bytes; - // Maximum number of rows in row group (unless smaller than a single page) - size_type _row_group_size_rows = default_row_group_size_rows; - // Maximum size of each page (uncompressed) - size_t _max_page_size_bytes = default_max_page_size_bytes; - // Maximum number of rows in a page - size_type _max_page_size_rows = default_max_page_size_rows; - // Maximum size of min or max values in column index - int32_t _column_index_truncate_length = default_column_index_truncate_length; - // When to use dictionary encoding for data - dictionary_policy _dictionary_policy = dictionary_policy::ADAPTIVE; - // Maximum size of column chunk dictionary (in bytes) - size_t _max_dictionary_size = default_max_dictionary_size; - // Maximum number of rows in a page fragment - std::optional _max_page_fragment_size; - // Optional compression statistics - std::shared_ptr _compression_stats; - // write V2 page headers? - bool _v2_page_headers = false; - // Which columns in _table are used for sorting - std::optional> _sorting_columns; + friend parquet_writer_options_builder; /** - * @brief Constructor from sink. + * @brief Constructor from sink and table. * - * @param sink Sink used for writer output + * @param sink The sink used for writer output + * @param table Table to be written to output */ - explicit chunked_parquet_writer_options(sink_info const& sink) : _sink(sink) {} - - friend chunked_parquet_writer_options_builder; + explicit parquet_writer_options(sink_info const& sink, table_view const& table); public: /** @@ -1336,277 +1144,160 @@ class chunked_parquet_writer_options { * * This has been added since Cython requires a default constructor to create objects on stack. */ - chunked_parquet_writer_options() = default; + parquet_writer_options() = default; /** - * @brief Returns sink info. + * @brief Create builder to create `parquet_writer_options`. * - * @return Sink info + * @param sink The sink used for writer output + * @param table Table to be written to output + * + * @return Builder to build parquet_writer_options */ - [[nodiscard]] sink_info const& get_sink() const { return _sink; } + static parquet_writer_options_builder builder(sink_info const& sink, table_view const& table); /** - * @brief Returns compression format used. + * @brief Create builder to create `parquet_writer_options`. * - * @return Compression format + * @return parquet_writer_options_builder */ - [[nodiscard]] compression_type get_compression() const { return _compression; } + static parquet_writer_options_builder builder(); /** - * @brief Returns level of statistics requested in output file. + * @brief Returns table_view. * - * @return Level of statistics requested in output file + * @return Table view */ - [[nodiscard]] statistics_freq get_stats_level() const { return _stats_level; } + [[nodiscard]] table_view get_table() const { return _table; } /** - * @brief Returns metadata information. + * @brief Returns partitions. * - * @return Metadata information + * @return Partitions */ - [[nodiscard]] auto const& get_metadata() const { return _metadata; } + [[nodiscard]] std::vector const& get_partitions() const { return _partitions; } /** - * @brief Returns Key-Value footer metadata information. + * @brief Returns Column chunks file paths to be set in the raw output metadata. * - * @return Key-Value footer metadata information + * @return Column chunks file paths to be set in the raw output metadata */ - std::vector> const& get_key_value_metadata() const + [[nodiscard]] std::vector const& get_column_chunks_file_paths() const { - return _user_data; - } - - /** - * @brief Returns `true` if timestamps will be written as INT96 - * - * @return `true` if timestamps will be written as INT96 - */ - bool is_enabled_int96_timestamps() const { return _write_timestamps_as_int96; } - - /** - * @brief Returns `true` if timestamps will be written as UTC - * - * @return `true` if timestamps will be written as UTC - */ - [[nodiscard]] auto is_enabled_utc_timestamps() const { return _write_timestamps_as_UTC; } - - /** - * @brief Returns maximum row group size, in bytes. - * - * @return Maximum row group size, in bytes - */ - auto get_row_group_size_bytes() const { return _row_group_size_bytes; } - - /** - * @brief Returns maximum row group size, in rows. - * - * @return Maximum row group size, in rows - */ - auto get_row_group_size_rows() const { return _row_group_size_rows; } - - /** - * @brief Returns maximum uncompressed page size, in bytes. - * - * If set larger than the row group size, then this will return the - * row group size. - * - * @return Maximum uncompressed page size, in bytes - */ - auto get_max_page_size_bytes() const - { - return std::min(_max_page_size_bytes, get_row_group_size_bytes()); - } - - /** - * @brief Returns maximum page size, in rows. - * - * If set larger than the row group size, then this will return the row group size. - * - * @return Maximum page size, in rows - */ - auto get_max_page_size_rows() const - { - return std::min(_max_page_size_rows, get_row_group_size_rows()); - } - - /** - * @brief Returns maximum length of min or max values in column index, in bytes. - * - * @return length min/max will be truncated to - */ - auto get_column_index_truncate_length() const { return _column_index_truncate_length; } - - /** - * @brief Returns policy for dictionary use. - * - * @return policy for dictionary use - */ - [[nodiscard]] dictionary_policy get_dictionary_policy() const { return _dictionary_policy; } - - /** - * @brief Returns maximum dictionary size, in bytes. - * - * @return Maximum dictionary size, in bytes. - */ - [[nodiscard]] auto get_max_dictionary_size() const { return _max_dictionary_size; } - - /** - * @brief Returns maximum page fragment size, in rows. - * - * @return Maximum page fragment size, in rows. - */ - [[nodiscard]] auto get_max_page_fragment_size() const { return _max_page_fragment_size; } - - /** - * @brief Returns a shared pointer to the user-provided compression statistics. - * - * @return Compression statistics - */ - [[nodiscard]] std::shared_ptr get_compression_statistics() const - { - return _compression_stats; + return _column_chunks_file_paths; } /** - * @brief Returns `true` if V2 page headers should be written. - * - * @return `true` if V2 page headers should be written. - */ - [[nodiscard]] auto is_enabled_write_v2_headers() const { return _v2_page_headers; } - - /** - * @brief Returns the sorting_columns. - * - * @return Column sort order metadata - */ - [[nodiscard]] auto const& get_sorting_columns() const { return _sorting_columns; } - - /** - * @brief Sets metadata. - * - * @param metadata Associated metadata - */ - void set_metadata(table_input_metadata metadata) { _metadata = std::move(metadata); } - - /** - * @brief Sets Key-Value footer metadata. - * - * @param metadata Key-Value footer metadata - */ - void set_key_value_metadata(std::vector> metadata); - - /** - * @brief Sets the level of statistics in parquet_writer_options. - * - * @param sf Level of statistics requested in the output file - */ - void set_stats_level(statistics_freq sf) { _stats_level = sf; } - - /** - * @brief Sets compression type. - * - * @param compression The compression type to use - */ - void set_compression(compression_type compression) { _compression = compression; } - - /** - * @brief Sets timestamp writing preferences. - * - * INT96 timestamps will be written if `true` and TIMESTAMP_MICROS will be written if `false`. + * @brief Sets partitions. * - * @param req Boolean value to enable/disable writing of INT96 timestamps + * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must + * be same size as number of sinks in sink_info */ - void enable_int96_timestamps(bool req) { _write_timestamps_as_int96 = req; } + void set_partitions(std::vector partitions); /** - * @brief Sets preference for writing timestamps as UTC. Write timestamps as UTC if set to `true`. + * @brief Sets column chunks file path to be set in the raw output metadata. * - * @param val Boolean value to enable/disable writing of timestamps as UTC. + * @param file_paths Vector of Strings which indicates file path. Must be same size as number of + * data sinks in sink info */ - void enable_utc_timestamps(bool val) { _write_timestamps_as_UTC = val; } + void set_column_chunks_file_paths(std::vector file_paths); +}; +/** + * @brief Class to build `parquet_writer_options`. + */ +class parquet_writer_options_builder + : public parquet_writer_options_builder_base { + public: /** - * @brief Sets the maximum row group size, in bytes. + * @brief Default constructor. * - * @param size_bytes Maximum row group size, in bytes to set + * This has been added since Cython requires a default constructor to create objects on stack. */ - void set_row_group_size_bytes(size_t size_bytes); + explicit parquet_writer_options_builder() = default; /** - * @brief Sets the maximum row group size, in rows. + * @brief Constructor from sink and table. * - * @param size_rows The maximum row group size, in rows to set + * @param sink The sink used for writer output + * @param table Table to be written to output */ - void set_row_group_size_rows(size_type size_rows); + explicit parquet_writer_options_builder(sink_info const& sink, table_view const& table); /** - * @brief Sets the maximum uncompressed page size, in bytes. + * @brief Sets partitions in parquet_writer_options. * - * @param size_bytes Maximum uncompressed page size, in bytes to set + * @param partitions Partitions of input table in {start_row, num_rows} pairs. If specified, must + * be same size as number of sinks in sink_info + * @return this for chaining */ - void set_max_page_size_bytes(size_t size_bytes); + parquet_writer_options_builder& partitions(std::vector partitions); /** - * @brief Sets the maximum page size, in rows. + * @brief Sets column chunks file path to be set in the raw output metadata. * - * @param size_rows The maximum page size, in rows to set + * @param file_paths Vector of Strings which indicates file path. Must be same size as number of + * data sinks + * @return this for chaining */ - void set_max_page_size_rows(size_type size_rows); + parquet_writer_options_builder& column_chunks_file_paths(std::vector file_paths); +}; - /** - * @brief Sets the maximum length of min or max values in column index, in bytes. - * - * @param size_bytes length min/max will be truncated to - */ - void set_column_index_truncate_length(int32_t size_bytes); +/** + * @brief Writes a set of columns to parquet format. + * + * The following code snippet demonstrates how to write columns to a file: + * @code + * auto destination = cudf::io::sink_info("dataset.parquet"); + * auto options = cudf::io::parquet_writer_options::builder(destination, table->view()); + * cudf::io::write_parquet(options); + * @endcode + * + * @param options Settings for controlling writing behavior + * @param stream CUDA stream used for device memory operations and kernel launches + * @return A blob that contains the file metadata (parquet FileMetadata thrift message) if + * requested in parquet_writer_options (empty blob otherwise). + */ - /** - * @brief Sets the policy for dictionary use. - * - * @param policy Policy for dictionary use - */ - void set_dictionary_policy(dictionary_policy policy); +std::unique_ptr> write_parquet( + parquet_writer_options const& options, rmm::cuda_stream_view stream = cudf::get_default_stream()); - /** - * @brief Sets the maximum dictionary size, in bytes. - * - * @param size_bytes Maximum dictionary size, in bytes - */ - void set_max_dictionary_size(size_t size_bytes); +/** + * @brief Merges multiple raw metadata blobs that were previously created by write_parquet + * into a single metadata blob. + * + * @ingroup io_writers + * + * @param[in] metadata_list List of input file metadata + * @return A parquet-compatible blob that contains the data for all row groups in the list + */ +std::unique_ptr> merge_row_group_metadata( + std::vector>> const& metadata_list); - /** - * @brief Sets the maximum page fragment size, in rows. - * - * @param size_rows Maximum page fragment size, in rows. - */ - void set_max_page_fragment_size(size_type size_rows); +class chunked_parquet_writer_options_builder; +/** + * @brief Settings for `parquet_chunked_writer`. + */ +class chunked_parquet_writer_options : public parquet_writer_options_base { /** - * @brief Sets the pointer to the output compression statistics. + * @brief Constructor from sink. * - * @param comp_stats Pointer to compression statistics to be updated after writing + * @param sink Sink used for writer output */ - void set_compression_statistics(std::shared_ptr comp_stats) - { - _compression_stats = std::move(comp_stats); - } + explicit chunked_parquet_writer_options(sink_info const& sink); - /** - * @brief Sets preference for V2 page headers. Write V2 page headers if set to `true`. - * - * @param val Boolean value to enable/disable writing of V2 page headers. - */ - void enable_write_v2_headers(bool val) { _v2_page_headers = val; } + friend chunked_parquet_writer_options_builder; + public: /** - * @brief Sets sorting columns. + * @brief Default constructor. * - * @param sorting_columns Column sort order metadata + * This has been added since Cython requires a default constructor to create objects on stack. */ - void set_sorting_columns(std::vector sorting_columns) - { - _sorting_columns = std::move(sorting_columns); - } + chunked_parquet_writer_options() = default; /** * @brief creates builder to build chunked_parquet_writer_options. @@ -1619,11 +1310,11 @@ class chunked_parquet_writer_options { }; /** - * @brief Builds options for chunked_parquet_writer_options. + * @brief Class to build `chunked_parquet_writer_options`. */ -class chunked_parquet_writer_options_builder { - chunked_parquet_writer_options options; - +class chunked_parquet_writer_options_builder + : public parquet_writer_options_builder_base { public: /** * @brief Default constructor. @@ -1637,238 +1328,7 @@ class chunked_parquet_writer_options_builder { * * @param sink The sink used for writer output */ - chunked_parquet_writer_options_builder(sink_info const& sink) : options(sink){}; - - /** - * @brief Sets metadata to chunked_parquet_writer_options. - * - * @param metadata Associated metadata - * @return this for chaining - */ - chunked_parquet_writer_options_builder& metadata(table_input_metadata metadata) - { - options._metadata = std::move(metadata); - return *this; - } - - /** - * @brief Sets Key-Value footer metadata in parquet_writer_options. - * - * @param metadata Key-Value footer metadata - * @return this for chaining - */ - chunked_parquet_writer_options_builder& key_value_metadata( - std::vector> metadata); - - /** - * @brief Sets the level of statistics in chunked_parquet_writer_options. - * - * @param sf Level of statistics requested in the output file - * @return this for chaining - */ - chunked_parquet_writer_options_builder& stats_level(statistics_freq sf) - { - options._stats_level = sf; - return *this; - } - - /** - * @brief Sets compression type to chunked_parquet_writer_options. - * - * @param compression The compression type to use - * @return this for chaining - */ - chunked_parquet_writer_options_builder& compression(compression_type compression) - { - options._compression = compression; - return *this; - } - - /** - * @brief Set to true if timestamps should be written as - * int96 types instead of int64 types. Even though int96 is deprecated and is - * not an internal type for cudf, it needs to be written for backwards - * compatibility reasons. - * - * @param enabled Boolean value to enable/disable int96 timestamps - * @return this for chaining - */ - chunked_parquet_writer_options_builder& int96_timestamps(bool enabled) - { - options._write_timestamps_as_int96 = enabled; - return *this; - } - - /** - * @brief Set to true if timestamps are to be written as UTC. - * - * @param enabled Boolean value to enable/disable writing of timestamps as UTC. - * @return this for chaining - */ - chunked_parquet_writer_options_builder& utc_timestamps(bool enabled) - { - options._write_timestamps_as_UTC = enabled; - return *this; - } - - /** - * @brief Set to true if V2 page headers are to be written. - * - * @param enabled Boolean value to enable/disable writing of V2 page headers. - * @return this for chaining - */ - chunked_parquet_writer_options_builder& write_v2_headers(bool enabled); - - /** - * @brief Sets the maximum row group size, in bytes. - * - * @param val maximum row group size - * @return this for chaining - */ - chunked_parquet_writer_options_builder& row_group_size_bytes(size_t val) - { - options.set_row_group_size_bytes(val); - return *this; - } - - /** - * @brief Sets the maximum number of rows in output row groups. - * - * @param val maximum number or rows - * @return this for chaining - */ - chunked_parquet_writer_options_builder& row_group_size_rows(size_type val) - { - options.set_row_group_size_rows(val); - return *this; - } - - /** - * @brief Sets the maximum uncompressed page size, in bytes. - * - * Serves as a hint to the writer, and can be exceeded under certain circumstances. Cannot be - * larger than the row group size in bytes, and will be adjusted to match if it is. - * - * @param val maximum page size - * @return this for chaining - */ - chunked_parquet_writer_options_builder& max_page_size_bytes(size_t val) - { - options.set_max_page_size_bytes(val); - return *this; - } - - /** - * @brief Sets the maximum page size, in rows. Counts only top-level rows, ignoring any nesting. - * Cannot be larger than the row group size in rows, and will be adjusted to match if it is. - * - * @param val maximum rows per page - * @return this for chaining - */ - chunked_parquet_writer_options_builder& max_page_size_rows(size_type val) - { - options.set_max_page_size_rows(val); - return *this; - } - - /** - * @brief Sets the desired maximum size in bytes for min and max values in the column index. - * - * Values exceeding this limit will be truncated, but modified such that they will still - * be valid lower and upper bounds. This only applies to variable length types, such as string. - * Maximum values will not be truncated if there is no suitable truncation that results in - * a valid upper bound. - * - * Default value is 64. - * - * @param val length min/max will be truncated to, with 0 indicating no truncation - * @return this for chaining - */ - chunked_parquet_writer_options_builder& column_index_truncate_length(int32_t val) - { - options.set_column_index_truncate_length(val); - return *this; - } - - /** - * @brief Sets the policy for dictionary use. - * - * Certain compression algorithms (e.g Zstandard) have limits on how large of a buffer can - * be compressed. In some circumstances, the dictionary can grow beyond this limit, which - * will prevent the column from being compressed. This setting controls how the writer - * should act in these circumstances. A setting of dictionary_policy::ADAPTIVE will disable - * dictionary encoding for columns where the dictionary exceeds the limit. A setting of - * dictionary_policy::NEVER will disable the use of dictionary encoding globally. A setting of - * dictionary_policy::ALWAYS will allow the use of dictionary encoding even if it will result in - * the disabling of compression for columns that would otherwise be compressed. - * - * The default value is dictionary_policy::ADAPTIVE. - * - * @param val policy for dictionary use - * @return this for chaining - */ - chunked_parquet_writer_options_builder& dictionary_policy(enum dictionary_policy val); - - /** - * @brief Sets the maximum dictionary size, in bytes. - * - * Disables dictionary encoding for any column chunk where the dictionary will - * exceed this limit. Only used when the dictionary_policy is set to 'ADAPTIVE'. - * - * Default value is 1048576 (1MiB). - * - * @param val maximum dictionary size - * @return this for chaining - */ - chunked_parquet_writer_options_builder& max_dictionary_size(size_t val); - - /** - * @brief Sets the maximum page fragment size, in rows. - * - * Files with nested schemas or very long strings may need a page fragment size - * smaller than the default value of 5000 to ensure a single fragment will not - * exceed the desired maximum page size in bytes. - * - * @param val maximum page fragment size - * @return this for chaining - */ - chunked_parquet_writer_options_builder& max_page_fragment_size(size_type val); - - /** - * @brief Sets the pointer to the output compression statistics. - * - * @param comp_stats Pointer to compression statistics to be filled once writer is done - * @return this for chaining - */ - chunked_parquet_writer_options_builder& compression_statistics( - std::shared_ptr const& comp_stats) - { - options._compression_stats = comp_stats; - return *this; - } - - /** - * @brief Sets column sorting metadata to chunked_parquet_writer_options. - * - * @param sorting_columns Column sort order metadata - * @return this for chaining - */ - chunked_parquet_writer_options_builder& sorting_columns( - std::vector sorting_columns); - - /** - * @brief move chunked_parquet_writer_options member once it's built. - */ - operator chunked_parquet_writer_options&&() { return std::move(options); } - - /** - * @brief move chunked_parquet_writer_options member once it's is built. - * - * This has been added since Cython does not support overloading of conversion operators. - * - * @return Built `chunked_parquet_writer_options` object's r-value reference - */ - chunked_parquet_writer_options&& build() { return std::move(options); } + chunked_parquet_writer_options_builder(sink_info const& sink); }; /** diff --git a/cpp/include/nvtext/tokenize.hpp b/cpp/include/nvtext/tokenize.hpp index ea1b9c716f0..29fed0759c7 100644 --- a/cpp/include/nvtext/tokenize.hpp +++ b/cpp/include/nvtext/tokenize.hpp @@ -176,7 +176,8 @@ std::unique_ptr count_tokens( * t is now ["h","e","l","l","o"," ","w","o","r","l","d","g","o","o","d","b","y","e"] * @endcode * - * All null row entries are ignored and the output contains all valid rows. + * @throw std::invalid_argument if `input` contains nulls + * @throw std::overflow_error if the output would produce more than max size_type rows * * @param input Strings column to tokenize * @param stream CUDA stream used for device memory operations and kernel launches diff --git a/cpp/src/io/functions.cpp b/cpp/src/io/functions.cpp index 3ba2facf276..1ed8ee5ce06 100644 --- a/cpp/src/io/functions.cpp +++ b/cpp/src/io/functions.cpp @@ -115,7 +115,7 @@ parquet_writer_options_builder parquet_writer_options::builder() chunked_parquet_writer_options_builder chunked_parquet_writer_options::builder( sink_info const& sink) { - return chunked_parquet_writer_options_builder(sink); + return chunked_parquet_writer_options_builder{sink}; } namespace { @@ -740,29 +740,37 @@ void parquet_reader_options::set_num_rows(size_type val) _num_rows = val; } -void parquet_writer_options::set_partitions(std::vector partitions) +void parquet_writer_options_base::set_metadata(table_input_metadata metadata) { - CUDF_EXPECTS(partitions.size() == _sink.num_sinks(), - "Mismatch between number of sinks and number of partitions"); - _partitions = std::move(partitions); + _metadata = std::move(metadata); } -void parquet_writer_options::set_key_value_metadata( +void parquet_writer_options_base::set_key_value_metadata( std::vector> metadata) { - CUDF_EXPECTS(metadata.size() == _sink.num_sinks(), + CUDF_EXPECTS(metadata.size() == get_sink().num_sinks(), "Mismatch between number of sinks and number of metadata maps"); _user_data = std::move(metadata); } -void parquet_writer_options::set_column_chunks_file_paths(std::vector file_paths) +void parquet_writer_options_base::set_stats_level(statistics_freq sf) { _stats_level = sf; } + +void parquet_writer_options_base::set_compression(compression_type compression) { - CUDF_EXPECTS(file_paths.size() == _sink.num_sinks(), - "Mismatch between number of sinks and number of chunk paths to set"); - _column_chunks_file_paths = std::move(file_paths); + _compression = compression; +} + +void parquet_writer_options_base::enable_int96_timestamps(bool req) +{ + _write_timestamps_as_int96 = req; +} + +void parquet_writer_options_base::enable_utc_timestamps(bool val) +{ + _write_timestamps_as_UTC = val; } -void parquet_writer_options::set_row_group_size_bytes(size_t size_bytes) +void parquet_writer_options_base::set_row_group_size_bytes(size_t size_bytes) { CUDF_EXPECTS( size_bytes >= 1024, @@ -770,13 +778,13 @@ void parquet_writer_options::set_row_group_size_bytes(size_t size_bytes) _row_group_size_bytes = size_bytes; } -void parquet_writer_options::set_row_group_size_rows(size_type size_rows) +void parquet_writer_options_base::set_row_group_size_rows(size_type size_rows) { CUDF_EXPECTS(size_rows > 0, "The maximum row group row count must be a positive integer."); _row_group_size_rows = size_rows; } -void parquet_writer_options::set_max_page_size_bytes(size_t size_bytes) +void parquet_writer_options_base::set_max_page_size_bytes(size_t size_bytes) { CUDF_EXPECTS(size_bytes >= 1024, "The maximum page size cannot be smaller than 1KB."); CUDF_EXPECTS(size_bytes <= static_cast(std::numeric_limits::max()), @@ -784,190 +792,249 @@ void parquet_writer_options::set_max_page_size_bytes(size_t size_bytes) _max_page_size_bytes = size_bytes; } -void parquet_writer_options::set_max_page_size_rows(size_type size_rows) +void parquet_writer_options_base::set_max_page_size_rows(size_type size_rows) { CUDF_EXPECTS(size_rows > 0, "The maximum page row count must be a positive integer."); _max_page_size_rows = size_rows; } -void parquet_writer_options::set_column_index_truncate_length(int32_t size_bytes) +void parquet_writer_options_base::set_column_index_truncate_length(int32_t size_bytes) { CUDF_EXPECTS(size_bytes >= 0, "Column index truncate length cannot be negative."); _column_index_truncate_length = size_bytes; } -void parquet_writer_options::set_dictionary_policy(dictionary_policy policy) +void parquet_writer_options_base::set_dictionary_policy(dictionary_policy policy) { _dictionary_policy = policy; } -void parquet_writer_options::set_max_dictionary_size(size_t size_bytes) +void parquet_writer_options_base::set_max_dictionary_size(size_t size_bytes) { CUDF_EXPECTS(size_bytes <= static_cast(std::numeric_limits::max()), "The maximum dictionary size cannot exceed 2GB."); _max_dictionary_size = size_bytes; } -void parquet_writer_options::set_max_page_fragment_size(size_type size_rows) +void parquet_writer_options_base::set_max_page_fragment_size(size_type size_rows) { CUDF_EXPECTS(size_rows > 0, "Page fragment size must be a positive integer."); _max_page_fragment_size = size_rows; } -parquet_writer_options_builder& parquet_writer_options_builder::partitions( - std::vector partitions) +void parquet_writer_options_base::set_compression_statistics( + std::shared_ptr comp_stats) { - options.set_partitions(std::move(partitions)); - return *this; + _compression_stats = std::move(comp_stats); +} + +void parquet_writer_options_base::enable_write_v2_headers(bool val) { _v2_page_headers = val; } + +void parquet_writer_options_base::set_sorting_columns(std::vector sorting_columns) +{ + _sorting_columns = std::move(sorting_columns); +} + +parquet_writer_options::parquet_writer_options(sink_info const& sink, table_view const& table) + : parquet_writer_options_base(sink), _table(table) +{ +} + +void parquet_writer_options::set_partitions(std::vector partitions) +{ + CUDF_EXPECTS(partitions.size() == get_sink().num_sinks(), + "Mismatch between number of sinks and number of partitions"); + _partitions = std::move(partitions); +} + +void parquet_writer_options::set_column_chunks_file_paths(std::vector file_paths) +{ + CUDF_EXPECTS(file_paths.size() == get_sink().num_sinks(), + "Mismatch between number of sinks and number of chunk paths to set"); + _column_chunks_file_paths = std::move(file_paths); +} + +template +parquet_writer_options_builder_base::parquet_writer_options_builder_base( + OptionsT options) + : _options(std::move(options)) +{ +} + +template +BuilderT& parquet_writer_options_builder_base::metadata( + table_input_metadata metadata) +{ + _options.set_metadata(std::move(metadata)); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::key_value_metadata( +template +BuilderT& parquet_writer_options_builder_base::key_value_metadata( std::vector> metadata) { - options.set_key_value_metadata(std::move(metadata)); - return *this; + _options.set_key_value_metadata(std::move(metadata)); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::column_chunks_file_paths( - std::vector file_paths) +template +BuilderT& parquet_writer_options_builder_base::stats_level(statistics_freq sf) { - options.set_column_chunks_file_paths(std::move(file_paths)); - return *this; + _options.set_stats_level(sf); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::dictionary_policy( - enum dictionary_policy val) +template +BuilderT& parquet_writer_options_builder_base::compression( + compression_type compression) { - options.set_dictionary_policy(val); - return *this; + _options.set_compression(compression); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::max_dictionary_size(size_t val) +template +BuilderT& parquet_writer_options_builder_base::row_group_size_bytes(size_t val) { - options.set_max_dictionary_size(val); - return *this; + _options.set_row_group_size_bytes(val); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::max_page_fragment_size( +template +BuilderT& parquet_writer_options_builder_base::row_group_size_rows( size_type val) { - options.set_max_page_fragment_size(val); - return *this; + _options.set_row_group_size_rows(val); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::write_v2_headers(bool enabled) +template +BuilderT& parquet_writer_options_builder_base::max_page_size_bytes(size_t val) { - options.enable_write_v2_headers(enabled); - return *this; + _options.set_max_page_size_bytes(val); + return static_cast(*this); } -parquet_writer_options_builder& parquet_writer_options_builder::sorting_columns( - std::vector sorting_columns) +template +BuilderT& parquet_writer_options_builder_base::max_page_size_rows(size_type val) { - options._sorting_columns = std::move(sorting_columns); - return *this; + _options.set_max_page_size_rows(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_key_value_metadata( - std::vector> metadata) +template +BuilderT& parquet_writer_options_builder_base::column_index_truncate_length( + int32_t val) { - CUDF_EXPECTS(metadata.size() == _sink.num_sinks(), - "Mismatch between number of sinks and number of metadata maps"); - _user_data = std::move(metadata); + _options.set_column_index_truncate_length(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_row_group_size_bytes(size_t size_bytes) +template +BuilderT& parquet_writer_options_builder_base::dictionary_policy( + enum dictionary_policy val) { - CUDF_EXPECTS( - size_bytes >= 1024, - "The maximum row group size cannot be smaller than the minimum page size, which is 1KB."); - _row_group_size_bytes = size_bytes; + _options.set_dictionary_policy(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_row_group_size_rows(size_type size_rows) +template +BuilderT& parquet_writer_options_builder_base::max_dictionary_size(size_t val) { - CUDF_EXPECTS(size_rows > 0, "The maximum row group row count must be a positive integer."); - _row_group_size_rows = size_rows; + _options.set_max_dictionary_size(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_max_page_size_bytes(size_t size_bytes) +template +BuilderT& parquet_writer_options_builder_base::max_page_fragment_size( + size_type val) { - CUDF_EXPECTS(size_bytes >= 1024, "The maximum page size cannot be smaller than 1KB."); - CUDF_EXPECTS(size_bytes <= static_cast(std::numeric_limits::max()), - "The maximum page size cannot exceed 2GB."); - _max_page_size_bytes = size_bytes; + _options.set_max_page_fragment_size(val); + return static_cast(*this); } -void chunked_parquet_writer_options::set_max_page_size_rows(size_type size_rows) +template +BuilderT& parquet_writer_options_builder_base::compression_statistics( + std::shared_ptr const& comp_stats) { - CUDF_EXPECTS(size_rows > 0, "The maximum page row count must be a positive integer."); - _max_page_size_rows = size_rows; + _options.set_compression_statistics(comp_stats); + return static_cast(*this); } -void chunked_parquet_writer_options::set_column_index_truncate_length(int32_t size_bytes) +template +BuilderT& parquet_writer_options_builder_base::int96_timestamps(bool enabled) { - CUDF_EXPECTS(size_bytes >= 0, "Column index truncate length cannot be negative."); - _column_index_truncate_length = size_bytes; + _options.enable_int96_timestamps(enabled); + return static_cast(*this); } -void chunked_parquet_writer_options::set_dictionary_policy(dictionary_policy policy) +template +BuilderT& parquet_writer_options_builder_base::utc_timestamps(bool enabled) { - _dictionary_policy = policy; + _options.enable_utc_timestamps(enabled); + return static_cast(*this); } -void chunked_parquet_writer_options::set_max_dictionary_size(size_t size_bytes) +template +BuilderT& parquet_writer_options_builder_base::write_v2_headers(bool enabled) { - CUDF_EXPECTS(size_bytes <= static_cast(std::numeric_limits::max()), - "The maximum dictionary size cannot exceed 2GB."); - _max_dictionary_size = size_bytes; + _options.enable_write_v2_headers(enabled); + return static_cast(*this); } -void chunked_parquet_writer_options::set_max_page_fragment_size(size_type size_rows) +template +BuilderT& parquet_writer_options_builder_base::sorting_columns( + std::vector sorting_columns) { - CUDF_EXPECTS(size_rows > 0, "Page fragment size must be a positive integer."); - _max_page_fragment_size = size_rows; + _options.set_sorting_columns(std::move(sorting_columns)); + return static_cast(*this); } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::key_value_metadata( - std::vector> metadata) +template +parquet_writer_options_builder_base::operator OptionsT&&() { - options.set_key_value_metadata(std::move(metadata)); - return *this; + return std::move(_options); } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::dictionary_policy( - enum dictionary_policy val) +template +OptionsT&& parquet_writer_options_builder_base::build() { - options.set_dictionary_policy(val); - return *this; + return std::move(_options); } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::max_dictionary_size( - size_t val) +template class parquet_writer_options_builder_base; +template class parquet_writer_options_builder_base; + +parquet_writer_options_builder::parquet_writer_options_builder(sink_info const& sink, + table_view const& table) + : parquet_writer_options_builder_base(parquet_writer_options{sink, table}) { - options.set_max_dictionary_size(val); - return *this; } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::write_v2_headers( - bool enabled) +parquet_writer_options_builder& parquet_writer_options_builder::partitions( + std::vector partitions) { - options.enable_write_v2_headers(enabled); + get_options().set_partitions(std::move(partitions)); return *this; } -chunked_parquet_writer_options_builder& chunked_parquet_writer_options_builder::sorting_columns( - std::vector sorting_columns) +parquet_writer_options_builder& parquet_writer_options_builder::column_chunks_file_paths( + std::vector file_paths) { - options._sorting_columns = std::move(sorting_columns); + get_options().set_column_chunks_file_paths(std::move(file_paths)); return *this; } -chunked_parquet_writer_options_builder& -chunked_parquet_writer_options_builder::max_page_fragment_size(size_type val) +chunked_parquet_writer_options::chunked_parquet_writer_options(sink_info const& sink) + : parquet_writer_options_base(sink) +{ +} + +chunked_parquet_writer_options_builder::chunked_parquet_writer_options_builder( + sink_info const& sink) + : parquet_writer_options_builder_base(chunked_parquet_writer_options{sink}) { - options.set_max_page_fragment_size(val); - return *this; } } // namespace cudf::io diff --git a/cpp/src/io/utilities/parsing_utils.cu b/cpp/src/io/utilities/parsing_utils.cu deleted file mode 100644 index cb8be380c5b..00000000000 --- a/cpp/src/io/utilities/parsing_utils.cu +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Copyright (c) 2019-2024, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include -#include -#include -#include - -#include - -#include - -#include - -namespace cudf { -namespace io { -namespace { -// When processing the input in chunks, this is the maximum size of each chunk. -// Only one chunk is loaded on the GPU at a time, so this value is chosen to -// be small enough to fit on the GPU in most cases. -constexpr size_t max_chunk_bytes = 256 * 1024 * 1024; // 256MB - -constexpr int bytes_per_find_thread = 64; - -using pos_key_pair = thrust::pair; - -template -constexpr T divCeil(T dividend, T divisor) noexcept -{ - return (dividend + divisor - 1) / divisor; -} - -/** - * @brief Sets the specified element of the array to the passed value - */ -template -__device__ __forceinline__ void setElement(T* array, cudf::size_type idx, T const& t, V const&) -{ - array[idx] = t; -} - -/** - * @brief Sets the specified element of the array of pairs using the two passed - * parameters. - */ -template -__device__ __forceinline__ void setElement(thrust::pair* array, - cudf::size_type idx, - T const& t, - V const& v) -{ - array[idx] = {t, v}; -} - -/** - * @brief Overloads the setElement() functions for void* arrays. - * Does not do anything, indexing is not allowed with void* arrays. - */ -template -__device__ __forceinline__ void setElement(void*, cudf::size_type, T const&, V const&) -{ -} - -/** - * @brief CUDA kernel that finds all occurrences of a character in the given - * character array. If the 'positions' parameter is not void*, - * positions of all occurrences are stored in the output array. - * - * @param[in] data Pointer to the input character array - * @param[in] size Number of bytes in the input array - * @param[in] offset Offset to add to the output positions - * @param[in] key Character to find in the array - * @param[in,out] count Pointer to the number of found occurrences - * @param[out] positions Array containing the output positions - */ -template -CUDF_KERNEL void count_and_set_positions(char const* data, - uint64_t size, - uint64_t offset, - char const key, - cudf::size_type* count, - T* positions) -{ - // thread IDs range per block, so also need the block id - auto const tid = cudf::detail::grid_1d::global_thread_id(); - auto const did = tid * bytes_per_find_thread; - - char const* raw = (data + did); - - long const byteToProcess = - ((did + bytes_per_find_thread) < size) ? bytes_per_find_thread : (size - did); - - // Process the data - for (long i = 0; i < byteToProcess; i++) { - if (raw[i] == key) { - auto const idx = atomicAdd(count, static_cast(1)); - setElement(positions, idx, did + offset + i, key); - } - } -} - -} // namespace - -template -cudf::size_type find_all_from_set(device_span data, - std::vector const& keys, - uint64_t result_offset, - T* positions, - rmm::cuda_stream_view stream) -{ - int block_size = 0; // suggested thread count to use - int min_grid_size = 0; // minimum block count required - CUDF_CUDA_TRY( - cudaOccupancyMaxPotentialBlockSize(&min_grid_size, &block_size, count_and_set_positions)); - int const grid_size = divCeil(data.size(), (size_t)block_size); - - auto d_count = cudf::detail::make_zeroed_device_uvector_async( - 1, stream, rmm::mr::get_current_device_resource()); - for (char key : keys) { - count_and_set_positions<<>>( - data.data(), data.size(), result_offset, key, d_count.data(), positions); - } - - return cudf::detail::make_std_vector_sync(d_count, stream)[0]; -} - -template -cudf::size_type find_all_from_set(host_span data, - std::vector const& keys, - uint64_t result_offset, - T* positions, - rmm::cuda_stream_view stream) -{ - rmm::device_buffer d_chunk(std::min(max_chunk_bytes, data.size()), stream); - auto d_count = cudf::detail::make_zeroed_device_uvector_async( - 1, stream, rmm::mr::get_current_device_resource()); - - int block_size = 0; // suggested thread count to use - int min_grid_size = 0; // minimum block count required - CUDF_CUDA_TRY( - cudaOccupancyMaxPotentialBlockSize(&min_grid_size, &block_size, count_and_set_positions)); - - size_t const chunk_count = divCeil(data.size(), max_chunk_bytes); - for (size_t ci = 0; ci < chunk_count; ++ci) { - auto const chunk_offset = ci * max_chunk_bytes; - auto const h_chunk = data.data() + chunk_offset; - int const chunk_bytes = std::min((size_t)(data.size() - ci * max_chunk_bytes), max_chunk_bytes); - auto const chunk_bits = divCeil(chunk_bytes, bytes_per_find_thread); - int const grid_size = divCeil(chunk_bits, block_size); - - // Copy chunk to device - CUDF_CUDA_TRY( - cudaMemcpyAsync(d_chunk.data(), h_chunk, chunk_bytes, cudaMemcpyDefault, stream.value())); - - for (char key : keys) { - count_and_set_positions - <<>>(static_cast(d_chunk.data()), - chunk_bytes, - chunk_offset + result_offset, - key, - d_count.data(), - positions); - } - } - - return cudf::detail::make_std_vector_sync(d_count, stream)[0]; -} - -template cudf::size_type find_all_from_set(device_span data, - std::vector const& keys, - uint64_t result_offset, - uint64_t* positions, - rmm::cuda_stream_view stream); - -template cudf::size_type find_all_from_set(device_span data, - std::vector const& keys, - uint64_t result_offset, - pos_key_pair* positions, - rmm::cuda_stream_view stream); - -template cudf::size_type find_all_from_set(host_span data, - std::vector const& keys, - uint64_t result_offset, - uint64_t* positions, - rmm::cuda_stream_view stream); - -template cudf::size_type find_all_from_set(host_span data, - std::vector const& keys, - uint64_t result_offset, - pos_key_pair* positions, - rmm::cuda_stream_view stream); - -cudf::size_type count_all_from_set(device_span data, - std::vector const& keys, - rmm::cuda_stream_view stream) -{ - return find_all_from_set(data, keys, 0, nullptr, stream); -} - -cudf::size_type count_all_from_set(host_span data, - std::vector const& keys, - rmm::cuda_stream_view stream) -{ - return find_all_from_set(data, keys, 0, nullptr, stream); -} - -} // namespace io -} // namespace cudf diff --git a/cpp/src/io/utilities/parsing_utils.cuh b/cpp/src/io/utilities/parsing_utils.cuh index faee05541cc..bc2722441d0 100644 --- a/cpp/src/io/utilities/parsing_utils.cuh +++ b/cpp/src/io/utilities/parsing_utils.cuh @@ -414,82 +414,6 @@ __device__ __inline__ cudf::size_type* infer_integral_field_counter(char const* } // namespace gpu -/** - * @brief Searches the input character array for each of characters in a set. - * Sums up the number of occurrences. If the 'positions' parameter is not void*, - * positions of all occurrences are stored in the output device array. - * - * @param[in] d_data Input character array in device memory - * @param[in] keys Vector containing the keys to count in the buffer - * @param[in] result_offset Offset to add to the output positions - * @param[out] positions Array containing the output positions - * @param[in] stream CUDA stream used for device memory operations and kernel launches - * - * @return cudf::size_type total number of occurrences - */ -template -cudf::size_type find_all_from_set(device_span data, - std::vector const& keys, - uint64_t result_offset, - T* positions, - rmm::cuda_stream_view stream); - -/** - * @brief Searches the input character array for each of characters in a set. - * Sums up the number of occurrences. If the 'positions' parameter is not void*, - * positions of all occurrences are stored in the output device array. - * - * Does not load the entire file into the GPU memory at any time, so it can - * be used to parse large files. Output array needs to be preallocated. - * - * @param[in] h_data Pointer to the input character array - * @param[in] h_size Number of bytes in the input array - * @param[in] keys Vector containing the keys to count in the buffer - * @param[in] result_offset Offset to add to the output positions - * @param[out] positions Array containing the output positions - * @param[in] stream CUDA stream used for device memory operations and kernel launches - * - * @return cudf::size_type total number of occurrences - */ -template -cudf::size_type find_all_from_set(host_span data, - std::vector const& keys, - uint64_t result_offset, - T* positions, - rmm::cuda_stream_view stream); - -/** - * @brief Searches the input character array for each of characters in a set - * and sums up the number of occurrences. - * - * @param d_data Input data buffer in device memory - * @param keys Vector containing the keys to count in the buffer - * @param stream CUDA stream used for device memory operations and kernel launches - * - * @return cudf::size_type total number of occurrences - */ -cudf::size_type count_all_from_set(device_span data, - std::vector const& keys, - rmm::cuda_stream_view stream); - -/** - * @brief Searches the input character array for each of characters in a set - * and sums up the number of occurrences. - * - * Does not load the entire buffer into the GPU memory at any time, so it can - * be used with buffers of any size. - * - * @param h_data Pointer to the data in host memory - * @param h_size Size of the input data, in bytes - * @param keys Vector containing the keys to count in the buffer - * @param stream CUDA stream used for device memory operations and kernel launches - * - * @return cudf::size_type total number of occurrences - */ -cudf::size_type count_all_from_set(host_span data, - std::vector const& keys, - rmm::cuda_stream_view stream); - /** * @brief Checks whether the given character is a whitespace character. * diff --git a/cpp/src/text/tokenize.cu b/cpp/src/text/tokenize.cu index 0b16305a81a..25406bce759 100644 --- a/cpp/src/text/tokenize.cu +++ b/cpp/src/text/tokenize.cu @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -35,6 +36,7 @@ #include #include +#include #include #include #include @@ -99,6 +101,31 @@ std::unique_ptr tokenize_fn(cudf::size_type strings_count, return cudf::strings::detail::make_strings_column(tokens.begin(), tokens.end(), stream, mr); } +constexpr int64_t block_size = 512; // number of threads per block +constexpr int64_t bytes_per_thread = 4; // bytes processed per thread + +CUDF_KERNEL void count_characters(uint8_t const* d_chars, int64_t chars_bytes, int64_t* d_output) +{ + auto const idx = cudf::detail::grid_1d::global_thread_id(); + auto const byte_idx = static_cast(idx) * bytes_per_thread; + auto const lane_idx = static_cast(threadIdx.x); + + using block_reduce = cub::BlockReduce; + __shared__ typename block_reduce::TempStorage temp_storage; + + int64_t count = 0; + // each thread processes multiple bytes + for (auto i = byte_idx; (i < (byte_idx + bytes_per_thread)) && (i < chars_bytes); ++i) { + count += cudf::strings::detail::is_begin_utf8_char(d_chars[i]); + } + auto const total = block_reduce(temp_storage).Reduce(count, cub::Sum()); + + if ((lane_idx == 0) && (total > 0)) { + cuda::atomic_ref ref{*d_output}; + ref.fetch_add(total, cuda::std::memory_order_relaxed); + } +} + } // namespace // detail APIs @@ -176,11 +203,17 @@ std::unique_ptr character_tokenize(cudf::strings_column_view const return cudf::make_empty_column(cudf::data_type{cudf::type_id::STRING}); } - auto offsets = strings_column.offsets(); - auto offset = cudf::strings::detail::get_offset_value(offsets, strings_column.offset(), stream); - auto chars_bytes = cudf::strings::detail::get_offset_value( - offsets, strings_column.offset() + strings_count, stream) - - offset; + CUDF_EXPECTS( + strings_column.null_count() == 0, "input must not contain nulls", std::invalid_argument); + + auto const offsets = strings_column.offsets(); + auto const offset = + cudf::strings::detail::get_offset_value(offsets, strings_column.offset(), stream); + auto const chars_bytes = cudf::strings::detail::get_offset_value( + offsets, strings_column.offset() + strings_count, stream) - + offset; + // no bytes -- this could happen in an all-empty column + if (chars_bytes == 0) { return cudf::make_empty_column(cudf::type_id::STRING); } auto d_chars = strings_column.parent().data(); // unsigned is necessary for checking bits d_chars += offset; @@ -188,23 +221,26 @@ std::unique_ptr character_tokenize(cudf::strings_column_view const // To minimize memory, count the number of characters so we can // build the output offsets without an intermediate buffer. // In the worst case each byte is a character so the output is 4x the input. - cudf::size_type num_characters = thrust::count_if( - rmm::exec_policy(stream), d_chars, d_chars + chars_bytes, [] __device__(uint8_t byte) { - return cudf::strings::detail::is_begin_utf8_char(byte); - }); + rmm::device_scalar d_count(0, stream); + auto const num_blocks = cudf::util::div_rounding_up_safe( + cudf::util::div_rounding_up_safe(chars_bytes, static_cast(bytes_per_thread)), + block_size); + count_characters<<>>( + d_chars, chars_bytes, d_count.data()); + auto const num_characters = d_count.value(stream); - // no characters check -- this could happen in all-empty or all-null strings column - if (num_characters == 0) { - return cudf::make_empty_column(cudf::data_type{cudf::type_id::STRING}); - } + // number of characters becomes the number of rows so need to check the row limit + CUDF_EXPECTS( + num_characters + 1 < static_cast(std::numeric_limits::max()), + "output exceeds the column size limit", + std::overflow_error); // create output offsets column - // -- conditionally copy a counting iterator where - // the first byte of each character is located auto offsets_column = cudf::make_numeric_column( offsets.type(), num_characters + 1, cudf::mask_state::UNALLOCATED, stream, mr); auto d_new_offsets = cudf::detail::offsetalator_factory::make_output_iterator(offsets_column->mutable_view()); + // offsets are at the beginning byte of each character cudf::detail::copy_if_safe( thrust::counting_iterator(0), thrust::counting_iterator(chars_bytes + 1), diff --git a/cpp/src/unary/cast_ops.cu b/cpp/src/unary/cast_ops.cu index 98c412f805d..64427326d87 100644 --- a/cpp/src/unary/cast_ops.cu +++ b/cpp/src/unary/cast_ops.cu @@ -15,11 +15,13 @@ */ #include +#include #include #include #include #include #include +#include #include #include #include @@ -219,6 +221,28 @@ std::unique_ptr rescale(column_view input, } }; +/** + * @brief Check if a floating point value is convertible to fixed point type. + * + * A floating point value is convertible if it is not null, not `NaN`, and not `inf`. + * + * Note that convertible input values may be out of the representable range of the target fixed + * point type. Values out of the representable range need to be checked separately. + */ +template +struct is_convertible_floating_point { + column_device_view d_input; + + bool __device__ operator()(size_type idx) const + { + static_assert(std::is_floating_point_v); + + if (d_input.is_null(idx)) { return false; } + auto const value = d_input.element(idx); + return std::isfinite(value); + } +}; + template struct dispatch_unary_cast_to { column_view input; @@ -294,8 +318,8 @@ struct dispatch_unary_cast_to { std::make_unique(type, size, rmm::device_buffer{size * cudf::size_of(type), stream, mr}, - detail::copy_bitmask(input, stream, mr), - input.null_count()); + rmm::device_buffer{}, + 0); mutable_column_view output_mutable = *output; @@ -308,6 +332,21 @@ struct dispatch_unary_cast_to { output_mutable.begin(), fixed_point_unary_cast{scale}); + if constexpr (cudf::is_floating_point()) { + // For floating-point values, beside input nulls, we also need to set nulls for the output + // rows corresponding to NaN and inf in the input. + auto const d_input_ptr = column_device_view::create(input, stream); + auto [null_mask, null_count] = + cudf::detail::valid_if(thrust::make_counting_iterator(0), + thrust::make_counting_iterator(size), + is_convertible_floating_point{*d_input_ptr}, + stream, + mr); + if (null_count > 0) { output->set_null_mask(std::move(null_mask), null_count); } + } else { + output->set_null_mask(detail::copy_bitmask(input, stream, mr), input.null_count()); + } + return output; } diff --git a/cpp/tests/text/tokenize_tests.cpp b/cpp/tests/text/tokenize_tests.cpp index 6a6bcda87cc..a59a54169d7 100644 --- a/cpp/tests/text/tokenize_tests.cpp +++ b/cpp/tests/text/tokenize_tests.cpp @@ -111,17 +111,13 @@ TEST_F(TextTokenizeTest, TokenizeErrorTest) TEST_F(TextTokenizeTest, CharacterTokenize) { - std::vector h_strings{"the mousé ate the cheese", nullptr, ""}; - cudf::test::strings_column_wrapper strings( - h_strings.begin(), - h_strings.end(), - thrust::make_transform_iterator(h_strings.begin(), [](auto str) { return str != nullptr; })); + cudf::test::strings_column_wrapper input({"the mousé ate the cheese", ""}); cudf::test::strings_column_wrapper expected{"t", "h", "e", " ", "m", "o", "u", "s", "é", " ", "a", "t", "e", " ", "t", "h", "e", " ", "c", "h", "e", "e", "s", "e"}; - auto results = nvtext::character_tokenize(cudf::strings_column_view(strings)); + auto results = nvtext::character_tokenize(cudf::strings_column_view(input)); CUDF_TEST_EXPECT_COLUMNS_EQUAL(*results, expected); } @@ -151,8 +147,6 @@ TEST_F(TextTokenizeTest, TokenizeEmptyTest) EXPECT_EQ(results->size(), 0); results = nvtext::character_tokenize(all_empty); EXPECT_EQ(results->size(), 0); - results = nvtext::character_tokenize(all_null); - EXPECT_EQ(results->size(), 0); auto const delimiter = cudf::string_scalar{""}; results = nvtext::tokenize_with_vocabulary(view, all_empty, delimiter); EXPECT_EQ(results->size(), 0); diff --git a/cpp/tests/unary/cast_tests.cpp b/cpp/tests/unary/cast_tests.cpp index a82449ffc10..ebeafc82039 100644 --- a/cpp/tests/unary/cast_tests.cpp +++ b/cpp/tests/unary/cast_tests.cpp @@ -665,6 +665,27 @@ TYPED_TEST(FixedPointTests, CastFromDouble) CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->view()); } +TYPED_TEST(FixedPointTests, CastFromDoubleWithNaNAndInf) +{ + using namespace numeric; + using decimalXX = TypeParam; + using RepType = cudf::device_storage_type_t; + using fp_wrapper = cudf::test::fixed_point_column_wrapper; + using fw_wrapper = cudf::test::fixed_width_column_wrapper; + + auto const NaN = std::numeric_limits::quiet_NaN(); + auto const inf = std::numeric_limits::infinity(); + auto const null = 0; + + auto const input = fw_wrapper{1.729, -inf, NaN, 172.9, -inf, NaN, inf, 1.23, inf}; + auto const expected = fp_wrapper{{1729, null, null, 172900, null, null, null, 1230, null}, + {true, false, false, true, false, false, false, true, false}, + scale_type{-3}}; + auto const result = cudf::cast(input, make_fixed_point_data_type(-3)); + + CUDF_TEST_EXPECT_COLUMNS_EQUAL(expected, result->view()); +} + TYPED_TEST(FixedPointTests, CastFromDoubleLarge) { using namespace numeric; diff --git a/docs/cudf/source/_ext/PandasCompat.py b/docs/cudf/source/_ext/PandasCompat.py index af2b16035c3..331495c981e 100644 --- a/docs/cudf/source/_ext/PandasCompat.py +++ b/docs/cudf/source/_ext/PandasCompat.py @@ -1,14 +1,20 @@ -# Copyright (c) 2021-2022, NVIDIA CORPORATION +# Copyright (c) 2021-2024, NVIDIA CORPORATION # This file is adapted from official sphinx tutorial for `todo` extension: # https://www.sphinx-doc.org/en/master/development/tutorials/todo.html +from __future__ import annotations + +from typing import cast from docutils import nodes +from docutils.nodes import Element from docutils.parsers.rst import Directive -from sphinx.locale import get_translation -from sphinx.util.docutils import SphinxDirective - -translator = get_translation("sphinx") +from docutils.parsers.rst.directives.admonitions import BaseAdmonition +from sphinx import addnodes +from sphinx.domains import Domain +from sphinx.errors import NoUri +from sphinx.locale import _ as get_translation_sphinx +from sphinx.util.docutils import SphinxDirective, new_document class PandasCompat(nodes.Admonition, nodes.Element): @@ -32,7 +38,7 @@ def run(self): return [PandasCompatList("")] -class PandasCompatDirective(SphinxDirective): +class PandasCompatDirective(BaseAdmonition, SphinxDirective): # this enables content in the directive has_content = True @@ -43,9 +49,11 @@ def run(self): PandasCompat_node = PandasCompat("\n".join(self.content)) PandasCompat_node += nodes.title( - translator("Pandas Compatibility Note"), - translator("Pandas Compatibility Note"), + get_translation_sphinx("Pandas Compatibility Note"), + get_translation_sphinx("Pandas Compatibility Note"), ) + PandasCompat_node["docname"] = self.env.docname + PandasCompat_node["target"] = targetnode self.state.nested_parse( self.content, self.content_offset, PandasCompat_node ) @@ -84,71 +92,104 @@ def merge_PandasCompats(app, env, docnames, other): ) -def process_PandasCompat_nodes(app, doctree, fromdocname): - if not app.config.include_pandas_compat: - for node in doctree.traverse(PandasCompat): - node.parent.remove(node) +class PandasCompatDomain(Domain): + name = "pandascompat" + label = "pandascompat" - # Replace all PandasCompatList nodes with a list of the collected - # PandasCompats. Augment each PandasCompat with a backlink to the - # original location. - env = app.builder.env + @property + def pandascompats(self): + return self.data.setdefault("pandascompats", {}) - if not hasattr(env, "PandasCompat_all_pandas_compat"): - env.PandasCompat_all_pandas_compat = [] + def clear_doc(self, docname): + self.pandascompats.pop(docname, None) + + def merge_domaindata(self, docnames, otherdata): + for docname in docnames: + self.pandascompats[docname] = otherdata["pandascompats"][docname] + + def process_doc(self, env, docname, document): + pandascompats = self.pandascompats.setdefault(docname, []) + for pandascompat in document.findall(PandasCompat): + env.app.emit("pandascompat-defined", pandascompat) + pandascompats.append(pandascompat) - for node in doctree.traverse(PandasCompatList): - if not app.config.include_pandas_compat: - node.replace_self([]) - continue - content = [] +class PandasCompatListProcessor: + def __init__(self, app, doctree, docname): + self.builder = app.builder + self.config = app.config + self.env = app.env + self.domain = cast(PandasCompatDomain, app.env.get_domain("pandascompat")) + self.document = new_document("") + self.process(doctree, docname) - for PandasCompat_info in env.PandasCompat_all_pandas_compat: - para = nodes.paragraph() + def process(self, doctree: nodes.document, docname: str) -> None: + pandascompats = [v for vals in self.domain.pandascompats.values() for v in vals] + for node in doctree.findall(PandasCompatList): + if not self.config.include_pandas_compat: + node.parent.remove(node) + continue - # Create a reference back to the original docstring - newnode = nodes.reference("", "") - innernode = nodes.emphasis( - translator("[source]"), translator("[source]") - ) - newnode["refdocname"] = PandasCompat_info["docname"] - newnode["refuri"] = app.builder.get_relative_uri( - fromdocname, PandasCompat_info["docname"] - ) - newnode["refuri"] += "#" + PandasCompat_info["target"]["refid"] - newnode.append(innernode) - para += newnode + content: list[Element | None] = [nodes.target()] if node.get("ids") else [] - # Insert the reference node into PandasCompat node - # Note that this node is a deepcopy from the original copy - # in the docstring, so changing this does not affect that in the - # doc. - PandasCompat_info["PandasCompat"].append(para) + for pandascompat in pandascompats: + # Create a copy of the pandascompat node + new_pandascompat = pandascompat.deepcopy() + new_pandascompat["ids"].clear() - # Insert the PandasCompand node into the PandasCompatList Node - content.append(PandasCompat_info["PandasCompat"]) + self.resolve_reference(new_pandascompat, docname) + content.append(new_pandascompat) - node.replace_self(content) + ref = self.create_reference(pandascompat, docname) + content.append(ref) + + node.replace_self(content) + + def create_reference(self, pandascompat, docname): + para = nodes.paragraph() + newnode = nodes.reference("", "") + innernode = nodes.emphasis( + get_translation_sphinx("[source]"), get_translation_sphinx("[source]") + ) + newnode["refdocname"] = pandascompat["docname"] + try: + newnode["refuri"] = self.builder.get_relative_uri( + docname, pandascompat["docname"] + ) + "#" + pandascompat["target"]["refid"] + except NoUri: + # ignore if no URI can be determined, e.g. for LaTeX output + pass + newnode.append(innernode) + para += newnode + return para + + def resolve_reference(self, todo, docname: str) -> None: + """Resolve references in the todo content.""" + for node in todo.findall(addnodes.pending_xref): + if "refdoc" in node: + node["refdoc"] = docname + + # Note: To resolve references, it is needed to wrap it with document node + self.document += todo + self.env.resolve_references(self.document, docname, self.builder) + self.document.remove(todo) def setup(app): app.add_config_value("include_pandas_compat", False, "html") - app.add_node(PandasCompatList) app.add_node( PandasCompat, html=(visit_PandasCompat_node, depart_PandasCompat_node), latex=(visit_PandasCompat_node, depart_PandasCompat_node), text=(visit_PandasCompat_node, depart_PandasCompat_node), + man=(visit_PandasCompat_node, depart_PandasCompat_node), + texinfo=(visit_PandasCompat_node, depart_PandasCompat_node), ) - - # Sphinx directives are lower-cased app.add_directive("pandas-compat", PandasCompatDirective) app.add_directive("pandas-compat-list", PandasCompatListDirective) - app.connect("doctree-resolved", process_PandasCompat_nodes) - app.connect("env-purge-doc", purge_PandasCompats) - app.connect("env-merge-info", merge_PandasCompats) + app.add_domain(PandasCompatDomain) + app.connect("doctree-resolved", PandasCompatListProcessor) return { "version": "0.1", diff --git a/docs/cudf/source/conf.py b/docs/cudf/source/conf.py index 73d8b4445d3..e9c760e288e 100644 --- a/docs/cudf/source/conf.py +++ b/docs/cudf/source/conf.py @@ -617,6 +617,8 @@ def linkcode_resolve(domain, info) -> str | None: f"branch-{version}/python/cudf/cudf/{fn}{linespec}" ) +# Needed for avoid build warning for PandasCompat extension +suppress_warnings = ["myst.domains"] def setup(app): app.add_css_file("https://docs.rapids.ai/assets/css/custom.css") diff --git a/docs/cudf/source/developer_guide/cudf_pandas.md b/docs/cudf/source/developer_guide/cudf_pandas.md index aeb43f66b2d..827ba18a4a4 100644 --- a/docs/cudf/source/developer_guide/cudf_pandas.md +++ b/docs/cudf/source/developer_guide/cudf_pandas.md @@ -3,8 +3,16 @@ The use of the cuDF pandas accelerator mode (`cudf.pandas`) is explained [in the The purpose of this document is to explain how the fast-slow proxy mechanism works and document internal environment variables that can be used to debug `cudf.pandas` itself. ## fast-slow proxy mechanism -`cudf.pandas` works by wrapping each Pandas type and its corresponding cuDF type in a new proxy type also known as a fast-slow proxy type. -The purpose of proxy types is to attempt computations on the fast (cuDF) object first, and then fall back to running on the slow (Pandas) object if the fast version fails. +The core of `cudf.pandas` is implemented through proxy types defined in [`fast_slow_proxy.py`](https://github.com/rapidsai/cudf/blob/5f45803b2a68b49d330d94e2f701791a7590612a/python/cudf/cudf/pandas/fast_slow_proxy.py), which link a pair of "fast" and "slow" libraries. +`cudf.pandas` works by wrapping each "slow" type and its corresponding "fast" type in a new proxy type, also known as a fast-slow proxy type. +The purpose of these proxy types is so we can first attempt computations on the fast object, and then fall back to the slow object if the fast version fails. +While the core wrapping functionality is generic, the current usage mainly involves providing a proxy pair using cuDF and Pandas. +In the rest of this document, to maintain a concrete pair of libraries in mind, we use cuDF and Pandas interchangeably as names for the "fast" and "slow" libraries, respectively, with the understanding that any pair of API-matching libraries could be used. +For example, future support could include pairs such as CuPy (as the "fast" library) and NumPy (as the "slow" library). + +```{note} +We currently do not wrap the entire NumPy library because it exposes a C API. But we do wrap NumPy's `numpy.ndarray` and CuPy's `cupy.ndarray` in a proxy type. +``` ### Types: #### Wrapped Types and Proxy Types diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd index fb98650308a..36654457995 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd @@ -66,24 +66,19 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cdef cudf_io_types.table_with_metadata read_parquet( parquet_reader_options args) except + - cdef cppclass parquet_writer_options: - parquet_writer_options() except + + cdef cppclass parquet_writer_options_base: + parquet_writer_options_base() except + cudf_io_types.sink_info get_sink_info() except + cudf_io_types.compression_type get_compression() except + cudf_io_types.statistics_freq get_stats_level() except + - cudf_table_view.table_view get_table() except + const optional[cudf_io_types.table_input_metadata]& get_metadata( ) except + - string get_column_chunks_file_paths() except + size_t get_row_group_size_bytes() except + size_type get_row_group_size_rows() except + size_t get_max_page_size_bytes() except + size_type get_max_page_size_rows() except + size_t get_max_dictionary_size() except + - void set_partitions( - vector[cudf_io_types.partition_info] partitions - ) except + void set_metadata( cudf_io_types.table_input_metadata m ) except + @@ -96,9 +91,6 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void set_compression( cudf_io_types.compression_type compression ) except + - void set_column_chunks_file_paths( - vector[string] column_chunks_file_paths - ) except + void set_int96_timestamps( bool enabled ) except + @@ -113,161 +105,112 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: void enable_write_v2_headers(bool val) except + void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except + + cdef cppclass parquet_writer_options(parquet_writer_options_base): + parquet_writer_options() except + + cudf_table_view.table_view get_table() except + + string get_column_chunks_file_paths() except + + void set_partitions( + vector[cudf_io_types.partition_info] partitions + ) except + + void set_column_chunks_file_paths( + vector[string] column_chunks_file_paths + ) except + + @staticmethod parquet_writer_options_builder builder( cudf_io_types.sink_info sink_, cudf_table_view.table_view table_ ) except + - cdef cppclass parquet_writer_options_builder: - + cdef cppclass parquet_writer_options_builder_base[BuilderT, OptionsT]: parquet_writer_options_builder() except + - parquet_writer_options_builder( - cudf_io_types.sink_info sink_, - cudf_table_view.table_view table_ - ) except + - parquet_writer_options_builder& partitions( - vector[cudf_io_types.partition_info] partitions - ) except + - parquet_writer_options_builder& metadata( + + BuilderT& metadata( cudf_io_types.table_input_metadata m ) except + - parquet_writer_options_builder& key_value_metadata( + BuilderT& key_value_metadata( vector[map[string, string]] kvm ) except + - parquet_writer_options_builder& stats_level( + BuilderT& stats_level( cudf_io_types.statistics_freq sf ) except + - parquet_writer_options_builder& compression( + BuilderT& compression( cudf_io_types.compression_type compression ) except + - parquet_writer_options_builder& column_chunks_file_paths( - vector[string] column_chunks_file_paths - ) except + - parquet_writer_options_builder& int96_timestamps( + BuilderT& int96_timestamps( bool enabled ) except + - parquet_writer_options_builder& utc_timestamps( + BuilderT& utc_timestamps( bool enabled ) except + - parquet_writer_options_builder& row_group_size_bytes( + BuilderT& row_group_size_bytes( size_t val ) except + - parquet_writer_options_builder& row_group_size_rows( + BuilderT& row_group_size_rows( size_type val ) except + - parquet_writer_options_builder& max_page_size_bytes( + BuilderT& max_page_size_bytes( size_t val ) except + - parquet_writer_options_builder& max_page_size_rows( + BuilderT& max_page_size_rows( size_type val ) except + - parquet_writer_options_builder& max_dictionary_size( + BuilderT& max_dictionary_size( size_t val ) except + - parquet_writer_options_builder& write_v2_headers( + BuilderT& write_v2_headers( bool val ) except + - parquet_writer_options_builder& dictionary_policy( + BuilderT& dictionary_policy( cudf_io_types.dictionary_policy val ) except + + # FIXME: the following two functions actually belong in + # parquet_writer_options_builder, but placing them there yields a + # "'parquet_writer_options_builder' is not a type identifier" error. + # This is probably a bug in cython since a simpler CRTP example that + # has methods returning references to a child class seem to work. + # Calling these from the chunked options builder will fail at compile + # time, so this should be safe. + # NOTE: these two are never actually called from libcudf. Instead these + # properties are set in the options after calling build(), so perhaps + # they can be removed. + BuilderT& partitions( + vector[cudf_io_types.partition_info] partitions + ) except + + BuilderT& column_chunks_file_paths( + vector[string] column_chunks_file_paths + ) except + + OptionsT build() except + - parquet_writer_options build() except + + cdef cppclass parquet_writer_options_builder( + parquet_writer_options_builder_base[parquet_writer_options_builder, + parquet_writer_options]): + parquet_writer_options_builder() except + + parquet_writer_options_builder( + cudf_io_types.sink_info sink_, + cudf_table_view.table_view table_ + ) except + cdef unique_ptr[vector[uint8_t]] write_parquet( parquet_writer_options args ) except + - cdef cppclass chunked_parquet_writer_options: + cdef cppclass chunked_parquet_writer_options(parquet_writer_options_base): chunked_parquet_writer_options() except + - cudf_io_types.sink_info get_sink() except + - cudf_io_types.compression_type get_compression() except + - cudf_io_types.statistics_freq get_stats_level() except + - const optional[cudf_io_types.table_input_metadata]& get_metadata( - ) except + - size_t get_row_group_size_bytes() except + - size_type get_row_group_size_rows() except + - size_t get_max_page_size_bytes() except + - size_type get_max_page_size_rows() except + - size_t get_max_dictionary_size() except + - - void set_metadata( - cudf_io_types.table_input_metadata m - ) except + - void set_key_value_metadata( - vector[map[string, string]] kvm - ) except + - void set_stats_level( - cudf_io_types.statistics_freq sf - ) except + - void set_compression( - cudf_io_types.compression_type compression - ) except + - void set_int96_timestamps( - bool enabled - ) except + - void set_utc_timestamps( - bool enabled - ) except + - void set_row_group_size_bytes(size_t val) except + - void set_row_group_size_rows(size_type val) except + - void set_max_page_size_bytes(size_t val) except + - void set_max_page_size_rows(size_type val) except + - void set_max_dictionary_size(size_t val) except + - void enable_write_v2_headers(bool val) except + - void set_dictionary_policy(cudf_io_types.dictionary_policy policy) except + @staticmethod chunked_parquet_writer_options_builder builder( cudf_io_types.sink_info sink_, ) except + - cdef cppclass chunked_parquet_writer_options_builder: + cdef cppclass chunked_parquet_writer_options_builder( + parquet_writer_options_builder_base[chunked_parquet_writer_options_builder, + chunked_parquet_writer_options] + ): chunked_parquet_writer_options_builder() except + chunked_parquet_writer_options_builder( cudf_io_types.sink_info sink_, ) except + - chunked_parquet_writer_options_builder& metadata( - cudf_io_types.table_input_metadata m - ) except + - chunked_parquet_writer_options_builder& key_value_metadata( - vector[map[string, string]] kvm - ) except + - chunked_parquet_writer_options_builder& stats_level( - cudf_io_types.statistics_freq sf - ) except + - chunked_parquet_writer_options_builder& compression( - cudf_io_types.compression_type compression - ) except + - chunked_parquet_writer_options_builder& int96_timestamps( - bool enabled - ) except + - chunked_parquet_writer_options_builder& utc_timestamps( - bool enabled - ) except + - chunked_parquet_writer_options_builder& row_group_size_bytes( - size_t val - ) except + - chunked_parquet_writer_options_builder& row_group_size_rows( - size_type val - ) except + - chunked_parquet_writer_options_builder& max_page_size_bytes( - size_t val - ) except + - chunked_parquet_writer_options_builder& max_page_size_rows( - size_type val - ) except + - chunked_parquet_writer_options_builder& max_dictionary_size( - size_t val - ) except + - parquet_writer_options_builder& write_v2_headers( - bool val - ) except + - parquet_writer_options_builder& dictionary_policy( - cudf_io_types.dictionary_policy val - ) except + - - chunked_parquet_writer_options build() except + cdef cppclass parquet_chunked_writer: parquet_chunked_writer() except + diff --git a/python/cudf/cudf/_lib/pylibcudf/libcudf/strings_udf.pxd b/python/cudf/cudf/_lib/pylibcudf/libcudf/strings_udf.pxd index b895d5e6925..804ad30dfb1 100644 --- a/python/cudf/cudf/_lib/pylibcudf/libcudf/strings_udf.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/libcudf/strings_udf.pxd @@ -18,6 +18,7 @@ cdef extern from "cudf/strings/udf/udf_string.hpp" namespace \ cdef extern from "cudf/strings/udf/udf_apis.hpp" namespace \ "cudf::strings::udf" nogil: + cdef int get_cuda_build_version() except + cdef unique_ptr[device_buffer] to_string_view_array(column_view) except + cdef unique_ptr[column] column_from_udf_string_array( udf_string* strings, size_type size, diff --git a/python/cudf/cudf/_lib/strings_udf.pyx b/python/cudf/cudf/_lib/strings_udf.pyx index e952492c45d..7610cad0b40 100644 --- a/python/cudf/cudf/_lib/strings_udf.pyx +++ b/python/cudf/cudf/_lib/strings_udf.pyx @@ -22,11 +22,16 @@ from cudf._lib.pylibcudf.libcudf.column.column cimport column, column_view from cudf._lib.pylibcudf.libcudf.strings_udf cimport ( column_from_udf_string_array as cpp_column_from_udf_string_array, free_udf_string_array as cpp_free_udf_string_array, + get_cuda_build_version as cpp_get_cuda_build_version, to_string_view_array as cpp_to_string_view_array, udf_string, ) +def get_cuda_build_version(): + return cpp_get_cuda_build_version() + + def column_to_string_view_array(Column strings_col): cdef unique_ptr[device_buffer] c_buffer cdef column_view input_view = strings_col.view() diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index baca7b19e58..5d0f7c4ede4 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -2072,12 +2072,7 @@ def dropna(self, how="any"): pass # This is to be consistent with IndexedFrame.dropna to handle nans # as nulls by default - data_columns = [ - col.nans_to_nulls() - if isinstance(col, cudf.core.column.NumericalColumn) - else col - for col in self._columns - ] + data_columns = [col.nans_to_nulls() for col in self._columns] return self._from_columns_like_self( drop_nulls( diff --git a/python/cudf/cudf/core/column/categorical.py b/python/cudf/cudf/core/column/categorical.py index 1828c5ce97b..de20b2ace1d 100644 --- a/python/cudf/cudf/core/column/categorical.py +++ b/python/cudf/cudf/core/column/categorical.py @@ -816,10 +816,8 @@ def to_pandas( .values_host ) - cats = col.categories - if cats.dtype.kind in "biuf": - cats = cats.nans_to_nulls().dropna() # type: ignore[attr-defined] - elif not isinstance(cats.dtype, IntervalDtype): + cats = col.categories.nans_to_nulls() + if not isinstance(cats.dtype, IntervalDtype): # leaving out dropna because it temporarily changes an interval # index into a struct and throws off results. # TODO: work on interval index dropna diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 68079371b85..f87797a1fa3 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -281,7 +281,7 @@ def any(self, skipna: bool = True) -> bool: return libcudf.reduce.reduce("any", self, dtype=np.bool_) - def dropna(self) -> ColumnBase: + def dropna(self) -> Self: return drop_nulls([self])[0]._with_type_metadata(self.dtype) def to_arrow(self) -> pa.Array: @@ -332,10 +332,6 @@ def from_arrow(cls, array: pa.Array) -> ColumnBase: "yet supported in pyarrow, see: " "https://github.com/apache/arrow/issues/20213" ) - elif pa.types.is_timestamp(array.type) and array.type.tz is not None: - raise NotImplementedError( - "cuDF does not yet support timezone-aware datetimes" - ) elif isinstance(array.type, ArrowIntervalType): return cudf.core.column.IntervalColumn.from_arrow(array) elif pa.types.is_large_string(array.type): @@ -695,7 +691,9 @@ def fillna( Returns a copy with null filled. """ return libcudf.replace.replace_nulls( - input_col=self, replacement=fill_value, method=method + input_col=self.nans_to_nulls(), + replacement=fill_value, + method=method, )._with_type_metadata(self.dtype) def isnull(self) -> ColumnBase: @@ -990,9 +988,9 @@ def astype(self, dtype: Dtype, copy: bool = False) -> ColumnBase: return col elif isinstance(dtype, cudf.core.dtypes.DecimalDtype): return col.as_decimal_column(dtype) - elif np.issubdtype(cast(Any, dtype), np.datetime64): + elif dtype.kind == "M": return col.as_datetime_column(dtype) - elif np.issubdtype(cast(Any, dtype), np.timedelta64): + elif dtype.kind == "m": return col.as_timedelta_column(dtype) elif dtype.kind == "O": if cudf.get_option("mode.pandas_compatible") and was_object: @@ -1240,6 +1238,10 @@ def unary_operator(self, unaryop: str): f"Operation {unaryop} not supported for dtype {self.dtype}." ) + def nans_to_nulls(self: Self) -> Self: + """Convert NaN to NA.""" + return self + def normalize_binop_value( self, other: ScalarLike ) -> Union[ColumnBase, ScalarLike]: @@ -1802,9 +1804,7 @@ def as_column( data = as_buffer(arbitrary, exposed=cudf.get_option("copy_on_write")) col = build_column(data, dtype=arbitrary.dtype, mask=mask) - if ( - nan_as_null or (mask is None and nan_as_null is None) - ) and col.dtype.kind == "f": + if nan_as_null or (mask is None and nan_as_null is None): col = col.nans_to_nulls() if dtype is not None: col = col.astype(dtype) @@ -1842,21 +1842,11 @@ def as_column( and arbitrary.freq is not None ): raise NotImplementedError("freq is not implemented yet") - elif ( - isinstance(arbitrary.dtype, pd.DatetimeTZDtype) - or ( - isinstance(arbitrary.dtype, pd.IntervalDtype) - and isinstance(arbitrary.dtype.subtype, pd.DatetimeTZDtype) - ) - or ( - isinstance(arbitrary.dtype, pd.CategoricalDtype) - and isinstance( - arbitrary.dtype.categories.dtype, pd.DatetimeTZDtype - ) - ) + elif isinstance(arbitrary.dtype, pd.IntervalDtype) and isinstance( + arbitrary.dtype.subtype, pd.DatetimeTZDtype ): raise NotImplementedError( - "cuDF does not yet support timezone-aware datetimes" + "cuDF does not yet support Intervals with timezone-aware datetimes" ) elif _is_pandas_nullable_extension_dtype(arbitrary.dtype): if cudf.get_option("mode.pandas_compatible"): @@ -1872,7 +1862,8 @@ def as_column( length=length, ) elif isinstance( - arbitrary.dtype, (pd.CategoricalDtype, pd.IntervalDtype) + arbitrary.dtype, + (pd.CategoricalDtype, pd.IntervalDtype, pd.DatetimeTZDtype), ): return as_column( pa.array(arbitrary, from_pandas=True), diff --git a/python/cudf/cudf/core/column/numerical.py b/python/cudf/cudf/core/column/numerical.py index fb413959eb9..6fb4f17b76d 100644 --- a/python/cudf/cudf/core/column/numerical.py +++ b/python/cudf/cudf/core/column/numerical.py @@ -536,7 +536,7 @@ def fillna( return col if method is not None: - return super(NumericalColumn, col).fillna(fill_value, method) + return super().fillna(fill_value, method) if fill_value is None: raise ValueError("Must specify either 'fill_value' or 'method'") @@ -545,7 +545,7 @@ def fillna( isinstance(fill_value, cudf.Scalar) and fill_value.dtype == col.dtype ): - return super(NumericalColumn, col).fillna(fill_value, method) + return super().fillna(fill_value, method) if np.isscalar(fill_value): # cast safely to the same dtype as self @@ -572,7 +572,7 @@ def fillna( else: fill_value = fill_value.astype(col.dtype) - return super(NumericalColumn, col).fillna(fill_value, method) + return super().fillna(fill_value, method) def can_cast_safely(self, to_dtype: DtypeObj) -> bool: """ diff --git a/python/cudf/cudf/core/column/numerical_base.py b/python/cudf/cudf/core/column/numerical_base.py index 541c32a2520..d38ec9cf30f 100644 --- a/python/cudf/cudf/core/column/numerical_base.py +++ b/python/cudf/cudf/core/column/numerical_base.py @@ -49,7 +49,7 @@ def kurtosis(self, skipna: Optional[bool] = None) -> float: if len(self) == 0 or self._can_return_nan(skipna=skipna): return cudf.utils.dtypes._get_nan_for_dtype(self.dtype) - self = self.nans_to_nulls().dropna() # type: ignore + self = self.nans_to_nulls().dropna() if len(self) < 4: return cudf.utils.dtypes._get_nan_for_dtype(self.dtype) @@ -74,7 +74,7 @@ def skew(self, skipna: Optional[bool] = None) -> ScalarLike: if len(self) == 0 or self._can_return_nan(skipna=skipna): return cudf.utils.dtypes._get_nan_for_dtype(self.dtype) - self = self.nans_to_nulls().dropna() # type: ignore + self = self.nans_to_nulls().dropna() if len(self) < 3: return cudf.utils.dtypes._get_nan_for_dtype(self.dtype) diff --git a/python/cudf/cudf/core/column/string.py b/python/cudf/cudf/core/column/string.py index d12aa80e9a3..ad7dbe5e52e 100644 --- a/python/cudf/cudf/core/column/string.py +++ b/python/cudf/cudf/core/column/string.py @@ -552,16 +552,17 @@ def join( return self._return_or_inplace(data) def _split_by_character(self): - result_col = libstrings.character_tokenize(self._column) + col = self._column.fillna("") # sanitize nulls + result_col = libstrings.character_tokenize(col) - offset_col = self._column.children[0] + offset_col = col.children[0] return cudf.core.column.ListColumn( - size=len(self._column), - dtype=cudf.ListDtype(self._column.dtype), - mask=self._column.mask, + size=len(col), + dtype=cudf.ListDtype(col.dtype), + mask=col.mask, offset=0, - null_count=self._column.null_count, + null_count=0, children=(offset_col, result_col), ) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 9307267b227..e1b6cc45dd3 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -2688,6 +2688,7 @@ def _set_columns_like(self, other: ColumnAccessor) -> None: self._data = ColumnAccessor( data=dict(zip(other.names, self._data.columns)), multiindex=other.multiindex, + rangeindex=other.rangeindex, level_names=other.level_names, label_dtype=other.label_dtype, verify=False, @@ -7534,7 +7535,7 @@ def _sample_axis_1( def _from_columns_like_self( self, columns: List[ColumnBase], - column_names: abc.Iterable[str], + column_names: Optional[abc.Iterable[str]] = None, index_names: Optional[List[str]] = None, *, override_dtypes: Optional[abc.Iterable[Optional[Dtype]]] = None, diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 7297ac4e929..732e5cdb01a 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -1757,13 +1757,10 @@ def __init__( name = _getdefault_name(data, name=name) data = column.as_column(data) - # TODO: Remove this if statement and fix tests now that - # there's timezone support - if isinstance(data.dtype, pd.DatetimeTZDtype): - raise NotImplementedError( - "cuDF does not yet support timezone-aware datetimes" - ) - data = data.astype(dtype) + # TODO: if data.dtype.kind == "M" (i.e. data is already datetime type) + # We probably shouldn't always astype to datetime64[ns] + if not isinstance(data.dtype, pd.DatetimeTZDtype): + data = data.astype(dtype) if copy: data = data.copy() diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index ecfcec15337..fdc78005996 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -40,8 +40,6 @@ from cudf.api.extensions import no_default from cudf.api.types import ( _is_non_decimal_numeric_dtype, - is_bool_dtype, - is_decimal_dtype, is_dict_like, is_list_like, is_scalar, @@ -372,7 +370,6 @@ def _mimic_inplace( self._index = result.index return super()._mimic_inplace(result, inplace) - # Scans @_cudf_nvtx_annotate def _scan(self, op, axis=None, skipna=True): """ @@ -417,13 +414,10 @@ def _scan(self, op, axis=None, skipna=True): cast_to_int = op in ("cumsum", "cumprod") skipna = True if skipna is None else skipna - results = {} - for name, col in self._data.items(): + results = [] + for col in self._columns: if skipna: - try: - result_col = col.nans_to_nulls() - except AttributeError: - result_col = col + result_col = col.nans_to_nulls() else: if col.has_nulls(include_nan=True): first_index = col.isnull().find_first_value(True) @@ -432,19 +426,14 @@ def _scan(self, op, axis=None, skipna=True): else: result_col = col - if ( - cast_to_int - and not is_decimal_dtype(result_col.dtype) - and ( - np.issubdtype(result_col.dtype, np.integer) - or np.issubdtype(result_col.dtype, np.bool_) - ) - ): + if cast_to_int and result_col.dtype.kind in "uib": # For reductions that accumulate a value (e.g. sum, not max) # pandas returns an int64 dtype for all int or bool dtypes. result_col = result_col.astype(np.int64) - results[name] = getattr(result_col, op)() - return self._from_data(results, self.index) + results.append(getattr(result_col, op)()) + return self._from_data_like_self( + self._data._from_columns_like_self(results) + ) def _check_data_index_length_match(self) -> None: # Validate that the number of rows in the data matches the index if the @@ -883,7 +872,6 @@ def replace( FutureWarning, ) if not (to_replace is None and value is no_default): - copy_data = {} ( all_na_per_column, to_replace_per_column, @@ -893,10 +881,10 @@ def replace( value=value, columns_dtype_map=dict(self._dtypes), ) - + copy_data = [] for name, col in self._data.items(): try: - copy_data[name] = col.find_and_replace( + replaced = col.find_and_replace( to_replace_per_column[name], replacements_per_column[name], all_na_per_column[name], @@ -909,11 +897,13 @@ def replace( # that exists in `copy_data`. # ii. There is an OverflowError while trying to cast # `to_replace_per_column` to `replacements_per_column`. - copy_data[name] = col.copy(deep=True) + replaced = col.copy(deep=True) + copy_data.append(replaced) + result = self._from_data_like_self( + self._data._from_columns_like_self(copy_data) + ) else: - copy_data = self._data.copy(deep=True) - - result = self._from_data(copy_data, self.index) + result = self.copy() return self._mimic_inplace(result, inplace=inplace) @@ -1034,12 +1024,13 @@ def clip(self, lower=None, upper=None, inplace=False, axis=1): ): lower[0], upper[0] = upper[0], lower[0] - data = { - name: col.clip(lower[i], upper[i]) - for i, (name, col) in enumerate(self._data.items()) - } - output = self._from_data(data, self.index) - output._copy_type_metadata(self, include_index=False) + data = ( + col.clip(low, high) + for col, low, high in zip(self._columns, lower, upper) + ) + output = self._from_data_like_self( + self._data._from_columns_like_self(data) + ) return self._mimic_inplace(output, inplace=inplace) @_cudf_nvtx_annotate @@ -1915,12 +1906,12 @@ def nans_to_nulls(self): 1 3.14 2 """ - result = ( - col.nans_to_nulls() - if isinstance(col, cudf.core.column.NumericalColumn) - else col.copy() - for col in self._data.columns - ) + result = [] + for col in self._columns: + converted = col.nans_to_nulls() + if converted is col: + converted = converted.copy() + result.append(converted) return self._from_data_like_self( self._data._from_columns_like_self(result) ) @@ -2031,8 +2022,8 @@ def interpolate( ) interpolator = cudf.core.algorithms.get_column_interpolator(method) - columns = {} - for colname, col in data._data.items(): + columns = [] + for col in data._columns: if isinstance(col, cudf.core.column.StringColumn): warnings.warn( f"{type(self).__name__}.interpolate with object dtype is " @@ -2043,9 +2034,12 @@ def interpolate( col = col.astype("float64").fillna(np.nan) # Interpolation methods may or may not need the index - columns[colname] = interpolator(col, index=data.index) + columns.append(interpolator(col, index=data.index)) - result = self._from_data(columns, index=data.index) + result = self._from_data_like_self( + self._data._from_columns_like_self(columns) + ) + result.index = data.index return ( result @@ -2072,8 +2066,8 @@ def shift(self, periods=1, freq=None, axis=0, fill_value=None): data_columns = ( col.shift(periods, fill_value) for col in self._columns ) - return self.__class__._from_data( - zip(self._column_names, data_columns), self.index + return self._from_data_like_self( + self._data._from_columns_like_self(data_columns) ) @_cudf_nvtx_annotate @@ -3014,8 +3008,6 @@ def _slice(self, arg: slice, keep_index: bool = True) -> Self: self._column_names, None if has_range_index or not keep_index else self.index.names, ) - result._data.label_dtype = self._data.label_dtype - result._data.rangeindex = self._data.rangeindex if keep_index and has_range_index: result.index = self.index[start:stop] @@ -3564,11 +3556,6 @@ def sort_values( ), keep_index=not ignore_index, ) - if ( - isinstance(self, cudf.core.dataframe.DataFrame) - and self._data.multiindex - ): - out.columns = self._data.to_pandas_index() return out def _n_largest_or_smallest( @@ -3662,14 +3649,12 @@ def _align_to_index( result = result.sort_values(sort_col_id) del result[sort_col_id] - result = self.__class__._from_data( - data=result._data, index=result.index + out = self._from_data( + self._data._from_columns_like_self(result._columns) ) - result._data.multiindex = self._data.multiindex - result._data._level_names = self._data._level_names - result.index.names = self.index.names - - return result + out.index = result.index + out.index.names = self.index.names + return out @_cudf_nvtx_annotate def _reindex( @@ -3901,24 +3886,14 @@ def round(self, decimals=0, how="half_even"): "decimals must be an integer, a dict-like or a Series" ) - cols = { - name: col.round(decimals[name], how=how) - if ( - name in decimals - and _is_non_decimal_numeric_dtype(col.dtype) - and not is_bool_dtype(col.dtype) - ) + cols = ( + col.round(decimals[name], how=how) + if name in decimals and col.dtype.kind in "fiu" else col.copy(deep=True) for name, col in self._data.items() - } - - return self.__class__._from_data( - data=cudf.core.column_accessor.ColumnAccessor( - cols, - multiindex=self._data.multiindex, - level_names=self._data.level_names, - ), - index=self.index, + ) + return self._from_data_like_self( + self._data._from_columns_like_self(cols) ) def resample( @@ -4228,10 +4203,7 @@ def _drop_na_columns(self, how="any", subset=None, thresh=None): thresh = len(df) for name, col in df._data.items(): - try: - check_col = col.nans_to_nulls() - except AttributeError: - check_col = col + check_col = col.nans_to_nulls() no_threshold_valid_count = ( len(col) - check_col.null_count ) < thresh @@ -4261,12 +4233,7 @@ def _drop_na_rows(self, how="any", subset=None, thresh=None): if len(subset) == 0: return self.copy(deep=True) - data_columns = [ - col.nans_to_nulls() - if isinstance(col, cudf.core.column.NumericalColumn) - else col - for col in self._columns - ] + data_columns = [col.nans_to_nulls() for col in self._columns] return self._from_columns_like_self( libcudf.stream_compaction.drop_nulls( @@ -6249,6 +6216,8 @@ def rank( f"axis={axis} is not yet supported in rank" ) + num_cols = self._num_columns + dropped_cols = False source = self if numeric_only: if isinstance( @@ -6266,15 +6235,28 @@ def rank( source = self._get_columns_by_label(numeric_cols) if source.empty: return source.astype("float64") + elif source._num_columns != num_cols: + dropped_cols = True result_columns = libcudf.sort.rank_columns( [*source._columns], method_enum, na_option, ascending, pct ) - return self.__class__._from_data( - dict(zip(source._column_names, result_columns)), - index=source.index, - ).astype(np.float64) + if dropped_cols: + result = type(source)._from_data( + ColumnAccessor( + dict(zip(source._column_names, result_columns)), + multiindex=self._data.multiindex, + level_names=self._data.level_names, + label_dtype=self._data.label_dtype, + ), + ) + else: + result = source._from_data_like_self( + self._data._from_columns_like_self(result_columns) + ) + result.index = source.index + return result.astype(np.float64) def convert_dtypes( self, diff --git a/python/cudf/cudf/core/reshape.py b/python/cudf/cudf/core/reshape.py index d4772d5b4c2..53239cb7ea0 100644 --- a/python/cudf/cudf/core/reshape.py +++ b/python/cudf/cudf/core/reshape.py @@ -1210,9 +1210,7 @@ def _get_unique(column, dummy_na): else: unique = column.unique().sort_values() if not dummy_na: - if np.issubdtype(unique.dtype, np.floating): - unique = unique.nans_to_nulls() - unique = unique.dropna() + unique = unique.nans_to_nulls().dropna() return unique diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index 2037b1682db..7d140a1ffa5 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -1,7 +1,5 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION -import itertools - import numba import pandas as pd from pandas.api.indexers import BaseIndexer @@ -251,27 +249,13 @@ def _apply_agg_column(self, source_column, agg_name): agg_params=self.agg_params, ) - def _apply_agg_dataframe(self, df, agg_name): - return cudf.DataFrame._from_data( - { - col_name: self._apply_agg_column(col, agg_name) - for col_name, col in df._data.items() - }, - index=df.index, - ) - def _apply_agg(self, agg_name): - if isinstance(self.obj, cudf.Series): - return cudf.Series._from_data( - { - self.obj.name: self._apply_agg_column( - self.obj._column, agg_name - ) - }, - index=self.obj.index, - ) - else: - return self._apply_agg_dataframe(self.obj, agg_name) + applied = ( + self._apply_agg_column(col, agg_name) for col in self.obj._columns + ) + return self.obj._from_data_like_self( + self.obj._data._from_columns_like_self(applied) + ) def _reduce( self, @@ -533,18 +517,9 @@ def _window_to_window_sizes(self, window): ) def _apply_agg(self, agg_name): - index = cudf.MultiIndex.from_frame( - cudf.DataFrame( - { - key: value - for key, value in itertools.chain( - self._group_keys._data.items(), - self.obj.index._data.items(), - ) - } - ) + index = cudf.MultiIndex._from_data( + {**self._group_keys._data, **self.obj.index._data} ) - result = super()._apply_agg(agg_name) result.index = index return result diff --git a/python/cudf/cudf/pandas/_wrappers/common.py b/python/cudf/cudf/pandas/_wrappers/common.py index 468c5687c15..66a51a83896 100644 --- a/python/cudf/cudf/pandas/_wrappers/common.py +++ b/python/cudf/cudf/pandas/_wrappers/common.py @@ -46,5 +46,10 @@ def cuda_array_interface(self: _FastSlowProxy): return self._fsproxy_fast.__cuda_array_interface__ +@property # type: ignore +def array_interface(self: _FastSlowProxy): + return self._fsproxy_slow.__array_interface__ + + def custom_iter(self: _FastSlowProxy): return iter(self._fsproxy_slow) diff --git a/python/cudf/cudf/pandas/_wrappers/numpy.py b/python/cudf/cudf/pandas/_wrappers/numpy.py index 94298872213..c445be46f58 100644 --- a/python/cudf/cudf/pandas/_wrappers/numpy.py +++ b/python/cudf/cudf/pandas/_wrappers/numpy.py @@ -15,6 +15,7 @@ make_intermediate_proxy_type, ) from .common import ( + array_interface, array_method, arrow_array_method, cuda_array_interface, @@ -115,6 +116,7 @@ def wrap_ndarray(cls, arr: cupy.ndarray | numpy.ndarray, constructor): # So that pa.array(wrapped-numpy-array) works "__arrow_array__": arrow_array_method, "__cuda_array_interface__": cuda_array_interface, + "__array_interface__": array_interface, # ndarrays are unhashable "__hash__": None, # iter(cupy-array) produces an iterable of zero-dim device diff --git a/python/cudf/cudf/pandas/fast_slow_proxy.py b/python/cudf/cudf/pandas/fast_slow_proxy.py index 169dd80e132..5f4cf2e6cc6 100644 --- a/python/cudf/cudf/pandas/fast_slow_proxy.py +++ b/python/cudf/cudf/pandas/fast_slow_proxy.py @@ -9,6 +9,7 @@ import operator import pickle import types +import warnings from collections.abc import Iterator from enum import IntEnum from typing import ( @@ -23,6 +24,10 @@ Type, ) +import numpy as np + +from ..options import _env_get_bool +from ..testing._utils import assert_eq from .annotation import nvtx @@ -808,7 +813,9 @@ def __get__(self, instance, owner) -> Any: else: # for anything else, use a fast-slow attribute: self._attr, _ = _fast_slow_function_call( - getattr, owner, self._name + getattr, + owner, + self._name, ) if isinstance( @@ -829,9 +836,11 @@ def __get__(self, instance, owner) -> Any: getattr(instance._fsproxy_slow, self._name), None, # type: ignore ) - return _fast_slow_function_call(getattr, instance, self._name)[ - 0 - ] + return _fast_slow_function_call( + getattr, + instance, + self._name, + )[0] return self._attr @@ -866,7 +875,17 @@ def __name__(self, value): setattr(self._fsproxy_slow, "__name__", value) -def _fast_slow_function_call(func: Callable, /, *args, **kwargs) -> Any: +def _assert_fast_slow_eq(left, right): + if _is_final_type(type(left)) or type(left) in NUMPY_TYPES: + assert_eq(left, right) + + +def _fast_slow_function_call( + func: Callable, + /, + *args, + **kwargs, +) -> Any: """ Call `func` with all `args` and `kwargs` converted to their respective fast type. If that fails, call `func` with all @@ -890,6 +909,37 @@ def _fast_slow_function_call(func: Callable, /, *args, **kwargs) -> Any: # try slow path raise Exception() fast = True + if _env_get_bool("CUDF_PANDAS_DEBUGGING", False): + try: + with nvtx.annotate( + "EXECUTE_SLOW_DEBUG", + color=_CUDF_PANDAS_NVTX_COLORS["EXECUTE_SLOW"], + domain="cudf_pandas", + ): + slow_args, slow_kwargs = ( + _slow_arg(args), + _slow_arg(kwargs), + ) + with disable_module_accelerator(): + slow_result = func(*slow_args, **slow_kwargs) + except Exception as e: + warnings.warn( + "The result from pandas could not be computed. " + f"The exception was {e}." + ) + else: + try: + _assert_fast_slow_eq(result, slow_result) + except AssertionError as e: + warnings.warn( + "The results from cudf and pandas were different. " + f"The exception was {e}." + ) + except Exception as e: + warnings.warn( + "Pandas debugging mode failed. " + f"The exception was {e}." + ) except Exception: with nvtx.annotate( "EXECUTE_SLOW", @@ -1135,6 +1185,9 @@ def _replace_closurevars( ) +NUMPY_TYPES: Set[str] = set(np.sctypeDict.values()) + + _SPECIAL_METHODS: Set[str] = { "__abs__", "__add__", diff --git a/python/cudf/cudf/tests/series/test_datetimelike.py b/python/cudf/cudf/tests/series/test_datetimelike.py index 7ef55761b2b..58ffc610c3c 100644 --- a/python/cudf/cudf/tests/series/test_datetimelike.py +++ b/python/cudf/cudf/tests/series/test_datetimelike.py @@ -223,3 +223,16 @@ def test_contains_tz_aware(item, expected): def test_tz_convert_naive_typeerror(): with pytest.raises(TypeError): cudf.date_range("2020", periods=2, freq="D").tz_convert(None) + + +@pytest.mark.parametrize( + "klass", ["Series", "DatetimeIndex", "Index", "CategoricalIndex"] +) +def test_from_pandas_obj_tz_aware(klass): + tz_aware_data = [ + pd.Timestamp("2020-01-01", tz="UTC").tz_convert("US/Pacific") + ] + pandas_obj = getattr(pd, klass)(tz_aware_data) + result = cudf.from_pandas(pandas_obj) + expected = getattr(cudf, klass)(tz_aware_data) + assert_eq(result, expected) diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py index d76d5eb8065..98e9f9881c7 100644 --- a/python/cudf/cudf/tests/test_dataframe.py +++ b/python/cudf/cudf/tests/test_dataframe.py @@ -10980,7 +10980,7 @@ def test_squeeze(axis, data): assert_eq(result, expected) -@pytest.mark.parametrize("column", [range(1), np.array([1], dtype=np.int8)]) +@pytest.mark.parametrize("column", [range(1, 2), np.array([1], dtype=np.int8)]) @pytest.mark.parametrize( "operation", [ @@ -10991,6 +10991,16 @@ def test_squeeze(axis, data): lambda df: abs(df), lambda df: -df, lambda df: ~df, + lambda df: df.cumsum(), + lambda df: df.replace(1, 2), + lambda df: df.replace(10, 20), + lambda df: df.clip(0, 10), + lambda df: df.rolling(1).mean(), + lambda df: df.interpolate(), + lambda df: df.shift(), + lambda df: df.sort_values(1), + lambda df: df.round(), + lambda df: df.rank(), ], ) def test_op_preserves_column_metadata(column, operation): diff --git a/python/cudf/cudf/tests/test_datetime.py b/python/cudf/cudf/tests/test_datetime.py index 4186fff038a..e3ecaafae5b 100644 --- a/python/cudf/cudf/tests/test_datetime.py +++ b/python/cudf/cudf/tests/test_datetime.py @@ -2088,25 +2088,6 @@ def test_datetime_constructor(data, dtype): assert_eq(expected, actual) -@pytest.mark.parametrize( - "data", - [ - [pd.Timestamp("2001-01-01", tz="America/New_York")], - pd.Series(["2001-01-01"], dtype="datetime64[ns, America/New_York]"), - pd.Index(["2001-01-01"], dtype="datetime64[ns, America/New_York]"), - ], -) -def test_construction_from_tz_timestamps(data): - with pytest.raises(NotImplementedError): - _ = cudf.Series(data) - with pytest.raises(NotImplementedError): - _ = cudf.Index(data) - with pytest.raises(NotImplementedError): - _ = cudf.DatetimeIndex(data) - with pytest.raises(NotImplementedError): - cudf.CategoricalIndex(data) - - @pytest.mark.parametrize("op", _cmpops) def test_datetime_binop_tz_timestamp(op): s = cudf.Series([1, 2, 3], dtype="datetime64[ns]") @@ -2391,13 +2372,14 @@ def test_datetime_raise_warning(freqstr): t.dt.ceil(freqstr) -def test_timezone_array_notimplemented(): +def test_timezone_pyarrow_array(): pa_array = pa.array( [datetime.datetime(2020, 1, 1, tzinfo=datetime.timezone.utc)], type=pa.timestamp("ns", "UTC"), ) - with pytest.raises(NotImplementedError): - cudf.Series(pa_array) + result = cudf.Series(pa_array) + expected = pa_array.to_pandas() + assert_eq(result, expected) def test_to_datetime_errors_ignore_deprecated(): diff --git a/python/cudf/cudf/tests/test_replace.py b/python/cudf/cudf/tests/test_replace.py index d77ec596271..9466398964a 100644 --- a/python/cudf/cudf/tests/test_replace.py +++ b/python/cudf/cudf/tests/test_replace.py @@ -6,6 +6,7 @@ import numpy as np import pandas as pd +import pyarrow as pa import pytest import cudf @@ -1370,3 +1371,10 @@ def test_fillna_columns_multiindex(): actual = gdf.fillna(10) assert_eq(expected, actual) + + +def test_fillna_nan_and_null(): + ser = cudf.Series(pa.array([float("nan"), None, 1.1]), nan_as_null=False) + result = ser.fillna(2.2) + expected = cudf.Series([2.2, 2.2, 1.1]) + assert_eq(result, expected) diff --git a/python/cudf/cudf/tests/test_series.py b/python/cudf/cudf/tests/test_series.py index 323716d5fc3..f47c42d9a1d 100644 --- a/python/cudf/cudf/tests/test_series.py +++ b/python/cudf/cudf/tests/test_series.py @@ -2841,3 +2841,10 @@ def test_series_from_series_index_no_shallow_copy(): ser1 = cudf.Series(range(3), index=list("abc")) ser2 = cudf.Series(ser1) assert ser1.index is ser2.index + + +@pytest.mark.parametrize("value", [1, 1.1]) +def test_nans_to_nulls_noop_copies_column(value): + ser1 = cudf.Series([value]) + ser2 = ser1.nans_to_nulls() + assert ser1._column is not ser2._column diff --git a/python/cudf/cudf/tests/text/test_text_methods.py b/python/cudf/cudf/tests/text/test_text_methods.py index 6bd3b99bae1..36f7f3de828 100644 --- a/python/cudf/cudf/tests/text/test_text_methods.py +++ b/python/cudf/cudf/tests/text/test_text_methods.py @@ -426,7 +426,6 @@ def test_character_tokenize_series(): [ "hello world", "sdf", - None, ( "goodbye, one-two:three~four+five_six@sev" "en#eight^nine heŒŽ‘•™œ$µ¾ŤƠé DŽ" @@ -543,7 +542,6 @@ def test_character_tokenize_index(): [ "hello world", "sdf", - None, ( "goodbye, one-two:three~four+five_six@sev" "en#eight^nine heŒŽ‘•™œ$µ¾ŤƠé DŽ" diff --git a/python/cudf/cudf/utils/_numba.py b/python/cudf/cudf/utils/_numba.py index 494b48b3cfd..d9dde58d998 100644 --- a/python/cudf/cudf/utils/_numba.py +++ b/python/cudf/cudf/utils/_numba.py @@ -12,16 +12,14 @@ # strings_udf. This is the easiest way to break an otherwise circular import # loop of _lib.*->cudautils->_numba->_lib.strings_udf @lru_cache -def _get_cc_60_ptx_file(): +def _get_cuda_build_version(): from cudf._lib import strings_udf - return os.path.join( - os.path.dirname(strings_udf.__file__), - "..", - "core", - "udf", - "shim_60.ptx", - ) + # The version is an integer, parsed as 1000 * major + 10 * minor + cuda_build_version = strings_udf.get_cuda_build_version() + cuda_major_version = cuda_build_version // 1000 + cuda_minor_version = (cuda_build_version % 1000) // 10 + return (cuda_major_version, cuda_minor_version) def _get_best_ptx_file(archs, max_compute_capability): @@ -38,8 +36,8 @@ def _get_best_ptx_file(archs, max_compute_capability): def _get_ptx_file(path, prefix): if "RAPIDS_NO_INITIALIZE" in os.environ: - # cc=60 ptx is always built - cc = int(os.environ.get("STRINGS_UDF_CC", "60")) + # cc=70 ptx is always built + cc = int(os.environ.get("STRINGS_UDF_CC", "70")) else: from numba import cuda @@ -120,15 +118,13 @@ def _setup_numba(): versions = safe_get_versions() if versions != NO_DRIVER: driver_version, runtime_version = versions - ptx_toolkit_version = _get_cuda_version_from_ptx_file( - _get_cc_60_ptx_file() - ) + shim_ptx_cuda_version = _get_cuda_build_version() # MVC is required whenever any PTX is newer than the driver - # This could be the shipped PTX file or the PTX emitted by - # the version of NVVM on the user system, the latter aligning - # with the runtime version - if (driver_version < ptx_toolkit_version) or ( + # This could be the shipped shim PTX file (determined by the CUDA + # version used at build time) or the PTX emitted by the version of NVVM + # on the user system (determined by the user's CUDA runtime version) + if (driver_version < shim_ptx_cuda_version) or ( driver_version < runtime_version ): if driver_version < (12, 0): @@ -139,60 +135,6 @@ def _setup_numba(): patch_numba_linker() -def _get_cuda_version_from_ptx_file(path): - """ - https://docs.nvidia.com/cuda/parallel-thread-execution/ - Each PTX module must begin with a .version - directive specifying the PTX language version - - example header: - // - // Generated by NVIDIA NVVM Compiler - // - // Compiler Build ID: CL-31057947 - // Cuda compilation tools, release 11.6, V11.6.124 - // Based on NVVM 7.0.1 - // - - .version 7.6 - .target sm_52 - .address_size 64 - - """ - with open(path) as ptx_file: - for line in ptx_file: - if line.startswith(".version"): - ver_line = line - break - else: - raise ValueError("Could not read CUDA version from ptx file.") - version = ver_line.strip("\n").split(" ")[1] - # This dictionary maps from supported versions of NVVM to the - # PTX version it produces. The lowest value should be the minimum - # CUDA version required to compile the library. Currently CUDA 11.5 - # or higher is required to build cudf. New CUDA versions should - # be added to this dictionary when officially supported. - ver_map = { - "7.5": (11, 5), - "7.6": (11, 6), - "7.7": (11, 7), - "7.8": (11, 8), - "8.0": (12, 0), - "8.1": (12, 1), - "8.2": (12, 2), - "8.3": (12, 3), - "8.4": (12, 4), - } - - cuda_ver = ver_map.get(version) - if cuda_ver is None: - raise ValueError( - f"Could not map PTX version {version} to a CUDA version" - ) - - return cuda_ver - - class _CUDFNumbaConfig: def __enter__(self): self.CUDA_LOW_OCCUPANCY_WARNINGS = ( diff --git a/python/cudf/cudf_pandas_tests/test_cudf_pandas.py b/python/cudf/cudf_pandas_tests/test_cudf_pandas.py index fef829b17fc..72e9ad5fca3 100644 --- a/python/cudf/cudf_pandas_tests/test_cudf_pandas.py +++ b/python/cudf/cudf_pandas_tests/test_cudf_pandas.py @@ -41,8 +41,9 @@ get_calendar, ) -# Accelerated pandas has the real pandas module as an attribute +# Accelerated pandas has the real pandas and cudf modules as attributes pd = xpd._fsproxy_slow +cudf = xpd._fsproxy_fast @pytest.fixture @@ -1424,5 +1425,66 @@ def test_holidays_within_dates(holiday, start, expected): ) == [utc.localize(dt) for dt in expected] +def test_cudf_pandas_debugging_different_results(monkeypatch): + cudf_mean = cudf.Series.mean + + def mock_mean_one(self, *args, **kwargs): + return np.float64(1.0) + + with monkeypatch.context() as monkeycontext: + monkeypatch.setattr(xpd.Series.mean, "_fsproxy_fast", mock_mean_one) + monkeycontext.setenv("CUDF_PANDAS_DEBUGGING", "True") + s = xpd.Series([1, 2]) + with pytest.warns( + UserWarning, + match="The results from cudf and pandas were different.", + ): + assert s.mean() == 1.0 + # Must explicitly undo the patch. Proxy dispatch doesn't work with monkeypatch contexts. + monkeypatch.setattr(xpd.Series.mean, "_fsproxy_fast", cudf_mean) + + +def test_cudf_pandas_debugging_pandas_error(monkeypatch): + pd_mean = pd.Series.mean + + def mock_mean_exception(self, *args, **kwargs): + raise Exception() + + with monkeypatch.context() as monkeycontext: + monkeycontext.setattr( + xpd.Series.mean, "_fsproxy_slow", mock_mean_exception + ) + monkeycontext.setenv("CUDF_PANDAS_DEBUGGING", "True") + s = xpd.Series([1, 2]) + with pytest.warns( + UserWarning, + match="The result from pandas could not be computed.", + ): + s = xpd.Series([1, 2]) + assert s.mean() == 1.5 + # Must explicitly undo the patch. Proxy dispatch doesn't work with monkeypatch contexts. + monkeypatch.setattr(xpd.Series.mean, "_fsproxy_slow", pd_mean) + + +def test_cudf_pandas_debugging_failed(monkeypatch): + pd_mean = pd.Series.mean + + def mock_mean_none(self, *args, **kwargs): + return None + + with monkeypatch.context() as monkeycontext: + monkeycontext.setattr(xpd.Series.mean, "_fsproxy_slow", mock_mean_none) + monkeycontext.setenv("CUDF_PANDAS_DEBUGGING", "True") + s = xpd.Series([1, 2]) + with pytest.warns( + UserWarning, + match="Pandas debugging mode failed.", + ): + s = xpd.Series([1, 2]) + assert s.mean() == 1.5 + # Must explicitly undo the patch. Proxy dispatch doesn't work with monkeypatch contexts. + monkeypatch.setattr(xpd.Series.mean, "_fsproxy_slow", pd_mean) + + def test_excelwriter_pathlike(): assert isinstance(pd.ExcelWriter("foo.xlsx"), os.PathLike) diff --git a/python/cudf/udf_cpp/CMakeLists.txt b/python/cudf/udf_cpp/CMakeLists.txt index fe7f9d0b00d..fa7855cfc65 100644 --- a/python/cudf/udf_cpp/CMakeLists.txt +++ b/python/cudf/udf_cpp/CMakeLists.txt @@ -60,7 +60,7 @@ set(SHIM_CUDA_FLAGS --expt-relaxed-constexpr -rdc=true) # always build a default PTX file in case RAPIDS_NO_INITIALIZE is set and the device cc can't be # safely queried through a context -list(INSERT CMAKE_CUDA_ARCHITECTURES 0 "60") +list(INSERT CMAKE_CUDA_ARCHITECTURES 0 "70") list(TRANSFORM CMAKE_CUDA_ARCHITECTURES REPLACE "-real" "") list(TRANSFORM CMAKE_CUDA_ARCHITECTURES REPLACE "-virtual" "") diff --git a/python/cudf/udf_cpp/strings/include/cudf/strings/udf/udf_apis.hpp b/python/cudf/udf_cpp/strings/include/cudf/strings/udf/udf_apis.hpp index 219dbe27682..8635b1280de 100644 --- a/python/cudf/udf_cpp/strings/include/cudf/strings/udf/udf_apis.hpp +++ b/python/cudf/udf_cpp/strings/include/cudf/strings/udf/udf_apis.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022-2023, NVIDIA CORPORATION. + * Copyright (c) 2022-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -27,6 +27,13 @@ namespace cudf { namespace strings { namespace udf { +/** + * @brief Get the CUDA version used at build time. + * + * @return The CUDA version as an integer, parsed as major * 1000 + minor * 10. + */ +int get_cuda_build_version(); + class udf_string; /** diff --git a/python/cudf/udf_cpp/strings/src/strings/udf/udf_apis.cu b/python/cudf/udf_cpp/strings/src/strings/udf/udf_apis.cu index 9cf86b5ea48..941e61e6787 100644 --- a/python/cudf/udf_cpp/strings/src/strings/udf/udf_apis.cu +++ b/python/cudf/udf_cpp/strings/src/strings/udf/udf_apis.cu @@ -101,6 +101,8 @@ void free_udf_string_array(cudf::strings::udf::udf_string* d_strings, // external APIs +int get_cuda_build_version() { return CUDA_VERSION; } + std::unique_ptr to_string_view_array(cudf::column_view const input) { return detail::to_string_view_array(input, cudf::get_default_stream()); diff --git a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py index 39800145585..f3e3911e6c7 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_parquet.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_parquet.py @@ -596,3 +596,23 @@ def test_parquet_read_filter_and_project(tmpdir): # Check result expected = df[(df.a == 5) & (df.c > 20)][columns].reset_index(drop=True) dd.assert_eq(got, expected) + + +def test_timezone_column(tmpdir): + path = str(tmpdir.join("test.parquet")) + pdf = pd.DataFrame( + { + "time": pd.to_datetime( + ["1996-01-02", "1996-12-01"], + utc=True, + ), + "x": [1, 2], + } + ) + pdf.to_parquet(path) + got = dask_cudf.read_parquet(path) + # cudf.read_parquet does not support reading timezone aware types yet + assert got["time"].dtype == pd.DatetimeTZDtype("ns", "UTC") + got["time"] = got["time"].astype("datetime64[ns]") + expected = cudf.read_parquet(path) + dd.assert_eq(got, expected)