From e299f62ec55a0c65574a6dbb1a2070cb6a562061 Mon Sep 17 00:00:00 2001
From: Yuming Wang
Date: Fri, 5 Jul 2019 12:24:50 +0200
Subject: [PATCH] [SPARK-28241][SQL] Show metadata operations on ThriftServerTab

## What changes were proposed in this pull request?

This PR adds support for showing metadata operations on the ThriftServerTab.

## How was this patch tested?

Manual tests:

![image](https://user-images.githubusercontent.com/5399861/60579741-4cd2c180-9db6-11e9-822a-0433be509b67.png)

Closes #25043 from wangyum/SPARK-28241.

Authored-by: Yuming Wang
Signed-off-by: herman
---
 .../SparkGetColumnsOperation.scala | 22 ++++--
 .../SparkGetSchemasOperation.scala | 21 ++++-
 .../SparkGetTablesOperation.scala  | 76 ++++++++++++-------
 3 files changed, 84 insertions(+), 35 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
index 6d3c9fcbfbc6d..99ba968e1ae83 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetColumnsOperation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.util.UUID
 import java.util.regex.Pattern
 
 import scala.collection.JavaConverters.seqAsJavaListConverter
@@ -33,6 +34,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.hive.thriftserver.ThriftserverShimUtils.toJavaSQLType
 import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.{Utils => SparkUtils}
 
 /**
  * Spark's own SparkGetColumnsOperation
@@ -57,15 +59,24 @@ private[hive] class SparkGetColumnsOperation(
   val catalog: SessionCatalog = sqlContext.sessionState.catalog
 
   override def runInternal(): Unit = {
-    val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName" +
-      s", columnName : $columnName"
-    logInfo(s"GetColumnsOperation: $cmdStr")
+    val statementId = UUID.randomUUID().toString
+    // Do not change cmdStr. It's used for Hive auditing and authorization.
+    val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName"
+    val logMsg = s"Listing columns '$cmdStr, columnName : $columnName'"
+    logInfo(s"$logMsg with $statementId")
     setState(OperationState.RUNNING)
     // Always use the latest class loader provided by executionHive's state.
     val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
     Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
 
+    HiveThriftServer2.listener.onStatementStart(
+      statementId,
+      parentSession.getSessionHandle.getSessionId.toString,
+      logMsg,
+      statementId,
+      parentSession.getUsername)
+
     val schemaPattern = convertSchemaPattern(schemaName)
     val tablePattern = convertIdentifierPattern(tableName, true)
 
@@ -80,8 +91,6 @@ private[hive] class SparkGetColumnsOperation(
 
     if (isAuthV2Enabled) {
       val privObjs = seqAsJavaListConverter(getPrivObjs(db2Tabs)).asJava
-      val cmdStr =
-        s"catalog : $catalogName, schemaPattern : $schemaName, tablePattern : $tableName"
       authorizeMetaGets(HiveOperationType.GET_COLUMNS, privObjs, cmdStr)
     }
 
@@ -115,8 +124,11 @@ private[hive] class SparkGetColumnsOperation(
     } catch {
       case e: HiveSQLException =>
         setState(OperationState.ERROR)
+        HiveThriftServer2.listener.onStatementError(
+          statementId, e.getMessage, SparkUtils.exceptionString(e))
         throw e
     }
+    HiveThriftServer2.listener.onStatementFinish(statementId)
   }
 
   private def addToRowSet(
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
index e2acd956e969f..3ecbbd036c87f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetSchemasOperation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
+import java.util.UUID
 import java.util.regex.Pattern
 
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
@@ -25,7 +26,9 @@ import org.apache.hive.service.cli.operation.GetSchemasOperation
 import org.apache.hive.service.cli.operation.MetadataOperation.DEFAULT_HIVE_CATALOG
 import org.apache.hive.service.cli.session.HiveSession
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
+import org.apache.spark.util.{Utils => SparkUtils}
 
 /**
  * Spark's own GetSchemasOperation
@@ -40,19 +43,30 @@ private[hive] class SparkGetSchemasOperation(
     parentSession: HiveSession,
     catalogName: String,
     schemaName: String)
-  extends GetSchemasOperation(parentSession, catalogName, schemaName) {
+  extends GetSchemasOperation(parentSession, catalogName, schemaName) with Logging {
 
   override def runInternal(): Unit = {
+    val statementId = UUID.randomUUID().toString
+    // Do not change cmdStr. It's used for Hive auditing and authorization.
+    val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
+    val logMsg = s"Listing databases '$cmdStr'"
+    logInfo(s"$logMsg with $statementId")
     setState(OperationState.RUNNING)
     // Always use the latest class loader provided by executionHive's state.
     val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
     Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
 
     if (isAuthV2Enabled) {
-      val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
       authorizeMetaGets(HiveOperationType.GET_TABLES, null, cmdStr)
     }
 
+    HiveThriftServer2.listener.onStatementStart(
+      statementId,
+      parentSession.getSessionHandle.getSessionId.toString,
+      logMsg,
+      statementId,
+      parentSession.getUsername)
+
     try {
       val schemaPattern = convertSchemaPattern(schemaName)
       sqlContext.sessionState.catalog.listDatabases(schemaPattern).foreach { dbName =>
@@ -68,7 +82,10 @@ private[hive] class SparkGetSchemasOperation(
     } catch {
       case e: HiveSQLException =>
         setState(OperationState.ERROR)
+        HiveThriftServer2.listener.onStatementError(
+          statementId, e.getMessage, SparkUtils.exceptionString(e))
         throw e
     }
+    HiveThriftServer2.listener.onStatementFinish(statementId)
   }
 }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
index 5090124a2a060..878683692fb60 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkGetTablesOperation.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.sql.hive.thriftserver
 
-import java.util.{List => JList}
+import java.util.{List => JList, UUID}
 import java.util.regex.Pattern
 
-import scala.collection.JavaConverters.seqAsJavaListConverter
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType
 import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObjectUtils
@@ -28,10 +28,12 @@ import org.apache.hive.service.cli._
 import org.apache.hive.service.cli.operation.GetTablesOperation
 import org.apache.hive.service.cli.session.HiveSession
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.hive.HiveUtils
+import org.apache.spark.util.{Utils => SparkUtils}
 
 /**
  * Spark's own GetTablesOperation
@@ -50,13 +52,16 @@ private[hive] class SparkGetTablesOperation(
     schemaName: String,
     tableName: String,
     tableTypes: JList[String])
-  extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes) {
-
-  if (tableTypes != null) {
-    this.tableTypes.addAll(tableTypes)
-  }
+  extends GetTablesOperation(parentSession, catalogName, schemaName, tableName, tableTypes)
+    with Logging {
 
   override def runInternal(): Unit = {
+    val statementId = UUID.randomUUID().toString
+    // Do not change cmdStr. It's used for Hive auditing and authorization.
+    val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
+    val tableTypesStr = if (tableTypes == null) "null" else tableTypes.asScala.mkString(",")
+    val logMsg = s"Listing tables '$cmdStr, tableTypes : $tableTypesStr, tableName : $tableName'"
+    logInfo(s"$logMsg with $statementId")
     setState(OperationState.RUNNING)
     // Always use the latest class loader provided by executionHive's state.
     val executionHiveClassLoader = sqlContext.sharedState.jarClassLoader
@@ -70,35 +75,50 @@ private[hive] class SparkGetTablesOperation(
 
     if (isAuthV2Enabled) {
       val privObjs =
         HivePrivilegeObjectUtils.getHivePrivDbObjects(seqAsJavaListConverter(matchingDbs).asJava)
-      val cmdStr = s"catalog : $catalogName, schemaPattern : $schemaName"
       authorizeMetaGets(HiveOperationType.GET_TABLES, privObjs, cmdStr)
     }
 
-    // Tables and views
-    matchingDbs.foreach { dbName =>
-      val tables = catalog.listTables(dbName, tablePattern, includeLocalTempViews = false)
-      catalog.getTablesByName(tables).foreach { table =>
-        val tableType = tableTypeString(table.tableType)
-        if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(tableType)) {
-          addToRowSet(table.database, table.identifier.table, tableType, table.comment)
+    HiveThriftServer2.listener.onStatementStart(
+      statementId,
+      parentSession.getSessionHandle.getSessionId.toString,
+      logMsg,
+      statementId,
+      parentSession.getUsername)
+
+    try {
+      // Tables and views
+      matchingDbs.foreach { dbName =>
+        val tables = catalog.listTables(dbName, tablePattern, includeLocalTempViews = false)
+        catalog.getTablesByName(tables).foreach { table =>
+          val tableType = tableTypeString(table.tableType)
+          if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(tableType)) {
+            addToRowSet(table.database, table.identifier.table, tableType, table.comment)
+          }
         }
       }
-    }
-    // Temporary views and global temporary views
-    if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(VIEW.name)) {
-      val globalTempViewDb = catalog.globalTempViewManager.database
-      val databasePattern = Pattern.compile(CLIServiceUtils.patternToRegex(schemaName))
-      val tempViews = if (databasePattern.matcher(globalTempViewDb).matches()) {
-        catalog.listTables(globalTempViewDb, tablePattern, includeLocalTempViews = true)
-      } else {
-        catalog.listLocalTempViews(tablePattern)
-      }
-      tempViews.foreach { view =>
-        addToRowSet(view.database.orNull, view.table, VIEW.name, None)
+      // Temporary views and global temporary views
+      if (tableTypes == null || tableTypes.isEmpty || tableTypes.contains(VIEW.name)) {
+        val globalTempViewDb = catalog.globalTempViewManager.database
+        val databasePattern = Pattern.compile(CLIServiceUtils.patternToRegex(schemaName))
+        val tempViews = if (databasePattern.matcher(globalTempViewDb).matches()) {
+          catalog.listTables(globalTempViewDb, tablePattern, includeLocalTempViews = true)
+        } else {
+          catalog.listLocalTempViews(tablePattern)
+        }
+        tempViews.foreach { view =>
+          addToRowSet(view.database.orNull, view.table, VIEW.name, None)
+        }
       }
+      setState(OperationState.FINISHED)
+    } catch {
+      case e: HiveSQLException =>
+        setState(OperationState.ERROR)
+        HiveThriftServer2.listener.onStatementError(
+          statementId, e.getMessage, SparkUtils.exceptionString(e))
+        throw e
     }
-    setState(OperationState.FINISHED)
+    HiveThriftServer2.listener.onStatementFinish(statementId)
   }
 
   private def addToRowSet(
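
Each of the three `runInternal()` methods above repeats the same lifecycle: generate a `statementId`, report the operation to `HiveThriftServer2.listener` with `onStatementStart` before doing the work, call `onStatementError` when a `HiveSQLException` is thrown, and `onStatementFinish` on success. That reporting is what makes the metadata operation appear on the ThriftServerTab. The sketch below restates that shape as a standalone helper. It is not part of this patch, and `withUiTracking` is a hypothetical name; only the listener calls and their argument order are taken from the diff above.

```scala
package org.apache.spark.sql.hive.thriftserver

import java.util.UUID

import org.apache.hive.service.cli.HiveSQLException
import org.apache.hive.service.cli.session.HiveSession

import org.apache.spark.util.{Utils => SparkUtils}

// Hypothetical sketch only -- Spark does not ship this helper. It factors out
// the start/error/finish reporting that each runInternal() in this patch inlines.
object MetadataOperationTracking {

  /** Runs `body` and reports its lifecycle to the ThriftServerTab listener. */
  def withUiTracking(parentSession: HiveSession, logMsg: String)(body: => Unit): Unit = {
    val statementId = UUID.randomUUID().toString
    // Same five arguments as the onStatementStart calls in the diff: the
    // statementId is reused as the group id, and logMsg stands in for the
    // SQL statement text, since metadata operations have no statement.
    HiveThriftServer2.listener.onStatementStart(
      statementId,
      parentSession.getSessionHandle.getSessionId.toString,
      logMsg,
      statementId,
      parentSession.getUsername)
    try {
      body
    } catch {
      case e: HiveSQLException =>
        HiveThriftServer2.listener.onStatementError(
          statementId, e.getMessage, SparkUtils.exceptionString(e))
        throw e
    }
    HiveThriftServer2.listener.onStatementFinish(statementId)
  }
}
```

The patch keeps the three copies inline rather than factoring them out, since each operation interleaves the listener calls with its own `setState` transitions and Hive authorization checks.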