[SYSTEMDS-3594] Multi-level reuse of RDDs
This patch extends the multi-level reuse framework to support functions
and statement blocks that return RDDs. As with instruction-level reuse,
we persist the function outputs at the executors on the second call. If
the original instruction is shuffle-based, we also reuse the function
output locally, even before it is persisted.

Closes #1858
phaniarnab committed Jul 6, 2023
1 parent f0f8f0c commit f8acaab
Showing 7 changed files with 191 additions and 56 deletions.
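
In outline, the reuse path for RDD-backed outputs added by this patch behaves as in the sketch below. This is an illustrative sketch, not the committed code: LineageEntry, ExecContext, persistRDD, and isShuffleBased stand in for the actual types and helpers (LineageCacheEntry, ExecutionContext, etc.) in the diffs that follow.

boolean reuseRDD(LineageEntry e, ExecContext ec, String outName) {
	switch (e.getStatus()) {
		case TOPERSISTRDD: {
			//second hit: mark for lazy MEMORY_AND_DISK persistence at the executors
			boolean alreadyPersisted = persistRDD(e);
			//before a persisted copy exists, immediate reuse pays off only for
			//shuffle-based results, which are expensive to recompute
			if (!alreadyPersisted && !isShuffleBased(e))
				return false; //recompute this time; reuse from the next call
		} //else fall through and reuse locally
		case PERSISTEDRDD:
			//reuse the persisted intermediate at the executors
			ec.setRDDHandleForVariable(outName, e.getRDDObject());
			return true;
		default:
			return false; //a concurrent thread evicted this entry
	}
}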
src/main/java/org/apache/sysds/runtime/instructions/spark/data/RDDObject.java
@@ -20,6 +20,7 @@
package org.apache.sysds.runtime.instructions.spark.data;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.sysds.runtime.meta.DataCharacteristics;

public class RDDObject extends LineageObject
{
@@ -31,6 +32,7 @@ public class RDDObject extends LineageObject
private String _hdfsFname = null; //hdfs filename, if created from hdfs.
private boolean _parRDD = false; //is a parallelized rdd at driver
private boolean _pending = true; //is a pending rdd operation
private DataCharacteristics _dc = null;

public RDDObject( JavaPairRDD<?,?> rddvar) {
super();
@@ -84,6 +86,14 @@ public void setPending(boolean flag) {
public boolean isPending() {
return _pending;
}

public void setDataCharacteristics(DataCharacteristics dc) {
_dc = dc;
}

public DataCharacteristics getDataCharacteristics() {
return _dc;
}


/**
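Caching the DataCharacteristics on the RDDObject lets a later reuse re-wrap the persisted RDD in a fresh MatrixObject without triggering a Spark job, which is what the function-level reuse path in LineageCache below does. A minimal sketch of that pattern, with illustrative variable names:

//rebuild a matrix handle from a lineage-cached RDD; the metadata comes
//from the cache, so no Spark action is needed
RDDObject rdd = entry.getRDDObject();
MetaDataFormat md = new MetaDataFormat(rdd.getDataCharacteristics(), FileFormat.BINARY);
MatrixObject out = new MatrixObject(ValueType.FP64, boundVarName, md);
out.setRDDHandle(rdd);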
154 changes: 107 additions & 47 deletions src/main/java/org/apache/sysds/runtime/lineage/LineageCache.java
@@ -19,6 +19,7 @@

package org.apache.sysds.runtime.lineage;

import jcuda.Pointer;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.commons.lang3.tuple.Pair;
@@ -56,6 +57,7 @@
import org.apache.sysds.runtime.instructions.gpu.GPUInstruction;
import org.apache.sysds.runtime.instructions.gpu.context.GPUObject;
import org.apache.sysds.runtime.instructions.spark.ComputationSPInstruction;
import org.apache.sysds.runtime.instructions.spark.data.BroadcastObject;
import org.apache.sysds.runtime.instructions.spark.data.RDDObject;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.LineageCacheStatus;
import org.apache.sysds.runtime.lineage.LineageCacheConfig.ReuseCacheType;
@@ -157,15 +159,11 @@ else if (e.isRDDPersist()) {
switch(e.getCacheStatus()) {
case TOPERSISTRDD:
//Mark for caching on the second hit
persistRDD(inst, e, ec);
//Update status to indicate persisted in the executors
e.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
//Even not persisted, reuse the rdd locally for shuffle operations
if (!LineageCacheConfig.isShuffleOp(inst))
boolean persisted = persistRDD(inst, e, ec);
//Return if not already persisted and not a shuffle operation
if (!persisted && !LineageCacheConfig.isShuffleOp(inst.getOpcode()))
return false;

((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
break;
//Else, fall through to reuse (local or distributed)
case PERSISTEDRDD:
//Reuse the persisted intermediate at the executors
((SparkExecutionContext) ec).setRDDHandleForVariable(outName, rdd);
@@ -175,16 +173,19 @@
}
}
else { //TODO handle locks on gpu objects
Pointer gpuPtr = e.getGPUPointer();
if (gpuPtr == null && e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the executing thread removed this entry from cache
//Create a GPUObject with the cached pointer
GPUObject gpuObj = new GPUObject(ec.getGPUContext(0),
ec.getMatrixObject(outName), e.getGPUPointer());
ec.getMatrixObject(outName), gpuPtr);
ec.getMatrixObject(outName).setGPUObject(ec.getGPUContext(0), gpuObj);
//Set dirty to true, so that it is later copied to the host for write
ec.getMatrixObject(outName).getGPUObject(ec.getGPUContext(0)).setDirty(true);
//Set the cached data characteristics to the output matrix object
ec.getMatrixObject(outName).updateDataCharacteristics(e.getDataCharacteristics());
//Increment the live count for this pointer
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
LineageGPUCacheEviction.incrementLiveCount(gpuPtr);
}
//Replace the live lineage trace with the cached one (if not parfor, dedup)
ec.replaceLineageItem(outName, e._key);
@@ -210,6 +211,7 @@ public static boolean reuse(List<String> outNames, List<DataIdentifier> outParam
long savedComputeTime = 0;
HashMap<String, Data> funcOutputs = new HashMap<>();
HashMap<String, LineageItem> funcLIs = new HashMap<>();
ArrayList<LineageCacheEntry> funcOutLIs = new ArrayList<>();
for (int i=0; i<numOutputs; i++) {
String opcode = name + String.valueOf(i+1);
LineageItem li = new LineageItem(opcode, liInputs);
@@ -220,6 +222,7 @@
synchronized(_cache) {
if (LineageCache.probe(li)) {
e = LineageCache.getIntern(li);
funcOutLIs.add(e);
}
else {
//create a placeholder if no reuse to avoid redundancy
@@ -244,16 +247,25 @@ public static boolean reuse(List<String> outNames, List<DataIdentifier> outParam
((MatrixObject)boundValue).release();
}
else if (e.isGPUObject()) {
Pointer gpuPtr = e.getGPUPointer();
if (gpuPtr == null && e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the executing thread removed this entry from cache
MetaDataFormat md = new MetaDataFormat(e.getDataCharacteristics(), FileFormat.BINARY);
boundValue = new MatrixObject(ValueType.FP64, boundVarName, md);
//Create a GPUObject with the cached pointer
GPUObject gpuObj = new GPUObject(ec.getGPUContext(0),
((MatrixObject)boundValue), e.getGPUPointer());
((MatrixObject)boundValue), gpuPtr);
//Set dirty to true, so that it is later copied to the host for write
gpuObj.setDirty(true);
((MatrixObject) boundValue).setGPUObject(ec.getGPUContext(0), gpuObj);
//Increment the live count for this pointer
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
}
else if (e.isRDDPersist()) {
RDDObject rdd = e.getRDDObject();
if (rdd == null && e.getCacheStatus() == LineageCacheStatus.NOTCACHED)
return false; //the executing thread removed this entry from cache
MetaDataFormat md = new MetaDataFormat(rdd.getDataCharacteristics(),FileFormat.BINARY);
boundValue = new MatrixObject(ValueType.FP64, boundVarName, md);
((MatrixObject) boundValue).setRDDHandle(rdd);
}
else if (e.isScalarValue()) {
boundValue = e.getSOValue();
Expand All @@ -277,6 +289,32 @@ else if (e.isScalarValue()) {
}

if (reuse) {
//Additional maintenance for GPU pointers and RDDs
for (LineageCacheEntry e : funcOutLIs) {
if (e.isGPUObject())
//Increment the live count for this pointer
LineageGPUCacheEviction.incrementLiveCount(e.getGPUPointer());
else if (e.isRDDPersist()) {
//Reuse the cached RDD (local or persisted at the executors)
RDDObject rdd = e.getRDDObject();
switch(e.getCacheStatus()) {
case TOPERSISTRDD:
//Mark for caching on the second hit
long estimatedSize = MatrixBlock.estimateSizeInMemory(rdd.getDataCharacteristics());
boolean persisted = persistRDD(e, estimatedSize);
//Return if not already persisted and not a shuffle operation
if (!persisted && !LineageCacheConfig.isShuffleOp(e._origItem.getOpcode()))
return false;
//Else, fall through to reuse (local or distributed)
case PERSISTEDRDD:
//Reuse the persisted intermediate at the executors
break;
default:
return false;
}
}
}

funcOutputs.forEach((var, val) -> {
//cleanup existing data bound to output variable name
Data exdata = ec.removeVariable(var);
@@ -291,7 +329,7 @@ else if (e.isScalarValue()) {
if (DMLScript.STATISTICS) //increment saved time
LineageCacheStatistics.incrementSavedComputeTime(savedComputeTime);
}

return reuse;
}

@@ -707,6 +745,7 @@ private static void putValueRDD(Instruction inst, LineageItem instLI, ExecutionC
RDDObject rddObj = cd.getRDDHandle();
// Set the RDD object in the cache and set the status to TOPERSISTRDD
rddObj.setLineageCached();
rddObj.setDataCharacteristics(cd.getDataCharacteristics());
centry.setRDDValue(rddObj, computetime);
}
}
@@ -1003,8 +1042,10 @@ private static void mvIntern(LineageItem item, LineageItem probeItem, long compu
// Add to missed compute time
LineageCacheStatistics.incrementMissedComputeTime(e._computeTime);

//maintain order for eviction
LineageCacheEviction.addEntry(e);
// Maintain order for eviction
if (!e.isRDDPersist() && !e.isGPUObject())
LineageCacheEviction.addEntry(e);
// TODO: Handling of func/SB cache entries for Spark and GPU
}
else
removePlaceholder(item); //remove the placeholder
@@ -1040,39 +1081,58 @@ private static boolean isMarkedForCaching (Instruction inst, ExecutionContext ec
return true;
}

private static void persistRDD(Instruction inst, LineageCacheEntry centry, ExecutionContext ec) {
boolean opToPersist = LineageCacheConfig.isReusableRDDType(inst);
// Return if the operation is not in the list of instructions which benefit
// from persisting and the local only RDD caching is disabled
if (!opToPersist && !LineageCacheConfig.ENABLE_LOCAL_ONLY_RDD_CACHING)
return;

if (opToPersist && centry.getCacheStatus() == LineageCacheStatus.TOPERSISTRDD) {
CacheableData<?> cd = ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
// Estimate worst case dense size
long estimatedSize = MatrixBlock.estimateSizeInMemory(cd.getDataCharacteristics());
// Skip if the entry is bigger than the total storage.
if (estimatedSize > LineageSparkCacheEviction.getSparkStorageLimit())
return;

// Mark the rdd for lazy checkpointing
RDDObject rddObj = centry.getRDDObject();
JavaPairRDD<?,?> rdd = rddObj.getRDD();
rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK());
rddObj.setRDD(rdd);
rddObj.setCheckpointRDD(true);

// Make space based on the estimated size
if(!LineageSparkCacheEviction.isBelowThreshold(estimatedSize))
LineageSparkCacheEviction.makeSpace(_cache, estimatedSize);
LineageSparkCacheEviction.updateSize(estimatedSize, true);
// Maintain order for eviction
LineageSparkCacheEviction.addEntry(centry, estimatedSize);
private static boolean persistRDD(Instruction inst, LineageCacheEntry centry, ExecutionContext ec) {
// If already persisted, change the status and return true.
// Else, persist, change cache status and return false.
if (probeRDDDistributed(centry._key)) {
// Update status to indicate persisted in the executors
centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
return true;
}
CacheableData<?> cd = ec.getCacheableData(((ComputationSPInstruction)inst).output.getName());
// Estimate worst case dense size
long estimatedSize = MatrixBlock.estimateSizeInMemory(cd.getDataCharacteristics());
// Skip if the entry is bigger than the total storage.
if (estimatedSize > LineageSparkCacheEviction.getSparkStorageLimit())
return false;
// Mark for distributed caching and change status
persistRDDIntern(centry, estimatedSize);
centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
return false;
}

// Count number of RDDs marked for caching at the executors
if (DMLScript.STATISTICS)
LineageCacheStatistics.incrementRDDPersists();
private static boolean persistRDD(LineageCacheEntry centry, long estimatedSize) {
// If already persisted, change the status and return true.
// Else, persist, change cache status and return false.
if (probeRDDDistributed(centry._key)) {
// Update status to indicate persisted in the executors
centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
return true;
}
// Mark for distributed caching and change status
persistRDDIntern(centry, estimatedSize);
centry.setCacheStatus(LineageCacheStatus.PERSISTEDRDD);
return false;
}

private static void persistRDDIntern(LineageCacheEntry centry, long estimatedSize) {
// Mark the rdd for lazy checkpointing
RDDObject rddObj = centry.getRDDObject();
JavaPairRDD<?,?> rdd = rddObj.getRDD();
rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK());
rddObj.setRDD(rdd);
rddObj.setCheckpointRDD(true);

// Make space based on the estimated size
if(!LineageSparkCacheEviction.isBelowThreshold(estimatedSize))
LineageSparkCacheEviction.makeSpace(_cache, estimatedSize);
LineageSparkCacheEviction.updateSize(estimatedSize, true);
// Maintain order for eviction
LineageSparkCacheEviction.addEntry(centry, estimatedSize);

// Count number of RDDs marked for caching at the executors
if (DMLScript.STATISTICS)
LineageCacheStatistics.incrementRDDPersists();
}

@Deprecated
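Note the return convention of the refactored persistRDD helpers above: they return true only if the RDD was already persisted by an earlier call; a fresh persist returns false, so the caller recomputes this time and reuses the distributed copy from the next call onward, unless the operation is shuffle-based. A condensed sketch with illustrative names:

boolean persistRDD(Entry e) {
	if (probeRDDDistributed(e.key)) { //already persisted by an earlier call
		e.setStatus(PERSISTEDRDD);
		return true; //the distributed copy can be reused right away
	}
	markForLazyPersist(e); //persist(MEMORY_AND_DISK), effective on the next job
	e.setStatus(PERSISTEDRDD);
	return false; //caller recomputes this time
}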
src/main/java/org/apache/sysds/runtime/lineage/LineageCacheConfig.java
@@ -70,7 +70,7 @@ public class LineageCacheConfig

// Relatively inexpensive instructions.
private static final String[] PERSIST_OPCODES2 = new String[] {
"mapmm,"
"mapmm"
};

private static String[] REUSE_OPCODES = new String[] {};
@@ -300,8 +300,8 @@ protected static boolean isReusableRDDType(Instruction inst) {
return insttype && rightOp;
}

protected static boolean isShuffleOp(Instruction inst) {
return ArrayUtils.contains(PERSIST_OPCODES1, inst.getOpcode());
protected static boolean isShuffleOp(String opcode) {
return ArrayUtils.contains(PERSIST_OPCODES1, opcode);
}

protected static int getComputeGroup(String opcode) {
src/main/java/org/apache/sysds/runtime/lineage/LineageCacheEntry.java
@@ -254,7 +254,7 @@ public synchronized void copyValueFrom(LineageCacheEntry src, long computetime)
_gpuPointer = src._gpuPointer;
_rddObject = src._rddObject;
_computeTime = src._computeTime;
_status = isNullVal() ? LineageCacheStatus.EMPTY : LineageCacheStatus.CACHED;
_status = src._status; //required for multi-level reuse of RDDs
// resume all threads waiting for val
notifyAll();
}
src/main/java/org/apache/sysds/runtime/lineage/LineageSparkCacheEviction.java
@@ -117,8 +117,11 @@ private static void removeEntry(Map<LineageItem, LineageCacheEntry> cache, Linea

private static void setSparkStorageLimit() {
// Set the limit only during the first RDD caching to avoid context creation
if (SPARK_STORAGE_LIMIT == 0)
SPARK_STORAGE_LIMIT = (long) SparkExecutionContext.getDataMemoryBudget(false, true); //FIXME
// Cache size = 70% of unified Spark memory = 0.7 * 0.6 = 42%.
if (SPARK_STORAGE_LIMIT == 0) {
long unifiedSparkMem = (long) SparkExecutionContext.getDataMemoryBudget(false, true);
SPARK_STORAGE_LIMIT = (long)(unifiedSparkMem * 0.7d);
}
}

protected static double getSparkStorageLimit() {
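The 42% in the comment above follows from Spark's default spark.memory.fraction of 0.6, of which the lineage cache claims 70%; a worked sketch of the arithmetic, assuming that default:

long heapBytes = 10L << 30; //e.g., a 10 GB heap
long unifiedMem = (long)(heapBytes * 0.6); //Spark unified memory pool
long cacheLimit = (long)(unifiedMem * 0.7); //lineage RDD cache budget
//cacheLimit == 0.42 * heapBytes, i.e., 42% of the heap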
src/test/java/org/apache/sysds/test/functions/async/LineageReuseSparkTest.java
@@ -41,7 +41,7 @@ public class LineageReuseSparkTest extends AutomatedTestBase {

protected static final String TEST_DIR = "functions/async/";
protected static final String TEST_NAME = "LineageReuseSpark";
protected static final int TEST_VARIANTS = 3;
protected static final int TEST_VARIANTS = 4;
protected static String TEST_CLASS_DIR = TEST_DIR + LineageReuseSparkTest.class.getSimpleName() + "/";

@Override
@@ -73,6 +73,12 @@ public void testL2svm() {
runTest(TEST_NAME+"3", ExecMode.SPARK, 3);
}

@Test
public void testlmdsMultiLevel() {
// Cache RDD and matrix block function returns and reuse
runTest(TEST_NAME+"4", ExecMode.HYBRID, 4);
}

public void runTest(String testname, ExecMode execMode, int testId) {
boolean old_simplification = OptimizerUtils.ALLOW_ALGEBRAIC_SIMPLIFICATION;
boolean old_sum_product = OptimizerUtils.ALLOW_SUM_PRODUCT_REWRITES;
@@ -92,7 +98,6 @@ public void runTest(String testname, ExecMode execMode, int testId) {

//proArgs.add("-explain");
proArgs.add("-stats");
proArgs.add("-explain");
proArgs.add("-args");
proArgs.add(output("R"));
programArgs = proArgs.toArray(new String[proArgs.size()]);
Expand All @@ -109,7 +114,7 @@ public void runTest(String testname, ExecMode execMode, int testId) {
//proArgs.add("recompile_runtime");
proArgs.add("-stats");
proArgs.add("-lineage");
proArgs.add(LineageCacheConfig.ReuseCacheType.REUSE_FULL.name().toLowerCase());
proArgs.add(LineageCacheConfig.ReuseCacheType.REUSE_MULTILEVEL.name().toLowerCase());
proArgs.add("-args");
proArgs.add(output("R"));
programArgs = proArgs.toArray(new String[proArgs.size()]);