Skip to content

Commit

Permalink
Throw an exception with a specific message if getPartitionsByFilter
Browse files Browse the repository at this point in the history
fails and Hive's direct SQL is enabled
  • Loading branch information
Michael Allman committed Nov 2, 2016
1 parent f5d0186 commit 4de1596
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, CatalogTablePartition, FunctionResource, FunctionResourceType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegralType, StringType}
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -586,18 +587,30 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
} else {
logDebug(s"Hive metastore filter is '$filter'.")
val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
val tryDirectSql =
hive.getConf.getBoolean(tryDirectSqlConfVar.varname, tryDirectSqlConfVar.defaultBoolVal)
try {
// Hive may throw an exception when calling this method in some circumstances, such as
// filtering on a non-string partition column while the Hive config key
// hive.metastore.try.direct.sql is false
getPartitionsByFilterMethod.invoke(hive, table, filter)
.asInstanceOf[JArrayList[Partition]]
} catch {
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
logWarning("Caught MetaException attempting to get partition metadata by filter " +
"from Hive. Falling back to fetching all partition metadata.", ex)
// Return all partitions. HiveShim clients are expected to handle this possibility
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
!tryDirectSql =>
logWarning("Caught Hive MetaException attempting to get partition metadata by " +
"filter from Hive. Falling back to fetching all partition metadata, which will " +
"degrade performance. Consider modifying your Hive metastore configuration to " +
s"set ${tryDirectSqlConfVar.varname} to true.", ex)
// HiveShim clients are expected to handle a superset of the requested partitions
getAllPartitionsMethod.invoke(hive, table).asInstanceOf[JSet[Partition]]
case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] &&
tryDirectSql =>
throw new RuntimeException("Caught Hive MetaException attempting to get partition " +
"metadata by filter from Hive. Set the Spark configuration setting " +
s"${SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key} to false and report a bug: " +
"https://issues.apache.org/jira/browse/SPARK", ex)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.hive.client

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.conf.HiveConf

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.catalog._
Expand All @@ -28,7 +29,9 @@ import org.apache.spark.sql.types.IntegerType
class HiveClientSuite extends SparkFunSuite {
private val clientBuilder = new HiveClientBuilder

test(s"getPartitionsByFilter when hive.metastore.try.direct.sql=false") {
private val tryDirectSqlKey = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname

test(s"getPartitionsByFilter returns all partitions when $tryDirectSqlKey=false") {
val testPartitionCount = 5

val storageFormat = CatalogStorageFormat(
Expand All @@ -40,7 +43,7 @@ class HiveClientSuite extends SparkFunSuite {
properties = Map.empty)

val hadoopConf = new Configuration()
hadoopConf.setBoolean("hive.metastore.try.direct.sql", false)
hadoopConf.setBoolean(tryDirectSqlKey, false)
val client = clientBuilder.buildClient(HiveUtils.hiveExecutionVersion, hadoopConf)
client.runSqlHive("CREATE TABLE test (value INT) PARTITIONED BY (part INT)")

Expand All @@ -50,11 +53,9 @@ class HiveClientSuite extends SparkFunSuite {
client.createPartitions(
"default", "test", partitions, ignoreIfExists = false)

val result = client.getPartitionsByFilter(client.getTable("default", "test"),
val filteredPartitions = client.getPartitionsByFilter(client.getTable("default", "test"),
Seq(EqualTo(AttributeReference("part", IntegerType)(), Literal(3))))

// Don't throw an exception when hive.metastore.try.direct.sql=false. Return all partitions
// instead
assert(result.size == testPartitionCount)
assert(filteredPartitions.size == testPartitionCount)
}
}

0 comments on commit 4de1596

Please sign in to comment.