diff --git a/.github/workflows/README.md b/.github/workflows/README.md index 0bc117ef..e5f23933 100644 --- a/.github/workflows/README.md +++ b/.github/workflows/README.md @@ -1,5 +1,7 @@ # recover github workflows +## Overview + Recover ETL has four github workflows: - workflows/upload-and-deploy.yaml @@ -14,28 +16,31 @@ Recover ETL has four github workflows: | codeql-analysis | on-push from feature branch, feature branch merged into main | | cleanup | feature branch deleted | + +## upload-files + +Copies pilot data sets from the ingestion bucket to the input data bucket for use in the integration test. Note that this behavior assumes there are files in the ingestion bucket; it could be made more robust by throwing an error when the ingestion bucket path is empty. + ## upload-and-deploy Here are some more detailed descriptions and troubleshooting tips for some jobs within each workflow: -### upload-files - -Copies pilot data sets from ingestion bucket to input data bucket for use in integration test. Note that this behavior assumes that there are files in the ingestion bucket. Could add updates to make this robust and throw an error if the ingestion bucket path is empty. +### Current Testing Related Jobs -### nonglue-unit-tests +#### nonglue-unit-tests See [testing](/tests/README.md) for more info on the background behind these tests. Here, both the `recover-dev-input-data` and `recover-dev-processed-data` buckets' synapse folders are tested for STS access every time something is pushed to the feature branch and when the feature branch is merged to main. This is effectively an integration test, and because it depends on a connection to Synapse, the connection can occasionally stall or break. The test usually takes a minute or less, and re-running the job is often enough to resolve a failure. -### pytest-docker +#### pytest-docker This sets up and uploads the two docker images to the ECR repository. **Note: An ECR repo called `pytest` must exist in the AWS account we are pushing docker images to prior to running this GH action.** Some behavioral aspects to note: there are limitations with the matrix method in GitHub Actions jobs, so we had to unmask the account id to pass it as an output for `glue-unit-tests` to use. The matrix method currently doesn't support dynamic job outputs ([see issue thread](https://github.com/orgs/community/discussions/17245)), and the workaround seemed more complex to implement, so we weren't able to pass the path of the uploaded docker container directly and had to use a static output. This leads us to use `steps.login-ecr.outputs.registry`, which contains the account id directly, so the output could be passed and the docker container could be found and used. -### glue-unit-tests +#### glue-unit-tests See [testing](/tests/README.md) for more info on the background behind these tests. @@ -43,6 +48,24 @@ For the JSON to Parquet tests sometimes there may be a scenario where a github w Currently, when `test_json_to_parquet.py` runs, the glue table, glue crawler role, and other resources may already exist for the given branch (they didn't get deleted because an earlier test run didn't finish), so the github workflow errors out when it is triggered again and hits the `AlreadyExistsException`. This is currently resolved manually by deleting the resource(s) that were created in the AWS account and re-running the github jobs that failed. 
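Until that cleanup is automated, a best-effort boto3 sketch along the following lines can speed up the manual deletion. It assumes the leftover resources are a Glue table and crawler; the database, table, and crawler names are placeholders, since the real names depend on the branch namespace and the deployed stacks.

```python
# Best-effort cleanup sketch for Glue resources left behind by a partial test run.
# The database/table/crawler names below are placeholders, not the real stack names.
import boto3
from botocore.exceptions import ClientError


def delete_leftover_glue_resources(
    database_name: str, table_name: str, crawler_name: str
) -> None:
    """Delete the Glue table and crawler if they exist, ignoring missing resources."""
    glue = boto3.client("glue")
    cleanup_calls = [
        (glue.delete_table, {"DatabaseName": database_name, "Name": table_name}),
        (glue.delete_crawler, {"Name": crawler_name}),
    ]
    for delete_call, kwargs in cleanup_calls:
        try:
            delete_call(**kwargs)
        except ClientError as error:
            # EntityNotFoundException just means there is nothing left to clean up.
            if error.response["Error"]["Code"] != "EntityNotFoundException":
                raise


if __name__ == "__main__":
    delete_leftover_glue_resources(
        database_name="my-feature-branch-glue-database",  # placeholder
        table_name="dataset_fitbitdailydata",  # placeholder
        crawler_name="my-feature-branch-table-crawler",  # placeholder
    )
```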
+### Adding Test Commands to Github Workflow Jobs + +After developing and running tests locally, you need to ensure the tests also run in the CI pipeline. To add your tests under the `upload-and-deploy` job: + +Add your test commands under the appropriate job (see above for summaries of the specific testing related jobs), for example: + +```yaml +jobs: + build: + runs-on: ubuntu-latest + steps: + # Other steps... + - name: Run tests + run: | + pytest tests/ + +``` + ### sceptre-deploy-develop ### integration-test-develop-cleanup diff --git a/.github/workflows/upload-and-deploy.yaml b/.github/workflows/upload-and-deploy.yaml index b3277a1f..bd4f2837 100755 --- a/.github/workflows/upload-and-deploy.yaml +++ b/.github/workflows/upload-and-deploy.yaml @@ -134,11 +134,14 @@ jobs: pipenv install ecs_logging~=2.0 pipenv install pytest-datadir - - name: Test lambda scripts with pytest + - name: Test scripts with pytest (lambda, etc.) run: | - pipenv run python -m pytest tests/test_s3_event_config_lambda.py -v - pipenv run python -m pytest tests/test_s3_to_glue_lambda.py -v - pipenv run python -m pytest -v tests/test_lambda_raw.py + pipenv run python -m pytest \ + tests/test_s3_event_config_lambda.py \ + tests/test_s3_to_glue_lambda.py \ + tests/test_lambda_dispatch.py \ + tests/test_consume_logs.py \ + tests/test_lambda_raw.py -v - name: Test dev synapse folders for STS access with pytest run: > @@ -249,18 +252,25 @@ jobs: if: github.ref_name != 'main' run: echo "NAMESPACE=$GITHUB_REF_NAME" >> $GITHUB_ENV - - name: Run Pytest unit tests under AWS 3.0 + - name: Run Pytest unit tests under AWS Glue 3.0 if: matrix.tag_name == 'aws_glue_3' run: | - su - glue_user --command "cd $GITHUB_WORKSPACE && python3 -m pytest tests/test_s3_to_json.py -v" - su - glue_user --command "cd $GITHUB_WORKSPACE && python3 -m pytest tests/test_compare_parquet_datasets.py -v" + su - glue_user --command "cd $GITHUB_WORKSPACE && python3 -m pytest \ + tests/test_s3_to_json.py \ + tests/test_compare_parquet_datasets.py -v" - - name: Run Pytest unit tests under AWS 4.0 + - name: Run unit tests for JSON to Parquet under AWS Glue 4.0 if: matrix.tag_name == 'aws_glue_4' run: > su - glue_user --command "cd $GITHUB_WORKSPACE && python3 -m pytest tests/test_json_to_parquet.py --namespace $NAMESPACE -v" + - name: Run unit tests for Great Expectations on Parquet under AWS Glue 4.0 + if: matrix.tag_name == 'aws_glue_4' + run: > + su - glue_user --command "cd $GITHUB_WORKSPACE && + python3 -m pytest tests/test_run_great_expectations_on_parquet.py -v" + sceptre-deploy-develop: name: Deploys branch using sceptre runs-on: ubuntu-latest @@ -287,7 +297,7 @@ jobs: run: echo "NAMESPACE=$GITHUB_REF_NAME" >> $GITHUB_ENV - name: "Deploy sceptre stacks to dev" - run: pipenv run sceptre --var "namespace=${{ env.NAMESPACE }}" launch develop --yes + run: pipenv run sceptre --debug --var "namespace=${{ env.NAMESPACE }}" launch develop --yes - name: Delete preexisting S3 event notification for this namespace uses: gagoar/invoke-aws-lambda@v3 diff --git a/config/config.yaml b/config/config.yaml index 07930a8e..ff15f5f7 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -7,6 +7,7 @@ template_key_prefix: "{{ var.namespace | default('main') }}/templates" glue_python_shell_python_version: "3.9" glue_python_shell_glue_version: "3.0" json_to_parquet_glue_version: "4.0" +great_expectations_job_glue_version: "4.0" default_stack_tags: Department: DNT Project: recover diff --git a/config/develop/glue-job-role.yaml b/config/develop/glue-job-role.yaml 
index 7e0feb26..9396a782 100644 --- a/config/develop/glue-job-role.yaml +++ b/config/develop/glue-job-role.yaml @@ -6,5 +6,6 @@ parameters: S3IntermediateBucketName: {{ stack_group_config.intermediate_bucket_name }} S3ParquetBucketName: {{ stack_group_config.processed_data_bucket_name }} S3ArtifactBucketName: {{ stack_group_config.template_bucket_name }} + S3ShareableArtifactBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} stack_tags: {{ stack_group_config.default_stack_tags }} diff --git a/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml new file mode 100644 index 00000000..1e6d461e --- /dev/null +++ b/config/develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml @@ -0,0 +1,18 @@ +template: + path: glue-job-run-great-expectations-on-parquet.j2 +dependencies: + - develop/glue-job-role.yaml +stack_name: "{{ stack_group_config.namespace }}-glue-job-RunGreatExpectationsParquet" +parameters: + Namespace: {{ stack_group_config.namespace }} + JobDescription: Runs great expectations on a set of data + JobRole: !stack_output_external glue-job-role::RoleArn + TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }} + S3ScriptBucket: {{ stack_group_config.template_bucket_name }} + S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py' + GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}" + AdditionalPythonModules: "great_expectations~=0.18,urllib3<2" +stack_tags: + {{ stack_group_config.default_stack_tags }} +sceptre_user_data: + dataset_schemas: !file src/glue/resources/table_columns.yaml diff --git a/config/develop/namespaced/glue-workflow.yaml b/config/develop/namespaced/glue-workflow.yaml index d8deb13e..6861a72e 100644 --- a/config/develop/namespaced/glue-workflow.yaml +++ b/config/develop/namespaced/glue-workflow.yaml @@ -6,6 +6,7 @@ dependencies: - develop/namespaced/glue-job-S3ToJsonS3.yaml - develop/namespaced/glue-job-JSONToParquet.yaml - develop/namespaced/glue-job-compare-parquet.yaml + - develop/namespaced/glue-job-run-great-expectations-on-parquet.yaml - develop/glue-job-role.yaml - develop/s3-cloudformation-bucket.yaml parameters: @@ -19,7 +20,10 @@ parameters: CompareParquetMainNamespace: "main" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} CloudformationBucketName: {{ stack_group_config.template_bucket_name }} + ShareableArtifactsBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} + ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: dataset_schemas: !file src/glue/resources/table_columns.yaml + data_values_expectations: !file src/glue/resources/data_values_expectations.json diff --git a/config/prod/glue-job-role.yaml b/config/prod/glue-job-role.yaml index 7e0feb26..9396a782 100644 --- a/config/prod/glue-job-role.yaml +++ b/config/prod/glue-job-role.yaml @@ -6,5 +6,6 @@ parameters: S3IntermediateBucketName: {{ stack_group_config.intermediate_bucket_name }} S3ParquetBucketName: {{ stack_group_config.processed_data_bucket_name }} S3ArtifactBucketName: {{ stack_group_config.template_bucket_name }} + S3ShareableArtifactBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} stack_tags: {{ stack_group_config.default_stack_tags }} diff --git 
a/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml b/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml new file mode 100644 index 00000000..f0e8dd2a --- /dev/null +++ b/config/prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml @@ -0,0 +1,18 @@ +template: + path: glue-job-run-great-expectations-on-parquet.j2 +dependencies: + - prod/glue-job-role.yaml +stack_name: "{{ stack_group_config.namespace }}-glue-job-RunGreatExpectationsParquet" +parameters: + Namespace: {{ stack_group_config.namespace }} + JobDescription: Runs great expectations on a set of data + JobRole: !stack_output_external glue-job-role::RoleArn + TempS3Bucket: {{ stack_group_config.processed_data_bucket_name }} + S3ScriptBucket: {{ stack_group_config.template_bucket_name }} + S3ScriptKey: '{{ stack_group_config.namespace }}/src/glue/jobs/run_great_expectations_on_parquet.py' + GlueVersion: "{{ stack_group_config.great_expectations_job_glue_version }}" + AdditionalPythonModules: "great_expectations~=0.18,urllib3<2" +stack_tags: + {{ stack_group_config.default_stack_tags }} +sceptre_user_data: + dataset_schemas: !file src/glue/resources/table_columns.yaml diff --git a/config/prod/namespaced/glue-workflow.yaml b/config/prod/namespaced/glue-workflow.yaml index d2260f8b..3223adb9 100644 --- a/config/prod/namespaced/glue-workflow.yaml +++ b/config/prod/namespaced/glue-workflow.yaml @@ -6,6 +6,7 @@ dependencies: - prod/namespaced/glue-job-S3ToJsonS3.yaml - prod/namespaced/glue-job-JSONToParquet.yaml - prod/namespaced/glue-job-compare-parquet.yaml + - prod/namespaced/glue-job-run-great-expectations-on-parquet.yaml - prod/glue-job-role.yaml - prod/s3-cloudformation-bucket.yaml parameters: @@ -19,7 +20,10 @@ parameters: CompareParquetMainNamespace: "main" S3SourceBucketName: {{ stack_group_config.input_bucket_name }} CloudformationBucketName: {{ stack_group_config.template_bucket_name }} + ShareableArtifactsBucketName: {{ stack_group_config.shareable_artifacts_vpn_bucket_name }} + ExpectationSuiteKey: "{{ stack_group_config.namespace }}/src/glue/resources/data_values_expectations.json" stack_tags: {{ stack_group_config.default_stack_tags }} sceptre_user_data: dataset_schemas: !file src/glue/resources/table_columns.yaml + data_values_expectations: !file src/glue/resources/data_values_expectations.json diff --git a/src/glue/jobs/run_great_expectations_on_parquet.py b/src/glue/jobs/run_great_expectations_on_parquet.py new file mode 100644 index 00000000..68c0e990 --- /dev/null +++ b/src/glue/jobs/run_great_expectations_on_parquet.py @@ -0,0 +1,408 @@ +import json +import logging +import sys +from datetime import datetime +from typing import Dict + +import boto3 +import great_expectations as gx +from awsglue.context import GlueContext +from awsglue.utils import getResolvedOptions +from great_expectations.core.batch import RuntimeBatchRequest +from great_expectations.core.expectation_configuration import ExpectationConfiguration +from great_expectations.core.run_identifier import RunIdentifier +from great_expectations.core.yaml_handler import YAMLHandler +from great_expectations.data_context.types.base import DataContextConfig +from great_expectations.data_context.types.resource_identifiers import ( + ExpectationSuiteIdentifier, + ValidationResultIdentifier, +) +from pyspark.context import SparkContext + +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) +handler = logging.StreamHandler(sys.stdout) +handler.setLevel(logging.DEBUG) +formatter = 
logging.Formatter("%(levelname)s:%(name)s:%(message)s") +handler.setFormatter(formatter) +logger.addHandler(handler) + + +def read_args() -> dict: + """Returns the specific params that our code needs to run""" + args = getResolvedOptions( + sys.argv, + [ + "parquet-bucket", + "shareable-artifacts-bucket", + "cfn-bucket", + "namespace", + "data-type", + "expectation-suite-key", + ], + ) + for arg in args: + validate_args(args[arg]) + return args + + +def validate_args(value: str) -> None: + """Checks to make sure none of the input command line arguments are empty strings + + Args: + value (str): the value of the command line argument parsed by argparse + + Raises: + ValueError: when value is an empty string + """ + if value == "": + raise ValueError("Argument value cannot be an empty string") + else: + return None + + +def create_context( + s3_bucket: str, namespace: str, key_prefix: str +) -> "EphemeralDataContext": + """Creates the data context and adds stores, + datasource and data docs configurations + + Args: + s3_bucket (str): name of s3 bucket to store to + namespace (str): namespace + key_prefix (str): s3 key prefix + + Returns: + EphemeralDataContext: context object with all + configurations + """ + context = gx.get_context() + add_datasource(context) + add_validation_stores(context, s3_bucket, namespace, key_prefix) + add_data_docs_sites(context, s3_bucket, namespace, key_prefix) + return context + + +def add_datasource(context: "EphemeralDataContext") -> "EphemeralDataContext": + """Adds the spark datasource + + Args: + context (EphemeralDataContext): data context to add to + + Returns: + EphemeralDataContext: data context object with datasource configuration + added + """ + yaml = YAMLHandler() + context.add_datasource( + **yaml.load( + """ + name: spark_datasource + class_name: Datasource + execution_engine: + class_name: SparkDFExecutionEngine + force_reuse_spark_context: true + data_connectors: + runtime_data_connector: + class_name: RuntimeDataConnector + batch_identifiers: + - batch_identifier + """ + ) + ) + return context + + +def add_validation_stores( + context: "EphemeralDataContext", + s3_bucket: str, + namespace: str, + key_prefix: str, +) -> "EphemeralDataContext": + """Adds the validation store configurations to the context object + + Args: + context (EphemeralDataContext): data context to add to + s3_bucket (str): name of the s3 bucket to save validation results to + namespace (str): name of the namespace + key_prefix (str): s3 key prefix to save the + validation results to + + Returns: + EphemeralDataContext: data context object with validation stores' + configuration added + """ + # Programmatically configure the validation result store and + # DataDocs to use S3 + context.add_store( + "validation_result_store", + { + "class_name": "ValidationsStore", + "store_backend": { + "class_name": "TupleS3StoreBackend", + "bucket": s3_bucket, + "prefix": f"{namespace}/{key_prefix}", + }, + }, + ) + return context + + +def add_data_docs_sites( + context: "EphemeralDataContext", + s3_bucket: str, + namespace: str, + key_prefix: str, +) -> "EphemeralDataContext": + """Adds the data docs sites configuration to the context object + so data docs can be saved to a s3 location. This is a special + workaround to add the data docs because we're using EphemeralDataContext + context objects and they don't store to memory. 
+ + Args: + context (EphemeralDataContext): data context to add to + s3_bucket (str): name of the s3 bucket to save gx docs to + namespace (str): name of the namespace + key_prefix (str): s3 key prefix to save the + gx docs to + + Returns: + EphemeralDataContext: data context object with data docs sites' + configuration added + """ + data_context_config = DataContextConfig() + data_context_config["data_docs_sites"] = { + "s3_site": { + "class_name": "SiteBuilder", + "store_backend": { + "class_name": "TupleS3StoreBackend", + "bucket": s3_bucket, + "prefix": f"{namespace}/{key_prefix}", + }, + "site_index_builder": {"class_name": "DefaultSiteIndexBuilder"}, + } + } + context._project_config["data_docs_sites"] = data_context_config["data_docs_sites"] + return context + + +def get_spark_df( + glue_context: GlueContext, parquet_bucket: str, namespace: str, data_type: str +) -> "pyspark.sql.dataframe.DataFrame": + """Reads in the parquet dataset as a Dynamic Frame and converts it + to a spark dataframe + + Args: + glue_context (GlueContext): the aws glue context object + parquet_bucket (str): the name of the bucket holding parquet files + namespace (str): the namespace + data_type (str): the data type name + + Returns: + pyspark.sql.dataframe.DataFrame: spark dataframe of the read in parquet dataset + """ + s3_parquet_path = f"s3://{parquet_bucket}/{namespace}/parquet/dataset_{data_type}/" + dynamic_frame = glue_context.create_dynamic_frame_from_options( + connection_type="s3", + connection_options={"paths": [s3_parquet_path]}, + format="parquet", + ) + spark_df = dynamic_frame.toDF() + return spark_df + + +def get_batch_request( + spark_dataset: "pyspark.sql.dataframe.DataFrame", + data_type: str, + run_id: RunIdentifier, +) -> RuntimeBatchRequest: + """Retrieves the unique metadata for this batch request + + Args: + spark_dataset (pyspark.sql.dataframe.DataFrame): parquet dataset as spark df + data_type (str): data type name + run_id (RunIdentifier): contains the run name and + run time metadata of this batch run + + Returns: + RuntimeBatchRequest: contains metadata for the batch run request + to identify this great expectations run + """ + batch_request = RuntimeBatchRequest( + datasource_name="spark_datasource", + data_connector_name="runtime_data_connector", + data_asset_name=f"{data_type}-parquet-data-asset", + runtime_parameters={"batch_data": spark_dataset}, + batch_identifiers={"batch_identifier": f"{data_type}_{run_id.run_name}_batch"}, + ) + return batch_request + + +def read_json( + s3: boto3.client, + s3_bucket: str, + key: str, +) -> Dict[str, str]: + """Reads in a json object + + Args: + s3 (boto3.client): s3 client connection + s3_bucket (str): name of the s3 bucket to read from + key (str): s3 key prefix of the + location of the json to read from + + Returns: + Dict[str, str]: the data read in from json + """ + # read in the json filelist + s3_response_object = s3.get_object(Bucket=s3_bucket, Key=key) + json_content = s3_response_object["Body"].read().decode("utf-8") + expectations = json.loads(json_content) + return expectations + + +def add_expectations_from_json( + expectations_data: Dict[str, str], + context: "EphemeralDataContext", + data_type: str, +) -> "EphemeralDataContext": + """Adds in the read in expectations to the context object + + Args: + expectations_data (Dict[str, str]): expectations + context (EphemeralDataContext): context object + data_type (str): name of the data type + + Raises: + ValueError: thrown when no expectations exist for this data type + + 
Returns: + EphemeralDataContext: context object with expectations added + """ + # Ensure the data type exists in the JSON file + if data_type not in expectations_data: + raise ValueError(f"No expectations found for data type '{data_type}'") + + # Extract the expectation suite and expectations for the dataset + suite_data = expectations_data[data_type] + expectation_suite_name = suite_data["expectation_suite_name"] + new_expectations = suite_data["expectations"] + + # Convert new expectations from JSON format to ExpectationConfiguration objects + new_expectations_configs = [ + ExpectationConfiguration( + expectation_type=exp["expectation_type"], kwargs=exp["kwargs"] + ) + for exp in new_expectations + ] + + # Update the expectation suite in the data context + context.add_or_update_expectation_suite( + expectation_suite_name=expectation_suite_name, + expectations=new_expectations_configs, + ) + return context + + +def add_validation_results_to_store( + context: "EphemeralDataContext", + expectation_suite_name: str, + validation_result: Dict[str, str], + batch_identifier: RuntimeBatchRequest, + run_identifier: RunIdentifier, +) -> "EphemeralDataContext": + """Adds the validation results manually to the validation store. + This is a workaround for a EphemeralDataContext context object, + and for us to avoid complicating our folder structure to include + checkpoints/other more persistent data context object types + until we need that feature + + Args: + context (EphemeralDataContext): context object to add results to + expectation_suite_name (str): name of expectation suite + validation_result (Dict[str, str]): results outputted by gx + validator to be stored + batch_identifier (RuntimeBatchRequest): metadata containing details of + the batch request + run_identifier (RunIdentifier): metadata containing details of the gx run + + Returns: + EphemeralDataContext: context object with validation results added to + """ + expectation_suite = context.get_expectation_suite(expectation_suite_name) + # Create an ExpectationSuiteIdentifier + expectation_suite_identifier = ExpectationSuiteIdentifier( + expectation_suite_name=expectation_suite.expectation_suite_name + ) + + # Create a ValidationResultIdentifier using the run_id, expectation suite, and batch identifier + validation_result_identifier = ValidationResultIdentifier( + expectation_suite_identifier=expectation_suite_identifier, + batch_identifier=batch_identifier, + run_id=run_identifier, + ) + + context.validations_store.set(validation_result_identifier, validation_result) + return context + + +def main(): + args = read_args() + run_id = RunIdentifier(run_name=f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}") + expectation_suite_name = f"{args['data_type']}_expectations" + s3 = boto3.client("s3") + context = create_context( + s3_bucket=args["shareable_artifacts_bucket"], + namespace=args["namespace"], + key_prefix=f"great_expectation_reports/{args['data_type']}/parquet/", + ) + glue_context = GlueContext(SparkContext.getOrCreate()) + logger.info("get_spark_df") + spark_df = get_spark_df( + glue_context=glue_context, + parquet_bucket=args["parquet_bucket"], + namespace=args["namespace"], + data_type=args["data_type"], + ) + logger.info("get_batch_request") + batch_request = get_batch_request(spark_df, args["data_type"], run_id) + logger.info("add_expectations") + + # Load the JSON file with the expectations + logger.info("reads_expectations_from_json") + expectations_data = read_json( + s3=s3, + s3_bucket=args["cfn_bucket"], + 
key=args["expectation_suite_key"], + ) + logger.info("adds_expectations_from_json") + add_expectations_from_json( + expectations_data=expectations_data, + context=context, + data_type=args["data_type"], + ) + logger.info("get_validator") + validator = context.get_validator( + batch_request=batch_request, + expectation_suite_name=expectation_suite_name, + ) + logger.info("validator.validate") + validation_result = validator.validate() + + logger.info("validation_result: %s", validation_result) + + add_validation_results_to_store( + context, + expectation_suite_name, + validation_result, + batch_identifier=batch_request["batch_identifiers"]["batch_identifier"], + run_identifier=run_id, + ) + context.build_data_docs( + site_names=["s3_site"], + ) + logger.info("data docs saved!") + + +if __name__ == "__main__": + main() diff --git a/src/glue/resources/data_values_expectations.json b/src/glue/resources/data_values_expectations.json new file mode 100644 index 00000000..05473c0e --- /dev/null +++ b/src/glue/resources/data_values_expectations.json @@ -0,0 +1,42 @@ +{ + "fitbitdailydata": { + "expectation_suite_name": "fitbitdailydata_expectations", + "expectations": [ + { + "expectation_type": "expect_column_values_to_be_between", + "kwargs": { + "column": "Calories", + "min_value": 300, + "max_value": 25000 + } + }, + { + "expectation_type": "expect_column_values_to_be_between", + "kwargs": { + "column": "Steps", + "min_value": 1, + "max_value": 200000 + } + }, + { + "expectation_type": "expect_column_values_to_be_between", + "kwargs": { + "column": "BreathingRate", + "min_value": 4, + "max_value": 40 + } + } + ] + }, + "healthkitv2workouts": { + "expectation_suite_name": "healthkitv2workouts_expectations", + "expectations": [ + { + "expectation_type": "expect_column_to_exist", + "kwargs": { + "column": "HealthKitWorkoutKey" + } + } + ] + } +} diff --git a/templates/glue-job-role.yaml b/templates/glue-job-role.yaml index 5546f12b..efbbd3cf 100644 --- a/templates/glue-job-role.yaml +++ b/templates/glue-job-role.yaml @@ -20,6 +20,9 @@ Parameters: Type: String Description: Name of the S3 bucket where cloudformation templates and tests are stored. + S3ShareableArtifactBucketName: + Type: String + Description: Name of the S3 bucket where the shareable artifacts (like great expectations reports) are stored. Resources: @@ -65,6 +68,8 @@ Resources: - !Sub arn:aws:s3:::${S3IntermediateBucketName}/* - !Sub arn:aws:s3:::${S3ParquetBucketName} - !Sub arn:aws:s3:::${S3ParquetBucketName}/* + - !Sub arn:aws:s3:::${S3ShareableArtifactBucketName} + - !Sub arn:aws:s3:::${S3ShareableArtifactBucketName}/* - PolicyName: ReadS3 PolicyDocument: Version: '2012-10-17' diff --git a/templates/glue-job-run-great-expectations-on-parquet.j2 b/templates/glue-job-run-great-expectations-on-parquet.j2 new file mode 100644 index 00000000..455b5e89 --- /dev/null +++ b/templates/glue-job-run-great-expectations-on-parquet.j2 @@ -0,0 +1,104 @@ +AWSTemplateFormatVersion: '2010-09-09' +Description: >- + An AWS Glue job in the data catalog. An AWS Glue job encapsulates a script + that connects to your source data, processes it, and then writes it out + to your data target. + +Parameters: + Namespace: + Type: String + Description: >- + The namespace string used to for the individual glue job names + + JobDescription: + Type: String + Description: A fuller description of what the job does. + Default: '' + + JobRole: + Type: String + Description: The name or ARN of the IAM role that will run this job. 
+ + TempS3Bucket: + Type: String + Description: The name of the S3 bucket where temporary files and logs are written. + + S3ScriptBucket: + Type: String + Description: The name of the S3 bucket where the script file is located. + + S3ScriptKey: + Type: String + Description: The bucket key where the script file is located. + + GlueVersion: + Type: String + Description: The version of glue to use for this job + + DefaultWorkerType: + Type: String + Description: >- + Which worker type to use for this job. + Default: 'Standard' + + DefaultNumberOfWorkers: + Type: Number + Description: >- + How many DPUs to allot to this job. This parameter is not used for types + FitbitIntradayCombined and HealthKitV2Samples. + Default: 1 + + MaxRetries: + Type: Number + Description: How many times to retry the job if it fails (integer). + Default: 0 # TODO change this to 1 after initial development + + TimeoutInMinutes: + Type: Number + Description: The job timeout in minutes (integer). + Default: 1200 + + AdditionalPythonModules: + Type: String + Description: >- + Additional python packages to install as a comma-delimited list. + Any format supported by pip3 is supported here. + + +Resources: + + {% set datasets = [] %} + {% for v in sceptre_user_data.dataset_schemas.tables.keys() if not "Deleted" in v %} + {% set dataset = {} %} + {% do dataset.update({"type": v}) %} + {% do dataset.update({"table_name": "dataset_" + v.lower()})%} + {% do dataset.update({"stackname_prefix": "{}".format(v.replace("_",""))}) %} + {% do datasets.append(dataset) %} + {% endfor %} + + {% for dataset in datasets %} + {{ dataset["stackname_prefix"] }}GreatExpectationsParquetJob: + Type: AWS::Glue::Job + Properties: + Command: + Name: glueetl + ScriptLocation: !Sub s3://${S3ScriptBucket}/${S3ScriptKey} + DefaultArguments: + --TempDir: !Sub s3://${TempS3Bucket}/tmp + --enable-continuous-cloudwatch-log: true + --enable-metrics: true + --enable-spark-ui: true + --spark-event-logs-path: !Sub s3://${TempS3Bucket}/spark-logs/${AWS::StackName}/ + --job-bookmark-option: job-bookmark-disable + --job-language: python + --additional-python-modules: !Ref AdditionalPythonModules + # --conf spark.sql.adaptive.enabled + Description: !Sub "${JobDescription} for data type {{ dataset['type'] }}" + GlueVersion: !Ref GlueVersion + MaxRetries: !Ref MaxRetries + Name: !Sub "${Namespace}-{{ dataset["stackname_prefix"] }}-GreatExpectationsParquetJob" + WorkerType: !Ref DefaultWorkerType + NumberOfWorkers: !Ref DefaultNumberOfWorkers + Role: !Ref JobRole + Timeout: !Ref TimeoutInMinutes + {% endfor %} diff --git a/templates/glue-workflow.j2 b/templates/glue-workflow.j2 index c5ce9e9b..16d8950d 100644 --- a/templates/glue-workflow.j2 +++ b/templates/glue-workflow.j2 @@ -82,6 +82,14 @@ Parameters: Description: >- The name of the bucket where the cloudformation and artifacts are stored. + ShareableArtifactsBucketName: + Type: String + Description: The name of the bucket where shareable artifacts are stored. + + ExpectationSuiteKey: + Type: String + Description: The s3 key prefix of the expectation suite. 
+ Conditions: IsMainNamespace: !Equals [!Ref Namespace, "main"] IsDevelopmentNamespace: !Not [!Equals [!Ref Namespace, "main"]] @@ -270,11 +278,11 @@ Resources: StartOnCreation: true WorkflowName: !Ref JsonToParquetWorkflow - JsontoParquetCompleteTrigger: + CompareParquetTrigger: Type: AWS::Glue::Trigger Condition: IsDevelopmentNamespace Properties: - Name: !Sub "${Namespace}-JsontoParquetCompleteTrigger" + Name: !Sub "${Namespace}-CompareParquetTrigger" Actions: {% for dataset in datasets %} - JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"] }}-CompareParquetJob @@ -287,7 +295,7 @@ Resources: "--parquet-bucket": !Ref ParquetBucketName "--additional-python-modules": "datacompy~=0.8 flask~=2.0 flask-cors~=3.0" {% endfor %} - Description: This trigger runs after completion of all JSON to Parquet jobs + Description: This trigger runs the compare parquet jobs after completion of all JSON to Parquet jobs Type: CONDITIONAL Predicate: Conditions: @@ -300,6 +308,33 @@ Resources: StartOnCreation: true WorkflowName: !Ref JsonToParquetWorkflow + + {% for dataset in datasets if dataset["data_type"].lower() in sceptre_user_data.data_values_expectations %} + {{ dataset['stackname_prefix'] }}GreatExpectationsParquetTrigger: + Type: AWS::Glue::Trigger + Properties: + Name: !Sub "${Namespace}-{{ dataset['stackname_prefix'] }}GreatExpectationsParquetTrigger" + Actions: + - JobName: !Sub ${Namespace}-{{ dataset["stackname_prefix"] }}-GreatExpectationsParquetJob + Arguments: + "--data-type": {{ dataset["data_type"].lower() }} + "--namespace": !Ref Namespace + "--cfn-bucket": !Ref CloudformationBucketName + "--parquet-bucket": !Ref ParquetBucketName + "--shareable-artifacts-bucket": !Ref ShareableArtifactsBucketName + "--expectation-suite-key": !Ref ExpectationSuiteKey + "--additional-python-modules": "great_expectations~=0.18,urllib3<2" + Description: This trigger runs the great expectation parquet job for this data type after completion of the JSON to Parquet job for this data type + Type: CONDITIONAL + Predicate: + Conditions: + - JobName: !Sub "${Namespace}-{{ dataset['stackname_prefix'] }}-Job" + State: SUCCEEDED + LogicalOperator: EQUALS + StartOnCreation: true + WorkflowName: !Ref JsonToParquetWorkflow + {% endfor %} + Outputs: S3ToJsonWorkflowName: diff --git a/tests/Dockerfile.aws_glue_4 b/tests/Dockerfile.aws_glue_4 index 485ecf55..bf19d84d 100644 --- a/tests/Dockerfile.aws_glue_4 +++ b/tests/Dockerfile.aws_glue_4 @@ -1,4 +1,4 @@ FROM amazon/aws-glue-libs:glue_libs_4.0.0_image_01 -RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0 +RUN pip3 install moto~=4.1 pytest-datadir ecs_logging~=2.0 great_expectations~=0.18 ENTRYPOINT ["bash", "-l"] diff --git a/tests/README.md b/tests/README.md index 729ade60..5f4093c0 100644 --- a/tests/README.md +++ b/tests/README.md @@ -1,6 +1,9 @@ -### Running tests +### Tests in RECOVER Tests are defined in the `tests` folder in this project. +### Running tests locally +See here for information on how to run the tests locally especially when you are adding tests. 
+ #### Running tests using Docker All tests can be run inside a Docker container which includes all the necessary Glue/Spark dependencies and simulates the environment which the Glue jobs @@ -17,6 +20,7 @@ These scripts needs to be run inside a Docker container: - Under AWS 4.0 (Dockerfile.aws_glue_4) - test_json_to_parquet.py (Note that these tests deploys test resources to aws and will take several min to run) + - test_run_great_expectations_on_parquet.py Run the following commands to run tests for: @@ -83,6 +87,8 @@ pytest with other tests because they have to be run in a Dockerfile with the AWS #### Running tests for lambda Run the following command from the repo root to run tests for the lambda functions (in develop). +Example: + ```shell script python3 -m pytest tests/test_s3_to_glue_lambda.py -v ``` @@ -103,3 +109,6 @@ python3 -m pytest tests/test_setup_external_storage.py --namespace --test_sts_permission ``` + +#### Adding tests to Recover CI/CD +Tests are run automatically as part of `upload-and-deploy.yaml` Github workflow. See [Github Workflows README](.github/workflows/README.md#adding-test-commands-to-github-workflow-jobs) for more details on the workflow. diff --git a/tests/test_run_great_expectations_on_parquet.py b/tests/test_run_great_expectations_on_parquet.py new file mode 100644 index 00000000..074e1407 --- /dev/null +++ b/tests/test_run_great_expectations_on_parquet.py @@ -0,0 +1,381 @@ +from unittest.mock import MagicMock, patch + +import great_expectations as gx +import pytest +from great_expectations.core.batch import RuntimeBatchRequest +from great_expectations.core.run_identifier import RunIdentifier +from great_expectations.core.yaml_handler import YAMLHandler +from great_expectations.data_context.types.resource_identifiers import ( + ExpectationSuiteIdentifier, + ValidationResultIdentifier, +) +from pyspark.sql import SparkSession + +from src.glue.jobs import run_great_expectations_on_parquet as run_gx_on_pq + + +@pytest.fixture +def test_context(scope="function"): + context = gx.get_context() + yield context + + +@pytest.fixture(scope="function") +def test_spark(): + yield SparkSession.builder.appName("BatchRequestTest").getOrCreate() + + +def test_create_context(): + with ( + patch.object(gx, "get_context") as mock_get_context, + patch.object(run_gx_on_pq, "add_datasource") as mock_add_datasource, + patch.object( + run_gx_on_pq, "add_validation_stores" + ) as mock_add_validation_stores, + patch.object(run_gx_on_pq, "add_data_docs_sites") as mock_add_data_docs_sites, + ): + mock_context = MagicMock() + mock_get_context.return_value = mock_context + + s3_bucket = "test-bucket" + namespace = "test-namespace" + key_prefix = "test-prefix" + + # Call the function + result_context = run_gx_on_pq.create_context(s3_bucket, namespace, key_prefix) + + # Assert that the context returned is the mock context + assert result_context == mock_context + + # Assert that the other functions were called + mock_add_datasource.assert_called_once_with(mock_context) + mock_add_validation_stores.assert_called_once_with( + mock_context, s3_bucket, namespace, key_prefix + ) + mock_add_data_docs_sites.assert_called_once_with( + mock_context, s3_bucket, namespace, key_prefix + ) + + +def test_that_add_datasource_calls_correctly(): + mock_context = MagicMock() + result_context = run_gx_on_pq.add_datasource(mock_context) + + # Verify that the datasource was added + mock_context.add_datasource.assert_called_once() + assert result_context == mock_context + + +@pytest.mark.integration +def 
test_that_add_datasource_adds_correctly(test_context): + # Assuming you've already added a datasource, you can list it + run_gx_on_pq.add_datasource(test_context) + datasources = test_context.list_datasources() + + # Define the expected datasource name + expected_datasource_name = "spark_datasource" + + # Check that the expected datasource is present and other details are correct + assert any( + ds["name"] == expected_datasource_name for ds in datasources + ), f"Datasource '{expected_datasource_name}' was not added correctly." + datasource = next( + ds for ds in datasources if ds["name"] == expected_datasource_name + ) + assert datasource["class_name"] == "Datasource" + assert "SparkDFExecutionEngine" in datasource["execution_engine"]["class_name"] + + +def test_add_validation_stores_has_expected_calls(): + mock_context = MagicMock() + s3_bucket = "test-bucket" + namespace = "test-namespace" + key_prefix = "test-prefix" + + with patch.object(mock_context, "add_store") as mock_add_store: + # Call the function + result_context = run_gx_on_pq.add_validation_stores( + mock_context, s3_bucket, namespace, key_prefix + ) + + # Verify that the validation store is added + mock_add_store.assert_called_once_with( + "validation_result_store", + { + "class_name": "ValidationsStore", + "store_backend": { + "class_name": "TupleS3StoreBackend", + "bucket": s3_bucket, + "prefix": f"{namespace}/{key_prefix}", + }, + }, + ) + + assert result_context == mock_context + + +@pytest.mark.integration +def test_validation_store_details(test_context): + # Mock context and stores + run_gx_on_pq.add_validation_stores( + test_context, + s3_bucket="test-bucket", + namespace="test", + key_prefix="test_folder/", + ) + + # Run the test logic + stores = test_context.list_stores() + expected_store_name = "validation_result_store" + + assert any(store["name"] == expected_store_name for store in stores) + # pulls the store we want + store_config = [store for store in stores if store["name"] == expected_store_name][ + 0 + ] + + assert store_config["class_name"] == "ValidationsStore" + assert store_config["store_backend"]["class_name"] == "TupleS3StoreBackend" + assert store_config["store_backend"]["bucket"] == "test-bucket" + assert store_config["store_backend"]["prefix"] == "test/test_folder/" + + +def test_get_spark_df_has_expected_calls(): + glue_context = MagicMock() + mock_dynamic_frame = MagicMock() + mock_spark_df = MagicMock() + mock_dynamic_frame.toDF.return_value = mock_spark_df + + with patch.object( + glue_context, "create_dynamic_frame_from_options" + ) as mock_create_dynamic_frame: + mock_create_dynamic_frame.return_value = mock_dynamic_frame + + parquet_bucket = "test-bucket" + namespace = "test-namespace" + data_type = "test-data" + + result_df = run_gx_on_pq.get_spark_df( + glue_context, parquet_bucket, namespace, data_type + ) + + # Verify the S3 path and the creation of the DynamicFrame + expected_path = f"s3://test-bucket/test-namespace/parquet/dataset_test-data/" + mock_create_dynamic_frame.assert_called_once_with( + connection_type="s3", + connection_options={"paths": [expected_path]}, + format="parquet", + ) + + # Verify the conversion to DataFrame + assert result_df == mock_spark_df + + +def test_get_batch_request(): + spark_dataset = MagicMock() + data_type = "test-data" + run_id = RunIdentifier(run_name="2023_09_04") + + batch_request = run_gx_on_pq.get_batch_request(spark_dataset, data_type, run_id) + + # Verify the RuntimeBatchRequest is correctly set up + assert isinstance(batch_request, 
RuntimeBatchRequest) + assert batch_request.data_asset_name == f"{data_type}-parquet-data-asset" + assert batch_request.batch_identifiers == { + "batch_identifier": f"{data_type}_{run_id.run_name}_batch" + } + assert batch_request.runtime_parameters == {"batch_data": spark_dataset} + + +@pytest.mark.integration +def test_that_get_batch_request_details_are_correct(test_spark): + # Create a simple PySpark DataFrame to simulate the dataset + data = [("Alice", 34), ("Bob", 45), ("Charlie", 29)] + columns = ["name", "age"] + spark_dataset = test_spark.createDataFrame(data, columns) + + # Create a RunIdentifier + run_id = RunIdentifier(run_name="test_run_2023") + + # Call the function and get the RuntimeBatchRequest + data_type = "user_data" + batch_request = run_gx_on_pq.get_batch_request(spark_dataset, data_type, run_id) + + # Assertions to check that the batch request is properly populated + assert isinstance(batch_request, RuntimeBatchRequest) + assert batch_request.datasource_name == "spark_datasource" + assert batch_request.data_connector_name == "runtime_data_connector" + assert batch_request.data_asset_name == "user_data-parquet-data-asset" + assert ( + batch_request.batch_identifiers["batch_identifier"] + == "user_data_test_run_2023_batch" + ) + assert batch_request.runtime_parameters["batch_data"] == spark_dataset + + +def test_read_json_correctly_returns_expected_values(): + s3_bucket = "test-bucket" + key = "test-key" + + # Mock the S3 response + mock_s3_response = MagicMock() + mock_s3_response["Body"].read.return_value = '{"test_key": "test_value"}'.encode( + "utf-8" + ) + + # Use patch to mock the boto3 s3 client + with patch("boto3.client") as mock_s3_client: + # Mock get_object method + mock_s3_client.return_value.get_object.return_value = mock_s3_response + + # Call the function + result = run_gx_on_pq.read_json(mock_s3_client.return_value, s3_bucket, key) + + # Verify that the S3 client was called with the correct parameters + mock_s3_client.return_value.get_object.assert_called_once_with( + Bucket=s3_bucket, Key=key + ) + + # Verify the result + assert result == {"test_key": "test_value"} + + +def test_that_add_expectations_from_json_has_expected_call(): + mock_context = MagicMock() + + # Sample expectations data + expectations_data = { + "test-data": { + "expectation_suite_name": "test_suite", + "expectations": [ + { + "expectation_type": "expect_column_to_exist", + "kwargs": {"column": "test_column"}, + }, + ], + } + } + + data_type = "test-data" + + # Call the function + run_gx_on_pq.add_expectations_from_json(expectations_data, mock_context, data_type) + + # Verify expectations were added to the context + mock_context.add_or_update_expectation_suite.assert_called_once() + + +def test_that_add_expectations_from_json_throws_value_error(): + mock_context = MagicMock() + + # Sample expectations data + expectations_data = { + "not-test-data": { + "expectation_suite_name": "test_suite", + "expectations": [ + { + "expectation_type": "expect_column_to_exist", + "kwargs": {"column": "test_column"}, + }, + ], + } + } + + data_type = "test-data" + with pytest.raises( + ValueError, match="No expectations found for data type 'test-data'" + ): + run_gx_on_pq.add_expectations_from_json( + expectations_data, mock_context, data_type + ) + + +@pytest.mark.integration +def test_add_expectations_from_json_adds_details_correctly(test_context): + # Mock expectations data + expectations_data = { + "user_data": { + "expectation_suite_name": "user_data_suite", + "expectations": [ + { + 
"expectation_type": "expect_column_to_exist", + "kwargs": {"column": "user_id"}, + }, + { + "expectation_type": "expect_column_values_to_be_between", + "kwargs": {"column": "age", "min_value": 18, "max_value": 65}, + }, + ], + } + } + + data_type = "user_data" + + # Call the function to add expectations + test_context = run_gx_on_pq.add_expectations_from_json( + expectations_data, test_context, data_type + ) + + # Retrieve the expectation suite to verify that expectations were added + expectation_suite = test_context.get_expectation_suite("user_data_suite") + + assert expectation_suite.expectation_suite_name == "user_data_suite" + assert len(expectation_suite.expectations) == 2 + + # Verify the details of the first expectation + first_expectation = expectation_suite.expectations[0] + assert first_expectation.expectation_type == "expect_column_to_exist" + assert first_expectation.kwargs == {"column": "user_id"} + + # Verify the details of the second expectation + second_expectation = expectation_suite.expectations[1] + assert second_expectation.expectation_type == "expect_column_values_to_be_between" + assert second_expectation.kwargs == { + "column": "age", + "min_value": 18, + "max_value": 65, + } + + +def test_that_add_validation_results_to_store_has_expected_calls(): + # Mock the EphemeralDataContext and the necessary components + mock_context = MagicMock() + mock_expectation_suite = MagicMock() + mock_context.get_expectation_suite.return_value = mock_expectation_suite + mock_expectation_suite.expectation_suite_name = "test_suite" + + # Mock the validation result data + validation_result = {"result": "test_result"} + + # Create a mock batch identifier and run identifier + mock_batch_identifier = MagicMock(spec=RuntimeBatchRequest) + mock_run_identifier = MagicMock(spec=RunIdentifier) + + # Call the function with mocked inputs + result_context = run_gx_on_pq.add_validation_results_to_store( + context=mock_context, + expectation_suite_name="test_suite", + validation_result=validation_result, + batch_identifier=mock_batch_identifier, + run_identifier=mock_run_identifier, + ) + + # Assert that the expectation suite was retrieved correctly + mock_context.get_expectation_suite.assert_called_once_with("test_suite") + + expected_expectation_suite_identifier = ExpectationSuiteIdentifier( + expectation_suite_name="test_suite" + ) + expected_validation_result_identifier = ValidationResultIdentifier( + expectation_suite_identifier=expected_expectation_suite_identifier, + batch_identifier=mock_batch_identifier, + run_id=mock_run_identifier, + ) + + # Verify that the validation result was added to the validations store + mock_context.validations_store.set.assert_called_once_with( + expected_validation_result_identifier, validation_result + ) + + # Check that the context is returned + assert result_context == mock_context