diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index 5def549ac411a..31b07998d8630 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -298,7 +298,7 @@ class SparkHadoopUtil extends Logging { logDebug(text + " matched " + HADOOP_CONF_PATTERN) val key = matched.substring(13, matched.length() - 1) // remove ${hadoopconf- .. } val eval = Option[String](hadoopConf.get(key)) - .map { value => + .map { value => logDebug("Substituted " + matched + " with " + value) text.replace(matched, value) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 96a39416925b5..ee4b1c72a2148 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -303,7 +303,7 @@ private[sql] case class ParquetRelation2( if (partitionDirs.nonEmpty) { // Parses names and values of partition columns, and infer their data types. - ParquetRelation2.parsePartitions(partitionDirs, defaultPartitionName) + PartitioningUtils.parsePartitions(partitionDirs, defaultPartitionName) } else { // No partition directories found, makes an empty specification PartitionSpec(StructType(Seq.empty[StructField]), Seq.empty[Partition]) @@ -837,178 +837,4 @@ private[sql] object ParquetRelation2 extends Logging { .filter(_.nullable) StructType(parquetSchema ++ missingFields) } - - - // TODO Data source implementations shouldn't touch Catalyst types (`Literal`). - // However, we are already using Catalyst expressions for partition pruning and predicate - // push-down here... - private[parquet] case class PartitionValues(columnNames: Seq[String], literals: Seq[Literal]) { - require(columnNames.size == literals.size) - } - - /** - * Given a group of qualified paths, tries to parse them and returns a partition specification. - * For example, given: - * {{{ - * hdfs://:/path/to/partition/a=1/b=hello/c=3.14 - * hdfs://:/path/to/partition/a=2/b=world/c=6.28 - * }}} - * it returns: - * {{{ - * PartitionSpec( - * partitionColumns = StructType( - * StructField(name = "a", dataType = IntegerType, nullable = true), - * StructField(name = "b", dataType = StringType, nullable = true), - * StructField(name = "c", dataType = DoubleType, nullable = true)), - * partitions = Seq( - * Partition( - * values = Row(1, "hello", 3.14), - * path = "hdfs://:/path/to/partition/a=1/b=hello/c=3.14"), - * Partition( - * values = Row(2, "world", 6.28), - * path = "hdfs://:/path/to/partition/a=2/b=world/c=6.28"))) - * }}} - */ - private[parquet] def parsePartitions( - paths: Seq[Path], - defaultPartitionName: String): PartitionSpec = { - val partitionValues = resolvePartitions(paths.map(parsePartition(_, defaultPartitionName))) - val fields = { - val (PartitionValues(columnNames, literals)) = partitionValues.head - columnNames.zip(literals).map { case (name, Literal(_, dataType)) => - StructField(name, dataType, nullable = true) - } - } - - val partitions = partitionValues.zip(paths).map { - case (PartitionValues(_, literals), path) => - Partition(Row(literals.map(_.value): _*), path.toString) - } - - PartitionSpec(StructType(fields), partitions) - } - - /** - * Parses a single partition, returns column names and values of each partition column. For - * example, given: - * {{{ - * path = hdfs://:/path/to/partition/a=42/b=hello/c=3.14 - * }}} - * it returns: - * {{{ - * PartitionValues( - * Seq("a", "b", "c"), - * Seq( - * Literal.create(42, IntegerType), - * Literal.create("hello", StringType), - * Literal.create(3.14, FloatType))) - * }}} - */ - private[parquet] def parsePartition( - path: Path, - defaultPartitionName: String): PartitionValues = { - val columns = ArrayBuffer.empty[(String, Literal)] - // Old Hadoop versions don't have `Path.isRoot` - var finished = path.getParent == null - var chopped = path - - while (!finished) { - val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName) - maybeColumn.foreach(columns += _) - chopped = chopped.getParent - finished = maybeColumn.isEmpty || chopped.getParent == null - } - - val (columnNames, values) = columns.reverse.unzip - PartitionValues(columnNames, values) - } - - private def parsePartitionColumn( - columnSpec: String, - defaultPartitionName: String): Option[(String, Literal)] = { - val equalSignIndex = columnSpec.indexOf('=') - if (equalSignIndex == -1) { - None - } else { - val columnName = columnSpec.take(equalSignIndex) - assert(columnName.nonEmpty, s"Empty partition column name in '$columnSpec'") - - val rawColumnValue = columnSpec.drop(equalSignIndex + 1) - assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'") - - val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName) - Some(columnName -> literal) - } - } - - /** - * Resolves possible type conflicts between partitions by up-casting "lower" types. The up- - * casting order is: - * {{{ - * NullType -> - * IntegerType -> LongType -> - * FloatType -> DoubleType -> DecimalType.Unlimited -> - * StringType - * }}} - */ - private[parquet] def resolvePartitions(values: Seq[PartitionValues]): Seq[PartitionValues] = { - // Column names of all partitions must match - val distinctPartitionsColNames = values.map(_.columnNames).distinct - assert(distinctPartitionsColNames.size == 1, { - val list = distinctPartitionsColNames.mkString("\t", "\n", "") - s"Conflicting partition column names detected:\n$list" - }) - - // Resolves possible type conflicts for each column - val columnCount = values.head.columnNames.size - val resolvedValues = (0 until columnCount).map { i => - resolveTypeConflicts(values.map(_.literals(i))) - } - - // Fills resolved literals back to each partition - values.zipWithIndex.map { case (d, index) => - d.copy(literals = resolvedValues.map(_(index))) - } - } - - /** - * Converts a string to a `Literal` with automatic type inference. Currently only supports - * [[IntegerType]], [[LongType]], [[FloatType]], [[DoubleType]], [[DecimalType.Unlimited]], and - * [[StringType]]. - */ - private[parquet] def inferPartitionColumnValue( - raw: String, - defaultPartitionName: String): Literal = { - // First tries integral types - Try(Literal.create(Integer.parseInt(raw), IntegerType)) - .orElse(Try(Literal.create(JLong.parseLong(raw), LongType))) - // Then falls back to fractional types - .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType))) - .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType))) - .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited))) - // Then falls back to string - .getOrElse { - if (raw == defaultPartitionName) Literal.create(null, NullType) - else Literal.create(raw, StringType) - } - } - - private val upCastingOrder: Seq[DataType] = - Seq(NullType, IntegerType, LongType, FloatType, DoubleType, DecimalType.Unlimited, StringType) - - /** - * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower" - * types. - */ - private def resolveTypeConflicts(literals: Seq[Literal]): Seq[Literal] = { - val desiredType = { - val topType = literals.map(_.dataType).maxBy(upCastingOrder.indexOf(_)) - // Falls back to string if all values of this column are null or empty string - if (topType == NullType) StringType else topType - } - - literals.map { case l @ Literal(_, dataType) => - Literal.create(Cast(l, desiredType).eval(), desiredType) - } - } }