Skip to content

Commit

Permalink
[PERF] Move with_column and exclude function logic to Rust side, …
Browse files Browse the repository at this point in the history
…add `with_columns` (#2167)

This PR includes two things:
- a performance improvement to calling (not execution of)
`DataFrame.with_column` and `DataFrame.exclude` where projection
conversion is done on the Rust side so that costly conversions of the
dataframe fields into Python objects are no longer necessary
- the addition of the `DataFrame.with_columns` method which allows
adding multiple columns to a dataframe at once. Will also improve
performance over calling `with_column` multiple times. Since I was
modifying the `with_column` function I figured I would add this as a
drive-by

I also took this opportunity to clean up some of the rust builder code.
  • Loading branch information
kevinzwang authored Apr 24, 2024
1 parent 1e20b2d commit 3582aa7
Show file tree
Hide file tree
Showing 12 changed files with 229 additions and 165 deletions.
4 changes: 3 additions & 1 deletion daft/daft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,9 @@ class LogicalPlanBuilder:
) -> LogicalPlanBuilder: ...
@staticmethod
def table_scan(scan_operator: ScanOperatorHandle) -> LogicalPlanBuilder: ...
def project(self, projection: list[PyExpr], resource_request: ResourceRequest) -> LogicalPlanBuilder: ...
def select(self, to_select: list[PyExpr]) -> LogicalPlanBuilder: ...
def with_columns(self, columns: list[PyExpr], resource_request: ResourceRequest) -> LogicalPlanBuilder: ...
def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder: ...
def filter(self, predicate: PyExpr) -> LogicalPlanBuilder: ...
def limit(self, limit: int, eager: bool) -> LogicalPlanBuilder: ...
def explode(self, to_explode: list[PyExpr]) -> LogicalPlanBuilder: ...
Expand Down
54 changes: 45 additions & 9 deletions daft/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ def select(self, *columns: ColumnInputType) -> "DataFrame":
DataFrame: new DataFrame that will select the passed in columns
"""
assert len(columns) > 0
builder = self._builder.project(self.__column_input_to_expression(columns))
builder = self._builder.select(self.__column_input_to_expression(columns))
return DataFrame(builder)

@DataframePublicAPI
Expand Down Expand Up @@ -704,9 +704,7 @@ def exclude(self, *names: str) -> "DataFrame":
Returns:
DataFrame: DataFrame with some columns excluded.
"""
names_to_skip = set(names)
el = [col(e.name) for e in self._builder.schema() if e.name not in names_to_skip]
builder = self._builder.project(el)
builder = self._builder.exclude(list(names))
return DataFrame(builder)

@DataframePublicAPI
Expand Down Expand Up @@ -746,14 +744,52 @@ def with_column(
Returns:
DataFrame: DataFrame with new column.
"""
return self.with_columns({column_name: expr}, resource_request)

@DataframePublicAPI
def with_columns(
self,
columns: Dict[str, Expression],
resource_request: ResourceRequest = ResourceRequest(),
) -> "DataFrame":
"""Adds columns to the current DataFrame with Expressions, equivalent to a ``select``
with all current columns and the new ones
Example:
>>> df = daft.from_pydict({'x': [1, 2, 3], 'y': [4, 5, 6]})
>>>
>>> new_df = df.with_columns({
'foo': df['x'] + 1,
'bar': df['y'] - df['x']
})
>>> new_df.show()
╭───────┬───────┬───────┬───────╮
│ x ┆ y ┆ foo ┆ bar │
│ --- ┆ --- ┆ --- ┆ --- │
│ Int64 ┆ Int64 ┆ Int64 ┆ Int64 │
╞═══════╪═══════╪═══════╪═══════╡
│ 1 ┆ 4 ┆ 2 ┆ 3 │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 2 ┆ 5 ┆ 3 ┆ 3 │
├╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌┤
│ 3 ┆ 6 ┆ 4 ┆ 3 │
╰───────┴───────┴───────┴───────╯
(Showing first 3 of 3 rows)
Args:
columns (Dict[str, Expression]): Dictionary of new columns in the format { name: expression }
resource_request (ResourceRequest): a custom resource request for the execution of this operation
Returns:
DataFrame: DataFrame with new columns.
"""
if not isinstance(resource_request, ResourceRequest):
raise TypeError(f"resource_request should be a ResourceRequest, but got {type(resource_request)}")

prev_schema_as_cols = ExpressionsProjection(
[col(field.name) for field in self._builder.schema() if field.name != column_name]
)
new_schema = prev_schema_as_cols.union(ExpressionsProjection([expr.alias(column_name)]))
builder = self._builder.project(list(new_schema), resource_request)
new_columns = [col.alias(name) for name, col in columns.items()]

builder = self._builder.with_columns(new_columns, resource_request)
return DataFrame(builder)

@DataframePublicAPI
Expand Down
20 changes: 14 additions & 6 deletions daft/logical/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,21 @@ def from_tabular_scan(
builder = _LogicalPlanBuilder.table_scan(scan_operator)
return cls(builder)

def project(
def select(
self,
projection: list[Expression],
custom_resource_request: ResourceRequest = ResourceRequest(),
to_select: list[Expression],
) -> LogicalPlanBuilder:
projection_pyexprs = [expr._expr for expr in projection]
builder = self._builder.project(projection_pyexprs, custom_resource_request)
to_select_pyexprs = [expr._expr for expr in to_select]
builder = self._builder.select(to_select_pyexprs)
return LogicalPlanBuilder(builder)

def with_columns(self, columns: list[Expression], custom_resource_request: ResourceRequest) -> LogicalPlanBuilder:
column_pyexprs = [expr._expr for expr in columns]
builder = self._builder.with_columns(column_pyexprs, custom_resource_request)
return LogicalPlanBuilder(builder)

def exclude(self, to_exclude: list[str]) -> LogicalPlanBuilder:
builder = self._builder.exclude(to_exclude)
return LogicalPlanBuilder(builder)

def filter(self, predicate: Expression) -> LogicalPlanBuilder:
Expand All @@ -113,7 +121,7 @@ def count(self) -> LogicalPlanBuilder:
# TODO(Clark): Add dedicated logical/physical ops when introducing metadata-based count optimizations.
first_col = col(self.schema().column_names()[0])
builder = self._builder.aggregate([first_col.count(CountMode.All)._expr], [])
builder = builder.project([first_col.alias("count")._expr], ResourceRequest())
builder = builder.select([first_col.alias("count")._expr])
return LogicalPlanBuilder(builder)

def distinct(self) -> LogicalPlanBuilder:
Expand Down
1 change: 1 addition & 0 deletions docs/source/api_docs/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Manipulating Columns

DataFrame.select
DataFrame.with_column
DataFrame.with_columns
DataFrame.exclude
DataFrame.explode

Expand Down
Loading

0 comments on commit 3582aa7

Please sign in to comment.