Skip to content

Commit

Permalink
Also lists suspicious non-leaf partition directories
Browse files Browse the repository at this point in the history
  • Loading branch information
liancheng committed Jun 23, 2015
1 parent a935eb8 commit 6b74dd8
Showing 1 changed file with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[sql] object PartitioningUtils {
} else {
// This dataset is partitioned. We need to check whether all partitions have the same
// partition columns and resolve potential type conflicts.
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues.map(_._2))
val resolvedPartitionValues = resolvePartitions(pathsWithPartitionValues)

// Creates the StructType which represents the partition columns.
val fields = {
Expand Down Expand Up @@ -181,24 +181,41 @@ private[sql] object PartitioningUtils {
* StringType
* }}}
*/
private[sql] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
// Column names of all partitions must match
val distinctPartitionsColNames = values.map(_.columnNames).distinct

if (distinctPartitionsColNames.isEmpty) {
private[sql] def resolvePartitions(
pathsWithPartitionValues: Seq[(Path, PartitionValues)]): Seq[PartitionValues] = {
if (pathsWithPartitionValues.isEmpty) {
Seq.empty
} else {
assert(distinctPartitionsColNames.size == 1, {
val list = distinctPartitionsColNames.map(_.mkString(", ")).zipWithIndex.map {
val distinctPartColNames = pathsWithPartitionValues.map(_._2.columnNames).distinct

def listConflictingPartitionColumns: String = {
def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value })

val partColNamesToPaths = groupByKey(pathsWithPartitionValues.map {
case (path, partValues) => partValues.columnNames -> path
})

val distinctPartColLists = distinctPartColNames.map(_.mkString(", ")).zipWithIndex.map {
case (names, index) =>
s"\tPartition column name list #$index: $names"
s"Partition column name list #$index: $names"
}

s"Conflicting partition column names detected:\n${list.mkString("\n")}\n" +
"For partitioned table directories, data files should only live in leaf directories."
})
// Lists out those non-leaf partition directories that also contain files
val suspiciousPaths =
distinctPartColNames.sortBy(_.length).init.flatMap(partColNamesToPaths)

s"Conflicting partition column names detected:\n" +
distinctPartColLists.mkString("\n\t", "\n\t", "\n\n") +
"For partitioned table directories, data files should only live in leaf directories. " +
"Please check the following directories for unexpected files:\n" +
suspiciousPaths.mkString("\n\t", "\n\t", "\n")
}

assert(distinctPartColNames.size == 1, listConflictingPartitionColumns)

// Resolves possible type conflicts for each column
val values = pathsWithPartitionValues.map(_._2)
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
Expand Down

0 comments on commit 6b74dd8

Please sign in to comment.