From 18896156aac8c62c3fe098b546ee601ff386da98 Mon Sep 17 00:00:00 2001 From: ferruzzi Date: Mon, 16 May 2022 12:26:17 -0700 Subject: [PATCH] Convert Athena Sample DAG to System Test --- .../example-dags.rst | 23 +++ .../apache-airflow-providers-amazon/index.rst | 2 +- .../operators/athena.rst | 4 +- tests/providers/amazon/aws/system/__init__.py | 16 ++ .../amazon/aws/system/utils/__init__.py | 16 ++ .../amazon/aws/system/utils/test_helpers.py | 144 +++++++++++++++ tests/system/providers/amazon/README.md | 77 ++++++++ .../providers/amazon/aws/example_athena.py | 173 ++++++++++++++++++ .../providers/amazon/aws/utils/__init__.py | 137 ++++++++++++++ 9 files changed, 589 insertions(+), 3 deletions(-) create mode 100644 docs/apache-airflow-providers-amazon/example-dags.rst create mode 100644 tests/providers/amazon/aws/system/__init__.py create mode 100644 tests/providers/amazon/aws/system/utils/__init__.py create mode 100644 tests/providers/amazon/aws/system/utils/test_helpers.py create mode 100644 tests/system/providers/amazon/README.md create mode 100644 tests/system/providers/amazon/aws/example_athena.py create mode 100644 tests/system/providers/amazon/aws/utils/__init__.py diff --git a/docs/apache-airflow-providers-amazon/example-dags.rst b/docs/apache-airflow-providers-amazon/example-dags.rst new file mode 100644 index 0000000000000..7921246429df8 --- /dev/null +++ b/docs/apache-airflow-providers-amazon/example-dags.rst @@ -0,0 +1,23 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. 
Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +Example DAGs +============ + +You can learn how to use Amazon AWS integrations by analyzing the source code of the example DAGs: + +* `Amazon AWS `__ diff --git a/docs/apache-airflow-providers-amazon/index.rst b/docs/apache-airflow-providers-amazon/index.rst index 41fba1f06e283..e45b62c272f58 100644 --- a/docs/apache-airflow-providers-amazon/index.rst +++ b/docs/apache-airflow-providers-amazon/index.rst @@ -40,7 +40,7 @@ Content :maxdepth: 1 :caption: Resources - Example DAGs + Example DAGs PyPI Repository Installing from sources diff --git a/docs/apache-airflow-providers-amazon/operators/athena.rst b/docs/apache-airflow-providers-amazon/operators/athena.rst index 1e132dc192003..3b1fd3b6e83c2 100644 --- a/docs/apache-airflow-providers-amazon/operators/athena.rst +++ b/docs/apache-airflow-providers-amazon/operators/athena.rst @@ -45,7 +45,7 @@ In the following example, we query an existing Athena table and send the results an existing Amazon S3 bucket. For more examples of how to use this operator, please see the `Sample DAG `__. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_athena.py +.. exampleinclude:: /../../tests/system/providers/amazon/aws/example_athena.py :language: python :start-after: [START howto_operator_athena] :dedent: 4 @@ -62,7 +62,7 @@ Wait on Amazon Athena query results Use the :class:`~airflow.providers.amazon.aws.sensors.athena.AthenaSensor` to wait for the results of a query in Amazon Athena. -.. exampleinclude:: /../../airflow/providers/amazon/aws/example_dags/example_athena.py +.. 
exampleinclude:: /../../tests/system/providers/amazon/aws/example_athena.py :language: python :start-after: [START howto_sensor_athena] :dedent: 4 diff --git a/tests/providers/amazon/aws/system/__init__.py b/tests/providers/amazon/aws/system/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/amazon/aws/system/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/amazon/aws/system/utils/__init__.py b/tests/providers/amazon/aws/system/utils/__init__.py new file mode 100644 index 0000000000000..13a83393a9124 --- /dev/null +++ b/tests/providers/amazon/aws/system/utils/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/tests/providers/amazon/aws/system/utils/test_helpers.py b/tests/providers/amazon/aws/system/utils/test_helpers.py new file mode 100644 index 0000000000000..fd4c7e1f579fa --- /dev/null +++ b/tests/providers/amazon/aws/system/utils/test_helpers.py @@ -0,0 +1,144 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
"""
This module contains the unit tests for the helper methods included in the Amazon System Tests found at
tests/system/providers/amazon/aws/utils/__init__.py
"""
import os
from unittest.mock import patch

