diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index 35c51dec0bcf5..90de11182e605 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -31,6 +31,7 @@ private[spark] object SQLConf { val SHUFFLE_PARTITIONS = "spark.sql.shuffle.partitions" val CODEGEN_ENABLED = "spark.sql.codegen" val DIALECT = "spark.sql.dialect" + val PARQUET_BINARY_AS_STRING = "spark.sql.parquet.binaryAsString" object Deprecated { val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks" @@ -87,8 +88,7 @@ trait SQLConf { * * Defaults to false as this feature is currently experimental. */ - private[spark] def codegenEnabled: Boolean = - if (getConf(CODEGEN_ENABLED, "false") == "true") true else false + private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, "false").toBoolean /** * Upper bound on the sizes (in bytes) of the tables qualified for the auto conversion to @@ -108,6 +108,12 @@ trait SQLConf { private[spark] def defaultSizeInBytes: Long = getConf(DEFAULT_SIZE_IN_BYTES, (autoBroadcastJoinThreshold + 1).toString).toLong + /** + * When set to true, we always treat byte arrays in Parquet files as strings. + */ + private[spark] def isParquetBinaryAsString: Boolean = + getConf(PARQUET_BINARY_AS_STRING, "false").toBoolean + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index b3bae5db0edbc..053b2a154389c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -60,7 +60,11 @@ private[sql] case class ParquetRelation( .getSchema /** Attributes */ - override val output = ParquetTypesConverter.readSchemaFromFile(new Path(path), conf) + override val output = + ParquetTypesConverter.readSchemaFromFile( + new Path(path), + conf, + sqlContext.isParquetBinaryAsString) override def newInstance = ParquetRelation(path, conf, sqlContext).asInstanceOf[this.type] diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala index 6d4ce32ac5bfa..6a657c20fe46c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala @@ -80,9 +80,10 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging { } } // if both unavailable, fall back to deducing the schema from the given Parquet schema + // TODO: Why it can be null? if (schema == null) { log.debug("falling back to Parquet read schema") - schema = ParquetTypesConverter.convertToAttributes(parquetSchema) + schema = ParquetTypesConverter.convertToAttributes(parquetSchema, false) } log.debug(s"list of attributes that will be read: $schema") new RowRecordMaterializer(parquetSchema, schema) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index 37091bcf73dd6..b0579f76da073 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -43,10 +43,13 @@ private[parquet] object ParquetTypesConverter extends Logging { def isPrimitiveType(ctype: DataType): Boolean = classOf[PrimitiveType] isAssignableFrom ctype.getClass - def toPrimitiveDataType(parquetType: ParquetPrimitiveType): DataType = + def toPrimitiveDataType( + parquetType: ParquetPrimitiveType, + binayAsString: Boolean): DataType = parquetType.getPrimitiveTypeName match { case ParquetPrimitiveTypeName.BINARY - if parquetType.getOriginalType == ParquetOriginalType.UTF8 => StringType + if (parquetType.getOriginalType == ParquetOriginalType.UTF8 || + binayAsString) => StringType case ParquetPrimitiveTypeName.BINARY => BinaryType case ParquetPrimitiveTypeName.BOOLEAN => BooleanType case ParquetPrimitiveTypeName.DOUBLE => DoubleType @@ -85,7 +88,7 @@ private[parquet] object ParquetTypesConverter extends Logging { * @param parquetType The type to convert. * @return The corresponding Catalyst type. */ - def toDataType(parquetType: ParquetType): DataType = { + def toDataType(parquetType: ParquetType, isBinaryAsString: Boolean): DataType = { def correspondsToMap(groupType: ParquetGroupType): Boolean = { if (groupType.getFieldCount != 1 || groupType.getFields.apply(0).isPrimitive) { false @@ -107,7 +110,7 @@ private[parquet] object ParquetTypesConverter extends Logging { } if (parquetType.isPrimitive) { - toPrimitiveDataType(parquetType.asPrimitiveType) + toPrimitiveDataType(parquetType.asPrimitiveType, isBinaryAsString) } else { val groupType = parquetType.asGroupType() parquetType.getOriginalType match { @@ -116,7 +119,7 @@ private[parquet] object ParquetTypesConverter extends Logging { case ParquetOriginalType.LIST => { // TODO: check enums! assert(groupType.getFieldCount == 1) val field = groupType.getFields.apply(0) - ArrayType(toDataType(field), containsNull = false) + ArrayType(toDataType(field, isBinaryAsString), containsNull = false) } case ParquetOriginalType.MAP => { assert( @@ -126,9 +129,9 @@ private[parquet] object ParquetTypesConverter extends Logging { assert( keyValueGroup.getFieldCount == 2, "Parquet Map type malformatted: nested group should have 2 (key, value) fields!") - val keyType = toDataType(keyValueGroup.getFields.apply(0)) + val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) - val valueType = toDataType(keyValueGroup.getFields.apply(1)) + val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString) assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED) // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true // at here. @@ -138,22 +141,22 @@ private[parquet] object ParquetTypesConverter extends Logging { // Note: the order of these checks is important! if (correspondsToMap(groupType)) { // MapType val keyValueGroup = groupType.getFields.apply(0).asGroupType() - val keyType = toDataType(keyValueGroup.getFields.apply(0)) + val keyType = toDataType(keyValueGroup.getFields.apply(0), isBinaryAsString) assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED) - val valueType = toDataType(keyValueGroup.getFields.apply(1)) + val valueType = toDataType(keyValueGroup.getFields.apply(1), isBinaryAsString) assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED) // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true // at here. MapType(keyType, valueType) } else if (correspondsToArray(groupType)) { // ArrayType - val elementType = toDataType(groupType.getFields.apply(0)) + val elementType = toDataType(groupType.getFields.apply(0), isBinaryAsString) ArrayType(elementType, containsNull = false) } else { // everything else: StructType val fields = groupType .getFields .map(ptype => new StructField( ptype.getName, - toDataType(ptype), + toDataType(ptype, isBinaryAsString), ptype.getRepetition != Repetition.REQUIRED)) StructType(fields) } @@ -276,7 +279,7 @@ private[parquet] object ParquetTypesConverter extends Logging { } } - def convertToAttributes(parquetSchema: ParquetType): Seq[Attribute] = { + def convertToAttributes(parquetSchema: ParquetType, isBinaryAsString: Boolean): Seq[Attribute] = { parquetSchema .asGroupType() .getFields @@ -284,7 +287,7 @@ private[parquet] object ParquetTypesConverter extends Logging { field => new AttributeReference( field.getName, - toDataType(field), + toDataType(field, isBinaryAsString), field.getRepetition != Repetition.REQUIRED)()) } @@ -404,7 +407,10 @@ private[parquet] object ParquetTypesConverter extends Logging { * @param conf The Hadoop configuration to use. * @return A list of attributes that make up the schema. */ - def readSchemaFromFile(origPath: Path, conf: Option[Configuration]): Seq[Attribute] = { + def readSchemaFromFile( + origPath: Path, + conf: Option[Configuration], + isBinaryAsString: Boolean): Seq[Attribute] = { val keyValueMetadata: java.util.Map[String, String] = readMetaData(origPath, conf) .getFileMetaData @@ -413,7 +419,7 @@ private[parquet] object ParquetTypesConverter extends Logging { convertFromString(keyValueMetadata.get(RowReadSupport.SPARK_METADATA_KEY)) } else { val attributes = convertToAttributes( - readMetaData(origPath, conf).getFileMetaData.getSchema) + readMetaData(origPath, conf).getFileMetaData.getSchema, isBinaryAsString) log.info(s"Falling back to schema conversion from Parquet types; result: $attributes") attributes } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index 502f6702e394e..172dcd6aa0ee3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -21,8 +21,6 @@ import org.scalatest.{BeforeAndAfterAll, FunSuiteLike} import parquet.hadoop.ParquetFileWriter import parquet.hadoop.util.ContextUtil -import parquet.schema.MessageTypeParser - import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.mapreduce.Job @@ -33,7 +31,6 @@ import org.apache.spark.sql.catalyst.analysis.{Star, UnresolvedAttribute} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.types.{BooleanType, IntegerType} import org.apache.spark.sql.catalyst.util.getTempFilePath -import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.test.TestSQLContext import org.apache.spark.sql.test.TestSQLContext._ import org.apache.spark.util.Utils @@ -138,6 +135,57 @@ class ParquetQuerySuite extends QueryTest with FunSuiteLike with BeforeAndAfterA } } + test("Treat binary as string") { + val oldIsParquetBinaryAsString = TestSQLContext.isParquetBinaryAsString + + // Create the test file. + val file = getTempFilePath("parquet") + val path = file.toString + val range = (0 to 255) + val rowRDD = TestSQLContext.sparkContext.parallelize(range) + .map(i => org.apache.spark.sql.Row(i, s"val_$i".getBytes)) + // We need to ask Parquet to store the String column as a Binary column. + val schema = StructType( + StructField("c1", IntegerType, false) :: + StructField("c2", BinaryType, false) :: Nil) + val schemaRDD1 = applySchema(rowRDD, schema) + schemaRDD1.saveAsParquetFile(path) + val resultWithBinary = parquetFile(path).collect + range.foreach { + i => + assert(resultWithBinary(i).getInt(0) === i) + assert(resultWithBinary(i)(1) === s"val_$i".getBytes) + } + + TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, "true") + // This ParquetRelation always use Parquet types to derive output. + val parquetRelation = new ParquetRelation( + path.toString, + Some(TestSQLContext.sparkContext.hadoopConfiguration), + TestSQLContext) { + override val output = + ParquetTypesConverter.convertToAttributes( + ParquetTypesConverter.readMetaData(new Path(path), conf).getFileMetaData.getSchema, + TestSQLContext.isParquetBinaryAsString) + } + val schemaRDD = new SchemaRDD(TestSQLContext, parquetRelation) + val resultWithString = schemaRDD.collect + range.foreach { + i => + assert(resultWithString(i).getInt(0) === i) + assert(resultWithString(i)(1) === s"val_$i") + } + + schemaRDD.registerTempTable("tmp") + checkAnswer( + sql("SELECT c1, c2 FROM tmp WHERE c2 = 'val_5' OR c2 = 'val_7'"), + (5, "val_5") :: + (7, "val_7") :: Nil) + + // Set it back. + TestSQLContext.setConf(SQLConf.PARQUET_BINARY_AS_STRING, oldIsParquetBinaryAsString.toString) + } + test("Read/Write All Types with non-primitive type") { val tempDir = getTempFilePath("parquetTest").getCanonicalPath val range = (0 to 255)