From 6bb58808875e82740da64bbb7cbca4d22817dbce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Wed, 13 Aug 2014 01:27:54 +0800 Subject: [PATCH 01/28] Update HiveQl.scala --- .../src/main/scala/org/apache/spark/sql/hive/HiveQl.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 0aa6292c0184e..4e30e6e06fe21 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -837,11 +837,6 @@ private[hive] object HiveQl { cleanIdentifier(key.toLowerCase) -> None }.toMap).getOrElse(Map.empty) - if (partitionKeys.values.exists(p => p.isEmpty)) { - throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" + - s"dynamic partitioning.") - } - InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite) case a: ASTNode => From 1867e23f72f94c7161a485222c6a8e31814d20f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Wed, 13 Aug 2014 01:29:36 +0800 Subject: [PATCH 02/28] Update SparkHadoopWriter.scala --- .../org/apache/spark/SparkHadoopWriter.scala | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index ab7862f4f9e06..a62664835fb20 100644 --- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -93,6 +93,33 @@ private[hive] class SparkHiveHadoopWriter( null) } + def open(dynamicPartPath: String) { + val numfmt = NumberFormat.getInstance() + numfmt.setMinimumIntegerDigits(5) + numfmt.setGroupingUsed(false) + + val extension = Utilities.getFileExtension( + conf.value, + fileSinkConf.getCompressed, + getOutputFormat()) + + val outputName = "part-" + numfmt.format(splitID) + extension + val outputPath: Path = FileOutputFormat.getOutputPath(conf.value) + if (outputPath == null) { + throw new IOException("Undefined job output-path") + } + val workPath = new Path(outputPath, dynamicPartPath.substring(1))//remove "/" + val path = new Path(workPath, outputName) + getOutputCommitter().setupTask(getTaskContext()) + writer = HiveFileFormatUtils.getHiveRecordWriter( + conf.value, + fileSinkConf.getTableInfo, + conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], + fileSinkConf, + path, + null) + } + def write(value: Writable) { if (writer != null) { writer.write(value) From adf02f13e8d93eadb2d032007e00cd9406c62439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Wed, 13 Aug 2014 01:31:01 +0800 Subject: [PATCH 03/28] Update InsertIntoHiveTable.scala --- .../hive/execution/InsertIntoHiveTable.scala | 150 +++++++++++++++--- 1 file changed, 130 insertions(+), 20 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index a284a91a91e31..9727dfa199e14 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -34,7 +34,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharOb import org.apache.hadoop.io.Writable 
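// A minimal, self-contained sketch of the file-naming scheme used by open(dynamicPartPath)
// in the SparkHadoopWriter patch above: a zero-padded split id plus the format-specific
// extension, written under the dynamic-partition subdirectory. `splitID` and `extension`
// are illustrative stand-ins for the writer's real fields, not part of this patch.
import java.text.NumberFormat

object PartFileNameSketch {
  def partFileName(splitID: Int, extension: String): String = {
    val numfmt = NumberFormat.getInstance()
    numfmt.setMinimumIntegerDigits(5)   // pad to five digits, e.g. 3 -> "00003"
    numfmt.setGroupingUsed(false)       // no thousands separators
    "part-" + numfmt.format(splitID) + extension
  }

  def main(args: Array[String]): Unit = {
    // e.g. "part-00003.snappy" placed under <outputPath>/partcol1=v1/partcol2=v2
    println(partFileName(3, ".snappy"))
  }
}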
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} -import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.{SerializableWritable, SparkException, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row @@ -159,6 +159,30 @@ case class InsertIntoHiveTable( writer.commitJob() } + def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int) :String = { + println("tableInfo.class:" + tableInfo.getClass + "|row(2):" + row(2)) + println(tableInfo.getProperties.getProperty("columns") + "|" + tableInfo.getProperties.getProperty("partition_columns")) + dynamicPartNum2 match { + case 0 =>"" + case i => { + val colsNum = tableInfo.getProperties.getProperty("columns").split("\\,").length + val partColStr = tableInfo.getProperties.getProperty("partition_columns") + val partCols = partColStr.split("/") + var buf = new StringBuffer() + if (partCols.length == dynamicPartNum2) { + for (j <- 0 until partCols.length) { + buf.append("/").append(partCols(j)).append("=").append(row(j + row.length - colsNum)) + } + } else { + for (j <- 0 until dynamicPartNum2) { + buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(row(j + colsNum)) + } + } + buf.toString + } + } + } + override def execute() = result /** @@ -178,6 +202,12 @@ case class InsertIntoHiveTable( val tableLocation = table.hiveQlTable.getDataLocation val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) + var dynamicPartNum = 0 + var dynamicPartPath = ""; + val partitionSpec = partition.map { + case (key, Some(value)) => key -> value + case (key, None) => { dynamicPartNum += 1; key -> "" }// Should not reach here right now. + } val rdd = childRdd.mapPartitions { iter => val serializer = newSerializer(fileSinkConf.getTableInfo) val standardOI = ObjectInspectorUtils @@ -191,7 +221,10 @@ case class InsertIntoHiveTable( val outputData = new Array[Any](fieldOIs.length) iter.map { row => var i = 0 - while (i < row.length) { + while (i < fieldOIs.length) { + if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) { + dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum) + } // Casts Strings to HiveVarchars when necessary. outputData(i) = wrap(row(i), fieldOIs(i)) i += 1 @@ -204,12 +237,81 @@ case class InsertIntoHiveTable( // ORC stores compression information in table properties. While, there are other formats // (e.g. RCFile) that rely on hadoop configurations to store compression information. val jobConf = new JobConf(sc.hiveconf) - saveAsHiveFile( - rdd, - outputClass, - fileSinkConf, - jobConf, - sc.hiveconf.getBoolean("hive.exec.compress.output", false)) + val jobConfSer = new SerializableWritable(jobConf) + if (dynamicPartNum>0) { + if (outputClass == null) { + throw new SparkException("Output value class not set") + } + jobConfSer.value.setOutputValueClass(outputClass) + if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { + throw new SparkException("Output format class not set") + } + // Doesn't work in Scala 2.9 due to what may be a generics bug + // TODO: Should we uncomment this for Scala 2.10? 
+ // conf.setOutputFormat(outputFormatClass) + jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) + if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) { + // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", + // and "mapred.output.compression.type" have no impact on ORC because it uses table properties + // to store compression information. + jobConfSer.value.set("mapred.output.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec")) + fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type")) + } + jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter]) + + FileOutputFormat.setOutputPath( + jobConfSer.value, + SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value)) + + var writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] + def writeToFile2(context: TaskContext, iter: Iterator[Writable]) { + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. + val attemptNumber = (context.attemptId % Int.MaxValue).toInt + val serializer = newSerializer(fileSinkConf.getTableInfo) + var count = 0 + var writer2:SparkHiveHadoopWriter = null + while(iter.hasNext) { + val record = iter.next(); + val location = fileSinkConf.getDirName + val partLocation = location + dynamicPartPath + writer2=writerMap.get(dynamicPartPath) match { + case Some(writer)=> writer + case None => { + val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false)) + tempWriter.setup(context.stageId, context.partitionId, attemptNumber) + tempWriter.open(dynamicPartPath); + writerMap += (dynamicPartPath -> tempWriter) + tempWriter + } + } + count += 1 + writer2.write(record) + } + for((k,v) <- writerMap) { + v.close() + v.commit() + } + } + + sc.sparkContext.runJob(rdd, writeToFile2 _) + + for((k,v) <- writerMap) { + v.commitJob() + } + writerMap.clear() + //writer.commitJob() + + } else { + saveAsHiveFile( + rdd, + outputClass, + fileSinkConf, + jobConf, + sc.hiveconf.getBoolean("hive.exec.compress.output", false)) + } // TODO: Handle dynamic partitioning. val outputPath = FileOutputFormat.getOutputPath(jobConf) @@ -220,10 +322,6 @@ case class InsertIntoHiveTable( // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint. val holdDDLTime = false if (partition.nonEmpty) { - val partitionSpec = partition.map { - case (key, Some(value)) => key -> value - case (key, None) => key -> "" // Should not reach here right now. - } val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec) db.validatePartitionNameCharacters(partVals) // inheritTableSpecs is set to true. It should be set to false for a IMPORT query @@ -231,14 +329,26 @@ case class InsertIntoHiveTable( val inheritTableSpecs = true // TODO: Correctly set isSkewedStoreAsSubdir. 
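// A simplified, self-contained sketch of the per-partition writer cache used by
// writeToFile2 above: a writer is created lazily for each dynamic-partition path a task
// encounters, reused for later rows with the same path, and closed at the end.
// java.io.PrintWriter stands in for SparkHiveHadoopWriter; names here are illustrative only.
import java.io.{File, PrintWriter}
import scala.collection.mutable

object PartitionedWriterSketch {
  def writeAll(baseDir: String, rows: Iterator[(String, String)]): Unit = {
    val writers = mutable.HashMap.empty[String, PrintWriter]
    for ((partPath, record) <- rows) {
      val writer = writers.getOrElseUpdate(partPath, {
        // e.g. baseDir + "/partcol1=1/partcol2=2"
        val dir = new File(baseDir + partPath)
        dir.mkdirs()
        new PrintWriter(new File(dir, "part-00000"))
      })
      writer.println(record)
    }
    writers.values.foreach(_.close())
  }
}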
val isSkewedStoreAsSubdir = false - db.loadPartition( - outputPath, - qualifiedTableName, - partitionSpec, - overwrite, - holdDDLTime, - inheritTableSpecs, - isSkewedStoreAsSubdir) + if (dynamicPartNum>0) { + db.loadDynamicPartitions( + outputPath, + qualifiedTableName, + partitionSpec, + overwrite, + dynamicPartNum/*dpCtx.getNumDPCols()*/, + holdDDLTime, + isSkewedStoreAsSubdir + ) + } else { + db.loadPartition( + outputPath, + qualifiedTableName, + partitionSpec, + overwrite, + holdDDLTime, + inheritTableSpecs, + isSkewedStoreAsSubdir) + } } else { db.loadTable( outputPath, From 6af73f46430c5e38d43af8ed288936e1b4ca2678 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Wed, 13 Aug 2014 01:53:04 +0800 Subject: [PATCH 04/28] Update InsertIntoHiveTable.scala --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 9727dfa199e14..a6f1a683f3fe2 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -160,8 +160,6 @@ case class InsertIntoHiveTable( } def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int) :String = { - println("tableInfo.class:" + tableInfo.getClass + "|row(2):" + row(2)) - println(tableInfo.getProperties.getProperty("columns") + "|" + tableInfo.getProperties.getProperty("partition_columns")) dynamicPartNum2 match { case 0 =>"" case i => { From 98cfb1fd8dab201f9411dd487e4a8b259216d9a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Mon, 18 Aug 2014 14:29:21 +0800 Subject: [PATCH 05/28] Update HiveCompatibilitySuite.scala --- .../hive/execution/HiveCompatibilitySuite.scala | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala index 556c984ad392b..35e9c9939d4b7 100644 --- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala +++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala @@ -220,6 +220,23 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter { */ override def whiteList = Seq( "add_part_exist", + "dynamic_partition_skip_default", + "infer_bucket_sort_dyn_part", + "load_dyn_part1", + "load_dyn_part2", + "load_dyn_part3", + "load_dyn_part4", + "load_dyn_part5", + "load_dyn_part6", + "load_dyn_part7", + "load_dyn_part8", + "load_dyn_part9", + "load_dyn_part10", + "load_dyn_part11", + "load_dyn_part12", + "load_dyn_part13", + "load_dyn_part14", + "load_dyn_part14_win", "add_part_multiple", "add_partition_no_whitelist", "add_partition_with_whitelist", From 37c603b5001169d1aa21fa293f782666eedc338a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Mon, 18 Aug 2014 14:34:53 +0800 Subject: [PATCH 06/28] Update InsertIntoHiveTable.scala --- .../hive/execution/InsertIntoHiveTable.scala | 56 +++++++++++++++---- 1 file changed, 44 insertions(+), 12 deletions(-) diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index a6f1a683f3fe2..8f327d1887a23 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -24,7 +24,9 @@ import java.util.{HashMap => JHashMap} import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.metastore.MetaStoreUtils import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.parse.SemanticException import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector._ @@ -40,6 +42,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter} +import org.apache.hadoop.hive.conf.HiveConf /** * :: DeveloperApi :: @@ -159,7 +162,7 @@ case class InsertIntoHiveTable( writer.commitJob() } - def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int) :String = { + def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf) :String = { dynamicPartNum2 match { case 0 =>"" case i => { @@ -169,11 +172,11 @@ case class InsertIntoHiveTable( var buf = new StringBuffer() if (partCols.length == dynamicPartNum2) { for (j <- 0 until partCols.length) { - buf.append("/").append(partCols(j)).append("=").append(row(j + row.length - colsNum)) + buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j ), jobConf)) } } else { for (j <- 0 until dynamicPartNum2) { - buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(row(j + colsNum)) + buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf)) } } buf.toString @@ -181,6 +184,14 @@ case class InsertIntoHiveTable( } } + def handleNull(obj :Any, jobConf: JobConf) :String = { + if (obj == null ||obj.toString.length == 0) { + jobConf.get("hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__") + } else { + obj.toString + } + } + override def execute() = result /** @@ -201,11 +212,38 @@ case class InsertIntoHiveTable( val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) var dynamicPartNum = 0 + var numStaPart = 0 var dynamicPartPath = ""; val partitionSpec = partition.map { - case (key, Some(value)) => key -> value - case (key, None) => { dynamicPartNum += 1; key -> "" }// Should not reach here right now. + case (key, Some(value)) => { numStaPart += 1; key -> value } + case (key, None) => { dynamicPartNum += 1; key -> "" } } + // ORC stores compression information in table properties. While, there are other formats + // (e.g. RCFile) that rely on hadoop configurations to store compression information. 
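// A small sketch of the null handling introduced by handleNull above: a null or empty
// partition value is replaced by Hive's default partition name (normally
// "__HIVE_DEFAULT_PARTITION__", configurable via hive.exec.default.partition.name),
// so rows with NULL partition columns still map to a valid directory.
object DefaultPartitionNameSketch {
  val defaultPartName = "__HIVE_DEFAULT_PARTITION__"

  def partitionValue(rowVal: Any): String =
    if (rowVal == null || String.valueOf(rowVal).isEmpty) defaultPartName
    else String.valueOf(rowVal)

  def main(args: Array[String]): Unit = {
    println(partitionValue(null))   // __HIVE_DEFAULT_PARTITION__
    println(partitionValue(""))     // __HIVE_DEFAULT_PARTITION__
    println(partitionValue(2014))   // 2014
  }
}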
+ val jobConf = new JobConf(sc.hiveconf) + val jobConfSer = new SerializableWritable(jobConf) + // check if the partition spec is valid + if (dynamicPartNum > 0) { + if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) { + throw new SemanticException( + ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()) + } + if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) { + throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg()); + } + // check if static partition appear after dynamic partitions + for ((k,v) <- partitionSpec) { + if (partitionSpec(k) == "") { + if (numStaPart > 0) { // found a DP, but there exists ST as subpartition + throw new SemanticException( + ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg()); + } + } else { + numStaPart -= 1 + } + } + } + val rdd = childRdd.mapPartitions { iter => val serializer = newSerializer(fileSinkConf.getTableInfo) val standardOI = ObjectInspectorUtils @@ -221,7 +259,7 @@ case class InsertIntoHiveTable( var i = 0 while (i < fieldOIs.length) { if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) { - dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum) + dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, dynamicPartNum, jobConfSer.value) } // Casts Strings to HiveVarchars when necessary. outputData(i) = wrap(row(i), fieldOIs(i)) @@ -232,10 +270,6 @@ case class InsertIntoHiveTable( } } - // ORC stores compression information in table properties. While, there are other formats - // (e.g. RCFile) that rely on hadoop configurations to store compression information. - val jobConf = new JobConf(sc.hiveconf) - val jobConfSer = new SerializableWritable(jobConf) if (dynamicPartNum>0) { if (outputClass == null) { throw new SparkException("Output value class not set") @@ -300,8 +334,6 @@ case class InsertIntoHiveTable( v.commitJob() } writerMap.clear() - //writer.commitJob() - } else { saveAsHiveFile( rdd, From d452eb322da5962875c8e96865f1cd63b7dddda0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Mon, 18 Aug 2014 14:36:58 +0800 Subject: [PATCH 07/28] Update HiveQuerySuite.scala --- .../spark/sql/hive/execution/HiveQuerySuite.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 2da8a6fac3d99..76df6f65a1bb7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -271,6 +271,18 @@ class HiveQuerySuite extends HiveComparisonTest { |insert overwrite table src_lv2 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX """.stripMargin) + createQueryTest("dynamice partiton", + """ + |drop table IF EXISTS dynamic_part_table; + |create table dynamic_part_table(intcol int) partitioned by (partcol1 int, partcol2 int); + |set hive.exec.dynamic.partition.mode=nonstrict; + |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, 1, 1 from src where key=150; + |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, NULL, 1 from src where key=150; + |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, 1, NULL from src where key=150; + |insert into table dynamic_part_table partition(partcol1, partcol2) 
select 1, NULL, NULL from src where key=150; + |drop table IF EXISTS dynamic_part_table; + """.stripMargin) + createQueryTest("lateral view5", "FROM src SELECT explode(array(key+3, key+4))") From 051ba91e4952c463446450c745073ab4cf742b26 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Mon, 18 Aug 2014 15:18:07 +0800 Subject: [PATCH 08/28] Update Cast.scala --- .../org/apache/spark/sql/catalyst/expressions/Cast.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index f626d09f037bc..1a521fc195e70 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -18,9 +18,10 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.Timestamp -import java.text.{DateFormat, SimpleDateFormat} +import java.text.{DateFormat, NumberFormat, SimpleDateFormat} import org.apache.spark.sql.catalyst.types._ +import java.util.Locale /** Cast the child expression to the target data type. */ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { @@ -271,4 +272,9 @@ object Cast { new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") } } + private[sql] val threadLocalNumberFormat = new ThreadLocal[NumberFormat] { + override def initialValue() = { + NumberFormat.getInstance() + } + } } From 8ad173cd41e2abb93df681383cf15aa59e3484c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Mon, 18 Aug 2014 15:44:07 +0800 Subject: [PATCH 09/28] Update InsertIntoHiveTable.scala --- .../spark/sql/hive/execution/InsertIntoHiveTable.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 8f327d1887a23..4df9fad385b42 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -270,7 +270,7 @@ case class InsertIntoHiveTable( } } - if (dynamicPartNum>0) { + if (dynamicPartNum > 0) { if (outputClass == null) { throw new SparkException("Output value class not set") } @@ -343,7 +343,6 @@ case class InsertIntoHiveTable( sc.hiveconf.getBoolean("hive.exec.compress.output", false)) } - // TODO: Handle dynamic partitioning. val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. val qualifiedTableName = s"${table.databaseName}.${table.tableName}" @@ -359,7 +358,7 @@ case class InsertIntoHiveTable( val inheritTableSpecs = true // TODO: Correctly set isSkewedStoreAsSubdir. 
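// A short sketch of the thread-local NumberFormat pattern added in the Cast.scala change
// above: NumberFormat instances are not thread-safe, so each thread lazily gets its own
// formatter instead of sharing one across tasks. The object and method names here are
// illustrative, not from the patch.
import java.text.NumberFormat

object ThreadLocalFormatSketch {
  private val threadLocalNumberFormat = new ThreadLocal[NumberFormat] {
    override def initialValue(): NumberFormat = NumberFormat.getInstance()
  }

  def format(n: Long): String = threadLocalNumberFormat.get().format(n)

  def main(args: Array[String]): Unit = {
    println(format(42L))   // each calling thread reuses its own NumberFormat instance
  }
}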
val isSkewedStoreAsSubdir = false - if (dynamicPartNum>0) { + if (dynamicPartNum > 0) { db.loadDynamicPartitions( outputPath, qualifiedTableName, From 3f91665c1e546881f045034023cace851e1949d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?baishuo=28=E7=99=BD=E7=A1=95=29?= Date: Mon, 18 Aug 2014 15:57:20 +0800 Subject: [PATCH 10/28] Update Cast.scala --- .../org/apache/spark/sql/catalyst/expressions/Cast.scala | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala index 1a521fc195e70..f626d09f037bc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala @@ -18,10 +18,9 @@ package org.apache.spark.sql.catalyst.expressions import java.sql.Timestamp -import java.text.{DateFormat, NumberFormat, SimpleDateFormat} +import java.text.{DateFormat, SimpleDateFormat} import org.apache.spark.sql.catalyst.types._ -import java.util.Locale /** Cast the child expression to the target data type. */ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression { @@ -272,9 +271,4 @@ object Cast { new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") } } - private[sql] val threadLocalNumberFormat = new ThreadLocal[NumberFormat] { - override def initialValue() = { - NumberFormat.getInstance() - } - } } From 8e7268cae0f4f9463c092b6c2833a86be56524e7 Mon Sep 17 00:00:00 2001 From: baishuo Date: Tue, 19 Aug 2014 09:01:49 -0700 Subject: [PATCH 11/28] update file after test --- .../org/apache/spark/SparkHadoopWriter.scala | 10 +- .../hive/execution/InsertIntoHiveTable.scala | 270 ++++++++---------- ...artiton-0-310dfcd4399a7d152dd76020fb41ecef | 0 ...artiton-1-2bba07855af8c11899cc6b89f8c0ee02 | 0 ...artiton-2-16367c381d4b189b3640c92511244bfe | 1 + ...artiton-3-b855e84c1d159eb6fa5fbb8ca371d318 | 0 ...artiton-4-ccc7d6efb0b13d5649ff98006e7ce182 | 0 ...artiton-5-516a04c3833a10c0241ec00dd6474dee | 0 ...artiton-6-b00f7cece45f474c6383b2a9346284ed | 0 ...artiton-7-310dfcd4399a7d152dd76020fb41ecef | 0 .../sql/hive/execution/HiveQuerySuite.scala | 2 +- 11 files changed, 137 insertions(+), 146 deletions(-) create mode 100644 sql/hive/src/test/resources/golden/dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef create mode 100644 sql/hive/src/test/resources/golden/dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 create mode 100644 sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe create mode 100644 sql/hive/src/test/resources/golden/dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 create mode 100644 sql/hive/src/test/resources/golden/dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 create mode 100644 sql/hive/src/test/resources/golden/dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee create mode 100644 sql/hive/src/test/resources/golden/dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed create mode 100644 sql/hive/src/test/resources/golden/dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index a62664835fb20..7d1be4cbf8fd9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -94,7 +94,7 @@ private[hive] class SparkHiveHadoopWriter( } def 
open(dynamicPartPath: String) { - val numfmt = NumberFormat.getInstance() + val numfmt = SparkHiveHadoopWriter.threadLocalNumberFormat.get() numfmt.setMinimumIntegerDigits(5) numfmt.setGroupingUsed(false) @@ -108,7 +108,7 @@ private[hive] class SparkHiveHadoopWriter( if (outputPath == null) { throw new IOException("Undefined job output-path") } - val workPath = new Path(outputPath, dynamicPartPath.substring(1))//remove "/" + val workPath = new Path(outputPath, dynamicPartPath.substring(1)) // remove "/" val path = new Path(workPath, outputName) getOutputCommitter().setupTask(getTaskContext()) writer = HiveFileFormatUtils.getHiveRecordWriter( @@ -219,4 +219,10 @@ private[hive] object SparkHiveHadoopWriter { } outputPath.makeQualified(fs) } + + val threadLocalNumberFormat = new ThreadLocal[NumberFormat] { + override def initialValue() = { + NumberFormat.getInstance() + } + } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 4df9fad385b42..9b2cd16651b9a 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils import org.apache.hadoop.hive.ql.Context import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.metadata.Hive -import org.apache.hadoop.hive.ql.parse.SemanticException import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector._ @@ -104,91 +103,132 @@ case class InsertIntoHiveTable( } def saveAsHiveFile( - rdd: RDD[Writable], + rdd: RDD[(Writable, String)], valueClass: Class[_], fileSinkConf: FileSinkDesc, - conf: JobConf, - isCompressed: Boolean) { + conf: SerializableWritable[JobConf], + isCompressed: Boolean, + dynamicPartNum: Int) { if (valueClass == null) { throw new SparkException("Output value class not set") } - conf.setOutputValueClass(valueClass) + conf.value.setOutputValueClass(valueClass) if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { throw new SparkException("Output format class not set") } // Doesn't work in Scala 2.9 due to what may be a generics bug // TODO: Should we uncomment this for Scala 2.10? // conf.setOutputFormat(outputFormatClass) - conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) + conf.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) if (isCompressed) { // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", // and "mapred.output.compression.type" have no impact on ORC because it uses table properties // to store compression information. 
- conf.set("mapred.output.compress", "true") + conf.value.set("mapred.output.compress", "true") fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec")) - fileSinkConf.setCompressType(conf.get("mapred.output.compression.type")) + fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec")) + fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type")) } - conf.setOutputCommitter(classOf[FileOutputCommitter]) - FileOutputFormat.setOutputPath( - conf, - SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf)) + conf.value.setOutputCommitter(classOf[FileOutputCommitter]) + FileOutputFormat.setOutputPath( + conf.value, + SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value)) log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) + var writer: SparkHiveHadoopWriter = null + //Map restore writesr for Dynamic Partition + var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null + if (dynamicPartNum == 0) { + writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf) + writer.preSetup() + } else { + writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] + } - val writer = new SparkHiveHadoopWriter(conf, fileSinkConf) - writer.preSetup() - - def writeToFile(context: TaskContext, iter: Iterator[Writable]) { - // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it - // around by taking a mod. We expect that no task will be attempted 2 billion times. - val attemptNumber = (context.attemptId % Int.MaxValue).toInt - + def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) { + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. 
+ val attemptNumber = (context.attemptId % Int.MaxValue).toInt + // writer for No Dynamic Partition + if (dynamicPartNum == 0) { writer.setup(context.stageId, context.partitionId, attemptNumber) writer.open() + } else { - var count = 0 - while(iter.hasNext) { - val record = iter.next() - count += 1 - writer.write(record) - } - - writer.close() - writer.commit() } - - sc.sparkContext.runJob(rdd, writeToFile _) - writer.commitJob() - } - - def getDynamicPartDir(tableInfo: TableDesc, row: Row, dynamicPartNum2: Int, jobConf: JobConf) :String = { - dynamicPartNum2 match { - case 0 =>"" - case i => { - val colsNum = tableInfo.getProperties.getProperty("columns").split("\\,").length - val partColStr = tableInfo.getProperties.getProperty("partition_columns") - val partCols = partColStr.split("/") - var buf = new StringBuffer() - if (partCols.length == dynamicPartNum2) { - for (j <- 0 until partCols.length) { - buf.append("/").append(partCols(j)).append("=").append(handleNull(row(colsNum + j ), jobConf)) - } - } else { - for (j <- 0 until dynamicPartNum2) { - buf.append("/").append(partCols(j + partCols.length - dynamicPartNum2)).append("=").append(handleNull(row(colsNum + j), jobConf)) + var count = 0 + // writer for Dynamic Partition + var writer2: SparkHiveHadoopWriter = null + while(iter.hasNext) { + val record = iter.next() + count += 1 + if (record._2 == null) { // without Dynamic Partition + writer.write(record._1) + } else { // for Dynamic Partition + val location = fileSinkConf.getDirName + val partLocation = location + record._2 // this is why the writer can write to different file + writer2 = writerMap.get(record._2) match { + case Some(writer)=> writer + case None => { + val tempWriter = new SparkHiveHadoopWriter(conf.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false)) + tempWriter.setup(context.stageId, context.partitionId, attemptNumber) + tempWriter.open(record._2) + writerMap += (record._2 -> tempWriter) + tempWriter + } + } + writer2.write(record._1) } } - buf.toString + if (dynamicPartNum == 0) { + writer.close() + writer.commit() + } else { + for ((k,v) <- writerMap) { + v.close() + v.commit() + } + } } + + sc.sparkContext.runJob(rdd, writeToFile _) + if (dynamicPartNum == 0) { + writer.commitJob() + } else { + for ((k,v) <- writerMap) { + v.commitJob() + } + writerMap.clear() } - } - def handleNull(obj :Any, jobConf: JobConf) :String = { - if (obj == null ||obj.toString.length == 0) { - jobConf.get("hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__") + + + } + /* + * e.g. + * for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ... + * return: /part1=val1/part2=val2 + * for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ... + * return: /part2=val2 + * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ... 
+ * return: /part2=val2/part3=val3 + * */ + private def getDynamicPartDir(partCols: Array[String], row: Row, dynamicPartNum: Int, defaultPartName: String): String = { + assert(dynamicPartNum > 0) + partCols + .takeRight(dynamicPartNum) + .zip(row.takeRight(dynamicPartNum)) + .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" } + .mkString + } + /* + * if rowVal is null or "",will return HiveConf.get(hive.exec.default.partition.name) with default + * */ + private def handleNull(rowVal: Any, defaultPartName: String): String = { + if (rowVal == null ||String.valueOf(rowVal).length == 0) { + defaultPartName } else { - obj.toString + String.valueOf(rowVal) } } @@ -211,32 +251,32 @@ case class InsertIntoHiveTable( val tableLocation = table.hiveQlTable.getDataLocation val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - var dynamicPartNum = 0 + var tmpDynamicPartNum = 0 var numStaPart = 0 - var dynamicPartPath = ""; val partitionSpec = partition.map { - case (key, Some(value)) => { numStaPart += 1; key -> value } - case (key, None) => { dynamicPartNum += 1; key -> "" } + case (key, Some(value)) => + numStaPart += 1 + key -> value + case (key, None) => + tmpDynamicPartNum += 1 + key -> "" } - // ORC stores compression information in table properties. While, there are other formats - // (e.g. RCFile) that rely on hadoop configurations to store compression information. + val dynamicPartNum = tmpDynamicPartNum val jobConf = new JobConf(sc.hiveconf) val jobConfSer = new SerializableWritable(jobConf) // check if the partition spec is valid if (dynamicPartNum > 0) { if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) { - throw new SemanticException( - ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()) + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()) } if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) { - throw new SemanticException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg()); + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg()) } // check if static partition appear after dynamic partitions for ((k,v) <- partitionSpec) { if (partitionSpec(k) == "") { if (numStaPart > 0) { // found a DP, but there exists ST as subpartition - throw new SemanticException( - ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg()); + throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg()) } } else { numStaPart -= 1 @@ -252,96 +292,40 @@ case class InsertIntoHiveTable( ObjectInspectorCopyOption.JAVA) .asInstanceOf[StructObjectInspector] - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray val outputData = new Array[Any](fieldOIs.length) + val defaultPartName = jobConfSer.value.get("hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__") + var partColStr: Array[String] = null; + if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) { + partColStr = fileSinkConf + .getTableInfo + .getProperties + .getProperty("partition_columns") + .split("/") + } + iter.map { row => + var dynamicPartPath: String = null + if (dynamicPartNum > 0) { + dynamicPartPath = getDynamicPartDir(partColStr, row, dynamicPartNum, defaultPartName) + } var i = 0 while (i < fieldOIs.length) { - if (fieldOIs.length < row.length && row.length - fieldOIs.length == dynamicPartNum) { - dynamicPartPath = getDynamicPartDir(fileSinkConf.getTableInfo, row, 
dynamicPartNum, jobConfSer.value) - } // Casts Strings to HiveVarchars when necessary. outputData(i) = wrap(row(i), fieldOIs(i)) i += 1 } - serializer.serialize(outputData, standardOI) + serializer.serialize(outputData, standardOI) -> dynamicPartPath } } - - if (dynamicPartNum > 0) { - if (outputClass == null) { - throw new SparkException("Output value class not set") - } - jobConfSer.value.setOutputValueClass(outputClass) - if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { - throw new SparkException("Output format class not set") - } - // Doesn't work in Scala 2.9 due to what may be a generics bug - // TODO: Should we uncomment this for Scala 2.10? - // conf.setOutputFormat(outputFormatClass) - jobConfSer.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) - if (sc.hiveconf.getBoolean("hive.exec.compress.output", false)) { - // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", - // and "mapred.output.compression.type" have no impact on ORC because it uses table properties - // to store compression information. - jobConfSer.value.set("mapred.output.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(jobConfSer.value.get("mapred.output.compression.codec")) - fileSinkConf.setCompressType(jobConfSer.value.get("mapred.output.compression.type")) - } - jobConfSer.value.setOutputCommitter(classOf[FileOutputCommitter]) - - FileOutputFormat.setOutputPath( - jobConfSer.value, - SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, jobConfSer.value)) - - var writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] - def writeToFile2(context: TaskContext, iter: Iterator[Writable]) { - // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it - // around by taking a mod. We expect that no task will be attempted 2 billion times. - val attemptNumber = (context.attemptId % Int.MaxValue).toInt - val serializer = newSerializer(fileSinkConf.getTableInfo) - var count = 0 - var writer2:SparkHiveHadoopWriter = null - while(iter.hasNext) { - val record = iter.next(); - val location = fileSinkConf.getDirName - val partLocation = location + dynamicPartPath - writer2=writerMap.get(dynamicPartPath) match { - case Some(writer)=> writer - case None => { - val tempWriter = new SparkHiveHadoopWriter(jobConfSer.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false)) - tempWriter.setup(context.stageId, context.partitionId, attemptNumber) - tempWriter.open(dynamicPartPath); - writerMap += (dynamicPartPath -> tempWriter) - tempWriter - } - } - count += 1 - writer2.write(record) - } - for((k,v) <- writerMap) { - v.close() - v.commit() - } - } - - sc.sparkContext.runJob(rdd, writeToFile2 _) - - for((k,v) <- writerMap) { - v.commitJob() - } - writerMap.clear() - } else { saveAsHiveFile( rdd, outputClass, fileSinkConf, - jobConf, - sc.hiveconf.getBoolean("hive.exec.compress.output", false)) - } + jobConfSer, + sc.hiveconf.getBoolean("hive.exec.compress.output", false), + dynamicPartNum) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. @@ -358,13 +342,13 @@ case class InsertIntoHiveTable( val inheritTableSpecs = true // TODO: Correctly set isSkewedStoreAsSubdir. 
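// A self-contained sketch of how getDynamicPartDir above turns the trailing dynamic
// partition columns of a row into a relative directory. Seq stands in for the Catalyst
// Row, and the null-to-default-partition handling is inlined for brevity.
object DynamicPartDirSketch {
  val defaultPartName = "__HIVE_DEFAULT_PARTITION__"

  def dynamicPartDir(partCols: Seq[String], row: Seq[Any], dynamicPartNum: Int): String =
    partCols
      .takeRight(dynamicPartNum)
      .zip(row.takeRight(dynamicPartNum))
      .map { case (c, v) =>
        val str =
          if (v == null || String.valueOf(v).isEmpty) defaultPartName else String.valueOf(v)
        s"/$c=$str"
      }
      .mkString

  def main(args: Array[String]): Unit = {
    // e.g. INSERT ... PARTITION (partcol1, partcol2) SELECT 1, 1, NULL FROM src
    println(dynamicPartDir(Seq("partcol1", "partcol2"), Seq(1, 1, null), 2))
    // prints: /partcol1=1/partcol2=__HIVE_DEFAULT_PARTITION__
  }
}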
val isSkewedStoreAsSubdir = false - if (dynamicPartNum > 0) { + if (dynamicPartNum>0) { db.loadDynamicPartitions( outputPath, qualifiedTableName, partitionSpec, overwrite, - dynamicPartNum/*dpCtx.getNumDPCols()*/, + dynamicPartNum, holdDDLTime, isSkewedStoreAsSubdir ) diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef b/sql/hive/src/test/resources/golden/dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 b/sql/hive/src/test/resources/golden/dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe b/sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe new file mode 100644 index 0000000000000..573541ac9702d --- /dev/null +++ b/sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe @@ -0,0 +1 @@ +0 diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 b/sql/hive/src/test/resources/golden/dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 b/sql/hive/src/test/resources/golden/dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee b/sql/hive/src/test/resources/golden/dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed b/sql/hive/src/test/resources/golden/dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef b/sql/hive/src/test/resources/golden/dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 76df6f65a1bb7..59639e5d65bf2 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -271,7 +271,7 @@ class HiveQuerySuite extends HiveComparisonTest { |insert overwrite table src_lv2 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX """.stripMargin) - createQueryTest("dynamice partiton", + createQueryTest("dynamic_partiton", """ |drop table IF EXISTS dynamic_part_table; |create table dynamic_part_table(intcol int) partitioned by (partcol1 int, partcol2 int); From cd822f0471ddf51635e0c2a7054725ab155f5939 Mon Sep 17 00:00:00 2001 From: baishuo Date: Tue, 19 Aug 2014 10:14:53 -0700 Subject: [PATCH 12/28] do a little modify --- .../src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 2 +- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git 
a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 7d1be4cbf8fd9..666d0749622c7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -117,7 +117,7 @@ private[hive] class SparkHiveHadoopWriter( conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], fileSinkConf, path, - null) + Reporter.NULL) } def write(value: Writable) { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 9b2cd16651b9a..c1e0b30c35600 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -342,7 +342,7 @@ case class InsertIntoHiveTable( val inheritTableSpecs = true // TODO: Correctly set isSkewedStoreAsSubdir. val isSkewedStoreAsSubdir = false - if (dynamicPartNum>0) { + if (dynamicPartNum > 0) { db.loadDynamicPartitions( outputPath, qualifiedTableName, From b660e74574585a0572b5b21aba1ee42a87d89437 Mon Sep 17 00:00:00 2001 From: baishuo Date: Tue, 2 Sep 2014 20:28:29 -0700 Subject: [PATCH 13/28] delete a empty else branch --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index c1e0b30c35600..ada49e514a704 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -153,9 +153,8 @@ case class InsertIntoHiveTable( if (dynamicPartNum == 0) { writer.setup(context.stageId, context.partitionId, attemptNumber) writer.open() - } else { - } + var count = 0 // writer for Dynamic Partition var writer2: SparkHiveHadoopWriter = null @@ -221,6 +220,7 @@ case class InsertIntoHiveTable( .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" } .mkString } + /* * if rowVal is null or "",will return HiveConf.get(hive.exec.default.partition.name) with default * */ From caea6fb7db2a34e6e0949aabb843b7f728cab541 Mon Sep 17 00:00:00 2001 From: baishuo Date: Tue, 2 Sep 2014 23:03:34 -0700 Subject: [PATCH 14/28] modify code to pass scala style checks --- .../hive/execution/InsertIntoHiveTable.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index ada49e514a704..bf107bbd72285 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -119,7 +119,8 @@ case class InsertIntoHiveTable( // Doesn't work in Scala 2.9 due to what may be a generics bug // TODO: Should we uncomment this for Scala 2.10? 
// conf.setOutputFormat(outputFormatClass) - conf.value.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) + conf.value.set("mapred.output.format.class", + fileSinkConf.getTableInfo.getOutputFileFormatClassName) if (isCompressed) { // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", // and "mapred.output.compression.type" have no impact on ORC because it uses table properties @@ -136,7 +137,7 @@ case class InsertIntoHiveTable( SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value)) log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) var writer: SparkHiveHadoopWriter = null - //Map restore writesr for Dynamic Partition + // Map restore writesr for Dynamic Partition var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null if (dynamicPartNum == 0) { writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf) @@ -169,7 +170,8 @@ case class InsertIntoHiveTable( writer2 = writerMap.get(record._2) match { case Some(writer)=> writer case None => { - val tempWriter = new SparkHiveHadoopWriter(conf.value, new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false)) + val tempWriter = new SparkHiveHadoopWriter(conf.value, + new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false)) tempWriter.setup(context.stageId, context.partitionId, attemptNumber) tempWriter.open(record._2) writerMap += (record._2 -> tempWriter) @@ -211,8 +213,11 @@ case class InsertIntoHiveTable( * return: /part2=val2 * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ... * return: /part2=val2/part3=val3 - * */ - private def getDynamicPartDir(partCols: Array[String], row: Row, dynamicPartNum: Int, defaultPartName: String): String = { + */ + private def getDynamicPartDir(partCols: Array[String], + row: Row, + dynamicPartNum: Int, + defaultPartName: String): String = { assert(dynamicPartNum > 0) partCols .takeRight(dynamicPartNum) @@ -269,7 +274,8 @@ case class InsertIntoHiveTable( if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) { throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()) } - if (numStaPart == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) { + if (numStaPart == 0 && + sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) { throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg()) } // check if static partition appear after dynamic partitions @@ -294,7 +300,8 @@ case class InsertIntoHiveTable( val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray val outputData = new Array[Any](fieldOIs.length) - val defaultPartName = jobConfSer.value.get("hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__") + val defaultPartName = jobConfSer.value.get( + "hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__") var partColStr: Array[String] = null; if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) { partColStr = fileSinkConf From 207c6ace8774f6a89411e0cd929eaedb9f957a8d Mon Sep 17 00:00:00 2001 From: baishuo Date: Wed, 3 Sep 2014 01:22:19 -0700 Subject: [PATCH 15/28] modify for some bad indentation --- .../hive/execution/InsertIntoHiveTable.scala | 35 +++++++++---------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index bf107bbd72285..1d763b992c163 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -157,7 +157,7 @@ case class InsertIntoHiveTable( } var count = 0 - // writer for Dynamic Partition + // writer for Dynamic Partition var writer2: SparkHiveHadoopWriter = null while(iter.hasNext) { val record = iter.next() @@ -201,9 +201,6 @@ case class InsertIntoHiveTable( } writerMap.clear() } - - - } /* * e.g. @@ -215,9 +212,9 @@ case class InsertIntoHiveTable( * return: /part2=val2/part3=val3 */ private def getDynamicPartDir(partCols: Array[String], - row: Row, - dynamicPartNum: Int, - defaultPartName: String): String = { + row: Row, + dynamicPartNum: Int, + defaultPartName: String): String = { assert(dynamicPartNum > 0) partCols .takeRight(dynamicPartNum) @@ -230,11 +227,11 @@ case class InsertIntoHiveTable( * if rowVal is null or "",will return HiveConf.get(hive.exec.default.partition.name) with default * */ private def handleNull(rowVal: Any, defaultPartName: String): String = { - if (rowVal == null ||String.valueOf(rowVal).length == 0) { - defaultPartName - } else { - String.valueOf(rowVal) - } + if (rowVal == null ||String.valueOf(rowVal).length == 0) { + defaultPartName + } else { + String.valueOf(rowVal) + } } override def execute() = result @@ -326,13 +323,13 @@ case class InsertIntoHiveTable( serializer.serialize(outputData, standardOI) -> dynamicPartPath } } - saveAsHiveFile( - rdd, - outputClass, - fileSinkConf, - jobConfSer, - sc.hiveconf.getBoolean("hive.exec.compress.output", false), - dynamicPartNum) + saveAsHiveFile( + rdd, + outputClass, + fileSinkConf, + jobConfSer, + sc.hiveconf.getBoolean("hive.exec.compress.output", false), + dynamicPartNum) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. From 761ecf20365f2864e28b7cad2c0550b159724eb3 Mon Sep 17 00:00:00 2001 From: baishuo Date: Tue, 9 Sep 2014 02:57:48 -0700 Subject: [PATCH 16/28] modify according micheal's advice --- .../org/apache/spark/SparkHadoopWriter.scala | 19 +- .../hive/execution/InsertIntoHiveTable.scala | 170 +++++++++--------- 2 files changed, 96 insertions(+), 93 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 666d0749622c7..bbb66ae6005bd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -93,8 +93,17 @@ private[hive] class SparkHiveHadoopWriter( null) } + /** + * create an HiveRecordWriter. 
imitate the above function open() + * @param dynamicPartPath the relative path for dynamic partition + * + * since this function is used to create different writer for + * different dynamic partition.So we need a parameter dynamicPartPath + * and use it we can calculate a new path and pass the new path to + * the function HiveFileFormatUtils.getHiveRecordWriter + */ def open(dynamicPartPath: String) { - val numfmt = SparkHiveHadoopWriter.threadLocalNumberFormat.get() + val numfmt = NumberFormat.getInstance() numfmt.setMinimumIntegerDigits(5) numfmt.setGroupingUsed(false) @@ -108,7 +117,7 @@ private[hive] class SparkHiveHadoopWriter( if (outputPath == null) { throw new IOException("Undefined job output-path") } - val workPath = new Path(outputPath, dynamicPartPath.substring(1)) // remove "/" + val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) // remove "/" val path = new Path(workPath, outputName) getOutputCommitter().setupTask(getTaskContext()) writer = HiveFileFormatUtils.getHiveRecordWriter( @@ -219,10 +228,4 @@ private[hive] object SparkHiveHadoopWriter { } outputPath.makeQualified(fs) } - - val threadLocalNumberFormat = new ThreadLocal[NumberFormat] { - override def initialValue() = { - NumberFormat.getInstance() - } - } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 1d763b992c163..5153a4c2bc38c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -102,6 +102,11 @@ case class InsertIntoHiveTable( obj } + /** + * since we should get directory of dynamic partition from upstream RDD + * reference the code "serializer.serialize(outputData, standardOI) -> dynamicPartPath" + * So The type of the elment in RDD is (Writable, String) + */ def saveAsHiveFile( rdd: RDD[(Writable, String)], valueClass: Class[_], @@ -142,80 +147,77 @@ case class InsertIntoHiveTable( if (dynamicPartNum == 0) { writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf) writer.preSetup() + sc.sparkContext.runJob(rdd, writeToFile _) + writer.commitJob() } else { writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] + sc.sparkContext.runJob(rdd, writeToFile _) + for ((k,v) <- writerMap) { + v.commitJob() + } + writerMap.clear() } def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) { - // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it - // around by taking a mod. We expect that no task will be attempted 2 billion times. - val attemptNumber = (context.attemptId % Int.MaxValue).toInt - // writer for No Dynamic Partition - if (dynamicPartNum == 0) { - writer.setup(context.stageId, context.partitionId, attemptNumber) - writer.open() - } + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. 
+ val attemptNumber = (context.attemptId % Int.MaxValue).toInt - var count = 0 - // writer for Dynamic Partition - var writer2: SparkHiveHadoopWriter = null - while(iter.hasNext) { - val record = iter.next() - count += 1 - if (record._2 == null) { // without Dynamic Partition - writer.write(record._1) - } else { // for Dynamic Partition - val location = fileSinkConf.getDirName - val partLocation = location + record._2 // this is why the writer can write to different file - writer2 = writerMap.get(record._2) match { - case Some(writer)=> writer - case None => { - val tempWriter = new SparkHiveHadoopWriter(conf.value, - new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false)) - tempWriter.setup(context.stageId, context.partitionId, attemptNumber) - tempWriter.open(record._2) - writerMap += (record._2 -> tempWriter) - tempWriter - } - } - writer2.write(record._1) - } + if (dynamicPartNum == 0) { // for All static partition + writer.setup(context.stageId, context.partitionId, attemptNumber) + writer.open() + // writer for Dynamic Partition + while(iter.hasNext) { + val record = iter.next() + writer.write(record._1) } - if (dynamicPartNum == 0) { writer.close() writer.commit() - } else { + } else { // if there is dynamic Partition + while(iter.hasNext) { + val record = iter.next() + val location = fileSinkConf.getDirName + val partLocation = location + record._2 // different writer related with different file + def createNewWriter(): SparkHiveHadoopWriter = { + val tempWriter = new SparkHiveHadoopWriter(conf.value, + new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false)) + tempWriter.setup(context.stageId, context.partitionId, attemptNumber) + tempWriter.open(record._2) + writerMap += (record._2 -> tempWriter) + tempWriter + } + val writer2 = writerMap.getOrElseUpdate(record._2, createNewWriter) + writer2.write(record._1) + } for ((k,v) <- writerMap) { v.close() v.commit() } } - } - - sc.sparkContext.runJob(rdd, writeToFile _) - if (dynamicPartNum == 0) { - writer.commitJob() - } else { - for ((k,v) <- writerMap) { - v.commitJob() - } - writerMap.clear() } } - /* - * e.g. - * for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ... - * return: /part1=val1/part2=val2 - * for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ... - * return: /part2=val2 - * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ... - * return: /part2=val2/part3=val3 - */ + + /** + * Returns the Dynamic partition directory for the given row. + * @param partCols an array containing the string names of the partition columns + * + * we get the last dynamicPartNum elements from partCols and + * last dynamicPartNum elements from the current row, + * then we can construct a String for dynamic partition directory + * For example: + * for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ... + * return: /part1=val1/part2=val2 + * for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ... + * return: /part2=val2 + * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ... 
+ * return: /part2=val2/part3=val3 + */ private def getDynamicPartDir(partCols: Array[String], row: Row, dynamicPartNum: Int, defaultPartName: String): String = { assert(dynamicPartNum > 0) + // TODO needs optimization partCols .takeRight(dynamicPartNum) .zip(row.takeRight(dynamicPartNum)) @@ -223,15 +225,16 @@ case class InsertIntoHiveTable( .mkString } - /* - * if rowVal is null or "",will return HiveConf.get(hive.exec.default.partition.name) with default - * */ - private def handleNull(rowVal: Any, defaultPartName: String): String = { - if (rowVal == null ||String.valueOf(rowVal).length == 0) { - defaultPartName - } else { - String.valueOf(rowVal) - } + /** + * Returns `rowVal` as a String. + * If `rowVal` is null or equal to "", returns the default partition name. + */ + private def handleNull(rowVal: Any, defaultPartName: String): String = { + if (rowVal == null ||String.valueOf(rowVal).length == 0) { + defaultPartName + } else { + String.valueOf(rowVal) + } } override def execute() = result @@ -253,36 +256,36 @@ case class InsertIntoHiveTable( val tableLocation = table.hiveQlTable.getDataLocation val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - var tmpDynamicPartNum = 0 - var numStaPart = 0 + + val numDynamicPartitions = partition.values.filter(_.isEmpty).size + val numStaticPartitions = partition.values.filter(_.isDefined).size val partitionSpec = partition.map { case (key, Some(value)) => - numStaPart += 1 key -> value case (key, None) => - tmpDynamicPartNum += 1 key -> "" } - val dynamicPartNum = tmpDynamicPartNum + val jobConf = new JobConf(sc.hiveconf) val jobConfSer = new SerializableWritable(jobConf) // check if the partition spec is valid - if (dynamicPartNum > 0) { + if (numDynamicPartitions > 0) { if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) { throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()) } - if (numStaPart == 0 && + if (numStaticPartitions == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) { throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg()) } // check if static partition appear after dynamic partitions + var tmpNumStaticPartitions = numStaticPartitions for ((k,v) <- partitionSpec) { if (partitionSpec(k) == "") { - if (numStaPart > 0) { // found a DP, but there exists ST as subpartition + if (tmpNumStaticPartitions > 0) { // found a DP, but there exists ST as subpartition throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg()) } } else { - numStaPart -= 1 + tmpNumStaticPartitions -= 1 } } } @@ -299,19 +302,16 @@ case class InsertIntoHiveTable( val outputData = new Array[Any](fieldOIs.length) val defaultPartName = jobConfSer.value.get( "hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__") - var partColStr: Array[String] = null; - if (fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") != null) { - partColStr = fileSinkConf - .getTableInfo - .getProperties - .getProperty("partition_columns") - .split("/") - } + + val partitionColumns = fileSinkConf.getTableInfo. 
+ getProperties.getProperty("partition_columns") // a String like "colname1/colname2" + val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull iter.map { row => var dynamicPartPath: String = null - if (dynamicPartNum > 0) { - dynamicPartPath = getDynamicPartDir(partColStr, row, dynamicPartNum, defaultPartName) + if (numDynamicPartitions > 0) { + dynamicPartPath = getDynamicPartDir(partitionColumnNames, row, + numDynamicPartitions, defaultPartName) } var i = 0 while (i < fieldOIs.length) { @@ -319,7 +319,7 @@ case class InsertIntoHiveTable( outputData(i) = wrap(row(i), fieldOIs(i)) i += 1 } - + // pass the dynamicPartPath to downStream RDD serializer.serialize(outputData, standardOI) -> dynamicPartPath } } @@ -329,7 +329,7 @@ case class InsertIntoHiveTable( fileSinkConf, jobConfSer, sc.hiveconf.getBoolean("hive.exec.compress.output", false), - dynamicPartNum) + numDynamicPartitions) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. @@ -346,13 +346,13 @@ case class InsertIntoHiveTable( val inheritTableSpecs = true // TODO: Correctly set isSkewedStoreAsSubdir. val isSkewedStoreAsSubdir = false - if (dynamicPartNum > 0) { + if (numDynamicPartitions > 0) { db.loadDynamicPartitions( outputPath, qualifiedTableName, partitionSpec, overwrite, - dynamicPartNum, + numDynamicPartitions, holdDDLTime, isSkewedStoreAsSubdir ) From 997c990855d94dd2df2ab412566c438def97de61 Mon Sep 17 00:00:00 2001 From: baishuo Date: Tue, 9 Sep 2014 19:51:20 -0700 Subject: [PATCH 17/28] use HiveConf.DEFAULTPARTITIONNAME to replace hive.exec.default.partition.name --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 5153a4c2bc38c..c750187597a99 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -301,8 +301,8 @@ case class InsertIntoHiveTable( val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray val outputData = new Array[Any](fieldOIs.length) val defaultPartName = jobConfSer.value.get( - "hive.exec.default.partition.name ", "__HIVE_DEFAULT_PARTITION__") - + HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultVal) + val partitionColumns = fileSinkConf.getTableInfo. 
getProperties.getProperty("partition_columns") // a String like "colname1/colname2" val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull From b8216119605132bb70a8862e921b0861d5ec7f9f Mon Sep 17 00:00:00 2001 From: baishuo Date: Tue, 9 Sep 2014 20:09:20 -0700 Subject: [PATCH 18/28] pass check style --- .../apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index c750187597a99..12b4c3eec6ab0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -301,7 +301,8 @@ case class InsertIntoHiveTable( val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray val outputData = new Array[Any](fieldOIs.length) val defaultPartName = jobConfSer.value.get( - HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultVal) + HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, + HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultVal) val partitionColumns = fileSinkConf.getTableInfo. getProperties.getProperty("partition_columns") // a String like "colname1/colname2" From d53daa5a263cf0b0fdf47111f1d8ba0c55d08b24 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 16 Sep 2014 00:06:25 -0700 Subject: [PATCH 19/28] Refactors dynamic partitioning support Conflicts: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala --- .../{ => sql/hive}/SparkHadoopWriter.scala | 197 ++++++-------- .../hive/execution/InsertIntoHiveTable.scala | 254 ++++++------------ 2 files changed, 172 insertions(+), 279 deletions(-) rename sql/hive/src/main/scala/org/apache/spark/{ => sql/hive}/SparkHadoopWriter.scala (50%) diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala similarity index 50% rename from sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala index bbb66ae6005bd..6e07b51f49230 100644 --- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala @@ -21,20 +21,23 @@ import java.io.IOException import java.text.NumberFormat import java.util.Date +import scala.collection.mutable + import org.apache.hadoop.fs.Path import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} import org.apache.hadoop.hive.ql.plan.FileSinkDesc -import org.apache.hadoop.mapred._ import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred._ +import org.apache.spark.sql.Row import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter} /** * Internal helper class that saves an RDD using a Hive OutputFormat. * It is based on [[SparkHadoopWriter]]. 
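 *
 * (Illustrative usage note, not part of the original patch: the driver side calls
 * `driverSideSetup()`, runs the write job, then `commitJob()`; each task calls
 * `executorSideSetup(...)`, obtains a record writer per row via `getLocalFileWriter(row)`,
 * writes the serialized row, and finally closes the writer(s) and commits the task.)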
*/ -private[hive] class SparkHiveHadoopWriter( +private[hive] class SparkHiveWriterContainer( @transient jobConf: JobConf, fileSinkConf: FileSinkDesc) extends Logging @@ -42,7 +45,7 @@ private[hive] class SparkHiveHadoopWriter( with Serializable { private val now = new Date() - private val conf = new SerializableWritable(jobConf) + protected val conf = new SerializableWritable(jobConf) private var jobID = 0 private var splitID = 0 @@ -51,152 +54,75 @@ private[hive] class SparkHiveHadoopWriter( private var taID: SerializableWritable[TaskAttemptID] = null @transient private var writer: FileSinkOperator.RecordWriter = null - @transient private var format: HiveOutputFormat[AnyRef, Writable] = null - @transient private var committer: OutputCommitter = null - @transient private var jobContext: JobContext = null - @transient private var taskContext: TaskAttemptContext = null + @transient private lazy val committer = conf.value.getOutputCommitter + @transient private lazy val jobContext = newJobContext(conf.value, jID.value) + @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value) + @transient private lazy val outputFormat = + conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]] - def preSetup() { + def driverSideSetup() { setIDs(0, 0, 0) setConfParams() - - val jCtxt = getJobContext() - getOutputCommitter().setupJob(jCtxt) + committer.setupJob(jobContext) } - - def setup(jobid: Int, splitid: Int, attemptid: Int) { - setIDs(jobid, splitid, attemptid) + def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) { + setIDs(jobId, splitId, attemptId) setConfParams() - } - - def open() { - val numfmt = NumberFormat.getInstance() - numfmt.setMinimumIntegerDigits(5) - numfmt.setGroupingUsed(false) - - val extension = Utilities.getFileExtension( - conf.value, - fileSinkConf.getCompressed, - getOutputFormat()) - - val outputName = "part-" + numfmt.format(splitID) + extension - val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName) - - getOutputCommitter().setupTask(getTaskContext()) - writer = HiveFileFormatUtils.getHiveRecordWriter( - conf.value, - fileSinkConf.getTableInfo, - conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], - fileSinkConf, - path, - null) + committer.setupTask(taskContext) } /** - * create an HiveRecordWriter. imitate the above function open() - * @param dynamicPartPath the relative path for dynamic partition - * - * since this function is used to create different writer for - * different dynamic partition.So we need a parameter dynamicPartPath - * and use it we can calculate a new path and pass the new path to - * the function HiveFileFormatUtils.getHiveRecordWriter + * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer + * for writing data to a dynamic partition. 
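 *
 * (Illustrative example, not part of the original patch: given a relative dynamic partition
 * path such as "/part1=2014/part2=09", the dynamic-partition subclass resolves the target
 * file to roughly <job-output-path>/part1=2014/part2=09/<output-name>, where the output
 * name looks like "part-00000" plus the configured file extension.)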
*/ - def open(dynamicPartPath: String) { - val numfmt = NumberFormat.getInstance() - numfmt.setMinimumIntegerDigits(5) - numfmt.setGroupingUsed(false) - - val extension = Utilities.getFileExtension( - conf.value, - fileSinkConf.getCompressed, - getOutputFormat()) - - val outputName = "part-" + numfmt.format(splitID) + extension - val outputPath: Path = FileOutputFormat.getOutputPath(conf.value) - if (outputPath == null) { - throw new IOException("Undefined job output-path") - } - val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) // remove "/" - val path = new Path(workPath, outputName) - getOutputCommitter().setupTask(getTaskContext()) + def open() { writer = HiveFileFormatUtils.getHiveRecordWriter( conf.value, fileSinkConf.getTableInfo, conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], fileSinkConf, - path, + FileOutputFormat.getTaskOutputPath(conf.value, getOutputName), Reporter.NULL) } - def write(value: Writable) { - if (writer != null) { - writer.write(value) - } else { - throw new IOException("Writer is null, open() has not been called") - } + protected def getOutputName: String = { + val numberFormat = NumberFormat.getInstance() + numberFormat.setMinimumIntegerDigits(5) + numberFormat.setGroupingUsed(false) + val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat) + "part-" + numberFormat.format(splitID) + extension } + def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer + def close() { // Seems the boolean value passed into close does not matter. writer.close(false) } def commit() { - val taCtxt = getTaskContext() - val cmtr = getOutputCommitter() - if (cmtr.needsTaskCommit(taCtxt)) { + if (committer.needsTaskCommit(taskContext)) { try { - cmtr.commitTask(taCtxt) + committer.commitTask(taskContext) logInfo (taID + ": Committed") } catch { case e: IOException => logError("Error committing the output of task: " + taID.value, e) - cmtr.abortTask(taCtxt) + committer.abortTask(taskContext) throw e } } else { - logWarning ("No need to commit output of task: " + taID.value) + logInfo("No need to commit output of task: " + taID.value) } } def commitJob() { - // always ? Or if cmtr.needsTaskCommit ? 
- val cmtr = getOutputCommitter() - cmtr.commitJob(getJobContext()) + committer.commitJob(jobContext) } // ********* Private Functions ********* - private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = { - if (format == null) { - format = conf.value.getOutputFormat() - .asInstanceOf[HiveOutputFormat[AnyRef,Writable]] - } - format - } - - private def getOutputCommitter(): OutputCommitter = { - if (committer == null) { - committer = conf.value.getOutputCommitter - } - committer - } - - private def getJobContext(): JobContext = { - if (jobContext == null) { - jobContext = newJobContext(conf.value, jID.value) - } - jobContext - } - - private def getTaskContext(): TaskAttemptContext = { - if (taskContext == null) { - taskContext = newTaskAttemptContext(conf.value, taID.value) - } - taskContext - } - private def setIDs(jobId: Int, splitId: Int, attemptId: Int) { jobID = jobId splitID = splitId @@ -216,7 +142,7 @@ private[hive] class SparkHiveHadoopWriter( } } -private[hive] object SparkHiveHadoopWriter { +private[hive] object SparkHiveWriterContainer { def createPathFromString(path: String, conf: JobConf): Path = { if (path == null) { throw new IllegalArgumentException("Output path is null") @@ -226,6 +152,59 @@ private[hive] object SparkHiveHadoopWriter { if (outputPath == null || fs == null) { throw new IllegalArgumentException("Incorrectly formatted output path") } - outputPath.makeQualified(fs) + outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory) + } +} + +private[spark] class SparkHiveDynamicPartitionWriterContainer( + @transient jobConf: JobConf, + fileSinkConf: FileSinkDesc, + dynamicPartColNames: Array[String], + defaultPartName: String) + extends SparkHiveWriterContainer(jobConf, fileSinkConf) { + + @transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ + + override def open(): Unit = { + writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter] + } + + override def close(): Unit = { + writers.values.foreach(_.close(false)) + } + + override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = { + val dynamicPartPath = dynamicPartColNames + .zip(row.takeRight(dynamicPartColNames.length)) + .map { case (col, rawVal) => + val string = String.valueOf(rawVal) + s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}" + } + .mkString + + val path = { + val outputPath = FileOutputFormat.getOutputPath(conf.value) + assert(outputPath != null, "Undefined job output-path") + val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) + new Path(workPath, getOutputName) + } + + def newWriter = { + val newFileSinkDesc = new FileSinkDesc( + fileSinkConf.getDirName + dynamicPartPath, + fileSinkConf.getTableInfo, + fileSinkConf.getCompressed) + newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec) + newFileSinkDesc.setCompressType(fileSinkConf.getCompressType) + HiveFileFormatUtils.getHiveRecordWriter( + conf.value, + fileSinkConf.getTableInfo, + conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], + newFileSinkDesc, + path, + Reporter.NULL) + } + + writers.getOrElseUpdate(dynamicPartPath, newWriter) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 12b4c3eec6ab0..c88ae70063b4b 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -18,30 +18,29 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConversions._ - -import java.util.{HashMap => JHashMap} +import scala.collection.mutable import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.metastore.MetaStoreUtils -import org.apache.hadoop.hive.ql.Context -import org.apache.hadoop.hive.ql.ErrorMsg import org.apache.hadoop.hive.ql.metadata.Hive import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import org.apache.hadoop.hive.ql.{Context, ErrorMsg} import org.apache.hadoop.hive.serde2.Serializer -import org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector -import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} -import org.apache.spark.{SerializableWritable, SparkException, TaskContext} +import org.apache.spark.SparkContext._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} -import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter} -import org.apache.hadoop.hive.conf.HiveConf +import org.apache.spark.sql.hive._ +import org.apache.spark.{SerializableWritable, SparkException, TaskContext} /** * :: DeveloperApi :: @@ -102,30 +101,23 @@ case class InsertIntoHiveTable( obj } - /** - * since we should get directory of dynamic partition from upstream RDD - * reference the code "serializer.serialize(outputData, standardOI) -> dynamicPartPath" - * So The type of the elment in RDD is (Writable, String) - */ def saveAsHiveFile( - rdd: RDD[(Writable, String)], + rdd: RDD[Row], valueClass: Class[_], fileSinkConf: FileSinkDesc, conf: SerializableWritable[JobConf], isCompressed: Boolean, - dynamicPartNum: Int) { - if (valueClass == null) { - throw new SparkException("Output value class not set") - } + writerContainer: SparkHiveWriterContainer) { + assert(valueClass != null, "Output value class not set") conf.value.setOutputValueClass(valueClass) - if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { - throw new SparkException("Output format class not set") - } + + assert(fileSinkConf.getTableInfo.getOutputFileFormatClassName != null) // Doesn't work in Scala 2.9 due to what may be a generics bug // TODO: Should we uncomment this for Scala 2.10? 
// conf.setOutputFormat(outputFormatClass) - conf.value.set("mapred.output.format.class", - fileSinkConf.getTableInfo.getOutputFileFormatClassName) + conf.value.set( + "mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) + if (isCompressed) { // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", // and "mapred.output.compression.type" have no impact on ORC because it uses table properties @@ -139,101 +131,44 @@ case class InsertIntoHiveTable( FileOutputFormat.setOutputPath( conf.value, - SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf.value)) + SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value)) log.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - var writer: SparkHiveHadoopWriter = null - // Map restore writesr for Dynamic Partition - var writerMap: scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] = null - if (dynamicPartNum == 0) { - writer = new SparkHiveHadoopWriter(conf.value, fileSinkConf) - writer.preSetup() - sc.sparkContext.runJob(rdd, writeToFile _) - writer.commitJob() - } else { - writerMap = new scala.collection.mutable.HashMap[String, SparkHiveHadoopWriter] - sc.sparkContext.runJob(rdd, writeToFile _) - for ((k,v) <- writerMap) { - v.commitJob() - } - writerMap.clear() - } - def writeToFile(context: TaskContext, iter: Iterator[(Writable, String)]) { + writerContainer.driverSideSetup() + sc.sparkContext.runJob(rdd, writeToFile _) + writerContainer.commitJob() + + // Note that this function is executed on executor side + def writeToFile(context: TaskContext, iterator: Iterator[Row]) { + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val outputData = new Array[Any](fieldOIs.length) + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. 
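      // (Illustrative example, not part of the original patch: an attempt ID of 2147483650L
      // maps to (2147483650L % Int.MaxValue).toInt == 3, keeping the value within the 32-bit
      // range Hadoop expects.)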
val attemptNumber = (context.attemptId % Int.MaxValue).toInt + writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber) + writerContainer.open() - if (dynamicPartNum == 0) { // for All static partition - writer.setup(context.stageId, context.partitionId, attemptNumber) - writer.open() - // writer for Dynamic Partition - while(iter.hasNext) { - val record = iter.next() - writer.write(record._1) - } - writer.close() - writer.commit() - } else { // if there is dynamic Partition - while(iter.hasNext) { - val record = iter.next() - val location = fileSinkConf.getDirName - val partLocation = location + record._2 // different writer related with different file - def createNewWriter(): SparkHiveHadoopWriter = { - val tempWriter = new SparkHiveHadoopWriter(conf.value, - new FileSinkDesc(partLocation, fileSinkConf.getTableInfo, false)) - tempWriter.setup(context.stageId, context.partitionId, attemptNumber) - tempWriter.open(record._2) - writerMap += (record._2 -> tempWriter) - tempWriter - } - val writer2 = writerMap.getOrElseUpdate(record._2, createNewWriter) - writer2.write(record._1) - } - for ((k,v) <- writerMap) { - v.close() - v.commit() + iterator.foreach { row => + var i = 0 + while (i < fieldOIs.length) { + outputData(i) = wrap(row(i), fieldOIs(i)) + i += 1 } - } - } - } - /** - * Returns the Dynamic partition directory for the given row. - * @param partCols an array containing the string names of the partition columns - * - * we get the last dynamicPartNum elements from partCols and - * last dynamicPartNum elements from the current row, - * then we can construct a String for dynamic partition directory - * For example: - * for sql: Insert.....tablename(part1,part2) select ....val1,val2 from ... - * return: /part1=val1/part2=val2 - * for sql: Insert.....tablename(part1=val1,part2) select ....,val2 from ... - * return: /part2=val2 - * for sql: Insert.....tablename(part1=val1,part2,part3) select ....,val2,val3 from ... - * return: /part2=val2/part3=val3 - */ - private def getDynamicPartDir(partCols: Array[String], - row: Row, - dynamicPartNum: Int, - defaultPartName: String): String = { - assert(dynamicPartNum > 0) - // TODO needs optimization - partCols - .takeRight(dynamicPartNum) - .zip(row.takeRight(dynamicPartNum)) - .map { case (c, v) => s"/$c=${handleNull(v, defaultPartName)}" } - .mkString - } + val writer = writerContainer.getLocalFileWriter(row) + writer.write(serializer.serialize(outputData, standardOI)) + } - /** - * Returns `rowVal` as a String. - * If `rowVal` is null or equal to "", returns the default partition name. - */ - private def handleNull(rowVal: Any, defaultPartName: String): String = { - if (rowVal == null ||String.valueOf(rowVal).length == 0) { - defaultPartName - } else { - String.valueOf(rowVal) + writerContainer.close() + writerContainer.commit() } } @@ -247,9 +182,6 @@ case class InsertIntoHiveTable( * Note: this is run once and then kept to avoid double insertions. */ private lazy val result: RDD[Row] = { - val childRdd = child.execute() - assert(childRdd != null) - // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. 
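    // (Clarifying note, not part of the original patch: as the comment above says, TableDesc is
    // serializable, so it is captured by the closure and shipped to executors; each task then
    // builds its own Serializer from it via newSerializer(fileSinkConf.getTableInfo) at the top
    // of writeToFile.)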
val tableDesc = table.tableDesc @@ -257,80 +189,62 @@ case class InsertIntoHiveTable( val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - val numDynamicPartitions = partition.values.filter(_.isEmpty).size - val numStaticPartitions = partition.values.filter(_.isDefined).size + val numDynamicPartitions = partition.values.count(_.isEmpty) + val numStaticPartitions = partition.values.count(_.nonEmpty) val partitionSpec = partition.map { - case (key, Some(value)) => - key -> value - case (key, None) => - key -> "" + case (key, Some(value)) => key -> value + case (key, None) => key -> "" } - val jobConf = new JobConf(sc.hiveconf) - val jobConfSer = new SerializableWritable(jobConf) - // check if the partition spec is valid + // All partition column names in the format of "//..." + val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns") + val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull + + // Validate partition spec if there exist any dynamic partitions if (numDynamicPartitions > 0) { + // Report error if dynamic partitioning is not enabled if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg()) + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg) } + + // Report error if dynamic partition strict mode is on but no static partition is found if (numStaticPartitions == 0 && sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) { - throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg()) + throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg) } - // check if static partition appear after dynamic partitions - var tmpNumStaticPartitions = numStaticPartitions - for ((k,v) <- partitionSpec) { - if (partitionSpec(k) == "") { - if (tmpNumStaticPartitions > 0) { // found a DP, but there exists ST as subpartition - throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg()) - } - } else { - tmpNumStaticPartitions -= 1 - } + + // Report error if any static partition appears after a dynamic partition + val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty) + isDynamic.init.zip(isDynamic.tail).find(_ == (true, false)).foreach { _ => + throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg) } } - val rdd = childRdd.mapPartitions { iter => - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] + val jobConf = new JobConf(sc.hiveconf) + val jobConfSer = new SerializableWritable(jobConf) - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - val outputData = new Array[Any](fieldOIs.length) - val defaultPartName = jobConfSer.value.get( - HiveConf.ConfVars.DEFAULTPARTITIONNAME.varname, - HiveConf.ConfVars.DEFAULTPARTITIONNAME.defaultVal) - - val partitionColumns = fileSinkConf.getTableInfo. 
- getProperties.getProperty("partition_columns") // a String like "colname1/colname2" - val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull - - iter.map { row => - var dynamicPartPath: String = null - if (numDynamicPartitions > 0) { - dynamicPartPath = getDynamicPartDir(partitionColumnNames, row, - numDynamicPartitions, defaultPartName) - } - var i = 0 - while (i < fieldOIs.length) { - // Casts Strings to HiveVarchars when necessary. - outputData(i) = wrap(row(i), fieldOIs(i)) - i += 1 - } - // pass the dynamicPartPath to downStream RDD - serializer.serialize(outputData, standardOI) -> dynamicPartPath - } + val defaultPartName = jobConf.get( + ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) + val writerContainer = if (numDynamicPartitions > 0) { + new SparkHiveDynamicPartitionWriterContainer( + jobConf, + fileSinkConf, + partitionColumnNames.takeRight(numDynamicPartitions), + defaultPartName) + } else { + new SparkHiveWriterContainer(jobConf, fileSinkConf) } + + val isCompressed = jobConf.getBoolean( + ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) + saveAsHiveFile( - rdd, + child.execute(), outputClass, fileSinkConf, jobConfSer, - sc.hiveconf.getBoolean("hive.exec.compress.output", false), - numDynamicPartitions) + isCompressed, + writerContainer) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. From 6fb16d76089e07c4fc273a86c7f887949e904b57 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 16 Sep 2014 18:32:12 -0700 Subject: [PATCH 20/28] Fixes typo in test name, regenerated golden answer files --- ...tition-0-be33aaa7253c8f248ff3921cd7dae340} | 0 ...tition-1-640552dd462707563fd255a713f83b41} | 0 ...tition-2-36456c9d0d2e3ef72ab5ba9ba48e5493} | 0 ...tition-3-b7f7fa7ebf666f4fee27e149d8c6961f} | 0 ...tition-4-8bdb71ad8cb3cc3026043def2525de3a} | 0 ...tition-5-c630dce438f3792e7fb0f523fbbb3e1e} | 0 ...tition-6-7abc9ec8a36cdc5e89e955265a7fd7cf} | 0 ...tition-7-be33aaa7253c8f248ff3921cd7dae340} | 0 .../sql/hive/execution/HiveQuerySuite.scala | 34 ++++++++++++------- 9 files changed, 22 insertions(+), 12 deletions(-) rename sql/hive/src/test/resources/golden/{dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef => dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 => dynamic_partition-1-640552dd462707563fd255a713f83b41} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-2-16367c381d4b189b3640c92511244bfe => dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 => dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 => dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee => dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed => dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf} (100%) rename sql/hive/src/test/resources/golden/{dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef => dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340} (100%) diff --git 
a/sql/hive/src/test/resources/golden/dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef b/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-0-310dfcd4399a7d152dd76020fb41ecef rename to sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 b/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-1-2bba07855af8c11899cc6b89f8c0ee02 rename to sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe b/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-2-16367c381d4b189b3640c92511244bfe rename to sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 b/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-3-b855e84c1d159eb6fa5fbb8ca371d318 rename to sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 b/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-4-ccc7d6efb0b13d5649ff98006e7ce182 rename to sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee b/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-5-516a04c3833a10c0241ec00dd6474dee rename to sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed b/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-6-b00f7cece45f474c6383b2a9346284ed rename to sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf diff --git a/sql/hive/src/test/resources/golden/dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef b/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 similarity index 100% rename from sql/hive/src/test/resources/golden/dynamic_partiton-7-310dfcd4399a7d152dd76020fb41ecef rename to sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 59639e5d65bf2..129d8c58c2940 100644 --- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -271,18 +271,6 @@ class HiveQuerySuite extends HiveComparisonTest { |insert overwrite table src_lv2 SELECT key, D.* lateral view explode(array(key+3, key+4)) D as CX """.stripMargin) - createQueryTest("dynamic_partiton", - """ - |drop table IF EXISTS dynamic_part_table; - |create table dynamic_part_table(intcol int) partitioned by (partcol1 int, partcol2 int); - |set hive.exec.dynamic.partition.mode=nonstrict; - |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, 1, 1 from src where key=150; - |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, NULL, 1 from src where key=150; - |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, 1, NULL from src where key=150; - |insert into table dynamic_part_table partition(partcol1, partcol2) select 1, NULL, NULL from src where key=150; - |drop table IF EXISTS dynamic_part_table; - """.stripMargin) - createQueryTest("lateral view5", "FROM src SELECT explode(array(key+3, key+4))") @@ -580,6 +568,28 @@ class HiveQuerySuite extends HiveComparisonTest { case class LogEntry(filename: String, message: String) case class LogFile(name: String) + createQueryTest("dynamic_partition", + """ + |DROP TABLE IF EXISTS dynamic_part_table; + |CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT); + | + |SET hive.exec.dynamic.partition.mode=nonstrict; + | + |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, 1, 1 FROM src WHERE key=150; + | + |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, NULL, 1 FROM src WHERE key=150; + | + |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, 1, NULL FROM src WHERE key=150; + | + |INSERT INTO TABLe dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT 1, NULL, NULL FROM src WHERE key=150; + | + |DROP TABLE IF EXISTS dynamic_part_table; + """.stripMargin) + test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") { sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs") sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles") From c47470e4660058b06f9a7704bad0920e69b246fe Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 16 Sep 2014 22:13:36 -0700 Subject: [PATCH 21/28] Refactors InsertIntoHiveTable to a Command --- .../sql/hive/execution/InsertIntoHiveTable.scala | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index c88ae70063b4b..8e9f7e5aa7374 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.hive.execution import scala.collection.JavaConversions._ -import scala.collection.mutable import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.conf.HiveConf @@ -31,14 +30,12 @@ import org.apache.hadoop.hive.serde2.Serializer import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption import 
org.apache.hadoop.hive.serde2.objectinspector._ import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector} -import org.apache.hadoop.io.Writable import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} -import org.apache.spark.SparkContext._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.Row -import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} +import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode} import org.apache.spark.sql.hive._ import org.apache.spark.{SerializableWritable, SparkException, TaskContext} @@ -52,7 +49,7 @@ case class InsertIntoHiveTable( child: SparkPlan, overwrite: Boolean) (@transient sc: HiveContext) - extends UnaryNode { + extends UnaryNode with Command { @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass @transient private lazy val hiveContext = new Context(sc.hiveconf) @@ -172,8 +169,6 @@ case class InsertIntoHiveTable( } } - override def execute() = result - /** * Inserts all the rows in the table into Hive. Row objects are properly serialized with the * `org.apache.hadoop.hive.serde2.SerDe` and the @@ -181,7 +176,7 @@ case class InsertIntoHiveTable( * * Note: this is run once and then kept to avoid double insertions. */ - private lazy val result: RDD[Row] = { + override protected[sql] lazy val sideEffectResult: Seq[Row] = { // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer // instances within the closure, since Serializer is not serializable while TableDesc is. val tableDesc = table.tableDesc @@ -293,6 +288,6 @@ case class InsertIntoHiveTable( // however for now we return an empty list to simplify compatibility checks with hive, which // does not return anything for insert operations. // TODO: implement hive compatibility as rules. 
- sc.sparkContext.makeRDD(Nil, 1) + Seq.empty[Row] } } From 922718165903bf12b3a2ee40241a60cb20d21886 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Tue, 16 Sep 2014 22:41:56 -0700 Subject: [PATCH 22/28] Minor refactoring --- .../spark/sql/hive/SparkHadoopWriter.scala | 9 +++-- .../hive/execution/InsertIntoHiveTable.scala | 33 +++++-------------- 2 files changed, 15 insertions(+), 27 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala index 6e07b51f49230..5fdca4fcdabe5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala @@ -24,6 +24,7 @@ import java.util.Date import scala.collection.mutable import org.apache.hadoop.fs.Path +import org.apache.hadoop.hive.conf.HiveConf.ConfVars import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities} import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat} import org.apache.hadoop.hive.ql.plan.FileSinkDesc @@ -159,11 +160,13 @@ private[hive] object SparkHiveWriterContainer { private[spark] class SparkHiveDynamicPartitionWriterContainer( @transient jobConf: JobConf, fileSinkConf: FileSinkDesc, - dynamicPartColNames: Array[String], - defaultPartName: String) + dynamicPartColNames: Array[String]) extends SparkHiveWriterContainer(jobConf, fileSinkConf) { - @transient var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ + private val defaultPartName = jobConf.get( + ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) + + @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ override def open(): Unit = { writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter] diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 8e9f7e5aa7374..ea884dc4ffa24 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -103,17 +103,16 @@ case class InsertIntoHiveTable( valueClass: Class[_], fileSinkConf: FileSinkDesc, conf: SerializableWritable[JobConf], - isCompressed: Boolean, writerContainer: SparkHiveWriterContainer) { assert(valueClass != null, "Output value class not set") conf.value.setOutputValueClass(valueClass) - assert(fileSinkConf.getTableInfo.getOutputFileFormatClassName != null) - // Doesn't work in Scala 2.9 due to what may be a generics bug - // TODO: Should we uncomment this for Scala 2.10? 
- // conf.setOutputFormat(outputFormatClass) - conf.value.set( - "mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) + val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName + assert(outputFileFormatClassName != null, "Output format class not set") + conf.value.set("mapred.output.format.class", outputFileFormatClassName) + + val isCompressed = conf.value.getBoolean( + ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) if (isCompressed) { // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", @@ -218,28 +217,14 @@ case class InsertIntoHiveTable( val jobConf = new JobConf(sc.hiveconf) val jobConfSer = new SerializableWritable(jobConf) - val defaultPartName = jobConf.get( - ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal) val writerContainer = if (numDynamicPartitions > 0) { - new SparkHiveDynamicPartitionWriterContainer( - jobConf, - fileSinkConf, - partitionColumnNames.takeRight(numDynamicPartitions), - defaultPartName) + val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions) + new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames) } else { new SparkHiveWriterContainer(jobConf, fileSinkConf) } - val isCompressed = jobConf.getBoolean( - ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) - - saveAsHiveFile( - child.execute(), - outputClass, - fileSinkConf, - jobConfSer, - isCompressed, - writerContainer) + saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer) val outputPath = FileOutputFormat.getOutputPath(jobConf) // Have to construct the format of dbname.tablename. From 26632c366ad6c8255b50d5e8a41bc23cddbd396b Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 17 Sep 2014 00:28:57 -0700 Subject: [PATCH 23/28] Adds more tests --- .../sql/hive/execution/HiveQuerySuite.scala | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 129d8c58c2940..291a72cd16a18 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution import scala.util.Try +import org.apache.spark.SparkException import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ @@ -380,7 +381,7 @@ class HiveQuerySuite extends HiveComparisonTest { def isExplanation(result: SchemaRDD) = { val explanation = result.select('plan).collect().map { case Row(plan: String) => plan } - explanation.exists(_ == "== Physical Plan ==") + explanation.contains("== Physical Plan ==") } test("SPARK-1704: Explain commands as a SchemaRDD") { @@ -590,6 +591,30 @@ class HiveQuerySuite extends HiveComparisonTest { |DROP TABLE IF EXISTS dynamic_part_table; """.stripMargin) + test("Partition spec validation") { + sql("DROP TABLE IF EXISTS dp_test") + sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)") + sql("SET hive.exec.dynamic.partition.mode=strict") + + // Should throw when using strict dynamic partition mode without any static partition + intercept[SparkException] { + sql( + """INSERT INTO TABLE dp_test 
PARTITION(dp) + |SELECT key, value, key % 5 FROM src + """.stripMargin) + } + + sql("SET hive.exec.dynamic.partition.mode=nonstrict") + + // Should throw when a static partition appears after a dynamic partition + intercept[SparkException] { + sql( + """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1) + |SELECT key, value, key % 5 FROM src + """.stripMargin) + } + } + test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") { sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs") sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles") @@ -647,27 +672,27 @@ class HiveQuerySuite extends HiveComparisonTest { assert(sql("SET").collect().size == 0) assertResult(Set(testKey -> testVal)) { - collectResults(hql(s"SET $testKey=$testVal")) + collectResults(sql(s"SET $testKey=$testVal")) } assert(hiveconf.get(testKey, "") == testVal) assertResult(Set(testKey -> testVal)) { - collectResults(hql("SET")) + collectResults(sql("SET")) } sql(s"SET ${testKey + testKey}=${testVal + testVal}") assert(hiveconf.get(testKey + testKey, "") == testVal + testVal) assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) { - collectResults(hql("SET")) + collectResults(sql("SET")) } // "set key" assertResult(Set(testKey -> testVal)) { - collectResults(hql(s"SET $testKey")) + collectResults(sql(s"SET $testKey")) } assertResult(Set(nonexistentKey -> "")) { - collectResults(hql(s"SET $nonexistentKey")) + collectResults(sql(s"SET $nonexistentKey")) } // Assert that sql() should have the same effects as sql() by repeating the above using sql(). From 0eed349f5824ef3917af1e380bfb529f9875b0c1 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 17 Sep 2014 18:08:01 -0700 Subject: [PATCH 24/28] Addresses @yhuai's comments --- .../hive/execution/InsertIntoHiveTable.scala | 3 +- ...riter.scala => hiveWriterContainers.scala} | 58 ++++++++++--------- 2 files changed, 32 insertions(+), 29 deletions(-) rename sql/hive/src/main/scala/org/apache/spark/sql/hive/{SparkHadoopWriter.scala => hiveWriterContainers.scala} (90%) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index ea884dc4ffa24..3d2ee010696f6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -150,11 +150,11 @@ case class InsertIntoHiveTable( // around by taking a mod. We expect that no task will be attempted 2 billion times. 
val attemptNumber = (context.attemptId % Int.MaxValue).toInt writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber) - writerContainer.open() iterator.foreach { row => var i = 0 while (i < fieldOIs.length) { + // TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap` outputData(i) = wrap(row(i), fieldOIs(i)) i += 1 } @@ -164,7 +164,6 @@ case class InsertIntoHiveTable( } writerContainer.close() - writerContainer.commit() } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala similarity index 90% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index 5fdca4fcdabe5..a667188fa53bd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -71,20 +71,7 @@ private[hive] class SparkHiveWriterContainer( setIDs(jobId, splitId, attemptId) setConfParams() committer.setupTask(taskContext) - } - - /** - * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer - * for writing data to a dynamic partition. - */ - def open() { - writer = HiveFileFormatUtils.getHiveRecordWriter( - conf.value, - fileSinkConf.getTableInfo, - conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], - fileSinkConf, - FileOutputFormat.getTaskOutputPath(conf.value, getOutputName), - Reporter.NULL) + initWriters() } protected def getOutputName: String = { @@ -100,9 +87,26 @@ private[hive] class SparkHiveWriterContainer( def close() { // Seems the boolean value passed into close does not matter. writer.close(false) + commit() + } + + def commitJob() { + committer.commitJob(jobContext) } - def commit() { + protected def initWriters() { + // NOTE this method is executed at the executor side. + // For Hive tables without partitions or with only static partitions, only 1 writer is needed. + writer = HiveFileFormatUtils.getHiveRecordWriter( + conf.value, + fileSinkConf.getTableInfo, + conf.value.getOutputValueClass.asInstanceOf[Class[Writable]], + fileSinkConf, + FileOutputFormat.getTaskOutputPath(conf.value, getOutputName), + Reporter.NULL) + } + + protected def commit() { if (committer.needsTaskCommit(taskContext)) { try { committer.commitTask(taskContext) @@ -118,10 +122,6 @@ private[hive] class SparkHiveWriterContainer( } } - def commitJob() { - committer.commitJob(jobContext) - } - // ********* Private Functions ********* private def setIDs(jobId: Int, splitId: Int, attemptId: Int) { @@ -168,12 +168,15 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _ - override def open(): Unit = { + override protected def initWriters(): Unit = { + // NOTE: This method is executed at the executor side. + // Actual writers are created for each dynamic partition on the fly. 
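    // (Illustrative sketch of the per-task flow, not part of the original patch:
    //   executorSideSetup(stageId, partitionId, attemptNumber) // sets up the task, calls initWriters()
    //   getLocalFileWriter(row).write(serializedRow)           // lazily creates one writer per partition path
    //   close()                                                // closes every writer and commits the task
    // )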
writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter] } override def close(): Unit = { writers.values.foreach(_.close(false)) + commit() } override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = { @@ -185,13 +188,6 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( } .mkString - val path = { - val outputPath = FileOutputFormat.getOutputPath(conf.value) - assert(outputPath != null, "Undefined job output-path") - val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) - new Path(workPath, getOutputName) - } - def newWriter = { val newFileSinkDesc = new FileSinkDesc( fileSinkConf.getDirName + dynamicPartPath, @@ -199,6 +195,14 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( fileSinkConf.getCompressed) newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec) newFileSinkDesc.setCompressType(fileSinkConf.getCompressType) + + val path = { + val outputPath = FileOutputFormat.getOutputPath(conf.value) + assert(outputPath != null, "Undefined job output-path") + val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/")) + new Path(workPath, getOutputName) + } + HiveFileFormatUtils.getHiveRecordWriter( conf.value, fileSinkConf.getTableInfo, From 9c6eb2db8de06db0d800ebefb37fc016ecd4c88c Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Sun, 21 Sep 2014 21:49:31 -0700 Subject: [PATCH 25/28] Adds tests to verify dynamic partitioning folder layout --- .../sql/hive/execution/HiveQuerySuite.scala | 41 +++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 291a72cd16a18..5d743a51b47c5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive.execution import scala.util.Try +import org.apache.hadoop.hive.conf.HiveConf.ConfVars + import org.apache.spark.SparkException import org.apache.spark.sql.hive._ import org.apache.spark.sql.hive.test.TestHive @@ -591,6 +593,45 @@ class HiveQuerySuite extends HiveComparisonTest { |DROP TABLE IF EXISTS dynamic_part_table; """.stripMargin) + test("Dynamic partition folder layout") { + sql("DROP TABLE IF EXISTS dynamic_part_table") + sql("CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT)") + sql("SET hive.exec.dynamic.partition.mode=nonstrict") + + val data = Map( + Seq("1", "1") -> 1, + Seq("1", "NULL") -> 2, + Seq("NULL", "1") -> 3, + Seq("NULL", "NULL") -> 4) + + data.foreach { case (parts, value) => + sql( + s"""INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2) + |SELECT $value, ${parts.mkString(", ")} FROM src WHERE key=150 + """.stripMargin) + + val partFolder = Seq("partcol1", "partcol2") + .zip(parts) + .map { case (k, v) => + if (v == "NULL") { + s"$k=${ConfVars.DEFAULTPARTITIONNAME.defaultVal}" + } else { + s"$k=$v" + } + } + .mkString("/") + + // Loads partition data to a temporary table to verify contents + val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000" + + sql("DROP TABLE IF EXISTS dp_verify") + sql("CREATE TABLE dp_verify(intcol INT)") + sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE dp_verify") + + assert(sql("SELECT * FROM dp_verify").collect() === Array(Row(value))) + } + } + test("Partition spec validation") { sql("DROP TABLE IF 
EXISTS dp_test") sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)") From a132c800ecb785a0a5c36a969b824908eba2801c Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 1 Oct 2014 21:13:59 +0800 Subject: [PATCH 26/28] Fixes output compression --- .../hive/execution/InsertIntoHiveTable.scala | 25 +++++++++---------- .../spark/sql/hive/hiveWriterContainers.scala | 4 +-- 2 files changed, 14 insertions(+), 15 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala index 3d2ee010696f6..16a8c782acdfa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -110,19 +110,6 @@ case class InsertIntoHiveTable( val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName assert(outputFileFormatClassName != null, "Output format class not set") conf.value.set("mapred.output.format.class", outputFileFormatClassName) - - val isCompressed = conf.value.getBoolean( - ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) - - if (isCompressed) { - // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", - // and "mapred.output.compression.type" have no impact on ORC because it uses table properties - // to store compression information. - conf.value.set("mapred.output.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec")) - fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type")) - } conf.value.setOutputCommitter(classOf[FileOutputCommitter]) FileOutputFormat.setOutputPath( @@ -181,6 +168,18 @@ case class InsertIntoHiveTable( val tableLocation = table.hiveQlTable.getDataLocation val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) + val isCompressed = sc.hiveconf.getBoolean( + ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal) + + if (isCompressed) { + // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", + // and "mapred.output.compression.type" have no impact on ORC because it uses table properties + // to store compression information. 
+ sc.hiveconf.set("mapred.output.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(sc.hiveconf.get("mapred.output.compression.codec")) + fileSinkConf.setCompressType(sc.hiveconf.get("mapred.output.compression.type")) + } val numDynamicPartitions = partition.values.count(_.isEmpty) val numStaticPartitions = partition.values.count(_.nonEmpty) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala index a667188fa53bd..ac5c7a8220296 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala @@ -183,8 +183,8 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer( val dynamicPartPath = dynamicPartColNames .zip(row.takeRight(dynamicPartColNames.length)) .map { case (col, rawVal) => - val string = String.valueOf(rawVal) - s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}" + val string = if (rawVal == null) null else String.valueOf(rawVal) + s"/$col=${if (string == null || string.isEmpty) defaultPartName else string}" } .mkString From f471c4b7ddece2267a7223879fcdcd93d7326e70 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 1 Oct 2014 21:58:09 +0800 Subject: [PATCH 27/28] PreInsertionCasts should take table partitions into account --- .../scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 3 ++- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 9a0b9b46ac4ee..25766644cd1ed 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -142,7 +142,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with val childOutputDataTypes = child.output.map(_.dataType) // Only check attributes, not partitionKeys since they are always strings. // TODO: Fully support inserting into partitioned tables. 
- val tableOutputDataTypes = table.attributes.map(_.dataType) + val tableOutputDataTypes = + table.attributes.map(_.dataType) ++ table.partitionKeys.map(_.dataType) if (childOutputDataTypes == tableOutputDataTypes) { p diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 5d743a51b47c5..2e282a9ade40c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -167,7 +167,7 @@ class HiveQuerySuite extends HiveComparisonTest { createQueryTest("Cast Timestamp to Timestamp in UDF", """ - | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp)) + | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp)) | FROM src LIMIT 1 """.stripMargin) From 21935b66a1be14add4c047334115eeb81b95dae9 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Thu, 2 Oct 2014 00:53:07 +0800 Subject: [PATCH 28/28] Adds back deleted trailing space --- .../org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala index 2e282a9ade40c..5d743a51b47c5 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala @@ -167,7 +167,7 @@ class HiveQuerySuite extends HiveComparisonTest { createQueryTest("Cast Timestamp to Timestamp in UDF", """ - | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp)) + | SELECT DATEDIFF(CAST(value AS timestamp), CAST('2002-03-21 00:00:00' AS timestamp)) | FROM src LIMIT 1 """.stripMargin)
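Taken together, these patches let INSERT INTO/OVERWRITE statements write Hive dynamic partitions from Spark SQL. The following is a minimal usage sketch, not part of any patch above: it assumes a Spark 1.1-era build with Hive support on the classpath, a reachable metastore, and the usual src test table (key INT, value STRING) used throughout the suites; the names DynamicPartitionExample and dyn_part_demo are illustrative only.

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object DynamicPartitionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "DynamicPartitionExample")
    val hiveContext = new HiveContext(sc)

    // One data column plus one partition column.
    hiveContext.sql(
      "CREATE TABLE IF NOT EXISTS dyn_part_demo (value STRING) PARTITIONED BY (part INT)")

    // A statement whose partition spec is entirely dynamic requires nonstrict mode,
    // exactly as in the tests added by this series.
    hiveContext.sql("SET hive.exec.dynamic.partition.mode=nonstrict")

    // No static value is given for part; each distinct key % 5 produced by the query
    // becomes its own partition directory (part=0 ... part=4) under the table location.
    hiveContext.sql(
      """INSERT INTO TABLE dyn_part_demo PARTITION (part)
        |SELECT value, key % 5 FROM src
      """.stripMargin)

    hiveContext.sql("SELECT part, COUNT(*) FROM dyn_part_demo GROUP BY part")
      .collect()
      .foreach(println)

    sc.stop()
  }
}

Each distinct value of part lands in its own directory under the table location, e.g. .../dyn_part_demo/part=3/part-00000, and NULL partition values fall back to the __HIVE_DEFAULT_PARTITION__ directory, which is what the folder-layout test above verifies.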