
Execution failure crosstalk between different checks in a suite #467

Closed
marcantony opened this issue Apr 13, 2023 · 2 comments · Fixed by samarth-c1/deequ#1 or #478
Labels: bug, good first issue

Comments

@marcantony (Contributor)

It's possible for one check in a suite to fail because a different check in the same suite encounters an exception while executing its constraints. I ran into this while working with column names that don't exist in the data, and I'm unsure whether it's intended behavior.

This unit test reproduces the issue:

import com.amazon.deequ.VerificationSuite
import com.amazon.deequ.checks.{Check, CheckLevel, CheckStatus}
import org.apache.spark.sql.SparkSession
import org.scalatest.flatspec.AnyFlatSpec

class RuleFailureCrosstalkTest extends AnyFlatSpec {

  case class MyData(value: String)
  private val data = Seq(MyData("foo"), MyData("bar"))

  "A well-defined check" should "pass even if an ill-defined check is also configured" in {
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    val df = spark.createDataFrame(data)

    val checkThatShouldSucceed =
      Check(CheckLevel.Error, "shouldSucceed").isComplete("value")
    val verificationResult = VerificationSuite()
      .onData(df)
      .addCheck(checkThatShouldSucceed)
      .addCheck(
        Check(CheckLevel.Error, "shouldError")
          .isContainedIn("fakeColumn", 1, 3)
      )
      .run()

    val checkResult = verificationResult.checkResults(checkThatShouldSucceed)
    System.out.println(checkResult.constraintResults.map(_.message))
    assert(checkResult.status == CheckStatus.Success)
  }
}

Expected outcome: The unit test passes, because the value column is present and complete, so the corresponding check shouldSucceed should succeed.

Actual outcome: The unit test fails. The following message is output for the constraint:

org.apache.spark.sql.AnalysisException: cannot resolve 'fakeColumn' given input columns: [value]; line 1 pos 0;
'Aggregate [sum(cast(isnotnull(value#0) as int)) AS sum(CAST((value IS NOT NULL) AS INT))#15L, count(1) AS count(1)#16L, sum(cast((isnull('fakeColumn) OR (('fakeColumn >= 1.0) AND ('fakeColumn <= 3.0))) as int)) AS sum(CAST(((fakeColumn IS NULL) OR ((fakeColumn >= 1.0) AND (fakeColumn <= 3.0))) AS INT))#17, count(1) AS count(1)#18L]
+- LocalRelation [value#0]

Although the shouldSucceed check doesn't have any constraint on the column fakeColumn, it fails because that column isn't present in the data.

I noticed the above behavior with this combination of isComplete and isContainedIn, but I didn't check what other combinations might also cause it. Notably though, I noticed that the test actually succeeds for some constraint choices on the shouldError check. (For example, if you replace the isContainedIn constraint on fakeColumn with isComplete, the shouldSucceed check then succeeds.)
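
For illustration, here is the variant of the shouldError check that, per the observation above, does not cause the shouldSucceed check to fail (a sketch against the same test setup; other constraint combinations may behave differently):

      .addCheck(
        Check(CheckLevel.Error, "shouldError")
          .isComplete("fakeColumn") // still references a missing column, yet shouldSucceed passes
      )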

Deequ version: 2.0.3-spark-3.3
Java version: Corretto 19.0.2

@mentekid (Contributor)

Thanks for reporting this. We will review and let you know if it's a bug or intended behavior.

@mentekid (Contributor)

Looked into this a bit more. This is a bug in our execution logic.

We collect all scans required for analyzers and run them all at once here:
https://github.com/awslabs/deequ/blob/master/src/main/scala/com/amazon/deequ/analyzers/runners/AnalysisRunner.scala#L325

Spark throws because one of the required aggregations involves fakeColumn, which doesn't exist. We catch that exception and fail all Analyzers with it:

try {
  ...
  val results = data.agg(aggregations.head, aggregations.tail: _*).collect().head
  ...
} catch {
  case error: Exception =>
    shareableAnalyzers.map { analyzer => analyzer -> analyzer.toFailureMetric(error) }
}

agg is a Spark function, and when it throws we cannot tell which of the aggregation expressions failed.
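
To illustrate why (a minimal sketch reusing the df from the report above, not code from Deequ): a single agg call that mixes a resolvable and an unresolvable column fails as a whole, so there is no per-expression result left to attribute the error to.

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.functions.{col, sum}

try {
  // One shared scan over both aggregation expressions, as AnalysisRunner does.
  df.agg(
      sum(col("value").isNotNull.cast("int")), // resolvable on its own
      sum(col("fakeColumn"))                   // unresolvable -> AnalysisException
    )
    .collect()
} catch {
  case e: AnalysisException =>
    // The exception describes the whole plan, not a single failed expression.
    println(e.getMessage)
}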

A quick fix would be to give every Check that works on one or more columns a precondition that verifies those columns exist. That way we don't have to react to a Spark exception; instead, the Check fails because its precondition is invalid.
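
Roughly, such a precondition could be a schema-level check of the form StructType => Unit that runs before the shared scan is built (a sketch of the idea only, not the actual fix; the helper shown here is hypothetical):

import org.apache.spark.sql.types.StructType

// Hypothetical column-existence precondition: only the check/analyzer that
// references a missing column fails, before any Spark aggregation is attempted.
def hasColumn(column: String): StructType => Unit = { schema =>
  if (!schema.fieldNames.contains(column)) {
    throw new IllegalArgumentException(s"Input data does not include column $column!")
  }
}

An analyzer whose precondition throws would then be failed individually with that error, while the remaining analyzers still run their shared aggregation.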
