[SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core #29085

Closed · 43 commits

Commits
dfcec3c  [SPARK-32106][SQL]Implement SparkScriptTransformationExec in sql/core (AngersZhuuuu, Jul 13, 2020)
e53744b  save (AngersZhuuuu, Jul 13, 2020)
a693722  save (AngersZhuuuu, Jul 13, 2020)
5bfa669  follow comment (AngersZhuuuu, Jul 14, 2020)
ec754e2  fix input and out put format (AngersZhuuuu, Jul 14, 2020)
a2b12a1  follow comment (AngersZhuuuu, Jul 15, 2020)
c3dc66b  follow comment (AngersZhuuuu, Jul 15, 2020)
cb19b7b  follow comment (AngersZhuuuu, Jul 17, 2020)
ce8a0a5  fix bytetype and add it in UT (AngersZhuuuu, Jul 17, 2020)
d37ef86  format code (AngersZhuuuu, Jul 17, 2020)
fce25ff  Fix (maropu, Jul 17, 2020)
f3e05c6  Fix (maropu, Jul 17, 2020)
5c049b5  Merge pull request #5 from maropu/pr29085 (AngersZhuuuu, Jul 18, 2020)
04684a8  fix UT and follow comment (AngersZhuuuu, Jul 18, 2020)
6811721  move ut and add ut for schema less (AngersZhuuuu, Jul 18, 2020)
fc96e1f  follow comment (AngersZhuuuu, Jul 18, 2020)
ed901af  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Jul 18, 2020)
a6f1e7d  catch data convert exception (AngersZhuuuu, Jul 18, 2020)
e367c05  add UTD support (AngersZhuuuu, Jul 18, 2020)
e74d04c  add test (AngersZhuuuu, Jul 18, 2020)
4ef4d76  add data type (AngersZhuuuu, Jul 19, 2020)
22d223c  fix ut (AngersZhuuuu, Jul 19, 2020)
72b2155  added UT (AngersZhuuuu, Jul 20, 2020)
a3628ac  update (AngersZhuuuu, Jul 20, 2020)
e16c136  update title (AngersZhuuuu, Jul 20, 2020)
858f4e5  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Jul 20, 2020)
cfecc90  support array map struct (AngersZhuuuu, Jul 21, 2020)
43d0f24  Revert "support array map struct" (AngersZhuuuu, Jul 21, 2020)
9e18fa8  fix SQLQueryTestSuite (AngersZhuuuu, Jul 22, 2020)
9537d9b  address comment (AngersZhuuuu, Jul 22, 2020)
5227441  Update BaseScriptTransformationExec.scala (AngersZhuuuu, Jul 22, 2020)
670f21b  Update BaseScriptTransformationSuite.scala (AngersZhuuuu, Jul 22, 2020)
ce8184a  address comment (AngersZhuuuu, Jul 22, 2020)
4615733  Update SparkScriptTransformationSuite.scala (AngersZhuuuu, Jul 22, 2020)
08d97c8  throw exception when complex data type (AngersZhuuuu, Jul 22, 2020)
33923b6  https://github.com/apache/spark/pull/29085#discussion_r458676081 (AngersZhuuuu, Jul 22, 2020)
f5ec656  https://github.com/apache/spark/pull/29085#discussion_r458687735 (AngersZhuuuu, Jul 22, 2020)
7916d72  https://github.com/apache/spark/pull/29085#discussion_r458692902 (AngersZhuuuu, Jul 22, 2020)
a769aa7  address comment (AngersZhuuuu, Jul 22, 2020)
d93f7fa  add UT of row format and fi UT (AngersZhuuuu, Jul 22, 2020)
be80c27  address comment (AngersZhuuuu, Jul 23, 2020)
7f3cff8  Update PlanParserSuite.scala (AngersZhuuuu, Jul 23, 2020)
03d3409  address comment (AngersZhuuuu, Jul 23, 2020)
In class SparkPlanner, register the new strategy:

@@ -46,6 +46,7 @@ class SparkPlanner(
       Window ::
       JoinSelection ::
       InMemoryScans ::
+      SparkScripts ::
       BasicOperators :: Nil)
New file: SparkScriptTransformationExec (sql/core):

@@ -0,0 +1,187 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import java.io._
import java.nio.charset.StandardCharsets

import scala.collection.JavaConverters._
import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration

import org.apache.spark.TaskContext
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.{CircularBuffer, RedirectThread}

/**
 * Transforms the input by forking and running the specified script.
 *
 * @param input the set of expressions that should be passed to the script.
 * @param script the command that should be executed.
 * @param output the attributes that are produced by the script.
 */
case class SparkScriptTransformationExec(
    input: Seq[Expression],
    script: String,
    output: Seq[Attribute],
    child: SparkPlan,
    ioschema: SparkScriptIOSchema)
  extends BaseScriptTransformationExec {

  override def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
    : Iterator[InternalRow] = {
    val cmd = List("/bin/bash", "-c", script)
[Review comment by maropu (Member), Jul 15, 2020]
Seems like the implementation of processIterator is pretty similar to the Hive one. Could we share the code between them more?

[AngersZhuuuu (Contributor, Author)]
Yea, working on this
    val builder = new ProcessBuilder(cmd.asJava)

    val proc = builder.start()
    val inputStream = proc.getInputStream
    val outputStream = proc.getOutputStream
    val errorStream = proc.getErrorStream

    // In order to avoid deadlocks, we need to consume the error output of the child process.
    // To avoid issues caused by large error output, we use a circular buffer to limit the amount
    // of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang
    // that motivates this.
    val stderrBuffer = new CircularBuffer(2048)
    new RedirectThread(
      errorStream,
      stderrBuffer,
      "Thread-ScriptTransformation-STDERR-Consumer").start()

    val outputProjection = new InterpretedProjection(input, child.output)

    // This new thread will consume the ScriptTransformation's input rows and write them to the
    // external process. That process's output will be read by this current thread.
    val writerThread = new ScriptTransformationWriterThread(
      inputIterator.map(outputProjection),
      input.map(_.dataType),
      ioschema,
      outputStream,
      proc,
      stderrBuffer,
      TaskContext.get(),
      hadoopConf
    )

    val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    val outputIterator: Iterator[InternalRow] = new Iterator[InternalRow] {
      var curLine: String = null
      val mutableRow = new SpecificInternalRow(output.map(_.dataType))

      override def hasNext: Boolean = {
        try {
          if (curLine == null) {
            curLine = reader.readLine()
            if (curLine == null) {
              checkFailureAndPropagate(writerThread, null, proc, stderrBuffer)
              return false
            }
          }
          true
        } catch {
          case NonFatal(e) =>
            // If this exception is due to abrupt / unclean termination of `proc`,
            // then detect it and propagate a better exception message for end users
            checkFailureAndPropagate(writerThread, e, proc, stderrBuffer)

            throw e
        }
      }

      override def next(): InternalRow = {
        if (!hasNext) {
          throw new NoSuchElementException
        }
        val prevLine = curLine
        curLine = reader.readLine()
        if (!ioschema.schemaLess) {
          new GenericInternalRow(
            prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
              .map(CatalystTypeConverters.convertToCatalyst))
        } else {
          new GenericInternalRow(
            prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2)
              .map(CatalystTypeConverters.convertToCatalyst))
        }
      }
    }

    writerThread.start()

    outputIterator
  }
}
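The CircularBuffer / RedirectThread comment above points at the SPARK-7862 deadlock: if nothing drains the child process's stderr, the child can block forever once the pipe buffer fills. The pattern can be sketched standalone with plain JDK classes (illustrative names, not Spark's RedirectThread or CircularBuffer; unlike the real code, this sketch ignores stdout):

```scala
import java.io.{ByteArrayOutputStream, InputStream}

// Standalone sketch of the stderr-draining pattern: a daemon thread keeps
// reading the child's stderr so the child never blocks on a full pipe,
// and the captured output is available for error reporting.
object StderrDrainSketch {
  def run(cmd: Seq[String]): (Int, String) = {
    val proc = new ProcessBuilder(cmd: _*).start()
    val errBuf = new ByteArrayOutputStream()
    val drainer = new Thread(() => {
      val err: InputStream = proc.getErrorStream
      val buf = new Array[Byte](1024)
      var n = err.read(buf)
      while (n != -1) {
        errBuf.write(buf, 0, n)
        n = err.read(buf)
      }
    }, "stderr-drainer")
    drainer.setDaemon(true)
    drainer.start()
    val exitCode = proc.waitFor()
    drainer.join(1000) // let the drainer finish flushing captured stderr
    (exitCode, errBuf.toString("UTF-8"))
  }
}
```

Spark additionally bounds the retained stderr with a 2048-byte circular buffer, so a chatty script cannot exhaust memory.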

private class ScriptTransformationWriterThread(
    iter: Iterator[InternalRow],
    inputSchema: Seq[DataType],
    ioSchema: SparkScriptIOSchema,
    outputStream: OutputStream,
    proc: Process,
    stderrBuffer: CircularBuffer,
    taskContext: TaskContext,
    conf: Configuration)
  extends BaseScriptTransformationWriterThread(
    iter,
    inputSchema,
    ioSchema,
    outputStream,
    proc,
    stderrBuffer,
    taskContext,
    conf) {

  setDaemon(true)

  override def processRows(): Unit = {
    processRowsWithoutSerde()
  }
}

[Review comment (Member) on setDaemon(true)]
we can remove this?

[AngersZhuuuu (Contributor, Author)]
Done
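The writer-thread split used above (one thread feeding the child's stdin while the current thread reads its stdout) can be illustrated with a self-contained round trip through `cat`. This is a sketch with made-up names, not Spark code:

```scala
import java.io.{BufferedReader, InputStreamReader, PrintWriter}
import java.nio.charset.StandardCharsets

// Illustrative round trip: a writer thread feeds tab-delimited rows to the
// child's stdin while the calling thread reads transformed rows back from
// its stdout, mirroring the writer-thread / reader split in the PR.
object TransformRoundTrip {
  def run(cmd: Seq[String], rows: Seq[Seq[String]]): Seq[String] = {
    val proc = new ProcessBuilder(cmd: _*).start()
    val writer = new Thread(() => {
      val out = new PrintWriter(proc.getOutputStream)
      rows.foreach(r => out.println(r.mkString("\t")))
      out.close() // closing stdin signals EOF to the child
    }, "input-writer")
    writer.setDaemon(true)
    writer.start()
    val reader = new BufferedReader(
      new InputStreamReader(proc.getInputStream, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null).toList
  }
}
```

Writing from a separate thread matters: if both writing and reading happened on one thread, a child that fills its output pipe before consuming all input would deadlock with its parent.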

object SparkScriptIOSchema {
  def apply(input: ScriptInputOutputSchema): SparkScriptIOSchema = {
    SparkScriptIOSchema(
      input.inputRowFormat,
      input.outputRowFormat,
      input.inputSerdeClass,
      input.outputSerdeClass,
      input.inputSerdeProps,
      input.outputSerdeProps,
      input.recordReaderClass,
      input.recordWriterClass,
      input.schemaLess)
  }
}

/**
 * The wrapper class of Spark script transformation input and output schema properties
 */
case class SparkScriptIOSchema (
    inputRowFormat: Seq[(String, String)],
    outputRowFormat: Seq[(String, String)],
    inputSerdeClass: Option[String],
    outputSerdeClass: Option[String],
    inputSerdeProps: Seq[(String, String)],
    outputSerdeProps: Seq[(String, String)],
    recordReaderClass: Option[String],
    recordWriterClass: Option[String],
    schemaLess: Boolean) extends BaseScriptTransformIOSchema

[Review comment (Contributor) on SparkScriptIOSchema]
Why is this class so big while it doesn't support hive serde?

[AngersZhuuuu (Contributor, Author)]
For this, I think we should change it after we decide whether SerDe support for script transformation needs to be implemented in sql/core.

[alfozan, Jul 14, 2020]
The implementation here offers very limited support for the ROW FORMAT DELIMITED format: it does not rely on a Hive SerDe class. A complete implementation (SerDe classes for ROW FORMAT DELIMITED) can be added later and will live in the same folder. #29085 (comment)
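The schemaLess flag carried by this class drives the two split calls in the output iterator's next(): with a declared schema every delimited field becomes a column, while schema-less mode splits into at most two columns, the first field and the untouched rest of the line. A toy sketch of that difference (the object and function names are illustrative, not Spark's API):

```scala
// Sketch of schema-less vs. schema-full output parsing. Note that
// String.split treats the delimiter as a regex, as the real code does;
// the limit argument 2 caps the number of resulting columns.
object RowSplitSketch {
  def parseLine(line: String, delim: String, schemaLess: Boolean): Array[String] =
    if (schemaLess) line.split(delim, 2) else line.split(delim)
}
```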
In abstract class SparkStrategies, add the new strategy:

@@ -532,6 +532,21 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
      }
    }

  object SparkScripts extends Strategy {
    def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case logical.ScriptTransformation(input, script, output, child, ioschema)
          if ioschema.inputSerdeClass.isEmpty && ioschema.outputSerdeClass.isEmpty =>
        SparkScriptTransformationExec(
          input,
          script,
          output,
          planLater(child),
          SparkScriptIOSchema(ioschema)
        ) :: Nil
      case _ => Nil
    }
  }

[Review exchange on the serde-class guard]
We need to check this here? Seems like it has been checked in https://github.com/apache/spark/pull/29085/files#diff-9847f5cef7cf7fbc5830fbc6b779ee10R783-R784 ?

Yea, don't need now
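SparkScripts follows the planner Strategy contract: pattern-match the logical node, emit a physical node when the strategy applies, and return Nil otherwise so the planner can try the next strategy in the list. A toy model of that contract (every type here is a stand-in, not Spark's):

```scala
// Toy planner strategy: logical plans in, zero-or-one physical plans out.
sealed trait ToyLogicalPlan
case class ToyScriptNode(script: String, serdeClass: Option[String]) extends ToyLogicalPlan
case object ToyOtherNode extends ToyLogicalPlan

sealed trait ToyPhysicalPlan
case class ToyScriptExec(script: String) extends ToyPhysicalPlan

object ToyScriptsStrategy {
  def apply(plan: ToyLogicalPlan): Seq[ToyPhysicalPlan] = plan match {
    // Only handle script nodes that do not ask for a SerDe class,
    // like the inputSerdeClass/outputSerdeClass guard above.
    case ToyScriptNode(script, None) => ToyScriptExec(script) :: Nil
    case _ => Nil
  }
}
```

Returning Nil for unmatched nodes is what lets the strategies compose as a list (`Window :: JoinSelection :: ... :: SparkScripts :: BasicOperators :: Nil` in SparkPlanner).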

  /**
   * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`.
   * It won't affect the execution, because `StreamingRelation` will be replaced with
In HiveScriptIOSchema, align the docstring with the new Spark variant:

@@ -275,7 +275,7 @@ object HiveScriptIOSchema {
 }

 /**
- * The wrapper class of Hive input and output schema properties
+ * The wrapper class of Hive script transformation input and output schema properties
  */
 case class HiveScriptIOSchema (
     inputRowFormat: Seq[(String, String)],