-
Notifications
You must be signed in to change notification settings - Fork 14.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add support for AWS Secrets Manager as Secrets Backend (#8186)
Allow retrieving Airflow Connections and Variables from AWS Secrets Manager (https://aws.amazon.com/secrets-manager/)
- Loading branch information
Showing
3 changed files
with
273 additions
and
0 deletions.
There are no files selected for viewing
125 changes: 125 additions & 0 deletions
125
airflow/providers/amazon/aws/secrets/secrets_manager.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
# | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
""" | ||
Objects relating to sourcing secrets from AWS Secrets Manager | ||
""" | ||
|
||
from typing import Optional | ||
|
||
import boto3 | ||
from cached_property import cached_property | ||
|
||
from airflow.secrets import BaseSecretsBackend | ||
from airflow.utils.log.logging_mixin import LoggingMixin | ||
|
||
|
||
class SecretsManagerBackend(BaseSecretsBackend, LoggingMixin): | ||
""" | ||
Retrieves Connection or Variables from AWS Secrets Manager | ||
Configurable via ``airflow.cfg`` like so: | ||
.. code-block:: ini | ||
[secrets] | ||
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend | ||
backend_kwargs = {"connections_prefix": "airflow/connections"} | ||
For example, if secrets prefix is ``airflow/connections/smtp_default``, this would be accessible | ||
if you provide ``{"connections_prefix": "airflow/connections"}`` and request conn_id ``smtp_default``. | ||
And if variables prefix is ``airflow/variables/hello``, this would be accessible | ||
if you provide ``{"variables_prefix": "airflow/variables"}`` and request variable key ``hello``. | ||
You can also pass additional keyword arguments like ``aws_secret_access_key``, ``aws_access_key_id`` | ||
or ``region_name`` to this class and they would be passed on to Boto3 client. | ||
:param connections_prefix: Specifies the prefix of the secret to read to get Connections. | ||
:type connections_prefix: str | ||
:param variables_prefix: Specifies the prefix of the secret to read to get Variables. | ||
:type variables_prefix: str | ||
:param profile_name: The name of a profile to use. If not given, then the default profile is used. | ||
:type profile_name: str | ||
:param sep: separator used to concatenate secret_prefix and secret_id. Default: "/" | ||
:type sep: str | ||
""" | ||
|
||
def __init__( | ||
self, | ||
connections_prefix: str = 'airflow/connections', | ||
variables_prefix: str = 'airflow/variables', | ||
profile_name: Optional[str] = None, | ||
sep: str = "/", | ||
**kwargs | ||
): | ||
super().__init__(**kwargs) | ||
self.connections_prefix = connections_prefix.rstrip("/") | ||
self.variables_prefix = variables_prefix.rstrip('/') | ||
self.profile_name = profile_name | ||
self.sep = sep | ||
self.kwargs = kwargs | ||
|
||
@cached_property | ||
def client(self): | ||
""" | ||
Create a Secrets Manager client | ||
""" | ||
session = boto3.session.Session( | ||
profile_name=self.profile_name, | ||
) | ||
return session.client(service_name="secretsmanager", **self.kwargs) | ||
|
||
def get_conn_uri(self, conn_id: str) -> Optional[str]: | ||
""" | ||
Get Connection Value | ||
:param conn_id: connection id | ||
:type conn_id: str | ||
""" | ||
return self._get_secret(self.connections_prefix, conn_id) | ||
|
||
def get_variable(self, key: str) -> Optional[str]: | ||
""" | ||
Get Airflow Variable from Environment Variable | ||
:param key: Variable Key | ||
:return: Variable Value | ||
""" | ||
return self._get_secret(self.variables_prefix, key) | ||
|
||
def _get_secret(self, path_prefix: str, secret_id: str) -> Optional[str]: | ||
""" | ||
Get secret value from Secrets Manager | ||
:param path_prefix: Prefix for the Path to get Secret | ||
:type path_prefix: str | ||
:param secret_id: Secret Key | ||
:type secret_id: str | ||
""" | ||
secrets_path = self.build_path(path_prefix, secret_id, self.sep) | ||
try: | ||
response = self.client.get_secret_value( | ||
SecretId=secrets_path, | ||
) | ||
return response.get('SecretString') | ||
except self.client.exceptions.ResourceNotFoundException: | ||
self.log.debug( | ||
"An error occurred (ResourceNotFoundException) when calling the " | ||
"get_secret_value operation: " | ||
"Secret %s not found.", secrets_path | ||
) | ||
return None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,6 +87,62 @@ you would want to store your Variable at ``/airflow/variables/hello``. | |
|
||
Optionally you can supply a profile name to reference aws profile, e.g. defined in ``~/.aws/config``. | ||
|
||
AWS Secrets Manager Backend | ||
^^^^^^^^^^^^^^^^^^^^^^^^^^^^ | ||
|
||
To enable Secrets Manager, specify :py:class:`~airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend` | ||
as the ``backend`` in ``[secrets]`` section of ``airflow.cfg``. | ||
|
||
Here is a sample configuration: | ||
|
||
.. code-block:: ini | ||
[secrets] | ||
backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend | ||
backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables", "profile_name": "default"} | ||
To authenticate you can either supply a profile name to reference aws profile, e.g. defined in ``~/.aws/config`` or set | ||
environment variables like ``AWS_ACCESS_KEY_ID``, ``AWS_SECRET_ACCESS_KEY``. | ||
|
||
|
||
Storing and Retrieving Connections | ||
"""""""""""""""""""""""""""""""""" | ||
|
||
If you have set ``connections_prefix`` as ``airflow/connections``, then for a connection id of ``smtp_default``, | ||
you would want to store your connection at ``airflow/connections/smtp_default``. | ||
|
||
Example: | ||
|
||
.. code-block:: bash | ||
aws secretsmanager put-secret-value --secret-id airflow/connections/smtp_default --secret-string "smtps://user:[email protected]:465" | ||
Verify that you can get the secret: | ||
|
||
.. code-block:: console | ||
❯ aws secretsmanager get-secret-value --secret-id airflow/connections/smtp_default | ||
{ | ||
"ARN": "arn:aws:secretsmanager:us-east-2:314524341751:secret:airflow/connections/smtp_default-7meuul", | ||
"Name": "airflow/connections/smtp_default", | ||
"VersionId": "34f90eff-ea21-455a-9c8f-5ee74b21be672", | ||
"SecretString": "smtps://user:[email protected]:465", | ||
"VersionStages": [ | ||
"AWSCURRENT" | ||
], | ||
"CreatedDate": "2020-04-08T02:10:35.132000+01:00" | ||
} | ||
The value of the secret must be the :ref:`connection URI representation <generating_connection_uri>` | ||
of the connection object. | ||
|
||
Storing and Retrieving Variables | ||
"""""""""""""""""""""""""""""""" | ||
|
||
If you have set ``variables_prefix`` as ``airflow/variables``, then for an Variable key of ``hello``, | ||
you would want to store your Variable at ``airflow/variables/hello``. | ||
|
||
|
||
.. _hashicorp_vault_secrets: | ||
|
||
Hashicorp Vault Secrets Backend | ||
|
92 changes: 92 additions & 0 deletions
92
tests/providers/amazon/aws/secrets/test_secrets_manager.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
# Licensed to the Apache Software Foundation (ASF) under one | ||
# or more contributor license agreements. See the NOTICE file | ||
# distributed with this work for additional information | ||
# regarding copyright ownership. The ASF licenses this file | ||
# to you under the Apache License, Version 2.0 (the | ||
# "License"); you may not use this file except in compliance | ||
# with the License. You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, | ||
# software distributed under the License is distributed on an | ||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
# KIND, either express or implied. See the License for the | ||
# specific language governing permissions and limitations | ||
# under the License. | ||
|
||
from unittest import TestCase, mock | ||
|
||
from moto import mock_secretsmanager | ||
|
||
from airflow.providers.amazon.aws.secrets.secrets_manager import SecretsManagerBackend | ||
|
||
|
||
class TestSecretsManagerBackend(TestCase): | ||
@mock.patch("airflow.providers.amazon.aws.secrets.secrets_manager." | ||
"SecretsManagerBackend.get_conn_uri") | ||
def test_aws_secrets_manager_get_connections(self, mock_get_uri): | ||
mock_get_uri.return_value = "scheme://user:pass@host:100" | ||
conn_list = SecretsManagerBackend().get_connections("fake_conn") | ||
conn = conn_list[0] | ||
assert conn.host == 'host' | ||
|
||
@mock_secretsmanager | ||
def test_get_conn_uri(self): | ||
param = { | ||
'SecretId': 'airflow/connections/test_postgres', | ||
'SecretString': 'postgresql://airflow:airflow@host:5432/airflow' | ||
} | ||
|
||
secrets_manager_backend = SecretsManagerBackend() | ||
secrets_manager_backend.client.put_secret_value(**param) | ||
|
||
returned_uri = secrets_manager_backend.get_conn_uri(conn_id="test_postgres") | ||
self.assertEqual('postgresql://airflow:airflow@host:5432/airflow', returned_uri) | ||
|
||
@mock_secretsmanager | ||
def test_get_conn_uri_non_existent_key(self): | ||
""" | ||
Test that if the key with connection ID is not present, | ||
SecretsManagerBackend.get_connections should return None | ||
""" | ||
conn_id = "test_mysql" | ||
param = { | ||
'SecretId': 'airflow/connections/test_postgres', | ||
'SecretString': 'postgresql://airflow:airflow@host:5432/airflow' | ||
} | ||
|
||
secrets_manager_backend = SecretsManagerBackend() | ||
secrets_manager_backend.client.put_secret_value(**param) | ||
|
||
self.assertIsNone(secrets_manager_backend.get_conn_uri(conn_id=conn_id)) | ||
self.assertEqual([], secrets_manager_backend.get_connections(conn_id=conn_id)) | ||
|
||
@mock_secretsmanager | ||
def test_get_variable(self): | ||
param = { | ||
'SecretId': 'airflow/variables/hello', | ||
'SecretString': 'world' | ||
} | ||
|
||
secrets_manager_backend = SecretsManagerBackend() | ||
secrets_manager_backend.client.put_secret_value(**param) | ||
|
||
returned_uri = secrets_manager_backend.get_variable('hello') | ||
self.assertEqual('world', returned_uri) | ||
|
||
@mock_secretsmanager | ||
def test_get_variable_non_existent_key(self): | ||
""" | ||
Test that if Variable key is not present, | ||
SystemsManagerParameterStoreBackend.get_variables should return None | ||
""" | ||
param = { | ||
'SecretId': 'airflow/variables/hello', | ||
'SecretString': 'world' | ||
} | ||
|
||
secrets_manager_backend = SecretsManagerBackend() | ||
secrets_manager_backend.client.put_secret_value(**param) | ||
|
||
self.assertIsNone(secrets_manager_backend.get_variable("test_mysql")) |