diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
index 24a43e4d4dddb..df03a2174dd9d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -230,7 +230,7 @@ private[spark] class PipedRDD[T: ClassTag](
   }
 }
 
-object PipedRDD {
+private[spark] object PipedRDD {
   // Split a string into words using a standard StringTokenizer
   def tokenize(command: String): Seq[String] = {
     val buf = new ArrayBuffer[String]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index c3ce1a71a81dd..cef05924bce74 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2008,7 +2008,7 @@ class DatasetSuite extends QueryTest
     checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
   }
 
-  test("Pipe Dataset") {
+  test("SPARK-34205: Pipe Dataset") {
     assume(TestUtils.testCommandAvailable("cat"))
     val nums = spark.range(4)
 
@@ -2021,6 +2021,14 @@
     assert(piped2(0).getString(0).trim == "2")
     assert(piped2(1).getString(0).trim == "2")
   }
+
+  test("SPARK-34205: pipe Dataset with empty partition") {
+    val data = Seq(123, 4567).toDF("num").repartition(8, $"num")
+    val piped = data.pipe("wc -l")
+    assert(piped.count == 8)
+    val lineCounts = piped.map(_.trim.toInt).collect().toSet
+    assert(Set(0, 1, 1) == lineCounts)
+  }
 }
 
 case class Bar(a: Int)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index 37d4bd8b8edc1..55290dd7098ec 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -1265,7 +1265,7 @@ class StreamSuite extends StreamTest {
     }
   }
 
-  test("Pipe Streaming Dataset") {
+  test("SPARK-34205: Pipe Streaming Dataset") {
     assume(TestUtils.testCommandAvailable("cat"))
     val inputData = MemoryStream[Int]
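
For context, a minimal usage sketch of the Dataset pipe API that these tests exercise. The pipe(...) method on DataFrame is the API this patch (SPARK-34205) adds and is assumed to be available here; the object name and session setup below are illustrative only and not part of the patch.

// Usage sketch only, not part of the patch: assumes the DataFrame pipe(...)
// method added by SPARK-34205 is available. Each partition's rows are written
// to the external command's stdin; the command's stdout lines come back as rows.
import org.apache.spark.sql.SparkSession

object PipeUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("PipeUsageSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Echo four rows through `cat`; the output mirrors the input values.
    val nums = spark.range(4)
    nums.toDF().pipe("cat").show()

    // Count rows per partition with `wc -l`. With 8 partitions and only two
    // distinct keys, most partitions are empty and should yield 0 rather than
    // failing, which is the behavior covered by the new empty-partition test.
    val data = Seq(123, 4567).toDF("num").repartition(8, $"num")
    data.pipe("wc -l").show()

    spark.stop()
  }
}

As the new DatasetSuite test asserts, the command runs once per partition, so the result of the second pipe has one row per partition and empty partitions report a count of 0.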