Merge remote-tracking branch 'apache/master' into SPARK-6542
mengxr committed Mar 26, 2015
2 parents 85f3106 + 276ef1c commit c78e31a
Showing 15 changed files with 331 additions and 89 deletions.
@@ -45,8 +45,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
*/
def getJobIdsForGroup(jobGroup: String): Array[Int] = {
jobProgressListener.synchronized {
val jobData = jobProgressListener.jobIdToData.valuesIterator
jobData.filter(_.jobGroup.orNull == jobGroup).map(_.jobId).toArray
jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
}
}

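With this change, getJobIdsForGroup reads the new jobGroupToJobIds index directly instead of filtering every retained JobUIData entry. A minimal usage sketch (my own example, assuming the standard SparkContext.setJobGroup and statusTracker API; it is not part of this diff):

import org.apache.spark.{SparkConf, SparkContext}

object JobGroupStatusExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("job-group-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // Tag the jobs submitted from this thread with a job group id.
    sc.setJobGroup("etl-group", "demo jobs for the ETL group")
    sc.parallelize(1 to 100).count() // triggers one job in the group
    // With this patch the lookup is a direct map access on jobGroupToJobIds
    // rather than a scan over every retained job.
    val jobIds: Array[Int] = sc.statusTracker.getJobIdsForGroup("etl-group")
    println(s"job ids in group: ${jobIds.mkString(", ")}")
    sc.stop()
  }
}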
@@ -44,6 +44,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// These type aliases are public because they're used in the types of public fields:

type JobId = Int
type JobGroupId = String
type StageId = Int
type StageAttemptId = Int
type PoolName = String
@@ -54,6 +55,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
val jobIdToData = new HashMap[JobId, JobUIData]
val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]

// Stages:
val pendingStages = new HashMap[StageId, StageInfo]
@@ -119,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
Map(
"jobIdToData" -> jobIdToData.size,
"stageIdToData" -> stageIdToData.size,
"stageIdToStageInfo" -> stageIdToInfo.size
"stageIdToStageInfo" -> stageIdToInfo.size,
"jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
// Since jobGroupToJobIds is a map of sets, check that we don't leak keys with empty values:
"jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
)
}

@@ -140,7 +145,19 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
if (jobs.size > retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
jobs.take(toRemove).foreach { job =>
jobIdToData.remove(job.jobId)
// Remove the job's UI data, if it exists
jobIdToData.remove(job.jobId).foreach { removedJob =>
// A null jobGroupId is used for jobs that are run without a job group
val jobGroupId = removedJob.jobGroup.orNull
// Remove the job group -> job mapping entry, if it exists
jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
jobsInGroup.remove(job.jobId)
// If this was the last job in this job group, remove the map entry for the job group
if (jobsInGroup.isEmpty) {
jobGroupToJobIds.remove(jobGroupId)
}
}
}
}
jobs.trimStart(toRemove)
}
@@ -158,6 +175,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
// A null jobGroupId is used for jobs that are run without a job group
jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
// Compute (a potential underestimate of) the number of tasks that will be run by this job.
// This may be an underestimate because the job start event references all of the result
@@ -17,6 +17,8 @@

package org.apache.spark.ui.jobs

import java.util.Properties

import org.scalatest.FunSuite
import org.scalatest.Matchers

@@ -44,11 +46,19 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers
SparkListenerStageCompleted(stageInfo)
}

private def createJobStartEvent(jobId: Int, stageIds: Seq[Int]) = {
private def createJobStartEvent(
jobId: Int,
stageIds: Seq[Int],
jobGroup: Option[String] = None): SparkListenerJobStart = {
val stageInfos = stageIds.map { stageId =>
new StageInfo(stageId, 0, stageId.toString, 0, null, "")
}
SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos)
val properties: Option[Properties] = jobGroup.map { groupId =>
val props = new Properties()
props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
props
}
SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, properties.orNull)
}

private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
@@ -110,6 +120,23 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matchers
listener.stageIdToActiveJobIds.size should be (0)
}

test("test clearing of jobGroupToJobIds") {
val conf = new SparkConf()
conf.set("spark.ui.retainedJobs", 5.toString)
val listener = new JobProgressListener(conf)

// Run 51 jobs (ids 0 through 50), each with one stage
for (jobId <- 0 to 50) {
listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
listener.onStageSubmitted(createStageStartEvent(0))
listener.onStageCompleted(createStageEndEvent(0, failed = false))
listener.onJobEnd(createJobEndEvent(jobId, false))
}
assertActiveJobsStateIsEmpty(listener)
// This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
listener.jobGroupToJobIds.size should be (5)
}

test("test LRU eviction of jobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
@@ -58,7 +58,8 @@ class AttributeSet private (val baseSet: Set[AttributeEquals])

/** Returns true if the members of this AttributeSet and other are the same. */
override def equals(other: Any): Boolean = other match {
case otherSet: AttributeSet => baseSet.map(_.a).forall(otherSet.contains)
case otherSet: AttributeSet =>
otherSet.size == baseSet.size && baseSet.map(_.a).forall(otherSet.contains)
case _ => false
}

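The added size comparison matters because forall over the smaller set alone cannot distinguish a proper subset from an equal set. A tiny illustration with plain Scala sets (illustrative only, not the Catalyst types):

// Illustrative only: plain Scala sets standing in for AttributeSet.
object SubsetEqualityDemo extends App {
  val small = Set("a")
  val big   = Set("a", "b")
  // Old check: every member of `small` is also in `big`, so a proper subset
  // would wrongly compare as "equal".
  println(small.forall(big.contains))                             // true
  // New check: comparing sizes first rejects the proper subset.
  println(small.size == big.size && small.forall(big.contains))   // false
}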
@@ -394,10 +394,17 @@ case class Cast(child: Expression, dataType: DataType) extends UnaryExpression w
val casts = from.fields.zip(to.fields).map {
case (fromField, toField) => cast(fromField.dataType, toField.dataType)
}
// TODO: This is very slow!
buildCast[Row](_, row => Row(row.toSeq.zip(casts).map {
case (v, cast) => if (v == null) null else cast(v)
}: _*))
// TODO: Could be faster?
val newRow = new GenericMutableRow(from.fields.size)
buildCast[Row](_, row => {
var i = 0
while (i < row.length) {
val v = row(i)
newRow.update(i, if (v == null) null else casts(i)(v))
i += 1
}
newRow.copy()
})
}

private[this] def cast(from: DataType, to: DataType): Any => Any = to match {
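The rewritten struct cast avoids allocating a fresh Seq and varargs Row for every input row; it fills one reusable mutable row and copies it before returning. A simplified sketch of the same reuse-then-copy pattern using plain Scala arrays rather than the Catalyst Row types (illustrative names only):

// Illustrative only: plain arrays standing in for Catalyst rows.
object ReuseThenCopyDemo extends App {
  // Per-field conversions, built once (stand-ins for the per-field casts
  // derived from `from.fields.zip(to.fields)` in the real code).
  val casts: Array[Any => Any] = Array(
    (v: Any) => v.toString.toInt,
    (v: Any) => v.toString.toUpperCase
  )
  // One reusable buffer, allocated once instead of per row.
  val buffer = new Array[Any](casts.length)

  def castRow(row: Array[Any]): Array[Any] = {
    var i = 0
    while (i < row.length) {
      val v = row(i)
      buffer(i) = if (v == null) null else casts(i)(v)
      i += 1
    }
    // Copy so callers never observe later mutations of the shared buffer.
    buffer.clone()
  }

  println(castRow(Array[Any]("42", "hi")).mkString(", "))    // 42, HI
  println(castRow(Array[Any](null, "there")).mkString(", ")) // null, THERE
}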
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.expressions

import org.scalatest.FunSuite

import org.apache.spark.sql.types.IntegerType

class AttributeSetSuite extends FunSuite {

val aUpper = AttributeReference("A", IntegerType)(exprId = ExprId(1))
val aLower = AttributeReference("a", IntegerType)(exprId = ExprId(1))
val fakeA = AttributeReference("a", IntegerType)(exprId = ExprId(3))
val aSet = AttributeSet(aLower :: Nil)

val bUpper = AttributeReference("B", IntegerType)(exprId = ExprId(2))
val bLower = AttributeReference("b", IntegerType)(exprId = ExprId(2))
val bSet = AttributeSet(bUpper :: Nil)

val aAndBSet = AttributeSet(aUpper :: bUpper :: Nil)

test("sanity check") {
assert(aUpper != aLower)
assert(bUpper != bLower)
}

test("checks by id not name") {
assert(aSet.contains(aUpper) === true)
assert(aSet.contains(aLower) === true)
assert(aSet.contains(fakeA) === false)

assert(aSet.contains(bUpper) === false)
assert(aSet.contains(bLower) === false)
}

test("++ preserves AttributeSet") {
assert((aSet ++ bSet).contains(aUpper) === true)
assert((aSet ++ bSet).contains(aLower) === true)
}

test("extracts all references references") {
val addSet = AttributeSet(Add(aUpper, Alias(bUpper, "test")()):: Nil)
assert(addSet.contains(aUpper))
assert(addSet.contains(aLower))
assert(addSet.contains(bUpper))
assert(addSet.contains(bLower))
}

test("dedups attributes") {
assert(AttributeSet(aUpper :: aLower :: Nil).size === 1)
}

test("subset") {
assert(aSet.subsetOf(aAndBSet) === true)
assert(aAndBSet.subsetOf(aSet) === false)
}

test("equality") {
assert(aSet != aAndBSet)
assert(aAndBSet != aSet)
assert(aSet != bSet)
assert(bSet != aSet)

assert(aSet == aSet)
assert(aSet == AttributeSet(aUpper :: Nil))
}
}
@@ -459,7 +459,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
(relation -> relation.output, parquetRelation, attributedRewrites)

// Write path
case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -470,7 +470,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
(relation -> relation.output, parquetRelation, attributedRewrites)

// Read path
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -479,33 +479,35 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
(relation -> relation.output, parquetRelation, attributedRewrites)
}

// Quick fix for SPARK-6450: Notice that we're using both the MetastoreRelation instances and
// their output attributes as the key of the map. This is because MetastoreRelation.equals
// doesn't take output attributes into account, thus multiple MetastoreRelation instances
// pointing to the same table get collapsed into a single entry in the map. A proper fix for
// this should be overriding equals & hashCode in MetastoreRelation.
val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ ++: _))

// Replaces all `MetastoreRelation`s with corresponding `ParquetRelation2`s, and fixes
// attribute IDs referenced in other nodes.
plan.transformUp {
case r: MetastoreRelation if relationMap.contains(r) => {
val parquetRelation = relationMap(r)
val withAlias =
r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
Subquery(r.tableName, parquetRelation))
case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
val parquetRelation = relationMap(r -> r.output)
val alias = r.alias.getOrElse(r.tableName)
Subquery(alias, parquetRelation)

withAlias
}
case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
if relationMap.contains(r) => {
val parquetRelation = relationMap(r)
if relationMap.contains(r -> r.output) =>
val parquetRelation = relationMap(r -> r.output)
InsertIntoTable(parquetRelation, partition, child, overwrite)
}

case InsertIntoHiveTable(r: MetastoreRelation, partition, child, overwrite)
if relationMap.contains(r) => {
val parquetRelation = relationMap(r)
if relationMap.contains(r -> r.output) =>
val parquetRelation = relationMap(r -> r.output)
InsertIntoTable(parquetRelation, partition, child, overwrite)
}

case other => other.transformExpressions {
case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
}
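As the SPARK-6450 comment above explains, the map is keyed by (relation, output) because MetastoreRelation.equals ignores output attributes, so two occurrences of the same table would otherwise collapse into a single map entry. A stripped-down sketch of that collapsing behaviour using a hypothetical stand-in class (not the real Catalyst types):

// Illustrative only: a stand-in whose equality ignores output attributes,
// as described in the SPARK-6450 comment.
object RelationMapKeyDemo extends App {
  class Relation(val table: String, val output: Seq[String]) {
    override def equals(other: Any): Boolean = other match {
      case r: Relation => r.table == table
      case _           => false
    }
    override def hashCode: Int = table.hashCode
  }

  val r1 = new Relation("t", Seq("a#1", "b#2"))
  val r2 = new Relation("t", Seq("a#3", "b#4")) // same table, fresh attribute ids

  // Keyed by the relation alone, the two occurrences collapse into one entry.
  println(Map(r1 -> "read path", r2 -> "write path").size)                           // 1
  // Keyed by (relation, output), both occurrences stay distinct.
  println(Map((r1, r1.output) -> "read path", (r2, r2.output) -> "write path").size) // 2
}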