import pytest

from tests.system.providers.amazon.aws import utils
from tests.system.providers.amazon.aws.utils import (
    DEFAULT_ENV_ID_LEN,
    DEFAULT_ENV_ID_PREFIX,
    ENV_ID_ENVIRON_KEY,
    INVALID_ENV_ID_MSG,
    LOWERCASE_ENV_ID_MSG,
    NO_VALUE_MSG,
    _validate_env_id,
    set_env_id,
)

try:
    from moto import mock_ssm
except ImportError:
    mock_ssm = None

TEST_NAME: str = 'example_test'
ANY_STR: str = 'any'

ENV_VALUE: str = 'foo'
SSM_VALUE: str = 'bar'
DEFAULT_VALUE: str = 'baz'


def _mock_ssm_if_available(test_class):
    """
    Apply moto's ``mock_ssm`` to the test class only when moto could be imported.

    Decorators are applied bottom-up, so using ``@mock_ssm`` directly would call
    ``None(test_class)`` and raise a TypeError at import time when moto is missing,
    before the ``skipif`` marker ever gets a chance to skip the class.

    :param test_class: The test class to wrap.
    :return: The wrapped class, or the untouched class when moto is unavailable.
    """
    return test_class if mock_ssm is None else mock_ssm(test_class)


@pytest.fixture(autouse=True)
def provide_test_name():
    """Pin the detected test name so the helpers do not depend on the real call stack."""
    with patch.object(utils, '_get_test_name', return_value=TEST_NAME) as name:
        yield name


@pytest.fixture(autouse=True)
def isolate_env_id():
    """
    Run every test without a pre-existing Environment ID and undo any export afterwards.

    ``set_env_id`` writes to ``os.environ``; ``patch.dict`` restores the original
    environment on exit so the tests neither depend on nor leak ambient state.
    """
    with patch.dict(os.environ):
        os.environ.pop(ENV_ID_ENVIRON_KEY, None)
        yield


