[SPARK-49756][SQL] Postgres dialect supports pushdown datetime functions
### What changes were proposed in this pull request?
This PR proposes to make the Postgres dialect support pushdown of datetime functions.

### Why are the changes needed?
Currently, the DS V2 pushdown framework pushes datetime functions down in a common way, but Postgres does not support some of these functions.
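
For example (an illustration, not code from this PR): the generic builder compiles Spark's extract field names verbatim, producing SQL such as `EXTRACT(DAY_OF_YEAR FROM date1)`, which Postgres rejects, so the dialect must rewrite the field names to ones Postgres accepts (the same mapping added in `PostgresSQLBuilder` below):

```scala
// Illustration of the EXTRACT field-name rewrites this PR adds in PostgresSQLBuilder.
object ExtractFieldMapping {
  // Spark V2 EXTRACT field -> Postgres EXTRACT field
  val sparkToPostgres: Map[String, String] = Map(
    "DAY_OF_YEAR"  -> "DOY",
    "YEAR_OF_WEEK" -> "YEAR",
    "DAY_OF_WEEK"  -> "DOW")

  def main(args: Array[String]): Unit = {
    sparkToPostgres.foreach { case (sparkField, pgField) =>
      println(s"EXTRACT($sparkField FROM date1) -> EXTRACT($pgField FROM date1)")
    }
  }
}
```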

### Does this PR introduce _any_ user-facing change?
'No'.
This is a new feature for the Postgres dialect.
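
Although nothing changes at the API level, datetime predicates in queries against a Postgres V2 JDBC catalog can now be compiled to Postgres SQL and evaluated on the database side. A minimal sketch, assuming a local Postgres instance and a `datetime` table like the one created in the integration test below (the catalog name `pg`, URL, credentials, and qualified table name are placeholder assumptions):

```scala
import org.apache.spark.sql.SparkSession

object DatetimePushdownExample {
  def main(args: Array[String]): Unit = {
    // Register Postgres as a V2 JDBC catalog named "pg"; URL and credentials are placeholders.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.sql.catalog.pg",
        "org.apache.spark.sql.execution.datasources.v2.jdbc.JDBCTableCatalog")
      .config("spark.sql.catalog.pg.url",
        "jdbc:postgresql://localhost:5432/postgres?user=postgres&password=secret")
      .config("spark.sql.catalog.pg.driver", "org.postgresql.Driver")
      .getOrCreate()

    // Datetime predicates such as dayofyear/hour can now be pushed down to Postgres.
    val df = spark.sql(
      "SELECT name FROM pg.public.datetime " +
        "WHERE dayofyear(date1) > 100 AND hour(time1) = 0")
    df.explain() // pushed predicates appear in the JDBC scan node of the plan
  }
}
```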

### How was this patch tested?
GA (GitHub Actions).

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes apache#48210 from beliefer/SPARK-49756.

Authored-by: beliefer <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
beliefer authored and cloud-fan committed Oct 10, 2024
1 parent e693af0 commit ab1315b
Showing 2 changed files with 108 additions and 1 deletion.
PostgresIntegrationSuite.scala
@@ -65,6 +65,17 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
|)
""".stripMargin
).executeUpdate()
connection.prepareStatement(
"CREATE TABLE datetime (name VARCHAR(32), date1 DATE, time1 TIMESTAMP)")
.executeUpdate()
}

override def dataPreparation(connection: Connection): Unit = {
super.dataPreparation(connection)
connection.prepareStatement("INSERT INTO datetime VALUES " +
"('amy', '2022-05-19', '2022-05-19 00:00:00')").executeUpdate()
connection.prepareStatement("INSERT INTO datetime VALUES " +
"('alex', '2022-05-18', '2022-05-18 00:00:00')").executeUpdate()
}

override def testUpdateColumnType(tbl: String): Unit = {
@@ -123,4 +134,77 @@ class PostgresIntegrationSuite extends DockerJDBCIntegrationV2Suite with V2JDBCT
)
}
}

override def testDatetime(tbl: String): Unit = {
val df1 = sql(s"SELECT name FROM $tbl WHERE " +
"dayofyear(date1) > 100 AND dayofmonth(date1) > 10 ")
checkFilterPushed(df1)
val rows1 = df1.collect()
assert(rows1.length === 2)
assert(rows1(0).getString(0) === "amy")
assert(rows1(1).getString(0) === "alex")

val df2 = sql(s"SELECT name FROM $tbl WHERE year(date1) = 2022 AND quarter(date1) = 2")
checkFilterPushed(df2)
val rows2 = df2.collect()
assert(rows2.length === 2)
assert(rows2(0).getString(0) === "amy")
assert(rows2(1).getString(0) === "alex")

val df3 = sql(s"SELECT name FROM $tbl WHERE second(time1) = 0 AND month(date1) = 5")
checkFilterPushed(df3)
val rows3 = df3.collect()
assert(rows3.length === 2)
assert(rows3(0).getString(0) === "amy")
assert(rows3(1).getString(0) === "alex")

val df4 = sql(s"SELECT name FROM $tbl WHERE hour(time1) = 0 AND minute(time1) = 0")
checkFilterPushed(df4)
val rows4 = df4.collect()
assert(rows4.length === 2)
assert(rows4(0).getString(0) === "amy")
assert(rows4(1).getString(0) === "alex")

val df5 = sql(s"SELECT name FROM $tbl WHERE " +
"extract(WEEk from date1) > 10 AND extract(YEAROFWEEK from date1) = 2022")
checkFilterPushed(df5)
val rows5 = df5.collect()
assert(rows5.length === 2)
assert(rows5(0).getString(0) === "amy")
assert(rows5(1).getString(0) === "alex")

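// date_add and datediff have no direct Postgres function equivalents, so these filters are not pushed down.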
val df6 = sql(s"SELECT name FROM $tbl WHERE date_add(date1, 1) = date'2022-05-20' " +
"AND datediff(date1, '2022-05-10') > 0")
checkFilterPushed(df6, false)
val rows6 = df6.collect()
assert(rows6.length === 1)
assert(rows6(0).getString(0) === "amy")

val df7 = sql(s"SELECT name FROM $tbl WHERE weekday(date1) = 2")
checkFilterPushed(df7)
val rows7 = df7.collect()
assert(rows7.length === 1)
assert(rows7(0).getString(0) === "alex")

val df8 = sql(s"SELECT name FROM $tbl WHERE dayofweek(date1) = 4")
checkFilterPushed(df8)
val rows8 = df8.collect()
assert(rows8.length === 1)
assert(rows8(0).getString(0) === "alex")

val df9 = sql(s"SELECT name FROM $tbl WHERE " +
"dayofyear(date1) > 100 order by dayofyear(date1) limit 1")
checkFilterPushed(df9)
val rows9 = df9.collect()
assert(rows9.length === 1)
assert(rows9(0).getString(0) === "alex")

// Postgres does not support TRUNC(date, fmt) (it uses date_trunc instead), so this filter is not pushed down.
val df10 = sql(s"SELECT name FROM $tbl WHERE trunc(date1, 'week') = date'2022-05-16'")
checkFilterPushed(df10, false)
val rows10 = df10.collect()
assert(rows10.length === 2)
assert(rows10(0).getString(0) === "amy")
assert(rows10(1).getString(0) === "alex")
}
}
PostgresDialect.scala
@@ -23,14 +23,15 @@ import java.util
import java.util.Locale

import scala.util.Using
import scala.util.control.NonFatal

import org.apache.spark.SparkThrowable
import org.apache.spark.internal.LogKeys.COLUMN_NAME
import org.apache.spark.internal.MDC
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NonEmptyNamespaceException, NoSuchIndexException}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.connector.expressions.NamedReference
import org.apache.spark.sql.connector.expressions.{Expression, NamedReference}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.execution.datasources.v2.TableSampleInfo
@@ -300,6 +301,28 @@ private case class PostgresDialect()
}
}

class PostgresSQLBuilder extends JDBCSQLBuilder {
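// Rewrite Spark's EXTRACT field names into field names Postgres accepts; other fields fall back to the generic builder.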
override def visitExtract(field: String, source: String): String = {
field match {
case "DAY_OF_YEAR" => s"EXTRACT(DOY FROM $source)"
case "YEAR_OF_WEEK" => s"EXTRACT(YEAR FROM $source)"
case "DAY_OF_WEEK" => s"EXTRACT(DOW FROM $source)"
case _ => super.visitExtract(field, source)
}
}
}

override def compileExpression(expr: Expression): Option[String] = {
val postgresSQLBuilder = new PostgresSQLBuilder()
try {
Some(postgresSQLBuilder.build(expr))
} catch {
case NonFatal(e) =>
logWarning("Error occurs while compiling V2 expression", e)
None
}
}

override def supportsLimit: Boolean = true

override def supportsOffset: Boolean = true
