[SYSTEMDS-2600] Rework federated runtime backend (framework, ops)
This patch is a major rework of the existing federated runtime backend
and operations, intended to simplify the joint development of all
remaining federated operations.

The new design has only four command types: read, put, get, and
exec_inst, which allow reading federated matrices, putting and getting
variables, and executing arbitrary instructions over these variables.
With this approach, we can reuse the existing symbol table and CP/Spark
instructions and only need to handle their orchestration and global
compensations. Furthermore, the new design adds several primitives such
as broadcast, broadcastSliced, aggregations, and rbind/cbind, as well as
more convenient data structures.
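
As an illustration only (not part of this commit), the four command types
map naturally onto a request-type enum on the federated worker; the name
RequestType matches the import added in FederatedData below, whereas the
individual value names other than READ_VAR are assumptions:

public enum RequestType {
    READ_VAR,  // read a federated matrix/frame partition into a worker-local variable
    PUT_VAR,   // put (broadcast) a variable from the coordinator to the worker
    GET_VAR,   // retrieve a variable (e.g., a partial aggregate) from the worker
    EXEC_INST  // execute an arbitrary CP instruction over worker-local variables
}

Because the worker keeps these variables in a regular symbol table, an
EXEC_INST request can reference the results of earlier READ_VAR or PUT_VAR
requests, and a primitive such as broadcastSliced presumably only sends each
worker the slice aligned with its federated range.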

Finally, this patch also includes minor reworks of the execution
context and the reblock rewrite to account for specific characteristics
of federated execution.
mboehm7 committed Aug 8, 2020
1 parent 7af2ae0 commit a4f992e
Showing 34 changed files with 692 additions and 952 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/apache/sysds/common/Types.java
@@ -37,7 +37,7 @@ public enum ExecMode {
/**
* Execution type of individual operations.
*/
public enum ExecType { CP, CP_FILE, SPARK, GPU, INVALID }
public enum ExecType { CP, CP_FILE, SPARK, GPU, FED, INVALID }

/**
* Data types (tensor, matrix, scalar, frame, object, unknown).
@@ -134,8 +134,9 @@ else if (dop.getOp().isTransient()) {
}
}
else if (dop.getOp() == OpOpData.FEDERATED) {
// TODO maybe do something here?
} else {
dop.setBlocksize(blocksize);
}
else {
throw new HopsException(hop.printErrorLocation() + "unexpected non-scalar Data HOP in reblock.\n");
}
}
@@ -32,8 +32,7 @@
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.caching.LazyWriteBuffer.RPolicy;
import org.apache.sysds.runtime.controlprogram.federated.FederatedData;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRange;
import org.apache.sysds.runtime.controlprogram.federated.FederationMap;
import org.apache.sysds.runtime.controlprogram.parfor.stat.InfrastructureAnalyzer;
import org.apache.sysds.runtime.controlprogram.parfor.util.IDSequence;
import org.apache.sysds.runtime.instructions.cp.Data;
@@ -170,8 +169,7 @@ public enum CacheStatus {
*/
protected PrivacyConstraint _privacyConstraint = null;

protected Map<FederatedRange, FederatedData> _fedMapping = null;

protected FederationMap _fedMapping = null;

/** The name of HDFS file in which the data is backed up. */
protected String _hdfsFileName = null; // file name and path
@@ -357,15 +355,15 @@ public boolean isFederated() {
* Gets the mapping of indices ranges to federated objects.
* @return fedMapping mapping
*/
public Map<FederatedRange, FederatedData> getFedMapping() {
public FederationMap getFedMapping() {
return _fedMapping;
}

/**
* Sets the mapping of indices ranges to federated objects.
* @param fedMapping mapping
*/
public void setFedMapping(Map<FederatedRange, FederatedData> fedMapping) {
public void setFedMapping(FederationMap fedMapping) {
_fedMapping = fedMapping;
}

@@ -47,7 +47,6 @@
import java.util.List;
import java.util.concurrent.Future;

import static org.apache.sysds.runtime.util.UtilFunctions.requestFederatedData;

public class FrameObject extends CacheableData<FrameBlock>
{
@@ -169,7 +168,7 @@ public FrameBlock acquireRead() {
FrameBlock result = new FrameBlock(_schema);
// provide long support?
result.ensureAllocatedColumns((int) _metaData.getDataCharacteristics().getRows());
List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = requestFederatedData(_fedMapping);
List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = _fedMapping.requestFederatedData();
try {
for(Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
FederatedRange range = readResponse.getLeft();
@@ -19,8 +19,6 @@

package org.apache.sysds.runtime.controlprogram.caching;

import static org.apache.sysds.runtime.util.UtilFunctions.requestFederatedData;

import java.io.IOException;
import java.lang.ref.SoftReference;
import java.util.List;
@@ -405,7 +403,7 @@ public MatrixBlock acquireRead() {
long[] dims = getDataCharacteristics().getDims();
// TODO sparse optimization
MatrixBlock result = new MatrixBlock((int) dims[0], (int) dims[1], false);
List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = requestFederatedData(_fedMapping);
List<Pair<FederatedRange, Future<FederatedResponse>>> readResponses = _fedMapping.requestFederatedData();
try {
for (Pair<FederatedRange, Future<FederatedResponse>> readResponse : readResponses) {
FederatedRange range = readResponse.getLeft();
@@ -22,11 +22,15 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sysds.api.DMLScript;
import org.apache.sysds.common.Types;
import org.apache.sysds.common.Types.FileFormat;
import org.apache.sysds.common.Types.ValueType;
import org.apache.sysds.conf.ConfigurationManager;
import org.apache.sysds.hops.OptimizerUtils;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.LocalVariableMap;
import org.apache.sysds.runtime.controlprogram.Program;
import org.apache.sysds.runtime.controlprogram.caching.CacheBlock;
import org.apache.sysds.runtime.controlprogram.caching.CacheableData;
import org.apache.sysds.runtime.controlprogram.caching.FrameObject;
import org.apache.sysds.runtime.controlprogram.caching.MatrixObject;
@@ -66,7 +70,8 @@ public class ExecutionContext {

//symbol table
protected LocalVariableMap _variables;

protected boolean _autoCreateVars;

//lineage map, cache, prepared dedup blocks
protected Lineage _lineage;

@@ -83,12 +88,14 @@ protected ExecutionContext() {
protected ExecutionContext( boolean allocateVariableMap, boolean allocateLineage, Program prog ) {
//protected constructor to force use of ExecutionContextFactory
_variables = allocateVariableMap ? new LocalVariableMap() : null;
_autoCreateVars = false;
_lineage = allocateLineage ? new Lineage() : null;
_prog = prog;
}

public ExecutionContext(LocalVariableMap vars) {
_variables = vars;
_autoCreateVars = false;
_lineage = null;
_prog = null;
}
@@ -116,6 +123,14 @@ public Lineage getLineage() {
public void setLineage(Lineage lineage) {
_lineage = lineage;
}

public boolean isAutoCreateVars() {
return _autoCreateVars;
}

public void setAutoCreateVars(boolean flag) {
_autoCreateVars = flag;
}
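
The new _autoCreateVars flag lets setMatrixOutput/setFrameOutput (below) bind
results to variables that were never declared in the symbol table, which a
federated worker needs when executing instructions whose outputs are not
pre-registered. A minimal, hedged usage sketch (the factory call and variable
name are illustrative, not taken from this diff):

// sketch only: enable on-demand creation of output variables
ExecutionContext ec = ExecutionContextFactory.createContext();
ec.setAutoCreateVars(true);
MatrixBlock mb = new MatrixBlock(10, 10, false);   // small dense block
ec.setMatrixOutput("_fedOut1", mb);                // creates the MatrixObject on the fly
MatrixObject mo = ec.getMatrixObject("_fedOut1");  // now resolvable like any other variable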

/**
* Get the i-th GPUContext
@@ -502,13 +517,17 @@ public void releaseMatrixOutputForGPUInstruction(String varName) {
}

public void setMatrixOutput(String varName, MatrixBlock outputData) {
if( isAutoCreateVars() && !containsVariable(varName) )
setVariable(varName, createMatrixObject(outputData));
MatrixObject mo = getMatrixObject(varName);
mo.acquireModify(outputData);
mo.release();
setVariable(varName, mo);
}

public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType flag) {
if( isAutoCreateVars() && !containsVariable(varName) )
setVariable(varName, createMatrixObject(outputData));
if( flag.isInPlace() ) {
//modify metadata to carry update status
MatrixObject mo = getMatrixObject(varName);
@@ -517,10 +536,6 @@ public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType f
setMatrixOutput(varName, outputData);
}

public void setMatrixOutput(String varName, MatrixBlock outputData, UpdateType flag, String opcode) {
setMatrixOutput(varName, outputData, flag);
}

public void setTensorOutput(String varName, TensorBlock outputData) {
TensorObject to = getTensorObject(varName);
to.acquireModify(outputData);
@@ -529,11 +544,42 @@ public void setTensorOutput(String varName, TensorBlock outputData) {
}

public void setFrameOutput(String varName, FrameBlock outputData) {
if( isAutoCreateVars() && !containsVariable(varName) )
setVariable(varName, createFrameObject(outputData));
FrameObject fo = getFrameObject(varName);
fo.acquireModify(outputData);
fo.release();
setVariable(varName, fo);
}

public static CacheableData<?> createCacheableData(CacheBlock cb) {
if( cb instanceof MatrixBlock )
return createMatrixObject((MatrixBlock) cb);
else if( cb instanceof FrameBlock )
return createFrameObject((FrameBlock) cb);
return null;
}

private static CacheableData<?> createMatrixObject(MatrixBlock mb) {
MatrixObject ret = new MatrixObject(Types.ValueType.FP64,
OptimizerUtils.getUniqueTempFileName());
ret.acquireModify(mb);
ret.setMetaData(new MetaDataFormat(new MatrixCharacteristics(
mb.getNumRows(), mb.getNumColumns()), FileFormat.BINARY));
ret.getMetaData().getDataCharacteristics()
.setBlocksize(ConfigurationManager.getBlocksize());
ret.release();
return ret;
}

private static CacheableData<?> createFrameObject(FrameBlock fb) {
FrameObject ret = new FrameObject(OptimizerUtils.getUniqueTempFileName());
ret.acquireModify(fb);
ret.setMetaData(new MetaDataFormat(new MatrixCharacteristics(
fb.getNumRows(), fb.getNumColumns()), FileFormat.BINARY));
ret.release();
return ret;
}
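
The createCacheableData helpers wrap an in-memory block into a cacheable
object with binary-block metadata, e.g., to materialize data received from a
federated worker into the symbol table. A hedged usage sketch (the execution
context ec and the variable name are illustrative):

// sketch only: wrap a raw block so it can be bound as a Data object
MatrixBlock mb = new MatrixBlock(1000, 10, false);
CacheableData<?> cd = ExecutionContext.createCacheableData(mb);
ec.setVariable("_fedTmp1", cd);   // now usable as a regular instruction input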

public List<MatrixBlock> getMatrixInputs(CPOperand[] inputs) {
return getMatrixInputs(inputs, false);
@@ -381,14 +381,14 @@ public JavaPairRDD<?,?> getRDDHandleForMatrixObject( MatrixObject mo, FileFormat
rdd = mo.getRDDHandle().getRDD();
}
//CASE 2: dirty in memory data or cached result of rdd operations
else if( mo.isDirty() || mo.isCached(false) )
else if( mo.isDirty() || mo.isCached(false) || mo.isFederated() )
{
//get in-memory matrix block and parallelize it
//w/ guarded parallelize (fallback to export, rdd from file if too large)
DataCharacteristics dc = mo.getDataCharacteristics();
boolean fromFile = false;
if( !OptimizerUtils.checkSparkCollectMemoryBudget(dc, 0) || !_parRDDs.reserve(
OptimizerUtils.estimatePartitionedSizeExactSparsity(dc))) {
if( !mo.isFederated() && (!OptimizerUtils.checkSparkCollectMemoryBudget(dc, 0)
|| !_parRDDs.reserve(OptimizerUtils.estimatePartitionedSizeExactSparsity(dc)))) {
if( mo.isDirty() || !mo.isHDFSFileExists() ) //write if necessary
mo.exportData();
rdd = sc.hadoopFile( mo.getFileName(), inputInfo.inputFormatClass, inputInfo.keyClass, inputInfo.valueClass);
@@ -35,6 +35,7 @@
import org.apache.sysds.common.Types;
import org.apache.sysds.conf.DMLConfig;
import org.apache.sysds.runtime.DMLRuntimeException;
import org.apache.sysds.runtime.controlprogram.federated.FederatedRequest.RequestType;

import java.net.InetSocketAddress;
import java.util.concurrent.Future;
@@ -83,51 +84,15 @@ public boolean isInitialized() {
return _varID != -1;
}

public synchronized Future<FederatedResponse> initFederatedData() {
public synchronized Future<FederatedResponse> initFederatedData(long id) {
if(isInitialized())
throw new DMLRuntimeException("Tried to init already initialized data");
FederatedRequest.FedMethod fedMethod;
switch(_dataType) {
case MATRIX:
fedMethod = FederatedRequest.FedMethod.READ_MATRIX;
break;
case FRAME:
fedMethod = FederatedRequest.FedMethod.READ_FRAME;
break;
default:
throw new DMLRuntimeException("Federated datatype \"" + _dataType.toString() + "\" is not supported.");
}
FederatedRequest request = new FederatedRequest(fedMethod);
if(!_dataType.isMatrix() && !_dataType.isFrame())
throw new DMLRuntimeException("Federated datatype \"" + _dataType.toString() + "\" is not supported.");
_varID = id;
FederatedRequest request = new FederatedRequest(RequestType.READ_VAR, id);
request.appendParam(_filepath);
return executeFederatedOperation(request);
}

/**
* Executes an federated operation on a federated worker and default variable.
*
* @param request the requested operation
* @param withVarID true if we should add the default varID (initialized) or false if we should not
* @return the response
*/
public Future<FederatedResponse> executeFederatedOperation(FederatedRequest request, boolean withVarID) {
if (withVarID) {
if( !isInitialized() )
throw new DMLRuntimeException("Tried to execute federated operation on data non initialized federated data.");
return executeFederatedOperation(request, _varID);
}
return executeFederatedOperation(request);
}

/**
* Executes an federated operation on a federated worker.
*
* @param request the requested operation
* @param varID variable ID
* @return the response
*/
public Future<FederatedResponse> executeFederatedOperation(FederatedRequest request, long varID) {
request = request.deepClone();
request.appendParam(varID);
request.appendParam(_dataType.name());
return executeFederatedOperation(request);
}

@@ -137,7 +102,7 @@ public Future<FederatedResponse> executeFederatedOperation(FederatedRequest requ
* @param request the requested operation
* @return the response
*/
public synchronized Future<FederatedResponse> executeFederatedOperation(FederatedRequest request) {
public synchronized Future<FederatedResponse> executeFederatedOperation(FederatedRequest... request) {
// Careful with the number of threads. Each thread opens connections to multiple files making resulting in
// java.io.IOException: Too many open files
EventLoopGroup workerGroup = new NioEventLoopGroup(_nrThreads);
Expand All @@ -148,9 +113,9 @@ public synchronized Future<FederatedResponse> executeFederatedOperation(Federate
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast("ObjectDecoder",
new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())))
.addLast("FederatedOperationHandler", handler)
.addLast("ObjectEncoder", new ObjectEncoder());
new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.weakCachingResolver(ClassLoader.getSystemClassLoader())))
.addLast("FederatedOperationHandler", handler)
.addLast("ObjectEncoder", new ObjectEncoder());
}
});
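
With this rework, the coordinator assigns the worker-side variable ID when
initializing federated data, and executeFederatedOperation accepts varargs so
that several requests can be passed together. A hedged sketch of the new
calling pattern (fedData stands for an existing FederatedData handle of one
worker; the ID value and response handling are illustrative):

// sketch only: read the worker's partition into federated variable 'id'
long id = 1;  // unique federated variable ID (e.g., drawn from an IDSequence)
Future<FederatedResponse> fresp = fedData.initFederatedData(id);
FederatedResponse resp = fresp.get();  // blocks until the worker finished reading
// follow-up requests, e.g., EXEC_INST over variable 'id', can then be issued via
// fedData.executeFederatedOperation(request1, request2, ...)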

@@ -92,4 +92,14 @@ public int compareTo(FederatedRange o) {
public String toString() {
return Arrays.toString(_beginDims) + " - " + Arrays.toString(_endDims);
}

public FederatedRange shift(long rshift, long cshift) {
//row shift
_beginDims[0] += rshift;
_endDims[0] += rshift;
//column shift
_beginDims[1] += cshift;
_endDims[1] += cshift;
return this;
}
}
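
The new shift primitive mutates the range in place and returns this, which is
convenient when composing federation maps for rbind/cbind: the ranges of the
appended object are offset by the dimensions of the left/top input. A hedged
sketch (the two-argument constructor is an assumption):

// sketch only: for cbind, shift the right-hand ranges by the left input's columns
FederatedRange r = new FederatedRange(new long[]{0, 0}, new long[]{100, 10});
r.shift(0, 20);   // range becomes [0, 20] - [100, 30]

Because shift modifies the range in place, callers that share FederatedRange
instances across mappings would need to copy them before shifting.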