resolving a few of the initial comments while still preserving correctness of e2e tests
ifilonenko committed Jan 16, 2019
1 parent c91574d commit c2231a0
Showing 23 changed files with 326 additions and 194 deletions.
ExternalShuffleBlockResolver.java
@@ -70,9 +70,6 @@ public class ExternalShuffleBlockResolver {
private static final String APP_KEY_PREFIX = "AppExecShuffleInfo";
private static final StoreVersion CURRENT_VERSION = new StoreVersion(1, 0);

// TODO: Dont necessarily write to local
private final File shuffleDir;

private static final Pattern MULTIPLE_SEPARATORS = Pattern.compile(File.separator + "{2,}");

// Map containing all registered executors' metadata.
@@ -96,8 +93,8 @@ public class ExternalShuffleBlockResolver {
final DB db;

private final List<String> knownManagers = Arrays.asList(
"org.apache.spark.shuffle.sort.SortShuffleManager",
"org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");
"org.apache.spark.shuffle.sort.SortShuffleManager",
"org.apache.spark.shuffle.unsafe.UnsafeShuffleManager");

public ExternalShuffleBlockResolver(TransportConf conf, File registeredExecutorFile)
throws IOException {
@@ -136,17 +133,13 @@ public int weigh(File file, ShuffleIndexInformation indexInfo) {
executors = Maps.newConcurrentMap();
}

// TODO: Remove local writes
this.shuffleDir = Files.createTempDirectory("spark-shuffle-dir").toFile();

this.directoryCleaner = directoryCleaner;
}

public int getRegisteredExecutorsSize() {
return executors.size();
}


/** Registers a new Executor with all the configuration we need to find its shuffle files. */
public void registerExecutor(
String appId,
@@ -313,8 +306,8 @@ private ManagedBuffer getSortBasedShuffleBlockData(
* Hashes a filename into the corresponding local directory, in a manner consistent with
* Spark's DiskBlockManager.getFile().
*/

public static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
@VisibleForTesting
static File getFile(String[] localDirs, int subDirsPerLocalDir, String filename) {
int hash = JavaUtils.nonNegativeHash(filename);
String localDir = localDirs[hash % localDirs.length];
int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
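
For reference, a self-contained sketch of the directory-hashing scheme getFile implements (consistent with DiskBlockManager.getFile). The hash stand-in and the "%02x" sub-directory name are assumptions, since the real code uses Spark's internal JavaUtils.nonNegativeHash and the hunk is truncated before the File is built.

import java.io.File;

final class ShuffleFileLayoutSketch {
  // Mirrors getFile(localDirs, subDirsPerLocalDir, filename) above.
  static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
    int hash = filename.hashCode() & Integer.MAX_VALUE;            // stand-in for JavaUtils.nonNegativeHash
    String localDir = localDirs[hash % localDirs.length];          // pick one of the local dirs
    int subDirId = (hash / localDirs.length) % subDirsPerLocalDir; // then one of its sub-dirs
    return new File(new File(localDir, String.format("%02x", subDirId)), filename);
  }

  public static void main(String[] args) {
    String[] dirs = {"/tmp/spark-local-1", "/tmp/spark-local-2"};
    System.out.println(locate(dirs, 64, "shuffle_0_0_0.index"));
  }
}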
BlockTransferMessage.java
@@ -41,7 +41,7 @@ public abstract class BlockTransferMessage implements Encodable {
public enum Type {
OPEN_BLOCKS(0), UPLOAD_BLOCK(1), REGISTER_EXECUTOR(2), STREAM_HANDLE(3), REGISTER_DRIVER(4),
HEARTBEAT(5), UPLOAD_BLOCK_STREAM(6), UPLOAD_SHUFFLE_PARTITION_STREAM(7),
UPLOAD_SHUFFLE_INDEX_STREAM(8), OPEN_SHUFFLE_PARTITION(9);
REGISTER_SHUFFLE_INDEX(8), OPEN_SHUFFLE_PARTITION(9), UPLOAD_SHUFFLE_INDEX(10);

private final byte id;

@@ -68,8 +68,9 @@ public static BlockTransferMessage fromByteBuffer(ByteBuffer msg) {
case 5: return ShuffleServiceHeartbeat.decode(buf);
case 6: return UploadBlockStream.decode(buf);
case 7: return UploadShufflePartitionStream.decode(buf);
case 8: return UploadShuffleIndexStream.decode(buf);
case 8: return RegisterShuffleIndex.decode(buf);
case 9: return OpenShufflePartition.decode(buf);
case 10: return UploadShuffleIndex.decode(buf);
default: throw new IllegalArgumentException("Unknown message type: " + type);
}
}
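
A quick round-trip showing how the renumbered type ids dispatch. This assumes BlockTransferMessage.toByteBuffer(), which prepends the one-byte type id before the encoded body; the example values are arbitrary.

import java.nio.ByteBuffer;
import org.apache.spark.network.shuffle.protocol.BlockTransferMessage;
import org.apache.spark.network.shuffle.protocol.RegisterShuffleIndex;

final class MessageDispatchSketch {
  public static void main(String[] args) {
    BlockTransferMessage msg = new RegisterShuffleIndex("app-1", 0, 3);
    ByteBuffer wire = msg.toByteBuffer();                       // type id 8 + encoded body
    BlockTransferMessage decoded = BlockTransferMessage.fromByteBuffer(wire);
    assert decoded instanceof RegisterShuffleIndex;             // case 8 in the switch above
  }
}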
RegisterShuffleIndex.java (new file)
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle.protocol;

import com.google.common.base.Objects;
import io.netty.buffer.ByteBuf;
import org.apache.spark.network.protocol.Encoders;

// Needed by ScalaDoc. See SPARK-7726
import static org.apache.spark.network.shuffle.protocol.BlockTransferMessage.Type;

/**
* Register shuffle index to the External Shuffle Service.
*/
public class RegisterShuffleIndex extends BlockTransferMessage {
public final String appId;
public final int shuffleId;
public final int mapId;

public RegisterShuffleIndex(
String appId,
int shuffleId,
int mapId) {
this.appId = appId;
this.shuffleId = shuffleId;
this.mapId = mapId;
}

@Override
public boolean equals(Object other) {
if (other != null && other instanceof RegisterShuffleIndex) {
RegisterShuffleIndex o = (RegisterShuffleIndex) other;
return Objects.equal(appId, o.appId)
&& shuffleId == o.shuffleId
&& mapId == o.mapId;
}
return false;
}

@Override
protected Type type() {
return Type.REGISTER_SHUFFLE_INDEX;
}

@Override
public int hashCode() {
return Objects.hashCode(appId, shuffleId, mapId);
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("appId", appId)
.add("shuffleId", shuffleId)
.add("mapId", mapId)
.toString();
}

@Override
public int encodedLength() {
return Encoders.Strings.encodedLength(appId) + 4 + 4;
}

@Override
public void encode(ByteBuf buf) {
Encoders.Strings.encode(buf, appId);
buf.writeInt(shuffleId);
buf.writeInt(mapId);
}

public static RegisterShuffleIndex decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
int mapId = buf.readInt();
return new RegisterShuffleIndex(appId, shuffleId, mapId);
}
}
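
A minimal encode/decode round-trip over an unpooled Netty buffer, exercising only the members shown above; the values are arbitrary.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.spark.network.shuffle.protocol.RegisterShuffleIndex;

final class RegisterShuffleIndexRoundTrip {
  public static void main(String[] args) {
    RegisterShuffleIndex original = new RegisterShuffleIndex("app-20190116", 1, 7);
    ByteBuf buf = Unpooled.buffer(original.encodedLength());
    original.encode(buf);
    RegisterShuffleIndex decoded = RegisterShuffleIndex.decode(buf);
    assert decoded.equals(original) && decoded.hashCode() == original.hashCode();
    buf.release();
  }
}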
UploadShuffleIndex.java (renamed from UploadShuffleIndexStream.java)
@@ -27,12 +27,12 @@
/**
* Upload shuffle index request to the External Shuffle Service.
*/
public class UploadShuffleIndexStream extends BlockTransferMessage {
public class UploadShuffleIndex extends BlockTransferMessage {
public final String appId;
public final int shuffleId;
public final int mapId;

public UploadShuffleIndexStream(
public UploadShuffleIndex(
String appId,
int shuffleId,
int mapId) {
@@ -54,7 +54,7 @@ public boolean equals(Object other) {

@Override
protected Type type() {
return Type.UPLOAD_SHUFFLE_INDEX_STREAM;
return Type.UPLOAD_SHUFFLE_INDEX;
}

@Override
@@ -83,10 +83,10 @@ public void encode(ByteBuf buf) {
buf.writeInt(mapId);
}

public static UploadShuffleIndexStream decode(ByteBuf buf) {
public static UploadShuffleIndex decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
int mapId = buf.readInt();
return new UploadShuffleIndexStream(appId, shuffleId, mapId);
return new UploadShuffleIndex(appId, shuffleId, mapId);
}
}
UploadShufflePartitionStream.java
@@ -32,16 +32,19 @@ public class UploadShufflePartitionStream extends BlockTransferMessage {
public final int shuffleId;
public final int mapId;
public final int partitionId;
public final int partitionLength;

public UploadShufflePartitionStream(
String appId,
int shuffleId,
int mapId,
int partitionId) {
int partitionId,
int partitionLength) {
this.appId = appId;
this.shuffleId = shuffleId;
this.mapId = mapId;
this.partitionId = partitionId;
this.partitionLength = partitionLength;
}

@Override
@@ -51,7 +54,8 @@ public boolean equals(Object other) {
return Objects.equal(appId, o.appId)
&& shuffleId == o.shuffleId
&& mapId == o.mapId
&& partitionId == o.partitionId;
&& partitionId == o.partitionId
&& partitionLength == o.partitionLength;
}
return false;
}
@@ -63,7 +67,7 @@ protected Type type() {

@Override
public int hashCode() {
return Objects.hashCode(appId, shuffleId, mapId, partitionId);
return Objects.hashCode(appId, shuffleId, mapId, partitionId, partitionLength);
}

@Override
@@ -72,12 +76,14 @@ public String toString() {
.add("appId", appId)
.add("shuffleId", shuffleId)
.add("mapId", mapId)
.add("partitionId", partitionId)
.add("partitionLength", partitionLength)
.toString();
}

@Override
public int encodedLength() {
return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4;
return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4 + 4;
}

@Override
@@ -86,13 +92,16 @@ public void encode(ByteBuf buf) {
buf.writeInt(shuffleId);
buf.writeInt(mapId);
buf.writeInt(partitionId);
buf.writeInt(partitionLength);
}

public static UploadShufflePartitionStream decode(ByteBuf buf) {
String appId = Encoders.Strings.decode(buf);
int shuffleId = buf.readInt();
int mapId = buf.readInt();
int partitionId = buf.readInt();
return new UploadShufflePartitionStream(appId, shuffleId, mapId, partitionId);
int partitionLength = buf.readInt();
return new UploadShufflePartitionStream(
appId, shuffleId, mapId, partitionId, partitionLength);
}
}
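
A worked size check for the widened message, assuming Encoders.Strings writes a 4-byte length prefix followed by the UTF-8 bytes (as elsewhere in this protocol); each of the four int fields adds another 4 bytes.

import org.apache.spark.network.shuffle.protocol.UploadShufflePartitionStream;

final class EncodedLengthCheck {
  public static void main(String[] args) {
    UploadShufflePartitionStream msg =
        new UploadShufflePartitionStream("app-1", 2, 5, 0, 1024);
    // "app-1" is 5 UTF-8 bytes: (4 + 5) + 4 + 4 + 4 + 4 = 25
    System.out.println(msg.encodedLength());   // expected: 25
  }
}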
ShuffleMapOutputWriter.java
@@ -21,7 +21,7 @@ public interface ShuffleMapOutputWriter {

ShufflePartitionWriter newPartitionWriter(int partitionId);

void commitAllPartitions(long[] partitionLengths);
void commitAllPartitions();

void abort(Exception exception);
}
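
A hypothetical caller illustrating the revised contract: per-partition lengths are no longer passed to commitAllPartitions(), so the writer implementation is expected to track them itself. The package of these interfaces and the shape of the write loop are assumptions here.

import org.apache.spark.shuffle.api.ShuffleMapOutputWriter;
import org.apache.spark.shuffle.api.ShufflePartitionWriter;

final class MapOutputCommitSketch {
  static void writeAndCommit(ShuffleMapOutputWriter writer, int numPartitions) throws Exception {
    try {
      for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
        ShufflePartitionWriter partitionWriter = writer.newPartitionWriter(partitionId);
        // ... stream this partition's serialized records through partitionWriter ...
      }
      writer.commitAllPartitions();   // no partitionLengths argument any more
    } catch (Exception e) {
      writer.abort(e);
      throw e;
    }
  }
}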
ExternalShuffleDataIO.java
@@ -2,7 +2,9 @@

import org.apache.spark.SparkConf;
import org.apache.spark.SparkEnv;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.netty.SparkTransportConf;
import org.apache.spark.network.server.NoOpRpcHandler;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.shuffle.api.ShuffleDataIO;
import org.apache.spark.shuffle.api.ShuffleReadSupport;
@@ -12,44 +14,41 @@

public class ExternalShuffleDataIO implements ShuffleDataIO {

private static final String SHUFFLE_SERVICE_PORT_CONFIG = "spark.shuffle.service.port";
private static final String DEFAULT_SHUFFLE_PORT = "7337";

private static final SparkEnv sparkEnv = SparkEnv.get();
private static final BlockManager blockManager = sparkEnv.blockManager();

private final SparkConf sparkConf;
private final TransportConf conf;
private final SecurityManager securityManager;
private final String hostname;
private final int port;
private final TransportContext context;
private static BlockManager blockManager;
private static SecurityManager securityManager;
private static String hostname;
private static int port;

public ExternalShuffleDataIO(
SparkConf sparkConf) {
this.sparkConf = sparkConf;
this.conf = SparkTransportConf.fromSparkConf(sparkConf, "shuffle", 1);

this.securityManager = sparkEnv.securityManager();
this.hostname = blockManager.getRandomShuffleHost();
this.port = blockManager.getRandomShufflePort();
// Close idle connections
this.context = new TransportContext(conf, new NoOpRpcHandler(), true, true);
}

@Override
public void initialize() {
// TODO: move registerDriver and registerExecutor here
SparkEnv env = SparkEnv.get();
blockManager = env.blockManager();
securityManager = env.securityManager();
hostname = blockManager.getRandomShuffleHost();
port = blockManager.getRandomShufflePort();
// TODO: Register Driver and Executor
}

@Override
public ShuffleReadSupport readSupport() {
return new ExternalShuffleReadSupport(
conf, securityManager.isAuthenticationEnabled(),
securityManager, hostname, port);
conf, context, securityManager.isAuthenticationEnabled(),
securityManager, hostname, port);
}

@Override
public ShuffleWriteSupport writeSupport() {
return new ExternalShuffleWriteSupport(
conf, securityManager.isAuthenticationEnabled(),
securityManager, hostname, port);
conf, context, securityManager.isAuthenticationEnabled(),
securityManager, hostname, port);
}
}
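
A sketch of the intended call order after this change (an assumption, not shown in the diff): because the SparkEnv and BlockManager lookups moved out of the constructor and the static initializers into initialize(), the plugin can be constructed before SparkEnv exists and is wired up only afterwards. The import for ExternalShuffleDataIO itself is omitted since its package is not shown above.

import org.apache.spark.SparkConf;
import org.apache.spark.shuffle.api.ShuffleDataIO;
import org.apache.spark.shuffle.api.ShuffleReadSupport;
import org.apache.spark.shuffle.api.ShuffleWriteSupport;

final class ExternalShuffleDataIOUsageSketch {
  static void bootstrap(SparkConf sparkConf) {
    ShuffleDataIO dataIO = new ExternalShuffleDataIO(sparkConf); // no SparkEnv needed yet
    dataIO.initialize();                  // resolves BlockManager, shuffle host and port
    ShuffleWriteSupport writes = dataIO.writeSupport();
    ShuffleReadSupport reads = dataIO.readSupport();
    // writes/reads are then handed to the shuffle writer/reader paths (assumption).
  }
}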
