From a8260e8ff369a1e09f326d1773acc3fa99513b55 Mon Sep 17 00:00:00 2001
From: Cheng Hao
Date: Sun, 10 May 2015 23:02:51 +0800
Subject: [PATCH] Update code per review feedback

---
 .../spark/sql/hive/HiveMetastoreCatalog.scala | 24 ++++++++--------
 .../org/apache/spark/sql/hive/HiveQl.scala    | 28 ++++++++-----------
 .../apache/spark/sql/hive/HiveQlSuite.scala   |  2 +-
 3 files changed, 24 insertions(+), 30 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 3da9e7f650be9..bbf48efb24440 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -424,18 +424,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive

     val desc = table.copy(schema = schema)

-    // This is a hack, we only take the RC, ORC and Parquet as specific storage
-    // otherwise, we will convert it into Parquet2 when hive.convertCTAS specified
-    val specificStorage = (table.inputFormat.map(format => {
-      // org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat => Parquet
-      // org.apache.hadoop.hive.ql.io.orc.OrcInputFormat => Orc
-      // org.apache.hadoop.hive.ql.io.RCFileInputFormat => RCFile
-      // parquet.hive.DeprecatedParquetInputFormat => Parquet
-      // TODO configurable?
-      format.contains("Orc") || format.contains("Parquet") || format.contains("RCFile")
-    }).getOrElse(false))
-
-    if (hive.convertCTAS && !specificStorage) {
+    if (hive.convertCTAS && table.serde.isEmpty) {
       // Do the conversion when spark.sql.hive.convertCTAS is true and the query
       // does not specify any storage format (file format and storage handler).
       if (table.specifiedDatabase.isDefined) {
@@ -454,9 +443,18 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive

         child
       )
     } else {
+      val desc = if (table.serde.isEmpty) {
+        // add default serde
+        table.copy(
+          serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+      } else {
+        table
+      }
+
       val (dbName, tblName) = processDatabaseAndTableName(
-        table.specifiedDatabase.getOrElse(client.currentDatabase), table.name)
+        desc.specifiedDatabase.getOrElse(client.currentDatabase), desc.name)
+
       execution.CreateTableAsSelect(
         desc.copy(
           specifiedDatabase = Some(dbName),
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 786f4dd36f395..5d9b1448d7c23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -64,7 +64,11 @@ case class CreateTableAsSelect(
   override def output: Seq[Attribute] = Seq.empty[Attribute]

   override lazy val resolved: Boolean =
-    tableDesc.specifiedDatabase.isDefined && tableDesc.schema.size > 0 && childrenResolved
+    // TODO add more conditions?
+    tableDesc.specifiedDatabase.isDefined &&
+    tableDesc.schema.size > 0 &&
+    tableDesc.serde.isDefined &&
+    childrenResolved
 }

 /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */
@@ -607,26 +611,24 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           serde = None,
           viewText = None)

-        // default serde & input/output format
-        tableDesc = if ("SequenceFile".equalsIgnoreCase(
-          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        // default storage type abbreviation (e.g. RCFile, ORC, PARQUET, etc.)
+        val defaultStorageType = hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT)
+        // handle the default format for the storage type abbreviation
+        tableDesc = if ("SequenceFile".equalsIgnoreCase(defaultStorageType)) {
           tableDesc.copy(
             inputFormat = Option("org.apache.hadoop.mapred.SequenceFileInputFormat"),
             outputFormat = Option("org.apache.hadoop.mapred.SequenceFileOutputFormat"))
-        } else if ("RCFile".equalsIgnoreCase(
-          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        } else if ("RCFile".equalsIgnoreCase(defaultStorageType)) {
           tableDesc.copy(
             inputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileInputFormat"),
             outputFormat = Option("org.apache.hadoop.hive.ql.io.RCFileOutputFormat"),
             serde = Option(hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTRCFILESERDE)))
-        } else if ("ORC".equalsIgnoreCase(
-          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        } else if ("ORC".equalsIgnoreCase(defaultStorageType)) {
           tableDesc.copy(
             inputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"),
             outputFormat = Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"),
             serde = Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
-        } else if ("PARQUET".equalsIgnoreCase(
-          hiveConf.getVar(HiveConf.ConfVars.HIVEDEFAULTFILEFORMAT))) {
+        } else if ("PARQUET".equalsIgnoreCase(defaultStorageType)) {
           tableDesc.copy(
             inputFormat =
               Option("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"),
@@ -766,12 +768,6 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
           case _ => // Unsupport features
         }

-        if (tableDesc.serde.isEmpty) {
-          // add default serde
-          tableDesc = tableDesc.copy(
-            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-        }
-
         CreateTableAsSelect(tableDesc, nodeToPlan(query), allowExisting != None)

       // If its not a "CTAS" like above then take it as a native command
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
index 78c28e330373e..941a2941649b8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala
@@ -137,7 +137,7 @@ class HiveQlSuite extends FunSuite with BeforeAndAfterAll {

     assert(desc.serdeProperties == Map())
     assert(desc.inputFormat == Option("org.apache.hadoop.mapred.TextInputFormat"))
     assert(desc.outputFormat == Option("org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat"))
-    assert(desc.serde == Option("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+    assert(desc.serde.isEmpty)
     assert(desc.properties == Map())
   }
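
Note for reviewers: below is a minimal, self-contained Scala sketch (not part of the patch) of the decision rule this change consolidates into HiveMetastoreCatalog. TableDesc, resolveSerde, and the Either result shape are hypothetical stand-ins for the real HiveTable/planner types, used only to make the two code paths concrete.

// Sketch only: mirrors the CTAS serde decision after this patch.
// TableDesc and resolveSerde are illustrative, not Spark's actual API.
object CtasSerdeSketch {
  case class TableDesc(name: String, serde: Option[String])

  // When spark.sql.hive.convertCTAS is set and no serde was specified, the
  // plan leaves the Hive CTAS path (converted to a data source table,
  // Parquet by default); otherwise a missing serde is filled in here, at
  // analysis time, with Hive's LazySimpleSerDe.
  def resolveSerde(table: TableDesc, convertCTAS: Boolean): Either[String, TableDesc] =
    if (convertCTAS && table.serde.isEmpty) {
      Left(s"convert ${table.name} to a data source CTAS (default Parquet)")
    } else {
      Right(table.copy(serde = table.serde.orElse(
        Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))))
    }

  def main(args: Array[String]): Unit = {
    // No serde, convertCTAS = true: leaves the Hive path entirely.
    println(resolveSerde(TableDesc("t1", None), convertCTAS = true))
    // No serde, convertCTAS = false: default serde attached at analysis time,
    // replacing the parse-time fallback removed from HiveQl.scala above.
    println(resolveSerde(TableDesc("t2", None), convertCTAS = false))
    // Explicit serde: kept as-is under either setting.
    println(resolveSerde(
      TableDesc("t3", Some("org.apache.hadoop.hive.ql.io.orc.OrcSerde")),
      convertCTAS = true))
  }
}

With the fallback moved to the catalog, CreateTableAsSelect.resolved can require tableDesc.serde.isDefined, which is why the parse-time default in HiveQl.scala is deleted and HiveQlSuite now expects desc.serde to be empty straight out of the parser.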