Fix replace table, and add tests for DDLParserSuite.scala
imback82 committed Nov 3, 2023
1 parent 4237193 commit 6872414
Showing 3 changed files with 114 additions and 3 deletions.
@@ -178,7 +178,6 @@ case class CatalogTablePartition(
case class ClusterBySpec(columnNames: Seq[UnresolvedAttribute]) {
  override def toString: String = columnNames.map(_.name).mkString(",")

  // TODO: For SHOW CREATE TABLE
  lazy val toDDL: String = if (columnNames.nonEmpty) s"CLUSTER BY ($toString)" else ""
}

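For reference, a minimal sketch (not part of this commit) of what the toDDL value above renders. It assumes ClusterBySpec sits next to CatalogTablePartition in org.apache.spark.sql.catalyst.catalog and that UnresolvedAttribute.name joins multi-part names with dots:

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec

// One single-part and one multi-part clustering column.
val spec = ClusterBySpec(Seq(UnresolvedAttribute(Seq("a")), UnresolvedAttribute(Seq("b", "c"))))
assert(spec.toDDL == "CLUSTER BY (a,b.c)")
// An empty column list renders as an empty string.
assert(ClusterBySpec(Seq.empty).toDDL == "")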
@@ -3984,6 +3984,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
* [OPTIONS table_property_list]
* [PARTITIONED BY (partition_fields)]
* [CLUSTER BY (col_name, col_name, ...)]
* [CLUSTERED BY (col_name, col_name, ...)
* [SORTED BY (col_name [ASC|DESC], ...)]
* INTO num_buckets BUCKETS
@@ -4020,7 +4021,10 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
}

val partitioning =
partitionExpressions(partTransforms, partCols, ctx) ++ bucketSpec.map(_.asTransform)
partitionExpressions(partTransforms, partCols, ctx) ++
bucketSpec.map(_.asTransform) ++
clusterBySpec.map(_.asTransform)

val tableSpec = UnresolvedTableSpec(properties, provider, options, location, comment,
serdeInfo, external = false)

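The change above appends clusterBySpec.map(_.asTransform) to the partitioning expressions, so CLUSTER BY columns are carried into the table's transforms instead of being dropped. A rough sketch (not part of this diff) of checking that a parsed REPLACE TABLE statement now carries the clause, assuming ReplaceTable is the catalyst v2 logical plan with a partitioning field; the table and column names are illustrative:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.ReplaceTable

// Parse a REPLACE TABLE statement that uses the new CLUSTER BY clause.
val plan = CatalystSqlParser.parsePlan(
  "REPLACE TABLE t (a INT, b STRING) USING parquet CLUSTER BY (a, b)")

// The clustering columns should surface as a ClusterByTransform in partitioning.
plan match {
  case r: ReplaceTable => println(r.partitioning.mkString(", "))
  case other => println(s"unexpected plan: $other")
}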
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{EqualTo, Hex, Literal}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.{GeneratedColumn, ResolveDefaultColumns}
import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition.{after, first}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, ClusterByTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.connector.expressions.LogicalExpressions.bucket
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{Decimal, IntegerType, LongType, MetadataBuilder, StringType, StructType, TimestampType}
@@ -190,6 +190,50 @@ class DDLParserSuite extends AnalysisTest {
}
}

test("create/replace table - with cluster by") {
// Testing cluster by single part and multipart name.
Seq(
("a INT, b STRING, ts TIMESTAMP",
"a, b",
new StructType()
.add("a", IntegerType)
.add("b", StringType)
.add("ts", TimestampType),
ClusterByTransform(Seq(FieldReference("a"), FieldReference("b")))),
("a STRUCT<b INT, c STRING>, ts TIMESTAMP",
"a.b, ts",
new StructType()
.add("a",
new StructType()
.add("b", IntegerType)
.add("c", StringType))
.add("ts", TimestampType),
ClusterByTransform(Seq(FieldReference(Seq("a", "b")), FieldReference("ts"))))
).foreach { case (columns, clusteringColumns, schema, clusterByTransform) =>
val createSql =
s"""CREATE TABLE my_tab ($columns) USING parquet
|CLUSTER BY ($clusteringColumns)
|""".stripMargin
val replaceSql =
s"""REPLACE TABLE my_tab ($columns) USING parquet
|CLUSTER BY ($clusteringColumns)
|""".stripMargin
val expectedTableSpec = TableSpec(
Seq("my_tab"),
Some(schema),
Seq(clusterByTransform),
Map.empty[String, String],
Some("parquet"),
OptionList(Seq.empty),
None,
None,
None)
Seq(createSql, replaceSql).foreach { sql =>
testCreateOrReplaceDdl(sql, expectedTableSpec, expectedIfNotExists = false)
}
}
}

test("create/replace table - with comment") {
val createSql = "CREATE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
val replaceSql = "REPLACE TABLE my_tab(a INT, b STRING) USING parquet COMMENT 'abc'"
@@ -859,6 +903,26 @@ class DDLParserSuite extends AnalysisTest {
fragment = sql18,
start = 0,
stop = 86))

    val sql19 = createTableHeader("CLUSTER BY (a)")
    checkError(
      exception = parseException(sql19),
      errorClass = "DUPLICATE_CLAUSES",
      parameters = Map("clauseName" -> "CLUSTER BY"),
      context = ExpectedContext(
        fragment = sql19,
        start = 0,
        stop = 65))

    val sql20 = replaceTableHeader("CLUSTER BY (a)")
    checkError(
      exception = parseException(sql20),
      errorClass = "DUPLICATE_CLAUSES",
      parameters = Map("clauseName" -> "CLUSTER BY"),
      context = ExpectedContext(
        fragment = sql20,
        start = 0,
        stop = 66))
  }

test("support for other types in OPTIONS") {
@@ -2896,6 +2960,50 @@ class DDLParserSuite extends AnalysisTest {
)
}

test("create table cluster by with bucket") {
val sql1 = "CREATE TABLE my_tab(a INT, b STRING) " +
"USING parquet CLUSTERED BY (a) INTO 2 BUCKETS CLUSTER BY (a)"
checkError(
exception = parseException(sql1),
errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
parameters = Map.empty,
context = ExpectedContext(fragment = sql1, start = 0, stop = 96)
)
}

test("replace table cluster by with bucket") {
val sql1 = "REPLACE TABLE my_tab(a INT, b STRING) " +
"USING parquet CLUSTERED BY (a) INTO 2 BUCKETS CLUSTER BY (a)"
checkError(
exception = parseException(sql1),
errorClass = "SPECIFY_CLUSTER_BY_WITH_BUCKETING_IS_NOT_ALLOWED",
parameters = Map.empty,
context = ExpectedContext(fragment = sql1, start = 0, stop = 97)
)
}

test("create table cluster by with partitioned by") {
val sql1 = "CREATE TABLE my_tab(a INT, b STRING) " +
"USING parquet CLUSTER BY (a) PARTITIONED BY (a)"
checkError(
exception = parseException(sql1),
errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
parameters = Map.empty,
context = ExpectedContext(fragment = sql1, start = 0, stop = 83)
)
}

test("replace table cluster by with partitioned by") {
val sql1 = "REPLACE TABLE my_tab(a INT, b STRING) " +
"USING parquet CLUSTER BY (a) PARTITIONED BY (a)"
checkError(
exception = parseException(sql1),
errorClass = "SPECIFY_CLUSTER_BY_WITH_PARTITIONED_BY_IS_NOT_ALLOWED",
parameters = Map.empty,
context = ExpectedContext(fragment = sql1, start = 0, stop = 84)
)
}

test("AstBuilder don't support `INSERT OVERWRITE DIRECTORY`") {
val insertDirSql =
s"""
