From a693722ce9509da3118118a8e773f01e89a79950 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Mon, 13 Jul 2020 22:41:03 +0800
Subject: [PATCH] save

---
 .../BaseScriptTransformationExec.scala         | 18 ++++++++++++++++--
 .../spark/sql/execution/SparkSqlParser.scala   |  6 +++++-
 .../apache/spark/sql/hive/HiveStrategies.scala |  3 ++-
 3 files changed, 23 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index aa54d93d94b7d..8217fa16148bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution
 
 import java.io.OutputStream
 import java.nio.charset.StandardCharsets
+import java.time.ZoneId
 import java.util.concurrent.TimeUnit
 
 import scala.util.control.NonFatal
@@ -31,8 +32,9 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, UnsafeProjection}
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
+import org.apache.spark.sql.catalyst.util.{DateFormatter, DateTimeUtils, TimestampFormatter}
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{DataType, DateType, TimestampType}
 import org.apache.spark.util.{CircularBuffer, SerializableConfiguration, Utils}
 
 trait BaseScriptTransformationExec extends UnaryExecNode {
@@ -127,7 +129,19 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
         var i = 1
         while (i < len) {
           sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
-          sb.append(row.get(i, inputSchema(i)))
+          val columnType = inputSchema(i)
+          val fieldValue = row.get(i, columnType)
+          val fieldStringValue = columnType match {
+            case _: DateType =>
+              val dateFormatter = DateFormatter(ZoneId.systemDefault())
+              dateFormatter.format(fieldValue.asInstanceOf[Int])
+            case _: TimestampType =>
+              TimestampFormatter.getFractionFormatter(ZoneId.systemDefault())
+                .format(fieldValue.asInstanceOf[Long])
+            case _ =>
+              fieldValue.toString
+          }
+          sb.append(fieldStringValue)
           i += 1
         }
         sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 3a2c673229c20..492750c60c7c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -713,13 +714,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         }
         (Seq.empty, Option(name), props.toSeq, recordHandler)
 
-      case null =>
+      case null if conf.getConf(CATALOG_IMPLEMENTATION).equals("hive") =>
         // Use default (serde) format.
         val name = conf.getConfString("hive.script.serde",
           "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
         val props = Seq("field.delim" -> "\t")
         val recordHandler = Option(conf.getConfString(configKey, defaultConfigValue))
         (Nil, Option(name), props, recordHandler)
+
+      case null =>
+        (Nil, None, Seq.empty, None)
     }
 
     val (inFormat, inSerdeClass, inSerdeProps, reader) =
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index dae68df08f32e..6bac2a2203713 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -243,7 +243,8 @@ private[hive] trait HiveStrategies {
 
   object HiveScripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case ScriptTransformation(input, script, output, child, ioschema) =>
+      case ScriptTransformation(input, script, output, child, ioschema)
+        if ioschema.inputSerdeClass.nonEmpty || ioschema.outputSerdeClass.nonEmpty =>
         val hiveIoSchema = HiveScriptIOSchema(ioschema)
         HiveScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
       case _ => Nil
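Note on the BaseScriptTransformationExec hunk: Catalyst stores DateType values
as days since the epoch (Int) and TimestampType values as microseconds since
the epoch (Long), so the removed sb.append(row.get(i, inputSchema(i))) would
have written those raw integers to the user's script. A minimal standalone
sketch, not part of the patch, of what the new formatters emit; the input
values below are hypothetical and the output follows the JVM's default time
zone:

    import java.time.ZoneId
    import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}

    object FormatterSketch {
      def main(args: Array[String]): Unit = {
        val zone = ZoneId.systemDefault()
        // Hypothetical internal values: 18456 days and 1594651263000000
        // microseconds since 1970-01-01T00:00:00Z.
        println(DateFormatter(zone).format(18456))
        println(TimestampFormatter.getFractionFormatter(zone).format(1594651263000000L))
        // In a UTC JVM this prints "2020-07-13" and "2020-07-13 14:41:03";
        // the fraction formatter drops a trailing ".000000".
      }
    }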
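Note on the SparkSqlParser and HiveStrategies hunks: when
spark.sql.catalogImplementation is not "hive", a TRANSFORM query with no
explicit ROW FORMAT now resolves to (Nil, None, Seq.empty, None) instead of
defaulting to LazySimpleSerDe, and HiveScripts only claims plans whose io
schema names a serde on at least one side. A toy sketch of that routing
predicate; this IOSchema is hypothetical, not Spark's type, and only mirrors
the two fields the new guard inspects:

    // Serde-less transforms are left for the sql/core implementation;
    // serde-based transforms still go through the Hive module.
    case class IOSchema(inputSerdeClass: Option[String], outputSerdeClass: Option[String])

    def handledByHiveScripts(io: IOSchema): Boolean =
      io.inputSerdeClass.nonEmpty || io.outputSerdeClass.nonEmpty

    // handledByHiveScripts(IOSchema(None, None))            == false -> sql/core path
    // handledByHiveScripts(IOSchema(Some("someSerDe"), None)) == true -> Hive path

Together the two hunks let serde-less TRANSFORM run without a Hive catalog
while keeping serde-based TRANSFORM on the Hive execution path.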