[CARMEL-3157] Index upgrade to 3.0 (#52)
* index syntax

* index build

* index prune

* index metrics

* index ut

* [CARMEL-3157] index pruning - upgrade to 3.0

* remove ut for index treated as unsupported feature

* fix conflict

* fix conflict

* fix style
Glee authored and wangyum committed Aug 28, 2020
1 parent 40789c4 commit 8c22d1d
Showing 56 changed files with 6,265 additions and 41 deletions.
@@ -41,6 +41,7 @@ private[spark] object InternalAccumulator {
val DISK_BYTES_SPILLED = METRICS_PREFIX + "diskBytesSpilled"
val PEAK_EXECUTION_MEMORY = METRICS_PREFIX + "peakExecutionMemory"
val UPDATED_BLOCK_STATUSES = METRICS_PREFIX + "updatedBlockStatuses"
val PRUNED_STATS = "index.prunedStats"
val TEST_ACCUM = METRICS_PREFIX + "testAccumulator"

// scalastyle:off
@@ -901,7 +901,7 @@ private[spark] class Executor(
} else {
taskRunner.task.metrics.accumulators()
}
accumUpdates += ((taskRunner.taskId, accumulatorsToReport))
accumUpdates += ((taskRunner.taskId, accumulatorsToReport.map(_.toImmutable)))
}
}

126 changes: 126 additions & 0 deletions core/src/main/scala/org/apache/spark/executor/PruneMetrics.scala
@@ -0,0 +1,126 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.executor

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.internal.Logging
import org.apache.spark.util.AccumulatorV2

case class PrunedStats(name: String, var prunedFiles: AtomicLong, var prunedRowGroups: AtomicLong) {
def isZero: Boolean = {
prunedFiles.get() == 0 && prunedRowGroups.get() == 0
}

def incPrunedFiles(v: Long): Unit = {
prunedFiles.addAndGet(v)
}
def incPrunedRowGroups(v: Long): Unit = {
prunedRowGroups.addAndGet(v)
}

override def toString: String = {
"[name: " + name + ", prunedFiles: " + prunedFiles +
", prunedRowGroups: " + prunedRowGroups + "]"
}
}

// PrunedMetricsAccum is not thread-safe, but it is only used in a single-threaded environment.
class PrunedMetricsAccum() extends
AccumulatorV2[(String, String, Long), List[PrunedStats]] with Logging {
import scala.collection.convert.WrapAsScala._

private val pruneStatsMap: scala.collection.concurrent.Map[String, PrunedStats] =
new ConcurrentHashMap[String, PrunedStats]()

private def addPrunedStat(statName: String): Unit = {
if (!pruneStatsMap.contains(statName)) {
pruneStatsMap.put(statName, PrunedStats(statName, new AtomicLong(0), new AtomicLong(0)))
}
}

def setValue(value: List[PrunedStats]): Unit = {
value.foreach(s => {
pruneStatsMap.put(s.name, s)
})
}
override def isZero: Boolean = pruneStatsMap.isEmpty || pruneStatsMap.forall(p => p._2.isZero)

override def reset(): Unit = {
pruneStatsMap.clear()
}

override def add(v: (String, String, Long)): Unit = {
val statName = v._1
val subMetricName = v._2
val value = v._3
addPrunedStat(statName)
if (pruneStatsMap.contains(statName)) {
if (subMetricName.equalsIgnoreCase(PrunedMetricsAccum.PRUNED_FILES)) {
pruneStatsMap(statName).incPrunedFiles(value)
} else if (subMetricName.equalsIgnoreCase(PrunedMetricsAccum.PRUNED_ROWGROUPS)) {
pruneStatsMap(statName).incPrunedRowGroups(value)
}
}
}

override def copy(): AccumulatorV2[(String, String, Long), List[PrunedStats]] = {
val copiedStats = new PrunedMetricsAccum
pruneStatsMap.foreach(s => {
copiedStats.pruneStatsMap.put(s._1,
new PrunedStats(s._2.name, s._2.prunedFiles, s._2.prunedRowGroups))
})
copiedStats
}

override def merge(other: AccumulatorV2[(String, String, Long), List[PrunedStats]]): Unit = {
if (other.isInstanceOf[PrunedMetricsAccum]) {
val otherStats = other.asInstanceOf[PrunedMetricsAccum]
otherStats.pruneStatsMap.foreach( s => {
if (pruneStatsMap.contains(s._1)) {
pruneStatsMap.get(s._1).get.incPrunedFiles(s._2.prunedFiles.get())
pruneStatsMap.get(s._1).get.incPrunedRowGroups(s._2.prunedRowGroups.get())
} else {
pruneStatsMap.put(s._1, s._2)
}
})
}
}

override def value: List[PrunedStats] = {
pruneStatsMap.values.toList
}

override def toImmutable(): AccumulatorV2[(String, String, Long), List[PrunedStats]] = {
val copiedStats = new PrunedMetricsAccum
copiedStats.metadata = this.metadata
pruneStatsMap.foreach(s => {
copiedStats.pruneStatsMap.put(s._1,
new PrunedStats(s._2.name, s._2.prunedFiles, s._2.prunedRowGroups))
})
copiedStats.atDriverSide(false)
copiedStats
}
}
object PrunedMetricsAccum {
val PRUNED_FILES = "PRUNED_FILES"
val PRUNED_ROWGROUPS = "PRUNED_ROWGROUPS"
}
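
For context, here is a minimal, standalone sketch (not part of the commit) of how PrunedMetricsAccum aggregates per-index pruning counters and how per-task copies are merged; the index name "orders_idx" is only an illustrative placeholder.

import org.apache.spark.executor.PrunedMetricsAccum

object PrunedMetricsAccumSketch {
  def main(args: Array[String]): Unit = {
    // One accumulator per task; each add() carries (statName, subMetricName, delta).
    val taskA = new PrunedMetricsAccum
    taskA.add(("orders_idx", PrunedMetricsAccum.PRUNED_FILES, 3L))
    taskA.add(("orders_idx", PrunedMetricsAccum.PRUNED_ROWGROUPS, 12L))

    val taskB = new PrunedMetricsAccum
    taskB.add(("orders_idx", PrunedMetricsAccum.PRUNED_FILES, 1L))

    // merge() is what the driver does when it combines per-task updates.
    taskA.merge(taskB)
    taskA.value.foreach(println)
    // expected output: [name: orders_idx, prunedFiles: 4, prunedRowGroups: 12]
  }
}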


@@ -56,7 +56,9 @@ class TaskMetrics private[spark] () extends Serializable {
private val _diskBytesSpilled = new LongAccumulator
private val _peakExecutionMemory = new LongAccumulator
private val _updatedBlockStatuses = new CollectionAccumulator[(BlockId, BlockStatus)]
private val _prunedStats = new PrunedMetricsAccum

def prunedStats: PrunedMetricsAccum = _prunedStats
/**
* Time taken on the executor to deserialize this task.
*/
@@ -147,6 +149,9 @@ class TaskMetrics private[spark] () extends Serializable {
_updatedBlockStatuses.setValue(v)
private[spark] def setUpdatedBlockStatuses(v: Seq[(BlockId, BlockStatus)]): Unit =
_updatedBlockStatuses.setValue(v.asJava)
private[spark] def setPrunedStats(v: List[PrunedStats]): Unit = {
_prunedStats.setValue(v)
}

/**
* Metrics related to reading data from a [[org.apache.spark.rdd.HadoopRDD]] or from persisted
@@ -220,6 +225,7 @@ class TaskMetrics private[spark] () extends Serializable {
DISK_BYTES_SPILLED -> _diskBytesSpilled,
PEAK_EXECUTION_MEMORY -> _peakExecutionMemory,
UPDATED_BLOCK_STATUSES -> _updatedBlockStatuses,
PRUNED_STATS -> _prunedStats,
shuffleRead.REMOTE_BLOCKS_FETCHED -> shuffleReadMetrics._remoteBlocksFetched,
shuffleRead.LOCAL_BLOCKS_FETCHED -> shuffleReadMetrics._localBlocksFetched,
shuffleRead.REMOTE_BYTES_READ -> shuffleReadMetrics._remoteBytesRead,
@@ -300,6 +306,8 @@ private[spark] object TaskMetrics extends Logging {
val value = info.update.get
if (name == UPDATED_BLOCK_STATUSES) {
tm.setUpdatedBlockStatuses(value.asInstanceOf[java.util.List[(BlockId, BlockStatus)]])
} else if (name == PRUNED_STATS) {
tm.setPrunedStats(value.asInstanceOf[List[PrunedStats]])
} else {
tm.nameToAccums.get(name).foreach(
_.asInstanceOf[LongAccumulator].setValue(value.asInstanceOf[Long])
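
The executor-side call sites that feed this accumulator live in the data source changes not shown in this excerpt; the following is a hedged sketch of what such a reporting helper could look like from task code (the index name is a placeholder).

import org.apache.spark.TaskContext
import org.apache.spark.executor.PrunedMetricsAccum

object PrunedStatsReporting {
  // Sketch only: records how many files / row groups an index pruned for the current task.
  def reportPruning(indexName: String, prunedFiles: Long, prunedRowGroups: Long): Unit = {
    val metrics = TaskContext.get().taskMetrics()
    metrics.prunedStats.add((indexName, PrunedMetricsAccum.PRUNED_FILES, prunedFiles))
    metrics.prunedStats.add((indexName, PrunedMetricsAccum.PRUNED_ROWGROUPS, prunedRowGroups))
  }
}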
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -210,7 +210,7 @@ private[spark] abstract class Task[T](
def collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulatorV2[_, _]] = {
if (context != null) {
// Note: internal accumulators representing task metrics always count failed values
context.taskMetrics.nonZeroInternalAccums() ++
context.taskMetrics.nonZeroInternalAccums().map(_.toImmutable) ++
// zero value external accumulators may still be useful, e.g. SQLMetrics, we should not
// filter them out.
context.taskMetrics.externalAccums.filter(a => !taskFailed || a.countFailedValues)
19 changes: 19 additions & 0 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -202,6 +202,21 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
val stageGraph = parent.store.asOption(parent.store.operationGraphForStage(stageId))
val dagViz = UIUtils.showDagVizForStage(stageId, stageGraph)

val accumulableHeaders: Seq[String] = Seq("Accumulable", "Value")
def accumulableRow(acc: AccumulableInfo): Seq[Node] = {
if (acc.name != null && acc.value != null) {
<tr><td>{acc.name}</td><td>{if (acc.value.getClass.isArray)
{acc.value.asInstanceOf[List[_]].map( v => v.toString).mkString(",")}
else {acc.value}}</td></tr>
} else {
Nil
}
}
val accumulableTable = UIUtils.listingTable(
accumulableHeaders,
accumulableRow,
stageData.accumulatorUpdates.toSeq)

val currentTime = System.currentTimeMillis()
val taskTable = try {
val _taskTable = new TaskPagedTable(
@@ -220,6 +235,9 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
null
}

val maybeAccumulableTable: Seq[Node] =
if (hasAccumulators(stageData)) { <h4>Accumulators</h4> ++ accumulableTable } else Seq()

val content =
summary ++
dagViz ++ <div id="showAdditionalMetrics"></div> ++
@@ -232,6 +250,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
taskPagedTable.dataSource.sliceData(from, to)}).getOrElse(Nil), currentTime,
eventTimelineTaskPage, eventTimelineTaskPageSize, eventTimelineTotalPages, stageId,
stageAttemptId, totalTasks) ++
// maybeAccumulableTable ++
<div id="parent-container">
<script src={UIUtils.prependBaseUri(request, "/static/utils.js")}></script>
<script src={UIUtils.prependBaseUri(request, "/static/stagepage.js")}></script>
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/util/AccumulatorV2.scala
@@ -91,6 +91,10 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
}
}

final def atDriverSide(driverSide: Boolean): Unit = {
atDriverSide = driverSide
}

/**
* Whether to accumulate values from failed tasks. This is set to true for system and time
* metrics like serialization time or bytes spilled, and false for things with absolute values
@@ -128,6 +132,12 @@ abstract class AccumulatorV2[IN, OUT] extends Serializable {
copyAcc
}

// In case OptionalDataException happens when serializing the accumulator,
// this defaults to returning itself; subclasses can override it if they have nested non-primitive fields.
def toImmutable(): AccumulatorV2[IN, OUT] = {
this
}

/**
* Creates a new copy of this accumulator.
*/
@@ -105,7 +105,8 @@ class JsonProtocolSuite extends SparkFunSuite {
// Use custom accum ID for determinism
val accumUpdates =
makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800, hasHadoopInput = true, hasOutput = true)
.accumulators().map(AccumulatorSuite.makeInfo)
.accumulators().filterNot(_.name.get.equals(InternalAccumulator.PRUNED_STATS)).
map(AccumulatorSuite.makeInfo)
.zipWithIndex.map { case (a, i) => a.copy(id = i) }
val executorUpdates = new ExecutorMetrics(
Array(543L, 123456L, 12345L, 1234L, 123L, 12L, 432L,
@@ -183,6 +183,16 @@ statement
multipartIdentifier AS className=STRING
(USING resource (',' resource)*)? #createFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? multipartIdentifier #dropFunction
| CREATE INDEX identifier
ON TABLE tableName=tableIdentifier identifierList
AS indexType=STRING
(WITH DEFERRED REBUILD)?
(COMMENT comment=STRING)? #createIndex
| DROP INDEX (IF EXISTS)? identifier ON tableIdentifier #dropIndex
| ALTER INDEX identifier?
ON tableIdentifier partitionSpec* REBUILD #alterIndex
| SHOW INDEXES ON tableIdentifier #showIndexes
| (DESC | DESCRIBE) INDEX EXTENDED? identifier ON tableIdentifier #describeIndex
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
statement #explain
| SHOW TABLES ((FROM | IN) multipartIdentifier)?
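
For reference, the new parser rules above accept index DDL of roughly the following shape, shown here as spark.sql calls; the table, column, and index names and the index type string are illustrative placeholders (not values taken from this commit), and running them requires a build that includes these changes.

import org.apache.spark.sql.SparkSession

object IndexDdlSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("index-ddl-sketch").getOrCreate()

    // createIndex: CREATE INDEX <name> ON TABLE <table> (<cols>) AS '<type>' [WITH DEFERRED REBUILD] [COMMENT '<text>']
    spark.sql("CREATE INDEX idx_user_id ON TABLE events (user_id) AS 'bloomfilter' WITH DEFERRED REBUILD")
    // alterIndex: rebuild the index, optionally per partition
    spark.sql("ALTER INDEX idx_user_id ON events REBUILD")
    // showIndexes / describeIndex
    spark.sql("SHOW INDEXES ON events").show()
    spark.sql("DESCRIBE INDEX EXTENDED idx_user_id ON events").show()
    // dropIndex
    spark.sql("DROP INDEX IF EXISTS idx_user_id ON events")
  }
}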
@@ -1309,6 +1319,7 @@ nonReserved
| DATABASES
| DAY
| DBPROPERTIES
| DEFERRED
| DEFINED
| DELETE
| DELIMITED
@@ -1353,6 +1364,7 @@ nonReserved
| GROUPING
| HAVING
| HOUR
| IDXPROPERTIES
| IF
| IGNORE
| IMPORT
@@ -1421,6 +1433,7 @@ nonReserved
| PURGE
| QUERY
| RANGE
| REBUILD
| RECORDREADER
| RECORDWRITER
| RECOVER
@@ -1563,6 +1576,7 @@ DATABASES: 'DATABASES' | 'SCHEMAS';
DAY: 'DAY';
DBPROPERTIES: 'DBPROPERTIES';
DEFINED: 'DEFINED';
DEFERRED: 'DEFERRED';
DELETE: 'DELETE';
DELIMITED: 'DELIMITED';
DENY: 'DENY';
@@ -1607,6 +1621,7 @@ GROUP: 'GROUP';
GROUPING: 'GROUPING';
HAVING: 'HAVING';
HOUR: 'HOUR';
IDXPROPERTIES: 'IDXPROPERTIES';
IF: 'IF';
IGNORE: 'IGNORE';
IMPORT: 'IMPORT';
@@ -1681,6 +1696,7 @@ PROPERTIES: 'PROPERTIES';
PURGE: 'PURGE';
QUERY: 'QUERY';
RANGE: 'RANGE';
REBUILD: 'REBUILD';
RECORDREADER: 'RECORDREADER';
RECORDWRITER: 'RECORDWRITER';
RECOVER: 'RECOVER';
@@ -293,4 +293,22 @@ trait ExternalCatalog {
def functionExists(db: String, funcName: String): Boolean

def listFunctions(db: String, pattern: String): Seq[String]

// --------------------------------------------------------------------------
// Index related
// --------------------------------------------------------------------------

def createIndex(index: CatalogIndex): Unit

def dropIndex(
db: String,
baseTableName: String,
index_name: String,
throwException: Boolean): Unit

def getIndexCol(db: String, baseTableName: String, index_name: String): StructType

def getIndex(db: String, baseTableName: String, index_name: String): CatalogIndex

def getAllIndexes(db: String, baseTableName: String): Map[String, CatalogIndex]
}
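
As a hedged illustration of how callers might use the new methods (not code from this commit), the sketch below collects the columns covered by any index on a table; it assumes the keys returned by getAllIndexes are index names, which the signatures suggest but the excerpt does not show.

import org.apache.spark.sql.catalyst.catalog.ExternalCatalog

object IndexLookupSketch {
  // Returns the set of column names covered by any index defined on `db`.`table`.
  def indexedColumns(catalog: ExternalCatalog, db: String, table: String): Set[String] = {
    catalog.getAllIndexes(db, table).keys
      .flatMap(indexName => catalog.getIndexCol(db, table, indexName).fieldNames)
      .toSet
  }
}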
@@ -303,4 +303,33 @@ class ExternalCatalogWithListener(delegate: ExternalCatalog)
override def listFunctions(db: String, pattern: String): Seq[String] = {
delegate.listFunctions(db, pattern)
}

// --------------------------------------------------------------------------
// Index
// --------------------------------------------------------------------------

override def createIndex(index: CatalogIndex): Unit = {
delegate.createIndex(index)
}

override def dropIndex(
db: String,
baseTableName: String,
index_name: String,
throwException: Boolean): Unit = {
delegate.dropIndex(db, baseTableName, index_name, throwException)
}

override def getIndexCol(db: String, baseTableName: String, index_name: String): StructType = {
delegate.getIndexCol(db, baseTableName, index_name)
}

override def getIndex(db: String, baseTableName: String, index_name: String): CatalogIndex = {
delegate.getIndex(db, baseTableName, index_name)
}

override def getAllIndexes(db: String, baseTableName: String)
: Map[String, CatalogIndex] = {
delegate.getAllIndexes(db, baseTableName)
}
}