Removes duplicated partition discovery code in new Parquet
liancheng committed May 12, 2015
1 parent f18dec2 commit ca1805b
Showing 2 changed files with 2 additions and 176 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -298,7 +298,7 @@ class SparkHadoopUtil extends Logging {
logDebug(text + " matched " + HADOOP_CONF_PATTERN)
val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. }
val eval = Option[String](hadoopConf.get(key))
- .map { value =>
+ .map { value =>
logDebug("Substituted " + matched + " with " + value)
text.replace(matched, value)
}
176 changes: 1 addition & 175 deletions sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -303,7 +303,7 @@ private[sql] case class ParquetRelation2(

if (partitionDirs.nonEmpty) {
// Parses names and values of partition columns, and infers their data types.
- ParquetRelation2.parsePartitions(partitionDirs, defaultPartitionName)
+ PartitioningUtils.parsePartitions(partitionDirs, defaultPartitionName)
} else {
// No partition directories found, so returns an empty specification
PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition])
@@ -837,178 +837,4 @@ private[sql] object ParquetRelation2 extends Logging {
.filter(_.nullable)
StructType(parquetSchema ++ missingFields)
}


// TODO Data source implementations shouldn't touch Catalyst types (`Literal`).
// However, we are already using Catalyst expressions for partition pruning and predicate
// push-down here...
private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) {
require(columnNames.size == literals.size)
}

/**
* Given a group of qualified paths, tries to parse them and returns a partition specification.
* For example, given:
* {{{
* hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14
* hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28
* }}}
* it returns:
* {{{
* PartitionSpec(
* partitionColumns = StructType(
* StructField(name = "a", dataType = IntegerType, nullable = true),
* StructField(name = "b", dataType = StringType, nullable = true),
* StructField(name = "c", dataType = DoubleType, nullable = true)),
* partitions = Seq(
* Partition(
* values = Row(1, "hello", 3.14),
* path = "hdfs://<host>:<port>/path/to/partition/a=1/b=hello/c=3.14"),
* Partition(
* values = Row(2, "world", 6.28),
* path = "hdfs://<host>:<port>/path/to/partition/a=2/b=world/c=6.28")))
* }}}
*/
private[parquet] def parsePartitions(
paths: Seq[Path],
defaultPartitionName: String): PartitionSpec = {
val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName)))
val fields = {
val PartitionValues(columnNames, literals) = partitionValues.head
columnNames.zip(literals).map { case (name, Literal(_, dataType)) =>
StructField(name, dataType, nullable = true)
}
}

val partitions = partitionValues.zip(paths).map {
case (PartitionValues(_, literals), path) =>
Partition(Row(literals.map(_.value): _*), path.toString)
}

PartitionSpec(StructType(fields), partitions)
}
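// Note: resolvePartitions runs before the schema is derived from the head
// element, so the field types already reflect conflicts resolved across
// *all* paths, e.g. mixing a=1 and a=foo yields a StringType column "a".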

/**
* Parses a single partition, returns column names and values of each partition column. For
* example, given:
* {{{
* path = hdfs://<host>:<port>/path/to/partition/a=42/b=hello/c=3.14
* }}}
* it returns:
* {{{
* PartitionValues(
* Seq("a", "b", "c"),
* Seq(
* Literal.create(42, IntegerType),
* Literal.create("hello", StringType),
* Literal.create(3.14, FloatType)))
* }}}
*/
private[parquet] def parsePartition(
path: Path,
defaultPartitionName: String): PartitionValues = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
var chopped = path

while (!finished) {
val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
maybeColumn.foreach(columns += _)
chopped = chopped.getParent
finished = maybeColumn.isEmpty || chopped.getParent == null
}

val (columnNames, values) = columns.reverse.unzip
PartitionValues(columnNames, values)
}
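// Walk-through: for `.../path/to/partition/a=42/b=hello`, the loop parses
// "b=hello" first, then "a=42", and stops at "partition" (no '=' sign).
// Reversing the buffer restores directory order: columnNames = Seq("a", "b").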

private def parsePartitionColumn(
columnSpec: String,
defaultPartitionName: String): Option[(String, Literal)] = {
val equalSignIndex = columnSpec.indexOf('=')
if (equalSignIndex == -1) {
None
} else {
val columnName = columnSpec.take(equalSignIndex)
assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'")

val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")

val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
Some(columnName -> literal)
}
}
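// Examples: "a=1" yields Some("a" -> Literal.create(1, IntegerType)), while a
// directory name without '=' yields None, which also ends the walk up the
// path in `parsePartition` above.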

/**
* Resolves possible type conflicts between partitions by up-casting "lower" types. The up-
* casting order is:
* {{{
* NullType ->
* IntegerType -> LongType ->
* FloatType -> DoubleType -> DecimalType.Unlimited ->
* StringType
* }}}
*/
private[parquet] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = {
// Column names of all partitions must match
val distinctPartitionsColNames = values.map(_.columnNames).distinct
assert(distinctPartitionsColNames.size == 1, {
val list = distinctPartitionsColNames.mkString("\t", "\n", "")
s"Conflicting partition column names detected:\n$list"
})

// Resolves possible type conflicts for each column
val columnCount = values.head.columnNames.size
val resolvedValues = (0 until columnCount).map { i =>
resolveTypeConflicts(values.map(_.literals(i)))
}

// Fills resolved literals back to each partition
values.zipWithIndex.map { case (d, index) =>
d.copy(literals = resolvedValues.map(_(index)))
}
}
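// Example: given PartitionValues(Seq("a"), Seq(Literal.create(1, IntegerType)))
// and PartitionValues(Seq("a"), Seq(Literal.create("foo", StringType))), column
// "a" resolves to StringType and the first partition's value becomes "1".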

/**
* Converts a string to a `Literal` with automatic type inference. Currently only supports
* [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], [[DecimalType.Unlimited]], and
* [[StringType]].
*/
private[parquet] def inferPartitionColumnValue(
raw: String,
defaultPartitionName: String): Literal = {
// First tries integral types
Try(Literal.create(Integer.parseInt(raw), IntegerType))
.orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
// Then falls back to fractional types
.orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
.orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
.orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
// Then falls back to string
.getOrElse {
if (raw == defaultPartitionName) Literal.create(null, NullType)
else Literal.create(raw, StringType)
}
}
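// Examples: "42" -> IntegerType, "3000000000" -> LongType (overflows Int),
// "1.5" -> FloatType, "hello" -> StringType; the default partition name
// (Hive uses "__HIVE_DEFAULT_PARTITION__") becomes a null of NullType.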

private val upCastingOrder: Seq[DataType] =
Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType)

/**
* Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
* types.
*/
private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = {
val desiredType = {
val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_))
// Falls back to string if all values of this column are null or empty string
if (topType == NullType) StringType else topType
}

literals.map { case l @ Literal(_, dataType) =>
Literal.create(Cast(l, desiredType).eval(), desiredType)
}
}
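// Example: for Seq(Literal.create(1, IntegerType), Literal.create(2.5f, FloatType)),
// the desired type is FloatType and the integer literal is re-created as
// Literal.create(1.0f, FloatType) by evaluating a Cast.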
}
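
The partition discovery logic above now lives in a shared helper, and the call site in ParquetRelation2 (first hunk of newParquet.scala) delegates to it. A minimal usage sketch follows, assuming PartitioningUtils keeps the signature removed here and lives under org.apache.spark.sql.sources; both are assumptions, as neither appears in this diff:

  import org.apache.hadoop.fs.Path
  import org.apache.spark.sql.sources.PartitioningUtils // assumed package

  // Two leaf directories of a partitioned table.
  val partitionDirs = Seq(
    new Path("hdfs://host:9000/path/to/partition/a=1/b=hello"),
    new Path("hdfs://host:9000/path/to/partition/a=2/b=world"))

  // Behaves like the removed ParquetRelation2.parsePartitions: infers the
  // partition schema (a: IntegerType, b: StringType) and one Partition per path.
  val spec = PartitioningUtils.parsePartitions(partitionDirs, "__HIVE_DEFAULT_PARTITION__")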
