Skip to content

Commit

Permalink
refactor!: rewrite as_polars_df to use Arrow C Stream interface
Browse files Browse the repository at this point in the history
  • Loading branch information
eitsupi committed May 6, 2024
1 parent bbba938 commit 350689c
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 180 deletions.
35 changes: 8 additions & 27 deletions R/as_polars.R
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ as_polars_df.default = function(x, ...) {
#' @param make_names_unique A logical flag to replace duplicated column names
#' with unique names. If `FALSE` and there are duplicated column names, an
#' error is thrown.
#' @param schema named list of DataTypes, or character vector of column names.
#' Should match the number of columns in `x` and correspond to each column in `x` by position.
#' If a column in `x` does not match the name or type at the same position, it will be renamed/recast.
#' If `NULL` (default), convert columns as is.
#' @param schema_overrides named list of DataTypes. Cast some columns to the DataType.
#' @inheritParams as_polars_df.ArrowTabular
#' @export
as_polars_df.data.frame = function(
Expand Down Expand Up @@ -199,26 +204,9 @@ as_polars_df.RPolarsLazyGroupBy = function(x, ...) {

# TODO: link to DataTypes documents
#' @rdname as_polars_df
#' @param rechunk A logical flag (default `TRUE`).
#' Make sure that all data of each column is in contiguous memory.
#' @param schema named list of DataTypes, or character vector of column names.
#' Should match the number of columns in `x` and correspond to each column in `x` by position.
#' If a column in `x` does not match the name or type at the same position, it will be renamed/recast.
#' If `NULL` (default), convert columns as is.
#' @param schema_overrides named list of DataTypes. Cast some columns to the DataType.
#' @export
as_polars_df.ArrowTabular = function(
x,
...,
rechunk = TRUE,
schema = NULL,
schema_overrides = NULL) {
arrow_to_rpldf(
x,
rechunk = rechunk,
schema = schema,
schema_overrides = schema_overrides
)
as_polars_df.ArrowTabular = function(x, ...) {
  # Expose the Arrow tabular object as a record batch reader, then delegate
  # to the RecordBatchReader method. Arguments in `...` are accepted for S3
  # signature compatibility and are not forwarded.
  reader = arrow::as_record_batch_reader(x)
  as_polars_df.RecordBatchReader(reader)
}


Expand All @@ -241,14 +229,7 @@ as_polars_df.nanoarrow_array = function(x, ...) {
unwrap("in as_polars_df(<nanoarrow_array>):")
}

series = as_polars_series.nanoarrow_array(x, name = NULL)

if (length(series)) {
series$to_frame()$unnest("")
} else {
# TODO: support 0-length array
pl$DataFrame()
}
as_polars_series.nanoarrow_array(x, name = "")$to_frame()$unnest("")
}


Expand Down
99 changes: 1 addition & 98 deletions R/construction.R
Original file line number Diff line number Diff line change
@@ -1,100 +1,3 @@
#' Internal function of `as_polars_df()` for `arrow::Table` class objects.
#'
#' This is a copy of Python Polars' `arrow_to_pydf` function.
#'
#' Dictionary columns and multi-chunk struct columns are converted one by one
#' through `as_polars_series.ChunkedArray()`; all remaining columns are
#' gathered into a single Arrow table and imported through a record batch
#' reader. Columns whose imported type differs from the requested schema are
#' cast at the end.
#' @param at arrow::ArrowTabular (arrow::Table and arrow::RecordBatch)
#' @param schema named list of DataTypes, or character vector of column names.
#' Should be the same length as the number of columns of `at`.
#' If schema names or types do not match `at`, the columns will be renamed/recast.
#' If `NULL` (default), convert columns as is.
#' @param schema_overrides named list of DataTypes. Cast some columns to the DataType.
#' @param rechunk A logical flag (default `TRUE`).
#' Make sure that all data of each column is in contiguous memory.
#' @noRd
#' @return RPolarsDataFrame
arrow_to_rpldf = function(at, schema = NULL, schema_overrides = NULL, rechunk = TRUE) {
  # new column names by schema, #todo get names if schema not NULL
  n_cols = at$num_columns

  new_schema = unpack_schema(
    schema = schema %||% names(at),
    schema_overrides = schema_overrides
  )
  col_names = names(new_schema)

  if (length(col_names) != n_cols) {
    Err_plain("schema length does not match column length") |>
      unwrap()
  }

  data_cols = list()
  # dictionaries cannot be built in different batches (categorical does not allow
  # that) so we rechunk them and create them separately.
  # struct columns don't work properly if they contain multiple chunks.
  special_cols = list()

  ## iter over columns, possibly do special conversion
  for (i in seq_len(n_cols)) {
    # arrow column accessors are 0-based, hence `i - 1L`
    column = at$column(i - 1L)
    col_name = col_names[i]

    if (is_arrow_dictionary(column)) {
      column = coerce_arrow(column)
      special_cols[[col_name]] = as_polars_series.ChunkedArray(
        column, col_name,
        rechunk = rechunk
      )
    } else if (is_arrow_struct(column) && column$num_chunks > 1L) {
      special_cols[[col_name]] = as_polars_series.ChunkedArray(
        column, col_name,
        rechunk = rechunk
      )
    } else {
      data_cols[[col_name]] = column
    }
  }

  if (length(data_cols) > 0L) {
    tbl = do.call(arrow::arrow_table, data_cols)

    if (tbl$num_rows == 0L) {
      # NOTE(review): `pl$DataFrame()` is called without arguments here, so the
      # schema comparison further down presumably pairs `new_schema` with an
      # empty schema -- confirm zero-row input is handled as intended.
      rdf = pl$DataFrame() # TODO: support creating 0-row DataFrame
    } else {
      rdf = as_polars_series(arrow::as_record_batch_reader(tbl))$to_frame()$unnest("")
    }
  } else {
    rdf = pl$DataFrame()
  }

  if (rechunk) {
    rdf = rdf$select(pl$all()$rechunk())
  }

  if (length(special_cols) > 0L) {
    # append the separately converted columns, then restore the requested
    # column order
    rdf = rdf$with_columns(
      unname(lapply(special_cols, \(s) pl$lit(s)$alias(s$name)))
    )$select(
      pl$col(col_names)
    )
  }

  # cast any imported arrow fields not matching schema
  requested_fields = mapply(
    new_schema,
    rdf$schema,
    FUN = \(new_field, df_field) {
      if (is.null(new_field) || new_field == df_field) NULL else new_field
    },
    SIMPLIFY = FALSE
  )
  # `vapply()` always yields a logical vector, also for zero-length input
  needs_cast = !vapply(requested_fields, is.null, logical(1))
  cast_these_fields = requested_fields[needs_cast]

  if (length(cast_these_fields) > 0L) {
    rdf = rdf$with_columns(
      mapply(
        cast_these_fields,
        names(cast_these_fields),
        FUN = \(dtype, name) pl$col(name)$cast(dtype),
        SIMPLIFY = FALSE
      ) |> unname()
    )
  }

  rdf
}

unpack_schema = function(
schema = NULL, # char vector of names or 'schema' a named list of DataTypes
schema_overrides = NULL # named list of DataTypes
Expand Down Expand Up @@ -204,7 +107,7 @@ arrow_to_rseries_result = function(name, values, rechunk = TRUE) {

#' Internal function of `as_polars_df()` for `data.frame` class objects.
#'
#' This is a copy of `arrow_to_rpldf`
#' This is a copy of Python Polars' `arrow_to_pydf` function.
#' @noRd
#' @return RPolarsDataFrame
df_to_rpldf = function(x, ..., schema = NULL, schema_overrides = NULL) {
Expand Down
5 changes: 1 addition & 4 deletions man/as_polars_df.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 0 additions & 51 deletions tests/testthat/test-as_polars.R
Original file line number Diff line number Diff line change
Expand Up @@ -232,35 +232,6 @@ test_that("from arrow Table and ChunkedArray", {
unname(as.list(at))
)

# no rechunk no longer works
# expect_identical(
# lapply(at$columns, \(x) length(as_polars_series.ChunkedArray(x, rechunk = FALSE)$chunk_lengths())),
# lapply(at$columns, \(x) x$num_chunks)
# )
# expect_grepl_error(expect_identical(
# lapply(at$columns, \(x) length(as_polars_series.ChunkedArray(x, rechunk = TRUE)$chunk_lengths())),
# lapply(at$columns, \(x) x$num_chunks)
# ))
# expect_identical(
# as_polars_df.ArrowTabular(at, rechunk = FALSE)$
# select(pl$all()$map_batches(\(s) s$chunk_lengths()))$
# to_list() |>
# lapply(length) |>
# unname(),
# lapply(at$columns, \(x) x$num_chunks)
# )

# expect_identical(
# as_polars_df.ArrowTabular(at, rechunk = TRUE)$
# select(pl$all()$map_batches(\(s) s$chunk_lengths()))$
# to_list() |>
# lapply(length) |>
# unname(),
# lapply(at$columns, \(x) x$num_chunks)
# )


# #not supported yet
# #chunked data with factors
l = list(
df1 = data.frame(factor = factor(c("apple", "apple", "banana"))),
Expand All @@ -279,28 +250,6 @@ test_that("from arrow Table and ChunkedArray", {
do.call(what = rbind)
df2 = as_polars_df.ArrowTabular(at2)
expect_identical(as.data.frame(at2), as.data.frame(df2))


# use schema override
df = as_polars_df.ArrowTabular(
arrow::arrow_table(iris),
schema_overrides = list(Sepal.Length = pl$Float32, Species = pl$String)
)
iris_str = iris
iris_str$Species = as.character(iris_str$Species)
expect_grepl_error(expect_equal(df$to_list(), as.list(iris_str)))
expect_equal(df$to_list(), as.list(iris_str), tolerance = 0.0001)

# change column name via char schema
char_schema = names(iris)
char_schema[1] = "Alice"
expect_identical(
as_polars_df.ArrowTabular(
arrow::arrow_table(iris),
schema = char_schema
)$columns,
char_schema
)
})


Expand Down

0 comments on commit 350689c

Please sign in to comment.