From 0eed349f5824ef3917af1e380bfb529f9875b0c1 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 17 Sep 2014 18:08:01 -0700
Subject: [PATCH] Addresses @yhuai's comments

---
 .../hive/execution/InsertIntoHiveTable.scala  |  3 +-
 ...riter.scala => hiveWriterContainers.scala} | 58 ++++++++++---------
 2 files changed, 32 insertions(+), 29 deletions(-)
 rename sql/hive/src/main/scala/org/apache/spark/sql/hive/{SparkHadoopWriter.scala => hiveWriterContainers.scala} (90%)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index ea884dc4ffa24..3d2ee010696f6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -150,11 +150,11 @@ case class InsertIntoHiveTable(
       // around by taking a mod. We expect that no task will be attempted 2 billion times.
       val attemptNumber = (context.attemptId % Int.MaxValue).toInt
       writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
-      writerContainer.open()

       iterator.foreach { row =>
         var i = 0
         while (i < fieldOIs.length) {
+          // TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap`
           outputData(i) = wrap(row(i), fieldOIs(i))
           i += 1
         }
@@ -164,7 +164,6 @@ case class InsertIntoHiveTable(
       }

       writerContainer.close()
-      writerContainer.commit()
     }
   }

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
similarity index 90%
rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala
rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
index 5fdca4fcdabe5..a667188fa53bd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SparkHadoopWriter.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -71,20 +71,7 @@ private[hive] class SparkHiveWriterContainer(
     setIDs(jobId, splitId, attemptId)
     setConfParams()
     committer.setupTask(taskContext)
-  }
-
-  /**
-   * Create a `HiveRecordWriter`. A relative dynamic partition path can be used to create a writer
-   * for writing data to a dynamic partition.
-   */
-  def open() {
-    writer = HiveFileFormatUtils.getHiveRecordWriter(
-      conf.value,
-      fileSinkConf.getTableInfo,
-      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
-      fileSinkConf,
-      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
-      Reporter.NULL)
+    initWriters()
   }

   protected def getOutputName: String = {
@@ -100,9 +87,26 @@ private[hive] class SparkHiveWriterContainer(
   def close() {
     // Seems the boolean value passed into close does not matter.
     writer.close(false)
+    commit()
+  }
+
+  def commitJob() {
+    committer.commitJob(jobContext)
   }

-  def commit() {
+  protected def initWriters() {
+    // NOTE this method is executed at the executor side.
+    // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
+    writer = HiveFileFormatUtils.getHiveRecordWriter(
+      conf.value,
+      fileSinkConf.getTableInfo,
+      conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+      fileSinkConf,
+      FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
+      Reporter.NULL)
+  }
+
+  protected def commit() {
     if (committer.needsTaskCommit(taskContext)) {
       try {
         committer.commitTask(taskContext)
@@ -118,10 +122,6 @@
     }
   }

-  def commitJob() {
-    committer.commitJob(jobContext)
-  }
-
   // ********* Private Functions *********

   private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
@@ -168,12 +168,15 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(

   @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _

-  override def open(): Unit = {
+  override protected def initWriters(): Unit = {
+    // NOTE: This method is executed at the executor side.
+    // Actual writers are created for each dynamic partition on the fly.
     writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
   }

   override def close(): Unit = {
     writers.values.foreach(_.close(false))
+    commit()
   }

   override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
@@ -185,13 +188,6 @@ private[spark] class SparkHiveDynamicPartitionWriterContainer(
       }
       .mkString

-    val path = {
-      val outputPath = FileOutputFormat.getOutputPath(conf.value)
-      assert(outputPath != null, "Undefined job output-path")
-      val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
-      new Path(workPath, getOutputName)
-    }
-
     def newWriter = {
       val newFileSinkDesc = new FileSinkDesc(
         fileSinkConf.getDirName + dynamicPartPath,
@@ -199,6 +195,14 @@
         fileSinkConf.getCompressed)
       newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
       newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
+
+      val path = {
+        val outputPath = FileOutputFormat.getOutputPath(conf.value)
+        assert(outputPath != null, "Undefined job output-path")
+        val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+        new Path(workPath, getOutputName)
+      }
+
       HiveFileFormatUtils.getHiveRecordWriter(
         conf.value,
         fileSinkConf.getTableInfo,
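
Editor's note on the lifecycle change this patch introduces: InsertIntoHiveTable no longer calls open() and commit() on the writer container; executorSideSetup() now invokes the new initWriters() hook (overridden by the dynamic-partition container to create writers lazily), close() commits the task, and the driver still calls commitJob(). The Scala sketch below only illustrates that call order; WriterContainerLifecycleSketch, its RecordWriter trait, and the String "rows" are simplified stand-ins invented for this example, not the actual Spark or Hive classes.

    // Hypothetical, minimal sketch of the writer-container lifecycle after this patch.
    // None of these types are the real Spark/Hive classes; they only mirror the call order.
    object WriterContainerLifecycleSketch {

      trait RecordWriter {
        def write(value: String): Unit
        def close(abort: Boolean): Unit
      }

      class WriterContainer {
        protected var writer: RecordWriter = _

        // Executor side: setup now also creates the writer(s) via the initWriters() hook,
        // so callers no longer invoke a separate open().
        def executorSideSetup(stageId: Int, splitId: Int, attemptId: Int): Unit = {
          // (task/job IDs and output-committer setup would happen here)
          initWriters()
        }

        // Overridden by a dynamic-partition variant to create one writer per partition on the fly.
        protected def initWriters(): Unit = {
          writer = new RecordWriter {
            def write(value: String): Unit = println(s"write: $value")
            def close(abort: Boolean): Unit = println("writer closed")
          }
        }

        def getLocalFileWriter(row: String): RecordWriter = writer

        // close() now also commits the task; commit() is no longer part of the caller-facing API.
        def close(): Unit = {
          writer.close(false)
          commit()
        }

        protected def commit(): Unit = println("task output committed")

        // Driver side: still called once after all tasks have finished.
        def commitJob(): Unit = println("job committed")
      }

      def main(args: Array[String]): Unit = {
        val container = new WriterContainer
        container.executorSideSetup(stageId = 0, splitId = 0, attemptId = 0)
        Seq("row-1", "row-2").foreach(r => container.getLocalFileWriter(r).write(r))
        container.close()     // flushes the writer and commits the task
        container.commitJob() // driver-side job commit
      }
    }

The point of the refactoring, as reflected in the diff, is that the container owns the whole writer lifecycle: executors only see executorSideSetup/getLocalFileWriter/close, and the driver only calls commitJob.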