@pytest.mark.skipif(mock_ssm is None, reason='mock_ssm package not present')
@_mock_ssm_if_available
class TestAmazonSystemTestHelpers:
    """Unit tests for the AWS system test helper functions."""

    FETCH_VARIABLE_TEST_CASES = [
        # Format is:
        # (Environment Variable value, Fetched SSM value, Provided Default value, Expected Result)
        (ENV_VALUE, SSM_VALUE, DEFAULT_VALUE, ENV_VALUE),
        (ENV_VALUE, SSM_VALUE, None, ENV_VALUE),
        (ENV_VALUE, None, DEFAULT_VALUE, ENV_VALUE),
        (ENV_VALUE, None, None, ENV_VALUE),
        (None, SSM_VALUE, DEFAULT_VALUE, SSM_VALUE),
        (None, SSM_VALUE, None, SSM_VALUE),
        (None, None, DEFAULT_VALUE, DEFAULT_VALUE),
        # For the (None, None, None ) test case, see: test_fetch_variable_no_value_found_raises_exception
    ]

    @pytest.mark.parametrize(
        'env_value, ssm_value, default_value, expected_result', FETCH_VARIABLE_TEST_CASES
    )
    @patch.object(os, 'getenv')
    def test_fetch_variable_success(
        self, mock_getenv, env_value, ssm_value, default_value, expected_result
    ) -> None:
        # The mocked lookup stands in for both sources: the environment value wins when present.
        mock_getenv.return_value = env_value if env_value else ssm_value

        # Always pass a real string key; a mock matcher is not a valid variable name.
        if default_value:
            result = utils.fetch_variable(ANY_STR, default_value)
        else:
            result = utils.fetch_variable(ANY_STR)

        assert result == expected_result

    def test_fetch_variable_no_value_found_raises_exception(self):
        # This would be the (None, None, None) test case from above.
        with pytest.raises(ValueError) as raised_exception:
            utils.fetch_variable(ANY_STR)

        assert NO_VALUE_MSG.format(key=ANY_STR) in str(raised_exception.value)

    ENV_ID_TEST_CASES = [
        # Happy Cases
        ('ABCD', True),
        ('AbCd', True),
        ('abcd', True),
        ('ab12', True),
        # Failure Cases
        # Must be alphanumeric
        ('not_alphanumeric', False),
        # Can not be empty
        ('', False),
        # Must start with a letter
        ('1234', False),
        ('12ab', False),
        ('12AB', False),
        ('12Ab', False),
    ]

    @pytest.mark.parametrize('env_id, is_valid', ENV_ID_TEST_CASES)
    def test_validate_env_id_success(self, env_id, is_valid, capsys):
        if is_valid:
            result = _validate_env_id(env_id)
            # capsys cooperates with pytest's own capture and restores it even if the call raises,
            # unlike reassigning sys.stdout by hand.
            captured = capsys.readouterr()

            assert result == env_id.lower()
            assert result.isalnum()
            if result != env_id:
                assert LOWERCASE_ENV_ID_MSG in captured.out
        else:
            with pytest.raises(ValueError) as raised_exception:
                _validate_env_id(env_id)

            assert INVALID_ENV_ID_MSG in str(raised_exception.value)

    def test_set_env_id_generates_if_required(self):
        # No environment variable nor SSM value has been found
        result = set_env_id()

        assert len(result) == DEFAULT_ENV_ID_LEN + len(DEFAULT_ENV_ID_PREFIX)
        assert result.isalnum()
        assert result.islower()

    def test_set_env_id_exports_environment_variable(self):
        env_id = set_env_id()

        assert os.environ[ENV_ID_ENVIRON_KEY] == env_id
`tests/system/providers/amazon/aws/`. +In this directory you will find test files in the form of Example DAGs, one DAG per file. +Each test should be self-contained but in the case where additional resources are required, +they can be found in the `resources` directory on the same level as tests or noted in the +test's docstring. Each test file should start with prefix `example_*`. + +Example directory structure: + +``` +tests/system/providers/amazon/aws +├── resources +│ ├── example_athena_data.csv +│ └── example_sagemaker_constants.py +├── example_athena.py +├── example_batch.py +. +├── example_step_functions.py +└── * +``` + +## Initial configuration + +Each test requires some environment variables. Check how to set them up on your +operating system, but on UNIX-based operating systems `export NAME_OF_ENV_VAR=value` +should work. To confirm that it is set up correctly, run `echo $NAME_OF_ENV_VAR` +which will display its value. + +When manually running tests using pytest, you can define them inline with the command. +For example: + +```commandline +NAME_OF_ENV_VAR=value pytest --system amazon tests/system/providers/amazon/aws/example_test.py +``` + +### Required environment variables + +- `SYSTEM_TESTS_ENV_ID` - AWS System Tests will generate and export this value if one does not exist. + + An environment ID is a unique value across different executions of system tests. This + is needed because the CI environment may run the tests on various versions of Airflow + in parallel. If this is the case, the value of this variable ensures that resources + that are created during the tests will not interfere with each other. + + The value is used as part of the name for resources which have different requirements. + For example: an S3 bucket name cannot use underscores, but an Athena table name cannot + use hyphens. In order to minimize conflicts, this variable should be a randomized value + using only lowercase letters a-z and digits 0-9, and start with a letter. 
+ +## Settings for specific tests + +Amazon system test files are designed to be as self-contained as possible. They will contain +any sample data and configuration values which are required, and they will create and tear +down any required infrastructure. Some tests will require an IAM Role ARN, and the requirements +for those Roles should be documented inside the docstring of the test file. diff --git a/tests/system/providers/amazon/aws/example_athena.py b/tests/system/providers/amazon/aws/example_athena.py new file mode 100644 index 0000000000000..42d3abf55a1a6 --- /dev/null +++ b/tests/system/providers/amazon/aws/example_athena.py @@ -0,0 +1,173 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
"""
System test / example DAG for Amazon Athena.

The DAG creates an S3 bucket, uploads a small CSV file, creates an Athena database
and external table over that file, queries it, prints the query result that Athena
wrote to S3, and finally tears down everything it created.
"""
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.operators.athena import AthenaOperator
from airflow.providers.amazon.aws.operators.s3 import (
    S3CreateBucketOperator,
    S3CreateObjectOperator,
    S3DeleteBucketOperator,
)
from airflow.providers.amazon.aws.sensors.athena import AthenaSensor
from tests.system.providers.amazon.aws.utils import set_env_id

