# [SPARK-12297][SQL] Hive compatibility for Parquet Timestamps

## What changes were proposed in this pull request?

This change allows timestamps in Parquet-based Hive tables to behave as a "floating time", without a timezone, as timestamps do for other file formats.  If the storage timezone is the same as the session timezone, the conversion is a no-op.  When data is read from a Hive table, the table property is *always* respected.  This means Spark's behavior does not change when reading old data, while newly written data is read correctly (whatever the source of the data is).

Spark inherited the original behavior from Hive, but Hive is also updating its behavior to use the same scheme in HIVE-12767 / HIVE-16231.

The default for Spark remains unchanged; newly created tables do not include the new table property.

This only applies to Hive tables; nothing is added to the Parquet metadata to indicate the timezone, so data read or written directly from Parquet files never has any conversion applied.
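
For illustration only, here is a minimal sketch (not part of this patch) of opting a freshly created table into the conversion via the new property; the table name and zone are made up, and a Hive-enabled `SparkSession` named `spark` is assumed:

```scala
// Declare that INT96 timestamps in this Hive Parquet table are stored relative
// to UTC. With the property set, readers/writers convert between UTC and the
// session timezone; without it, behavior is exactly as before.
spark.sql("CREATE TABLE events (ts TIMESTAMP) STORED AS PARQUET")
spark.sql(
  "ALTER TABLE events SET TBLPROPERTIES ('parquet.mr.int96.write.zone' = 'UTC')")
```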

## How was this patch tested?

Added a unit test which creates tables, reads and writes data, under a variety of permutations (different storage timezones, different session timezones, vectorized reading on and off).
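
As a rough, hypothetical sketch of one such permutation (not the actual test code; again assuming a Hive-enabled `SparkSession` named `spark`):

```scala
import java.sql.Timestamp

// Session timezone differs from the declared storage timezone.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")
spark.sql(
  """CREATE TABLE ts_check (ts TIMESTAMP) STORED AS PARQUET
    |TBLPROPERTIES ('parquet.mr.int96.write.zone' = 'UTC')""".stripMargin)

import spark.implicits._
Seq(Timestamp.valueOf("2016-01-01 12:00:00")).toDF("ts")
  .write.insertInto("ts_check")

// The wall-clock value should round-trip unchanged: the INT96 on disk is
// shifted to UTC on write and shifted back on read. Toggling
// spark.sql.parquet.enableVectorizedReader covers both read paths.
assert(spark.table("ts_check").head().getTimestamp(0) ==
  Timestamp.valueOf("2016-01-01 12:00:00"))
```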

Author: Imran Rashid <[email protected]>

Closes apache#16781 from squito/SPARK-12297.
squito authored and liyichao committed May 24, 2017
1 parent 11e4e90 commit 43d7363
Showing 13 changed files with 516 additions and 29 deletions.
@@ -132,10 +132,10 @@ case class CatalogTablePartition(
/**
* Given the partition schema, returns a row with that schema holding the partition values.
*/
def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = {
val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
val timeZoneId = caseInsensitiveProperties.getOrElse(
DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
InternalRow.fromSeq(partitionSchema.map { field =>
val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
null
@@ -498,6 +498,11 @@ object DateTimeUtils {
false
}

lazy val validTimezones = TimeZone.getAvailableIDs().toSet
def isValidTimezone(timezoneId: String): Boolean = {
validTimezones.contains(timezoneId)
}

/**
* Returns the microseconds since year zero (-17999) from microseconds since epoch.
*/
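
A hedged usage sketch of the new helper (the zone ID is arbitrary); it is handy because `java.util.TimeZone.getTimeZone` silently falls back to GMT for unknown IDs instead of failing:

```scala
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Validate a user-supplied zone id before storing it in a table property.
val tz = "America/Los_Angeles"
require(DateTimeUtils.isValidTimezone(tz), s"Invalid timezone id: $tz")
```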
@@ -18,7 +18,9 @@
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.util.TimeZone;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -30,6 +32,7 @@

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;

@@ -90,11 +93,30 @@ public class VectorizedColumnReader {

private final PageReader pageReader;
private final ColumnDescriptor descriptor;
private final TimeZone storageTz;
private final TimeZone sessionTz;

public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
Configuration conf)
throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
// If the table has a timezone property, apply the correct conversions. See SPARK-12297.
// The conf is sometimes null in tests.
String sessionTzString =
conf == null ? null : conf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key());
if (sessionTzString == null || sessionTzString.isEmpty()) {
sessionTz = DateTimeUtils.defaultTimeZone();
} else {
sessionTz = TimeZone.getTimeZone(sessionTzString);
}
String storageTzString =
conf == null ? null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY());
if (storageTzString == null || storageTzString.isEmpty()) {
storageTz = sessionTz;
} else {
storageTz = TimeZone.getTimeZone(storageTzString);
}
this.maxDefLevel = descriptor.getMaxDefinitionLevel();

DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -289,7 +311,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
// TODO: Convert dictionary of Binaries to dictionary of Longs
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v, sessionTz, storageTz));
}
}
} else {
@@ -422,7 +444,7 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOE
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i,
// Read 12 bytes for INT96
ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12), sessionTz, storageTz));
} else {
column.putNull(rowId + i);
}
@@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.column.ColumnDescriptor;
@@ -95,6 +96,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private boolean returnColumnarBatch;

private Configuration conf;

/**
* The default config on whether columnarBatch should be offheap.
*/
@@ -107,6 +110,7 @@
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException, UnsupportedOperationException {
super.initialize(inputSplit, taskAttemptContext);
this.conf = taskAttemptContext.getConfiguration();
initializeInternal();
}

@@ -277,7 +281,7 @@ private void checkEndOfRowGroup() throws IOException {
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i),
pages.getPageReader(columns.get(i)));
pages.getPageReader(columns.get(i)), conf);
}
totalCountLoadedSoFar += pages.getRowCount();
}
@@ -26,7 +26,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import scala.util.Try

import org.apache.commons.lang3.StringEscapeUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -37,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -74,6 +73,10 @@ case class CreateTableLikeCommand(
sourceTableDesc.provider
}

val properties = sourceTableDesc.properties.filter { case (k, _) =>
k == ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
}

// If the location is specified, we create an external table internally.
// Otherwise create a managed table.
val tblType = if (location.isEmpty) CatalogTableType.MANAGED else CatalogTableType.EXTERNAL
@@ -86,6 +89,7 @@ case class CreateTableLikeCommand(
locationUri = location.map(CatalogUtils.stringToURI(_))),
schema = sourceTableDesc.schema,
provider = newProvider,
properties = properties,
partitionColumnNames = sourceTableDesc.partitionColumnNames,
bucketSpec = sourceTableDesc.bucketSpec)

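
A hedged illustration of the `CREATE TABLE ... LIKE` change above; it assumes the `events` table from the earlier sketch, which carries the timezone property:

```scala
// Only the parquet.mr.int96.write.zone property is copied from the source
// table; all other source-table properties are intentionally dropped.
spark.sql("CREATE TABLE events_copy LIKE events")
spark.sql("SHOW TBLPROPERTIES events_copy").show(false)
// Expected to include: parquet.mr.int96.write.zone -> UTC
```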
@@ -632,4 +632,6 @@ object ParquetFileFormat extends Logging {
Failure(cause)
}.toOption
}

val PARQUET_TIMEZONE_TABLE_PROPERTY = "parquet.mr.int96.write.zone"
}
@@ -95,7 +95,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
new ParquetRecordMaterializer(
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema),
new ParquetSchemaConverter(conf))
new ParquetSchemaConverter(conf),
conf)
}
}

@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
import org.apache.parquet.schema.MessageType

@@ -29,13 +30,17 @@ import org.apache.spark.sql.types.StructType
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
* @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
* @param hadoopConf hadoop Configuration for passing extra params for parquet conversion
*/
private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: ParquetSchemaConverter,
hadoopConf: Configuration)
extends RecordMaterializer[UnsafeRow] {

private val rootConverter =
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, hadoopConf, NoopUpdater)

override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord

@@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder
import java.util.TimeZone

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
@@ -34,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -117,12 +120,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
* types should have been expanded.
* @param hadoopConf a hadoop Configuration for passing any extra parameters for parquet conversion
* @param updater An updater which propagates converted field values to the parent container
*/
private[parquet] class ParquetRowConverter(
schemaConverter: ParquetSchemaConverter,
parquetType: GroupType,
catalystType: StructType,
hadoopConf: Configuration,
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {

@@ -261,18 +266,18 @@ private[parquet] class ParquetRowConverter(

case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
// If the table has a timezone property, apply the correct conversions. See SPARK-12297.
val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_))
.getOrElse(DateTimeUtils.defaultTimeZone())
val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz)
new ParquetPrimitiveConverter(updater) {
// Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = {
assert(
value.length() == 12,
"Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " +
s"but got a ${value.length()}-byte binary.")

val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
val timeOfDayNanos = buf.getLong
val julianDay = buf.getInt
updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
val timestamp = ParquetRowConverter.binaryToSQLTimestamp(value, sessionTz = sessionTz,
storageTz = storageTz)
updater.setLong(timestamp)
}
}

@@ -302,7 +307,7 @@ private[parquet] class ParquetRowConverter(

case t: StructType =>
new ParquetRowConverter(
schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater {
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
})

@@ -651,6 +656,7 @@ private[parquet] class ParquetRowConverter(
}

private[parquet] object ParquetRowConverter {

def binaryToUnscaledLong(binary: Binary): Long = {
// The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
// we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
@@ -673,12 +679,35 @@ private[parquet] object ParquetRowConverter {
unscaled
}

def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
/**
* Converts an int96 to a SQLTimestamp, given both the storage timezone and the session timezone.
* The timestamp is really meant to be interpreted as a "floating time", but since we
* actually store it as micros since epoch, we have to apply a conversion when the
* session and storage timezones differ.
*
* @param binary a parquet Binary which holds one int96
* @param sessionTz the session timezone. This will be used to determine how to display the time,
* and compute functions on the timestamp which involve a timezone, eg. extract
* the hour.
* @param storageTz the timezone which was used to store the timestamp. This should come from the
* timestamp table property, or else it is assumed to be the same as the sessionTz
* @return a timestamp (micros since epoch) which will render correctly in the sessionTz
*/
def binaryToSQLTimestamp(
binary: Binary,
sessionTz: TimeZone,
storageTz: TimeZone): SQLTimestamp = {
assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
val timeOfDayNanos = buffer.getLong
val julianDay = buffer.getInt
DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
val utcEpochMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
// avoid expensive time logic if possible.
if (sessionTz.getID() != storageTz.getID()) {
DateTimeUtils.convertTz(utcEpochMicros, sessionTz, storageTz)
} else {
utcEpochMicros
}
}
}
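
A hedged sketch of why the read path (`binaryToSQLTimestamp` above) and the write path (`ParquetWriteSupport` below) cancel out: the writer converts with `(storageTz, sessionTz)` and the reader with `(sessionTz, storageTz)`. The zones and the value here are arbitrary:

```scala
import java.util.TimeZone
import org.apache.spark.sql.catalyst.util.DateTimeUtils

val sessionTz = TimeZone.getTimeZone("America/Los_Angeles")
val storageTz = TimeZone.getTimeZone("UTC")
val micros = 1451649600L * 1000000L  // 2016-01-01 12:00:00 UTC, as micros since epoch

// Write path shifts the value before encoding the INT96 ...
val onDisk = DateTimeUtils.convertTz(micros, storageTz, sessionTz)
// ... and the read path shifts it back. Away from DST transitions the two
// conversions are exact inverses.
val readBack = DateTimeUtils.convertTz(onDisk, sessionTz, storageTz)
assert(readBack == micros)
```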
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.nio.{ByteBuffer, ByteOrder}
import java.util
import java.util.TimeZone

import scala.collection.JavaConverters.mapAsJavaMapConverter

@@ -75,6 +76,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
// Reusable byte array used to write decimal values
private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))

private var storageTz: TimeZone = _
private var sessionTz: TimeZone = _

override def init(configuration: Configuration): WriteContext = {
val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
this.schema = StructType.fromString(schemaString)
@@ -91,6 +95,19 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit


this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
// If the table has a timezone property, apply the correct conversions. See SPARK-12297.
val sessionTzString = configuration.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
sessionTz = if (sessionTzString == null || sessionTzString == "") {
TimeZone.getDefault()
} else {
TimeZone.getTimeZone(sessionTzString)
}
val storageTzString = configuration.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
storageTz = if (storageTzString == null || storageTzString == "") {
sessionTz
} else {
TimeZone.getTimeZone(storageTzString)
}

val messageType = new ParquetSchemaConverter(configuration).convert(schema)
val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
@@ -178,7 +195,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit

// NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
// precision. Nanosecond parts of timestamp values read from INT96 are simply stripped.
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
val rawMicros = row.getLong(ordinal)
val adjustedMicros = if (sessionTz.getID() == storageTz.getID()) {
rawMicros
} else {
DateTimeUtils.convertTz(rawMicros, storageTz, sessionTz)
}
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(adjustedMicros)
val buf = ByteBuffer.wrap(timestampBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))