[SPARK-32105][SQL] Refactor current ScriptTransformationExec code
### What changes were proposed in this pull request?

* Renamed the Hive transform script class `hive/execution/ScriptTransformationExec` to `hive/execution/HiveScriptTransformationExec` (the file itself is not renamed).
* Extracted `BaseScriptTransformationExec`, which holds the common code shared by `SparkScriptTransformationExec` (to be added in the next PR) and `HiveScriptTransformationExec`.
* Extracted `BaseScriptTransformationWriterThread`, the base class for the thread that writes input data, shared by `SparkScriptTransformationWriterThread` (to be added next, to support transform in sql/core) and `HiveScriptTransformationWriterThread`.
* `HiveScriptTransformationWriterThread` additionally supports the Hive SerDe format.
* Renamed the current `Scripts` strategy in the hive module to `HiveScripts`; the next PR will add a `SparkScripts` strategy to support transform in sql/core. A compact sketch of the resulting split follows this list.
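
A compact sketch of the intended split, using assumed shapes rather than the real Spark signatures (the `Sketch` names are placeholders; `SparkScriptTransformationExec` only lands in the follow-up PR):

```scala
// Sketch only: simplified stand-ins for the real operators. The shared
// process-handling contract lives in the base trait; Hive keeps its
// SerDe-specific plumbing in the subclass.
trait BaseScriptTransformationExecSketch {
  // Shared contract: feed rows to the script process, read rows back.
  def processIterator(input: Iterator[String]): Iterator[String]
}

final class HiveScriptTransformationExecSketch extends BaseScriptTransformationExecSketch {
  override def processIterator(input: Iterator[String]): Iterator[String] =
    input // the real operator adds optional Hive SerDe (de)serialization here
}
```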

Todo list:

- Support transform in sql/core based on `BaseScriptTransformationExec`, which would run the script operator in SQL mode (without Hive). The script's output would be read as strings, with column values extracted using a delimiter (default: the tab character); see the row-format sketch after this list.
- For Hive, a SerDe must be used by default, while without Hive we can run without a SerDe.
- Clean up past hacks that have been observed (and that people suggest/report), such as:
       - [Solve string value error about Date/Timestamp in ScriptTransform](https://issues.apache.org/jira/browse/SPARK-31947)
       - [support use transform with aggregation](https://issues.apache.org/jira/browse/SPARK-28227)
       - [support array/map as transform's input](https://issues.apache.org/jira/browse/SPARK-22435)
- Use a code-gen projection to serialize rows to the output stream.
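
A rough illustration of the no-serde row format described above (a REPL-style sketch, assuming the default delimiters: tab between fields, newline between rows, mirroring `TOK_TABLEROWFORMATFIELD` and `TOK_TABLEROWFORMATLINES` in the new `BaseScriptTransformIOSchema`):

```scala
val fieldDelim = "\t"
val lineDelim = "\n"

// Serialize one input row for the script's stdin.
def writeRow(fields: Seq[Any]): String = fields.mkString(fieldDelim) + lineDelim

// Parse one line of script output back into column values.
def readRow(line: String): Seq[String] = line.split(fieldDelim, -1).toSeq

assert(writeRow(Seq(1, "a", true)) == "1\ta\ttrue\n")
assert(readRow("1\ta\ttrue") == Seq("1", "a", "true"))
```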

### Why are the changes needed?
Support running transform in SQL mode without Hive.
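
For example, the end state targeted by the follow-up work is that a query like the one below runs without enabling Hive support (illustrative only: `src` is a hypothetical table, and `cat` simply echoes its input):

```scala
// TRANSFORM pipes each input row through an external command and reads the
// command's stdout back as rows. With no SerDe, fields are tab-separated.
spark.sql(
  """
    |SELECT TRANSFORM(key, value)
    |  USING 'cat'
    |  AS (key, value)
    |FROM src
  """.stripMargin).show()
```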

### Does this PR introduce any user-facing change?
Yes

### How was this patch tested?
Added UT

Closes #27983 from AngersZhuuuu/follow_spark_15694.

Authored-by: angerszhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
AngersZhuuuu authored and cloud-fan committed Jul 13, 2020
1 parent b6229df commit 6d49964
Showing 5 changed files with 380 additions and 280 deletions.
@@ -0,0 +1,202 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import java.io.OutputStream
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit

import scala.util.control.NonFatal

import org.apache.hadoop.conf.Configuration

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.{CircularBuffer, SerializableConfiguration, Utils}

trait BaseScriptTransformationExec extends UnaryExecNode {

  override def producedAttributes: AttributeSet = outputSet -- inputSet

  override def outputPartitioning: Partitioning = child.outputPartitioning

  override def doExecute(): RDD[InternalRow] = {
    val broadcastedHadoopConf =
      new SerializableConfiguration(sqlContext.sessionState.newHadoopConf())

    child.execute().mapPartitions { iter =>
      if (iter.hasNext) {
        val proj = UnsafeProjection.create(schema)
        processIterator(iter, broadcastedHadoopConf.value).map(proj)
      } else {
        // If the input iterator has no rows then do not launch the external script.
        Iterator.empty
      }
    }
  }

  def processIterator(
      inputIterator: Iterator[InternalRow],
      hadoopConf: Configuration): Iterator[InternalRow]

  protected def checkFailureAndPropagate(
      writerThread: BaseScriptTransformationWriterThread,
      cause: Throwable = null,
      proc: Process,
      stderrBuffer: CircularBuffer): Unit = {
    if (writerThread.exception.isDefined) {
      throw writerThread.exception.get
    }

    // There can be a lag between the reader hitting EOF and the process terminating,
    // and if the script fails to start up, that kind of error may be missed.
    // So explicitly wait for the process to terminate.
    val timeout = conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT)
    val exitRes = proc.waitFor(timeout, TimeUnit.SECONDS)
    if (!exitRes) {
      log.warn(s"Transformation script process did not exit within $timeout seconds")
    }

    if (!proc.isAlive) {
      val exitCode = proc.exitValue()
      if (exitCode != 0) {
        logError(stderrBuffer.toString) // log the stderr circular buffer
        throw new SparkException(s"Subprocess exited with status $exitCode. " +
          s"Error: ${stderrBuffer.toString}", cause)
      }
    }
  }
}

abstract class BaseScriptTransformationWriterThread(
    iter: Iterator[InternalRow],
    inputSchema: Seq[DataType],
    ioSchema: BaseScriptTransformIOSchema,
    outputStream: OutputStream,
    proc: Process,
    stderrBuffer: CircularBuffer,
    taskContext: TaskContext,
    conf: Configuration) extends Thread with Logging {

  setDaemon(true)

  @volatile protected var _exception: Throwable = null

  /** Contains the exception thrown while writing the parent iterator to the external process. */
  def exception: Option[Throwable] = Option(_exception)

  protected def processRows(): Unit

  protected def processRowsWithoutSerde(): Unit = {
    val len = inputSchema.length
    iter.foreach { row =>
      val data = if (len == 0) {
        ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")
      } else {
        val sb = new StringBuilder
        sb.append(row.get(0, inputSchema(0)))
        var i = 1
        while (i < len) {
          sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
          sb.append(row.get(i, inputSchema(i)))
          i += 1
        }
        sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
        sb.toString()
      }
      outputStream.write(data.getBytes(StandardCharsets.UTF_8))
    }
  }

  override def run(): Unit = Utils.logUncaughtExceptions {
    TaskContext.setTaskContext(taskContext)

    // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so
    // let's use a variable to record whether the `finally` block was hit due to an exception.
    var threwException: Boolean = true
    try {
      processRows()
      threwException = false
    } catch {
      // SPARK-25158: the exception should not be rethrown, otherwise it will be captured by
      // SparkUncaughtExceptionHandler and the Executor will exit because of this uncaught
      // exception, so passing the exception to `ScriptTransformationExec` is enough.
      case t: Throwable =>
        // An error occurred while writing input, so kill the child process. According to the
        // Javadoc this call will not throw an exception:
        _exception = t
        proc.destroy()
        logError("Thread-ScriptTransformation-Feed exited because of: ", t)
    } finally {
      try {
        Utils.tryLogNonFatalError(outputStream.close())
        if (proc.waitFor() != 0) {
          logError(stderrBuffer.toString) // log the stderr circular buffer
        }
      } catch {
        case NonFatal(exceptionFromFinallyBlock) =>
          if (!threwException) {
            throw exceptionFromFinallyBlock
          } else {
            log.error("Exception in finally block", exceptionFromFinallyBlock)
          }
      }
    }
  }
}

/**
 * The wrapper class of input and output schema properties.
 */
abstract class BaseScriptTransformIOSchema extends Serializable {
  import ScriptIOSchema._

  def inputRowFormat: Seq[(String, String)]

  def outputRowFormat: Seq[(String, String)]

  def inputSerdeClass: Option[String]

  def outputSerdeClass: Option[String]

  def inputSerdeProps: Seq[(String, String)]

  def outputSerdeProps: Seq[(String, String)]

  def recordReaderClass: Option[String]

  def recordWriterClass: Option[String]

  def schemaLess: Boolean

  val inputRowFormatMap = inputRowFormat.toMap.withDefault((k) => defaultFormat(k))
  val outputRowFormatMap = outputRowFormat.toMap.withDefault((k) => defaultFormat(k))
}

object ScriptIOSchema {
  val defaultFormat = Map(
    ("TOK_TABLEROWFORMATFIELD", "\t"),
    ("TOK_TABLEROWFORMATLINES", "\n")
  )
}
@@ -111,7 +111,8 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session
       override val sparkSession: SparkSession = session
 
       override def extraPlanningStrategies: Seq[Strategy] =
-        super.extraPlanningStrategies ++ customPlanningStrategies ++ Seq(HiveTableScans, Scripts)
+        super.extraPlanningStrategies ++ customPlanningStrategies ++
+          Seq(HiveTableScans, HiveScripts)
     }
   }

@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
+import org.apache.spark.sql.hive.execution.{HiveScriptIOSchema, HiveScriptTransformationExec}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 
 
@@ -240,11 +241,11 @@ private[hive] trait HiveStrategies {
 
   val sparkSession: SparkSession
 
-  object Scripts extends Strategy {
+  object HiveScripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case ScriptTransformation(input, script, output, child, ioschema) =>
         val hiveIoSchema = HiveScriptIOSchema(ioschema)
-        ScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
+        HiveScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
       case _ => Nil
     }
   }