ENV_ID = set_env_id()
DAG_ID = 'example_athena'

# set_env_id() already returns a lowercase alphanumeric ID, so it can be used
# as-is in both the bucket name (no underscores) and the table names (no hyphens).
S3_BUCKET = f'{ENV_ID}-athena-bucket'
ATHENA_TABLE = f'{ENV_ID}_test_table'
ATHENA_DATABASE = f'{ENV_ID}_default'

# Every row starts at column 0: whitespace inside the literal is part of the uploaded
# CSV, so indenting the rows would prefix the first field with spaces and leave a
# whitespace-only last line in the file.
SAMPLE_DATA = '''"Alice",20
"Bob",25
"Charlie",30
'''
SAMPLE_FILENAME = 'airflow_sample.csv'

QUERY_CREATE_DATABASE = f'CREATE DATABASE IF NOT EXISTS {ATHENA_DATABASE}'
QUERY_CREATE_TABLE = f'''CREATE EXTERNAL TABLE IF NOT EXISTS {ATHENA_DATABASE}.{ATHENA_TABLE}
    ( `name` string, `age` int )
    ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"
    WITH SERDEPROPERTIES ( "serialization.format" = ",", "field.delim" = "," )
    LOCATION "s3://{S3_BUCKET}//{ATHENA_TABLE}"
    TBLPROPERTIES ("has_encrypted_data"="false")
    '''
QUERY_READ_TABLE = f'SELECT * from {ATHENA_DATABASE}.{ATHENA_TABLE}'
QUERY_DROP_TABLE = f'DROP TABLE IF EXISTS {ATHENA_DATABASE}.{ATHENA_TABLE}'
QUERY_DROP_DATABASE = f'DROP DATABASE IF EXISTS {ATHENA_DATABASE}'


@task
def await_bucket():
    """Block until the freshly created S3 bucket is visible."""
    # Avoid a race condition after creating the S3 Bucket.
    # The client comes from S3Hook so the waiter uses the same Airflow AWS connection
    # as the operators in this DAG, instead of boto3's ambient credential chain.
    client = S3Hook().get_conn()
    waiter = client.get_waiter('bucket_exists')
    waiter.wait(Bucket=S3_BUCKET)


@task
def read_results_from_s3(query_execution_id):
    """
    Print the CSV result file that Athena wrote for the given query.

    :param query_execution_id: The ID of the Athena query execution; the result
        object is read from ``<query_execution_id>.csv`` in the test bucket.
    """
    s3_hook = S3Hook()
    file_obj = s3_hook.get_conn().get_object(Bucket=S3_BUCKET, Key=f'{query_execution_id}.csv')
    file_content = file_obj['Body'].read().decode('utf-8')
    print(file_content)


