From aab856b05f77651b43675471fbaa4f53ae4efb77 Mon Sep 17 00:00:00 2001
From: Adam Binford
Date: Mon, 23 Sep 2024 06:33:49 -0400
Subject: [PATCH] Cleanup

---
 .../execution/python/BaseGroupedArrowPythonRunner.scala   | 3 +--
 .../sql/execution/python/CoGroupedArrowPythonRunner.scala | 7 +++----
 .../sql/execution/python/GroupedArrowPythonRunner.scala   | 3 +--
 3 files changed, 5 insertions(+), 8 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseGroupedArrowPythonRunner.scala
index 73523652c90ac..c88c39444932d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/BaseGroupedArrowPythonRunner.scala
@@ -33,7 +33,6 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.Utils
 
 
-
 /**
  * Python UDF Runner for grouped udfs.
  */
@@ -103,7 +102,7 @@ abstract class BaseGroupedArrowPythonRunner[IN](
       group: Iterator[InternalRow],
       schema: StructType,
       dataOut: DataOutputStream,
-      name: String): Unit = {
+      name: Option[String] = None): Unit = {
     val arrowSchema = ArrowUtils.toArrowSchema(
       schema, timeZoneId, errorOnDuplicatedFieldNames = true, largeVarTypes)
     val allocator = ArrowUtils.rootAllocator.newChildAllocator(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
index d9d256bf89a7f..c084db1c3b1e1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/CoGroupedArrowPythonRunner.scala
@@ -47,14 +47,13 @@ class CoGroupedArrowPythonRunner(
     pythonMetrics, jobArtifactUUID, profiler) {
 
   override protected def writeNextGroup(
-      group: (Iterator[InternalRow],
-        Iterator[InternalRow]),
+      group: (Iterator[InternalRow], Iterator[InternalRow]),
       dataOut: DataOutputStream): Unit = {
     val (leftGroup, rightGroup) = group
 
     dataOut.writeInt(2)
-    writeSingleStream(leftGroup, leftSchema, dataOut, "left")
-    writeSingleStream(rightGroup, rightSchema, dataOut, "right")
+    writeSingleStream(leftGroup, leftSchema, dataOut, Some("left"))
+    writeSingleStream(rightGroup, rightSchema, dataOut, Some("right"))
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/GroupedArrowPythonRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/GroupedArrowPythonRunner.scala
index 31af7ea2ee1e6..4d6b5454391f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/GroupedArrowPythonRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/GroupedArrowPythonRunner.scala
@@ -25,7 +25,6 @@ import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.types.StructType
 
 
-
 /**
  * Python UDF Runner for grouped udfs.
  */
@@ -49,6 +48,6 @@ class GroupedArrowPythonRunner(
       group: Iterator[InternalRow],
       dataOut: DataOutputStream): Unit = {
     dataOut.writeInt(1)
-    writeSingleStream(group, schema, dataOut, "batch")
+    writeSingleStream(group, schema, dataOut)
   }
 }
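
Note: for context, a minimal sketch of the calling convention this patch settles on, with writeSingleStream's name parameter now Option[String] defaulting to None. Everything below besides that signature detail is a hypothetical stand-in: the real method lives on BaseGroupedArrowPythonRunner and builds an Arrow schema, allocator, and stream writer rather than the placeholder payload written here.

import java.io.{ByteArrayOutputStream, DataOutputStream}

// Simplified stand-in for the runner hierarchy; models only the optional-name
// signature introduced by this patch, not the real Arrow serialization.
object WriteSingleStreamSketch {

  def writeSingleStream(
      rows: Seq[String],                  // stand-in for Iterator[InternalRow]
      dataOut: DataOutputStream,
      name: Option[String] = None): Unit = {
    // When a name is supplied (the cogrouped case), it can label the stream
    // for easier debugging; otherwise a generic label is used.
    val label = name.getOrElse("stream")
    dataOut.writeInt(rows.length)         // placeholder for the Arrow payload
    println(s"wrote ${rows.length} rows as '$label'")
  }

  def main(args: Array[String]): Unit = {
    val dataOut = new DataOutputStream(new ByteArrayOutputStream())
    // Cogrouped runner: announces two streams, then names each side.
    dataOut.writeInt(2)
    writeSingleStream(Seq("l1", "l2"), dataOut, Some("left"))
    writeSingleStream(Seq("r1"), dataOut, Some("right"))
    // Grouped runner: announces one stream and relies on the None default,
    // which is why the "batch" argument could be dropped above.
    dataOut.writeInt(1)
    writeSingleStream(Seq("g1", "g2", "g3"), dataOut)
  }
}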