[SPARK-3007][SQL] Fixes dynamic partitioning support for lower Hadoop versions #2663

Closed
wants to merge 1 commit

Conversation

liancheng
Contributor

This is a follow up of #2226 and #2616 to fix Jenkins master SBT build failures for lower Hadoop versions (1.0.x and 2.0.x).

The root cause is a difference in the semantics of FileSystem.globStatus() between Hadoop versions, as illustrated by the following test code:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object GlobExperiments extends App {
  val conf = new Configuration()
  val fs = FileSystem.getLocal(conf)
  // Print every path matching a three-level glob under /tmp/wh.
  fs.globStatus(new Path("/tmp/wh/*/*/*")).foreach { status =>
    println(status.getPath)
  }
}

Target directory structure:

/tmp/wh
├── dir0
│   ├── dir1
│   │   └── level2
│   └── level1
└── level0

Hadoop 2.4.1 result:

file:/tmp/wh/dir0/dir1/level2

Hadoop 1.0.4 result:

file:/tmp/wh/dir0/dir1/level2
file:/tmp/wh/dir0/level1
file:/tmp/wh/level0

In #2226 and #2616, we call FileOutputCommitter.commitJob() at the end of the job, which writes the _SUCCESS marker file. With lower Hadoop versions, because of the globStatus() semantics issue described above, Hive.loadDynamicPartitions() picks up _SUCCESS as a separate partition data file, and partition spec checking fails. The fix introduced in this PR is admittedly a hack: when inserting data with dynamic partitioning, we intentionally skip writing the _SUCCESS marker to work around this issue.
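A minimal sketch of one way to suppress the marker (an illustration of the idea under the assumption that FileOutputCommitter checks the mapreduce.fileoutputcommitter.marksuccessfuljobs flag before writing _SUCCESS, not necessarily the exact change in this PR):

import org.apache.hadoop.mapred.JobConf

// Sketch only: disable the _SUCCESS marker for dynamic-partition inserts.
// FileOutputCommitter consults this flag before writing the marker; whether
// the flag is honored depends on the Hadoop version in use.
def disableSuccessMarker(jobConf: JobConf): Unit = {
  jobConf.setBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", false)
}

With the marker gone, globStatus() on lower Hadoop versions no longer returns a spurious _SUCCESS entry, so Hive.loadDynamicPartitions() only sees real partition directories.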

Hive doesn't suffer from this issue because FileSinkOperator doesn't call FileOutputCommitter.commitJob(). Instead, it calls Utilities.mvFileToFinalPath() to clean up the output directory and then loads it into the Hive warehouse with loadDynamicPartitions()/loadPartition()/loadTable(). This approach is better because it handles failed jobs and speculative tasks properly. We should add this step to InsertIntoHiveTable in another PR.


SparkQA commented Oct 5, 2014

QA tests have started for PR 2663 at commit 0177dae.

  • This patch merges cleanly.

@liancheng
Contributor Author

@yhuai Would you mind leaving some suggestions/comments? I reached the conclusion in the PR description by combing through the insertion- and loading-related code in Hive, and I'm wondering whether there is a better approach.


SparkQA commented Oct 5, 2014

QA tests have finished for PR 2663 at commit 0177dae.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

marmbrus commented Oct 5, 2014

I'm going to merge this so we can fix Jenkins. If @yhuai has comments, they can be addressed in a follow-up.

asfgit closed this in 1b97a94 on Oct 5, 2014
Contributor

yhuai commented Oct 5, 2014

I think it is good.

Just a note: it seems Hive also sets the output committer to its NullOutputCommitter, and this is done before the MR job is submitted.
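As a rough illustration of that idea (the class below is a hypothetical stand-in, not Hive's actual NullOutputCommitter), a no-op committer can be registered on the JobConf before job submission so that no job-level output such as the _SUCCESS marker is ever produced:

import org.apache.hadoop.mapred.{JobConf, JobContext, OutputCommitter, TaskAttemptContext}

// Hypothetical no-op committer in the spirit of the one Hive installs before
// submitting its MR jobs; it commits nothing at either task or job level.
class NoOpOutputCommitter extends OutputCommitter {
  override def setupJob(context: JobContext): Unit = ()
  override def setupTask(context: TaskAttemptContext): Unit = ()
  override def needsTaskCommit(context: TaskAttemptContext): Boolean = false
  override def commitTask(context: TaskAttemptContext): Unit = ()
  override def abortTask(context: TaskAttemptContext): Unit = ()
}

// Registered on the JobConf before the job is submitted:
// jobConf.setOutputCommitter(classOf[NoOpOutputCommitter])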

@liancheng
Contributor Author

@yhuai Thanks for pointing out the NullOutputCommitter part; that's the missing piece I was looking for :)

liancheng deleted the dp-hadoop-1-fix branch on October 6, 2014 at 09:24