[SPARK-33567][SQL] DSv2: Use callback instead of passing Spark session and v2 relation for refreshing cache #30491

Closed
wants to merge 3 commits into from
Closed
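At a high level, the change keeps the SparkSession inside DataSourceV2Strategy and hands each physical node a `() => Unit` callback to invoke after it mutates the table, so the exec nodes no longer depend on SparkSession or DataSourceV2Relation just to refresh or invalidate the cache. Below is a minimal, self-contained sketch of that pattern; the class and method names are toy stand-ins, not the real Spark types.

```scala
// Toy illustration of the refactoring in this PR; none of these are Spark's real classes.
object CallbackPatternSketch {
  final case class Relation(name: String)

  final class CacheManager {
    def recacheByPlan(r: Relation): Unit = println(s"re-caching plans that read ${r.name}")
  }

  // Before: the exec node carried the cache machinery and relation itself.
  final case class AppendExecBefore(cache: CacheManager, relation: Relation) {
    def run(): Unit = {
      // ... write the rows ...
      cache.recacheByPlan(relation)
    }
  }

  // After: the planner closes over those dependencies and passes a plain thunk.
  final case class AppendExecAfter(refreshCache: () => Unit) {
    def run(): Unit = {
      // ... write the rows ...
      refreshCache()
    }
  }

  def main(args: Array[String]): Unit = {
    val cache = new CacheManager
    val rel = Relation("testcat.ns.tbl")
    AppendExecBefore(cache, rel).run()
    AppendExecAfter(() => cache.recacheByPlan(rel)).run() // planner-side wiring
  }
}
```

The actual wiring in DataSourceV2Strategy and the affected exec nodes follows in the diff.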
File: DataSourceV2Strategy.scala

@@ -52,6 +52,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
     }
   }

+  private def refreshCache(r: DataSourceV2Relation)(): Unit = {
+    session.sharedState.cacheManager.recacheByPlan(session, r)
+  }
+
+  private def invalidateCache(r: ResolvedTable)(): Unit = {
+    val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
+    session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+  }
+
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case PhysicalOperation(project, filters,
         relation @ DataSourceV2ScanRelation(_, V1ScanWrapper(scan, translated, pushed), output)) =>
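Note the second, empty parameter list on refreshCache and invalidateCache above: applying only the first list, as in `refreshCache(r)` at the call sites below, eta-expands to a `() => Unit` value that can be handed to an exec node. A tiny self-contained sketch of that idiom (toy types, not the Spark code):

```scala
object CurriedCallbackSketch {
  final case class Relation(name: String)

  // The second, empty parameter list is the trick: applying only the first
  // list yields a function value instead of running the body.
  def refreshCache(r: Relation)(): Unit =
    println(s"refreshing cache for ${r.name}")

  def main(args: Array[String]): Unit = {
    val r = Relation("testcat.ns.tbl")
    val callback: () => Unit = refreshCache(r) // nothing printed yet
    callback()                                 // the refresh runs here
  }
}
```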
@@ -128,7 +137,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
     }

     case RefreshTable(r: ResolvedTable) =>
-      RefreshTableExec(session, r.catalog, r.table, r.identifier) :: Nil
+      RefreshTableExec(r.catalog, r.identifier, invalidateCache(r)) :: Nil

     case ReplaceTable(catalog, ident, schema, parts, props, orCreate) =>
       val propsWithOwner = CatalogV2Util.withDefaultOwnership(props)

@@ -172,9 +181,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
     case AppendData(r: DataSourceV2Relation, query, writeOptions, _) =>
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          AppendDataExecV1(v1, writeOptions.asOptions, query, r) :: Nil
+          AppendDataExecV1(v1, writeOptions.asOptions, query, refreshCache(r)) :: Nil
         case v2 =>
-          AppendDataExec(session, v2, r, writeOptions.asOptions, planLater(query)) :: Nil
+          AppendDataExec(v2, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
       }

     case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>

@@ -186,15 +195,16 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
       }.toArray
       r.table.asWritable match {
         case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
-          OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions, query, r) :: Nil
+          OverwriteByExpressionExecV1(v1, filters, writeOptions.asOptions,
+            query, refreshCache(r)) :: Nil
         case v2 =>
-          OverwriteByExpressionExec(session, v2, r, filters,
-            writeOptions.asOptions, planLater(query)) :: Nil
+          OverwriteByExpressionExec(v2, filters,
+            writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil
       }

     case OverwritePartitionsDynamic(r: DataSourceV2Relation, query, writeOptions, _) =>
       OverwritePartitionsDynamicExec(
-        session, r.table.asWritable, r, writeOptions.asOptions, planLater(query)) :: Nil
+        r.table.asWritable, writeOptions.asOptions, planLater(query), refreshCache(r)) :: Nil

     case DeleteFromTable(relation, condition) =>
Review comment (Member): For DeleteFromTable, do we need to invalidate the cache too? Doesn't this command also update table data?

Reply (Member, author): Yes, good catch! I think we should. I'll work on this in a separate PR.
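For illustration only, the follow-up discussed here would presumably hand the delete node the same kind of callback so cached plans over the table are dropped after rows are deleted. The sketch below uses toy, hypothetical names (DeleteFromTableExecSketch and friends are not part of this PR or of Spark):

```scala
// Toy sketch of the suggested follow-up: a delete node that also invalidates the cache.
object DeleteCallbackSketch {
  final case class Relation(name: String)

  final class CacheManager {
    def uncacheQuery(r: Relation): Unit = println(s"uncaching plans over ${r.name}")
  }

  // Hypothetical exec node, shaped like the write nodes in this PR.
  final case class DeleteFromTableExecSketch(invalidateCache: () => Unit) {
    def run(): Unit = {
      // ... delete the matching rows from the table ...
      invalidateCache() // then drop any cached plans that read the table
    }
  }

  def main(args: Array[String]): Unit = {
    val cache = new CacheManager
    val r = Relation("testcat.ns.tbl")
    // Planner-side wiring: close over the cache manager and relation, pass a thunk.
    DeleteFromTableExecSketch(() => cache.uncacheQuery(r)).run()
  }
}
```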

       relation match {

@@ -232,7 +242,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with PredicateHelper {
       throw new AnalysisException("Describing columns is not supported for v2 tables.")

     case DropTable(r: ResolvedTable, ifExists, purge) =>
-      DropTableExec(session, r.catalog, r.table, r.identifier, ifExists, purge) :: Nil
+      DropTableExec(r.catalog, r.identifier, ifExists, purge, invalidateCache(r)) :: Nil

     case _: NoopDropTable =>
       LocalTableScanExec(Nil, Nil) :: Nil
File: DropTableExec.scala

@@ -17,27 +17,24 @@
 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

 /**
  * Physical plan node for dropping a table.
  */
 case class DropTableExec(
-    session: SparkSession,
     catalog: TableCatalog,
-    table: Table,
     ident: Identifier,
     ifExists: Boolean,
-    purge: Boolean) extends V2CommandExec {
+    purge: Boolean,
+    invalidateCache: () => Unit) extends V2CommandExec {

   override def run(): Seq[InternalRow] = {
     if (catalog.tableExists(ident)) {
-      val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-      session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+      invalidateCache()
       catalog.dropTable(ident, purge)
     } else if (!ifExists) {
       throw new NoSuchTableException(ident)
File: RefreshTableExec.scala

@@ -17,23 +17,20 @@
 package org.apache.spark.sql.execution.datasources.v2

-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}

 case class RefreshTableExec(
-    session: SparkSession,
     catalog: TableCatalog,
-    table: Table,
-    ident: Identifier) extends V2CommandExec {
+    ident: Identifier,
+    invalidateCache: () => Unit) extends V2CommandExec {
   override protected def run(): Seq[InternalRow] = {
     catalog.invalidateTable(ident)

     // invalidate all caches referencing the given table
     // TODO(SPARK-33437): re-cache the table itself once we support caching a DSv2 table
Review comment (Contributor): @sunchao let's also fix this TODO in a separate PR.

Reply (Member, author): Sure. Will do that soon.

-    val v2Relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident))
-    session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
+    invalidateCache()

     Seq.empty
   }
File: V1FallbackWriters.scala

@@ -38,10 +38,10 @@ case class AppendDataExecV1(
     table: SupportsWrite,
     writeOptions: CaseInsensitiveStringMap,
     plan: LogicalPlan,
-    v2Relation: DataSourceV2Relation) extends V1FallbackWriters {
+    refreshCache: () => Unit) extends V1FallbackWriters {

   override protected def run(): Seq[InternalRow] = {
-    writeWithV1(newWriteBuilder().buildForV1Write(), Some(v2Relation))
+    writeWithV1(newWriteBuilder().buildForV1Write(), refreshCache = refreshCache)
   }
 }

@@ -61,7 +61,7 @@ case class OverwriteByExpressionExecV1(
     deleteWhere: Array[Filter],
     writeOptions: CaseInsensitiveStringMap,
     plan: LogicalPlan,
-    v2Relation: DataSourceV2Relation) extends V1FallbackWriters {
+    refreshCache: () => Unit) extends V1FallbackWriters {

   private def isTruncate(filters: Array[Filter]): Boolean = {
     filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]

@@ -70,10 +70,11 @@
   override protected def run(): Seq[InternalRow] = {
     newWriteBuilder() match {
       case builder: SupportsTruncate if isTruncate(deleteWhere) =>
-        writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), Some(v2Relation))
+        writeWithV1(builder.truncate().asV1Builder.buildForV1Write(), refreshCache = refreshCache)

       case builder: SupportsOverwrite =>
-        writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(), Some(v2Relation))
+        writeWithV1(builder.overwrite(deleteWhere).asV1Builder.buildForV1Write(),
+          refreshCache = refreshCache)

       case _ =>
         throw new SparkException(s"Table does not support overwrite by expression: $table")

@@ -116,11 +117,11 @@ trait SupportsV1Write extends SparkPlan {

   protected def writeWithV1(
       relation: InsertableRelation,
-      v2Relation: Option[DataSourceV2Relation] = None): Seq[InternalRow] = {
+      refreshCache: () => Unit = () => ()): Seq[InternalRow] = {
     val session = sqlContext.sparkSession
     // The `plan` is already optimized, we should not analyze and optimize it again.
     relation.insert(AlreadyOptimized.dataFrame(session, plan), overwrite = false)
-    v2Relation.foreach(r => session.sharedState.cacheManager.recacheByPlan(session, r))
+    refreshCache()

     Nil
   }
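One small detail in the writeWithV1 change above: the callback defaults to a no-op (`() => ()`), presumably so V1 write paths that have no cached plans to refresh can keep calling it without the extra argument. A self-contained sketch of that default-argument pattern (toy names, illustrative only):

```scala
object NoOpCallbackSketch {
  // Callers with nothing to refresh can simply omit the argument.
  def writeRows(rows: Seq[String], refreshCache: () => Unit = () => ()): Unit = {
    rows.foreach(r => println(s"wrote $r"))
    refreshCache() // no-op unless the caller supplied a real callback
  }

  def main(args: Array[String]): Unit = {
    writeRows(Seq("a", "b"))                                    // default: no cache work
    writeRows(Seq("c"), () => println("re-caching the table"))  // explicit callback
  }
}
```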
File: WriteToDataSourceV2Exec.scala

@@ -213,15 +213,14 @@ case class AtomicReplaceTableAsSelectExec(
  * Rows in the output data set are appended.
  */
 case class AppendDataExec(
-    session: SparkSession,
     table: SupportsWrite,
-    relation: DataSourceV2Relation,
     writeOptions: CaseInsensitiveStringMap,
-    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+    query: SparkPlan,
+    refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {

   override protected def run(): Seq[InternalRow] = {
     val writtenRows = writeWithV2(newWriteBuilder().buildForBatch())
-    session.sharedState.cacheManager.recacheByPlan(session, relation)
+    refreshCache()
     writtenRows
   }
 }
@@ -237,12 +236,11 @@ case class AppendDataExec(
  * AlwaysTrue to delete all rows.
  */
 case class OverwriteByExpressionExec(
-    session: SparkSession,
     table: SupportsWrite,
-    relation: DataSourceV2Relation,
     deleteWhere: Array[Filter],
     writeOptions: CaseInsensitiveStringMap,
-    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+    query: SparkPlan,
+    refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {

   private def isTruncate(filters: Array[Filter]): Boolean = {
     filters.length == 1 && filters(0).isInstanceOf[AlwaysTrue]

@@ -259,7 +257,7 @@ case class OverwriteByExpressionExec(
       case _ =>
         throw new SparkException(s"Table does not support overwrite by expression: $table")
     }
-    session.sharedState.cacheManager.recacheByPlan(session, relation)
+    refreshCache()
     writtenRows
   }
 }

@@ -275,11 +273,10 @@ case class OverwriteByExpressionExec(
  * are not modified.
  */
 case class OverwritePartitionsDynamicExec(
-    session: SparkSession,
     table: SupportsWrite,
-    relation: DataSourceV2Relation,
     writeOptions: CaseInsensitiveStringMap,
-    query: SparkPlan) extends V2TableWriteExec with BatchWriteHelper {
+    query: SparkPlan,
+    refreshCache: () => Unit) extends V2TableWriteExec with BatchWriteHelper {

   override protected def run(): Seq[InternalRow] = {
     val writtenRows = newWriteBuilder() match {

@@ -289,7 +286,7 @@
       case _ =>
         throw new SparkException(s"Table does not support dynamic partition overwrite: $table")
     }
-    session.sharedState.cacheManager.recacheByPlan(session, relation)
+    refreshCache()
     writtenRows
   }
 }