diff --git a/README.md b/README.md index ca92552..89ca5a4 100644 --- a/README.md +++ b/README.md @@ -51,4 +51,5 @@ If you are ready for more, dive in: * [Derivers Guide](docs/derivers.adoc) - detailed information on each provided deriver, and how to write custom derivers * [Planners Guide](docs/planners.adoc) - directions and details on when, why, and how to use planners and associated outputs * [Looping Guide](docs/looping.adoc) - information and an example for defining loops in an Envelope pipeline +* [Decisions Guide](docs/decisions.adoc) - information on using decisions to dynamically choose which parts of the pipeline to run * [Contributing to Envelope](docs/contributing.adoc) - guidelines and best practices for both developing and sharing Envelope components and applications diff --git a/docs/configurations.adoc b/docs/configurations.adoc index aee2c42..89e8861 100644 --- a/docs/configurations.adoc +++ b/docs/configurations.adoc @@ -91,7 +91,7 @@ Step configurations have the `steps.[stepname].` prefix. All steps can have the |Configuration suffix|Description |type -|The step type. Envelope supports `data` and `loop`. Default `data`. +|The step type. Envelope supports `data`, `loop`, and `decision`. Default `data`. |dependencies |The list of step names that Envelope will submit before submitting this step. @@ -100,7 +100,7 @@ Step configurations have the `steps.[stepname].` prefix. All steps can have the === Data steps -Data steps can additionally have the below configurations. +Data steps can, additionally to the step configurations, have the below configurations. [cols="2,8", options="header"] |=== @@ -125,7 +125,7 @@ Data steps can additionally have the below configurations. === Loop steps -Loop steps can additionally have the below configurations. +Loop steps can, additionally to the step configurations, have the below configurations. For more information on loop steps see the link:looping.adoc[looping guide]. [cols="2,8", options="header"] |=== @@ -154,9 +154,34 @@ Loop steps can additionally have the below configurations. |=== +=== Decision steps + +Decision steps can, additionally to the step configurations, have the below configurations. For more information on decision steps see the link:decisions.adoc[decisions guide]. + +[cols="2,8", options="header"] +|=== +|Configuration suffix|Description + +|if-true-steps +|Required. The list of dependent step names that will be kept if the decision result is true. The steps listed must directly depend on the decision step. The remaining directly dependent steps of the decision step will be kept if the decision result is false. Any steps subsequently dependent on the removed steps will also be removed. + +|method +|Required. The method by which the decision step will make the decision. Envelope supports `literal`, `step_by_key`, `step_by_value`. + +|result +|Required if `method` is `literal`. The true or false result for the decision. + +|step +|Required if `method` is `step_by_key` or `step_by_value`. The name of the previous step from which to extract the decision result. + +|key +|Required if `method` is `step_by_key`. The specific key of the previous step to look up the boolean result by. + +|=== + === Inputs -Input configurations belong to data steps, and have the `steps.[stepname].input.` prefix. +Input configurations belong to data steps, and have the `steps.[stepname].input.` prefix. For more information on inputs see the link:inputs.adoc[inputs guide]. [cols="2,8", options="header"] |=== @@ -442,7 +467,7 @@ Translator configurations belong to data steps, and have the `steps.[stepname].i === Derivers -Deriver configurations belong to data steps, and have the `steps.[stepname].deriver.` prefix. +Deriver configurations belong to data steps, and have the `steps.[stepname].deriver.` prefix. For more information on derivers see the link:derivers.adoc[derivers guide]. [cols="2,8", options="header"] |=== @@ -629,7 +654,7 @@ Partitioner configurations belong to data steps, and have the `steps.[stepname]. === Planners -Planner configurations belong to data steps, and have the `steps.[stepname].planner.` prefix. +Planner configurations belong to data steps, and have the `steps.[stepname].planner.` prefix. For more information on planners see the link:planners.adoc[planners guide]. [cols="2,8", options="header"] |=== @@ -952,6 +977,8 @@ cell sizes you may want to reduce this number or increase the relevant client bu === Repetitions +For more information on repetitions see the link:repetitions.adoc[repetitions guide]. + The general configuration parameters for repetitions are: [cols="2,8a", options="header"] diff --git a/docs/decisions.adoc b/docs/decisions.adoc new file mode 100644 index 0000000..156362f --- /dev/null +++ b/docs/decisions.adoc @@ -0,0 +1,189 @@ += Decisions guide + +Envelope provides the ability for a pipeline to make a decision that will determine which steps will be run. This is achieved by including a decision step that, when itself is run, decides which subsequent steps of the pipeline to remove and which ones to keep. + +A decision step makes a decision that returns a true or false result. The `if-true-steps` configuration of a decision step specifies which of its dependent steps will be kept if the result is true. The remaining dependent steps will be kept if the result is false. + +A decision step can make a decision using one of three methods, which are outlined with examples in the section below. + +== Decision methods + +=== Literal + +The `literal` decision method takes the true or false result directly from the configuration of the decision step. This method would be useful if the result is provided by a parameter, which in turn can be populated by a `spark2-submit` argument or an environment variable. + +In this self-contained example the value of the `${result}` parameter will determine whether `run_if_true` and `run_after_run_if_true`, or `run_if_false` and `run_after_run_if_false`, are run: + +---- +application.name = Decision step by literal +steps { + decide { + type = decision + if-true-steps = [run_if_true] + method = literal + result = ${result} + } + run_if_true { + dependencies = [decide] + deriver { + type = sql + query.literal = "SELECT true" + } + print.data.enabled = true + } + run_after_run_if_true { + dependencies = [run_if_true] + deriver { + type = sql + query.literal = "SELECT 'No, really, it was true!'" + } + print.data.enabled = true + } + run_if_false { + dependencies = [decide] + deriver { + type = sql + query.literal = "SELECT false" + } + print.data.enabled = true + } + run_after_run_if_false { + dependencies = [run_if_false] + deriver { + type = sql + query.literal = "SELECT 'No, really, it was false!'" + } + print.data.enabled = true + } +} +---- + +This pipeline could be run with `${result}` populated by using an argument after the configuration file: + + spark2-submit envelope-*.jar pipeline.conf result=true + +=== Step by key + +The `step_by_key` decision method takes the result from the data of a previous step, where the result is looked up in that data by a specific key. This method would be useful for making decisions on data quality results that provide a true or false result for each dataset-scoped check. + +The data of the step must contain only two columns: first a string (the key), and second a boolean (the result). + +In this self-contained example the corresponding value of the `test1` key in the `generate` step will determine whether `run_if_true` and `run_after_run_if_true`, or `run_if_false` and `run_after_run_if_false`, are run: + +---- +application.name = Decision step by step by key +steps { + generate { + deriver { + type = sql + query.literal = "SELECT 'test1', true UNION ALL SELECT 'test2', false" + } + } + decide { + dependencies = [generate] + type = decision + if.true.steps = [run_if_true] + decision.method = step_by_key + step = generate + key = test1 + } + run_if_true { + dependencies = [decide] + deriver { + type = sql + query.literal = "SELECT true" + } + print.data.enabled = true + } + run_after_true { + dependencies = [run_if_true] + deriver { + type = sql + query.literal = "SELECT 'No, really, it was true!'" + } + print.data.enabled = true + } + run_if_false { + dependencies = [decide] + deriver { + type = sql + query.literal = "SELECT false" + } + print.data.enabled = true + } + run_after_false { + dependencies = [run_if_false] + deriver { + type = sql + query.literal = "SELECT 'No, really, it was false!'" + } + print.data.enabled = true + } +} +---- + +=== Step by value + +The `step_by_value` decision method takes the result from the single boolean value of a previous step. This method would be useful when a previous step has a deriver that aggregates into a single result. + +The data of the step must contain a single boolean column and only a single row. + +In this self-contained example the sole value of `aggregate` step will determine whether `run_if_true` and `run_after_run_if_true`, or `run_if_false` and `run_after_run_if_false`, are run: + +---- +application.name = Decision step by step by value +steps { + generate { + deriver { + type = sql + query.literal = "SELECT 'test1' AS key, true AS result UNION ALL SELECT 'test2' AS key, false AS result" + } + } + aggregate { + deriver { + type = sql + query.literal = "SELECT MIN(result) = true AS result FROM generate" + } + } + decide { + dependencies = [aggregate] + type = decision + if.true.steps = [run_if_true] + decision.method = step_by_key + step = generate + key = test1 + } + run_if_true { + dependencies = [decide] + deriver { + type = sql + query.literal = "SELECT true" + } + print.data.enabled = true + } + run_after_true { + dependencies = [run_if_true] + deriver { + type = sql + query.literal = "SELECT 'No, really, it was true!'" + } + print.data.enabled = true + } + run_if_false { + dependencies = [decide] + deriver { + type = sql + query.literal = "SELECT false" + } + print.data.enabled = true + } + run_after_false { + dependencies = [run_if_false] + deriver { + type = sql + query.literal = "SELECT 'No, really, it was false!'" + } + print.data.enabled = true + } +} +---- diff --git a/src/main/java/com/cloudera/labs/envelope/run/DecisionStep.java b/src/main/java/com/cloudera/labs/envelope/run/DecisionStep.java new file mode 100644 index 0000000..1f0f40a --- /dev/null +++ b/src/main/java/com/cloudera/labs/envelope/run/DecisionStep.java @@ -0,0 +1,207 @@ +/** + * Copyright © 2016-2017 Cloudera, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.labs.envelope.run; + +import java.util.List; +import java.util.Set; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.types.DataTypes; + +import com.cloudera.labs.envelope.utils.ConfigUtils; +import com.cloudera.labs.envelope.utils.StepUtils; +import com.google.common.base.Optional; +import com.google.common.collect.Sets; +import com.typesafe.config.Config; + +public class DecisionStep extends RefactorStep { + + public static final String IF_TRUE_STEP_NAMES_PROPERTY = "if-true-steps"; + public static final String DECISION_METHOD_PROPERTY = "method"; + public static final String LITERAL_DECISION_METHOD = "literal"; + public static final String LITERAL_RESULT_PROPERTY = "result"; + public static final String STEP_BY_KEY_DECISION_METHOD = "step_by_key"; + public static final String STEP_BY_KEY_STEP_PROPERTY = "step"; + public static final String STEP_BY_KEY_KEY_PROPERTY = "key"; + public static final String STEP_BY_VALUE_DECISION_METHOD = "step_by_value"; + public static final String STEP_BY_VALUE_STEP_PROPERTY = "step"; + + private enum DecisionMethod { + STEP_BY_VALUE, + STEP_BY_KEY, + LITERAL + } + + private List ifTrueStepNames; + private DecisionMethod decisionMethod; + private boolean literalResult; + private String stepByKeyStepName; + private String stepByKeyKey; + private String stepByValueStepName; + + public DecisionStep(String name, Config config) { + super(name, config); + + ConfigUtils.assertConfig(config, IF_TRUE_STEP_NAMES_PROPERTY); + this.ifTrueStepNames = config.getStringList(IF_TRUE_STEP_NAMES_PROPERTY); + + ConfigUtils.assertConfig(config, DECISION_METHOD_PROPERTY); + try { + this.decisionMethod = DecisionMethod.valueOf(config.getString(DECISION_METHOD_PROPERTY).toUpperCase()); + } + catch (IllegalArgumentException e) { + throw new RuntimeException("Unsupported decision method: " + config.getString(DECISION_METHOD_PROPERTY)); + } + + switch (decisionMethod) { + case LITERAL: + ConfigUtils.assertConfig(config, LITERAL_RESULT_PROPERTY); + this.literalResult = config.getBoolean(LITERAL_RESULT_PROPERTY); + break; + case STEP_BY_KEY: + ConfigUtils.assertConfig(config, STEP_BY_KEY_STEP_PROPERTY); + this.stepByKeyStepName = config.getString(STEP_BY_KEY_STEP_PROPERTY); + ConfigUtils.assertConfig(config, STEP_BY_KEY_KEY_PROPERTY); + this.stepByKeyKey = config.getString(STEP_BY_KEY_KEY_PROPERTY); + break; + case STEP_BY_VALUE: + ConfigUtils.assertConfig(config, STEP_BY_VALUE_STEP_PROPERTY); + this.stepByValueStepName = config.getString(STEP_BY_VALUE_STEP_PROPERTY); + break; + } + } + + // Envelope runs decision steps by pruning out the steps of the pipeline that can not be + // submitted as a result of a decision. This allows multiple sub-graphs to depend on the decision + // step but only a subset of them to continue based on the decision result. + // The configuration of a decision step defines how to make a boolean decision + // (i.e. true or false), and then which of the immediately dependent steps to allow to run if the + // decision result is true. Inversely, only the remaining immediately dependent steps will be + // allowed to run if the decision result is false. Subsequent steps that can never be submitted + // as a result of the pruning of immediately dependent steps are also pruned. + @Override + public Set refactor(Set steps) { + Set decisionDependentSteps = StepUtils.getImmediateDependentSteps(this, steps); + Set pruneSteps = getPruneSteps(decisionDependentSteps, steps); + + steps.removeAll(pruneSteps); + + this.setSubmitted(true); + + return steps; + } + + private Set getPruneSteps(Set decideSteps, Set allSteps) { + Set pruneSteps = Sets.newHashSet(); + + boolean decision = evaluateDecision(allSteps); + + for (Step decideStep : decideSteps) { + if (decision != this.ifTrueStepNames.contains(decideStep.getName())) { + pruneSteps.add(decideStep); + pruneSteps.addAll(StepUtils.getAllDependentSteps(decideStep, allSteps)); + } + } + + return pruneSteps; + } + + private boolean evaluateDecision(Set steps) { + switch (decisionMethod) { + case LITERAL: + return evaluateLiteralDecision(); + case STEP_BY_KEY: + return evaluateStepByKeyDecision(steps); + case STEP_BY_VALUE: + return evaluateStepByValueDecision(steps); + default: + throw new RuntimeException("Decision step's decision method was not initialized"); + } + } + + private boolean evaluateLiteralDecision() { + return literalResult; + } + + private boolean evaluateStepByKeyDecision(Set steps) { + Optional optionalStep = StepUtils.getStepForName(stepByKeyStepName, steps); + + if (!optionalStep.isPresent()) { + throw new RuntimeException("Unknown decision step's key step: " + stepByValueStepName); + } + + if (!(optionalStep.get() instanceof DataStep)) { + throw new RuntimeException("Decision step's key step is not a data step: " + optionalStep.get().getName()); + } + + Dataset keyDataset = ((DataStep)optionalStep.get()).getData(); + + if (keyDataset.schema().fields().length != 2 || + keyDataset.schema().fields()[0].dataType() != DataTypes.StringType || + keyDataset.schema().fields()[1].dataType() != DataTypes.BooleanType) + { + throw new RuntimeException("Decision step's key step must contain a string column and then a boolean column"); + } + + String keyColumnName = keyDataset.schema().fieldNames()[0]; + String whereClause = keyColumnName + " = '" + stepByKeyKey + "'"; + Dataset decisionDataset = keyDataset.where(whereClause); + + if (decisionDataset.count() != 1) { + throw new RuntimeException("Decision step's key step must contain a single record for the given key"); + } + + boolean decision = decisionDataset.collectAsList().get(0).getBoolean(1); + + return decision; + } + + private boolean evaluateStepByValueDecision(Set steps) { + Optional optionalStep = StepUtils.getStepForName(stepByValueStepName, steps); + + if (!optionalStep.isPresent()) { + throw new RuntimeException("Unknown decision step's value step: " + stepByValueStepName); + } + + if (!(optionalStep.get() instanceof DataStep)) { + throw new RuntimeException("Decision step's value step is not a data step: " + optionalStep.get().getName()); + } + + Dataset valueDataset = ((DataStep)optionalStep.get()).getData(); + + if (valueDataset.schema().fields().length != 1 || + valueDataset.schema().fields()[0].dataType() != DataTypes.BooleanType || + valueDataset.count() != 1) + { + throw new RuntimeException("Decision step's value step must contain a single boolean column with a single row"); + } + + boolean decision = valueDataset.collectAsList().get(0).getBoolean(0); + + return decision; + } + + @Override + public Step copy() { + Step copy = new DecisionStep(this.getName(), this.getConfig()); + + copy.setSubmitted(hasSubmitted()); + + return copy; + } + +} diff --git a/src/main/java/com/cloudera/labs/envelope/run/LoopStep.java b/src/main/java/com/cloudera/labs/envelope/run/LoopStep.java index c38d17f..588954d 100644 --- a/src/main/java/com/cloudera/labs/envelope/run/LoopStep.java +++ b/src/main/java/com/cloudera/labs/envelope/run/LoopStep.java @@ -33,7 +33,7 @@ import com.google.common.collect.Sets; import com.typesafe.config.Config; -public class LoopStep extends Step { +public class LoopStep extends RefactorStep { public static final String MODE_PROPERTY = "mode"; public static final String MODE_SERIAL = "serial"; @@ -57,7 +57,8 @@ public LoopStep(String name, Config config) { // Envelope runs loops by unrolling the loop when the loop step is run. This means // that the iterations of the loop must be known when the loop step runs, either from // static values in the configuration or from dynamic values provided by previous steps. - public Set unrollLoop(Set steps) { + @Override + public Set refactor(Set steps) { // The values that the loop iterates over List values = getValues(steps); diff --git a/src/main/java/com/cloudera/labs/envelope/run/RefactorStep.java b/src/main/java/com/cloudera/labs/envelope/run/RefactorStep.java new file mode 100644 index 0000000..66e2468 --- /dev/null +++ b/src/main/java/com/cloudera/labs/envelope/run/RefactorStep.java @@ -0,0 +1,30 @@ +/** + * Copyright © 2016-2017 Cloudera, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.labs.envelope.run; + +import java.util.Set; + +import com.typesafe.config.Config; + +public abstract class RefactorStep extends Step { + + public RefactorStep(String name, Config config) { + super(name, config); + } + + public abstract Set refactor(Set steps); + +} diff --git a/src/main/java/com/cloudera/labs/envelope/run/Runner.java b/src/main/java/com/cloudera/labs/envelope/run/Runner.java index 2accc1f..a857026 100644 --- a/src/main/java/com/cloudera/labs/envelope/run/Runner.java +++ b/src/main/java/com/cloudera/labs/envelope/run/Runner.java @@ -35,7 +35,6 @@ import com.cloudera.labs.envelope.input.Input; import com.cloudera.labs.envelope.input.InputFactory; import com.cloudera.labs.envelope.input.StreamInput; -import com.cloudera.labs.envelope.repetition.Repetitions; import com.cloudera.labs.envelope.spark.AccumulatorRequest; import com.cloudera.labs.envelope.spark.Accumulators; import com.cloudera.labs.envelope.spark.Contexts; @@ -56,6 +55,10 @@ @SuppressWarnings("serial") public class Runner { + public static final String TYPE_PROPERTY = "type"; + public static final String DATA_TYPE = "data"; + public static final String LOOP_TYPE = "loop"; + public static final String DECISION_TYPE = "decision"; public static final String PIPELINE_THREADS_PROPERTY = "application.pipeline.threads"; private static ExecutorService threadPool; @@ -105,7 +108,7 @@ private static Set extractSteps(Config config) throws Exception { Step step; - if (!stepConfig.hasPath("type") || stepConfig.getString("type").equals("data")) { + if (!stepConfig.hasPath(TYPE_PROPERTY) || stepConfig.getString(TYPE_PROPERTY).equals(DATA_TYPE)) { if (stepConfig.hasPath("input")) { Config stepInputConfig = stepConfig.getConfig("input"); Input stepInput = InputFactory.create(stepInputConfig); @@ -127,12 +130,16 @@ else if (stepInput instanceof StreamInput) { step = new BatchStep(stepName, stepConfig); } } - else if (stepConfig.getString("type").equals("loop")) { + else if (stepConfig.getString(TYPE_PROPERTY).equals(LOOP_TYPE)) { LOG.debug("Adding loop step: " + stepName); step = new LoopStep(stepName, stepConfig); } + else if (stepConfig.getString(TYPE_PROPERTY).equals(DECISION_TYPE)) { + LOG.debug("Adding decision step: " + stepName); + step = new DecisionStep(stepName, stepConfig); + } else { - throw new RuntimeException("Unknown step type: " + stepConfig.getString("type")); + throw new RuntimeException("Unknown step type: " + stepConfig.getString(TYPE_PROPERTY)); } LOG.debug("With configuration: " + stepConfig); @@ -227,14 +234,14 @@ private static void runBatch(Set steps) throws Exception { final Set dependencies = StepUtils.getDependencies(step, steps); if (StepUtils.allStepsSubmitted(dependencies)) { - LOG.debug("Step dependencies have finished, running step off main thread"); + LOG.debug("Step dependencies have been submitted, running step off main thread"); // Batch steps are run off the main thread so that if they contain outputs they will // not block the parallel execution of independent steps. Future offMainThreadStep = runStepOffMainThread(batchStep, dependencies, threadPool); offMainThreadSteps.add(offMainThreadStep); } else { - LOG.debug("Step dependencies have not finished"); + LOG.debug("Step dependencies have not been submitted"); } } else { @@ -244,22 +251,20 @@ private static void runBatch(Set steps) throws Exception { else if (step instanceof StreamingStep) { LOG.debug("Step is streaming"); } - else if (step instanceof LoopStep) { - LOG.debug("Step is a loop"); + else if (step instanceof RefactorStep) { + LOG.debug("Step is a refactor step"); - LoopStep loopStep = (LoopStep)step; + RefactorStep refactorStep = (RefactorStep)step; - if (!loopStep.hasSubmitted()) { + if (!refactorStep.hasSubmitted()) { LOG.debug("Step has not been submitted"); final Set dependencies = StepUtils.getDependencies(step, steps); if (StepUtils.allStepsSubmitted(dependencies)) { - LOG.debug("Step dependencies have finished, unrolling loop"); - refactoredSteps = loopStep.unrollLoop(steps); - LOG.debug("Loop unrolled"); - // We can't mutate the steps while we are iterating over them, so we break out - // of the for-loop to then replace the steps with the loop step unrolled. + LOG.debug("Step dependencies have submitted, refactoring steps"); + refactoredSteps = refactorStep.refactor(steps); + LOG.debug("Steps refactored"); break; } else { diff --git a/src/test/java/com/cloudera/labs/envelope/run/TestDecisionStep.java b/src/test/java/com/cloudera/labs/envelope/run/TestDecisionStep.java new file mode 100644 index 0000000..d604d44 --- /dev/null +++ b/src/test/java/com/cloudera/labs/envelope/run/TestDecisionStep.java @@ -0,0 +1,238 @@ +/** + * Copyright © 2016-2017 Cloudera, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.cloudera.labs.envelope.run; + +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.RowFactory; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.Metadata; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; +import org.junit.Before; +import org.junit.Test; + +import com.cloudera.labs.envelope.spark.Contexts; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +public class TestDecisionStep { + + BatchStep step1, step3, step4, step5, step6, step7, step8; + Set steps = Sets.newHashSet(); + + @Before + public void initializeSteps() { + /* + * Batch -> Decision -true> Batch -> Batch + * -false> Batch -> Batch + * -true> Batch -> Batch + */ + + Map step1ConfigMap = Maps.newHashMap(); + step1ConfigMap.put("dependencies", Lists.newArrayList()); + step1ConfigMap.put(DataStep.CACHE_PROPERTY, false); + Config step1Config = ConfigFactory.parseMap(step1ConfigMap); + step1 = new BatchStep("step1", step1Config); + steps.add(step1); + + // step2 is the decision step added in the tests + + Map step3ConfigMap = Maps.newHashMap(); + step3ConfigMap.put("dependencies", Lists.newArrayList("step2")); + Config step3Config = ConfigFactory.parseMap(step3ConfigMap); + step3 = new BatchStep("step3", step3Config); + steps.add(step3); + + Map step4ConfigMap = Maps.newHashMap(); + step4ConfigMap.put("dependencies", Lists.newArrayList("step3")); + Config step4Config = ConfigFactory.parseMap(step4ConfigMap); + step4 = new BatchStep("step4", step4Config); + steps.add(step4); + + Map step5ConfigMap = Maps.newHashMap(); + step5ConfigMap.put("dependencies", Lists.newArrayList("step2")); + Config step5Config = ConfigFactory.parseMap(step5ConfigMap); + step5 = new BatchStep("step5", step5Config); + steps.add(step5); + + Map step6ConfigMap = Maps.newHashMap(); + step6ConfigMap.put("dependencies", Lists.newArrayList("step5")); + Config step6Config = ConfigFactory.parseMap(step6ConfigMap); + step6 = new BatchStep("step6", step6Config); + steps.add(step6); + + Map step7ConfigMap = Maps.newHashMap(); + step7ConfigMap.put("dependencies", Lists.newArrayList("step2")); + Config step7Config = ConfigFactory.parseMap(step7ConfigMap); + step7 = new BatchStep("step7", step7Config); + steps.add(step7); + + Map step8ConfigMap = Maps.newHashMap(); + step8ConfigMap.put("dependencies", Lists.newArrayList("step7")); + Config step8Config = ConfigFactory.parseMap(step8ConfigMap); + step8 = new BatchStep("step8", step8Config); + steps.add(step8); + } + + @Test + public void testPruneByLiteralTrue() { + Map step2ConfigMap = Maps.newHashMap(); + step2ConfigMap.put("dependencies", Lists.newArrayList("step1")); + step2ConfigMap.put(DecisionStep.IF_TRUE_STEP_NAMES_PROPERTY, Lists.newArrayList("step3", "step7")); + step2ConfigMap.put(DecisionStep.DECISION_METHOD_PROPERTY, DecisionStep.LITERAL_DECISION_METHOD); + step2ConfigMap.put(DecisionStep.LITERAL_RESULT_PROPERTY, true); + Config step2Config = ConfigFactory.parseMap(step2ConfigMap); + RefactorStep step2 = new DecisionStep("step2", step2Config); + steps.add(step2); + + Set refactored = step2.refactor(steps); + + assertEquals(refactored, Sets.newHashSet(step1, step2, step3, step4, step7, step8)); + } + + @Test + public void testPruneByLiteralFalse() { + Map step2ConfigMap = Maps.newHashMap(); + step2ConfigMap.put("dependencies", Lists.newArrayList("step1")); + step2ConfigMap.put(DecisionStep.IF_TRUE_STEP_NAMES_PROPERTY, Lists.newArrayList("step3", "step7")); + step2ConfigMap.put(DecisionStep.DECISION_METHOD_PROPERTY, DecisionStep.LITERAL_DECISION_METHOD); + step2ConfigMap.put(DecisionStep.LITERAL_RESULT_PROPERTY, false); + Config step2Config = ConfigFactory.parseMap(step2ConfigMap); + RefactorStep step2 = new DecisionStep("step2", step2Config); + steps.add(step2); + + Set refactored = step2.refactor(steps); + + assertEquals(refactored, Sets.newHashSet(step1, step2, step5, step6)); + } + + @Test + public void testPruneByStepKeyTrue() { + StructType schema = new StructType(new StructField[] { + new StructField("name", DataTypes.StringType, false, Metadata.empty()), + new StructField("result", DataTypes.BooleanType, false, Metadata.empty()) + }); + List rows = Lists.newArrayList( + RowFactory.create("namecheck", false), + RowFactory.create("agerange", true) + ); + Dataset ds = Contexts.getSparkSession().createDataFrame(rows, schema); + step1.setData(ds); + + Map step2ConfigMap = Maps.newHashMap(); + step2ConfigMap.put("dependencies", Lists.newArrayList("step1")); + step2ConfigMap.put(DecisionStep.IF_TRUE_STEP_NAMES_PROPERTY, Lists.newArrayList("step3", "step7")); + step2ConfigMap.put(DecisionStep.DECISION_METHOD_PROPERTY, DecisionStep.STEP_BY_KEY_DECISION_METHOD); + step2ConfigMap.put(DecisionStep.STEP_BY_KEY_STEP_PROPERTY, "step1"); + step2ConfigMap.put(DecisionStep.STEP_BY_KEY_KEY_PROPERTY, "agerange"); + Config step2Config = ConfigFactory.parseMap(step2ConfigMap); + RefactorStep step2 = new DecisionStep("step2", step2Config); + steps.add(step2); + + Set refactored = step2.refactor(steps); + + assertEquals(refactored, Sets.newHashSet(step1, step2, step3, step4, step7, step8)); + } + + @Test + public void testPruneByStepKeyFalse() { + StructType schema = new StructType(new StructField[] { + new StructField("name", DataTypes.StringType, false, Metadata.empty()), + new StructField("result", DataTypes.BooleanType, false, Metadata.empty()) + }); + List rows = Lists.newArrayList( + RowFactory.create("namecheck", false), + RowFactory.create("agerange", true) + ); + Dataset ds = Contexts.getSparkSession().createDataFrame(rows, schema); + step1.setData(ds); + + Map step2ConfigMap = Maps.newHashMap(); + step2ConfigMap.put("dependencies", Lists.newArrayList("step1")); + step2ConfigMap.put(DecisionStep.IF_TRUE_STEP_NAMES_PROPERTY, Lists.newArrayList("step3", "step7")); + step2ConfigMap.put(DecisionStep.DECISION_METHOD_PROPERTY, DecisionStep.STEP_BY_KEY_DECISION_METHOD); + step2ConfigMap.put(DecisionStep.STEP_BY_KEY_STEP_PROPERTY, "step1"); + step2ConfigMap.put(DecisionStep.STEP_BY_KEY_KEY_PROPERTY, "namecheck"); + Config step2Config = ConfigFactory.parseMap(step2ConfigMap); + RefactorStep step2 = new DecisionStep("step2", step2Config); + steps.add(step2); + + Set refactored = step2.refactor(steps); + + assertEquals(refactored, Sets.newHashSet(step1, step2, step5, step6)); + } + + @Test + public void testPruneByStepValueTrue() { + StructType schema = new StructType(new StructField[] { + new StructField("outcome", DataTypes.BooleanType, false, Metadata.empty()) + }); + List rows = Lists.newArrayList( + RowFactory.create(true) + ); + Dataset ds = Contexts.getSparkSession().createDataFrame(rows, schema); + step1.setData(ds); + + Map step2ConfigMap = Maps.newHashMap(); + step2ConfigMap.put("dependencies", Lists.newArrayList("step1")); + step2ConfigMap.put(DecisionStep.IF_TRUE_STEP_NAMES_PROPERTY, Lists.newArrayList("step3", "step7")); + step2ConfigMap.put(DecisionStep.DECISION_METHOD_PROPERTY, DecisionStep.STEP_BY_VALUE_DECISION_METHOD); + step2ConfigMap.put(DecisionStep.STEP_BY_VALUE_STEP_PROPERTY, "step1"); + Config step2Config = ConfigFactory.parseMap(step2ConfigMap); + RefactorStep step2 = new DecisionStep("step2", step2Config); + steps.add(step2); + + Set refactored = step2.refactor(steps); + + assertEquals(refactored, Sets.newHashSet(step1, step2, step3, step4, step7, step8)); + } + + @Test + public void testPruneByStepValueFalse() { + StructType schema = new StructType(new StructField[] { + new StructField("outcome", DataTypes.BooleanType, false, Metadata.empty()) + }); + List rows = Lists.newArrayList( + RowFactory.create(false) + ); + Dataset ds = Contexts.getSparkSession().createDataFrame(rows, schema); + step1.setData(ds); + + Map step2ConfigMap = Maps.newHashMap(); + step2ConfigMap.put("dependencies", Lists.newArrayList("step1")); + step2ConfigMap.put(DecisionStep.IF_TRUE_STEP_NAMES_PROPERTY, Lists.newArrayList("step3", "step7")); + step2ConfigMap.put(DecisionStep.DECISION_METHOD_PROPERTY, DecisionStep.STEP_BY_VALUE_DECISION_METHOD); + step2ConfigMap.put(DecisionStep.STEP_BY_VALUE_STEP_PROPERTY, "step1"); + Config step2Config = ConfigFactory.parseMap(step2ConfigMap); + RefactorStep step2 = new DecisionStep("step2", step2Config); + steps.add(step2); + + Set refactored = step2.refactor(steps); + + assertEquals(refactored, Sets.newHashSet(step1, step2, step5, step6)); + } + +} diff --git a/src/test/java/com/cloudera/labs/envelope/run/TestLoopStep.java b/src/test/java/com/cloudera/labs/envelope/run/TestLoopStep.java index fd2cd69..7446ff0 100644 --- a/src/test/java/com/cloudera/labs/envelope/run/TestLoopStep.java +++ b/src/test/java/com/cloudera/labs/envelope/run/TestLoopStep.java @@ -52,7 +52,7 @@ public void testRangeValues() { loopStepConfigMap.put(LoopStep.RANGE_START_PROPERTY, 5); loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 7); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -61,7 +61,7 @@ public void testRangeValues() { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 4); @@ -85,7 +85,7 @@ public void testListIntegerValues() { loopStepConfigMap.put(LoopStep.SOURCE_PROPERTY, LoopStep.SOURCE_LIST); loopStepConfigMap.put(LoopStep.LIST_PROPERTY, Lists.newArrayList(1, 10, 100)); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -94,7 +94,7 @@ public void testListIntegerValues() { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 4); @@ -113,7 +113,7 @@ public void testListStringValues() { loopStepConfigMap.put(LoopStep.SOURCE_PROPERTY, LoopStep.SOURCE_LIST); loopStepConfigMap.put(LoopStep.LIST_PROPERTY, Lists.newArrayList("hello", "world", "bloop", "gloop")); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -122,7 +122,7 @@ public void testListStringValues() { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 5); @@ -151,7 +151,7 @@ public void testStepValues() throws Exception { loopStepConfigMap.put(LoopStep.SOURCE_PROPERTY, LoopStep.SOURCE_STEP); loopStepConfigMap.put(LoopStep.STEP_PROPERTY, "source_step"); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -160,7 +160,7 @@ public void testStepValues() throws Exception { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 5); @@ -181,7 +181,7 @@ public void testGraphOneLoopOnce() { loopStepConfigMap.put(LoopStep.RANGE_START_PROPERTY, 5); loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 5); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -190,7 +190,7 @@ public void testGraphOneLoopOnce() { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 2); @@ -208,7 +208,7 @@ public void testGraphMultipleLoopOnce() { loopStepConfigMap.put(LoopStep.RANGE_START_PROPERTY, 100); loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 100); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -241,7 +241,7 @@ public void testGraphMultipleLoopOnce() { Step step5 = new BatchStep("step5", step5Config); steps.add(step5); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 6); @@ -263,7 +263,7 @@ public void testGraphOneLoopMany() { loopStepConfigMap.put(LoopStep.RANGE_START_PROPERTY, 5); loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 7); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -272,7 +272,7 @@ public void testGraphOneLoopMany() { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 4); @@ -292,7 +292,7 @@ public void testGraphMultipleLoopMany() { loopStepConfigMap.put(LoopStep.RANGE_START_PROPERTY, 100); loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 102); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -325,7 +325,7 @@ public void testGraphMultipleLoopMany() { Step step5 = new BatchStep("step5", step5Config); steps.add(step5); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 16); @@ -357,7 +357,7 @@ public void testParallelMode() { loopStepConfigMap.put(LoopStep.RANGE_START_PROPERTY, 5); loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 7); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -366,7 +366,7 @@ public void testParallelMode() { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 4); @@ -386,7 +386,7 @@ public void testSerialMode() { loopStepConfigMap.put(LoopStep.RANGE_START_PROPERTY, 5); loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 7); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -395,7 +395,7 @@ public void testSerialMode() { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 4); @@ -415,7 +415,7 @@ public void testAfterLoop() { loopStepConfigMap.put(LoopStep.RANGE_START_PROPERTY, 5); loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 7); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -436,7 +436,7 @@ public void testAfterLoop() { Step afterAfterLoop = new BatchStep("after_after_loop", afterAfterLoopConfig); steps.add(afterAfterLoop); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 6); @@ -458,7 +458,7 @@ public void testWithoutParameter() { loopStepConfigMap.put(LoopStep.RANGE_START_PROPERTY, 5); loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 5); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -467,7 +467,7 @@ public void testWithoutParameter() { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 2); @@ -486,7 +486,7 @@ public void testWithParameter() throws Exception { loopStepConfigMap.put(LoopStep.RANGE_END_PROPERTY, 7); loopStepConfigMap.put(LoopStep.PARAMETER_PROPERTY, "loop_value"); Config loopStepConfig = ConfigFactory.parseMap(loopStepConfigMap); - LoopStep loopStep = new LoopStep("loop_step", loopStepConfig); + RefactorStep loopStep = new LoopStep("loop_step", loopStepConfig); steps.add(loopStep); Map step1ConfigMap = Maps.newHashMap(); @@ -498,7 +498,7 @@ public void testWithParameter() throws Exception { Step step1 = new BatchStep("step1", step1Config); steps.add(step1); - Set unrolled = loopStep.unrollLoop(steps); + Set unrolled = loopStep.refactor(steps); assertEquals(unrolled.size(), 4); @@ -528,7 +528,7 @@ public void testLoopWithinLoop() { step1ConfigMap.put(LoopStep.RANGE_START_PROPERTY, 10); step1ConfigMap.put(LoopStep.RANGE_END_PROPERTY, 11); Config step1Config = ConfigFactory.parseMap(step1ConfigMap); - LoopStep step1 = new LoopStep("loop_step1", step1Config); + RefactorStep step1 = new LoopStep("loop_step1", step1Config); Map step2ConfigMap = Maps.newHashMap(); step2ConfigMap.put("dependencies", Lists.newArrayList("loop_step1")); @@ -542,7 +542,7 @@ public void testLoopWithinLoop() { step3ConfigMap.put(LoopStep.RANGE_START_PROPERTY, 12); step3ConfigMap.put(LoopStep.RANGE_END_PROPERTY, 13); Config step3Config = ConfigFactory.parseMap(step3ConfigMap); - LoopStep step3 = new LoopStep("loop_step3", step3Config); + RefactorStep step3 = new LoopStep("loop_step3", step3Config); Map step4ConfigMap = Maps.newHashMap(); step4ConfigMap.put("dependencies", Lists.newArrayList("loop_step1", "loop_step3")); @@ -562,17 +562,17 @@ public void testLoopWithinLoop() { Set steps = Sets.newHashSet(step1, step2, step3, step4, step5, step6); // Unroll the top level loop - Set unrolled = step1.unrollLoop(steps); + Set unrolled = step1.refactor(steps); // Unroll the unrolled two bottom level loops for (Step unrolledStep : unrolled) { if (unrolledStep instanceof LoopStep && !unrolledStep.hasSubmitted()) { - unrolled = ((LoopStep)unrolledStep).unrollLoop(unrolled); + unrolled = ((LoopStep)unrolledStep).refactor(unrolled); break; } } for (Step unrolledStep : unrolled) { if (unrolledStep instanceof LoopStep && !unrolledStep.hasSubmitted()) { - unrolled = ((LoopStep)unrolledStep).unrollLoop(unrolled); + unrolled = ((LoopStep)unrolledStep).refactor(unrolled); break; } }