Use SPARK_INDEX_NAME_FORMAT in combine_frames to avoid ambiguity. (#1650)

Use `SPARK_INDEX_NAME_FORMAT` in `utils.combine_frames` to avoid ambiguity: when an index column keeps the same Spark column name as a data column (e.g., after `set_index("Koalas", drop=False)`), the join in `combine_frames` used to leave duplicate column names in the joined frame, making any reference to that name ambiguous.

```py
>>> ks.options.compute.ops_on_diff_frames = True
>>> kdf = ks.DataFrame({"a": [1, 2, 3], "Koalas": [0, 1, 2]}).set_index("Koalas", drop=False)
>>> kdf.index.name = None
>>> kdf["NEW"] = ks.Series([100, 200, 300])
>>> kdf
Traceback (most recent call last):
...
pyspark.sql.utils.AnalysisException: Reference 'Koalas' is ambiguous, could be: Koalas, Koalas.;
```
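
With this change, the same assignment goes through and the result mirrors pandas' alignment by index. A sketch of the expected output (inferred from the pandas semantics the new test asserts, not copied from the commit):

```py
>>> kdf.sort_index()
   a  Koalas  NEW
0  1       0  100
1  2       1  200
2  3       2  300
```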

Related to #1647 as well.
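
For background on the failure mode: after `set_index("Koalas", drop=False)`, the index column and a data column both map to a Spark column named `Koalas`, and the old code aliased the merged index column back to that same name, so the joined Spark frame exposed the name twice. A standalone PySpark sketch of the same ambiguity (hypothetical frames, not code from this repo):

```py
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

# Two frames that both expose a column named "Koalas", like the merged
# index column and the data column after set_index("Koalas", drop=False).
this_df = spark.createDataFrame([(0, 1), (1, 2)], ["Koalas", "a"]).alias("this")
that_df = spark.createDataFrame([(0, 100), (1, 200)], ["Koalas", "NEW"]).alias("that")

joined = this_df.join(that_df, F.col("this.Koalas") == F.col("that.Koalas"))

# Any unqualified reference to "Koalas" is now ambiguous; aliasing the merged
# index column to an internal name instead (what this commit does) avoids it.
joined.select("Koalas")  # raises pyspark.sql.utils.AnalysisException
```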
ueshin authored Jul 15, 2020
1 parent 6361053 commit 156a4f5
Showing 2 changed files with 27 additions and 5 deletions.
databricks/koalas/tests/test_ops_on_diff_frames.py (11 additions, 0 deletions)
```diff
@@ -458,6 +458,17 @@ def test_assignment_series(self):
 
         self.assert_eq(kdf.sort_index(), pdf.sort_index())
 
+        pdf = pd.DataFrame({"a": [1, 2, 3], "Koalas": [0, 1, 2]}).set_index("Koalas", drop=False)
+        kdf = ks.from_pandas(pdf)
+
+        kdf.index.name = None
+        kdf["NEW"] = ks.Series([100, 200, 300])
+
+        pdf.index.name = None
+        pdf["NEW"] = pd.Series([100, 200, 300])
+
+        self.assert_eq(kdf.sort_index(), pdf.sort_index())
+
     def test_assignment_frame(self):
         kdf = ks.from_pandas(self.pdf1)
         pdf = self.pdf1
```
databricks/koalas/utils.py (16 additions, 5 deletions)
```diff
@@ -82,7 +82,11 @@ def combine_frames(this, *args, how="full", preserve_order_column=False):
     """
     from databricks.koalas.config import get_option
     from databricks.koalas.frame import DataFrame
-    from databricks.koalas.internal import NATURAL_ORDER_COLUMN_NAME
+    from databricks.koalas.internal import (
+        InternalFrame,
+        NATURAL_ORDER_COLUMN_NAME,
+        SPARK_INDEX_NAME_FORMAT,
+    )
     from databricks.koalas.series import Series
 
     if all(isinstance(arg, Series) for arg in args):
@@ -116,16 +120,22 @@ def combine_frames(this, *args, how="full", preserve_order_column=False):
     that_sdf = that._internal.resolved_copy.spark_frame.alias("that")
 
     # If the same named index is found, that's used.
-    for (this_column, this_name), (that_column, that_name) in this_and_that_index_map:
+    index_column_names = []
+    for i, ((this_column, this_name), (that_column, that_name)) in enumerate(
+        this_and_that_index_map
+    ):
         if this_name == that_name:
             # We should merge the Spark columns into one
             # to mimic pandas' behavior.
             this_scol = scol_for(this_sdf, this_column)
             that_scol = scol_for(that_sdf, that_column)
             join_scol = this_scol == that_scol
             join_scols.append(join_scol)
+
+            column_name = SPARK_INDEX_NAME_FORMAT(i)
+            index_column_names.append(column_name)
             merged_index_scols.append(
-                F.when(this_scol.isNotNull(), this_scol).otherwise(that_scol).alias(this_column)
+                F.when(this_scol.isNotNull(), this_scol).otherwise(that_scol).alias(column_name)
             )
         else:
             raise ValueError("Index names must be exactly matched currently.")
@@ -156,7 +166,7 @@ def combine_frames(this, *args, how="full", preserve_order_column=False):
         + order_column
     )
 
-    index_columns = set(this._internal.index_spark_column_names)
+    index_columns = set(index_column_names)
     new_data_columns = [
         col
         for col in joined_df.columns
@@ -179,8 +189,9 @@ def combine_frames(this, *args, how="full", preserve_order_column=False):
         else None
     )
     return DataFrame(
-        this._internal.copy(
+        InternalFrame(
             spark_frame=joined_df,
+            index_map=OrderedDict(zip(index_column_names, this._internal.index_names)),
             column_labels=column_labels,
             data_spark_columns=[scol_for(joined_df, col) for col in new_data_columns],
             column_label_names=column_label_names,
```
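
For context on the renaming: `SPARK_INDEX_NAME_FORMAT` comes from `databricks/koalas/internal.py`; assuming its definition there is a bound `str.format` for the internal index-column naming scheme, the merged index columns get collision-free names and `index_map` restores the original index names on the Koalas side. A minimal sketch under that assumption:

```py
# Assumed definition from databricks/koalas/internal.py at this version:
SPARK_INDEX_NAME_FORMAT = "__index_level_{}__".format

# The i-th merged index column is aliased to an internal, collision-free name:
SPARK_INDEX_NAME_FORMAT(0)  # '__index_level_0__'

# index_map then pairs these internal Spark column names with the original
# index names (e.g., None, or "Koalas"), so user-visible names are unchanged.
```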
