[SPARK-12297][SQL] Hive compatibility for Parquet Timestamps #16781

Closed · wants to merge 52 commits
Changes from all commits (52 commits):
53d0744
very basic test for adjusting read parquet data
squito Jan 27, 2017
69a3c8c
wip
squito Jan 27, 2017
51e24f2
working version for non-vectorized read -- lots of garbage too
squito Jan 31, 2017
7e61841
working for vectorized reads -- not sure about all code paths
squito Jan 31, 2017
9fbde13
more tests for write path
squito Feb 1, 2017
bac9eb0
expand tests; fix some metastore interaction; cleanup a lot of garbage
squito Feb 1, 2017
1b05978
more cleanup
squito Feb 1, 2017
b622d27
handle bad timezones; include unit test
squito Feb 1, 2017
0604403
write support; lots more unit tests
squito Feb 2, 2017
f45516d
add tests for alter table
squito Feb 2, 2017
d4511a6
utc or gmt; cleanup
squito Feb 2, 2017
223ce2c
more cleanup
squito Feb 2, 2017
5b49ae0
fix compatibility
squito Feb 2, 2017
9ef60a4
Merge branch 'master' into SPARK-12297
squito Mar 1, 2017
0b6883c
Merge branch 'master' into SPARK-12297
squito Mar 2, 2017
69b8142
wip
squito Mar 3, 2017
7ca2c86
fix
squito Mar 3, 2017
6f982d3
fixes; passes tests now
squito Mar 6, 2017
1ad2f83
Merge branch 'master' into SPARK-12297
squito Mar 6, 2017
2c8a228
fix merge
squito Mar 6, 2017
f0b89fd
fix
squito Mar 6, 2017
db0216f
Merge branch 'master' into SPARK-12297
squito Mar 13, 2017
46fab8d
refactor the test
squito Mar 13, 2017
c242fb8
cleanup
squito Mar 13, 2017
c87a573
reset timezone property after tests; make tests more granular
squito Mar 14, 2017
db7e514
Merge branch 'master' into SPARK-12297
squito Mar 14, 2017
f4dca27
separate tests for reading & writing
squito Mar 15, 2017
2891582
Merge branch 'master' into SPARK-12297
squito Mar 15, 2017
d951443
fix merge
squito Mar 15, 2017
38e19cd
Merge branch 'master' into SPARK-12297
squito Mar 16, 2017
1e3b768
Merge branch 'master' into SPARK-12297
squito Mar 23, 2017
39f506c
cleanup
squito Mar 23, 2017
f33bc91
Merge branch 'master' into SPARK-12297
squito Mar 24, 2017
17565e8
remove config for setting table time zone automatically
squito Mar 24, 2017
a96806f
fixup
squito Mar 24, 2017
7582b2c
predicate pushdown tests
squito Apr 4, 2017
5817064
minor cleanup
squito Apr 6, 2017
773704a
Merge branch 'master' into SPARK-12297
squito Apr 10, 2017
be134be
fix merge
squito Apr 10, 2017
d15b660
swap conversion logic
squito Apr 10, 2017
283b1c7
update tests a bunch; tests session timezones, and tests are easier t…
squito Apr 11, 2017
6ccaa92
session timezones
squito Apr 11, 2017
71c7e60
cleanup
squito Apr 11, 2017
75e8579
Merge branch 'master' into SPARK-12297
squito Apr 11, 2017
e4e88a5
partial review feedback
squito Apr 19, 2017
44a8bbb
better param names and docs
squito Apr 20, 2017
e31657a
review feedback
squito Apr 25, 2017
d4ff9fd
Merge branch 'master' into SPARK-12297
squito May 2, 2017
acc72ea
add check for partitioned tables
squito May 2, 2017
b9c03e9
fix typo
squito May 2, 2017
fc17a2e
review feedback
squito May 2, 2017
2537437
review feedback
squito May 3, 2017

@@ -132,10 +132,10 @@ case class CatalogTablePartition(
/**
* Given the partition schema, returns a row with that schema holding the partition values.
*/
def toRow(partitionSchema: StructType, defaultTimeZondId: String): InternalRow = {
def toRow(partitionSchema: StructType, defaultTimeZoneId: String): InternalRow = {
val caseInsensitiveProperties = CaseInsensitiveMap(storage.properties)
val timeZoneId = caseInsensitiveProperties.getOrElse(
DateTimeUtils.TIMEZONE_OPTION, defaultTimeZondId)
DateTimeUtils.TIMEZONE_OPTION, defaultTimeZoneId)
InternalRow.fromSeq(partitionSchema.map { field =>
val partValue = if (spec(field.name) == ExternalCatalogUtils.DEFAULT_PARTITION_NAME) {
null

@@ -498,6 +498,11 @@ object DateTimeUtils {
false
}

lazy val validTimezones = TimeZone.getAvailableIDs().toSet
def isValidTimezone(timezoneId: String): Boolean = {
validTimezones.contains(timezoneId)

Member: Should be case insensitive?

Author (contributor): Java does a case-sensitive check, which means Hive does too. I don't think we want to write out a timezone with the wrong capitalization and then have another tool throw an error.

scala> val tzId = "America/Los_Angeles"
tzId: String = America/Los_Angeles

scala> java.util.TimeZone.getTimeZone(tzId).getID()
res1: String = America/Los_Angeles

scala> java.util.TimeZone.getTimeZone(tzId.toLowerCase()).getID()
res2: String = GMT

https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/timestamp/NanoTimeUtils.java#L167

We could try to auto-convert the user's timezone to the correct capitalization, but do you think that is worth it?

Member: Ah, I see. Let's keep the current behavior.

}
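
To make the case-sensitivity point from the review thread above concrete, here is a minimal standalone sketch against java.util.TimeZone (plain Scala, not the Spark source itself):

import java.util.TimeZone

// Java's zone IDs are case sensitive, so the validity check has to be as well.
val validTimezones: Set[String] = TimeZone.getAvailableIDs().toSet
def isValidTimezone(timezoneId: String): Boolean = validTimezones.contains(timezoneId)

isValidTimezone("America/Los_Angeles")              // true
isValidTimezone("america/los_angeles")              // false
// A lookup with the wrong case does not fail -- it silently falls back to GMT:
TimeZone.getTimeZone("america/los_angeles").getID() // "GMT"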

/**
* Returns the microseconds since year zero (-17999) from microseconds since epoch.
*/

@@ -18,7 +18,9 @@
package org.apache.spark.sql.execution.datasources.parquet;

import java.io.IOException;
import java.util.TimeZone;

import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Dictionary;
@@ -30,6 +32,7 @@

import org.apache.spark.sql.catalyst.util.DateTimeUtils;
import org.apache.spark.sql.execution.vectorized.ColumnVector;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.DecimalType;

@@ -90,11 +93,30 @@ public class VectorizedColumnReader {

private final PageReader pageReader;
private final ColumnDescriptor descriptor;
private final TimeZone storageTz;
private final TimeZone sessionTz;

public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader)
public VectorizedColumnReader(ColumnDescriptor descriptor, PageReader pageReader,
Configuration conf)
throws IOException {
this.descriptor = descriptor;
this.pageReader = pageReader;
// If the table has a timezone property, apply the correct conversions. See SPARK-12297.
// The conf is sometimes null in tests.
String sessionTzString =
conf == null ? null : conf.get(SQLConf.SESSION_LOCAL_TIMEZONE().key());
if (sessionTzString == null || sessionTzString.isEmpty()) {
sessionTz = DateTimeUtils.defaultTimeZone();
} else {
sessionTz = TimeZone.getTimeZone(sessionTzString);
}
String storageTzString =
conf == null ? null : conf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY());
if (storageTzString == null || storageTzString.isEmpty()) {
storageTz = sessionTz;
} else {
storageTz = TimeZone.getTimeZone(storageTzString);
}
this.maxDefLevel = descriptor.getMaxDefinitionLevel();

DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
@@ -289,7 +311,7 @@ private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
// TODO: Convert dictionary of Binaries to dictionary of Longs
if (!column.isNullAt(i)) {
Binary v = dictionary.decodeToBinary(dictionaryIds.getDictId(i));
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v));
column.putLong(i, ParquetRowConverter.binaryToSQLTimestamp(v, sessionTz, storageTz));
}
}
} else {
@@ -422,7 +444,7 @@ private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOE
if (defColumn.readInteger() == maxDefLevel) {
column.putLong(rowId + i,
// Read 12 bytes for INT96
ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12)));
ParquetRowConverter.binaryToSQLTimestamp(data.readBinary(12), sessionTz, storageTz));
} else {
column.putNull(rowId + i);
}

@@ -21,6 +21,7 @@
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.parquet.column.ColumnDescriptor;
@@ -95,6 +96,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
*/
private boolean returnColumnarBatch;

private Configuration conf;

/**
* The default config on whether columnarBatch should be offheap.
*/
@@ -107,6 +110,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException, UnsupportedOperationException {
super.initialize(inputSplit, taskAttemptContext);
this.conf = taskAttemptContext.getConfiguration();
initializeInternal();
}

@@ -277,7 +281,7 @@ private void checkEndOfRowGroup() throws IOException {
for (int i = 0; i < columns.size(); ++i) {
if (missingColumns[i]) continue;
columnReaders[i] = new VectorizedColumnReader(columns.get(i),
pages.getPageReader(columns.get(i)));
pages.getPageReader(columns.get(i)), conf);
}
totalCountLoadedSoFar += pages.getRowCount();
}

@@ -26,7 +26,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
import scala.util.Try

import org.apache.commons.lang3.StringEscapeUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
@@ -37,7 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -74,6 +73,10 @@ case class CreateTableLikeCommand(
sourceTableDesc.provider
}

val properties = sourceTableDesc.properties.filter { case (k, _) =>
k == ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY
}

// If the location is specified, we create an external table internally.
// Otherwise create a managed table.
val tblType = if (location.isEmpty) CatalogTableType.MANAGED else CatalogTableType.EXTERNAL
@@ -86,6 +89,7 @@ case class CreateTableLikeCommand(
locationUri = location.map(CatalogUtils.stringToURI(_))),
schema = sourceTableDesc.schema,
provider = newProvider,
properties = properties,
partitionColumnNames = sourceTableDesc.partitionColumnNames,
bucketSpec = sourceTableDesc.bucketSpec)
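
As an illustration of what the property filter above is for, here is a sketch assuming a SparkSession named spark with Hive support; the table names are hypothetical and the copy-on-LIKE behavior is what the patch intends rather than a guaranteed transcript:

// Hypothetical usage: the source table carries the timezone property, and
// CREATE TABLE LIKE now copies that property to the new table.
spark.sql(
  """CREATE TABLE src (ts TIMESTAMP) STORED AS PARQUET
    |TBLPROPERTIES ('parquet.mr.int96.write.zone' = 'America/Los_Angeles')""".stripMargin)
spark.sql("CREATE TABLE dst LIKE src")
// dst is expected to inherit parquet.mr.int96.write.zone, so its INT96 timestamps
// are written and read with the same storage timezone as src.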


@@ -632,4 +632,6 @@ object ParquetFileFormat extends Logging {
Failure(cause)
}.toOption
}

val PARQUET_TIMEZONE_TABLE_PROPERTY = "parquet.mr.int96.write.zone"
}
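
For reference, a sketch of how this table property and the session timezone are set from user code (assumes a SparkSession named spark and an existing Parquet table named events; both names are hypothetical):

// Record that the INT96 timestamps in this table were written relative to UTC.
spark.sql("ALTER TABLE events SET TBLPROPERTIES ('parquet.mr.int96.write.zone' = 'UTC')")
// The session timezone controls how timestamps are rendered; reads and writes of
// this table are then adjusted between the session and storage zones.
spark.conf.set("spark.sql.session.timeZone", "America/Los_Angeles")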

@@ -95,7 +95,8 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo
new ParquetRecordMaterializer(
parquetRequestedSchema,
ParquetReadSupport.expandUDT(catalystRequestedSchema),
new ParquetSchemaConverter(conf))
new ParquetSchemaConverter(conf),
conf)
}
}


@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.datasources.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.io.api.{GroupConverter, RecordMaterializer}
import org.apache.parquet.schema.MessageType

@@ -29,13 +30,17 @@ import org.apache.spark.sql.types.StructType
* @param parquetSchema Parquet schema of the records to be read
* @param catalystSchema Catalyst schema of the rows to be constructed
* @param schemaConverter A Parquet-Catalyst schema converter that helps initializing row converters
* @param hadoopConf hadoop Configuration for passing extra params for parquet conversion
*/
private[parquet] class ParquetRecordMaterializer(
parquetSchema: MessageType, catalystSchema: StructType, schemaConverter: ParquetSchemaConverter)
parquetSchema: MessageType,
catalystSchema: StructType,
schemaConverter: ParquetSchemaConverter,
hadoopConf: Configuration)
extends RecordMaterializer[UnsafeRow] {

private val rootConverter =
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, NoopUpdater)
new ParquetRowConverter(schemaConverter, parquetSchema, catalystSchema, hadoopConf, NoopUpdater)

override def getCurrentRecord: UnsafeRow = rootConverter.currentRecord


@@ -19,10 +19,12 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.math.{BigDecimal, BigInteger}
import java.nio.ByteOrder
import java.util.TimeZone

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.conf.Configuration
import org.apache.parquet.column.Dictionary
import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
import org.apache.parquet.schema.{GroupType, MessageType, OriginalType, Type}
@@ -34,6 +36,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -117,12 +120,14 @@ private[parquet] class ParquetPrimitiveConverter(val updater: ParentContainerUpd
* @param parquetType Parquet schema of Parquet records
* @param catalystType Spark SQL schema that corresponds to the Parquet record type. User-defined
* types should have been expanded.
* @param hadoopConf a hadoop Configuration for passing any extra parameters for parquet conversion
* @param updater An updater which propagates converted field values to the parent container
*/
private[parquet] class ParquetRowConverter(
schemaConverter: ParquetSchemaConverter,
parquetType: GroupType,
catalystType: StructType,
hadoopConf: Configuration,
updater: ParentContainerUpdater)
extends ParquetGroupConverter(updater) with Logging {

@@ -261,18 +266,18 @@ private[parquet] class ParquetRowConverter(

case TimestampType =>
// TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
// If the table has a timezone property, apply the correct conversions. See SPARK-12297.
val sessionTzString = hadoopConf.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
val sessionTz = Option(sessionTzString).map(TimeZone.getTimeZone(_))
.getOrElse(DateTimeUtils.defaultTimeZone())
val storageTzString = hadoopConf.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
val storageTz = Option(storageTzString).map(TimeZone.getTimeZone(_)).getOrElse(sessionTz)
new ParquetPrimitiveConverter(updater) {
// Converts nanosecond timestamps stored as INT96
override def addBinary(value: Binary): Unit = {
assert(
value.length() == 12,
"Timestamps (with nanoseconds) are expected to be stored in 12-byte long binaries, " +
s"but got a ${value.length()}-byte binary.")

val buf = value.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
val timeOfDayNanos = buf.getLong
val julianDay = buf.getInt
updater.setLong(DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos))
val timestamp = ParquetRowConverter.binaryToSQLTimestamp(value, sessionTz = sessionTz,
storageTz = storageTz)
updater.setLong(timestamp)
}
}

@@ -302,7 +307,7 @@ private[parquet] class ParquetRowConverter(

case t: StructType =>
new ParquetRowConverter(
schemaConverter, parquetType.asGroupType(), t, new ParentContainerUpdater {
schemaConverter, parquetType.asGroupType(), t, hadoopConf, new ParentContainerUpdater {
override def set(value: Any): Unit = updater.set(value.asInstanceOf[InternalRow].copy())
})

Expand Down Expand Up @@ -651,6 +656,7 @@ private[parquet] class ParquetRowConverter(
}

private[parquet] object ParquetRowConverter {

def binaryToUnscaledLong(binary: Binary): Long = {
// The underlying `ByteBuffer` implementation is guaranteed to be `HeapByteBuffer`, so here
// we are using `Binary.toByteBuffer.array()` to steal the underlying byte array without
@@ -673,12 +679,35 @@ private[parquet] object ParquetRowConverter {
unscaled
}

def binaryToSQLTimestamp(binary: Binary): SQLTimestamp = {
/**
* Converts an int96 to a SQLTimestamp, given both the storage timezone and the local timezone.
* The timestamp is really meant to be interpreted as a "floating time", but since we
* actually store it as micros since epoch, we have to apply a conversion when the
* timezone changes.
*
* @param binary a parquet Binary which holds one int96
* @param sessionTz the session timezone. This will be used to determine how to display the time,
* and compute functions on the timestamp which involve a timezone, eg. extract
* the hour.
* @param storageTz the timezone which was used to store the timestamp. This should come from the
* timestamp table property, or else assume it is the same as the sessionTz
* @return a timestamp (micros since epoch) which will render correctly in the sessionTz
*/
def binaryToSQLTimestamp(
binary: Binary,
sessionTz: TimeZone,
storageTz: TimeZone): SQLTimestamp = {
assert(binary.length() == 12, s"Timestamps (with nanoseconds) are expected to be stored in" +
s" 12-byte long binaries. Found a ${binary.length()}-byte binary instead.")
val buffer = binary.toByteBuffer.order(ByteOrder.LITTLE_ENDIAN)
val timeOfDayNanos = buffer.getLong
val julianDay = buffer.getInt
DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
val utcEpochMicros = DateTimeUtils.fromJulianDay(julianDay, timeOfDayNanos)
// avoid expensive time logic if possible.
if (sessionTz.getID() != storageTz.getID()) {
DateTimeUtils.convertTz(utcEpochMicros, sessionTz, storageTz)
} else {
utcEpochMicros
}
}
}
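
As a companion to the doc comment above, here is a minimal standalone sketch of the first step of that conversion, decoding an INT96 value into microseconds since the Unix epoch (plain Scala, not the Spark implementation; the constant and function names are illustrative):

import java.nio.{ByteBuffer, ByteOrder}

val JULIAN_DAY_OF_EPOCH = 2440588                 // julian day number of 1970-01-01
val MICROS_PER_DAY = 24L * 60 * 60 * 1000 * 1000

def int96ToMicros(bytes: Array[Byte]): Long = {
  require(bytes.length == 12, s"expected 12 bytes, got ${bytes.length}")
  val buf = ByteBuffer.wrap(bytes).order(ByteOrder.LITTLE_ENDIAN)
  val timeOfDayNanos = buf.getLong                // first 8 bytes: nanos within the day
  val julianDay = buf.getInt                      // last 4 bytes: julian day number
  (julianDay - JULIAN_DAY_OF_EPOCH) * MICROS_PER_DAY + timeOfDayNanos / 1000
}
// When the session and storage timezones differ, this value is then shifted via
// DateTimeUtils.convertTz, as in binaryToSQLTimestamp above.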

@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet

import java.nio.{ByteBuffer, ByteOrder}
import java.util
import java.util.TimeZone

import scala.collection.JavaConverters.mapAsJavaMapConverter

@@ -75,6 +76,9 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit
// Reusable byte array used to write decimal values
private val decimalBuffer = new Array[Byte](minBytesForPrecision(DecimalType.MAX_PRECISION))

private var storageTz: TimeZone = _
private var sessionTz: TimeZone = _

override def init(configuration: Configuration): WriteContext = {
val schemaString = configuration.get(ParquetWriteSupport.SPARK_ROW_SCHEMA)
this.schema = StructType.fromString(schemaString)
@@ -91,6 +95,19 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit


this.rootFieldWriters = schema.map(_.dataType).map(makeWriter)
// If the table has a timezone property, apply the correct conversions. See SPARK-12297.
val sessionTzString = configuration.get(SQLConf.SESSION_LOCAL_TIMEZONE.key)
sessionTz = if (sessionTzString == null || sessionTzString == "") {
TimeZone.getDefault()
} else {
TimeZone.getTimeZone(sessionTzString)
}
val storageTzString = configuration.get(ParquetFileFormat.PARQUET_TIMEZONE_TABLE_PROPERTY)
storageTz = if (storageTzString == null || storageTzString == "") {
sessionTz
} else {
TimeZone.getTimeZone(storageTzString)
}

val messageType = new ParquetSchemaConverter(configuration).convert(schema)
val metadata = Map(ParquetReadSupport.SPARK_METADATA_KEY -> schemaString).asJava
@@ -178,7 +195,13 @@ private[parquet] class ParquetWriteSupport extends WriteSupport[InternalRow] wit

// NOTE: Starting from Spark 1.5, Spark SQL `TimestampType` only has microsecond
// precision. Nanosecond parts of timestamp values read from INT96 are simply stripped.
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(row.getLong(ordinal))
val rawMicros = row.getLong(ordinal)
val adjustedMicros = if (sessionTz.getID() == storageTz.getID()) {
rawMicros
} else {
DateTimeUtils.convertTz(rawMicros, storageTz, sessionTz)
}
val (julianDay, timeOfDayNanos) = DateTimeUtils.toJulianDay(adjustedMicros)
val buf = ByteBuffer.wrap(timestampBuffer)
buf.order(ByteOrder.LITTLE_ENDIAN).putLong(timeOfDayNanos).putInt(julianDay)
recordConsumer.addBinary(Binary.fromReusedByteArray(timestampBuffer))
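
The write path above calls convertTz with the argument order swapped relative to the read path, so the two adjustments undo each other. A conceptual sketch of that symmetry (not Spark code; which direction the shift goes depends on the zone pair, and DST edge cases are ignored):

// Writing shifts the micros one way so the file reflects the storage timezone;
// reading shifts them back for the session timezone, so a value round-trips.
def adjustOnWrite(micros: Long, shiftMicros: Long): Long = micros - shiftMicros
def adjustOnRead(micros: Long, shiftMicros: Long): Long = micros + shiftMicros

val original = 1491000000L * 1000000L            // some timestamp, micros since epoch
val shift = 7L * 3600 * 1000000                  // e.g. a 7 hour offset difference
assert(adjustOnRead(adjustOnWrite(original, shift), shift) == original)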