[SPARK-28241][SQL] Show metadata operations on ThriftServerTab
## What changes were proposed in this pull request?

This PR adds support for showing metadata operations on the ThriftServerTab.
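
In outline, each of the three metadata operations (`SparkGetSchemasOperation`, `SparkGetTablesOperation`, `SparkGetColumnsOperation`) now reports itself to the `HiveThriftServer2` listener, the same way SQL statements already do. A condensed sketch of the shared pattern from the diff below; `runMetadataOperation()` is a hypothetical placeholder for each operation's existing body:

```scala
override def runInternal(): Unit = {
  val statementId = UUID.randomUUID().toString
  setState(OperationState.RUNNING)

  // Register the run so it appears on the ThriftServerTab.
  HiveThriftServer2.listener.onStatementStart(
    statementId,
    parentSession.getSessionHandle.getSessionId.toString,
    logMsg,                // e.g. s"Listing tables '$cmdStr'"
    statementId,
    parentSession.getUsername)

  try {
    runMetadataOperation() // hypothetical: the operation's existing body
    setState(OperationState.FINISHED)
  } catch {
    case e: HiveSQLException =>
      setState(OperationState.ERROR)
      HiveThriftServer2.listener.onStatementError(
        statementId, e.getMessage, SparkUtils.exceptionString(e))
      throw e
  }
  HiveThriftServer2.listener.onStatementFinish(statementId)
}
```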

## How was this patch tested?

Manual tests:
![image](https://user-images.githubusercontent.com/5399861/60579741-4cd2c180-9db6-11e9-822a-0433be509b67.png)

Closes apache#25043 from wangyum/SPARK-28241.

Authored-by: Yuming Wang <[email protected]>
Signed-off-by: herman <[email protected]>
wangyum authored and hvanhovell committed Jul 5, 2019
1 parent 3663dbe commit e299f62
Showing 3 changed files with 84 additions and 35 deletions.
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

import java.util.UUID
import java.util.regex.Pattern

import scala.collection.JavaConverters.seqAsJavaListConverter
@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.hive.thriftserver.ThriftserverShimUtils.toJavaSQLType
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{Utils => SparkUtils}

/**
* Spark's own SparkGetColumnsOperation
@@ -57,15 +59,24 @@ private[hive] class SparkGetColumnsOperation(
val catalog: SessionCatalog = sqlContext.sessionState.catalog

override def runInternal(): Unit = {
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName" +
s", columnName : $columnName"
logInfo(s"GetColumnsOperation: $cmdStr")
val statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName"
val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'"
logInfo(s"$logMsg with $statementId")

setState(OperationState.RUNNING)
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

HiveThriftServer2.listener.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
statementId,
parentSession.getUsername)

val schemaPattern = convertSchemaPattern(schemaName)
val tablePattern = convertIdentifierPattern(tableName, true)

@@ -80,8 +91,6 @@

if (isAuthV2Enabled) {
val privObjs = seqAsJavaListConverter(getPrivObjs(db2Tabs)).asJava
val cmdStr =
s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName"
authorizeMetaGets(HiveOperationType.GET_COLUMNS, privObjs, cmdStr)
}

@@ -115,8 +124,11 @@
} catch {
case e: HiveSQLException =>
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}

private def addToRowSet(
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
@@ -17,6 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

import java.util.UUID
import java.util.regex.Pattern

import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
@@ -25,7 +26,9 @@ import org.apache.hive.service.cli.operation.GetSchemasOperation
import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.util.{Utils => SparkUtils}

/**
* Spark's own GetSchemasOperation
@@ -40,19 +43,30 @@ private[hive] class SparkGetSchemasOperation(
parentSession: HiveSession,
catalogName: String,
schemaName: String)
extends GetSchemasOperation(parentSession, catalogName, schemaName) {
extends GetSchemasOperation(parentSession, catalogName, schemaName) with Logging {

override def runInternal(): Unit = {
val statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
val logMsg = s"Listing databases '$cmdStr'"
logInfo(s"$logMsg with $statementId")
setState(OperationState.RUNNING)
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

if (isAuthV2Enabled) {
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
authorizeMetaGets(HiveOperationType.GET_TABLES, null, cmdStr)
}

HiveThriftServer2.listener.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
statementId,
parentSession.getUsername)

try {
val schemaPattern = convertSchemaPattern(schemaName)
sqlContext.sessionState.catalog.listDatabases(schemaPattern).foreach { dbName =>
@@ -68,7 +82,10 @@ private[hive] class SparkGetSchemasOperation(
} catch {
case e: HiveSQLException =>
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
}
HiveThriftServer2.listener.onStatementFinish(statementId)
}
}
sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -17,21 +17,23 @@

package org.apache.spark.sql.hive.thriftserver

import java.util.{List => JList}
import java.util.{List => JList, UUID}
import java.util.regex.Pattern

import scala.collection.JavaConverters.seqAsJavaListConverter
import scala.collection.JavaConverters._

import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
import org.apache.hive.service.cli._
import org.apache.hive.service.cli.operation.GetTablesOperation
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.CatalogTableType
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.hive.HiveUtils
import org.apache.spark.util.{Utils => SparkUtils}

/**
* Spark's own GetTablesOperation
@@ -50,13 +52,16 @@ private[hive] class SparkGetTablesOperation(
schemaName: String,
tableName: String,
tableTypes: JList[String])
extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) {

if (tableTypes != null) {
this.tableTypes.addAll(tableTypes)
}
extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes)
with Logging {

override def runInternal(): Unit = {
val statementId = UUID.randomUUID().toString
// Do not change cmdStr. It's used for Hive auditing and authorization.
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",")
val logMsg = s"Listing tables '$cmdStr, tableTypes : $tableTypesStr, tableName : $tableName'"
logInfo(s"$logMsg with $statementId")
setState(OperationState.RUNNING)
// Always use the latest class loader provided by executionHive's state.
val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
@@ -70,35 +75,50 @@
if (isAuthV2Enabled) {
val privObjs =
HivePrivilegeObjectUtils.getHivePrivDbObjects(seqAsJavaListConverter(matchingDbs).asJava)
val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
authorizeMetaGets(HiveOperationType.GET_TABLES, privObjs, cmdStr)
}

// Tables and views
matchingDbs.foreach { dbName =>
val tables = catalog.listTables(dbName, tablePattern, includeLocalTempViews = false)
catalog.getTablesByName(tables).foreach { table =>
val tableType = tableTypeString(table.tableType)
if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(tableType)) {
addToRowSet(table.database, table.identifier.table, tableType, table.comment)
HiveThriftServer2.listener.onStatementStart(
statementId,
parentSession.getSessionHandle.getSessionId.toString,
logMsg,
statementId,
parentSession.getUsername)

try {
// Tables and views
matchingDbs.foreach { dbName =>
val tables = catalog.listTables(dbName, tablePattern, includeLocalTempViews = false)
catalog.getTablesByName(tables).foreach { table =>
val tableType = tableTypeString(table.tableType)
if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(tableType)) {
addToRowSet(table.database, table.identifier.table, tableType, table.comment)
}
}
}
}

// Temporary views and global temporary views
if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(VIEW.name)) {
val globalTempViewDb = catalog.globalTempViewManager.database
val databasePattern = Pattern.compile(CLIServiceUtils.patternToRegex(schemaName))
val tempViews = if (databasePattern.matcher(globalTempViewDb).matches()) {
catalog.listTables(globalTempViewDb, tablePattern, includeLocalTempViews = true)
} else {
catalog.listLocalTempViews(tablePattern)
}
tempViews.foreach { view =>
addToRowSet(view.database.orNull, view.table, VIEW.name, None)
// Temporary views and global temporary views
if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(VIEW.name)) {
val globalTempViewDb = catalog.globalTempViewManager.database
val databasePattern = Pattern.compile(CLIServiceUtils.patternToRegex(schemaName))
val tempViews = if (databasePattern.matcher(globalTempViewDb).matches()) {
catalog.listTables(globalTempViewDb, tablePattern, includeLocalTempViews = true)
} else {
catalog.listLocalTempViews(tablePattern)
}
tempViews.foreach { view =>
addToRowSet(view.database.orNull, view.table, VIEW.name, None)
}
}
setState(OperationState.FINISHED)
} catch {
case e: HiveSQLException =>
setState(OperationState.ERROR)
HiveThriftServer2.listener.onStatementError(
statementId, e.getMessage, SparkUtils.exceptionString(e))
throw e
}
setState(OperationState.FINISHED)
HiveThriftServer2.listener.onStatementFinish(statementId)
}

private def addToRowSet(
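For reference, a minimal client sketch that exercises all three instrumented operations over JDBC; the endpoint, user, and database names are assumptions for a local Spark Thrift Server with the Hive JDBC driver on the classpath. Each call should now appear as a statement on the ThriftServerTab:

```scala
import java.sql.DriverManager

object MetadataOpsDemo {
  def main(args: Array[String]): Unit = {
    // Assumed local Thrift Server endpoint; adjust host/port/user as needed.
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000", "user", "")
    try {
      val md = conn.getMetaData
      md.getSchemas(null, "%").close()                 // SparkGetSchemasOperation
      md.getTables(null, "%", "%", null).close()       // SparkGetTablesOperation
      md.getColumns(null, "default", "%", "%").close() // SparkGetColumnsOperation
    } finally {
      conn.close()
    }
  }
}
```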
