diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index cc0cbba275b81..c39017ebbfe60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -132,10 +132,10 @@ case class CatalogTablePartition( /** * Given the partition schema, returns a row with that schema holding the partition values. */ - def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = { + def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = { val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties) val timeZoneId = caseInsensitiveProperties.getOrElse( - DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId) + DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId) InternalRow.fromSeq(partitionSchema.map { field => val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) { null diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala index eb6aad5b2d2bb..6d1732cf0f6b8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala @@ -498,6 +498,11 @@ object DateTimeUtils { false } + lazy val validTimezones = TimeZone.getAvailableIDs().toSet + def isValidTimezone(timezoneId: String): Boolean = { + validTimezones.contains(timezoneId) + } + /** * Returns the microseconds since year zero (-17999) from microseconds since epoch. */ diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java index 9d641b528723a..dabbc2b6387e4 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java @@ -18,7 +18,9 @@ package org.apache.spark.sql.execution.datasources.parquet; import java.io.IOException; +import java.util.TimeZone; +import org.apache.hadoop.conf.Configuration; import org.apache.parquet.bytes.BytesUtils; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.Dictionary; @@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils; import org.apache.spark.sql.execution.vectorized.ColumnVector; +import org.apache.spark.sql.internal.SQLConf; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.DecimalType; @@ -90,11 +93,30 @@ public class VectorizedColumnReader { private final PageReader pageReader; private final ColumnDescriptor descriptor; + private final TimeZone storageTz; + private final TimeZone sessionTz; - public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader) + public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader, + Configuration conf) throws IOException { this.descriptor = descriptor; this.pageReader = pageReader; + // If the table has a timezone property, apply the correct conversions. See SPARK-12297. + // The conf is sometimes null in tests. + String sessionTzString = + conf == null ? null : conf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key()); + if (sessionTzString == null || sessionTzString.isEmpty()) { + sessionTz = DateTimeUtils.defaultTimeZone(); + } else { + sessionTz = TimeZone.getTimeZone(sessionTzString); + } + String storageTzString = + conf == null ? null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY()); + if (storageTzString == null || storageTzString.isEmpty()) { + storageTz = sessionTz; + } else { + storageTz = TimeZone.getTimeZone(storageTzString); + } this.maxDefLevel = descriptor.getMaxDefinitionLevel(); DictionaryPage dictionaryPage = pageReader.readDictionaryPage(); @@ -289,7 +311,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column, // TODO: Convert dictionary of Binaries to dictionary of Longs if (!column.isNullAt(i)) { Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i)); - column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v)); + column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v, sessionTz, storageTz)); } } } else { @@ -422,7 +444,7 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOE if (defColumn.readInteger() == maxDefLevel) { column.putLong(rowId + i, // Read 12 bytes for INT96 - ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12))); + ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12), sessionTz, storageTz)); } else { column.putNull(rowId + i); } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index 51bdf0f0f2291..d8974ddf24704 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -21,6 +21,7 @@ import java.util.Arrays; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.parquet.column.ColumnDescriptor; @@ -95,6 +96,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa */ private boolean returnColumnarBatch; + private Configuration conf; + /** * The default config on whether columnarBatch should be offheap. */ @@ -107,6 +110,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException, UnsupportedOperationException { super.initialize(inputSplit, taskAttemptContext); + this.conf = taskAttemptContext.getConfiguration(); initializeInternal(); } @@ -277,7 +281,7 @@ private void checkEndOfRowGroup() throws IOException { for (int i = 0; i < columns.size(); ++i) { if (missingColumns[i]) continue; columnReaders[i] = new VectorizedColumnReader(columns.get(i), - pages.getPageReader(columns.get(i))); + pages.getPageReader(columns.get(i)), conf); } totalCountLoadedSoFar += pages.getRowCount(); } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index ebf03e1bf8869..5843c5b56d44c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -26,7 +26,6 @@ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal import scala.util.Try -import org.apache.commons.lang3.StringEscapeUtils import org.apache.hadoop.fs.Path import org.apache.spark.sql.{AnalysisException, Row, SparkSession} @@ -37,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} import org.apache.spark.sql.catalyst.util.quoteIdentifier -import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils} +import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils} import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat import org.apache.spark.sql.execution.datasources.json.JsonFileFormat import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat @@ -74,6 +73,10 @@ case class CreateTableLikeCommand( sourceTableDesc.provider } + val properties = sourceTableDesc.properties.filter { case (k, _) => + k == ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + } + // If the location is specified, we create an external table internally. // Otherwise create a managed table. val tblType = if (location.isEmpty) CatalogTableType.MANAGED else CatalogTableType.EXTERNAL @@ -86,6 +89,7 @@ case class CreateTableLikeCommand( locationUri = location.map(CatalogUtils.stringToURI(_))), schema = sourceTableDesc.schema, provider = newProvider, + properties = properties, partitionColumnNames = sourceTableDesc.partitionColumnNames, bucketSpec = sourceTableDesc.bucketSpec) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index 2f3a2c62b912c..8113768cd793f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -632,4 +632,6 @@ object ParquetFileFormat extends Logging { Failure(cause) }.toOption } + + val PARQUET_TIMEZONE_TABLE_PROPERTY = "parquet.mr.int96.write.zone" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala index f1a35dd8a6200..bf395a0bef745 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala @@ -95,7 +95,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo new ParquetRecordMaterializer( parquetRequestedSchema, ParquetReadSupport.expandUDT(catalystRequestedSchema), - new ParquetSchemaConverter(conf)) + new ParquetSchemaConverter(conf), + conf) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala index 4e49a0dac97c0..df041996cdea9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRecordMaterializer.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet +import org.apache.hadoop.conf.Configuration import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer} import org.apache.parquet.schema.MessageType @@ -29,13 +30,17 @@ import org.apache.spark.sql.types.StructType * @param parquetSchema Parquet schema of the records to be read * @param catalystSchema Catalyst schema of the rows to be constructed * @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters + * @param hadoopConf hadoop Configuration for passing extra params for parquet conversion */ private[parquet] class ParquetRecordMaterializer( - parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter) + parquetSchema: MessageType, + catalystSchema: StructType, + schemaConverter: ParquetSchemaConverter, + hadoopConf: Configuration) extends RecordMaterializer[UnsafeRow] { private val rootConverter = - new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater) + new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, hadoopConf, NoopUpdater) override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 32e6c60cd9766..d52ff62d93b26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources.parquet import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder +import java.util.TimeZone import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer +import org.apache.hadoop.conf.Configuration import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type} @@ -34,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -117,12 +120,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd * @param parquetType Parquet schema of Parquet records * @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined * types should have been expanded. + * @param hadoopConf a hadoop Configuration for passing any extra parameters for parquet conversion * @param updater An updater which propagates converted field values to the parent container */ private[parquet] class ParquetRowConverter( schemaConverter: ParquetSchemaConverter, parquetType: GroupType, catalystType: StructType, + hadoopConf: Configuration, updater: ParentContainerUpdater) extends ParquetGroupConverter(updater) with Logging { @@ -261,18 +266,18 @@ private[parquet] class ParquetRowConverter( case TimestampType => // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. + // If the table has a timezone property, apply the correct conversions. See SPARK-12297. + val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key) + val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_)) + .getOrElse(DateTimeUtils.defaultTimeZone()) + val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) + val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz) new ParquetPrimitiveConverter(updater) { // Converts nanosecond timestamps stored as INT96 override def addBinary(value: Binary): Unit = { - assert( - value.length() == 12, - "Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " + - s"but got a ${value.length()}-byte binary.") - - val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) - val timeOfDayNanos = buf.getLong - val julianDay = buf.getInt - updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)) + val timestamp = ParquetRowConverter.binaryToSQLTimestamp(value, sessionTz = sessionTz, + storageTz = storageTz) + updater.setLong(timestamp) } } @@ -302,7 +307,7 @@ private[parquet] class ParquetRowConverter( case t: StructType => new ParquetRowConverter( - schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater { + schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater { override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy()) }) @@ -651,6 +656,7 @@ private[parquet] class ParquetRowConverter( } private[parquet] object ParquetRowConverter { + def binaryToUnscaledLong(binary: Binary): Long = { // The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here // we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without @@ -673,12 +679,35 @@ private[parquet] object ParquetRowConverter { unscaled } - def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = { + /** + * Converts an int96 to a SQLTimestamp, given both the storage timezone and the local timezone. + * The timestamp is really meant to be interpreted as a "floating time", but since we + * actually store it as micros since epoch, why we have to apply a conversion when timezones + * change. + * + * @param binary a parquet Binary which holds one int96 + * @param sessionTz the session timezone. This will be used to determine how to display the time, + * and compute functions on the timestamp which involve a timezone, eg. extract + * the hour. + * @param storageTz the timezone which was used to store the timestamp. This should come from the + * timestamp table property, or else assume its the same as the sessionTz + * @return a timestamp (millis since epoch) which will render correctly in the sessionTz + */ + def binaryToSQLTimestamp( + binary: Binary, + sessionTz: TimeZone, + storageTz: TimeZone): SQLTimestamp = { assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" + s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.") val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN) val timeOfDayNanos = buffer.getLong val julianDay = buffer.getInt - DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) + val utcEpochMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos) + // avoid expensive time logic if possible. + if (sessionTz.getID() != storageTz.getID()) { + DateTimeUtils.convertTz(utcEpochMicros, sessionTz, storageTz) + } else { + utcEpochMicros + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala index 38b0e33937f3c..679ed8e361b74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetWriteSupport.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.{ByteBuffer, ByteOrder} import java.util +import java.util.TimeZone import scala.collection.JavaConverters.mapAsJavaMapConverter @@ -75,6 +76,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit // Reusable byte array used to write decimal values private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION)) + private var storageTz: TimeZone = _ + private var sessionTz: TimeZone = _ + override def init(configuration: Configuration): WriteContext = { val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA) this.schema = StructType.fromString(schemaString) @@ -91,6 +95,19 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit this.rootFieldWriters = schema.map(_.dataType).map(makeWriter) + // If the table has a timezone property, apply the correct conversions. See SPARK-12297. + val sessionTzString = configuration.get(SQLConf.SESSION_LOCAL_TIMEZONE.key) + sessionTz = if (sessionTzString == null || sessionTzString == "") { + TimeZone.getDefault() + } else { + TimeZone.getTimeZone(sessionTzString) + } + val storageTzString = configuration.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) + storageTz = if (storageTzString == null || storageTzString == "") { + sessionTz + } else { + TimeZone.getTimeZone(storageTzString) + } val messageType = new ParquetSchemaConverter(configuration).convert(schema) val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava @@ -178,7 +195,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit // NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond // precision. Nanosecond parts of timestamp values read from INT96 are simply stripped. - val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal)) + val rawMicros = row.getLong(ordinal) + val adjustedMicros = if (sessionTz.getID() == storageTz.getID()) { + rawMicros + } else { + DateTimeUtils.convertTz(rawMicros, storageTz, sessionTz) + } + val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(adjustedMicros) val buf = ByteBuffer.wrap(timestampBuffer) buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay) recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer)) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index ba48facff2933..8fef467f5f5cb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -39,9 +39,10 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.ColumnStat -import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.PartitioningUtils +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.client.HiveClient import org.apache.spark.sql.internal.HiveSerDe import org.apache.spark.sql.internal.StaticSQLConf._ @@ -224,6 +225,14 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat throw new TableAlreadyExistsException(db = db, table = table) } + val tableTz = tableDefinition.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) + tableTz.foreach { tz => + if (!DateTimeUtils.isValidTimezone(tz)) { + throw new AnalysisException(s"Cannot set" + + s" ${ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY} to invalid timezone $tz") + } + } + if (tableDefinition.tableType == VIEW) { client.createTable(tableDefinition, ignoreIfExists) } else { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 6b98066cb76c8..e0b565c0d79a0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.execution.datasources._ +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.internal.SQLConf.HiveCaseSensitiveInferenceMode._ import org.apache.spark.sql.types._ @@ -174,7 +175,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log // We don't support hive bucketed tables, only ones we write out. bucketSpec = None, fileFormat = fileFormat, - options = options)(sparkSession = sparkSession) + options = options ++ getStorageTzOptions(relation))(sparkSession = sparkSession) val created = LogicalRelation(fsRelation, updatedTable) tableRelationCache.put(tableIdentifier, created) created @@ -201,7 +202,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log userSpecifiedSchema = Option(dataSchema), // We don't support hive bucketed tables, only ones we write out. bucketSpec = None, - options = options, + options = options ++ getStorageTzOptions(relation), className = fileType).resolveRelation(), table = updatedTable) @@ -222,6 +223,13 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log result.copy(output = newOutput) } + private def getStorageTzOptions(relation: CatalogRelation): Map[String, String] = { + // We add the table timezone to the relation options, which automatically gets injected into the + // hadoopConf for the Parquet Converters + val storageTzKey = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + relation.tableMeta.properties.get(storageTzKey).map(storageTzKey -> _).toMap + } + private def inferIfNeeded( relation: CatalogRelation, options: Map[String, String], diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala index 05b6059472f59..2bfd63d9b56e6 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala @@ -17,12 +17,22 @@ package org.apache.spark.sql.hive +import java.io.File +import java.net.URLDecoder import java.sql.Timestamp +import java.util.TimeZone -import org.apache.spark.sql.Row -import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.parquet.hadoop.ParquetFileReader +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName + +import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.execution.datasources.parquet.{ParquetCompatibilityTest, ParquetFileFormat} +import org.apache.spark.sql.functions._ import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{StringType, StructType, TimestampType} class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHiveSingleton { /** @@ -141,4 +151,369 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with TestHi Row(Seq(Row(1))), "ARRAY>") } + + val testTimezones = Seq( + "UTC" -> "UTC", + "LA" -> "America/Los_Angeles", + "Berlin" -> "Europe/Berlin" + ) + // Check creating parquet tables with timestamps, writing data into them, and reading it back out + // under a variety of conditions: + // * tables with explicit tz and those without + // * altering table properties directly + // * variety of timezones, local & non-local + val sessionTimezones = testTimezones.map(_._2).map(Some(_)) ++ Seq(None) + sessionTimezones.foreach { sessionTzOpt => + val sparkSession = spark.newSession() + sessionTzOpt.foreach { tz => sparkSession.conf.set(SQLConf.SESSION_LOCAL_TIMEZONE.key, tz) } + testCreateWriteRead(sparkSession, "no_tz", None, sessionTzOpt) + val localTz = TimeZone.getDefault.getID() + testCreateWriteRead(sparkSession, "local", Some(localTz), sessionTzOpt) + // check with a variety of timezones. The unit tests currently are configured to always use + // America/Los_Angeles, but even if they didn't, we'd be sure to cover a non-local timezone. + testTimezones.foreach { case (tableName, zone) => + if (zone != localTz) { + testCreateWriteRead(sparkSession, tableName, Some(zone), sessionTzOpt) + } + } + } + + private def testCreateWriteRead( + sparkSession: SparkSession, + baseTable: String, + explicitTz: Option[String], + sessionTzOpt: Option[String]): Unit = { + testCreateAlterTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt) + testWriteTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt) + testReadTablesWithTimezone(sparkSession, baseTable, explicitTz, sessionTzOpt) + } + + private def checkHasTz(spark: SparkSession, table: String, tz: Option[String]): Unit = { + val tableMetadata = spark.sessionState.catalog.getTableMetadata(TableIdentifier(table)) + assert(tableMetadata.properties.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY) === tz) + } + + private def testCreateAlterTablesWithTimezone( + spark: SparkSession, + baseTable: String, + explicitTz: Option[String], + sessionTzOpt: Option[String]): Unit = { + test(s"SPARK-12297: Create and Alter Parquet tables and timezones; explicitTz = $explicitTz; " + + s"sessionTzOpt = $sessionTzOpt") { + val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + withTable(baseTable, s"like_$baseTable", s"select_$baseTable", s"partitioned_$baseTable") { + // If we ever add a property to set the table timezone by default, defaultTz would change + val defaultTz = None + // check that created tables have correct TBLPROPERTIES + val tblProperties = explicitTz.map { + tz => s"""TBLPROPERTIES ($key="$tz")""" + }.getOrElse("") + spark.sql( + s"""CREATE TABLE $baseTable ( + | x int + | ) + | STORED AS PARQUET + | $tblProperties + """.stripMargin) + val expectedTableTz = explicitTz.orElse(defaultTz) + checkHasTz(spark, baseTable, expectedTableTz) + spark.sql( + s"""CREATE TABLE partitioned_$baseTable ( + | x int + | ) + | PARTITIONED BY (y int) + | STORED AS PARQUET + | $tblProperties + """.stripMargin) + checkHasTz(spark, s"partitioned_$baseTable", expectedTableTz) + spark.sql(s"CREATE TABLE like_$baseTable LIKE $baseTable") + checkHasTz(spark, s"like_$baseTable", expectedTableTz) + spark.sql( + s"""CREATE TABLE select_$baseTable + | STORED AS PARQUET + | AS + | SELECT * from $baseTable + """.stripMargin) + checkHasTz(spark, s"select_$baseTable", defaultTz) + + // check alter table, setting, unsetting, resetting the property + spark.sql( + s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="America/Los_Angeles")""") + checkHasTz(spark, baseTable, Some("America/Los_Angeles")) + spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="UTC")""") + checkHasTz(spark, baseTable, Some("UTC")) + spark.sql(s"""ALTER TABLE $baseTable UNSET TBLPROPERTIES ($key)""") + checkHasTz(spark, baseTable, None) + explicitTz.foreach { tz => + spark.sql(s"""ALTER TABLE $baseTable SET TBLPROPERTIES ($key="$tz")""") + checkHasTz(spark, baseTable, expectedTableTz) + } + } + } + } + + val desiredTimestampStrings = Seq( + "2015-12-31 22:49:59.123", + "2015-12-31 23:50:59.123", + "2016-01-01 00:39:59.123", + "2016-01-01 01:29:59.123" + ) + // We don't want to mess with timezones inside the tests themselves, since we use a shared + // spark context, and then we might be prone to issues from lazy vals for timezones. Instead, + // we manually adjust the timezone just to determine what the desired millis (since epoch, in utc) + // is for various "wall-clock" times in different timezones, and then we can compare against those + // in our tests. + val timestampTimezoneToMillis = { + val originalTz = TimeZone.getDefault + try { + desiredTimestampStrings.flatMap { timestampString => + Seq("America/Los_Angeles", "Europe/Berlin", "UTC").map { tzId => + TimeZone.setDefault(TimeZone.getTimeZone(tzId)) + val timestamp = Timestamp.valueOf(timestampString) + (timestampString, tzId) -> timestamp.getTime() + } + }.toMap + } finally { + TimeZone.setDefault(originalTz) + } + } + + private def createRawData(spark: SparkSession): Dataset[(String, Timestamp)] = { + import spark.implicits._ + val df = desiredTimestampStrings.toDF("display") + // this will get the millis corresponding to the display time given the current *session* + // timezone. + df.withColumn("ts", expr("cast(display as timestamp)")).as[(String, Timestamp)] + } + + private def testWriteTablesWithTimezone( + spark: SparkSession, + baseTable: String, + explicitTz: Option[String], + sessionTzOpt: Option[String]) : Unit = { + val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + test(s"SPARK-12297: Write to Parquet tables with Timestamps; explicitTz = $explicitTz; " + + s"sessionTzOpt = $sessionTzOpt") { + + withTable(s"saveAsTable_$baseTable", s"insert_$baseTable", s"partitioned_ts_$baseTable") { + val sessionTzId = sessionTzOpt.getOrElse(TimeZone.getDefault().getID()) + // check that created tables have correct TBLPROPERTIES + val tblProperties = explicitTz.map { + tz => s"""TBLPROPERTIES ($key="$tz")""" + }.getOrElse("") + + val rawData = createRawData(spark) + // Check writing data out. + // We write data into our tables, and then check the raw parquet files to see whether + // the correct conversion was applied. + rawData.write.saveAsTable(s"saveAsTable_$baseTable") + checkHasTz(spark, s"saveAsTable_$baseTable", None) + spark.sql( + s"""CREATE TABLE insert_$baseTable ( + | display string, + | ts timestamp + | ) + | STORED AS PARQUET + | $tblProperties + """.stripMargin) + checkHasTz(spark, s"insert_$baseTable", explicitTz) + rawData.write.insertInto(s"insert_$baseTable") + // no matter what, roundtripping via the table should leave the data unchanged + val readFromTable = spark.table(s"insert_$baseTable").collect() + .map { row => (row.getAs[String](0), row.getAs[Timestamp](1)).toString() }.sorted + assert(readFromTable === rawData.collect().map(_.toString()).sorted) + + // Now we load the raw parquet data on disk, and check if it was adjusted correctly. + // Note that we only store the timezone in the table property, so when we read the + // data this way, we're bypassing all of the conversion logic, and reading the raw + // values in the parquet file. + val onDiskLocation = spark.sessionState.catalog + .getTableMetadata(TableIdentifier(s"insert_$baseTable")).location.getPath + // we test reading the data back with and without the vectorized reader, to make sure we + // haven't broken reading parquet from non-hive tables, with both readers. + Seq(false, true).foreach { vectorized => + spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized) + val readFromDisk = spark.read.parquet(onDiskLocation).collect() + val storageTzId = explicitTz.getOrElse(sessionTzId) + readFromDisk.foreach { row => + val displayTime = row.getAs[String](0) + val millis = row.getAs[Timestamp](1).getTime() + val expectedMillis = timestampTimezoneToMillis((displayTime, storageTzId)) + assert(expectedMillis === millis, s"Display time '$displayTime' was stored " + + s"incorrectly with sessionTz = ${sessionTzOpt}; Got $millis, expected " + + s"$expectedMillis (delta = ${millis - expectedMillis})") + } + } + + // check tables partitioned by timestamps. We don't compare the "raw" data in this case, + // since they are adjusted even when we bypass the hive table. + rawData.write.partitionBy("ts").saveAsTable(s"partitioned_ts_$baseTable") + val partitionDiskLocation = spark.sessionState.catalog + .getTableMetadata(TableIdentifier(s"partitioned_ts_$baseTable")).location.getPath + // no matter what mix of timezones we use, the dirs should specify the value with the + // same time we use for display. + val parts = new File(partitionDiskLocation).list().collect { + case name if name.startsWith("ts=") => URLDecoder.decode(name.stripPrefix("ts=")) + }.toSet + assert(parts === desiredTimestampStrings.toSet) + } + } + } + + private def testReadTablesWithTimezone( + spark: SparkSession, + baseTable: String, + explicitTz: Option[String], + sessionTzOpt: Option[String]): Unit = { + val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + test(s"SPARK-12297: Read from Parquet tables with Timestamps; explicitTz = $explicitTz; " + + s"sessionTzOpt = $sessionTzOpt") { + withTable(s"external_$baseTable", s"partitioned_$baseTable") { + // we intentionally save this data directly, without creating a table, so we can + // see that the data is read back differently depending on table properties. + // we'll save with adjusted millis, so that it should be the correct millis after reading + // back. + val rawData = createRawData(spark) + // to avoid closing over entire class + val timestampTimezoneToMillis = this.timestampTimezoneToMillis + import spark.implicits._ + val adjustedRawData = (explicitTz match { + case Some(tzId) => + rawData.map { case (displayTime, _) => + val storageMillis = timestampTimezoneToMillis((displayTime, tzId)) + (displayTime, new Timestamp(storageMillis)) + } + case _ => + rawData + }).withColumnRenamed("_1", "display").withColumnRenamed("_2", "ts") + withTempPath { basePath => + val unpartitionedPath = new File(basePath, "flat") + val partitionedPath = new File(basePath, "partitioned") + adjustedRawData.write.parquet(unpartitionedPath.getCanonicalPath) + val options = Map("path" -> unpartitionedPath.getCanonicalPath) ++ + explicitTz.map { tz => Map(key -> tz) }.getOrElse(Map()) + + spark.catalog.createTable( + tableName = s"external_$baseTable", + source = "parquet", + schema = new StructType().add("display", StringType).add("ts", TimestampType), + options = options + ) + + // also write out a partitioned table, to make sure we can access that correctly. + // add a column we can partition by (value doesn't particularly matter). + val partitionedData = adjustedRawData.withColumn("id", monotonicallyIncreasingId) + partitionedData.write.partitionBy("id") + .parquet(partitionedPath.getCanonicalPath) + // unfortunately, catalog.createTable() doesn't let us specify partitioning, so just use + // a "CREATE TABLE" stmt. + val tblOpts = explicitTz.map { tz => s"""TBLPROPERTIES ($key="$tz")""" }.getOrElse("") + spark.sql(s"""CREATE EXTERNAL TABLE partitioned_$baseTable ( + | display string, + | ts timestamp + |) + |PARTITIONED BY (id bigint) + |STORED AS parquet + |LOCATION 'file:${partitionedPath.getCanonicalPath}' + |$tblOpts + """.stripMargin) + spark.sql(s"msck repair table partitioned_$baseTable") + + for { + vectorized <- Seq(false, true) + partitioned <- Seq(false, true) + } { + withClue(s"vectorized = $vectorized; partitioned = $partitioned") { + spark.conf.set(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key, vectorized) + val sessionTz = sessionTzOpt.getOrElse(TimeZone.getDefault().getID()) + val table = if (partitioned) s"partitioned_$baseTable" else s"external_$baseTable" + val query = s"select display, cast(ts as string) as ts_as_string, ts " + + s"from $table" + val collectedFromExternal = spark.sql(query).collect() + assert( collectedFromExternal.size === 4) + collectedFromExternal.foreach { row => + val displayTime = row.getAs[String](0) + // the timestamp should still display the same, despite the changes in timezones + assert(displayTime === row.getAs[String](1).toString()) + // we'll also check that the millis behind the timestamp has the appropriate + // adjustments. + val millis = row.getAs[Timestamp](2).getTime() + val expectedMillis = timestampTimezoneToMillis((displayTime, sessionTz)) + val delta = millis - expectedMillis + val deltaHours = delta / (1000L * 60 * 60) + assert(millis === expectedMillis, s"Display time '$displayTime' did not have " + + s"correct millis: was $millis, expected $expectedMillis; delta = $delta " + + s"($deltaHours hours)") + } + + // Now test that the behavior is still correct even with a filter which could get + // pushed down into parquet. We don't need extra handling for pushed down + // predicates because (a) in ParquetFilters, we ignore TimestampType and (b) parquet + // does not read statistics from int96 fields, as they are unsigned. See + // scalastyle:off line.size.limit + // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L419 + // https://github.com/apache/parquet-mr/blob/2fd62ee4d524c270764e9b91dca72e5cf1a005b7/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L348 + // scalastyle:on line.size.limit + // + // Just to be defensive in case anything ever changes in parquet, this test checks + // the assumption on column stats, and also the end-to-end behavior. + + val hadoopConf = sparkContext.hadoopConfiguration + val fs = FileSystem.get(hadoopConf) + val parts = if (partitioned) { + val subdirs = fs.listStatus(new Path(partitionedPath.getCanonicalPath)) + .filter(_.getPath().getName().startsWith("id=")) + fs.listStatus(subdirs.head.getPath()) + .filter(_.getPath().getName().endsWith(".parquet")) + } else { + fs.listStatus(new Path(unpartitionedPath.getCanonicalPath)) + .filter(_.getPath().getName().endsWith(".parquet")) + } + // grab the meta data from the parquet file. The next section of asserts just make + // sure the test is configured correctly. + assert(parts.size == 1) + val oneFooter = ParquetFileReader.readFooter(hadoopConf, parts.head.getPath) + assert(oneFooter.getFileMetaData.getSchema.getColumns.size === 2) + assert(oneFooter.getFileMetaData.getSchema.getColumns.get(1).getType() === + PrimitiveTypeName.INT96) + val oneBlockMeta = oneFooter.getBlocks().get(0) + val oneBlockColumnMeta = oneBlockMeta.getColumns().get(1) + val columnStats = oneBlockColumnMeta.getStatistics + // This is the important assert. Column stats are written, but they are ignored + // when the data is read back as mentioned above, b/c int96 is unsigned. This + // assert makes sure this holds even if we change parquet versions (if eg. there + // were ever statistics even on unsigned columns). + assert(columnStats.isEmpty) + + // These queries should return the entire dataset, but if the predicates were + // applied to the raw values in parquet, they would incorrectly filter data out. + Seq( + ">" -> "2015-12-31 22:00:00", + "<" -> "2016-01-01 02:00:00" + ).foreach { case (comparison, value) => + val query = + s"select ts from $table where ts $comparison '$value'" + val countWithFilter = spark.sql(query).count() + assert(countWithFilter === 4, query) + } + } + } + } + } + } + } + + test("SPARK-12297: exception on bad timezone") { + val key = ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY + val badTzException = intercept[AnalysisException] { + spark.sql( + s"""CREATE TABLE bad_tz_table ( + | x int + | ) + | STORED AS PARQUET + | TBLPROPERTIES ($key="Blart Versenwald III") + """.stripMargin) + } + assert(badTzException.getMessage.contains("Blart Versenwald III")) + } }