
Commit 5d436a1
Initial support for adding a conf to treat binary columns stored in Parquet as string columns.
yhuai committed Aug 8, 2014
1 parent 4c51098 commit 5d436a1
Showing 4 changed files with 35 additions and 17 deletions.
7 changes: 7 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -30,6 +30,7 @@ private[spark] object SQLConf {
   val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions"
   val CODEGEN_ENABLED = "spark.sql.codegen"
   val DIALECT = "spark.sql.dialect"
+  val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString"
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
@@ -104,6 +105,12 @@ trait SQLConf {
   private[spark] def defaultSizeInBytes: Long =
     getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong
 
+  /**
+   * When set to true, we always treat byte arrays in Parquet files as strings.
+   */
+  private[spark] def isParquetBinaryAsString: Boolean =
+    getConf(PARQUET_BINARY_AS_STRING, "false") == "true"
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
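For context, a minimal usage sketch (not part of this commit) of how the new setting would be toggled, assuming a Spark 1.x-era SQLContext and an existing SparkContext named sc:

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// The default is "false": BINARY columns without a UTF8 annotation stay BinaryType.
sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")

// Schemas deduced from raw Parquet files now map plain BINARY columns to StringType.
// The path below is a placeholder.
val data = sqlContext.parquetFile("/path/to/data.parquet")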
6 changes: 5 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -60,7 +60,11 @@ private[sql] case class ParquetRelation(
       .getSchema
 
   /** Attributes */
-  override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf)
+  override val output =
+    ParquetTypesConverter.readSchemaFromFile(
+      new Path(path),
+      conf,
+      sqlContext.isParquetBinaryAsString)
 
   override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type]
 
3 changes: 2 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -80,9 +80,10 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
       }
     }
     // if both unavailable, fall back to deducing the schema from the given Parquet schema
+    // TODO: Why can it be null?
     if (schema == null) {
       log.debug("falling back to Parquet read schema")
-      schema = ParquetTypesConverter.convertToAttributes(parquetSchema)
+      schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false)
     }
     log.debug(s"list of attributes that will be read: $schema")
     new RowRecordMaterializer(parquetSchema, schema)
36 changes: 21 additions & 15 deletions sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -43,10 +43,13 @@ private[parquet] object ParquetTypesConverter extends Logging {
   def isPrimitiveType(ctype: DataType): Boolean =
     classOf[PrimitiveType] isAssignableFrom ctype.getClass
 
-  def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType =
+  def toPrimitiveDataType(
+      parquetType: ParquetPrimitiveType,
+      binaryAsString: Boolean): DataType =
     parquetType.getPrimitiveTypeName match {
       case ParquetPrimitiveTypeName.BINARY
-        if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType
+        if (parquetType.getOriginalType == ParquetOriginalType.UTF8 ||
+          binaryAsString) => StringType
       case ParquetPrimitiveTypeName.BINARY => BinaryType
       case ParquetPrimitiveTypeName.BOOLEAN => BooleanType
       case ParquetPrimitiveTypeName.DOUBLE => DoubleType
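The case above is the heart of the change, so here is a self-contained sketch of the rule (not from the commit; the object and method names below are invented stand-ins, and the real logic lives in ParquetTypesConverter.toPrimitiveDataType): a BINARY column becomes a string when it carries the UTF8 annotation, or when the new flag is on.

object BinaryAsStringSketch {
  sealed trait CatalystType
  case object StringType extends CatalystType
  case object BinaryType extends CatalystType

  // Mirrors the predicate in the diff above: the UTF8 annotation wins,
  // otherwise the binaryAsString flag decides.
  def binaryToCatalyst(utf8Annotated: Boolean, binaryAsString: Boolean): CatalystType =
    if (utf8Annotated || binaryAsString) StringType else BinaryType

  def main(args: Array[String]): Unit = {
    assert(binaryToCatalyst(utf8Annotated = true, binaryAsString = false) == StringType)
    assert(binaryToCatalyst(utf8Annotated = false, binaryAsString = false) == BinaryType)
    assert(binaryToCatalyst(utf8Annotated = false, binaryAsString = true) == StringType)
  }
}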
@@ -85,7 +88,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param parquetType The type to convert.
    * @return The corresponding Catalyst type.
    */
-  def toDataType(parquetType: ParquetType): DataType = {
+  def toDataType(parquetType: ParquetType, isBinaryAsString: Boolean): DataType = {
     def correspondsToMap(groupType: ParquetGroupType): Boolean = {
       if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) {
         false
@@ -107,7 +110,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
 
     if (parquetType.isPrimitive) {
-      toPrimitiveDataType(parquetType.asPrimitiveType)
+      toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString)
     } else {
       val groupType = parquetType.asGroupType()
       parquetType.getOriginalType match {
@@ -116,7 +119,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         case ParquetOriginalType.LIST => { // TODO: check enums!
           assert(groupType.getFieldCount == 1)
           val field = groupType.getFields.apply(0)
-          ArrayType(toDataType(field), containsNull = false)
+          ArrayType(toDataType(field, isBinaryAsString), containsNull = false)
         }
         case ParquetOriginalType.MAP => {
           assert(
@@ -126,9 +129,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
           assert(
             keyValueGroup.getFieldCount == 2,
             "Parquet Map type malformatted: nested group should have 2 (key, value) fields!")
-          val keyType = toDataType(keyValueGroup.getFields.apply(0))
+          val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-          val valueType = toDataType(keyValueGroup.getFields.apply(1))
+          val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
           assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
           // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
           // at here.
@@ -138,22 +141,22 @@ private[parquet] object ParquetTypesConverter extends Logging {
          // Note: the order of these checks is important!
          if (correspondsToMap(groupType)) { // MapType
            val keyValueGroup = groupType.getFields.apply(0).asGroupType()
-           val keyType = toDataType(keyValueGroup.getFields.apply(0))
+           val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString)
            assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
-           val valueType = toDataType(keyValueGroup.getFields.apply(1))
+           val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString)
            assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
            // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
            // at here.
            MapType(keyType, valueType)
          } else if (correspondsToArray(groupType)) { // ArrayType
-           val elementType = toDataType(groupType.getFields.apply(0))
+           val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString)
            ArrayType(elementType, containsNull = false)
          } else { // everything else: StructType
            val fields = groupType
              .getFields
              .map(ptype => new StructField(
                ptype.getName,
-               toDataType(ptype),
+               toDataType(ptype, isBinaryAsString),
                ptype.getRepetition != Repetition.REQUIRED))
            StructType(fields)
          }
@@ -276,15 +279,15 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }
   }
 
-  def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = {
+  def convertToAttributes(parquetSchema: ParquetType, isBinaryAsString: Boolean): Seq[Attribute] = {
     parquetSchema
       .asGroupType()
       .getFields
       .map(
         field =>
           new AttributeReference(
             field.getName,
-            toDataType(field),
+            toDataType(field, isBinaryAsString),
             field.getRepetition != Repetition.REQUIRED)())
   }
 
@@ -402,7 +405,10 @@ private[parquet] object ParquetTypesConverter extends Logging {
    * @param conf The Hadoop configuration to use.
    * @return A list of attributes that make up the schema.
    */
-  def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = {
+  def readSchemaFromFile(
+      origPath: Path,
+      conf: Option[Configuration],
+      isBinaryAsString: Boolean): Seq[Attribute] = {
     val keyValueMetadata: java.util.Map[String, String] =
       readMetaData(origPath, conf)
         .getFileMetaData
@@ -411,7 +417,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
       convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY))
     } else {
       val attributes = convertToAttributes(
-        readMetaData(origPath, conf).getFileMetaData.getSchema)
+        readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString)
       log.info(s"Falling back to schema conversion from Parquet types; result: $attributes")
       attributes
     }
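Taken together, an end-to-end use would look roughly like the sketch below (not from the commit; it assumes an existing SparkContext sc and a hypothetical Parquet file /tmp/bin.parquet containing a plain BINARY column named "payload"):

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// The flag can also be toggled through a SQL SET command.
sqlContext.sql("SET spark.sql.parquet.binaryAsString=true")

val table = sqlContext.parquetFile("/tmp/bin.parquet")
table.registerTempTable("bin_table")

// "payload" now comes back as StringType rather than BinaryType, so it prints as text.
sqlContext.sql("SELECT payload FROM bin_table").collect().foreach(println)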
