diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala new file mode 100644 index 0000000000000..22bf6df58b040 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import java.io.OutputStream +import java.nio.charset.StandardCharsets +import java.util.concurrent.TimeUnit + +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration + +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{AttributeSet, UnsafeProjection} +import org.apache.spark.sql.catalyst.plans.physical.Partitioning +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.DataType +import org.apache.spark.util.{CircularBuffer, SerializableConfiguration, Utils} + +trait BaseScriptTransformationExec extends UnaryExecNode { + + override def producedAttributes: AttributeSet = outputSet -- inputSet + + override def outputPartitioning: Partitioning = child.outputPartitioning + + override def doExecute(): RDD[InternalRow] = { + val broadcastedHadoopConf = + new SerializableConfiguration(sqlContext.sessionState.newHadoopConf()) + + child.execute().mapPartitions { iter => + if (iter.hasNext) { + val proj = UnsafeProjection.create(schema) + processIterator(iter, broadcastedHadoopConf.value).map(proj) + } else { + // If the input iterator has no rows then do not launch the external script. + Iterator.empty + } + } + } + + def processIterator( + inputIterator: Iterator[InternalRow], + hadoopConf: Configuration): Iterator[InternalRow] + + protected def checkFailureAndPropagate( + writerThread: BaseScriptTransformationWriterThread, + cause: Throwable = null, + proc: Process, + stderrBuffer: CircularBuffer): Unit = { + if (writerThread.exception.isDefined) { + throw writerThread.exception.get + } + + // There can be a lag between reader read EOF and the process termination. + // If the script fails to startup, this kind of error may be missed. + // So explicitly waiting for the process termination. 
+ val timeout = conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT) + val exitRes = proc.waitFor(timeout, TimeUnit.SECONDS) + if (!exitRes) { + log.warn(s"Transformation script process exits timeout in $timeout seconds") + } + + if (!proc.isAlive) { + val exitCode = proc.exitValue() + if (exitCode != 0) { + logError(stderrBuffer.toString) // log the stderr circular buffer + throw new SparkException(s"Subprocess exited with status $exitCode. " + + s"Error: ${stderrBuffer.toString}", cause) + } + } + } +} + +abstract class BaseScriptTransformationWriterThread( + iter: Iterator[InternalRow], + inputSchema: Seq[DataType], + ioSchema: BaseScriptTransformIOSchema, + outputStream: OutputStream, + proc: Process, + stderrBuffer: CircularBuffer, + taskContext: TaskContext, + conf: Configuration) extends Thread with Logging { + + setDaemon(true) + + @volatile protected var _exception: Throwable = null + + /** Contains the exception thrown while writing the parent iterator to the external process. */ + def exception: Option[Throwable] = Option(_exception) + + protected def processRows(): Unit + + protected def processRowsWithoutSerde(): Unit = { + val len = inputSchema.length + iter.foreach { row => + val data = if (len == 0) { + ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES") + } else { + val sb = new StringBuilder + sb.append(row.get(0, inputSchema(0))) + var i = 1 + while (i < len) { + sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD")) + sb.append(row.get(i, inputSchema(i))) + i += 1 + } + sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")) + sb.toString() + } + outputStream.write(data.getBytes(StandardCharsets.UTF_8)) + } + } + + override def run(): Unit = Utils.logUncaughtExceptions { + TaskContext.setTaskContext(taskContext) + + // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so + // let's use a variable to record whether the `finally` block was hit due to an exception + var threwException: Boolean = true + try { + processRows() + threwException = false + } catch { + // SPARK-25158 Exception should not be thrown again, otherwise it will be captured by + // SparkUncaughtExceptionHandler, then Executor will exit because of this Uncaught Exception, + // so pass the exception to `ScriptTransformationExec` is enough. + case t: Throwable => + // An error occurred while writing input, so kill the child process. 
According to the + // Javadoc this call will not throw an exception: + _exception = t + proc.destroy() + logError("Thread-ScriptTransformation-Feed exit cause by: ", t) + } finally { + try { + Utils.tryLogNonFatalError(outputStream.close()) + if (proc.waitFor() != 0) { + logError(stderrBuffer.toString) // log the stderr circular buffer + } + } catch { + case NonFatal(exceptionFromFinallyBlock) => + if (!threwException) { + throw exceptionFromFinallyBlock + } else { + log.error("Exception in finally block", exceptionFromFinallyBlock) + } + } + } + } +} + +/** + * The wrapper class of input and output schema properties + */ +abstract class BaseScriptTransformIOSchema extends Serializable { + import ScriptIOSchema._ + + def inputRowFormat: Seq[(String, String)] + + def outputRowFormat: Seq[(String, String)] + + def inputSerdeClass: Option[String] + + def outputSerdeClass: Option[String] + + def inputSerdeProps: Seq[(String, String)] + + def outputSerdeProps: Seq[(String, String)] + + def recordReaderClass: Option[String] + + def recordWriterClass: Option[String] + + def schemaLess: Boolean + + val inputRowFormatMap = inputRowFormat.toMap.withDefault((k) => defaultFormat(k)) + val outputRowFormatMap = outputRowFormat.toMap.withDefault((k) => defaultFormat(k)) +} + +object ScriptIOSchema { + val defaultFormat = Map( + ("TOK_TABLEROWFORMATFIELD", "\t"), + ("TOK_TABLEROWFORMATLINES", "\n") + ) +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala index e25610757a69b..78ec2b8e2047e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionStateBuilder.scala @@ -111,7 +111,8 @@ class HiveSessionStateBuilder(session: SparkSession, parentState: Option[Session override val sparkSession: SparkSession = session override def extraPlanningStrategies: Seq[Strategy] = - super.extraPlanningStrategies ++ customPlanningStrategies ++ Seq(HiveTableScans, Scripts) + super.extraPlanningStrategies ++ customPlanningStrategies ++ + Seq(HiveTableScans, HiveScripts) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 2b1eb05e22cc7..dae68df08f32e 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils} import org.apache.spark.sql.execution.datasources.CreateTable import org.apache.spark.sql.hive.execution._ +import org.apache.spark.sql.hive.execution.{HiveScriptIOSchema, HiveScriptTransformationExec} import org.apache.spark.sql.internal.{HiveSerDe, SQLConf} @@ -240,11 +241,11 @@ private[hive] trait HiveStrategies { val sparkSession: SparkSession - object Scripts extends Strategy { + object HiveScripts extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case ScriptTransformation(input, script, output, child, ioschema) => val hiveIoSchema = HiveScriptIOSchema(ioschema) - ScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil + HiveScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil case _ => Nil } } diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala index c7183fd7385a6..96fe646d39fde 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.hive.execution import java.io._ import java.nio.charset.StandardCharsets import java.util.Properties -import java.util.concurrent.TimeUnit import javax.annotation.Nullable import scala.collection.JavaConverters._ @@ -33,19 +32,15 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.io.Writable -import org.apache.spark.{SparkException, TaskContext} -import org.apache.spark.internal.Logging -import org.apache.spark.rdd.RDD +import org.apache.spark.TaskContext import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.ScriptInputOutputSchema -import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution._ import org.apache.spark.sql.hive.HiveInspectors import org.apache.spark.sql.hive.HiveShim._ -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.DataType -import org.apache.spark.util.{CircularBuffer, RedirectThread, SerializableConfiguration, Utils} +import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils} /** * Transforms the input by forking and running the specified script. @@ -54,301 +49,211 @@ import org.apache.spark.util.{CircularBuffer, RedirectThread, SerializableConfig * @param script the command that should be executed. * @param output the attributes that are produced by the script. */ -case class ScriptTransformationExec( +case class HiveScriptTransformationExec( input: Seq[Expression], script: String, output: Seq[Attribute], child: SparkPlan, ioschema: HiveScriptIOSchema) - extends UnaryExecNode { - - override def producedAttributes: AttributeSet = outputSet -- inputSet - - override def outputPartitioning: Partitioning = child.outputPartitioning - - protected override def doExecute(): RDD[InternalRow] = { - def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration) - : Iterator[InternalRow] = { - val cmd = List("/bin/bash", "-c", script) - val builder = new ProcessBuilder(cmd.asJava) - - val proc = builder.start() - val inputStream = proc.getInputStream - val outputStream = proc.getOutputStream - val errorStream = proc.getErrorStream - - // In order to avoid deadlocks, we need to consume the error output of the child process. - // To avoid issues caused by large error output, we use a circular buffer to limit the amount - // of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang - // that motivates this. 
- val stderrBuffer = new CircularBuffer(2048) - new RedirectThread( - errorStream, - stderrBuffer, - "Thread-ScriptTransformation-STDERR-Consumer").start() - - val outputProjection = new InterpretedProjection(input, child.output) - - // This nullability is a performance optimization in order to avoid an Option.foreach() call - // inside of a loop - @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) - - // This new thread will consume the ScriptTransformation's input rows and write them to the - // external process. That process's output will be read by this current thread. - val writerThread = new ScriptTransformationWriterThread( - inputIterator.map(outputProjection), - input.map(_.dataType), - inputSerde, - inputSoi, - ioschema, - outputStream, - proc, - stderrBuffer, - TaskContext.get(), - hadoopConf - ) - - // This nullability is a performance optimization in order to avoid an Option.foreach() call - // inside of a loop - @Nullable val (outputSerde, outputSoi) = { - ioschema.initOutputSerDe(output).getOrElse((null, null)) - } - - val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)) - val outputIterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors { - var curLine: String = null - val scriptOutputStream = new DataInputStream(inputStream) - - @Nullable val scriptOutputReader = - ioschema.recordReader(scriptOutputStream, hadoopConf).orNull - - var scriptOutputWritable: Writable = null - val reusedWritableObject: Writable = if (null != outputSerde) { - outputSerde.getSerializedClass().getConstructor().newInstance() - } else { - null - } - val mutableRow = new SpecificInternalRow(output.map(_.dataType)) + extends BaseScriptTransformationExec { + + override def processIterator( + inputIterator: Iterator[InternalRow], + hadoopConf: Configuration): Iterator[InternalRow] = { + val cmd = List("/bin/bash", "-c", script) + val builder = new ProcessBuilder(cmd.asJava) + + val proc = builder.start() + val inputStream = proc.getInputStream + val outputStream = proc.getOutputStream + val errorStream = proc.getErrorStream + + // In order to avoid deadlocks, we need to consume the error output of the child process. + // To avoid issues caused by large error output, we use a circular buffer to limit the amount + // of error output that we retain. See SPARK-7862 for more discussion of the deadlock / hang + // that motivates this. + val stderrBuffer = new CircularBuffer(2048) + new RedirectThread( + errorStream, + stderrBuffer, + "Thread-ScriptTransformation-STDERR-Consumer").start() + + val outputProjection = new InterpretedProjection(input, child.output) + + // This nullability is a performance optimization in order to avoid an Option.foreach() call + // inside of a loop + @Nullable val (inputSerde, inputSoi) = ioschema.initInputSerDe(input).getOrElse((null, null)) + + // This new thread will consume the ScriptTransformation's input rows and write them to the + // external process. That process's output will be read by this current thread. 
+ val writerThread = new HiveScriptTransformationWriterThread( + inputIterator.map(outputProjection), + input.map(_.dataType), + inputSerde, + inputSoi, + ioschema, + outputStream, + proc, + stderrBuffer, + TaskContext.get(), + hadoopConf + ) + + // This nullability is a performance optimization in order to avoid an Option.foreach() call + // inside of a loop + @Nullable val (outputSerde, outputSoi) = { + ioschema.initOutputSerDe(output).getOrElse((null, null)) + } - @transient - lazy val unwrappers = outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor) + val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8)) + val outputIterator: Iterator[InternalRow] = new Iterator[InternalRow] with HiveInspectors { + var curLine: String = null + val scriptOutputStream = new DataInputStream(inputStream) - private def checkFailureAndPropagate(cause: Throwable = null): Unit = { - if (writerThread.exception.isDefined) { - throw writerThread.exception.get - } + @Nullable val scriptOutputReader = + ioschema.recordReader(scriptOutputStream, hadoopConf).orNull - // There can be a lag between reader read EOF and the process termination. - // If the script fails to startup, this kind of error may be missed. - // So explicitly waiting for the process termination. - val timeout = conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT) - val exitRes = proc.waitFor(timeout, TimeUnit.SECONDS) - if (!exitRes) { - log.warn(s"Transformation script process exits timeout in $timeout seconds") - } + var scriptOutputWritable: Writable = null + val reusedWritableObject: Writable = if (null != outputSerde) { + outputSerde.getSerializedClass().getConstructor().newInstance() + } else { + null + } + val mutableRow = new SpecificInternalRow(output.map(_.dataType)) - if (!proc.isAlive) { - val exitCode = proc.exitValue() - if (exitCode != 0) { - logError(stderrBuffer.toString) // log the stderr circular buffer - throw new SparkException(s"Subprocess exited with status $exitCode. " + - s"Error: ${stderrBuffer.toString}", cause) - } - } - } + @transient + lazy val unwrappers = outputSoi.getAllStructFieldRefs.asScala.map(unwrapperFor) - override def hasNext: Boolean = { - try { - if (outputSerde == null) { + override def hasNext: Boolean = { + try { + if (outputSerde == null) { + if (curLine == null) { + curLine = reader.readLine() if (curLine == null) { - curLine = reader.readLine() - if (curLine == null) { - checkFailureAndPropagate() - return false - } + checkFailureAndPropagate(writerThread, null, proc, stderrBuffer) + return false } - } else if (scriptOutputWritable == null) { - scriptOutputWritable = reusedWritableObject + } + } else if (scriptOutputWritable == null) { + scriptOutputWritable = reusedWritableObject - if (scriptOutputReader != null) { - if (scriptOutputReader.next(scriptOutputWritable) <= 0) { - checkFailureAndPropagate() + if (scriptOutputReader != null) { + if (scriptOutputReader.next(scriptOutputWritable) <= 0) { + checkFailureAndPropagate(writerThread, null, proc, stderrBuffer) + return false + } + } else { + try { + scriptOutputWritable.readFields(scriptOutputStream) + } catch { + case _: EOFException => + // This means that the stdout of `proc` (ie. TRANSFORM process) has exhausted. + // Ideally the proc should *not* be alive at this point but + // there can be a lag between EOF being written out and the process + // being terminated. So explicitly waiting for the process to be done. 
+ checkFailureAndPropagate(writerThread, null, proc, stderrBuffer) return false - } - } else { - try { - scriptOutputWritable.readFields(scriptOutputStream) - } catch { - case _: EOFException => - // This means that the stdout of `proc` (ie. TRANSFORM process) has exhausted. - // Ideally the proc should *not* be alive at this point but - // there can be a lag between EOF being written out and the process - // being terminated. So explicitly waiting for the process to be done. - checkFailureAndPropagate() - return false - } } } + } - true - } catch { - case NonFatal(e) => - // If this exception is due to abrupt / unclean termination of `proc`, - // then detect it and propagate a better exception message for end users - checkFailureAndPropagate(e) + true + } catch { + case NonFatal(e) => + // If this exception is due to abrupt / unclean termination of `proc`, + // then detect it and propagate a better exception message for end users + checkFailureAndPropagate(writerThread, e, proc, stderrBuffer) - throw e - } + throw e } + } - override def next(): InternalRow = { - if (!hasNext) { - throw new NoSuchElementException + override def next(): InternalRow = { + if (!hasNext) { + throw new NoSuchElementException + } + if (outputSerde == null) { + val prevLine = curLine + curLine = reader.readLine() + if (!ioschema.schemaLess) { + new GenericInternalRow( + prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")) + .map(CatalystTypeConverters.convertToCatalyst)) + } else { + new GenericInternalRow( + prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2) + .map(CatalystTypeConverters.convertToCatalyst)) } - if (outputSerde == null) { - val prevLine = curLine - curLine = reader.readLine() - if (!ioschema.schemaLess) { - new GenericInternalRow( - prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")) - .map(CatalystTypeConverters.convertToCatalyst)) + } else { + val raw = outputSerde.deserialize(scriptOutputWritable) + scriptOutputWritable = null + val dataList = outputSoi.getStructFieldsDataAsList(raw) + var i = 0 + while (i < dataList.size()) { + if (dataList.get(i) == null) { + mutableRow.setNullAt(i) } else { - new GenericInternalRow( - prevLine.split(ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD"), 2) - .map(CatalystTypeConverters.convertToCatalyst)) - } - } else { - val raw = outputSerde.deserialize(scriptOutputWritable) - scriptOutputWritable = null - val dataList = outputSoi.getStructFieldsDataAsList(raw) - var i = 0 - while (i < dataList.size()) { - if (dataList.get(i) == null) { - mutableRow.setNullAt(i) - } else { - unwrappers(i)(dataList.get(i), mutableRow, i) - } - i += 1 + unwrappers(i)(dataList.get(i), mutableRow, i) } - mutableRow + i += 1 } + mutableRow } } - - writerThread.start() - - outputIterator } - val broadcastedHadoopConf = - new SerializableConfiguration(sqlContext.sessionState.newHadoopConf()) + writerThread.start() - child.execute().mapPartitions { iter => - if (iter.hasNext) { - val proj = UnsafeProjection.create(schema) - processIterator(iter, broadcastedHadoopConf.value).map(proj) - } else { - // If the input iterator has no rows then do not launch the external script. 
- Iterator.empty - } - } + outputIterator } } -private class ScriptTransformationWriterThread( +private class HiveScriptTransformationWriterThread( iter: Iterator[InternalRow], inputSchema: Seq[DataType], @Nullable inputSerde: AbstractSerDe, @Nullable inputSoi: StructObjectInspector, - ioschema: HiveScriptIOSchema, + ioSchema: HiveScriptIOSchema, outputStream: OutputStream, proc: Process, stderrBuffer: CircularBuffer, taskContext: TaskContext, - conf: Configuration - ) extends Thread("Thread-ScriptTransformation-Feed") with HiveInspectors with Logging { - - setDaemon(true) - - @volatile private var _exception: Throwable = null - - /** Contains the exception thrown while writing the parent iterator to the external process. */ - def exception: Option[Throwable] = Option(_exception) - - override def run(): Unit = Utils.logUncaughtExceptions { - TaskContext.setTaskContext(taskContext) - + conf: Configuration) + extends BaseScriptTransformationWriterThread( + iter, + inputSchema, + ioSchema, + outputStream, + proc, + stderrBuffer, + taskContext, + conf) with HiveInspectors { + + override def processRows(): Unit = { val dataOutputStream = new DataOutputStream(outputStream) - @Nullable val scriptInputWriter = ioschema.recordWriter(dataOutputStream, conf).orNull - - // We can't use Utils.tryWithSafeFinally here because we also need a `catch` block, so - // let's use a variable to record whether the `finally` block was hit due to an exception - var threwException: Boolean = true - val len = inputSchema.length - try { - if (inputSerde == null) { - iter.foreach { row => - val data = if (len == 0) { - ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES") - } else { - val sb = new StringBuilder - sb.append(row.get(0, inputSchema(0))) - var i = 1 - while (i < len) { - sb.append(ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD")) - sb.append(row.get(i, inputSchema(i))) - i += 1 - } - sb.append(ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")) - sb.toString() - } - outputStream.write(data.getBytes(StandardCharsets.UTF_8)) + @Nullable val scriptInputWriter = ioSchema.recordWriter(dataOutputStream, conf).orNull + + if (inputSerde == null) { + processRowsWithoutSerde() + } else { + // Convert Spark InternalRows to hive data via `HiveInspectors.wrapperFor`. + val hiveData = new Array[Any](inputSchema.length) + val fieldOIs = inputSoi.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray + val wrappers = fieldOIs.zip(inputSchema).map { case (f, dt) => wrapperFor(f, dt) } + + iter.foreach { row => + var i = 0 + while (i < fieldOIs.length) { + hiveData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, inputSchema(i))) + i += 1 } - } else { - // Convert Spark InternalRows to hive data via `HiveInspectors.wrapperFor`. 
- val hiveData = new Array[Any](inputSchema.length) - val fieldOIs = inputSoi.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray - val wrappers = fieldOIs.zip(inputSchema).map { case (f, dt) => wrapperFor(f, dt) } - iter.foreach { row => - var i = 0 - while (i < fieldOIs.length) { - hiveData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, inputSchema(i))) - i += 1 - } - - val writable = inputSerde.serialize(hiveData, inputSoi) - if (scriptInputWriter != null) { - scriptInputWriter.write(writable) - } else { - prepareWritable(writable, ioschema.outputSerdeProps).write(dataOutputStream) - } - } - } - threwException = false - } catch { - // SPARK-25158 Exception should not be thrown again, otherwise it will be captured by - // SparkUncaughtExceptionHandler, then Executor will exit because of this Uncaught Exception, - // so pass the exception to `ScriptTransformationExec` is enough. - case t: Throwable => - // An error occurred while writing input, so kill the child process. According to the - // Javadoc this call will not throw an exception: - _exception = t - proc.destroy() - logError("Thread-ScriptTransformation-Feed exit cause by: ", t) - } finally { - try { - Utils.tryLogNonFatalError(outputStream.close()) - if (proc.waitFor() != 0) { - logError(stderrBuffer.toString) // log the stderr circular buffer + val writable = inputSerde.serialize(hiveData, inputSoi) + if (scriptInputWriter != null) { + scriptInputWriter.write(writable) + } else { + prepareWritable(writable, ioSchema.outputSerdeProps).write(dataOutputStream) } - } catch { - case NonFatal(exceptionFromFinallyBlock) => - if (!threwException) { - throw exceptionFromFinallyBlock - } else { - log.error("Exception in finally block", exceptionFromFinallyBlock) - } } } } @@ -382,16 +287,7 @@ case class HiveScriptIOSchema ( recordReaderClass: Option[String], recordWriterClass: Option[String], schemaLess: Boolean) - extends HiveInspectors { - - private val defaultFormat = Map( - ("TOK_TABLEROWFORMATFIELD", "\t"), - ("TOK_TABLEROWFORMATLINES", "\n") - ) - - val inputRowFormatMap = inputRowFormat.toMap.withDefault((k) => defaultFormat(k)) - val outputRowFormatMap = outputRowFormat.toMap.withDefault((k) => defaultFormat(k)) - + extends BaseScriptTransformIOSchema with HiveInspectors { def initInputSerDe(input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = { inputSerdeClass.map { serdeClass => diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala similarity index 94% rename from sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala rename to sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala index b97eb869a9e54..35252fc47f49f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.StringType -class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with TestHiveSingleton +class HiveScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach { import spark.implicits._ @@ -83,7 +83,7 @@ class 
ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") checkAnswer( rowsDf, - (child: SparkPlan) => new ScriptTransformationExec( + (child: SparkPlan) => new HiveScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), @@ -100,7 +100,7 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") checkAnswer( rowsDf, - (child: SparkPlan) => new ScriptTransformationExec( + (child: SparkPlan) => new HiveScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), @@ -118,7 +118,7 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes val e = intercept[TestFailedException] { checkAnswer( rowsDf, - (child: SparkPlan) => new ScriptTransformationExec( + (child: SparkPlan) => new HiveScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), @@ -139,7 +139,7 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes val e = intercept[TestFailedException] { checkAnswer( rowsDf, - (child: SparkPlan) => new ScriptTransformationExec( + (child: SparkPlan) => new HiveScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "cat", output = Seq(AttributeReference("a", StringType)()), @@ -160,7 +160,7 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes val e = intercept[SparkException] { val plan = - new ScriptTransformationExec( + new HiveScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "some_non_existent_command", output = Seq(AttributeReference("a", StringType)()), @@ -181,7 +181,7 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes checkAnswer( rowsDf, - (child: SparkPlan) => new ScriptTransformationExec( + (child: SparkPlan) => new HiveScriptTransformationExec( input = Seq(rowsDf.col("name").expr), script = "cat", output = Seq(AttributeReference("name", StringType)()), @@ -234,7 +234,7 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") val e = intercept[SparkException] { val plan = - new ScriptTransformationExec( + new HiveScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "some_non_existent_command", output = Seq(AttributeReference("a", StringType)()), @@ -252,7 +252,7 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a") val e = intercept[SparkException] { val plan = - new ScriptTransformationExec( + new HiveScriptTransformationExec( input = Seq(rowsDf.col("a").expr), script = "some_non_existent_command", output = Seq(AttributeReference("a", StringType)()),
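
For context on the exit handling pulled up into the new base trait: `checkFailureAndPropagate` bounds its wait on the child process with `SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT` and only reads the exit code once the process has actually terminated, because `Process.exitValue()` throws if the process is still alive. A minimal standalone illustration of that pattern outside Spark (the command and the 10-second timeout are arbitrary, not taken from the patch):

```scala
import java.util.concurrent.TimeUnit

object ExitCheckSketch {
  def main(args: Array[String]): Unit = {
    val proc = new ProcessBuilder("/bin/bash", "-c", "exit 7").start()

    // Bounded wait, as checkFailureAndPropagate does with
    // SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT (arbitrary 10s here).
    val finished = proc.waitFor(10, TimeUnit.SECONDS)
    if (!finished) {
      println("process is still running after the timeout")
    }

    // exitValue() throws IllegalThreadStateException on a live process,
    // hence the isAlive guard mirrored from the patch.
    if (!proc.isAlive && proc.exitValue() != 0) {
      println(s"subprocess exited with status ${proc.exitValue()}")
    }
  }
}
```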
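The other piece hoisted into the base writer thread, `processRowsWithoutSerde`, renders each input row as delimited text using the `TOK_TABLEROWFORMATFIELD` / `TOK_TABLEROWFORMATLINES` entries (tab and newline by default, per `ScriptIOSchema.defaultFormat`) before writing it to the script's stdin. A standalone sketch of the same formatting rule, using plain Scala values instead of `InternalRow` so it can be run in isolation; the row data is illustrative:

```scala
object RowFormatSketch {
  // Defaults mirroring ScriptIOSchema.defaultFormat in the patch.
  private val fieldDelim = "\t" // TOK_TABLEROWFORMATFIELD
  private val lineDelim = "\n"  // TOK_TABLEROWFORMATLINES

  /** Render one row the way processRowsWithoutSerde writes it to the script's stdin. */
  def format(row: Seq[Any]): String =
    if (row.isEmpty) lineDelim            // zero input columns: emit only the line delimiter
    else row.mkString(fieldDelim) + lineDelim

  def main(args: Array[String]): Unit = {
    // Prints "1\tfoo\n" -- exactly what `cat` would echo back for this row.
    print(format(Seq(1, "foo")))
  }
}
```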
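Finally, the renamed `HiveScripts` strategy still plans every logical `ScriptTransformation` node as a `HiveScriptTransformationExec`, so any `TRANSFORM` query in a Hive-enabled session exercises the refactored classes. A minimal sketch of such a query, assuming a build with Hive support on the classpath; the view name and script are illustrative, not part of this patch:

```scala
import org.apache.spark.sql.SparkSession

object TransformExample {
  def main(args: Array[String]): Unit = {
    // Hive support is required because the TRANSFORM clause is planned by the
    // HiveScripts strategy shown in the HiveStrategies.scala hunk above.
    val spark = SparkSession.builder()
      .appName("script-transform-example")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    Seq("a", "b", "c").toDF("col").createOrReplaceTempView("t")

    // Forks `/bin/cat` as the transformation script and reads its output back as rows.
    spark.sql("SELECT TRANSFORM(col) USING 'cat' AS (col) FROM t").show()

    spark.stop()
  }
}
```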