[SPARK-12297] Table timezone correction for Timestamps #19250

Closed
wants to merge 23 commits

Conversation

@squito (Contributor, Author) commented Sep 15, 2017

What changes were proposed in this pull request?

When reading and writing data, Spark will adjust timestamp data based on the delta between the current session time zone and the table time zone (specified either by a persistent table property or by an option to the DataFrameReader / Writer). This is particularly important for Parquet data, so that it can be treated equivalently by other SQL engines (e.g. Impala and Hive). Furthermore, this is useful if the same data is processed by multiple clusters in different time zones and "timestamp without time zone" semantics are desired.

Design doc: https://docs.google.com/document/d/1mcbkVo-PSsFh6iOOYx6Rk_34aY25H_zT1E2f7KmLMOU/edit#heading=h.rj5dmsuhw2zg
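For readers skimming the thread, a minimal usage sketch of the proposed option (the option key shown here is a stand-in; the actual key is whatever TimestampTableTimeZone.TIMEZONE_PROPERTY resolves to in this patch, and `spark` / `df` are an existing SparkSession and DataFrame):

```scala
// Hypothetical usage sketch; "table.timezone" is an illustrative key, not the
// patch's actual property name.
val tz = "America/Los_Angeles"

// Write: record the table time zone so later readers can undo the shift
// between their session time zone and the table time zone.
df.write
  .option("table.timezone", tz)
  .saveAsTable("events")

// Read: timestamps come back adjusted by the delta between the current
// session time zone and the stored table time zone.
val events = spark.read
  .option("table.timezone", tz)
  .table("events")
```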

How was this patch tested?

Unit tests.

more appropriate location.  Ensure rule works without hive support.
Extra checks on when table timezones are set.
@@ -92,7 +92,7 @@ case class CreateHiveTableAsSelectCommand(
}

override def argString: String = {
-    s"[Database:${tableDesc.database}}, " +
+    s"[Database:${tableDesc.database}, " +
Contributor Author:

totally unrelated typo fix, but didn't seem worth an independent pr

@SparkQA commented Sep 15, 2017

Test build #81830 has finished for PR 19250 at commit 5105b72.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 15, 2017

Test build #81833 has finished for PR 19250 at commit c5571a8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 18, 2017

Test build #81888 has finished for PR 19250 at commit 950d33a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 18, 2017

Test build #81892 has finished for PR 19250 at commit e36851e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor, Author) commented Sep 19, 2017

Hi @ueshin @rxin, could you please review? thanks!

@SparkQA commented Sep 19, 2017

Test build #81931 has finished for PR 19250 at commit 74a9905.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 19, 2017

Test build #81929 has finished for PR 19250 at commit 2069b65.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Sep 19, 2017

Test build #81943 has finished for PR 19250 at commit 515b38b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor, Author) commented Sep 20, 2017

Also cc @yhuai @liancheng; I would appreciate a review since you've looked at SQL and Hive compatibility in the past.

@SparkQA commented Sep 21, 2017

Test build #82037 has finished for PR 19250 at commit a869c6e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito (Contributor, Author) commented Sep 26, 2017

@HyukjinKwon you might be interested in this one also

@vanzin (Contributor) commented Oct 3, 2017

FYI Imran is probably going to be out for a few weeks so I'll try to address the feedback here. It would be nice to have people take a look at this, though.

* We intentionally do not provide an ExpressionDescription as this is not meant to be exposed to
* users, it's only used for internal conversions.
*/
private[spark] case class TimestampTimezoneCorrection(
Contributor:

Do we need a whole expression for this? Can't we just reuse existing expressions? It's just simple arithmetic, isn't it?

Contributor:

I guess you could use ToUTCTimestamp / FromUTCTimestamp for this, but that would be more expensive since you'd be doing the conversion twice for each value.

Contributor:

What I'm saying is that the analysis rule can just determine the delta, and then just do a simple add/subtract.

Contributor:

Hmm, let me see if I can figure that out.

Comment:

Unfortunately the offset depends on the actual date, so a timezone conversion cannot be simplified to a simple delta.

Daylight saving time starts and ends on different days in different timezones, while some timezones don't have DST changes at all. Additionally, timezone rules have changed over time and keep changing. Both the basic timezone offset and the DST rules themselves could change (and have changed) over time.
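A small illustration of this point (not part of the patch), using java.time: the offset of a zone relative to UTC depends on the date, so no single delta works for a whole table:

```scala
import java.time.{LocalDate, ZoneId}

// The America/Los_Angeles offset differs between winter and summer because of DST,
// so a timezone conversion cannot be reduced to one fixed delta.
val zone = ZoneId.of("America/Los_Angeles")
val rules = zone.getRules

println(rules.getOffset(LocalDate.of(2017, 1, 15).atStartOfDay(zone).toInstant)) // -08:00
println(rules.getOffset(LocalDate.of(2017, 7, 15).atStartOfDay(zone).toInstant)) // -07:00
```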

Contributor:

> Additionally, timezone rules have changed over time and keep changing.

Ah, yes, that makes sense... it's also why my initial tests were failing at the DST boundaries. :-/

@@ -1015,6 +1020,10 @@ object DateTimeUtils {
guess
}

def convertTz(ts: SQLTimestamp, fromZone: String, toZone: String): SQLTimestamp = {
convertTz(ts, getTimeZone(fromZone), getTimeZone(toZone))
Contributor:

performance is going to suck here

Contributor:

I guess caching the value as done by the FromUTCTimestamp expression is the right way to go?
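A rough sketch of that caching idea (illustrative names only, not the patch's actual expression; it assumes the TimeZone-based DateTimeUtils.convertTz overload that the String-based overload in the diff above delegates to):

```scala
import java.util.TimeZone
import org.apache.spark.sql.catalyst.util.DateTimeUtils

// Resolve the TimeZone objects once per expression instance instead of once per
// row, similar in spirit to what FromUTCTimestamp does. Names are illustrative.
case class TimezoneCorrectionSketch(fromZone: String, toZone: String) {
  @transient private lazy val from: TimeZone = TimeZone.getTimeZone(fromZone)
  @transient private lazy val to: TimeZone = TimeZone.getTimeZone(toZone)

  def correct(micros: Long): Long = DateTimeUtils.convertTz(micros, from, to)
}
```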

@@ -266,6 +267,10 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def insertInto(tableName: String): Unit = {
extraOptions.get(TimestampTableTimeZone.TIMEZONE_PROPERTY).foreach { tz =>
Contributor:

we don't seem to be doing this type of validity check in general; otherwise we'd need to add a lot more checks here.

Contributor:

The spec requests errors when using invalid time zones. I guess this would still fail with a different error in that case, so I can remove this if you're really against adding it.

Contributor:

Hmm. I tried a couple of things, and while it may be possible to remove some of these checks and replace them with a check in DateTimeUtils.computeTimeZone, that doesn't cover all cases. For example, you could use "ALTER TABLE" with an invalid time zone and that wouldn't trigger the check.

So given the spec I'm inclined to leave the checks as is, unless @zivanfi is open to making the spec more lax in that area. (TimeZone.getTimeZone(invalidId) actually returns the UTC time zone, as unexpected as that behavior may be, so things won't necessarily break without the checks.)
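For reference, a minimal sketch of the kind of strict check being discussed (the helper name is made up for illustration):

```scala
import java.util.TimeZone

// TimeZone.getTimeZone silently falls back to GMT for unknown IDs, so a strict
// check has to validate against the list of known IDs explicitly.
def checkTimeZoneId(tz: String): Unit = {
  if (!TimeZone.getAvailableIDs.contains(tz)) {
    throw new IllegalArgumentException(s"Invalid time zone: $tz")
  }
}

checkTimeZoneId("America/Los_Angeles")    // passes
// checkTimeZoneId("America/Los_Angelos") // would throw
```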

Comment:

Although other table properties don't have similar checks, their effect is usually easy to see. The effect of this specific table property, however, is not immediately apparent: for new data it is only revealed in interoperability with other components, and for existing data it should not have any visible effect if set correctly. Therefore we decided it would be best to be very strict in the checks, because otherwise a typo in the table property value could only be discovered after some data has already been written with irreversible errors. This was the reasoning behind this part of the spec.

@@ -230,6 +230,13 @@ case class AlterTableSetPropertiesCommand(
isView: Boolean)
extends RunnableCommand {

if (isView) {
properties.get(TimestampTableTimeZone.TIMEZONE_PROPERTY).foreach { _ =>
Contributor:

Is there even any meaning to setting properties on views? We should either drop this check, or have a more general check.

Contributor:

HiveQL explicitly allows properties on views; I've never used them, though.

* like TIMESTAMP WITHOUT TIMEZONE. This gives correct behavior if you process data with
* machines in different timezones, or if you access the data from multiple SQL engines.
*/
private[sql] case class TimestampTableTimeZone(sparkSession: SparkSession)
Contributor:

I need to take a look at this again to make sure I understand what's happening. Conceptually what you are doing is pretty simple, and it doesn't seem right that it needs so many lines of code, but maybe I'm missing something.

Contributor:

I'm also trying to see if this can be simplified; I guess the main thing is Imran's comment about not being able to use transformUp. I need to take a look at whether that's really the case.

This rule also doesn't seem to handle InsertIntoHiveTable, which is in the hive module so can't be handled here. Probably will need a new rule based on this one in the hive module.

Marcelo Vanzin added 5 commits October 6, 2017 15:38

  • This triggers use of InsertIntoHiveTable. Tests currently fail because that command is not yet processed by the new timestamp code.
  • Tests still don't pass. Kinda hard to figure out which test is failing with current code.
  • This makes it easier to see what combinations fail, even if the suite itself ends up running for a bit longer. Doesn't seem bad, though.
  • Also some other minor fixes and cleanups to the code.
@vanzin (Contributor) commented Oct 9, 2017

I pushed a bunch of changes to address feedback here and also my own feedback that I didn't bother to write down. Main changes:

  • cache the TimeZone instances in generated code
  • rename a bunch of things to names that sounded better to me
  • refactored the test internals so that individual combinations are run as separate tests instead of one humongous test; this makes it easier to debug failures
  • added support (and tests) for raw Hive tables

I have not cleaned up the timestamp correction rule yet; I think I have a better idea of the code now and will do that next.

@SparkQA commented Oct 9, 2017

Test build #82561 has finished for PR 19250 at commit e74ce2d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Marcelo Vanzin added 2 commits October 10, 2017 09:42

  • Running the rule during resolution also made it possible to do all the needed adjustments with a single rule (instead of needing a Hive-specific rule for InsertIntoHiveTable).
@SparkQA commented Oct 10, 2017

Test build #82592 has finished for PR 19250 at commit 5c03e07.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class AdjustTimestamps(conf: SQLConf) extends Rule[LogicalPlan]

@SparkQA commented Oct 10, 2017

Test build #82595 has finished for PR 19250 at commit 5607160.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Oct 11, 2017

Test build #82610 has finished for PR 19250 at commit 7e44486.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zivanfi commented Oct 11, 2017

@attilajeges has just found a problem with the behavior specified in the requirements:

  • Partitions of a table can use different file formats.
  • As a result, a single table can have data files of different file formats at the same time.
  • Timestamps are already handled differently in these formats (this was our original problem to begin with).
  • As a result, no uniform adjustment can fix timestamps for all file formats of the same table at the same time.

We can still solve the issue using a file-format-specific table property, though. @rxin, I would like to ask your opinion of such a solution:

  • It is implemented in the analyzer, as you asked.
  • It is writer-agnostic, as you asked.
  • It is not file-format-agnostic, but Parquet-specific instead for the time being.

Would you find such a solution acceptable, given that a file-format-agnostic fix seems infeasible at this point?

Thanks,

Zoltan

@cloud-fan (Contributor):
Why is this patch so complicated? Based on the fact that data sources accept a "timezone" option for read/write, I'd expect it to be just:

  • when CreateTable, set the session local timezone as a table property.
  • when planning InsertIntoTable to a concrete data source write command, get the timezone from the table property and put it in the write options.
  • when planning a table scan to a concrete data source scan, get the timezone from the table property and put it in the read options.

For data sources that don't support a "timezone" option, there is nothing we can do.
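A minimal sketch of that flow, assuming a hypothetical "timezone" table property and the existing "timeZone" read/write option that the text-based sources (CSV/JSON) already understand, with `spark` and `df` being an existing SparkSession and DataFrame:

```scala
// Table properties as they might come from the catalog; the "timezone" key is
// hypothetical, the "timeZone" option is the existing one for text-based sources.
val tableProps: Map[String, String] = Map("timezone" -> "UTC")

val tzOptions = tableProps.get("timezone")
  .map(tz => Map("timeZone" -> tz))
  .getOrElse(Map.empty[String, String])

// The planner would thread the option into the concrete write and scan.
df.write.options(tzOptions).json("/tmp/events")
val back = spark.read.options(tzOptions).json("/tmp/events")
```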

@squito (Contributor, Author) commented Nov 6, 2017

@cloud-fan I think you misunderstand the purpose of this change.

The primary purpose is actually to deal with parquet, where that option doesn't do anything. We need this for parquet for two reasons:

  1. Interoperability with Impala. Impala first used an int96 to store a timestamp in parquet, and it always stored the time as UTC (to go with the SQL standard definition of timezone). But Spark (and Hive) read it back in the current timezone. Even when you don't change timezones, and the timestamp with time zone vs. timestamp without time zone distinction doesn't matter, you get different values before this change.

  2. SQL STANDARD TIMESTAMP. SQL defines timestamp to be a synonym for timestamp without time zone. The behavior of that type is defined so that if you insert "08:30" with time zone "America/New_York", then load the data with time zone "America/Los_Angeles", you should still see "08:30". Since parquet is stored as an instant-in-time, and Spark internally applies a timezone, the change in timezone must be reversed by using some consistent adjustment when saving and reloading. This doesn't give you real timestamp without time zone, but gets you closer.

To be honest, I see limited value in this change for formats other than parquet -- I added it only because I thought Reynold wanted it (for symmetry across formats, I suppose?). As the purpose of this is to undo timezones, you can already achieve something similar in text-based formats by specifying a format which leaves out the timezone. But it doesn't hurt.

We could reuse the "timezone" option for parquet for this purpose, but that would be rather strange since it's almost doing the opposite of what that property does for text-based formats: that property is for adding a timezone, and this is for "removing" it. It's doing something special enough that it seems to deserve a more specific name than just "timezone".

(This is all discussed at greater length in the design doc, including how this type behaves in other SQL engines, how Spark's behavior is non-standard, and how it changed in 2.0.1.)
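As a concrete illustration of point 2 above (not the patch's code), preserving the wall clock "08:30" across two session zones amounts to shifting the stored instant by the difference between the two zones' offsets:

```scala
import java.time.{Duration, LocalDateTime, ZoneId}

// "08:30" written with an America/New_York session and read back with an
// America/Los_Angeles session: the instants that render as 08:30 differ by the
// offset delta, which is the shift a table-timezone adjustment has to apply.
val wallClock = LocalDateTime.of(2017, 9, 15, 8, 30)

val writtenInstant = wallClock.atZone(ZoneId.of("America/New_York")).toInstant
val readInstant    = wallClock.atZone(ZoneId.of("America/Los_Angeles")).toInstant

println(Duration.between(writtenInstant, readInstant)) // PT3H
```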

@cloud-fan (Contributor):
What's the interoperability issue with Impala? I think both Spark and Impala store timestamps as parquet INT96, representing nanoseconds from epoch, so there is no timezone confusion. Internally Spark uses a long to store timestamps, representing microseconds from epoch, so we don't and shouldn't consider timezone when reading parquet INT96 timestamps.

I think your problem may be about display. When Spark displays a timestamp value, via df.show, we convert the internal long value to a standard timestamp string according to the session local timezone. Some examples:

// 1000 milliseconds from epoch, no timezone confusion
scala> val df = Seq(new java.sql.Timestamp(1000)).toDF("ts")
df: org.apache.spark.sql.DataFrame = [ts: timestamp]

scala> spark.conf.set("spark.sql.session.timeZone", "GMT")

scala> df.show
+-------------------+
|                 ts|
+-------------------+
|1970-01-01 00:00:01|
+-------------------+

scala> spark.conf.set("spark.sql.session.timeZone", "PST")

scala> df.show
+-------------------+
|                 ts|
+-------------------+
|1969-12-31 16:00:01|
+-------------------+

This behavior, I think, makes sense, but may not be SQL-compliant. A clean solution is to add a TIMESTAMP WITH TIMEZONE type, so that when we convert the internal long value to a string, we know which timezone to use.

Your proposal seems to hack the internal long value and lie to Spark about the microseconds from epoch, which doesn't look good.

@zivanfi commented Nov 7, 2017

The interoperability issue is that Impala follows timezone-agnostic timestamp semantics as mandated by the SQL standard, while SparkSQL follows UTC-normalized semantics instead (which is not SQL-compliant). Please see the design doc for details.

The long-term solution is indeed using separate SQL types for the different timestamp semantics, as you suggest. However, until we get there we would like to have a short-term solution that fixes timestamps that are already written.

Please note that the "ms from epoch" value not being constant any more is a consequence of the timezone-agnostic semantics. The SQL standard specifies this in Section 4.6.2 of Part 2: Foundation. Pure TIMESTAMP has to mean TIMESTAMP WITHOUT TIME ZONE, and the latter must have the same hour, minute, second values regardless of the timezone. It also specifies that the TIMESTAMP WITHOUT TIME ZONE to TIMESTAMP WITH TIME ZONE conversion has to happen by subtracting the session-local time zone offset from the TIMESTAMP WITHOUT TIME ZONE value to calculate a corresponding UTC value. It naturally follows that during this conversion the UTC value will change depending on the session-local time zone. It is like parsing dates from text files in SparkSQL, where the UTC value also changes depending on the session-local time zone.
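A short illustration of that rule (not part of the patch): the same TIMESTAMP WITHOUT TIME ZONE value maps to different UTC instants depending on the session time zone:

```scala
import java.time.{LocalDateTime, ZoneId, ZoneOffset}

// Section 4.6.2 style conversion: subtract the session zone's offset from the
// wall-clock value to get a UTC value; the result depends on the session zone.
val withoutTz = LocalDateTime.of(2017, 11, 7, 8, 30) // TIMESTAMP WITHOUT TIME ZONE

def toUtc(sessionZone: String) =
  withoutTz.atZone(ZoneId.of(sessionZone)).withZoneSameInstant(ZoneOffset.UTC)

println(toUtc("America/New_York"))    // 2017-11-07T13:30Z
println(toUtc("America/Los_Angeles")) // 2017-11-07T16:30Z
```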

@cloud-fan (Contributor) commented Nov 7, 2017

Ah, now I understand this issue. Yes, Spark doesn't follow the SQL standard; the Spark timestamp is actually TIMESTAMP WITH LOCAL TIME ZONE, which is not in the SQL standard but is used in some databases like Oracle.

Although Impala follows the SQL standard, it doesn't follow the parquet standard; that's why we need to deal with the parquet INT96 issue here. I think we can follow what Hive/Impala did for interoperability, i.e. create a config to interpret parquet INT96 as a timezone-agnostic timestamp in the parquet reader of Spark.

However, I'm less sure about the parquet.timezone-adjustment table property. Is this a standard published somewhere? Do Impala and Hive both respect it? I think we need people from both Impala and Hive to say YES to this proposal.

@squito (Contributor, Author) commented Nov 7, 2017

> I think we can follow what Hive/Impala did for interoperability, i.e. create a config to interpret parquet INT96 as a timezone-agnostic timestamp in the parquet reader of Spark.

If I understand what you are asking correctly, I think this is what went into the original PR:
#16781

> However, I'm less sure about the parquet.timezone-adjustment table property. Is this a standard published somewhere? Do Impala and Hive both respect it? I think we need people from both Impala and Hive to say YES to this proposal.

All three engines were going to make the change, till it was reverted from Spark. Now the process as a whole is blocked on Spark -- if this change (or the prior one) is accepted, then the other engines can move forward too.

@zivanfi commented Nov 8, 2017

Hive and Impala introduced the following workaround for timestamp interoperability a long time ago: The footer of the Parquet file contains metadata about the library that wrote the file. For Hive and Spark this value is parquet-mr, for Impala it is impala itself, since it has its own implementation. Since Hive and Spark write using UTC-normalized semantics and Impala writes using timezone-agnostic semantics, we can deduce the semantics used from the writer info. So, when Hive sees a Parquet file written by Impala, it will adjust timestamps to compensate for the difference. Impala has an option (-convert_legacy_hive_parquet_utc_timestamp) to do the same when it sees a Parquet file written by anything other than Impala.
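A rough sketch of the writer detection this workaround relies on, using parquet-mr's footer API (the deprecated readFooter overload is used only for brevity, and the prefix check is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

// The Parquet footer's created_by field names the writing library
// ("parquet-mr ..." for Hive/Spark, "impala ..." for Impala).
def writtenByImpala(conf: Configuration, file: Path): Boolean = {
  val footer = ParquetFileReader.readFooter(conf, file)
  val createdBy = Option(footer.getFileMetaData.getCreatedBy).getOrElse("")
  createdBy.toLowerCase.startsWith("impala")
}
```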

There are two problems with this workaround:

  • Spark does not have a similar logic, so while Impala<->Hive and Hive<->SparkSQL can read each other's timestamps, the Impala->SparkSQL direction does not work. If SparkSQL implemented such a writer-dependent adjustment logic, it would already improve interoperability significantly.
  • The adjustment depends on the local timezone. This can be problematic if a table contains timestamps of mixed semantics, i.e. some data was written by Impala and some data was written by Hive or SparkSQL. In that case, if the local time zone used for reading differs from the local time zone used for writing, some of the timestamps will change and some others won't. Both of these behaviors are okay by themselves, since these are two valid timestamp semantics, but both of them happening on a single table is very counter-intuitive. This would be especially problematic for SparkSQL, since SparkSQL has a session-local timezone setting (Impala and Hive use the server timezone, which tends to remain unchanged).

To address both of these issues, we both added the recognition and adjustment of Impala-written timestamps to SparkSQL and also added a table property to record the timezone that should be used for these adjustments, so that mixed tables do not lead to unintuitive behaviour any more. We also added this table property to the Impala and Hive logic and tested that they can correctly read each other's timestamps.
However, our initial commit (which was the first of two commits and was meant to be followed by a follow-up change) got reverted in Spark due to some concerns. Because the table property only provides interoperability if respected by all affected components, we reverted our changes to Hive and Impala as well until we can reach an agreement with Spark.

To address the concerns that led Reynold to revert our initial commit, Imran made three changes compared to our original proposal:

  • The adjustment logic was moved to the analyzer.
  • The writer-specific logic was removed; all timestamps get the same treatment regardless of the component that wrote them. As a result, the code became simpler and nicer at the price of a behaviour change: since Impala-written timestamps are already timezone-agnostic, the user now has to specify UTC in the table property for that table (earlier it didn't matter). It also means that it is no longer possible to fix a table that already has mixed-semantics content, since you cannot set the table property to UTC as that would make the Hive/Spark timestamps wrong, and you can't set it to the local timezone either because that would make the Impala timestamps wrong. Although more restrictive than our initial proposal, this still seems acceptable, since most existing tables are single-writer only, and once you set the table property all writers will respect it, so after you set the table property you can have mixed-writer tables (they won't become mixed-semantics due to the table property).
  • The adjustment logic was made file-format-agnostic so that it does not only apply to Parquet but to any kind of table. However, we then realized that this will lead to further interoperability problems, therefore we would like to stick with a Parquet-specific approach, as we originally proposed, to avoid making the situation worse.

@cloud-fan (Contributor) commented Nov 8, 2017

IIUC, using the parquet.timezone-adjustment table property requires changing the writer, e.g. if Impala creates a table and Hive wants to write data to it, then Hive needs to write timezone-agnostic timestamps instead of UTC timestamps, and vice versa.

I think a better solution is the reader-specific approach. Impala keeps writing timezone-agnostic timestamps, and Spark/Hive keep writing UTC timestamps. On the read path, we should detect who wrote the parquet file and do the timestamp adjustment based on this information (we can also have a config to protect this behavior).

cc @rxin

@zivanfi commented Nov 8, 2017

Yes, you understand correctly: the table property affects both the read path and the write path, while the current workaround used by Hive and Impala only affects the read path. (Both are Parquet-specific though; I think you meant to write "writer-specific" when referring to the latter.)

@cloud-fan (Contributor):
Actually, I took a look at #16781; it also proposed a table property, instead of a simple Spark config.

@zivanfi commented Nov 9, 2017

Yes, that is correct. We introduced the table property to address the 2nd problem I mentioned above: "The adjustment depends on the local timezone." (details in my previous comment). But I think that a simpler workaround similar to what already exists in Hive would already be a big step forward for interoperability of existing data.

@cloud-fan (Contributor):
OK @squito, can we send a new PR to do it? Basically, in the parquet read task, get the writer info from the footer. If the writer is Impala, and a config is set, we treat the seconds as seconds from the epoch of the session local time zone, and adjust them to seconds from the Unix epoch.
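A sketch of that adjustment (names and the config gate are illustrative, and the DST-transition corner case is glossed over): treat the stored value as an epoch in the session's local time zone and shift it onto the Unix (UTC) epoch scale:

```scala
import java.util.TimeZone

// If the file was written by Impala and the flag is enabled, subtract the
// session zone's offset so the value becomes microseconds from the Unix epoch.
// (Right at a DST transition the offset lookup here is only approximate.)
def adjustImpalaTimestamp(micros: Long, sessionZone: TimeZone, enabled: Boolean): Long = {
  if (!enabled) {
    micros
  } else {
    val offsetMillis = sessionZone.getOffset(micros / 1000L)
    micros - offsetMillis * 1000L
  }
}
```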

@squito (Contributor, Author) commented Nov 9, 2017

@cloud-fan yes, I can do that; it will be next week before I get to it.

@vanzin (Contributor) commented Dec 19, 2017

@squito we can close this, right?

@squito closed this Dec 20, 2017