Commit 972775b

Fix

maropu committed Jul 22, 2020
1 parent 7916d72
Showing 1 changed file with 90 additions and 81 deletions.
@@ -53,84 +53,8 @@ case class HiveScriptTransformationExec(
     output: Seq[Attribute],
     child: SparkPlan,
     ioschema: ScriptTransformationIOSchema)
-  extends BaseScriptTransformationExec with HiveInspectors {
-
-  private def initInputSerDe(
-      input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = {
-    ioschema.inputSerdeClass.map { serdeClass =>
-      val (columns, columnTypes) = parseAttrs(input)
-      val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.inputSerdeProps)
-      val fieldObjectInspectors = columnTypes.map(toInspector)
-      val objectInspector = ObjectInspectorFactory
-        .getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava)
-      (serde, objectInspector)
-    }
-  }
-
-  private def initOutputSerDe(
-      output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = {
-    ioschema.outputSerdeClass.map { serdeClass =>
-      val (columns, columnTypes) = parseAttrs(output)
-      val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.outputSerdeProps)
-      val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector]
-      (serde, structObjectInspector)
-    }
-  }
-
-  private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = {
-    val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}")
-    val columnTypes = attrs.map(_.dataType)
-    (columns, columnTypes)
-  }
-
-  private def initSerDe(
-      serdeClassName: String,
-      columns: Seq[String],
-      columnTypes: Seq[DataType],
-      serdeProps: Seq[(String, String)]): AbstractSerDe = {
-
-    val serde = Utils.classForName[AbstractSerDe](serdeClassName).getConstructor().
-      newInstance()
-
-    val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")
-
-    var propsMap = serdeProps.toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(","))
-    propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)
-
-    val properties = new Properties()
-    // Can not use properties.putAll(propsMap.asJava) in scala-2.12
-    // See https://github.com/scala/bug/issues/10418
-    propsMap.foreach { case (k, v) => properties.put(k, v) }
-    serde.initialize(null, properties)
-
-    serde
-  }
-
-  private def recordReader(
-      inputStream: InputStream,
-      conf: Configuration): Option[RecordReader] = {
-    ioschema.recordReaderClass.map { klass =>
-      val instance = Utils.classForName[RecordReader](klass).getConstructor().
-        newInstance()
-      val props = new Properties()
-      // Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
-      // See https://github.com/scala/bug/issues/10418
-      ioschema.outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
-      instance.initialize(inputStream, conf, props)
-      instance
-    }
-  }
-
-  private def recordWriter(
-      outputStream: OutputStream,
-      conf: Configuration): Option[RecordWriter] = {
-    ioschema.recordWriterClass.map { klass =>
-      val instance = Utils.classForName[RecordWriter](klass).getConstructor().
-        newInstance()
-      instance.initialize(outputStream, conf)
-      instance
-    }
-  }
+  extends BaseScriptTransformationExec {
+  import HiveScriptIOSchema._

   private def createOutputIteratorWithSerde(
       writerThread: BaseScriptTransformationWriterThread,
@@ -144,7 +68,8 @@ case class HiveScriptTransformationExec(
     var curLine: String = null
     val scriptOutputStream = new DataInputStream(inputStream)

-    @Nullable val scriptOutputReader = recordReader(scriptOutputStream, hadoopConf).orNull
+    @Nullable val scriptOutputReader =
+      recordReader(ioschema, scriptOutputStream, hadoopConf).orNull

     var scriptOutputWritable: Writable = null
     val reusedWritableObject = outputSerde.getSerializedClass.getConstructor().newInstance()
@@ -218,7 +143,7 @@ case class HiveScriptTransformationExec(

     // This nullability is a performance optimization in order to avoid an Option.foreach() call
     // inside of a loop
-    @Nullable val (inputSerde, inputSoi) = initInputSerDe(input).getOrElse((null, null))
+    @Nullable val (inputSerde, inputSoi) = initInputSerDe(ioschema, input).getOrElse((null, null))

     // For HiveScriptTransformationExec, if inputSerde == null, but outputSerde != null
     // We will use StringBuffer to pass data, in this case, we should cast data as string too.
@@ -249,7 +174,7 @@ case class HiveScriptTransformationExec(
     // This nullability is a performance optimization in order to avoid an Option.foreach() call
     // inside of a loop
     @Nullable val (outputSerde, outputSoi) = {
-      initOutputSerDe(output).getOrElse((null, null))
+      initOutputSerDe(ioschema, output).getOrElse((null, null))
     }

     val outputIterator = if (outputSerde == null) {
@@ -265,6 +190,90 @@ case class HiveScriptTransformationExec(
   }
 }

+object HiveScriptIOSchema extends HiveInspectors {
+
+  def initInputSerDe(
+      ioschema: ScriptTransformationIOSchema,
+      input: Seq[Expression]): Option[(AbstractSerDe, StructObjectInspector)] = {
+    ioschema.inputSerdeClass.map { serdeClass =>
+      val (columns, columnTypes) = parseAttrs(input)
+      val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.inputSerdeProps)
+      val fieldObjectInspectors = columnTypes.map(toInspector)
+      val objectInspector = ObjectInspectorFactory
+        .getStandardStructObjectInspector(columns.asJava, fieldObjectInspectors.asJava)
+      (serde, objectInspector)
+    }
+  }
+
+  def initOutputSerDe(
+      ioschema: ScriptTransformationIOSchema,
+      output: Seq[Attribute]): Option[(AbstractSerDe, StructObjectInspector)] = {
+    ioschema.outputSerdeClass.map { serdeClass =>
+      val (columns, columnTypes) = parseAttrs(output)
+      val serde = initSerDe(serdeClass, columns, columnTypes, ioschema.outputSerdeProps)
+      val structObjectInspector = serde.getObjectInspector().asInstanceOf[StructObjectInspector]
+      (serde, structObjectInspector)
+    }
+  }
+
+  private def parseAttrs(attrs: Seq[Expression]): (Seq[String], Seq[DataType]) = {
+    val columns = attrs.zipWithIndex.map(e => s"${e._1.prettyName}_${e._2}")
+    val columnTypes = attrs.map(_.dataType)
+    (columns, columnTypes)
+  }
+
+  def initSerDe(
+      serdeClassName: String,
+      columns: Seq[String],
+      columnTypes: Seq[DataType],
+      serdeProps: Seq[(String, String)]): AbstractSerDe = {
+
+    val serde = Utils.classForName[AbstractSerDe](serdeClassName).getConstructor().
+      newInstance()
+
+    val columnTypesNames = columnTypes.map(_.toTypeInfo.getTypeName()).mkString(",")
+
+    var propsMap = serdeProps.toMap + (serdeConstants.LIST_COLUMNS -> columns.mkString(","))
+    propsMap = propsMap + (serdeConstants.LIST_COLUMN_TYPES -> columnTypesNames)
+
+    val properties = new Properties()
+    // Can not use properties.putAll(propsMap.asJava) in scala-2.12
+    // See https://github.com/scala/bug/issues/10418
+    propsMap.foreach { case (k, v) => properties.put(k, v) }
+    serde.initialize(null, properties)
+
+    serde
+  }
+
+  def recordReader(
+      ioschema: ScriptTransformationIOSchema,
+      inputStream: InputStream,
+      conf: Configuration): Option[RecordReader] = {
+    ioschema.recordReaderClass.map { klass =>
+      val instance = Utils.classForName[RecordReader](klass).getConstructor().
+        newInstance()
+      val props = new Properties()
+      // Can not use props.putAll(outputSerdeProps.toMap.asJava) in scala-2.12
+      // See https://github.com/scala/bug/issues/10418
+      ioschema.outputSerdeProps.toMap.foreach { case (k, v) => props.put(k, v) }
+      instance.initialize(inputStream, conf, props)
+      instance
+    }
+  }
+
+  def recordWriter(
+      ioschema: ScriptTransformationIOSchema,
+      outputStream: OutputStream,
+      conf: Configuration): Option[RecordWriter] = {
+    ioschema.recordWriterClass.map { klass =>
+      val instance = Utils.classForName[RecordWriter](klass).getConstructor().
+        newInstance()
+      instance.initialize(outputStream, conf)
+      instance
+    }
+  }
+}
+
 case class HiveScriptTransformationWriterThread(
     iter: Iterator[InternalRow],
     inputSchema: Seq[DataType],
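Note on the change as a whole: the commit extracts the SerDe and record
reader/writer helpers out of HiveScriptTransformationExec (dropping its
HiveInspectors mix-in) into the standalone HiveScriptIOSchema object, so each
helper now receives the ScriptTransformationIOSchema explicitly.

The "Can not use properties.putAll(propsMap.asJava) in scala-2.12" comments
refer to scala/bug#10418, where calls to Hashtable.putAll through
java.util.Properties break for Scala 2.12 when compiling on newer JDKs, so
the code copies entries one at a time instead. A minimal, self-contained
sketch of that workaround pattern (the keys and values here are made up for
illustration, not taken from the commit):

import java.util.Properties

object PutAllWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val propsMap = Map("field.delim" -> "\t", "serialization.format" -> "1")

    val properties = new Properties()
    // Copy entry by entry instead of properties.putAll(propsMap.asJava),
    // the same workaround the commit uses for scala/bug#10418.
    propsMap.foreach { case (k, v) => properties.put(k, v) }

    println(properties.getProperty("field.delim") == "\t") // true
  }
}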
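The "// This nullability is a performance optimization" comments describe the
@Nullable ... .orNull pattern in the diff: each Option is unwrapped once,
before the row loop, so the per-row path is a plain null check rather than an
Option.foreach() call per element. A self-contained sketch of the same idea
(hypothetical names, not Spark API):

object NullableHoistSketch {
  def transformAll(rows: Iterator[String], upperOpt: Option[String => String]): Unit = {
    // Unwrap the Option a single time, outside the hot loop.
    val upper = upperOpt.orNull
    rows.foreach { row =>
      // Each iteration now costs only a null check.
      val out = if (upper != null) upper(row) else row
      println(out)
    }
  }

  def main(args: Array[String]): Unit = {
    transformAll(Iterator("a", "b"), Some(_.toUpperCase)) // prints A, B
  }
}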

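For a concrete sense of what initSerDe hands to Hive: parseAttrs produces
positional column names plus Catalyst types, and initSerDe flattens them into
two comma-separated properties keyed by serdeConstants.LIST_COLUMNS and
serdeConstants.LIST_COLUMN_TYPES ("columns" and "columns.types" in Hive). A
hedged sketch of that layout, with made-up column and type names and plain
string keys standing in for the Hive constants:

import java.util.Properties

object SerdePropsSketch {
  def main(args: Array[String]): Unit = {
    val columns = Seq("expr_0", "expr_1")      // parseAttrs-style positional names
    val columnTypeNames = Seq("int", "string") // Hive TypeInfo names

    val properties = new Properties()
    properties.put("columns", columns.mkString(","))               // LIST_COLUMNS
    properties.put("columns.types", columnTypeNames.mkString(",")) // LIST_COLUMN_TYPES

    // A SerDe would then be initialized with these properties, analogous to
    // serde.initialize(null, properties) in the commit.
    println(properties.getProperty("columns"))       // expr_0,expr_1
    println(properties.getProperty("columns.types")) // int,string
  }
}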