Skip to content

Commit

Permalink
save
Browse files Browse the repository at this point in the history
  • Loading branch information
AngersZhuuuu committed Jul 13, 2020
1 parent e53744b commit a693722
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution

import java.io.OutputStream
import java.nio.charset.StandardCharsets
import java.time.ZoneId
import java.util.concurrent.TimeUnit

import scala.util.control.NonFatal
Expand All @@ -31,8 +32,9 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeSet, UnsafeProjection}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
import org.apache.spark.util.{CircularBuffer, SerializableConfiguration, Utils}

trait BaseScriptTransformationExec extends UnaryExecNode {
Expand Down Expand Up @@ -127,7 +129,19 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
var i = 1
while (i < len) {
sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
sb.append(row.get(i, inputSchema(i)))
val columnType = inputSchema(i)
val fieldValue = row.get(i, columnType)
val fieldStringValue = columnType match {
case _: DateType =>
val dateFormatter = DateFormatter(ZoneId.systemDefault())
dateFormatter.format(fieldValue.asInstanceOf[Int])
case _: TimestampType =>
TimestampFormatter.getFractionFormatter(ZoneId.systemDefault())
.format(fieldValue.asInstanceOf[Long])
case _ =>
fieldValue.toString
}
sb.append(fieldStringValue)
i += 1
}
sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types.StructType

/**
Expand Down Expand Up @@ -713,13 +714,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
}
(Seq.empty, Option(name), props.toSeq, recordHandler)

case null =>
case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") =>
// Use default (serde) format.
val name = conf.getConfString("hive.script.serde",
"org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
val props = Seq("field.delim" -> "\t")
val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
(Nil, Option(name), props, recordHandler)

case null =>
(Nil, None, Seq.empty, None)
}

val (inFormat, inSerdeClass, inSerdeProps, reader) =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ private[hive] trait HiveStrategies {

object HiveScripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case ScriptTransformation(input, script, output, child, ioschema) =>
case ScriptTransformation(input, script, output, child, ioschema)
if ioschema.inputSerdeClass.nonEmpty || ioschema.outputSerdeClass.nonEmpty =>
val hiveIoSchema = HiveScriptIOSchema(ioschema)
HiveScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
case _ => Nil
Expand Down

0 comments on commit a693722

Please sign in to comment.