Commit

Address comments.
viirya committed Jan 22, 2021
1 parent d4f9457 commit c6e57c1
Showing 3 changed files with 11 additions and 3 deletions.
core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala (2 changes: 1 addition & 1 deletion)
@@ -230,7 +230,7 @@ private[spark] class PipedRDD[T: ClassTag](
   }
 }

-object PipedRDD {
+private[spark] object PipedRDD {
   // Split a string into words using a standard StringTokenizer
   def tokenize(command: String): Seq[String] = {
     val buf = new ArrayBuffer[String]
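Note on the PipedRDD change: the companion object's visibility is narrowed from public to private[spark], presumably so its tokenize helper stays reusable inside Spark (for example by the Dataset pipe code this PR adds) without being exposed as public API. The method body is collapsed in this view; a minimal sketch of a StringTokenizer-based tokenize, for readers without the file at hand (not copied from the diff):

    import java.util.StringTokenizer
    import scala.collection.mutable.ArrayBuffer

    // Split a command string into whitespace-separated tokens.
    def tokenize(command: String): Seq[String] = {
      val buf = new ArrayBuffer[String]
      val tok = new StringTokenizer(command)
      while (tok.hasMoreElements) {
        buf += tok.nextToken()
      }
      buf.toSeq
    }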
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala (10 changes: 9 additions & 1 deletion)
@@ -2008,7 +2008,7 @@ class DatasetSuite extends QueryTest
     checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
   }

-  test("Pipe Dataset") {
+  test("SPARK-34205: Pipe Dataset") {
     assume(TestUtils.testCommandAvailable("cat"))

     val nums = spark.range(4)
@@ -2021,6 +2021,14 @@ class DatasetSuite extends QueryTest
     assert(piped2(0).getString(0).trim == "2")
     assert(piped2(1).getString(0).trim == "2")
   }
+
+  test("SPARK-34205: pipe Dataset with empty partition") {
+    val data = Seq(123, 4567).toDF("num").repartition(8, $"num")
+    val piped = data.pipe("wc -l")
+    assert(piped.count == 8)
+    val lineCounts = piped.map(_.trim.toInt).collect().toSet
+    assert(Set(0, 1, 1) == lineCounts)
+  }
 }

 case class Bar(a: Int)
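Note on the new DatasetSuite test: two rows hash-repartitioned into 8 partitions leave most partitions empty, and each partition still runs "wc -l", so empty partitions must report a line count of 0 (the Set(0, 1, 1) literal deduplicates to Set(0, 1)). A rough sketch of the same behavior using the long-standing RDD.pipe API, assuming an active SparkSession named spark (the Dataset-level pipe itself is what SPARK-34205 proposes):

    // Two rows spread over 8 partitions; most partitions end up empty.
    val data = spark.range(0, 2).repartition(8)

    // RDD.pipe feeds each partition's rows to the command's stdin and
    // returns the command's stdout lines as the output rows.
    val counts = data.rdd.map(_.toString).pipe("wc -l").map(_.trim.toInt).collect()

    assert(counts.length == 8)   // one "wc -l" result per partition
    assert(counts.sum == 2)      // the two input rows, wherever they landed
    assert(counts.contains(0))   // empty partitions report 0 lines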
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala (2 changes: 1 addition & 1 deletion)
@@ -1265,7 +1265,7 @@ class StreamSuite extends StreamTest {
     }
   }

-  test("Pipe Streaming Dataset") {
+  test("SPARK-34205: Pipe Streaming Dataset") {
     assume(TestUtils.testCommandAvailable("cat"))

     val inputData = MemoryStream[Int]
