make caseSensitive configurable in Analyzer
jackylk committed Dec 20, 2014
2 parents c25c669 + f57f15c commit 91b1b96
Showing 14 changed files with 243 additions and 29 deletions.
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst

import scala.collection.immutable

private[spark] object CatalystConf {
  val CASE_SENSITIVE = "spark.sql.caseSensitive"
}

private[spark] trait CatalystConf {
  def setConf(key: String, value: String): Unit
  def getConf(key: String): String
  def getConf(key: String, defaultValue: String): String
  def getAllConfs: immutable.Map[String, String]
}

/**
 * A trivial conf that is empty. Used for testing when all
 * relations are already filled in and the analyzer needs only to resolve attribute references.
 */
object EmptyConf extends CatalystConf {
  def setConf(key: String, value: String): Unit = {
    throw new UnsupportedOperationException
  }

  def getConf(key: String): String = {
    throw new UnsupportedOperationException
  }

  def getConf(key: String, defaultValue: String): String = {
    throw new UnsupportedOperationException
  }

  def getAllConfs: immutable.Map[String, String] = {
    throw new UnsupportedOperationException
  }
}
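
Callers are expected to read the new flag through getConf with an explicit default, since the key may never have been set; a minimal caller-side sketch, assuming conf is any CatalystConf implementation:

    // Sketch: read the case-sensitivity flag, falling back to "true"
    // (case sensitive) when the key is unset.
    val caseSensitive: Boolean =
      conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean
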
@@ -24,13 +24,15 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.catalyst.types.IntegerType
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf

/**
 * A trivial [[Analyzer]] with an [[EmptyCatalog]] and [[EmptyFunctionRegistry]]. Used for testing
 * when all relations are already filled in and the analyzer needs only to resolve attribute
 * references.
 */
object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true)
object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, EmptyConf)

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
@@ -39,11 +41,15 @@ object SimpleAnalyzer extends Analyzer(EmptyCatalog, EmptyFunctionRegistry, true
*/
class Analyzer(catalog: Catalog,
               registry: FunctionRegistry,
               caseSensitive: Boolean,
               conf: CatalystConf,
               maxIterations: Int = 100)
  extends RuleExecutor[LogicalPlan] with HiveTypeCoercion {

  val resolver = if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution
  val resolver = if (conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
    caseSensitiveResolution
  } else {
    caseInsensitiveResolution
  }

  val fixedPoint = FixedPoint(maxIterations)

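
For reference, the two resolvers chosen between above are plain name comparators with the Resolver shape used by the analysis package, (String, String) => Boolean; a sketch of their behavior:

    // Sketch: how the two resolution strategies compare attribute names.
    val caseSensitiveResolution: (String, String) => Boolean =
      (a, b) => a == b                    // "attr" does not match "ATTR"
    val caseInsensitiveResolution: (String, String) => Boolean =
      (a, b) => a.equalsIgnoreCase(b)     // "attr" matches "ATTR"
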
@@ -20,13 +20,15 @@ package org.apache.spark.sql.catalyst.analysis
import scala.collection.mutable

import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf

/**
* An interface for looking up relations by name. Used by an [[Analyzer]].
*/
trait Catalog {

  def caseSensitive: Boolean
  val conf: CatalystConf

  def tableExists(db: Option[String], tableName: String): Boolean

@@ -44,7 +46,7 @@ trait Catalog {
  protected def processDatabaseAndTableName(
      databaseName: Option[String],
      tableName: String): (Option[String], String) = {
    if (!caseSensitive) {
    if (!conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
      (databaseName.map(_.toLowerCase), tableName.toLowerCase)
    } else {
      (databaseName, tableName)
@@ -54,15 +56,15 @@
  protected def processDatabaseAndTableName(
      databaseName: String,
      tableName: String): (String, String) = {
    if (!caseSensitive) {
    if (!conf.getConf(CatalystConf.CASE_SENSITIVE, "true").toBoolean) {
      (databaseName.toLowerCase, tableName.toLowerCase)
    } else {
      (databaseName, tableName)
    }
  }
}
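
Under a case-insensitive conf, both overloads normalize identifiers to lower case before lookup; a small illustration with hypothetical names:

    // Sketch: with CASE_SENSITIVE set to "false",
    // processDatabaseAndTableName(Some("MyDb"), "MyTable") yields (Some("mydb"), "mytable");
    // with the key unset or set to "true", both names pass through unchanged.
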

class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
  val tables = new mutable.HashMap[String, LogicalPlan]()

  override def registerTable(
@@ -165,7 +167,7 @@ trait OverrideCatalog extends Catalog {
*/
object EmptyCatalog extends Catalog {

  val caseSensitive: Boolean = true
  override val conf: CatalystConf = EmptyConf

  def tableExists(db: Option[String], tableName: String): Boolean = {
    throw new UnsupportedOperationException
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.test

import org.apache.spark.sql.catalyst.CatalystConf

import scala.collection.immutable
import scala.collection.mutable

/** A CatalystConf that can be used for local testing. */
class SimpleConf extends CatalystConf {
  val map = mutable.Map[String, String]()

  def setConf(key: String, value: String): Unit = {
    map.put(key, value)
  }
  def getConf(key: String): String = {
    map(key)
  }
  def getConf(key: String, defaultValue: String): String = {
    map.getOrElse(key, defaultValue)
  }
  def getAllConfs: immutable.Map[String, String] = {
    map.toMap
  }
}
@@ -23,17 +23,23 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.test.SimpleConf

import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._

class AnalysisSuite extends FunSuite with BeforeAndAfter {
  val caseSensitiveCatalog = new SimpleCatalog(true)
  val caseInsensitiveCatalog = new SimpleCatalog(false)
  val caseSensitiveConf = new SimpleConf()
  caseSensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "true")
  val caseInsensitiveConf = new SimpleConf()
  caseInsensitiveConf.setConf(CatalystConf.CASE_SENSITIVE, "false")
  val caseSensitiveCatalog = new SimpleCatalog(caseSensitiveConf)
  val caseInsensitiveCatalog = new SimpleCatalog(caseInsensitiveConf)
  val caseSensitiveAnalyze =
    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitive = true)
    new Analyzer(caseSensitiveCatalog, EmptyFunctionRegistry, caseSensitiveConf)
  val caseInsensitiveAnalyze =
    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseSensitive = false)
    new Analyzer(caseInsensitiveCatalog, EmptyFunctionRegistry, caseInsensitiveConf)

  val testRelation = LocalRelation(AttributeReference("a", IntegerType, nullable = true)())
  val testRelation2 = LocalRelation(
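
A hypothetical test against these two analyzers shows the difference directly (the attribute name "A" below is illustrative, not from the commit):

    // Sketch: "A" resolves against testRelation's attribute "a" only case-insensitively.
    val plan = Project(Seq(UnresolvedAttribute("A")), testRelation)
    caseInsensitiveAnalyze(plan)    // succeeds, binding "A" to attribute "a"
    intercept[TreeNodeException[_]] {
      caseSensitiveAnalyze(plan)    // fails: no attribute named "A" under exact matching
    }
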
@@ -20,11 +20,13 @@ package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{Project, LocalRelation}
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.test.SimpleConf
import org.scalatest.{BeforeAndAfter, FunSuite}

class DecimalPrecisionSuite extends FunSuite with BeforeAndAfter {
  val catalog = new SimpleCatalog(false)
  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, caseSensitive = false)
  val conf = new SimpleConf
  val catalog = new SimpleCatalog(conf)
  val analyzer = new Analyzer(catalog, EmptyFunctionRegistry, conf)

  val relation = LocalRelation(
    AttributeReference("i", IntegerType)(),
5 changes: 3 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql

import org.apache.spark.sql.catalyst.CatalystConf

import scala.collection.immutable
import scala.collection.JavaConversions._

@@ -61,7 +63,7 @@ private[spark] object SQLConf {
*
* SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads).
*/
private[sql] trait SQLConf {
private[sql] trait SQLConf extends CatalystConf {
  import SQLConf._

  /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */
@@ -196,4 +198,3 @@ private[sql] trait SQLConf {
    settings.clear()
  }
}

4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -58,14 +58,14 @@ class SQLContext(@transient val sparkContext: SparkContext)
  self =>

  @transient
  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(true)
  protected[sql] lazy val catalog: Catalog = new SimpleCatalog(this)

  @transient
  protected[sql] lazy val functionRegistry: FunctionRegistry = new SimpleFunctionRegistry

  @transient
  protected[sql] lazy val analyzer: Analyzer =
    new Analyzer(catalog, functionRegistry, caseSensitive = true)
    new Analyzer(catalog, functionRegistry, this)

  @transient
  protected[sql] lazy val optimizer: Optimizer = DefaultOptimizer
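
With SQLContext now passing itself as the CatalystConf, users can toggle resolution behavior at runtime instead of wiring up a custom Analyzer; a hedged sketch of a session, where sqlContext and the table are illustrative:

    // Sketch: once the flag is off, column references match regardless of case.
    sqlContext.setConf(CatalystConf.CASE_SENSITIVE, "false")
    sqlContext.sql("SELECT S FROM CASEINSENSITIVETABLE")  // resolves column "s" of caseInsensitiveTable
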
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.test

import org.apache.spark.sql.{SQLConf, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.CatalystConf

/** A case-insensitive SQLContext that can be used for local testing. */
object TestCaseInsensitiveSQLContext
  extends SQLContext(
    new SparkContext(
      "local[2]",
      "CaseInsensitiveSQLContext",
      new SparkConf())) {

  this.setConf(CatalystConf.CASE_SENSITIVE, "false")

  /** Fewer partitions to speed up testing. */
  override private[spark] def numShufflePartitions: Int =
    getConf(SQLConf.SHUFFLE_PARTITIONS, "5").toInt
}

@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import java.util.TimeZone

import org.apache.spark.sql.test.TestCaseInsensitiveSQLContext
import org.scalatest.BeforeAndAfterAll
import org.apache.spark.sql.catalyst.CatalystConf

/* Implicits */

import org.apache.spark.sql.test.TestCaseInsensitiveSQLContext._

object CaseInsensitiveTestData {
  case class StringData(s: String)
  val table = TestCaseInsensitiveSQLContext.sparkContext.parallelize(StringData("test") :: Nil)
  table.registerTempTable("caseInsensitiveTable")
}

class SQLQueryCaseInsensitiveSuite extends QueryTest with BeforeAndAfterAll {
  CaseInsensitiveTestData

  var origZone: TimeZone = _

  override protected def beforeAll() {
    origZone = TimeZone.getDefault
    TimeZone.setDefault(TimeZone.getTimeZone("UTC"))
  }

  override protected def afterAll() {
    TimeZone.setDefault(origZone)
  }

  test("SPARK-4699 case sensitivity SQL query") {
    setConf(CatalystConf.CASE_SENSITIVE, "false")
    checkAnswer(sql("SELECT S FROM CASEINSENSITIVETABLE"), "test")
  }
}
@@ -18,17 +18,15 @@
package org.apache.spark.sql.sources

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.Analyzer
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.test.TestSQLContext
import org.scalatest.BeforeAndAfter

abstract class DataSourceTest extends QueryTest with BeforeAndAfter {
  // Case sensitivity is not configurable yet, but we want to test some edge cases.
  // TODO: Remove when it is configurable
  implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext) {
    @transient
    override protected[sql] lazy val analyzer: Analyzer =
      new Analyzer(catalog, functionRegistry, caseSensitive = false)
  }

  // We want to test some edge cases.
  implicit val caseInsensisitiveContext = new SQLContext(TestSQLContext.sparkContext)

  caseInsensisitiveContext.setConf(CatalystConf.CASE_SENSITIVE, "false")
}
