Revert "[SPARK-45620][PYTHON] Fix user-facing APIs related to Python …
Browse files Browse the repository at this point in the history
…UDTF to use camelCase"

This reverts commit e3ba9cf.

Closes apache#43485 from LuciferYang/revert-SPARK-45620.

Authored-by: yangjie01 <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
LuciferYang committed Oct 23, 2023
1 parent 959382f commit 3de9ebc
Showing 5 changed files with 77 additions and 77 deletions.
22 changes: 11 additions & 11 deletions python/docs/source/user_guide/sql/python_udtf.rst
@@ -77,29 +77,29 @@ To implement a Python UDTF, you first need to define a class implementing the me
the particular UDTF call under consideration. Each parameter is an instance of the
`AnalyzeArgument` class, which contains fields including the provided argument's data
type and value (in the case of literal scalar arguments only). For table arguments, the
-`isTable` field is set to true and the `dataType` field is a StructType representing
+`is_table` field is set to true and the `data_type` field is a StructType representing
the table's column types:
-dataType: DataType
+data_type: DataType
value: Optional[Any]
-isTable: bool
+is_table: bool
This method returns an instance of the `AnalyzeResult` class which includes the result
table's schema as a StructType. If the UDTF accepts an input table argument, then the
`AnalyzeResult` can also include a requested way to partition the rows of the input
-table across several UDTF calls. If `withSinglePartition` is set to True, the query
+table across several UDTF calls. If `with_single_partition` is set to True, the query
planner will arrange a repartitioning operation from the previous execution stage such
that all rows of the input table are consumed by the `eval` method from exactly one
-instance of the UDTF class. On the other hand, if the `partitionBy` list is non-empty,
+instance of the UDTF class. On the other hand, if the `partition_by` list is non-empty,
the query planner will arrange a repartitioning such that all rows with each unique
combination of values of the partitioning columns are consumed by a separate unique
-instance of the UDTF class. If `orderBy` is non-empty, this specifies the requested
+instance of the UDTF class. If `order_by` is non-empty, this specifies the requested
ordering of rows within each partition.
schema: StructType
-withSinglePartition: bool = False
-partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
-orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+with_single_partition: bool = False
+partition_by: Sequence[PartitioningColumn] = field(default_factory=tuple)
+order_by: Sequence[OrderingColumn] = field(default_factory=tuple)
Examples
--------
@@ -116,7 +116,7 @@ To implement a Python UDTF, you first need to define a class implementing the me
>>> def analyze(self, *args) -> AnalyzeResult:
... assert len(args) == 1, "This function accepts one argument only"
-... assert args[0].dataType == StringType(), "Only string arguments are supported"
+... assert args[0].data_type == StringType(), "Only string arguments are supported"
... text = args[0]
... schema = StructType()
... for index, word in enumerate(text.split(" ")):
@@ -128,7 +128,7 @@ To implement a Python UDTF, you first need to define a class implementing the me
>>> def analyze(self, **kwargs) -> AnalyzeResult:
... assert len(kwargs) == 1, "This function accepts one argument only"
... assert "text" in kwargs, "An argument named 'text' is required"
... assert kwargs["text"].dataType == StringType(), "Only strings are supported"
... assert kwargs["text"].data_type == StringType(), "Only strings are supported"
... text = kwargs["text"]
... schema = StructType()
... for index, word in enumerate(text.split(" ")):
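For orientation, here is a minimal end-to-end sketch of the snake_case `analyze` API that this revert restores. It is not part of the diff; the class, column name, and registered function name are illustrative, and a running `SparkSession` named `spark` is assumed:

```python
from pyspark.sql.functions import udtf
from pyspark.sql.types import StringType, StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult


@udtf  # no returnType: the output schema comes from analyze() at plan time
class SplitWords:
    @staticmethod
    def analyze(text: AnalyzeArgument) -> AnalyzeResult:
        # snake_case fields restored by this revert: data_type, is_table
        assert text.data_type == StringType()
        assert not text.is_table
        return AnalyzeResult(StructType().add("word", StringType()))

    def eval(self, text: str):
        for word in text.split(" "):
            yield (word,)


# spark.udtf.register("split_words", SplitWords)
# spark.sql("SELECT * FROM split_words('hello world')").show()
```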
12 changes: 6 additions & 6 deletions python/pyspark/sql/functions.py
@@ -17184,9 +17184,9 @@ def udtf(

- The number and order of arguments are the same as the UDTF inputs
- Each argument is a :class:`pyspark.sql.udtf.AnalyzeArgument`, containing:
-- dataType: DataType
+- data_type: DataType
- value: Any: the calculated value if the argument is foldable; otherwise None
-- isTable: bool: True if the argument is a table argument
+- is_table: bool: True if the argument is a table argument

and return a :class:`pyspark.sql.udtf.AnalyzeResult`, containing:

@@ -17198,7 +17198,7 @@ def udtf(
... class TestUDTFWithAnalyze:
... @staticmethod
... def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
... return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))
... return AnalyzeResult(StructType().add("a", a.data_type).add("b", b.data_type))
...
... def eval(self, a, b):
... yield a, b
@@ -17219,9 +17219,9 @@ def udtf(
... a: AnalyzeArgument, b: AnalyzeArgument, **kwargs: AnalyzeArgument
... ) -> AnalyzeResult:
... return AnalyzeResult(
... StructType().add("a", a.dataType)
... .add("b", b.dataType)
... .add("x", kwargs["x"].dataType)
... StructType().add("a", a.data_type)
... .add("b", b.data_type)
... .add("x", kwargs["x"].data_type)
... )
...
... def eval(self, a, b, **kwargs):
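A hedged sketch of the `value` field documented above, which is populated only for foldable (constant) scalar arguments; the class and column names below are invented for illustration:

```python
from pyspark.sql.functions import lit, udtf
from pyspark.sql.types import IntegerType, StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult


@udtf
class RepeatIndex:
    @staticmethod
    def analyze(n: AnalyzeArgument) -> AnalyzeResult:
        # n.value holds the constant-folded literal, or None for
        # non-literal input, so it can be validated at plan time
        if not isinstance(n.value, int) or n.value < 1:
            raise Exception("The argument must be a positive integer literal")
        return AnalyzeResult(StructType().add("i", IntegerType()))

    def eval(self, n: int):
        for i in range(n):
            yield (i,)


# RepeatIndex(lit(3)).show()  # three rows: 0, 1, 2
```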
84 changes: 42 additions & 42 deletions python/pyspark/sql/tests/test_udtf.py
@@ -1244,10 +1244,10 @@ class TestUDTF:
@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
assert isinstance(a, AnalyzeArgument)
-assert isinstance(a.dataType, DataType)
+assert isinstance(a.data_type, DataType)
assert a.value is not None
-assert a.isTable is False
-return AnalyzeResult(StructType().add("a", a.dataType))
+assert a.is_table is False
+return AnalyzeResult(StructType().add("a", a.data_type))

def eval(self, a):
yield a,
@@ -1333,7 +1333,7 @@ def test_udtf_with_analyze_multiple_arguments(self):
class TestUDTF:
@staticmethod
def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))
return AnalyzeResult(StructType().add("a", a.data_type).add("b", b.data_type))

def eval(self, a, b):
yield a, b
@@ -1364,7 +1364,7 @@ class TestUDTF:
@staticmethod
def analyze(*args: AnalyzeArgument) -> AnalyzeResult:
return AnalyzeResult(
-StructType([StructField(f"col{i}", a.dataType) for i, a in enumerate(args)])
+StructType([StructField(f"col{i}", a.data_type) for i, a in enumerate(args)])
)

def eval(self, *args):
@@ -1397,10 +1397,10 @@ class TestUDTF:
@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
assert isinstance(a, AnalyzeArgument)
-assert isinstance(a.dataType, StructType)
+assert isinstance(a.data_type, StructType)
assert a.value is None
-assert a.isTable is True
-return AnalyzeResult(StructType().add("a", a.dataType[0].dataType))
+assert a.is_table is True
+return AnalyzeResult(StructType().add("a", a.data_type[0].dataType))

def eval(self, a: Row):
if a["id"] > 5:
@@ -1417,9 +1417,9 @@ def test_udtf_with_analyze_table_argument_adding_columns(self):
class TestUDTF:
@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
-assert isinstance(a.dataType, StructType)
-assert a.isTable is True
-return AnalyzeResult(a.dataType.add("is_even", BooleanType()))
+assert isinstance(a.data_type, StructType)
+assert a.is_table is True
+return AnalyzeResult(a.data_type.add("is_even", BooleanType()))

def eval(self, a: Row):
yield a["id"], a["id"] % 2 == 0
@@ -1449,11 +1449,11 @@ def analyze(n, row) -> AnalyzeResult:
if n.value is None or not isinstance(n.value, int) or (n.value < 1 or n.value > 10):
raise Exception("The first argument must be a scalar integer between 1 and 10")

-if row.isTable is False:
+if row.is_table is False:
raise Exception("The second argument must be a table argument")

-assert isinstance(row.dataType, StructType)
-return AnalyzeResult(row.dataType)
+assert isinstance(row.data_type, StructType)
+return AnalyzeResult(row.data_type)

def eval(self, n: int, row: Row):
for _ in range(n):
@@ -1604,7 +1604,7 @@ def test_udtf_with_analyze_null_literal(self):
class TestUDTF:
@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
return AnalyzeResult(StructType().add("a", a.dataType))
return AnalyzeResult(StructType().add("a", a.data_type))

def eval(self, a):
yield a,
@@ -1619,7 +1619,7 @@ def test_udtf_with_analyze_taking_wrong_number_of_arguments(self):
class TestUDTF:
@staticmethod
def analyze(a: AnalyzeArgument, b: AnalyzeArgument) -> AnalyzeResult:
return AnalyzeResult(StructType().add("a", a.dataType).add("b", b.dataType))
return AnalyzeResult(StructType().add("a", a.data_type).add("b", b.data_type))

def eval(self, a):
yield a, a + 1
@@ -1675,7 +1675,7 @@ def test_udtf_with_analyze_using_broadcast(self):
class TestUDTF:
@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
-return AnalyzeResult(StructType().add(colname.value, a.dataType))
+return AnalyzeResult(StructType().add(colname.value, a.data_type))

def eval(self, a):
assert colname.value == "col1"
@@ -1700,7 +1700,7 @@ class TestUDTF:
@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
test_accum.add(1)
return AnalyzeResult(StructType().add("col1", a.dataType))
return AnalyzeResult(StructType().add("col1", a.data_type))

def eval(self, a):
test_accum.add(10)
@@ -1739,7 +1739,7 @@ def call_my_func() -> str:

@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
-return AnalyzeResult(StructType().add(TestUDTF.call_my_func(), a.dataType))
+return AnalyzeResult(StructType().add(TestUDTF.call_my_func(), a.data_type))

def eval(self, a):
assert TestUDTF.call_my_func() == "col1"
@@ -1779,7 +1779,7 @@ def call_my_func() -> str:

@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
-return AnalyzeResult(StructType().add(TestUDTF.call_my_func(), a.dataType))
+return AnalyzeResult(StructType().add(TestUDTF.call_my_func(), a.data_type))

def eval(self, a):
assert TestUDTF.call_my_func() == "col1"
@@ -1826,7 +1826,7 @@ def read_my_archive() -> str:

@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
-return AnalyzeResult(StructType().add(TestUDTF.read_my_archive(), a.dataType))
+return AnalyzeResult(StructType().add(TestUDTF.read_my_archive(), a.data_type))

def eval(self, a):
assert TestUDTF.read_my_archive() == "col1"
@@ -1867,7 +1867,7 @@ def read_my_file() -> str:

@staticmethod
def analyze(a: AnalyzeArgument) -> AnalyzeResult:
-return AnalyzeResult(StructType().add(TestUDTF.read_my_file(), a.dataType))
+return AnalyzeResult(StructType().add(TestUDTF.read_my_file(), a.data_type))

def eval(self, a):
assert TestUDTF.read_my_file() == "col1"
@@ -1967,15 +1967,15 @@ def test_udtf_with_analyze_kwargs(self):
class TestUDTF:
@staticmethod
def analyze(**kwargs: AnalyzeArgument) -> AnalyzeResult:
assert isinstance(kwargs["a"].dataType, IntegerType)
assert isinstance(kwargs["a"].data_type, IntegerType)
assert kwargs["a"].value == 10
assert not kwargs["a"].isTable
assert isinstance(kwargs["b"].dataType, StringType)
assert not kwargs["a"].is_table
assert isinstance(kwargs["b"].data_type, StringType)
assert kwargs["b"].value == "x"
assert not kwargs["b"].isTable
assert not kwargs["b"].is_table
return AnalyzeResult(
StructType(
-[StructField(key, arg.dataType) for key, arg in sorted(kwargs.items())]
+[StructField(key, arg.data_type) for key, arg in sorted(kwargs.items())]
)
)

@@ -2000,7 +2000,7 @@ def test_udtf_with_named_arguments_lateral_join(self):
class TestUDTF:
@staticmethod
def analyze(a, b):
return AnalyzeResult(StructType().add("a", a.dataType))
return AnalyzeResult(StructType().add("a", a.data_type))

def eval(self, a, b):
yield a,
@@ -2028,18 +2028,18 @@ def test_udtf_with_named_arguments_and_defaults(self):
class TestUDTF:
@staticmethod
def analyze(a: AnalyzeArgument, b: Optional[AnalyzeArgument] = None):
-assert isinstance(a.dataType, IntegerType)
+assert isinstance(a.data_type, IntegerType)
assert a.value == 10
-assert not a.isTable
+assert not a.is_table
if b is not None:
-assert isinstance(b.dataType, StringType)
+assert isinstance(b.data_type, StringType)
assert b.value == "z"
-assert not b.isTable
-schema = StructType().add("a", a.dataType)
+assert not b.is_table
+schema = StructType().add("a", a.data_type)
if b is None:
return AnalyzeResult(schema.add("b", IntegerType()))
else:
return AnalyzeResult(schema.add("b", b.dataType))
return AnalyzeResult(schema.add("b", b.data_type))

def eval(self, a, b=100):
yield a, b
@@ -2298,8 +2298,8 @@ def analyze(*args, **kwargs):
.add("count", IntegerType())
.add("total", IntegerType())
.add("last", IntegerType()),
-withSinglePartition=True,
-orderBy=[OrderingColumn("input"), OrderingColumn("partition_col")],
+with_single_partition=True,
+order_by=[OrderingColumn("input"), OrderingColumn("partition_col")],
)

def eval(self, row: Row):
@@ -2352,8 +2352,8 @@ def analyze(*args, **kwargs):
.add("count", IntegerType())
.add("total", IntegerType())
.add("last", IntegerType()),
-partitionBy=[PartitioningColumn("partition_col")],
-orderBy=[
+partition_by=[PartitioningColumn("partition_col")],
+order_by=[
OrderingColumn(name="input", ascending=True, overrideNullsFirst=False)
],
)
@@ -2433,16 +2433,16 @@ def __init__(self, analyze_result=None):
def analyze(argument, _):
if (
argument.value is None
-or argument.isTable
+or argument.is_table
or not isinstance(argument.value, str)
or len(argument.value) == 0
):
raise Exception("The first argument must be non-empty string")
-assert argument.dataType == StringType()
-assert not argument.isTable
+assert argument.data_type == StringType()
+assert not argument.is_table
return AnalyzeResultWithBuffer(
schema=StructType().add("total", IntegerType()).add("buffer", StringType()),
-withSinglePartition=True,
+with_single_partition=True,
buffer=argument.value,
)

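A hedged sketch of the renamed partitioning fields the tests above exercise (`partition_by`, `order_by`, `with_single_partition`); the class, table, and column names are illustrative, and the UDTF is assumed to be called with a TABLE argument:

```python
from pyspark.sql.functions import udtf
from pyspark.sql.types import IntegerType, StructType
from pyspark.sql.udtf import (
    AnalyzeArgument,
    AnalyzeResult,
    OrderingColumn,
    PartitioningColumn,
)


@udtf
class LastValuePerPartition:
    def __init__(self):
        self._last = None

    @staticmethod
    def analyze(row: AnalyzeArgument) -> AnalyzeResult:
        assert row.is_table
        return AnalyzeResult(
            schema=StructType().add("last", IntegerType()),
            # each distinct partition_col value is fed to its own UDTF
            # instance, with rows ordered by "input" within the partition
            partition_by=[PartitioningColumn("partition_col")],
            order_by=[OrderingColumn("input")],
        )

    def eval(self, row):
        self._last = row["input"]

    def terminate(self):
        yield (self._last,)
```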
24 changes: 12 additions & 12 deletions python/pyspark/sql/udtf.py
@@ -48,17 +48,17 @@ class AnalyzeArgument:
Parameters
----------
-dataType : :class:`DataType`
+data_type : :class:`DataType`
The argument's data type
value : Optional[Any]
The calculated value if the argument is foldable; otherwise None
-isTable : bool
+is_table : bool
If True, the argument is a table argument.
"""

-dataType: DataType
+data_type: DataType
value: Optional[Any]
-isTable: bool
+is_table: bool


@dataclass(frozen=True)
@@ -97,25 +97,25 @@ class AnalyzeResult:
----------
schema : :class:`StructType`
The schema that the Python UDTF will return.
-withSinglePartition : bool
+with_single_partition : bool
If true, the UDTF is specifying for Catalyst to repartition all rows of the input TABLE
argument to one collection for consumption by exactly one instance of the corresponding
UDTF class.
-partitionBy : Sequence[PartitioningColumn]
+partition_by : Sequence[PartitioningColumn]
If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
partition the input TABLE argument by. In this case, calls to the UDTF may not include any
explicit PARTITION BY clause, in which case Catalyst will return an error. This option is
-mutually exclusive with 'withSinglePartition'.
-orderBy: Sequence[OrderingColumn]
+mutually exclusive with 'with_single_partition'.
+order_by: Sequence[OrderingColumn]
If non-empty, this is a sequence of columns that the UDTF is specifying for Catalyst to
-sort the input TABLE argument by. Note that the 'partitionBy' list must also be non-empty
+sort the input TABLE argument by. Note that the 'partition_by' list must also be non-empty
in this case.
"""

schema: StructType
-withSinglePartition: bool = False
-partitionBy: Sequence[PartitioningColumn] = field(default_factory=tuple)
-orderBy: Sequence[OrderingColumn] = field(default_factory=tuple)
+with_single_partition: bool = False
+partition_by: Sequence[PartitioningColumn] = field(default_factory=tuple)
+order_by: Sequence[OrderingColumn] = field(default_factory=tuple)


def _create_udtf(
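The tests above also subclass `AnalyzeResult` (`AnalyzeResultWithBuffer`) to carry extra plan-time state to the executing instance; a hedged sketch of that pattern, with illustrative class and field names:

```python
from dataclasses import dataclass

from pyspark.sql.functions import udtf
from pyspark.sql.types import StringType, StructType
from pyspark.sql.udtf import AnalyzeArgument, AnalyzeResult


@dataclass(frozen=True)
class AnalyzeResultWithBuffer(AnalyzeResult):
    # extra state computed in analyze(), carried to the executing instance
    buffer: str = ""


@udtf
class EchoBuffer:
    def __init__(self, analyze_result=None):
        # Spark passes the AnalyzeResult back when __init__ accepts it
        self._buffer = analyze_result.buffer if analyze_result else ""

    @staticmethod
    def analyze(argument: AnalyzeArgument) -> AnalyzeResultWithBuffer:
        assert argument.data_type == StringType()
        assert not argument.is_table
        return AnalyzeResultWithBuffer(
            schema=StructType().add("buffer", StringType()),
            buffer=argument.value,
        )

    def eval(self, argument):
        pass

    def terminate(self):
        yield (self._buffer,)
```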