[SPARK-34205][SQL][SS] Add pipe to Dataset to enable Streaming Dataset pipe #31296

Closed · wants to merge 5 commits
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -230,7 +230,7 @@ private[spark] class PipedRDD[T: ClassTag](
}
}

-private object PipedRDD {
+private[spark] object PipedRDD {
// Split a string into words using a standard StringTokenizer
def tokenize(command: String): Seq[String] = {
val buf = new ArrayBuffer[String]
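For context, `PipedRDD.tokenize` splits a command string into whitespace-separated tokens (standard `StringTokenizer` behaviour); the visibility change above lets the new SQL execution node reuse it from Spark-internal code. A small sketch of the expected behaviour (callable only from `org.apache.spark` packages because the object is `private[spark]`):

import org.apache.spark.rdd.PipedRDD

// Whitespace-delimited tokenization, used later by PipeElementsExec to turn the
// user-supplied command string into the argument list for the forked process.
val tokens: Seq[String] = PipedRDD.tokenize("wc -l")   // expected: Seq("wc", "-l")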
@@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.api.java.function.FilterFunction
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.{Encoder, Row}
+import org.apache.spark.sql.{Encoder, Encoders, Row}
import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
@@ -585,3 +585,32 @@ case class CoGroup(
outputObjAttr: Attribute,
left: LogicalPlan,
right: LogicalPlan) extends BinaryNode with ObjectProducer

/**
 * Wraps a [[PipeElements]] node with the deserialization of the child's typed objects and the
 * serialization of the external process's string output back into rows.
 */
object PipeElements {
def apply[T : Encoder](
command: String,
printElement: (Any, String => Unit) => Unit,
child: LogicalPlan): LogicalPlan = {
val deserialized = CatalystSerde.deserialize[T](child)
implicit val encoder = Encoders.STRING
val piped = PipeElements(
implicitly[Encoder[T]].clsTag.runtimeClass,
implicitly[Encoder[T]].schema,
CatalystSerde.generateObjAttr[String],
command,
printElement,
deserialized)
CatalystSerde.serialize[String](piped)
}
}

/**
* A relation produced by piping elements to a forked external process.
*/
case class PipeElements[T](
argumentClass: Class[_],
argumentSchema: StructType,
outputObjAttr: Attribute,
command: String,
printElement: (Any, String => Unit) => Unit,
child: LogicalPlan) extends ObjectConsumer with ObjectProducer
29 changes: 29 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2889,6 +2889,35 @@ class Dataset[T] private[sql](
flatMap(func)(encoder)
}

/**
* Returns a new Dataset of strings created by piping elements to a forked external process.
* The resulting Dataset is computed by executing the given process once per partition.
* All elements of each input partition are written to the process's stdin as lines of input
* separated by newlines. The resulting partition consists of the process's stdout output, with
* each line of stdout becoming one element of the output partition. A process is invoked
* even for empty partitions.
[Review comment: Contributor]
I'd kindly explain the case they need to be careful about, e.g.: if your external process does aggregation on its inputs, the aggregation is applied per partition in a micro-batch. You may want to aggregate these outputs after calling pipe to get a global aggregation across partitions and also across micro-batches.

*
* Note that for a micro-batch streaming Dataset, the effect of pipe is only per micro-batch, not
* across the entire stream. If your external process does aggregation-like work on its inputs,
* e.g. `wc -l`, the aggregation is applied per partition within a micro-batch. You may want to
* aggregate these outputs after calling pipe to get a global aggregation across partitions and
* also across micro-batches.
*
* @param command command to run in the forked process.
* @param printElement Use this function to customize how elements are piped. This function
* will be called with each Dataset element as the first parameter, and a print-line
* function (analogous to out.println()) as the second parameter.
* @group typedrel
* @since 3.2.0
*/
def pipe(command: String, printElement: (T, String => Unit) => Unit): Dataset[String] = {
[Review comment: @HeartSaVioR (Contributor), Jan 25, 2021]
I see all the examples simply call the print function with a converted string. Could we simply take a serializer function like serializeFn: (T => String) instead, or have two overloaded methods allowing both cases if we are unsure whether printElement might be necessary in some cases? This should simplify the test code and actual user code (as _.toString would simply work).

[Review comment: Member, PR author]
I think it is okay. In most cases there should be no difference. The only difference might be when we want to print out multiple lines per object:

def printElement(obj: T, printFunc: String => Unit) = {
  printFunc(obj.a)
  printFunc(obj.b)
  ...
}
def serializeFn(obj: T): String = {
  s"${obj.a}\n${obj.b}\n..."
}

I'm fine with either one; they take different forms but achieve the same effect.
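For illustration only, a minimal sketch of the overload suggested above; this serializeFn variant is hypothetical and not part of this PR, and would simply delegate to the printElement-based method proposed here:

// Hypothetical convenience overload (not in this PR): serialize each element to one
// output line with a user-supplied function, delegating to the printElement variant.
def pipe(command: String, serializeFn: T => String): Dataset[String] =
  pipe(command, (element: T, printLine: String => Unit) => printLine(serializeFn(element)))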

implicit val stringEncoder = Encoders.STRING
withTypedPlan[String](PipeElements[T](
command,
printElement.asInstanceOf[(Any, String => Unit) => Unit],
logicalPlan))
}
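To make the per-partition / per-micro-batch caveat above concrete, here is a small usage sketch, assuming the `pipe` API proposed in this PR; `spark` is assumed to be an existing SparkSession, everything else is the standard Dataset API.

import org.apache.spark.sql.{Dataset, SparkSession}

val spark: SparkSession = SparkSession.builder().master("local[2]").getOrCreate()
import spark.implicits._

// Each forked `wc -l` instance only sees the rows of its own partition
// (and, for a streaming Dataset, only those of the current micro-batch).
val perPartitionCounts: Dataset[String] =
  spark.range(100).pipe("wc -l", (n, printLine) => printLine(n.toString))

// Re-aggregate the per-partition counts to obtain a global line count
// (trim because `wc -l` may left-pad its output with spaces).
val globalCount: Long = perPartitionCounts.map(_.trim.toLong).reduce(_ + _)

For a streaming Dataset the equivalent re-aggregation would be a streaming aggregation (e.g. a groupBy().sum()), since actions such as reduce are not available on streaming Datasets.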

/**
* Applies a function `f` to all rows.
*
@@ -666,6 +666,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.python.MapInPandasExec(func, output, planLater(child)) :: Nil
case logical.MapElements(f, _, _, objAttr, child) =>
execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
case logical.PipeElements(_, _, objAttr, command, printElement, child) =>
execution.PipeElementsExec(objAttr, command, printElement, planLater(child)) :: Nil
case logical.AppendColumns(f, _, _, in, out, child) =>
execution.AppendColumnsExec(f, in, out, planLater(child)) :: Nil
case logical.AppendColumnsWithObject(f, childSer, newSer, child) =>
@@ -25,7 +25,7 @@ import scala.language.existentials
import org.apache.spark.api.java.function.MapFunction
import org.apache.spark.api.r._
import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
+import org.apache.spark.rdd.{PipedRDD, RDD}
import org.apache.spark.sql.Row
import org.apache.spark.sql.api.r.SQLUtils._
import org.apache.spark.sql.catalyst.InternalRow
@@ -624,3 +624,34 @@ case class CoGroupExec(
}
}
}

/**
* Piping elements to a forked external process.
* The output of its child must be a single-field row containing the input object.
*/
case class PipeElementsExec(
outputObjAttr: Attribute,
command: String,
printElement: (Any, String => Unit) => Unit,
child: SparkPlan)
extends ObjectConsumerExec with ObjectProducerExec {

override protected def doExecute(): RDD[InternalRow] = {
// Unwraps the single-field object row produced by the child back into the user's object.
val getObject = ObjectOperator.unwrapObjectFromRow(child.output.head.dataType)
// Adapts the user-supplied printElement function to the (InternalRow, println) hook
// expected by RDD.pipe's printRDDElement parameter.
val printRDDElement: (InternalRow, String => Unit) => Unit = (row, printFunc) => {
val obj = getObject(row)
printElement(obj, printFunc)
}

// Forks the external process once per partition and wraps each line of its stdout
// back into a single-field object row.
child.execute()
.pipe(command = PipedRDD.tokenize(command), printRDDElement = printRDDElement)
.mapPartitionsInternal { iter =>
val outputObject = ObjectOperator.wrapObjectToRow(outputObjectType)
iter.map(e => outputObject(e))
}
}

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def outputPartitioning: Partitioning = child.outputPartitioning
}
50 changes: 49 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -24,7 +24,7 @@ import org.scalatest.Assertions._
import org.scalatest.exceptions.TestFailedException
import org.scalatest.prop.TableDrivenPropertyChecks._

-import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.{SparkException, TaskContext, TestUtils}
import org.apache.spark.sql.catalyst.{FooClassWithEnum, FooEnum, ScroogeLikeExample}
import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
@@ -2007,6 +2007,54 @@ class DatasetSuite extends QueryTest

checkAnswer(withUDF, Row(Row(1), null, null) :: Row(Row(1), null, null) :: Nil)
}

test("SPARK-34205: Pipe Dataset") {
assume(TestUtils.testCommandAvailable("cat"))

val nums = spark.range(4)
val piped = nums.pipe("cat", (l, printFunc) => printFunc(l.toString)).toDF
[Review comment: Member]
@viirya, what do you think about exposing a transform-equivalent expression as a DSL? e.g.:

scala> val data = Seq((123, "first"), (4567, "second")).toDF("num", "word")
data: org.apache.spark.sql.DataFrame = [num: int, word: string]

scala> data.createOrReplaceTempView("t1")

scala> sql("select transform(*) using 'cat' from t1").show()
+----+------+
| key| value|
+----+------+
| 123| first|
|4567|second|
+----+------+
scala> data.repartition(1).createOrReplaceTempView("t1")

scala> sql("select transform(*) using 'wc -l' as (echo) from t1").show()
+--------+
|    echo|
+--------+
|       2|
+--------+

Spark recently added native support for script transformation, and I think it could do what you want.

[Review comment: Member]
I think this could address most of the comments here, such as #31296 (comment), about being typed or non-standard (as it follows Hive's feature) - at least we have one format to follow, etc.

[Review comment: Member]
It would be great if we could leverage the script transform (#29414).

[Review comment: @HeartSaVioR (Contributor), Jan 26, 2021]
Great point! I don't know how exhaustively Spark implements Hive's transform feature, but the description of transform in Hive's manual looks pretty powerful, and goes well beyond what we plan to provide with pipe.

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Transform#LanguageManualTransform-Transform/Map-ReduceSyntax

The reason for the absence of pipe in DataFrame looks obvious - transform just replaced it. (Not valid, as it was only available with Hive support.) It looks to be available only as a SQL statement, so we still need DSL support to use it in SS.

[Review comment: @viirya (Member, PR author), Jan 26, 2021]
I considered transform at the beginning, as it looks close to pipe. I didn't pick it for this because I only see it exposed as SQL syntax, and I am not sure whether it works for a streaming Dataset. Another reason is that it is designed for untyped Datasets. So if you want to pipe a complex object T with custom output instead of column-wise output, "transform" isn't as powerful as "pipe".

That said, I asked our customer and they only use primitive-type Datasets for now, so an untyped Dataset should be enough for the purpose.

Another reason is that although the query looks like "SELECT TRANSFORM(...) FROM ...", it is actually not an expression but is implemented as an operator. If we expose it as a DSL expression, there will be some problems.

Unlike a Window function, it seems to me that we cannot have a query like "SELECT a, TRANSFORM(...), c FROM ..." or the DSL form:

df.select($"a", $"b", transform(...) ...)

But for a Window function we can do:

df.select($"a", $"b", lead("key", 1).over(window) ...)

That being said, in the end it is also Dataset.transform, instead of an expression DSL.

[Review comment: @HeartSaVioR (Contributor)]
I see what @viirya said. I'd agree that transform looks to behave as an operation (not sure whether that is intended, but it looks that way at least for now), and transform also requires a top-level API to cover it, like we did for mapPartitions.

If we are OK adding the top-level API (again, not yet decided, so just my 2 cents), then which one? I'd rather say transform is the one we'd like to be consistent with, instead of pipe. It has been exposed as a SQL statement and is probably widely used by Spark SQL users, and even Hive users. If we want feature parity then my vote goes to transform.

[Review comment: Member]
> What's the top-level API, you mean a Plan node like CollectSet or something else?

@AngersZhuuuu The top-level API here means the new API added in Dataset.

> Can you share how to make transformation an expression? I don't think it is an expression at all.

@viirya Sure. I followed the comment "I have thought this problem too, first I want to add transform as a DSL function, in this way, we need to make an equivalent ScriptTransformation expression first. We can think that this is just a new expression, or a new function" from @AngersZhuuuu: add a new expression ScriptTransformationExpression for ScriptTransformation and turn it into ScriptTransformationExec.

Two limitations here might need more discussion:

  • The script transformation may produce more than one row for a single input row, so it cannot be used together with other expressions.
  • The script in Hive transformation is partition-based, but if we make it an expression, it becomes row-based.

[Review comment: @viirya (Member, PR author), Jan 26, 2021]
Thanks @HeartSaVioR. At least I am glad the discussion can go forward no matter which one you prefer to add.

Honestly I think transform is a weird thing; it exists only to provide the pipe feature under Hive SQL syntax. I don't like the transform syntax, which is inconvenient to use and verbose. It is not as flexible as pipe's custom print-out function. BTW, for a typed Dataset: because transform is for untyped data, it is bound to its serialization row format. In the early discussion there were some comments against that, although it was clarified later that pipe doesn't suffer from this issue.

If we still cannot reach a consensus, maybe I should raise a discussion on the dev mailing list to decide whether a pipe or transform top-level API should be added.

@xuanyuanking @AngersZhuuuu The SQL syntax of transform, "SELECT TRANSFORM(...)", is pretty confusing. It looks like an expression but is actually an operator, and IMHO you cannot turn it into an expression. If you force it to be an expression, you will create some inconsistency and weird cases. transform is like pipe: their input/output relation is not 1:1 or N:1 but arbitrary.

[Review comment: Contributor]
Hmmm, just to clarify: we mean we can add an expression (or function?) like TRANSFORM, not convert TRANSFORM into one. And we can extract some common logic from ScriptTransformationExec. The usage would be something like

 script_transform(input, script, output)

where input can be a list of input columns such as a, b, c, output can be a definition such as col1 string, col2 Int, and the return type is Array<Struct<col1: String, col2: Int>> (this DataType can cover all cases and lets the user handle it).

Then, when executing, we can make it just run with a default format such as ROW FORMAT DELIMITED. A simple and general way to implement it, and then we can add it as a DSL.

[Review comment: @viirya (Member, PR author), Jan 27, 2021]
Do you think this expression can cover every kind of external process output? Transform and pipe have an arbitrary relation between input and output. An external process can output a line for each input line, can do aggregation-like output such as wc -l, can output a line per 2 or 3 input lines, etc. I don't know how you would define an expression whose output is not deterministic.

[Review comment: Contributor]
nit: any reason for toDF (as pipe gives a Dataset[String])?

[Review comment: Member, PR author]
It is just for checkAnswer.


checkAnswer(piped, Row("0") :: Row("1") :: Row("2") :: Row("3") :: Nil)

val piped2 = nums.pipe("wc -l", (l, printFunc) => printFunc(l.toString)).toDF.collect()
[Review comment: Contributor]
nit: Why toDF?

assert(piped2.size == 2)
assert(piped2(0).getString(0).trim == "2")
assert(piped2(1).getString(0).trim == "2")
}

test("SPARK-34205: Pipe DataFrame") {
assume(TestUtils.testCommandAvailable("cat"))

val data = Seq((123, "first"), (4567, "second")).toDF("num", "word")

def printElement(row: Row, printFunc: (String) => Unit): Unit = {
val line = s"num: ${row.getInt(0)}, word: ${row.getString(1)}"
printFunc.apply(line)
}
val piped = data.pipe("cat", printElement).toDF
checkAnswer(piped, Row("num: 123, word: first") :: Row("num: 4567, word: second") :: Nil)
}

test("SPARK-34205: Pipe complex type Dataset") {
assume(TestUtils.testCommandAvailable("cat"))

val data = Seq(DoubleData(123, "first"), DoubleData(4567, "second")).toDS

def printElement(data: DoubleData, printFunc: (String) => Unit): Unit = {
val line = s"num: ${data.id}, word: ${data.val1}"
printFunc.apply(line)
}
val piped = data.pipe("cat", printElement).toDF
checkAnswer(piped, Row("num: 123, word: first") :: Row("num: 4567, word: second") :: Nil)
}

test("SPARK-34205: pipe Dataset with empty partition") {
[Review comment: Member]
Thank you for making sure of this!

val data = Seq(123, 4567).toDF("num").repartition(8, $"num")
val piped = data.pipe("wc -l", (row, printFunc) => printFunc(row.getInt(0).toString))
assert(piped.count == 8)
val lineCounts = piped.map(_.trim.toInt).collect().toSet
assert(lineCounts == Set(0, 1))
}
}

case class Bar(a: Int)
@@ -1264,6 +1264,20 @@ class StreamSuite extends StreamTest {
}
}
}

test("SPARK-34205: Pipe Streaming Dataset") {
assume(TestUtils.testCommandAvailable("cat"))

val inputData = MemoryStream[Int]
val piped = inputData.toDS()
.pipe("cat", (n, printFunc) => printFunc(n.toString)).toDF
[Review comment: Contributor]
nit: Why toDF?


testStream(piped)(
AddData(inputData, 1, 2, 3),
CheckAnswer(Row("1"), Row("2"), Row("3")),
AddData(inputData, 4),
CheckNewAnswer(Row("4")))
}
}

abstract class FakeSource extends StreamSourceProvider {