[SPARK-32921][SHUFFLE] MapOutputTracker extensions to support push-based shuffle

### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 for push-based shuffle.
Summary of changes:

- Introduce `MergeStatus` which tracks the partition level metadata for a merged shuffle partition in the Spark driver
- Unify `MergeStatus` and `MapStatus` under a single trait to allow code reuse inside `MapOutputTracker`
- Extend `MapOutputTracker` to support registering/unregistering `MergeStatus`, calculating preferred locations for a shuffle while taking merged shuffle partitions into consideration, and serving reducer requests for block-fetch locations that include merged shuffle partitions.

The added APIs in `MapOutputTracker` will be used by `DAGScheduler` in SPARK-32920 and by `ShuffleBlockFetcherIterator` in SPARK-32922.
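As a rough illustration of the intended flow, here is a self-contained toy model in plain Scala. This is not the real Spark API; every name below is invented for the sketch. It shows the two ideas the bullets above describe: both status types live under one trait, and a reducer takes the merged partition first, then falls back to the original blocks of any map outputs the merge is missing.

```scala
// Toy model only: illustrative names, not the Spark API.
sealed trait ToyShuffleOutputStatus
final case class ToyMapStatus(mapId: Int, location: String) extends ToyShuffleOutputStatus
final case class ToyMergeStatus(reduceId: Int, location: String, mergedMapIds: Set[Int])
  extends ToyShuffleOutputStatus {
  // Mapper IDs whose output did not make it into the merged partition.
  def missingMaps(numMaps: Int): Seq[Int] = (0 until numMaps).filterNot(mergedMapIds)
}

object ToyTrackerDemo extends App {
  val numMaps = 4
  val mapStatuses = (0 until numMaps).map(i => ToyMapStatus(i, s"exec-$i"))
  // The shuffle service merged blocks from mappers 0, 1 and 3 for reduce partition 7.
  val merge = ToyMergeStatus(reduceId = 7, location = "merger-host", mergedMapIds = Set(0, 1, 3))

  // The reducer fetches the merged partition plus the unmerged block from mapper 2.
  val fetchLocations = merge.location +: merge.missingMaps(numMaps).map(i => mapStatuses(i).location)
  println(fetchLocations.mkString(", ")) // merger-host, exec-2
}
```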

### Why are the changes needed?
Refer to SPARK-30602

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests.

Lead-authored-by: Min Shen <mshen@linkedin.com>
Co-authored-by: Chandni Singh <chsingh@linkedin.com>
Co-authored-by: Venkata Sowrirajan <vsowrirajan@linkedin.com>

Closes apache#30480 from Victsm/SPARK-32921.

Lead-authored-by: Venkata krishnan Sowrirajan <[email protected]>
Co-authored-by: Min Shen <[email protected]>
Co-authored-by: Chandni Singh <[email protected]>
Co-authored-by: Chandni Singh <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
4 people authored and Mridul Muralidharan committed Apr 26, 2021
1 parent 2d6467d commit 38ef477
Showing 8 changed files with 1,006 additions and 125 deletions.
670 changes: 570 additions & 100 deletions core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Large diffs are not rendered by default.

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -456,7 +456,8 @@ private[spark] class DAGScheduler(
       // since we can't do it in the RDD constructor because # of partitions is unknown
       logInfo(s"Registering RDD ${rdd.id} (${rdd.getCreationSite}) as input to " +
         s"shuffle ${shuffleDep.shuffleId}")
-      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
+      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length,
+        shuffleDep.partitioner.numPartitions)
     }
     stage
   }
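The call site above implies `registerShuffle` now also takes the reduce-side partition count. A minimal sketch of the assumed shape, with parameter names inferred from the call site (the real declaration in `MapOutputTrackerMaster` may differ):

```scala
// Assumed shape of the extended registration API; inferred from the
// call site above, so the real parameter names may differ.
trait SketchedTracker {
  /** numReduces comes from shuffleDep.partitioner.numPartitions. */
  def registerShuffle(shuffleId: Int, numMaps: Int, numReduces: Int): Unit
}
```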
core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala

@@ -28,12 +28,18 @@ import org.apache.spark.internal.config
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
+/**
+ * A common trait between [[MapStatus]] and [[MergeStatus]]. This allows us to reuse existing
+ * code to handle MergeStatus inside MapOutputTracker.
+ */
+private[spark] trait ShuffleOutputStatus
+
 /**
  * Result returned by a ShuffleMapTask to a scheduler. Includes the block manager address that the
  * task has shuffle files stored on as well as the sizes of outputs for each reducer, for passing
  * on to the reduce tasks.
  */
-private[spark] sealed trait MapStatus {
+private[spark] sealed trait MapStatus extends ShuffleOutputStatus {
   /** Location where this task output is. */
   def location: BlockManagerId
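One payoff of the shared trait is that a single generic code path can handle arrays of either status type. A minimal sketch of that kind of reuse, using toy stand-ins and plain Java serialization (hedged: the real helper inside `MapOutputTracker` has a different signature and uses Spark's own serializer and compression):

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Toy stand-in; in Spark both MapStatus and MergeStatus extend the trait.
trait ShuffleOutputStatus extends Serializable

object StatusSerializerSketch {
  // One generic serializer works for Array[MapStatus] and Array[MergeStatus]
  // alike, which is the kind of reuse the shared trait buys.
  def serializeStatuses[T <: ShuffleOutputStatus](statuses: Array[T]): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(statuses) finally out.close()
    bytes.toByteArray
  }
}
```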
113 changes: 113 additions & 0 deletions core/src/main/scala/org/apache/spark/scheduler/MergeStatus.scala
@@ -0,0 +1,113 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler

import java.io.{Externalizable, ObjectInput, ObjectOutput}

import org.roaringbitmap.RoaringBitmap

import org.apache.spark.network.shuffle.protocol.MergeStatuses
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.Utils

/**
 * The status for the result of merging shuffle partition blocks per individual shuffle partition
 * maintained by the scheduler. The scheduler would separate the
 * [[org.apache.spark.network.shuffle.protocol.MergeStatuses]] received from
 * ExternalShuffleService into individual [[MergeStatus]] which is maintained inside
 * MapOutputTracker to be served to the reducers when they start fetching shuffle partition
 * blocks. Note that, the reducers are ultimately fetching individual chunks inside a merged
 * shuffle file, as explained in [[org.apache.spark.network.shuffle.RemoteBlockPushResolver]].
 * Between the scheduler maintained MergeStatus and the shuffle service maintained per shuffle
 * partition meta file, we are effectively dividing the metadata for a push-based shuffle into
 * 2 layers. The scheduler would track the top-level metadata at the shuffle partition level
 * with MergeStatus, and the shuffle service would maintain the partition level metadata about
 * how to further divide a merged shuffle partition into multiple chunks with the per-partition
 * meta file. This helps to reduce the amount of data the scheduler needs to maintain for
 * push-based shuffle.
 */
private[spark] class MergeStatus(
    private[this] var loc: BlockManagerId,
    private[this] var mapTracker: RoaringBitmap,
    private[this] var size: Long)
  extends Externalizable with ShuffleOutputStatus {

  protected def this() = this(null, null, -1) // For deserialization only

  def location: BlockManagerId = loc

  def totalSize: Long = size

  def tracker: RoaringBitmap = mapTracker

  /**
   * Get the list of mapper IDs for missing mapper partition blocks that are not merged.
   * The reducer will use this information to decide which shuffle partition blocks to
   * fetch in the original way.
   */
  def getMissingMaps(numMaps: Int): Seq[Int] = {
    (0 until numMaps).filter(i => !mapTracker.contains(i))
  }

  /**
   * Get the number of missing map outputs for missing mapper partition blocks that are not merged.
   */
  def getNumMissingMapOutputs(numMaps: Int): Int = {
    (0 until numMaps).count(i => !mapTracker.contains(i))
  }

  override def writeExternal(out: ObjectOutput): Unit = Utils.tryOrIOException {
    loc.writeExternal(out)
    mapTracker.writeExternal(out)
    out.writeLong(size)
  }

  override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException {
    loc = BlockManagerId(in)
    mapTracker = new RoaringBitmap()
    mapTracker.readExternal(in)
    size = in.readLong()
  }
}

private[spark] object MergeStatus {
  // Dummy number of reduces for the tests where push based shuffle is not enabled
  val SHUFFLE_PUSH_DUMMY_NUM_REDUCES = 1

  /**
   * Separate a MergeStatuses received from an ExternalShuffleService into individual
   * MergeStatus. The scheduler is responsible for providing the location information
   * for the given ExternalShuffleService.
   */
  def convertMergeStatusesToMergeStatusArr(
      mergeStatuses: MergeStatuses,
      loc: BlockManagerId): Seq[(Int, MergeStatus)] = {
    assert(mergeStatuses.bitmaps.length == mergeStatuses.reduceIds.length &&
      mergeStatuses.bitmaps.length == mergeStatuses.sizes.length)
    val mergerLoc = BlockManagerId(BlockManagerId.SHUFFLE_MERGER_IDENTIFIER, loc.host, loc.port)
    mergeStatuses.bitmaps.zipWithIndex.map {
      case (bitmap, index) =>
        val mergeStatus = new MergeStatus(mergerLoc, bitmap, mergeStatuses.sizes(index))
        (mergeStatuses.reduceIds(index), mergeStatus)
    }
  }

  def apply(loc: BlockManagerId, bitmap: RoaringBitmap, size: Long): MergeStatus = {
    new MergeStatus(loc, bitmap, size)
  }
}
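For a feel of the bitmap bookkeeping above, here is a minimal usage sketch (it assumes only the `org.roaringbitmap` dependency): mark which mapper outputs made it into the merge, then compute what a reducer must still fetch the original way, which is the same computation `getMissingMaps` performs.

```scala
import org.roaringbitmap.RoaringBitmap

object MergeStatusBitmapSketch extends App {
  val numMaps = 5
  val mapTracker = new RoaringBitmap()
  mapTracker.add(0L, 3L) // mappers 0, 1, 2 were merged; 3 and 4 were not

  // Same logic as MergeStatus.getMissingMaps: list the unmerged mapper IDs.
  val missing = (0 until numMaps).filter(i => !mapTracker.contains(i))
  println(s"Fetch original blocks from mappers: ${missing.mkString(", ")}") // 3, 4
}
```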