
[Experimental] Addition of dataset comparison utilities #449

Merged

17 commits merged into awslabs:master on Feb 22, 2023

Conversation

rdsharma26
Contributor

Description of changes:

These changes bring in two utility classes which can be used for dataset comparisons.

  • Referential Integrity: This can be used to check if the set of values of a column in one dataset is a subset of values of a column in another dataset.
  • Data Synchronization: This can be used to compare two datasets row by row, given a mapping between their key columns and, optionally, a mapping of the columns to compare.

Original author: Fernan Gonzalez (@fergonp) as part of their internship project at Amazon.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
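
For orientation, here is a hedged sketch of how these utilities might be invoked. The parameter names ds1, ds2, colKeyMap and compCols come from the signature excerpt discussed below, and the key/column/assertion values mirror the test excerpt near the end of this review; the method names, the assertion parameter name, and the surrounding driver are assumptions, not the final merged API.

import org.apache.spark.sql.DataFrame

// Hypothetical driver; `orders` and `customers` stand in for any two DataFrames.
// Assumes DataSynchronization and ReferentialIntegrity are in scope.
def runChecks(orders: DataFrame, customers: DataFrame): Unit = {
  // Data Synchronization: join the two datasets on "id" and check that the
  // "name" columns agree for at least 60% of the rows.
  val inSync = DataSynchronization.columnMatch(
    ds1 = orders,
    ds2 = customers,
    colKeyMap = Map("id" -> "id"),
    compCols = Some(Map("name" -> "name")),
    assertion = (ratio: Double) => ratio >= 0.60
  )

  // Referential Integrity: check that every "customer_id" in `orders` also
  // appears as an "id" in `customers` (method name assumed).
  val intact = ReferentialIntegrity.subsetCheck(
    orders, "customer_id", customers, "id", (ratio: Double) => ratio == 1.0
  )

  println(s"data in sync: $inSync, referential integrity holds: $intact")
}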

object DataSynchronization {

/**
* Compare two DataFrames 1 to 1 with specific columns inputted by the customer.


can you add an example for how this will be used with the params below?

Contributor Author

Example added.

Comment on lines 46 to 49
val ds1Unique = ds1.groupBy(colKeyMap.keys.toSeq.map(col): _*).count()
val ds2Unique = ds2.groupBy(colKeyMap.values.toSeq.map(col): _*).count()

if (!(ds1Unique.count() == ds1.count() && ds2Unique.count() == ds2.count())) return false


can we add some comments as to what you're trying to achieve here?

ds2Unique is a dataframe with the join keys and their aggregate counts, but the next line compares the aggregate count to the data set count, i.e.

ds1Unique.count() == ds1.count()

should always fail right?

Contributor Author

Added comment explaining the check.
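
For readers puzzled by the same question: grouping by the key columns yields one group per distinct key, so the group count equals the row count exactly when the keys are unique. A small illustration of that invariant (the data and SparkSession setup here are made up for demonstration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("key-uniqueness").getOrCreate()
import spark.implicits._

// "id" is unique here: one group per row, so 3 groups == 3 rows.
val uniqueKeys = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")
assert(uniqueKeys.groupBy("id").count().count() == uniqueKeys.count())

// "id" has a duplicate here: grouping collapses rows, so 2 groups < 3 rows,
// and the 1-to-1 check would correctly bail out.
val duplicateKeys = Seq((1, "a"), (1, "b"), (2, "c")).toDF("id", "name")
assert(duplicateKeys.groupBy("id").count().count() < duplicateKeys.count())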

def columnMatch(ds1: DataFrame,
ds2: DataFrame,
colKeyMap: Map[String, String],
compCols: Option[Map[String, String]],


this can default to an empty map instead of Option


in fact, it might be better to overload the method; so one without compCols and one with it to do the column comparisons.

Contributor Author

Added overloaded methods.
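
A rough sketch of what the suggested overloads could look like. The object name, return type, and the delegation from the no-compCols variant are assumptions for illustration, not the merged code:

import org.apache.spark.sql.DataFrame

object DataSynchronizationSketch {

  // Overload without compCols: compare all shared non-key columns.
  def columnMatch(ds1: DataFrame,
                  ds2: DataFrame,
                  colKeyMap: Map[String, String],
                  assertion: Double => Boolean): Boolean = {
    // Hypothetical: derive comparison columns from the non-key columns that
    // both datasets share, then delegate to the full overload.
    val nonKeyCols = ds1.columns.toSet
      .intersect(ds2.columns.toSet)
      .diff(colKeyMap.keySet)
    columnMatch(ds1, ds2, colKeyMap, nonKeyCols.map(c => c -> c).toMap, assertion)
  }

  // Overload with an explicit mapping of the columns to compare.
  def columnMatch(ds1: DataFrame,
                  ds2: DataFrame,
                  colKeyMap: Map[String, String],
                  compCols: Map[String, String],
                  assertion: Double => Boolean): Boolean = {
    ??? // comparison logic as in the PR
  }
}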

Comment on lines 67 to 69
} else {
false
}


if comparison columns are non-empty we just fail the assertion?


i don't understand the entire if/else branch.. what is going on here? can we add some comments?

Contributor Author

Updated the API to have better error messaging.


if (compCols.isDefined) {

val mergedMaps = colKeyMap.++(compCols.get)


nit: colKeyMap ++ compCols.get

Contributor Author

Done.


val joined = ds1.join(ds2, joinExpression, "inner")

val mostRows = if (ds1.count() > ds2.count()) ds1.count() else ds2.count()


if it's a 1 to 1 check, why do we expect the row counts to be different?

Contributor Author

Updated the logic.
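
For context, a minimal sketch of the kind of ratio being discussed here (an assumption about intent, not the updated logic the author refers to):

import org.apache.spark.sql.DataFrame

// Hypothetical helper: the fraction of rows that survived the inner join,
// i.e. rows that agree on the key (and compared) columns. When the keys are
// unique and the comparison is truly 1-to-1, ds1 and ds2 have the same count,
// so either one works as the denominator.
def matchRatio(joined: DataFrame, ds1: DataFrame): Double =
  joined.count().toDouble / ds1.count()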

object ReferentialIntegrity {

/**
* Checks to what extend a column from a DataFrame is a subset of another column


nit: "extent"

Contributor Author

Done.

assertion((primaryCount - mismatchCount).toDouble / primaryCount)
}
} else {
false


if the column does not exist, shouldn't it throw an error instead? seems like there are [at least] two states:

  • column does not exist in primary or reference => error
  • column not a subset of reference => error

i think it's better to have them as separate errors.

Contributor Author

Updated the API to have better error messaging that captures the states you described.
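
A hedged sketch of the separation the reviewer describes, with the missing-column cases surfaced as distinct errors. The function name, the Either-based error representation, and the anti-join are assumptions for illustration, not the merged implementation:

import org.apache.spark.sql.DataFrame

def subsetCheckSketch(primary: DataFrame, primaryCol: String,
                      reference: DataFrame, referenceCol: String,
                      assertion: Double => Boolean): Either[String, Boolean] = {
  if (!primary.columns.contains(primaryCol)) {
    Left(s"Column $primaryCol does not exist in the primary dataset.")
  } else if (!reference.columns.contains(referenceCol)) {
    Left(s"Column $referenceCol does not exist in the reference dataset.")
  } else {
    // Rows of `primary` whose value never appears in the reference column.
    val mismatchCount = primary
      .join(reference, primary(primaryCol) === reference(referenceCol), "left_anti")
      .count()
    val primaryCount = primary.count()
    Right(assertion((primaryCount - mismatchCount).toDouble / primaryCount))
  }
}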

if (mismatchCount == 0) {
assertion(1.0)
} else {
val primaryCount = primary.count


if primaryCount is 0, the division below will return NaN

Contributor Author

Updated the logic.
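
One way the zero-row case could be guarded (an assumption about how the updated logic might look, not the actual change):

// Hypothetical guard: avoid a 0/0 (NaN) ratio when the primary dataset is empty.
def safeRatioCheck(primaryCount: Long, mismatchCount: Long,
                   assertion: Double => Boolean): Boolean =
  if (primaryCount == 0) false
  else assertion((primaryCount - mismatchCount).toDouble / primaryCount)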

val ds2 = testDS2
val colKeyMap = Map("id" -> "id")
val compCols = Some(Map("name" -> "name"))
val assertion: Double => Boolean = _ >= 0.60


can't you assert == 0.66? or close? this leaves room for test error if the calculation changes

Contributor Author

Done.
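
One way to tighten that assertion along the lines the reviewer suggests, checking closeness to the 0.66 they mention rather than a loose lower bound (the tolerance value is assumed):

// Assert the ratio is close to the expected 2/3 (≈ 0.66) rather than just
// >= 0.60, so a change in the calculation would be caught by the test.
val expected = 2.0 / 3.0
val assertion: Double => Boolean = ratio => math.abs(ratio - expected) < 0.01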

- Added documentation for Data Sync APIs, with examples.
- Updated the error messaging, based on different scenarios.
Contributor

@mentekid mentekid left a comment

Looks good to me

@rdsharma26 rdsharma26 merged commit d2551bc into awslabs:master Feb 22, 2023
rdsharma26 added a commit that referenced this pull request Feb 28, 2023
* Referential Integrity check and test, with Data Synchronization Check and Test

* remove .DS_Store files

* Cleaner versions of Referential Integrity and Data Synchronization checks and tests.

* save save

* Newest version of my three checks

* Version for code review, for all of my checks

* Final code review

* Pull request version of my code

* Pull request version of my code

* Final Version Pull Request

* remove .DS_Store files Duplicate

* .DS_Store banished!

* Removing

* Removings

* Delete DS_Stores

* Cleanup: Update parameter names, descriptions, remove unnecessary whitespace etc.

* Changes in accordance with comments on the pull request.

- Added documentation for Data Sync APIs, with examples.
- Updated the error messaging, based on different scenarios.

---------

Co-authored-by: Fernan Gonzalez <[email protected]>
@rdsharma26 rdsharma26 mentioned this pull request Jun 28, 2023
rdsharma26 added a commit that referenced this pull request Apr 16, 2024

rdsharma26 added a commit that referenced this pull request Apr 16, 2024

rdsharma26 added a commit that referenced this pull request Apr 16, 2024