Remove GpuToTimestampImproved and spark.rapids.sql.improvedTimeOps.enabled #10033

Merged
1 change: 0 additions & 1 deletion docs/additional-functionality/advanced_configs.md
@@ -114,7 +114,6 @@ Name | Description | Default Value | Applicable at
<a name="sql.hasExtendedYearValues"></a>spark.rapids.sql.hasExtendedYearValues|Spark 3.2.0+ extended parsing of years in dates and timestamps to support the full range of possible values. Prior to this it was limited to a positive 4 digit year. The Accelerator does not support the extended range yet. This config indicates if your data includes this extended range or not, or if you don't care about getting the correct values on values with the extended range.|true|Runtime
<a name="sql.hashOptimizeSort.enabled"></a>spark.rapids.sql.hashOptimizeSort.enabled|Whether sorts should be inserted after some hashed operations to improve output ordering. This can improve output file sizes when saving to columnar formats.|false|Runtime
<a name="sql.improvedFloatOps.enabled"></a>spark.rapids.sql.improvedFloatOps.enabled|For some floating point operations spark uses one way to compute the value and the underlying cudf implementation can use an improved algorithm. In some cases this can result in cudf producing an answer when spark overflows.|true|Runtime
<a name="sql.improvedTimeOps.enabled"></a>spark.rapids.sql.improvedTimeOps.enabled|When set to true, some operators will avoid overflowing by converting epoch days directly to seconds without first converting to microseconds|false|Runtime
<a name="sql.incompatibleDateFormats.enabled"></a>spark.rapids.sql.incompatibleDateFormats.enabled|When parsing strings as dates and timestamps in functions like unix_timestamp, some formats are fully supported on the GPU and some are unsupported and will fall back to the CPU. Some formats behave differently on the GPU than the CPU. Spark on the CPU interprets date formats with unsupported trailing characters as nulls, while Spark on the GPU will parse the date with invalid trailing characters. More detail can be found at [parsing strings as dates or timestamps](../compatibility.md#parsing-strings-as-dates-or-timestamps).|false|Runtime
<a name="sql.incompatibleOps.enabled"></a>spark.rapids.sql.incompatibleOps.enabled|For operations that work, but are not 100% compatible with the Spark equivalent set if they should be enabled by default or disabled by default.|true|Runtime
<a name="sql.join.cross.enabled"></a>spark.rapids.sql.join.cross.enabled|When set to true cross joins are enabled on the GPU|true|Runtime
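Note: entries in this table are ordinary Spark SQL configs. A minimal sketch, assuming a `SparkSession` named `spark` launched with the RAPIDS Accelerator on its classpath, of toggling one of the configs that remain after this change; the removed `spark.rapids.sql.improvedTimeOps.enabled` key no longer has a definition behind it:

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch (not plugin code): flipping a remaining RAPIDS Accelerator config
// on an existing session started with the plugin on its classpath.
object RapidsConfExample {
  def enableIncompatibleDateFormats(spark: SparkSession): Unit = {
    // Documented in the table above: lets the GPU parse formats the CPU treats differently.
    spark.conf.set("spark.rapids.sql.incompatibleDateFormats.enabled", "true")
    // The key this PR removes, "spark.rapids.sql.improvedTimeOps.enabled", is no longer
    // defined in RapidsConf, so setting it no longer switches any plugin behavior.
  }
}
```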
19 changes: 0 additions & 19 deletions integration_tests/src/main/python/date_time_test.py
@@ -379,16 +379,6 @@ def fun(spark):

assert_gpu_and_cpu_are_equal_collect(fun, conf=copy_and_update(parser_policy_dic, ansi_enabled_conf))

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_unix_timestamp_improved(data_gen, ansi_enabled):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true",
"spark.sql.legacy.timeParserPolicy": "CORRECTED"}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col('a'))),
copy_and_update({'spark.sql.ansi.enabled': ansi_enabled}, conf))

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
@allow_non_gpu(*non_utc_allow)
@@ -397,15 +387,6 @@ def test_unix_timestamp(data_gen, ansi_enabled):
lambda spark : unary_op_df(spark, data_gen).select(f.unix_timestamp(f.col("a"))),
{'spark.sql.ansi.enabled': ansi_enabled})

@pytest.mark.parametrize('ansi_enabled', [True, False], ids=['ANSI_ON', 'ANSI_OFF'])
@pytest.mark.parametrize('data_gen', date_n_time_gens, ids=idfn)
@allow_non_gpu(*non_utc_allow)
def test_to_unix_timestamp_improved(data_gen, ansi_enabled):
conf = {"spark.rapids.sql.improvedTimeOps.enabled": "true"}
assert_gpu_and_cpu_are_equal_collect(
lambda spark : unary_op_df(spark, data_gen).selectExpr("to_unix_timestamp(a)"),
copy_and_update({'spark.sql.ansi.enabled': ansi_enabled}, conf))

str_date_and_format_gen = [pytest.param(StringGen('[0-9]{4}/[01][0-9]'),'yyyy/MM', marks=pytest.mark.xfail(reason="cudf does no checks")),
(StringGen('[0-9]{4}/[01][12]/[0-2][1-8]'),'yyyy/MM/dd'),
(StringGen('[01][12]/[0-2][1-8]'), 'MM/dd'),
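Note: the deleted `test_unix_timestamp_improved` and `test_to_unix_timestamp_improved` cases differed from the surviving tests only by forcing `spark.rapids.sql.improvedTimeOps.enabled=true`. A rough Scala sketch of the query shape the remaining `test_unix_timestamp` still exercises (column name `a` comes from the test's `unary_op_df` helper; the object and method names here are illustrative, not plugin code):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, unix_timestamp}

// Sketch of what stays covered: unix_timestamp over a date/timestamp column,
// compared between CPU and GPU with ANSI mode both on and off.
object UnixTimestampQueryExample {
  def unixTimestampQuery(spark: SparkSession, df: DataFrame): DataFrame = {
    spark.conf.set("spark.sql.ansi.enabled", "false")                // the test also runs with "true"
    spark.conf.set("spark.sql.legacy.timeParserPolicy", "CORRECTED")
    df.select(unix_timestamp(col("a")))
  }
}
```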
1 change: 0 additions & 1 deletion integration_tests/src/main/python/spark_session.py
@@ -47,7 +47,6 @@ def _from_scala_map(scala_map):
'spark.rapids.sql.hasExtendedYearValues': 'true',
'spark.rapids.sql.hashOptimizeSort.enabled': 'false',
'spark.rapids.sql.improvedFloatOps.enabled': 'false',
'spark.rapids.sql.improvedTimeOps.enabled': 'false',
'spark.rapids.sql.incompatibleDateFormats.enabled': 'false',
'spark.rapids.sql.incompatibleOps.enabled': 'false',
'spark.rapids.sql.mode': 'executeongpu',
Original file line number Diff line number Diff line change
@@ -1696,12 +1696,7 @@ object GpuOverrides extends Logging {
TypeSig.STRING)),
(a, conf, p, r) => new UnixTimeExprMeta[ToUnixTimestamp](a, conf, p, r) {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
if (conf.isImprovedTimestampOpsEnabled) {
// passing the already converted strf string for a little optimization
GpuToUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat)
} else {
GpuToUnixTimestamp(lhs, rhs, sparkFormat, strfFormat)
}
GpuToUnixTimestamp(lhs, rhs, sparkFormat, strfFormat)
}
}),
expr[UnixTimestamp](
@@ -1715,12 +1710,7 @@ object GpuOverrides extends Logging {
TypeSig.STRING)),
(a, conf, p, r) => new UnixTimeExprMeta[UnixTimestamp](a, conf, p, r) {
override def convertToGpu(lhs: Expression, rhs: Expression): GpuExpression = {
if (conf.isImprovedTimestampOpsEnabled) {
// passing the already converted strf string for a little optimization
GpuUnixTimestampImproved(lhs, rhs, sparkFormat, strfFormat)
} else {
GpuUnixTimestamp(lhs, rhs, sparkFormat, strfFormat)
}
GpuUnixTimestamp(lhs, rhs, sparkFormat, strfFormat)
}
}),
expr[Hour](
Original file line number Diff line number Diff line change
@@ -667,13 +667,6 @@ object RapidsConf {

// ENABLE/DISABLE PROCESSING

val IMPROVED_TIMESTAMP_OPS =
conf("spark.rapids.sql.improvedTimeOps.enabled")
.doc("When set to true, some operators will avoid overflowing by converting epoch days " +
"directly to seconds without first converting to microseconds")
.booleanConf
.createWithDefault(false)

val SQL_ENABLED = conf("spark.rapids.sql.enabled")
.doc("Enable (true) or disable (false) sql operations on the GPU")
.commonlyUsed()
@@ -2378,8 +2371,6 @@ class RapidsConf(conf: Map[String, String]) extends Logging {

lazy val shouldExplainAll: Boolean = explain.equalsIgnoreCase("ALL")

lazy val isImprovedTimestampOpsEnabled: Boolean = get(IMPROVED_TIMESTAMP_OPS)

lazy val chunkedReaderEnabled: Boolean = get(CHUNKED_READER)

lazy val maxReadBatchSizeRows: Int = get(MAX_READER_BATCH_SIZE_ROWS)
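Note: a RapidsConf entry is a pair — the builder definition plus a `lazy val` accessor — and this hunk deletes both halves of `IMPROVED_TIMESTAMP_OPS` / `isImprovedTimestampOpsEnabled`. A self-contained sketch of that pairing using toy types and a hypothetical key (the real builder supports more options, e.g. `.commonlyUsed()` as shown above):

```scala
// Toy stand-in for the RapidsConf pattern touched above: a boolean config definition
// plus the lazy accessor the rest of the plugin reads. Not the plugin's real types.
final case class ConfEntry(key: String, doc: String, default: Boolean)

object ExampleConfs {
  // Hypothetical key, used only to illustrate the shape of the removed entry.
  val EXAMPLE_ENABLED: ConfEntry = ConfEntry(
    key = "spark.rapids.sql.example.enabled",
    doc = "Hypothetical on/off switch mirroring the removed IMPROVED_TIMESTAMP_OPS entry.",
    default = false)
}

class ExampleConfReader(settings: Map[String, String]) {
  private def get(entry: ConfEntry): Boolean =
    settings.get(entry.key).map(_.toBoolean).getOrElse(entry.default)

  // Counterpart of the removed isImprovedTimestampOpsEnabled accessor.
  lazy val isExampleEnabled: Boolean = get(ExampleConfs.EXAMPLE_ENABLED)
}
```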
Original file line number Diff line number Diff line change
@@ -866,59 +866,6 @@ abstract class GpuToTimestamp
}
}

/**
* An improved version of GpuToTimestamp conversion which converts time to UNIX timestamp without
* first converting to microseconds
*/
abstract class GpuToTimestampImproved extends GpuToTimestamp {
import GpuToTimestamp._

override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
val tmp = if (lhs.dataType == StringType) {
// rhs is ignored we already parsed the format
if (getTimeParserPolicy == LegacyTimeParserPolicy) {
parseStringAsTimestampWithLegacyParserPolicy(
lhs,
sparkFormat,
strfFormat,
DType.TIMESTAMP_SECONDS,
(col, strfFormat) => col.asTimestampSeconds(strfFormat))
} else {
parseStringAsTimestamp(
lhs,
sparkFormat,
strfFormat,
DType.TIMESTAMP_SECONDS,
failOnError)
}
} else if (lhs.dataType() == DateType){
lhs.getBase.asTimestampSeconds()
} else { // Timestamp
// https://github.com/rapidsai/cudf/issues/5166
// The time is off by 1 second if the result is < 0
val longSecs = withResource(lhs.getBase.asTimestampSeconds()) { secs =>
secs.asLongs()
}
withResource(longSecs) { secs =>
val plusOne = withResource(Scalar.fromLong(1)) { one =>
secs.add(one)
}
withResource(plusOne) { plusOne =>
withResource(Scalar.fromLong(0)) { zero =>
withResource(secs.lessThan(zero)) { neg =>
neg.ifElse(plusOne, secs)
}
}
}
}
}
withResource(tmp) { r =>
// The type we are returning is a long not an actual timestamp
r.asLongs()
}
}
}

case class GpuUnixTimestamp(strTs: Expression,
format: Expression,
sparkFormat: String,
@@ -949,36 +896,6 @@ case class GpuToUnixTimestamp(strTs: Expression,

}

case class GpuUnixTimestampImproved(strTs: Expression,
format: Expression,
sparkFormat: String,
strf: String,
timeZoneId: Option[String] = None) extends GpuToTimestampImproved {
override def strfFormat = strf
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
copy(timeZoneId = Option(timeZoneId))
}

override def left: Expression = strTs
override def right: Expression = format

}

case class GpuToUnixTimestampImproved(strTs: Expression,
format: Expression,
sparkFormat: String,
strf: String,
timeZoneId: Option[String] = None) extends GpuToTimestampImproved {
override def strfFormat = strf
override def withTimeZone(timeZoneId: String): TimeZoneAwareExpression = {
copy(timeZoneId = Option(timeZoneId))
}

override def left: Expression = strTs
override def right: Expression = format

}

case class GpuGetTimestamp(
strTs: Expression,
format: Expression,
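Note: the deleted `GpuToTimestampImproved.doColumnar` added one second to negative results, citing rapidsai/cudf#5166 — a one-second disagreement of the kind that arises when truncating division is used where floor division is expected for pre-epoch values. A minimal, cudf-free sketch of that arithmetic:

```scala
// Standalone illustration (plain JVM arithmetic, no cudf): converting microseconds
// to whole seconds for a value before the Unix epoch. Truncating division and floor
// division differ by exactly one second, which is the kind of off-by-one the removed
// workaround compensated for.
object NegativeEpochSeconds {
  def main(args: Array[String]): Unit = {
    val micros = -1500000L                                  // 1.5 seconds before the epoch
    val truncated = micros / 1000000L                       // -1: JVM '/' rounds toward zero
    val floored = Math.floorDiv(micros, 1000000L)           // -2: rounds toward negative infinity
    println(s"truncated = $truncated, floored = $floored")  // off by one for negative inputs
  }
}
```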
Original file line number Diff line number Diff line change
@@ -158,16 +158,6 @@ class ParseDateTimeSuite extends SparkQueryCompareTestSuite with BeforeAndAfterE
}
}

testSparkResultsAreEqual("to_unix_timestamp parse yyyy/MM (improvedTimeOps)",
timestampsAsStrings,
new SparkConf().set(SQLConf.LEGACY_TIME_PARSER_POLICY.key, "CORRECTED")
.set(RapidsConf.IMPROVED_TIMESTAMP_OPS.key, "true")) {
df => {
df.createOrReplaceTempView("df")
df.sqlContext.sql("SELECT c0, to_unix_timestamp(c0, 'yyyy/MM') FROM df")
}
}

testSparkResultsAreEqual("unix_timestamp parse timestamp",
timestampsAsStrings,
CORRECTED_TIME_PARSER_POLICY) {
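Note: this removed suite case only forced the now-deleted config; the neighbouring cases (e.g. `unix_timestamp parse timestamp`, shown above) keep covering the default code path. For reference, a sketch of the query the deleted case ran (assumes a DataFrame `df` whose string column `c0` holds values shaped like `2023/07`):

```scala
import org.apache.spark.sql.DataFrame

// Sketch of the statement the deleted test executed; remaining ParseDateTimeSuite
// cases run their queries without spark.rapids.sql.improvedTimeOps.enabled.
object ToUnixTimestampExample {
  def toUnixTimestampYearMonth(df: DataFrame): DataFrame = {
    df.createOrReplaceTempView("df")
    df.sqlContext.sql("SELECT c0, to_unix_timestamp(c0, 'yyyy/MM') FROM df")
  }
}
```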