diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
index 69b5b493394be..3606c31ae314d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveScriptTransformationExec.scala
@@ -53,84 +53,8 @@ case class HiveScriptTransformationExec(
     output: Seq[Attribute],
     child: SparkPlan,
     ioschema: ScriptTransformationIOSchema)
-  extends BaseScriptTransformationExec with HiveInspectors {
-
-  private def initInputSerDe(
-      input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = {
-    ioschema.inputSerdeClass.map { serdeClass =>
-      val (columns, columnTypes) = parseAttrs(input)
-      val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.inputSerdeProps)
-      val fieldObjectInspectors = columnTypes.map(toInspector)
-      val objectInspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava)
-      (serde, objectInspector)
-    }
-  }
-
-  private def initOutputSerDe(
-      output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = {
-    ioschema.outputSerdeClass.map { serdeClass =>
-      val (columns, columnTypes) = parseAttrs(output)
-      val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.outputSerdeProps)
-      val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector]
-      (serde, structObjectInspector)
-    }
-  }
-
-  private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = {
-    val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}")
-    val columnTypes = attrs.map(_.dataType)
-    (columns, columnTypes)
-  }
-
-  private def initSerDe(
-      serdeClassName: String,
-      columns: Seq[String],
-      columnTypes: Seq[DataType],
-      serdeProps: Seq[(String, String)]): AbstractSerDe = {
-
-    val serde = Utils.classForName[AbstractSerDe](serdeClassName).getConstructor().
-      newInstance()
-
-    val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")
-
-    var propsMap = serdeProps.toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(","))
-    propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)
-
-    val properties = new Properties()
-    // Can not use properties.putAll(propsMap.asJava) in scala-2.12
-    // See https://github.com/scala/bug/issues/10418
-    propsMap.foreach { case (k, v) => properties.put(k, v) }
-    serde.initialize(null, properties)
-
-    serde
-  }
-
-  private def recordReader(
-      inputStream: InputStream,
-      conf: Configuration): Option[RecordReader] = {
-    ioschema.recordReaderClass.map { klass =>
-      val instance = Utils.classForName[RecordReader](klass).getConstructor().
-        newInstance()
-      val props = new Properties()
-      // Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
-      // See https://github.com/scala/bug/issues/10418
-      ioschema.outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
-      instance.initialize(inputStream, conf, props)
-      instance
-    }
-  }
-
-  private def recordWriter(
-      outputStream: OutputStream,
-      conf: Configuration): Option[RecordWriter] = {
-    ioschema.recordWriterClass.map { klass =>
-      val instance = Utils.classForName[RecordWriter](klass).getConstructor().
-        newInstance()
-      instance.initialize(outputStream, conf)
-      instance
-    }
-  }
+  extends BaseScriptTransformationExec {
+  import HiveScriptIOSchema._
 
   private def createOutputIteratorWithSerde(
       writerThread: BaseScriptTransformationWriterThread,
@@ -144,7 +68,8 @@ case class HiveScriptTransformationExec(
     var curLine: String = null
     val scriptOutputStream = new DataInputStream(inputStream)
 
-    @Nullable val scriptOutputReader = recordReader(scriptOutputStream, hadoopConf).orNull
+    @Nullable val scriptOutputReader =
+      recordReader(ioschema, scriptOutputStream, hadoopConf).orNull
 
     var scriptOutputWritable: Writable = null
     val reusedWritableObject = outputSerde.getSerializedClass.getConstructor().newInstance()
@@ -218,7 +143,7 @@ case class HiveScriptTransformationExec(
 
     // This nullability is a performance optimization in order to avoid an Option.foreach() call
    // inside of a loop
-    @Nullable val (inputSerde, inputSoi) = initInputSerDe(input).getOrElse((null, null))
+    @Nullable val (inputSerde, inputSoi) = initInputSerDe(ioschema, input).getOrElse((null, null))
 
     // For HiveScriptTransformationExec, if inputSerde == null, but outputSerde != null
     // We will use StringBuffer to pass data, in this case, we should cast data as string too.
@@ -249,7 +174,7 @@
     // This nullability is a performance optimization in order to avoid an Option.foreach() call
     // inside of a loop
     @Nullable val (outputSerde, outputSoi) = {
-      initOutputSerDe(output).getOrElse((null, null))
+      initOutputSerDe(ioschema, output).getOrElse((null, null))
     }
 
     val outputIterator = if (outputSerde == null) {
@@ -265,6 +190,90 @@ case class HiveScriptTransformationExec(
   }
 }
 
+object HiveScriptIOSchema extends HiveInspectors {
+
+  def initInputSerDe(
+      ioschema: ScriptTransformationIOSchema,
+      input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = {
+    ioschema.inputSerdeClass.map { serdeClass =>
+      val (columns, columnTypes) = parseAttrs(input)
+      val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.inputSerdeProps)
+      val fieldObjectInspectors = columnTypes.map(toInspector)
+      val objectInspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava)
+      (serde, objectInspector)
+    }
+  }
+
+  def initOutputSerDe(
+      ioschema: ScriptTransformationIOSchema,
+      output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = {
+    ioschema.outputSerdeClass.map { serdeClass =>
+      val (columns, columnTypes) = parseAttrs(output)
+      val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.outputSerdeProps)
+      val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector]
+      (serde, structObjectInspector)
+    }
+  }
+
+  private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = {
+    val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}")
+    val columnTypes = attrs.map(_.dataType)
+    (columns, columnTypes)
+  }
+
+  def initSerDe(
+      serdeClassName: String,
+      columns: Seq[String],
+      columnTypes: Seq[DataType],
+      serdeProps: Seq[(String, String)]): AbstractSerDe = {
+
+    val serde = Utils.classForName[AbstractSerDe](serdeClassName).getConstructor().
+      newInstance()
+
+    val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")
+
+    var propsMap = serdeProps.toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(","))
+    propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)
+
+    val properties = new Properties()
+    // Can not use properties.putAll(propsMap.asJava) in scala-2.12
+    // See https://github.com/scala/bug/issues/10418
+    propsMap.foreach { case (k, v) => properties.put(k, v) }
+    serde.initialize(null, properties)
+
+    serde
+  }
+
+  def recordReader(
+      ioschema: ScriptTransformationIOSchema,
+      inputStream: InputStream,
+      conf: Configuration): Option[RecordReader] = {
+    ioschema.recordReaderClass.map { klass =>
+      val instance = Utils.classForName[RecordReader](klass).getConstructor().
+        newInstance()
+      val props = new Properties()
+      // Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
+      // See https://github.com/scala/bug/issues/10418
+      ioschema.outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
+      instance.initialize(inputStream, conf, props)
+      instance
+    }
+  }
+
+  def recordWriter(
+      ioschema: ScriptTransformationIOSchema,
+      outputStream: OutputStream,
+      conf: Configuration): Option[RecordWriter] = {
+    ioschema.recordWriterClass.map { klass =>
+      val instance = Utils.classForName[RecordWriter](klass).getConstructor().
+        newInstance()
+      instance.initialize(outputStream, conf)
+      instance
+    }
+  }
+}
+
 case class HiveScriptTransformationWriterThread(
     iter: Iterator[InternalRow],
     inputSchema: Seq[DataType],
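The net effect of the patch: initInputSerDe, initOutputSerDe, initSerDe, recordReader, and recordWriter stop being private instance methods of HiveScriptTransformationExec (which previously mixed in HiveInspectors largely to support them) and become members of a standalone HiveScriptIOSchema object that receives the ScriptTransformationIOSchema as an explicit parameter. A minimal, self-contained Scala sketch of that pattern follows; IOSchema, SerDe, and HiveScriptIOSchemaLike are simplified stand-ins invented for illustration, not the real Spark or Hive classes:

object RefactorSketch {
  // Stand-in for ScriptTransformationIOSchema: only the one field this sketch needs.
  final case class IOSchema(inputSerdeClass: Option[String])

  // Stand-in for a configured Hive AbstractSerDe.
  final class SerDe(val className: String)

  // Before the patch, the helper was conceptually a private method on the exec
  // node, closing over its `ioschema` field. After the patch it is a stateless
  // member of a standalone object that takes the schema as an explicit
  // argument, so the exec node no longer needs the extra trait mix-in.
  object HiveScriptIOSchemaLike {
    def initInputSerDe(ioschema: IOSchema): Option[SerDe] =
      ioschema.inputSerdeClass.map(new SerDe(_))
  }

  def main(args: Array[String]): Unit = {
    val schema = IOSchema(Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
    // Call sites pass the schema explicitly, mirroring the patched
    // `initInputSerDe(ioschema, input)` call in the exec node.
    val serde = HiveScriptIOSchemaLike.initInputSerDe(schema)
    println(serde.map(_.className).getOrElse("no input serde configured"))
  }
}

Because the helpers are now stateless object members, they can be exercised directly (for example from a test suite) without constructing the physical plan node, and the exec node's declaration shrinks to extending BaseScriptTransformationExec alone.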