diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
new file mode 100644
index 0000000000000..2572c010a16fa
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+import scala.collection.immutable
+
+private[spark] object CatalystConf {
+  val CASE_SENSITIVE = "spark.sql.caseSensitive"
+}
+
+private[spark] trait CatalystConf {
+  def setConf(key: String, value: String): Unit
+  def getConf(key: String): String
+  def getConf(key: String, defaultValue: String): String
+  def getAllConfs: immutable.Map[String, String]
+}
+
+/**
+ * A trivial conf that is empty. Used for testing when all
+ * relations are already filled in and the analyser needs only to resolve attribute references.
+ */
+object EmptyConf extends CatalystConf {
+  def setConf(key: String, value: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  def getConf(key: String): String = {
+    throw new UnsupportedOperationException
+  }
+
+  def getConf(key: String, defaultValue: String): String = {
+    throw new UnsupportedOperationException
+  }
+
+  def getAllConfs: immutable.Map[String, String] = {
+    throw new UnsupportedOperationException
+  }
+}
\ No newline at end of file
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 1c4088b8438e1..8407349295dd2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -24,13 +24,15 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.catalyst.types.IntegerType
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.EmptyConf
 
 /**
  * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
  * when all relations are already filled in and the analyser needs only to resolve attribute
  * references.
  */
-object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)
+object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, EmptyConf)
 
 /**
  * Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -39,11 +41,15 @@ object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true
  */
 class Analyzer(catalog: Catalog,
                registry: FunctionRegistry,
-               caseSensitive: Boolean,
+               conf: CatalystConf,
                maxIterations: Int = 100)
   extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {
 
-  val resolver = if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution
+  val resolver = if (conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
+    caseSensitiveResolution
+  } else {
+    caseInsensitiveResolution
+  }
 
   val fixedPoint = FixedPoint(maxIterations)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 0415d74bd8141..83f3dcf857919 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql.catalyst.analysis
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.EmptyConf
 
 /**
  * An interface for looking up relations by name. Used by an [[Analyzer]].
  */
 trait Catalog {
 
-  def caseSensitive: Boolean
+  val conf: CatalystConf
 
   def tableExists(db: Option[String], tableName: String): Boolean
 
@@ -44,7 +46,7 @@ trait Catalog {
   protected def processDatabaseAndTableName(
       databaseName: Option[String],
       tableName: String): (Option[String], String) = {
-    if (!caseSensitive) {
+    if (!conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
       (databaseName.map(_.toLowerCase), tableName.toLowerCase)
     } else {
       (databaseName, tableName)
@@ -54,7 +56,7 @@ trait Catalog {
   protected def processDatabaseAndTableName(
       databaseName: String,
       tableName: String): (String, String) = {
-    if (!caseSensitive) {
+    if (!conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
       (databaseName.toLowerCase, tableName.toLowerCase)
     } else {
       (databaseName, tableName)
@@ -62,7 +64,7 @@
   }
 }
 
-class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
+class SimpleCatalog(val conf: CatalystConf) extends Catalog {
   val tables = new mutable.HashMap[String, LogicalPlan]()
 
   override def registerTable(
@@ -165,7 +167,7 @@ trait OverrideCatalog extends Catalog {
  */
 object EmptyCatalog extends Catalog {
 
-  val caseSensitive: Boolean = true
+  override val conf: CatalystConf = EmptyConf
 
   def tableExists(db: Option[String], tableName: String): Boolean = {
     throw new UnsupportedOperationException
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/test/SimpleConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/test/SimpleConf.scala
new file mode 100644
index 0000000000000..1f6675f89190d
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/test/SimpleConf.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.test
+
+import org.apache.spark.sql.catalyst.CatalystConf
+
+import scala.collection.immutable
+import scala.collection.mutable
+
+/** A CatalystConf that can be used for local testing. */
+class SimpleConf extends CatalystConf {
+  val map = mutable.Map[String, String]()
+
+  def setConf(key: String, value: String): Unit = {
+    map.put(key, value)
+  }
+  def getConf(key: String): String = {
+    map.get(key).get
+  }
+  def getConf(key: String, defaultValue: String): String = {
+    map.getOrElse(key, defaultValue)
+  }
+  def getAllConfs: immutable.Map[String, String] = {
+    map.toMap
+  }
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index 82f2101d8ce17..25e5c20fa3a5a 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -23,17 +23,23 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.CatalystConf
+import org.apache.spark.sql.catalyst.test.SimpleConf
 
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 
 class AnalysisSuite extends FunSuite with BeforeAndAfter {
-  val caseSensitiveCatalog = new SimpleCatalog(true)
-  val caseInsensitiveCatalog = new SimpleCatalog(false)
+  val caseSensitiveConf = new SimpleConf()
+  caseSensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "true")
+  val caseInsensitiveConf = new SimpleConf()
+  caseInsensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "false")
+  val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
+  val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
   val caseSensitiveAnalyze =
-    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
+    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf)
   val caseInsensitiveAnalyze =
-    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
+    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf)
 
   val testRelation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
   val testRelation2 = LocalRelation(
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
index d5b7d2789a103..12c409625cac3 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DecimalPrecisionSuite.scala
@@ -20,11 +20,13 @@
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{Project, LocalRelation}
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.test.SimpleConf
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
 class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
-  val catalog = new SimpleCatalog(false)
-  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = false)
+  val conf = new SimpleConf
+  val catalog = new SimpleCatalog(conf)
+  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)
 
   val relation = LocalRelation(
     AttributeReference("i", IntegerType)(),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index f5bf935522dad..2a9b6f7064d9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.CatalystConf
+
 import scala.collection.immutable
 import scala.collection.JavaConversions._
 
@@ -61,7 +63,7 @@ private[spark] object SQLConf {
  *
  * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
  */
-private[sql] trait SQLConf {
+private[sql] trait SQLConf extends CatalystConf {
   import SQLConf._
 
   /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -196,4 +198,3 @@ private[sql] trait SQLConf {
     settings.clear()
   }
 }
-
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 7a13302229012..934c0a17ebe95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -58,14 +58,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
   self =>
 
   @transient
-  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
+  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(this)
 
   @transient
   protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry
 
   @transient
   protected[sql] lazy val analyzer: Analyzer =
-    new Analyzer(catalog, functionRegistry, caseSensitive = true)
+    new Analyzer(catalog, functionRegistry, this)
 
   @transient
   protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/test/TestCaseInsensitiveSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/test/TestCaseInsensitiveSQLContext.scala
new file mode 100644
index 0000000000000..806b73fc549d4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/test/TestCaseInsensitiveSQLContext.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.test
+
+import org.apache.spark.sql.{SQLConf, SQLContext}
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.catalyst.CatalystConf
+
+/** A case insensitive SQLContext that can be used for local testing. */
+object TestCaseInsensitiveSQLContext
+  extends SQLContext(
+    new SparkContext(
+      "local[2]",
+      "CaseInsensitiveSQLContext",
+      new SparkConf())) {
+
+  this.setConf(CatalystConf.CASE_SENSITIVE, "false")
+
+  /** Fewer partitions to speed up testing. */
+  override private[spark] def numShufflePartitions: Int =
+    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
+}
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryCaseInsensitiveSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryCaseInsensitiveSuite.scala
new file mode 100644
index 0000000000000..c21a5d9ca60d0
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryCaseInsensitiveSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.util.TimeZone
+
+import org.apache.spark.sql.test.TestCaseInsensitiveSQLContext
+import org.scalatest.BeforeAndAfterAll
+import org.apache.spark.sql.catalyst.CatalystConf
+
+/* Implicits */
+
+import org.apache.spark.sql.test.TestCaseInsensitiveSQLContext._
+
+object CaseInsensitiveTestData {
+  case class StringData(s: String)
+  val table = TestCaseInsensitiveSQLContext.sparkContext.parallelize(StringData("test") :: Nil)
+  table.registerTempTable("caseInsensitiveTable")
+}
+
+class SQLQueryCaseInsensitiveSuite extends QueryTest with BeforeAndAfterAll {
+  CaseInsensitiveTestData
+
+  var origZone: TimeZone = _
+
+  override protected def beforeAll() {
+    origZone = TimeZone.getDefault
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
+  }
+
+  override protected def afterAll() {
+    TimeZone.setDefault(origZone)
+  }
+
+  test("SPARK-4699 case sensitivity SQL query") {
+    setConf(CatalystConf.CASE_SENSITIVE, "false")
+    checkAnswer(sql("SELECT S FROM CASEINSENSITIVETABLE"), "test")
+  }
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
index 9626252e742e5..d5291cd5af251 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/DataSourceTest.scala
@@ -18,17 +18,15 @@
 package org.apache.spark.sql.sources
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.Analyzer
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.test.TestSQLContext
 import org.scalatest.BeforeAndAfter
 
 abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
-  // Case sensitivity is not configurable yet, but we want to test some edge cases.
-  // TODO: Remove when it is configurable
-  implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) {
-    @transient
-    override protected[sql] lazy val analyzer: Analyzer =
-      new Analyzer(catalog, functionRegistry, caseSensitive = false)
-  }
+
+  // We want to test some edge cases.
+  implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext)
+
+  caseInsensisitiveContext.setConf(CatalystConf.CASE_SENSITIVE, "false")
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 56fe27a77b838..698bbf9a89af5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types.DecimalType
 import org.apache.spark.sql.catalyst.types.decimal.Decimal
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.sql.execution.{SparkPlan, ExecutedCommand, ExtractPythonUdfs, QueryExecutionException}
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
 import org.apache.spark.sql.sources.DataSourceStrategy
@@ -248,9 +249,12 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     runSqlHive(s"SET $key=$value")
   }
 
+  /* By default it should be case insensitive to match Hive */
+  this.setConf(CatalystConf.CASE_SENSITIVE, "false")
+
   /* A catalyst metadata catalog that points to the Hive Metastore. */
   @transient
-  override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
+  override protected[sql] lazy val catalog = new HiveMetastoreCatalog(this, this) with OverrideCatalog
 
   // Note that HiveUDFs will be overridden by functions registered in this context.
   @transient
@@ -260,7 +264,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /* An analyzer that uses the Hive metastore. */
   @transient
   override protected[sql] lazy val analyzer =
-    new Analyzer(catalog, functionRegistry, caseSensitive = false) {
+    new Analyzer(catalog, functionRegistry, this) {
       override val extendedRules =
         catalog.CreateTables ::
         catalog.PreInsertionCasts ::
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b31a3ec25096b..cdc88b07dd858 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -44,12 +44,14 @@ import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.catalyst.CatalystConf
 import org.apache.spark.util.Utils
 
 /* Implicit conversions */
 import scala.collection.JavaConversions._
 
-private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
+private[hive] class HiveMetastoreCatalog(hive: HiveContext, val conf: CatalystConf)
+  extends Catalog with Logging {
   import org.apache.spark.sql.hive.HiveMetastoreTypes._
 
   /** Connection to hive metastore. Usages should lock on `this`. */
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f57f31af15566..fe9fcd9dbd059 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -195,4 +195,10 @@ class SQLQuerySuite extends QueryTest {
     checkAnswer(sql("SELECT sum( distinct key) FROM src group by key order by key"),
       sql("SELECT distinct key FROM src order by key").collect().toSeq)
   }
+
+  test("SPARK-4699 HiveContext should be case insensitive by default") {
+    checkAnswer(
+      sql("SELECT KEY FROM Src ORDER BY value"),
+      sql("SELECT key FROM src ORDER BY value").collect().toSeq)
+  }
 }
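
Usage sketch (not part of the patch): how the `spark.sql.caseSensitive` switch introduced above, exposed as `CatalystConf.CASE_SENSITIVE`, would be flipped on a plain `SQLContext`, mirroring `SQLQueryCaseInsensitiveSuite`. It assumes an already-running `SparkContext` named `sc`; the table name and query are illustrative only.

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.CatalystConf

case class StringData(s: String)

// Assumes an existing SparkContext `sc`.
val sqlContext = new SQLContext(sc)
import sqlContext._ // brings in the implicit RDD-to-SchemaRDD conversion

// SQLContext stays case sensitive by default; the new key relaxes attribute
// and table name resolution at analysis time.
sqlContext.setConf(CatalystConf.CASE_SENSITIVE, "false")

sc.parallelize(StringData("test") :: Nil).registerTempTable("caseInsensitiveTable")

// Both the table name and the column now resolve regardless of case.
sqlContext.sql("SELECT S FROM CASEINSENSITIVETABLE").collect()
```

With a `HiveContext`, the patch sets the key to "false" during construction, so no explicit `setConf` call should be needed to get the Hive-compatible, case-insensitive behaviour.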