with DAG(
    dag_id=DAG_ID,
    schedule_interval='@once',
    start_date=datetime(2021, 1, 1),
    tags=['example'],
    catchup=False,
) as dag:
    create_s3_bucket = S3CreateBucketOperator(task_id='create_s3_bucket', bucket_name=S3_BUCKET)

    upload_sample_data = S3CreateObjectOperator(
        task_id='upload_sample_data',
        s3_bucket=S3_BUCKET,
        s3_key=f'{ATHENA_TABLE}/{SAMPLE_FILENAME}',
        data=SAMPLE_DATA,
        replace=True,
    )

    create_database = AthenaOperator(
        task_id='create_database',
        query=QUERY_CREATE_DATABASE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/',
    )

    create_table = AthenaOperator(
        task_id='create_table',
        query=QUERY_CREATE_TABLE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/',
    )

    # [START howto_operator_athena]
    read_table = AthenaOperator(
        task_id='read_table',
        query=QUERY_READ_TABLE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/',
    )
    # [END howto_operator_athena]

    # [START howto_sensor_athena]
    await_query = AthenaSensor(
        task_id='await_query',
        query_execution_id=read_table.output,
    )
    # [END howto_sensor_athena]

    # Teardown tasks use "all_done" so they run even when an earlier task failed.
    drop_table = AthenaOperator(
        task_id='drop_table',
        query=QUERY_DROP_TABLE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/',
        trigger_rule="all_done",
    )

    drop_database = AthenaOperator(
        task_id='drop_database',
        query=QUERY_DROP_DATABASE,
        database=ATHENA_DATABASE,
        output_location=f's3://{S3_BUCKET}/',
        trigger_rule="all_done",
    )

    delete_s3_bucket = S3DeleteBucketOperator(
        task_id='delete_s3_bucket',
        bucket_name=S3_BUCKET,
        force_delete=True,
        trigger_rule="all_done",
    )

    chain(
        # TEST SETUP
        create_s3_bucket,
        await_bucket(),
        upload_sample_data,
        create_database,
        # TEST BODY
        create_table,
        read_table,
        await_query,
        read_results_from_s3(read_table.output),
        # TEST TEARDOWN
        drop_table,
        drop_database,
        delete_s3_bucket,
    )

    from tests.system.utils.watcher import watcher

    # This test needs watcher in order to properly mark success/failure
    # when "tearDown" task with trigger rule is part of the DAG
    list(dag.tasks) >> watcher()


from tests.system.utils import get_test_run  # noqa: E402

# Needed to run the example DAG with pytest (see: tests/system/README.md#run_via_pytest)
test_run = get_test_run(dag)
"""Helper functions shared by the Amazon AWS system tests."""

import inspect
import json
import os
from os.path import basename, splitext
from typing import Optional
from uuid import uuid4

import boto3
from botocore.client import BaseClient
from botocore.exceptions import NoCredentialsError, NoRegionError

ENV_ID_ENVIRON_KEY: str = 'SYSTEM_TESTS_ENV_ID'
DEFAULT_ENV_ID_PREFIX: str = 'env'
DEFAULT_ENV_ID_LEN: int = 8
# ``uuid4().hex`` contains no hyphens, so the generated ID stays alphanumeric
# even if DEFAULT_ENV_ID_LEN is ever raised above 8.
DEFAULT_ENV_ID: str = f'{DEFAULT_ENV_ID_PREFIX}{uuid4().hex[:DEFAULT_ENV_ID_LEN]}'

# All test file names will contain this string.
TEST_FILE_IDENTIFIER: str = 'example'

INVALID_ENV_ID_MSG: str = (
    'In order to maximize compatibility, the SYSTEM_TESTS_ENV_ID must be an alphanumeric string '
    'which starts with a letter. Please see `tests/system/providers/amazon/README.md`.'
)
LOWERCASE_ENV_ID_MSG: str = (
    'The provided Environment ID contains uppercase letters and '
    'will be converted to lowercase for the AWS System Tests.'
)
NO_VALUE_MSG: str = 'No Value Found: Variable {key} could not be found and no default value was provided.'


