From 43d736316ac699559eb14a415e55f0005ffe6ffa Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Mon, 8 May 2017 12:16:00 +0900 Subject: [PATCH] [SPARK-12297][SQL] Hive compatibility for Parquet Timestamps ## What changes were proposed in this pull request? This change allows timestamps in parquet-based hive table to behave as a "floating time", without a timezone, as timestamps are for other file formats. If the storage timezone is the same as the session timezone, this conversion is a no-op. When data is read from a hive table, the table property is *always* respected. This allows spark to not change behavior when reading old data, but read newly written data correctly (whatever the source of the data is). Spark inherited the original behavior from Hive, but Hive is also updating behavior to use the same scheme in HIVE-12767 / HIVE-16231. The default for Spark remains unchanged; created tables do not include the new table property. This will only apply to hive tables; nothing is added to parquet metadata to indicate the timezone, so data that is read or written directly from parquet files will never have any conversions applied. ## How was this patch tested? Added a unit test which creates tables, reads and writes data, under a variety of permutations (different storage timezones, different session timezones, vectorized reading on and off). Author: Imran Rashid Closes #16781 from squito/SPARK-12297. --- .../sql/catalyst/catalog/interface.scala | 4 +- .../sql/catalyst/util/DateTimeUtils.scala | 5 + .../parquet/VectorizedColumnReader.java | 28 +- .../VectorizedParquetRecordReader.java | 6 +- .../spark/sql/execution/command/tables.scala | 8 +- .../parquet/ParquetFileFormat.scala | 2 + .../parquet/ParquetReadSupport.scala | 3 +- .../parquet/ParquetRecordMaterializer.scala | 9 +- .../parquet/ParquetRowConverter.scala | 53 ++- .../parquet/ParquetWriteSupport.scala | 25 +- .../spark/sql/hive/HiveExternalCatalog.scala | 11 +- .../spark/sql/hive/HiveMetastoreCatalog.scala | 12 +- .../hive/ParquetHiveCompatibilitySuite.scala | 379 +++++++++++++++++- 13 files changed, 516 insertions(+), 29 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index cc0cbba275b81..c39017ebbfe60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -132,10 +132,10 @@ case class CatalogTablePartition( /** * Given the partition schema, returns a row with that schema holding the partition values. */ - def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = { + def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = { val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties) val timeZoneId = caseInsensitiveProperties.getOrElse( - DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId) + DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId) InternalRow.fromSeq(partitionSchema.map { field => val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) { null diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index 6c1592fd8881d..bf596fa0a89db 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -498,6 +498,11 @@ object DateTimeUtils { false } + lazy val validTimezones = TimeZone.getAvailableIDs().toSet + def isValidTimezone(timezoneId: String): Boolean = { + validTimezones.contains(timezoneId) + } + /** * Returns the microseconds since year zero (-17999) from microseconds since epoch. */ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 9d641b528723a..dabbc2b6387e4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.datasources.parquet; import java.io.IOException; +import java.util.TimeZone; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; @@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DecimalType; @@ -90,11 +93,30 @@ public class VectorizedColumnReader { private final PageReader pageReader; private final ColumnDescriptor descriptor; + private final TimeZone storageTz; + private final TimeZone sessionTz; - public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader, + Configuration conf) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; + // If the table has a timezone property, apply the correct conversions. See SPARK-12297. + // The conf is sometimes null in tests. + String sessionTzString = + conf == null ? null : conf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key()); + if (sessionTzString == null || sessionTzString.isEmpty()) { + sessionTz = DateTimeUtils.defaultTimeZone(); + } else { + sessionTz = TimeZone.getTimeZone(sessionTzString); + } + String storageTzString = + conf == null ? null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY()); + if (storageTzString == null || storageTzString.isEmpty()) { + storageTz = sessionTz; + } else { + storageTz = TimeZone.getTimeZone(storageTzString); + } this.maxDefLevel = descriptor.getMaxDefinitionLevel(); DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); @@ -289,7 +311,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column, // TODO: Convert dictionary of Binaries to dictionary of Longs if (!column.isNullAt(i)) { Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v, sessionTz, storageTz)); } } } else { @@ -422,7 +444,7 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOE if (defColumn.readInteger() == maxDefLevel) { column.putLong(rowId + i, // Read 12 bytes for INT96 - ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12))); + ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12), sessionTz, storageTz)); } else { column.putNull(rowId + i); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 51bdf0f0f2291..d8974ddf24704 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ColumnDescriptor; @@ -95,6 +96,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private boolean returnColumnarBatch; + private Configuration conf; + /** * The default config on whether columnarBatch should be offheap. */ @@ -107,6 +110,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException, UnsupportedOperationException { super.initialize(inputSplit, taskAttemptContext); + this.conf = taskAttemptContext.getConfiguration(); initializeInternal(); } @@ -277,7 +281,7 @@ private void checkEndOfRowGroup() throws IOException { for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; columnReaders[i] = new VectorizedColumnReader(columns.get(i), - pages.getPageReader(columns.get(i))); + pages.getPageReader(columns.get(i)), conf); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index ebf03e1bf8869..5843c5b56d44c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -26,7 +26,6 @@ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import scala.util.Try -import org.apache.commons.lang3.StringEscapeUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, Row, SparkSession} @@ -37,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils} +import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils} import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -74,6 +73,10 @@ case class CreateTableLikeCommand( sourceTableDesc.provider } + val properties = sourceTableDesc.properties.filter { case (k, _) => + k == ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + } + // If the location is specified, we create an external table internally. // Otherwise create a managed table. val tblType = if (location.isEmpty) CatalogTableType.MANAGED else CatalogTableType.EXTERNAL @@ -86,6 +89,7 @@ case class CreateTableLikeCommand( locationUri = location.map(CatalogUtils.stringToURI(_))), schema = sourceTableDesc.schema, provider = newProvider, + properties = properties, partitionColumnNames = sourceTableDesc.partitionColumnNames, bucketSpec = sourceTableDesc.bucketSpec) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2f3a2c62b912c..8113768cd793f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -632,4 +632,6 @@ object ParquetFileFormat extends Logging { Failure(cause) }.toOption } + + val PARQUET_TIMEZONE_TABLE_PROPERTY = "parquet.mr.int96.write.zone" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index f1a35dd8a6200..bf395a0bef745 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -95,7 +95,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo new ParquetRecordMaterializer( parquetRequestedSchema, ParquetReadSupport.expandUDT(catalystRequestedSchema), - new ParquetSchemaConverter(conf)) + new ParquetSchemaConverter(conf), + conf) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index 4e49a0dac97c0..df041996cdea9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.hadoop.conf.Configuration import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer} import org.apache.parquet.schema.MessageType @@ -29,13 +30,17 @@ import org.apache.spark.sql.types.StructType * @param parquetSchema Parquet schema of the records to be read * @param catalystSchema Catalyst schema of the rows to be constructed * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters + * @param hadoopConf hadoop Configuration for passing extra params for parquet conversion */ private[parquet] class ParquetRecordMaterializer( - parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter) + parquetSchema: MessageType, + catalystSchema: StructType, + schemaConverter: ParquetSchemaConverter, + hadoopConf: Configuration) extends RecordMaterializer[UnsafeRow] { private val rootConverter = - new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater) + new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, hadoopConf, NoopUpdater) override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 32e6c60cd9766..d52ff62d93b26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources.parquet import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder +import java.util.TimeZone import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import org.apache.hadoop.conf.Configuration import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type} @@ -34,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -117,12 +120,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param parquetType Parquet schema of Parquet records * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined * types should have been expanded. + * @param hadoopConf a hadoop Configuration for passing any extra parameters for parquet conversion * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class ParquetRowConverter( schemaConverter: ParquetSchemaConverter, parquetType: GroupType, catalystType: StructType, + hadoopConf: Configuration, updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -261,18 +266,18 @@ private[parquet] class ParquetRowConverter( case TimestampType => // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. + // If the table has a timezone property, apply the correct conversions. See SPARK-12297. + val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key) + val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_)) + .getOrElse(DateTimeUtils.defaultTimeZone()) + val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) + val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz) new ParquetPrimitiveConverter(updater) { // Converts nanosecond timestamps stored as INT96 override def addBinary(value: Binary): Unit = { - assert( - value.length() == 12, - "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " + - s"but got a ${value.length()}-byte binary.") - - val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) - val timeOfDayNanos = buf.getLong - val julianDay = buf.getInt - updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)) + val timestamp = ParquetRowConverter.binaryToSQLTimestamp(value, sessionTz = sessionTz, + storageTz = storageTz) + updater.setLong(timestamp) } } @@ -302,7 +307,7 @@ private[parquet] class ParquetRowConverter( case t: StructType => new ParquetRowConverter( - schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater { + schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater { override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) @@ -651,6 +656,7 @@ private[parquet] class ParquetRowConverter( } private[parquet] object ParquetRowConverter { + def binaryToUnscaledLong(binary: Binary): Long = { // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without @@ -673,12 +679,35 @@ private[parquet] object ParquetRowConverter { unscaled } - def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = { + /** + * Converts an int96 to a SQLTimestamp, given both the storage timezone and the local timezone. + * The timestamp is really meant to be interpreted as a "floating time", but since we + * actually store it as micros since epoch, why we have to apply a conversion when timezones + * change. + * + * @param binary a parquet Binary which holds one int96 + * @param sessionTz the session timezone. This will be used to determine how to display the time, + * and compute functions on the timestamp which involve a timezone, eg. extract + * the hour. + * @param storageTz the timezone which was used to store the timestamp. This should come from the + * timestamp table property, or else assume its the same as the sessionTz + * @return a timestamp (millis since epoch) which will render correctly in the sessionTz + */ + def binaryToSQLTimestamp( + binary: Binary, + sessionTz: TimeZone, + storageTz: TimeZone): SQLTimestamp = { assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" + s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.") val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) val timeOfDayNanos = buffer.getLong val julianDay = buffer.getInt - DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) + val utcEpochMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) + // avoid expensive time logic if possible. + if (sessionTz.getID() != storageTz.getID()) { + DateTimeUtils.convertTz(utcEpochMicros, sessionTz, storageTz) + } else { + utcEpochMicros + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 38b0e33937f3c..679ed8e361b74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.{ByteBuffer, ByteOrder} import java.util +import java.util.TimeZone import scala.collection.JavaConverters.mapAsJavaMapConverter @@ -75,6 +76,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit // Reusable byte array used to write decimal values private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION)) + private var storageTz: TimeZone = _ + private var sessionTz: TimeZone = _ + override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) this.schema = StructType.fromString(schemaString) @@ -91,6 +95,19 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit this.rootFieldWriters = schema.map(_.dataType).map(makeWriter) + // If the table has a timezone property, apply the correct conversions. See SPARK-12297. + val sessionTzString = configuration.get(SQLConf.SESSION_LOCAL_TIMEZONE.key) + sessionTz = if (sessionTzString == null || sessionTzString == "") { + TimeZone.getDefault() + } else { + TimeZone.getTimeZone(sessionTzString) + } + val storageTzString = configuration.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) + storageTz = if (storageTzString == null || storageTzString == "") { + sessionTz + } else { + TimeZone.getTimeZone(storageTzString) + } val messageType = new ParquetSchemaConverter(configuration).convert(schema) val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava @@ -178,7 +195,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped. - val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) + val rawMicros = row.getLong(ordinal) + val adjustedMicros = if (sessionTz.getID() == storageTz.getID()) { + rawMicros + } else { + DateTimeUtils.convertTz(rawMicros, storageTz, sessionTz) + } + val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(adjustedMicros) val buf = ByteBuffer.wrap(timestampBuffer) buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index ba48facff2933..8fef467f5f5cb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -39,9 +39,10 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.ColumnStat -import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.internal.StaticSQLConf._ @@ -224,6 +225,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat throw new TableAlreadyExistsException(db = db, table = table) } + val tableTz = tableDefinition.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) + tableTz.foreach { tz => + if (!DateTimeUtils.isValidTimezone(tz)) { + throw new AnalysisException(s"Cannot set" + + s" ${ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY} to invalid timezone $tz") + } + } + if (tableDefinition.tableType == VIEW) { client.createTable(tableDefinition, ignoreIfExists) } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 6b98066cb76c8..e0b565c0d79a0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._ import org.apache.spark.sql.types._ @@ -174,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // We don't support hive bucketed tables, only ones we write out. bucketSpec = None, fileFormat = fileFormat, - options = options)(sparkSession = sparkSession) + options = options ++ getStorageTzOptions(relation))(sparkSession = sparkSession) val created = LogicalRelation(fsRelation, updatedTable) tableRelationCache.put(tableIdentifier, created) created @@ -201,7 +202,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log userSpecifiedSchema = Option(dataSchema), // We don't support hive bucketed tables, only ones we write out. bucketSpec = None, - options = options, + options = options ++ getStorageTzOptions(relation), className = fileType).resolveRelation(), table = updatedTable) @@ -222,6 +223,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log result.copy(output = newOutput) } + private def getStorageTzOptions(relation: CatalogRelation): Map[String, String] = { + // We add the table timezone to the relation options, which automatically gets injected into the + // hadoopConf for the Parquet Converters + val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + relation.tableMeta.properties.get(storageTzKey).map(storageTzKey -> _).toMap + } + private def inferIfNeeded( relation: CatalogRelation, options: Map[String, String], diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index 05b6059472f59..2bfd63d9b56e6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -17,12 +17,22 @@ package org.apache.spark.sql.hive +import java.io.File +import java.net.URLDecoder import java.sql.Timestamp +import java.util.TimeZone -import org.apache.spark.sql.Row -import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName + +import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompatibilityTest, ParquetFileFormat} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{StringType, StructType, TimestampType} class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton { /** @@ -141,4 +151,369 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi Row(Seq(Row(1))), "ARRAY>") } + + val testTimezones = Seq( + "UTC" -> "UTC", + "LA" -> "America/Los_Angeles", + "Berlin" -> "Europe/Berlin" + ) + // Check creating parquet tables with timestamps, writing data into them, and reading it back out + // under a variety of conditions: + // * tables with explicit tz and those without + // * altering table properties directly + // * variety of timezones, local & non-local + val sessionTimezones = testTimezones.map(_._2).map(Some(_)) ++ Seq(None) + sessionTimezones.foreach { sessionTzOpt => + val sparkSession = spark.newSession() + sessionTzOpt.foreach { tz => sparkSession.conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, tz) } + testCreateWriteRead(sparkSession, "no_tz", None, sessionTzOpt) + val localTz = TimeZone.getDefault.getID() + testCreateWriteRead(sparkSession, "local", Some(localTz), sessionTzOpt) + // check with a variety of timezones. The unit tests currently are configured to always use + // America/Los_Angeles, but even if they didn't, we'd be sure to cover a non-local timezone. + testTimezones.foreach { case (tableName, zone) => + if (zone != localTz) { + testCreateWriteRead(sparkSession, tableName, Some(zone), sessionTzOpt) + } + } + } + + private def testCreateWriteRead( + sparkSession: SparkSession, + baseTable: String, + explicitTz: Option[String], + sessionTzOpt: Option[String]): Unit = { + testCreateAlterTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt) + testWriteTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt) + testReadTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt) + } + + private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = { + val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table)) + assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz) + } + + private def testCreateAlterTablesWithTimezone( + spark: SparkSession, + baseTable: String, + explicitTz: Option[String], + sessionTzOpt: Option[String]): Unit = { + test(s"SPARK-12297: Create and Alter Parquet tables and timezones; explicitTz = $explicitTz; " + + s"sessionTzOpt = $sessionTzOpt") { + val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + withTable(baseTable, s"like_$baseTable", s"select_$baseTable", s"partitioned_$baseTable") { + // If we ever add a property to set the table timezone by default, defaultTz would change + val defaultTz = None + // check that created tables have correct TBLPROPERTIES + val tblProperties = explicitTz.map { + tz => s"""TBLPROPERTIES ($key="$tz")""" + }.getOrElse("") + spark.sql( + s"""CREATE TABLE $baseTable ( + | x int + | ) + | STORED AS PARQUET + | $tblProperties + """.stripMargin) + val expectedTableTz = explicitTz.orElse(defaultTz) + checkHasTz(spark, baseTable, expectedTableTz) + spark.sql( + s"""CREATE TABLE partitioned_$baseTable ( + | x int + | ) + | PARTITIONED BY (y int) + | STORED AS PARQUET + | $tblProperties + """.stripMargin) + checkHasTz(spark, s"partitioned_$baseTable", expectedTableTz) + spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable") + checkHasTz(spark, s"like_$baseTable", expectedTableTz) + spark.sql( + s"""CREATE TABLE select_$baseTable + | STORED AS PARQUET + | AS + | SELECT * from $baseTable + """.stripMargin) + checkHasTz(spark, s"select_$baseTable", defaultTz) + + // check alter table, setting, unsetting, resetting the property + spark.sql( + s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""") + checkHasTz(spark, baseTable, Some("America/Los_Angeles")) + spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""") + checkHasTz(spark, baseTable, Some("UTC")) + spark.sql(s"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""") + checkHasTz(spark, baseTable, None) + explicitTz.foreach { tz => + spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""") + checkHasTz(spark, baseTable, expectedTableTz) + } + } + } + } + + val desiredTimestampStrings = Seq( + "2015-12-31 22:49:59.123", + "2015-12-31 23:50:59.123", + "2016-01-01 00:39:59.123", + "2016-01-01 01:29:59.123" + ) + // We don't want to mess with timezones inside the tests themselves, since we use a shared + // spark context, and then we might be prone to issues from lazy vals for timezones. Instead, + // we manually adjust the timezone just to determine what the desired millis (since epoch, in utc) + // is for various "wall-clock" times in different timezones, and then we can compare against those + // in our tests. + val timestampTimezoneToMillis = { + val originalTz = TimeZone.getDefault + try { + desiredTimestampStrings.flatMap { timestampString => + Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map { tzId => + TimeZone.setDefault(TimeZone.getTimeZone(tzId)) + val timestamp = Timestamp.valueOf(timestampString) + (timestampString, tzId) -> timestamp.getTime() + } + }.toMap + } finally { + TimeZone.setDefault(originalTz) + } + } + + private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = { + import spark.implicits._ + val df = desiredTimestampStrings.toDF("display") + // this will get the millis corresponding to the display time given the current *session* + // timezone. + df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)] + } + + private def testWriteTablesWithTimezone( + spark: SparkSession, + baseTable: String, + explicitTz: Option[String], + sessionTzOpt: Option[String]) : Unit = { + val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + test(s"SPARK-12297: Write to Parquet tables with Timestamps; explicitTz = $explicitTz; " + + s"sessionTzOpt = $sessionTzOpt") { + + withTable(s"saveAsTable_$baseTable", s"insert_$baseTable", s"partitioned_ts_$baseTable") { + val sessionTzId = sessionTzOpt.getOrElse(TimeZone.getDefault().getID()) + // check that created tables have correct TBLPROPERTIES + val tblProperties = explicitTz.map { + tz => s"""TBLPROPERTIES ($key="$tz")""" + }.getOrElse("") + + val rawData = createRawData(spark) + // Check writing data out. + // We write data into our tables, and then check the raw parquet files to see whether + // the correct conversion was applied. + rawData.write.saveAsTable(s"saveAsTable_$baseTable") + checkHasTz(spark, s"saveAsTable_$baseTable", None) + spark.sql( + s"""CREATE TABLE insert_$baseTable ( + | display string, + | ts timestamp + | ) + | STORED AS PARQUET + | $tblProperties + """.stripMargin) + checkHasTz(spark, s"insert_$baseTable", explicitTz) + rawData.write.insertInto(s"insert_$baseTable") + // no matter what, roundtripping via the table should leave the data unchanged + val readFromTable = spark.table(s"insert_$baseTable").collect() + .map { row => (row.getAs[String](0), row.getAs[Timestamp](1)).toString() }.sorted + assert(readFromTable === rawData.collect().map(_.toString()).sorted) + + // Now we load the raw parquet data on disk, and check if it was adjusted correctly. + // Note that we only store the timezone in the table property, so when we read the + // data this way, we're bypassing all of the conversion logic, and reading the raw + // values in the parquet file. + val onDiskLocation = spark.sessionState.catalog + .getTableMetadata(TableIdentifier(s"insert_$baseTable")).location.getPath + // we test reading the data back with and without the vectorized reader, to make sure we + // haven't broken reading parquet from non-hive tables, with both readers. + Seq(false, true).foreach { vectorized => + spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized) + val readFromDisk = spark.read.parquet(onDiskLocation).collect() + val storageTzId = explicitTz.getOrElse(sessionTzId) + readFromDisk.foreach { row => + val displayTime = row.getAs[String](0) + val millis = row.getAs[Timestamp](1).getTime() + val expectedMillis = timestampTimezoneToMillis((displayTime, storageTzId)) + assert(expectedMillis === millis, s"Display time '$displayTime' was stored " + + s"incorrectly with sessionTz = ${sessionTzOpt}; Got $millis, expected " + + s"$expectedMillis (delta = ${millis - expectedMillis})") + } + } + + // check tables partitioned by timestamps. We don't compare the "raw" data in this case, + // since they are adjusted even when we bypass the hive table. + rawData.write.partitionBy("ts").saveAsTable(s"partitioned_ts_$baseTable") + val partitionDiskLocation = spark.sessionState.catalog + .getTableMetadata(TableIdentifier(s"partitioned_ts_$baseTable")).location.getPath + // no matter what mix of timezones we use, the dirs should specify the value with the + // same time we use for display. + val parts = new File(partitionDiskLocation).list().collect { + case name if name.startsWith("ts=") => URLDecoder.decode(name.stripPrefix("ts=")) + }.toSet + assert(parts === desiredTimestampStrings.toSet) + } + } + } + + private def testReadTablesWithTimezone( + spark: SparkSession, + baseTable: String, + explicitTz: Option[String], + sessionTzOpt: Option[String]): Unit = { + val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " + + s"sessionTzOpt = $sessionTzOpt") { + withTable(s"external_$baseTable", s"partitioned_$baseTable") { + // we intentionally save this data directly, without creating a table, so we can + // see that the data is read back differently depending on table properties. + // we'll save with adjusted millis, so that it should be the correct millis after reading + // back. + val rawData = createRawData(spark) + // to avoid closing over entire class + val timestampTimezoneToMillis = this.timestampTimezoneToMillis + import spark.implicits._ + val adjustedRawData = (explicitTz match { + case Some(tzId) => + rawData.map { case (displayTime, _) => + val storageMillis = timestampTimezoneToMillis((displayTime, tzId)) + (displayTime, new Timestamp(storageMillis)) + } + case _ => + rawData + }).withColumnRenamed("_1", "display").withColumnRenamed("_2", "ts") + withTempPath { basePath => + val unpartitionedPath = new File(basePath, "flat") + val partitionedPath = new File(basePath, "partitioned") + adjustedRawData.write.parquet(unpartitionedPath.getCanonicalPath) + val options = Map("path" -> unpartitionedPath.getCanonicalPath) ++ + explicitTz.map { tz => Map(key -> tz) }.getOrElse(Map()) + + spark.catalog.createTable( + tableName = s"external_$baseTable", + source = "parquet", + schema = new StructType().add("display", StringType).add("ts", TimestampType), + options = options + ) + + // also write out a partitioned table, to make sure we can access that correctly. + // add a column we can partition by (value doesn't particularly matter). + val partitionedData = adjustedRawData.withColumn("id", monotonicallyIncreasingId) + partitionedData.write.partitionBy("id") + .parquet(partitionedPath.getCanonicalPath) + // unfortunately, catalog.createTable() doesn't let us specify partitioning, so just use + // a "CREATE TABLE" stmt. + val tblOpts = explicitTz.map { tz => s"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("") + spark.sql(s"""CREATE EXTERNAL TABLE partitioned_$baseTable ( + | display string, + | ts timestamp + |) + |PARTITIONED BY (id bigint) + |STORED AS parquet + |LOCATION 'file:${partitionedPath.getCanonicalPath}' + |$tblOpts + """.stripMargin) + spark.sql(s"msck repair table partitioned_$baseTable") + + for { + vectorized <- Seq(false, true) + partitioned <- Seq(false, true) + } { + withClue(s"vectorized = $vectorized; partitioned = $partitioned") { + spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized) + val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID()) + val table = if (partitioned) s"partitioned_$baseTable" else s"external_$baseTable" + val query = s"select display, cast(ts as string) as ts_as_string, ts " + + s"from $table" + val collectedFromExternal = spark.sql(query).collect() + assert( collectedFromExternal.size === 4) + collectedFromExternal.foreach { row => + val displayTime = row.getAs[String](0) + // the timestamp should still display the same, despite the changes in timezones + assert(displayTime === row.getAs[String](1).toString()) + // we'll also check that the millis behind the timestamp has the appropriate + // adjustments. + val millis = row.getAs[Timestamp](2).getTime() + val expectedMillis = timestampTimezoneToMillis((displayTime, sessionTz)) + val delta = millis - expectedMillis + val deltaHours = delta / (1000L * 60 * 60) + assert(millis === expectedMillis, s"Display time '$displayTime' did not have " + + s"correct millis: was $millis, expected $expectedMillis; delta = $delta " + + s"($deltaHours hours)") + } + + // Now test that the behavior is still correct even with a filter which could get + // pushed down into parquet. We don't need extra handling for pushed down + // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet + // does not read statistics from int96 fields, as they are unsigned. See + // scalastyle:off line.size.limit + // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419 + // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348 + // scalastyle:on line.size.limit + // + // Just to be defensive in case anything ever changes in parquet, this test checks + // the assumption on column stats, and also the end-to-end behavior. + + val hadoopConf = sparkContext.hadoopConfiguration + val fs = FileSystem.get(hadoopConf) + val parts = if (partitioned) { + val subdirs = fs.listStatus(new Path(partitionedPath.getCanonicalPath)) + .filter(_.getPath().getName().startsWith("id=")) + fs.listStatus(subdirs.head.getPath()) + .filter(_.getPath().getName().endsWith(".parquet")) + } else { + fs.listStatus(new Path(unpartitionedPath.getCanonicalPath)) + .filter(_.getPath().getName().endsWith(".parquet")) + } + // grab the meta data from the parquet file. The next section of asserts just make + // sure the test is configured correctly. + assert(parts.size == 1) + val oneFooter = ParquetFileReader.readFooter(hadoopConf, parts.head.getPath) + assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 2) + assert(oneFooter.getFileMetaData.getSchema.getColumns.get(1).getType() === + PrimitiveTypeName.INT96) + val oneBlockMeta = oneFooter.getBlocks().get(0) + val oneBlockColumnMeta = oneBlockMeta.getColumns().get(1) + val columnStats = oneBlockColumnMeta.getStatistics + // This is the important assert. Column stats are written, but they are ignored + // when the data is read back as mentioned above, b/c int96 is unsigned. This + // assert makes sure this holds even if we change parquet versions (if eg. there + // were ever statistics even on unsigned columns). + assert(columnStats.isEmpty) + + // These queries should return the entire dataset, but if the predicates were + // applied to the raw values in parquet, they would incorrectly filter data out. + Seq( + ">" -> "2015-12-31 22:00:00", + "<" -> "2016-01-01 02:00:00" + ).foreach { case (comparison, value) => + val query = + s"select ts from $table where ts $comparison '$value'" + val countWithFilter = spark.sql(query).count() + assert(countWithFilter === 4, query) + } + } + } + } + } + } + } + + test("SPARK-12297: exception on bad timezone") { + val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + val badTzException = intercept[AnalysisException] { + spark.sql( + s"""CREATE TABLE bad_tz_table ( + | x int + | ) + | STORED AS PARQUET + | TBLPROPERTIES ($key="Blart Versenwald III") + """.stripMargin) + } + assert(badTzException.getMessage.contains("Blart Versenwald III")) + } }