Skip to content

Commit

Permalink
[SPARK-46393][SQL][FOLLOWUP] Classify exceptions in JDBCTableCatalog.…
Browse files Browse the repository at this point in the history
…loadTable and Fix UT

### What changes were proposed in this pull request?
This is a followup of #46905, to fix `some UT` on GA.

### Why are the changes needed?
Fix UT.

### Does this PR introduce _any_ user-facing change?
No.,

### How was this patch tested?
Manually test.
Pass GA

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #46912 from panbingkun/SPARK-46393_FOLLOWUP.

Lead-authored-by: panbingkun <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
  • Loading branch information
panbingkun and cloud-fan committed Jun 7, 2024
1 parent d81b1e3 commit 8911d59
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 24 deletions.
5 changes: 5 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,11 @@
"List namespaces."
]
},
"LOAD_TABLE" : {
"message" : [
"Load the table <tableName>."
]
},
"NAMESPACE_EXISTS" : {
"message" : [
"Check that the namespace <namespace> exists."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ package org.apache.spark.sql.jdbc.v2
import org.apache.logging.log4j.Level

import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.analysis.{IndexAlreadyExistsException, NoSuchIndexException}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Sample, Sort}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.connector.catalog.{Catalogs, Identifier, TableCatalog}
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.NullOrdering
Expand Down Expand Up @@ -84,6 +83,17 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu

def testCreateTableWithProperty(tbl: String): Unit = {}

def checkErrorFailedLoadTable(e: AnalysisException, tbl: String): Unit = {
checkError(
exception = e,
errorClass = "FAILED_JDBC.UNCLASSIFIED",
parameters = Map(
"url" -> "jdbc:",
"message" -> s"Failed to load table: $tbl"
)
)
}

test("SPARK-33034: ALTER TABLE ... add new columns") {
withTable(s"$catalogName.alt_table") {
sql(s"CREATE TABLE $catalogName.alt_table (ID STRING)")
Expand Down Expand Up @@ -122,9 +132,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ADD COLUMNS (C4 STRING)")
}
checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... drop column") {
Expand All @@ -146,9 +154,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table DROP COLUMN C1")
}
checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... update column type") {
Expand All @@ -164,9 +170,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN id TYPE DOUBLE")
}
checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... rename column") {
Expand Down Expand Up @@ -194,11 +198,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table RENAME COLUMN ID TO C")
}
checkErrorTableNotFound(e,
UnresolvedAttribute.parseAttributeName(s"$catalogName.not_existing_table")
.map(part => quoteIdentifier(part)).mkString("."),
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("SPARK-33034: ALTER TABLE ... update column nullability") {
Expand All @@ -209,9 +209,7 @@ private[v2] trait V2JDBCTest extends SharedSparkSession with DockerIntegrationFu
val e = intercept[AnalysisException] {
sql(s"ALTER TABLE $catalogName.not_existing_table ALTER COLUMN ID DROP NOT NULL")
}
checkErrorTableNotFound(e, s"`$catalogName`.`not_existing_table`",
ExpectedContext(s"$catalogName.not_existing_table", 12,
11 + s"$catalogName.not_existing_table".length))
checkErrorFailedLoadTable(e, "not_existing_table")
}

test("CREATE TABLE with table comment") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,13 +131,16 @@ class JDBCTableCatalog extends TableCatalog
checkNamespace(ident.namespace())
val optionsWithTableName = new JDBCOptions(
options.parameters + (JDBCOptions.JDBC_TABLE_NAME -> getTableName(ident)))
try {
JdbcUtils.classifyException(
errorClass = "FAILED_JDBC.LOAD_TABLE",
messageParameters = Map(
"url" -> options.getRedactUrl(),
"tableName" -> toSQLId(ident)),
dialect,
description = s"Failed to load table: $ident"
) {
val schema = JDBCRDD.resolveTable(optionsWithTableName)
JDBCTable(ident, schema, optionsWithTableName)
} catch {
case e: SQLException =>
logWarning("Failed to load table", e)
throw QueryCompilationErrors.noSuchTableError(ident)
}
}

Expand Down

0 comments on commit 8911d59

Please sign in to comment.