def _get_test_name() -> str:
    """
    Extracts the module name from the test module.

    :return: The name of the test module that called the helper method.
    :raises IndexError: If no file on the call stack looks like a test file.
    """
    # The exact layer of the stack will depend on if this is called directly
    # or from another helper, but the test will always contain the identifier.
    # Only the file name is inspected: matching the full path would hit the wrong
    # frame whenever a parent directory happens to contain the identifier.
    test_filename: Optional[str] = next(
        (
            frame.filename
            for frame in inspect.stack()
            if TEST_FILE_IDENTIFIER in basename(frame.filename)
        ),
        None,
    )
    if test_filename is None:
        # IndexError is what the previous list-indexing implementation raised.
        raise IndexError(
            f'No file containing "{TEST_FILE_IDENTIFIER}" in its name was found on the call stack.'
        )
    return splitext(basename(test_filename))[0]


def _validate_env_id(env_id: str) -> str:
    """
    Verifies that a prospective Environment ID value fits requirements.
    An Environment ID for an AWS System test must be a lowercase alphanumeric
    string which starts with a letter.

    :param env_id: An Environment ID to validate.
    :return: A validated string cast to lowercase.
    :raises ValueError: If the value is empty, is not ASCII alphanumeric,
        or does not start with a letter.
    """
    # ``isalnum``/``isalpha`` alone accept non-ASCII letters and digits, which are not
    # usable in AWS resource names, so ASCII is required explicitly. The empty string
    # fails ``isalnum`` before ``env_id[0]`` is evaluated.
    if not (env_id.isascii() and env_id.isalnum() and env_id[0].isalpha()):
        raise ValueError(INVALID_ENV_ID_MSG)

    lowered: str = env_id.lower()
    # Only announce the conversion for IDs that are actually accepted.
    if lowered != env_id:
        print(LOWERCASE_ENV_ID_MSG)
    return lowered


def _fetch_from_ssm(key: str) -> str:
    """
    Test values are stored in the SSM Value as a JSON-encoded dict of key/value pairs.

    :param key: The key to search for within the returned Parameter Value.
    :return: The value of the provided key from SSM, or an empty string if it is unavailable.
    """
    test_name: str = _get_test_name()

    # Since a default value after the SSM check is allowed, these exceptions should not stop execution.
    try:
        ssm_client: BaseClient = boto3.client('ssm')
    except NoRegionError:
        # No AWS region configured, so SSM can not be queried at all.
        return ''

    try:
        return json.loads(ssm_client.get_parameter(Name=test_name)['Parameter']['Value'])[key]
    except NoCredentialsError:
        # No boto credentials found.
        pass
    except ssm_client.exceptions.ParameterNotFound:
        # SSM does not contain any values for this test.
        pass
    except KeyError:
        # SSM contains values for this test, but not the requested value.
        pass
    return ''


def fetch_variable(key: str, default_value: Optional[str] = None) -> str:
    """
    Given a Parameter name: first check for an existing Environment Variable,
    then check SSM for a value. If neither are available, fall back on the
    optional default value.

    :param key: The name of the Parameter to fetch a value for.
    :param default_value: The default value to use if no value can be found.
    :return: The value of the parameter.
    :raises ValueError: If no value is found and no default value was provided.
    """
    # ``or`` short-circuits, so SSM is only contacted when the environment variable is
    # missing or empty. Passing the SSM lookup as the default argument of ``os.getenv``
    # would evaluate it (and make the AWS call) on every invocation.
    value: Optional[str] = os.getenv(key) or _fetch_from_ssm(key) or default_value
    if not value:
        raise ValueError(NO_VALUE_MSG.format(key=key))
    return value


def set_env_id() -> str:
    """
    Retrieves or generates an Environment ID, validate that it is suitable,
    export it as an Environment Variable, and return it.

    If an Environment ID has already been generated, use that.
    Otherwise, try to fetch it and export it as an Environment Variable.
    If there is not one available to fetch then generate one and export it as an Environment Variable.

    :return: A valid System Test Environment ID.
    :raises ValueError: If the retrieved Environment ID is not valid.
    """
    env_id: str = fetch_variable(ENV_ID_ENVIRON_KEY, DEFAULT_ENV_ID)
    env_id = _validate_env_id(env_id)

    os.environ[ENV_ID_ENVIRON_KEY] = env_id
    return env_id