diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala
index 589df97057a..ccc74006f0a 100644
--- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala
+++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaFormatSharingLimitPushDown.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.IntegerLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{LocalLimit, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable}
 
 // A spark rule that applies limit pushdown to DeltaSharingFileIndex, when the config is enabled.
 // To allow only fetching needed files from delta sharing server.
@@ -38,10 +38,8 @@ object DeltaFormatSharingLimitPushDown extends Rule[LogicalPlan] {
     p transform {
       case localLimit @ LocalLimit(
             literalExpr @ IntegerLiteral(limit),
-            l @ LogicalRelation(
+            l @ LogicalRelationWithTable(
               r @ HadoopFsRelation(remoteIndex: DeltaSharingFileIndex, _, _, _, _, _),
-              _,
-              _,
               _
             )
           ) if (ConfUtils.limitPushdownEnabled(p.conf) && remoteIndex.limitHint.isEmpty) =>
diff --git a/spark/src/main/scala-spark-3.5/shims/LogicalRelationShims.scala b/spark/src/main/scala-spark-3.5/shims/LogicalRelationShims.scala
new file mode 100644
index 00000000000..5e6b36afbfa
--- /dev/null
+++ b/spark/src/main/scala-spark-3.5/shims/LogicalRelationShims.scala
@@ -0,0 +1,35 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.sources.BaseRelation
+
+// Handles a breaking change between Spark 3.5 and Spark Master (4.0).
+// `LogicalRelationWithTable` is a new object in Spark 4.0.
+
+/**
+ * Extract the [[BaseRelation]] and [[CatalogTable]] from [[LogicalRelation]]. You can also
+ * retrieve the instance of LogicalRelation like following:
+ *
+ *   case l @ LogicalRelationWithTable(relation, catalogTable) => ...
+ */
+object LogicalRelationWithTable {
+  def unapply(plan: LogicalRelation): Option[(BaseRelation, Option[CatalogTable])] = {
+    Some(plan.relation, plan.catalogTable)
+  }
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
index 747400590a0..0b55df7c750 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -61,8 +61,7 @@ import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTrans
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.CreateTableLikeCommand
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.execution.datasources.HadoopFsRelation
-import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.streaming.StreamingRelation
@@ -327,8 +326,8 @@ class DeltaAnalysis(session: SparkSession)
       case TimeTravel(u: UnresolvedRelation, _, _, _) =>
         u.tableNotFound(u.multipartIdentifier)
 
-      case LogicalRelation(
-        HadoopFsRelation(location, _, _, _, _: ParquetFileFormat, _), _, catalogTable, _) =>
+      case LogicalRelationWithTable(
+        HadoopFsRelation(location, _, _, _, _: ParquetFileFormat, _), catalogTable) =>
         val tableIdent = catalogTable.map(_.identifier)
           .getOrElse(TableIdentifier(location.rootPaths.head.toString, Some("parquet")))
         val provider = if (catalogTable.isDefined) {
@@ -836,7 +835,7 @@ class DeltaAnalysis(session: SparkSession)
           output = CloneTableCommand.output)
 
       // Non-delta metastore table already exists at target
-      case LogicalRelation(_, _, existingCatalogTable @ Some(catalogTable), _) =>
+      case LogicalRelationWithTable(_, existingCatalogTable @ Some(catalogTable)) =>
         val tblIdent = catalogTable.identifier
         val path = new Path(catalogTable.location)
         val newCatalogTable = createCatalogTableForCloneCommand(path, byPath = false, tblIdent,
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
index a4cd714481c..289bcb21478 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, LeafNode, LogicalPla
 import org.apache.spark.sql.catalyst.util.CharVarcharCodegenUtils
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.connector.expressions.{FieldReference, IdentityTransform}
-import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -50,7 +50,7 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  */
 object RelationFileIndex {
   def unapply(a: LogicalRelation): Option[FileIndex] = a match {
-    case LogicalRelation(hrel: HadoopFsRelation, _, _, _) => Some(hrel.location)
+    case LogicalRelationWithTable(hrel: HadoopFsRelation, _) => Some(hrel.location)
     case _ => None
   }
 }
@@ -416,7 +416,7 @@ object DeltaTableUtils extends PredicateHelper
       target: LogicalPlan,
       fileIndex: FileIndex): LogicalPlan = {
     target transform {
-      case l @ LogicalRelation(hfsr: HadoopFsRelation, _, _, _) =>
+      case l @ LogicalRelationWithTable(hfsr: HadoopFsRelation, _) =>
         l.copy(relation = hfsr.copy(location = fileIndex)(hfsr.sparkSession))
     }
   }
@@ -454,7 +454,7 @@ object DeltaTableUtils extends PredicateHelper
     }
 
     target transformUp {
-      case l@LogicalRelation(hfsr: HadoopFsRelation, _, _, _) =>
+      case l@LogicalRelationWithTable(hfsr: HadoopFsRelation, _) =>
         // Prune columns from the scan.
         val prunedOutput = l.output.filterNot { col =>
           columnsToDrop.exists(resolver(_, col.name))
@@ -488,7 +488,7 @@ object DeltaTableUtils extends PredicateHelper
       target: LogicalPlan,
       updatedFileFormat: FileFormat): LogicalPlan = {
     target transform {
-      case l @ LogicalRelation(hfsr: HadoopFsRelation, _, _, _) =>
+      case l @ LogicalRelationWithTable(hfsr: HadoopFsRelation, _) =>
         l.copy(
           relation = hfsr.copy(fileFormat = updatedFileFormat)(hfsr.sparkSession))
     }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/GenerateRowIDs.scala b/spark/src/main/scala/org/apache/spark/sql/delta/GenerateRowIDs.scala
index c9534f36149..c68287a7569 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/GenerateRowIDs.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/GenerateRowIDs.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
-import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.StructType
 
@@ -38,7 +38,7 @@ object GenerateRowIDs extends Rule[LogicalPlan] {
    */
   private object DeltaScanWithRowTrackingEnabled {
     def unapply(plan: LogicalPlan): Option[LogicalRelation] = plan match {
-      case scan @ LogicalRelation(relation: HadoopFsRelation, _, _, _) =>
+      case scan @ LogicalRelationWithTable(relation: HadoopFsRelation, _) =>
         relation.fileFormat match {
           case format: DeltaParquetFileFormat
             if RowTracking.isEnabled(format.protocol, format.metadata) => Some(scan)
@@ -50,7 +50,7 @@ object GenerateRowIDs extends Rule[LogicalPlan] {
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUpWithNewOutput {
     case DeltaScanWithRowTrackingEnabled(
-        scan @ LogicalRelation(baseRelation: HadoopFsRelation, _, _, _)) =>
+        scan @ LogicalRelationWithTable(baseRelation: HadoopFsRelation, _)) =>
       // While Row IDs and commit versions are non-nullable, we'll use the Row ID & commit
       // version attributes to read the materialized values from now on, which can be null. We make
       // the materialized Row ID & commit version attributes nullable in the scan here.
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
index 52f4f1d009b..6c6ae77da39 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVs.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.FileFormat.METADATA_NAME
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.types.StructType
@@ -59,9 +59,9 @@ trait PreprocessTableWithDVs extends SubqueryTransformerHelper {
 
 object ScanWithDeletionVectors {
   def unapply(a: LogicalRelation): Option[LogicalPlan] = a match {
-    case scan @ LogicalRelation(
+    case scan @ LogicalRelationWithTable(
       relation @ HadoopFsRelation(
-        index: TahoeFileIndex, _, _, _, format: DeltaParquetFileFormat, _), _, _, _) =>
+        index: TahoeFileIndex, _, _, _, format: DeltaParquetFileFormat, _), _) =>
       dvEnabledScanFor(scan, relation, format, index)
     case _ => None
   }
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVsStrategy.scala b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVsStrategy.scala
index 0ba2e50f8fb..3a330816f22 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVsStrategy.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/PreprocessTableWithDVsStrategy.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.planning.ScanOperation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.datasources.{FileSourceStrategy, HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{FileSourceStrategy, HadoopFsRelation, LogicalRelationWithTable}
 
 /**
  * Strategy to process tables with DVs and add the skip row column and filters.
@@ -35,7 +35,7 @@ case class PreprocessTableWithDVsStrategy(session: SparkSession)
   with PreprocessTableWithDVs {
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-    case ScanOperation(_, _, _, _ @ LogicalRelation(_: HadoopFsRelation, _, _, _)) =>
+    case ScanOperation(_, _, _, _ @ LogicalRelationWithTable(_: HadoopFsRelation, _)) =>
       val updatedPlan = preprocessTablesWithDVs(plan)
       FileSourceStrategy(updatedPlan)
     case _ => Nil
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
index caf32c9ab4b..259be11e2e2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DMLWithDeletionVectorsHelper.scala
@@ -39,7 +39,7 @@ import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.FileFormat.{FILE_PATH, METADATA_NAME}
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.{col, lit}
@@ -92,8 +92,8 @@ object DMLWithDeletionVectorsHelper extends DeltaCommand {
     var fileMetadataCol: AttributeReference = null
 
     val newTarget = target.transformUp {
-      case l @ LogicalRelation(
-        hfsr @ HadoopFsRelation(_, _, _, _, format: DeltaParquetFileFormat, _), _, _, _) =>
+      case l @ LogicalRelationWithTable(
+        hfsr @ HadoopFsRelation(_, _, _, _, format: DeltaParquetFileFormat, _), _) =>
         fileMetadataCol = format.createFileMetadataCol()
         // Take the existing schema and add additional metadata columns
         if (useMetadataRowIndex) {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala
index 801a569e727..c3c37ab1738 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeltaCommand.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog.V1Table
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 
@@ -194,9 +194,9 @@ trait DeltaCommand extends DeltaLogging {
     try {
       resolveIdentifier(analyzer, tableIdent) match {
         // is path
-        case LogicalRelation(HadoopFsRelation(_, _, _, _, _, _), _, None, _) => false
+        case LogicalRelationWithTable(HadoopFsRelation(_, _, _, _, _, _), None) => false
         // is table
-        case LogicalRelation(HadoopFsRelation(_, _, _, _, _, _), _, Some(_), _) => true
+        case LogicalRelationWithTable(HadoopFsRelation(_, _, _, _, _, _), Some(_)) => true
         // is iceberg table
         case DataSourceV2Relation(_: IcebergTablePlaceHolder, _, _, _, _) => false
         // could not resolve table/db
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala
index 1578c88c120..0a2826c0ded 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSinkSuite.scala
@@ -241,7 +241,7 @@ class DeltaSinkSuite
 
       // Verify the correct partitioning schema has been inferred
       val hadoopFsRelations = outputDf.queryExecution.analyzed.collect {
-        case LogicalRelation(baseRelation, _, _, _) if
+        case LogicalRelationWithTable(baseRelation, _) if
             baseRelation.isInstanceOf[HadoopFsRelation] =>
           baseRelation.asInstanceOf[HadoopFsRelation]
       }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
index 2b4f84c6916..5cf16f3a283 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.catalyst.expressions.InSet
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.plans.logical.Filter
 import org.apache.spark.sql.execution.FileSourceScanExec
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.functions.{asc, col, expr, lit, map_values, struct}
 import org.apache.spark.sql.internal.SQLConf
@@ -215,9 +215,7 @@ class DeltaSuite extends QueryTest
 
     // Verify the correct partitioning schema is picked up
    val hadoopFsRelations = df.queryExecution.analyzed.collect {
-      case LogicalRelation(baseRelation, _, _, _) if
-        baseRelation.isInstanceOf[HadoopFsRelation] =>
-        baseRelation.asInstanceOf[HadoopFsRelation]
+      case LogicalRelationWithTable(h: HadoopFsRelation, _) => h
     }
     assert(hadoopFsRelations.size === 1)
     assert(hadoopFsRelations.head.partitionSchema.exists(_.name == "is_odd"))
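Note on the shim pattern used above (editor commentary, not part of the patch): Spark 4.0 adds a `LogicalRelationWithTable` extractor that exposes only the `BaseRelation` and the optional `CatalogTable`, while the Spark 3.5 patterns had to spell out all four fields of `LogicalRelation`. The new `LogicalRelationShims.scala`, placed on the Spark 3.5 source path only, defines an object with the same name, package, and `unapply` shape, so every call site in this diff can match on `LogicalRelationWithTable(relation, catalogTable)` regardless of which Spark version the module is compiled against. Below is a minimal, self-contained sketch of that cross-version extractor idea in plain Scala; `Relation`, `Table`, `Scan`, `ScanWithTable`, and `ShimExample` are hypothetical stand-ins, not Spark or Delta APIs.

```scala
// Hypothetical stand-ins for BaseRelation, CatalogTable, and the wide LogicalRelation node.
case class Relation(name: String)
case class Table(identifier: String)
case class Scan(
    relation: Relation,
    output: Seq[String],
    catalogTable: Option[Table],
    isStreaming: Boolean)

// Shim compiled only against the "old" dependency version: it exposes the two-field
// pattern that the "new" version ships natively, by delegating to the wider node.
object ScanWithTable {
  def unapply(scan: Scan): Option[(Relation, Option[Table])] =
    Some((scan.relation, scan.catalogTable))
}

object ShimExample extends App {
  val catalogScan =
    Scan(Relation("events"), Seq("id", "ts"), Some(Table("db.events")), isStreaming = false)
  val pathScan = catalogScan.copy(catalogTable = None)

  // Call sites match on the shim name and never mention the extra fields, so the same
  // pattern compiles whether the extractor comes from the shim or from the dependency.
  def describe(scan: Scan): String = scan match {
    case ScanWithTable(rel, Some(table)) => s"metastore table ${table.identifier} over ${rel.name}"
    case ScanWithTable(rel, None) => s"path-based scan over ${rel.name}"
  }

  println(describe(catalogScan))
  println(describe(pathScan))
}
```

Keeping the shim in the same package as the real extractor (here `org.apache.spark.sql.execution.datasources`) is what makes the swap transparent: call sites import one fully qualified name, and the build simply chooses whether the shim or the dependency provides it.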