Skip to content

Commit

Permalink
Get Airflow Variables from Hashicorp Vault (apache#7944)
Browse files Browse the repository at this point in the history
(cherry picked from commit c1c88ab)
  • Loading branch information
kaxil committed Apr 1, 2020
1 parent ad095b8 commit 9f481d3
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 13 deletions.
42 changes: 36 additions & 6 deletions airflow/contrib/secrets/hashicorp_vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
# specific language governing permissions and limitations
# under the License.
"""
Objects relating to sourcing connections from Hashicorp Vault
Objects relating to sourcing connections & variables from Hashicorp Vault
"""
from typing import Optional

Expand All @@ -31,7 +31,7 @@

class VaultBackend(BaseSecretsBackend, LoggingMixin):
"""
Retrieves Connection object from Hashicorp Vault
Retrieves Connections and Variables from Hashicorp Vault
Configurable via ``airflow.cfg`` as follows:
Expand All @@ -50,7 +50,11 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
conn_id ``smtp_default``.
:param connections_path: Specifies the path of the secret to read to get Connections.
(default: 'connections')
:type connections_path: str
:param variables_path: Specifies the path of the secret to read to get Variables.
(default: 'variables')
:type variables_path: str
:param url: Base URL for the Vault instance being addressed.
:type url: str
:param auth_type: Authentication Type for Vault (one of 'token', 'ldap', 'userpass', 'approle',
Expand Down Expand Up @@ -78,7 +82,8 @@ class VaultBackend(BaseSecretsBackend, LoggingMixin):
"""
def __init__( # pylint: disable=too-many-arguments
self,
connections_path, # type: str
connections_path='connections', # type: str
variables_path='variables', # type: str
url=None, # type: Optional[str]
auth_type='token', # type: str
mount_point='secret', # type: str
Expand All @@ -94,6 +99,7 @@ def __init__( # pylint: disable=too-many-arguments
):
super(VaultBackend, self).__init__(**kwargs)
self.connections_path = connections_path.rstrip('/')
self.variables_path = variables_path.rstrip('/')
self.url = url
self.auth_type = auth_type
self.kwargs = kwargs
Expand Down Expand Up @@ -152,7 +158,31 @@ def get_conn_uri(self, conn_id):
:param conn_id: connection id
:type conn_id: str
"""
secret_path = self.build_path(self.connections_path, conn_id)
response = self._get_secret(self.connections_path, conn_id)
return response.get("conn_uri") if response else None

def get_variable(self, key):
# type: (str) -> Optional[str]
"""
Get Airflow Variable from Environment Variable
:param key: Variable Key
:return: Variable Value
"""
response = self._get_secret(self.variables_path, key)
return response.get("value") if response else None

def _get_secret(self, path_prefix, secret_id):
# type: (str, str) -> Optional[dict]
"""
Get secret value from Vault.
:param path_prefix: Prefix for the Path to get Secret
:type path_prefix: str
:param secret_id: Secret Key
:type secret_id: str
"""
secret_path = self.build_path(path_prefix, secret_id)

try:
if self.kv_engine_version == 1:
Expand All @@ -163,8 +193,8 @@ def get_conn_uri(self, conn_id):
response = self.client.secrets.kv.v2.read_secret_version(
path=secret_path, mount_point=self.mount_point)
except InvalidPath:
self.log.info("Connection ID %s not found in Path: %s", conn_id, secret_path)
self.log.info("Secret %s not found in Path: %s", secret_id, secret_path)
return None

return_data = response["data"] if self.kv_engine_version == 1 else response["data"]["data"]
return return_data.get("conn_uri")
return return_data
50 changes: 43 additions & 7 deletions docs/howto/use-alternative-secrets-backend.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@
Alternative secrets backend
---------------------------

In addition to retrieving connections from environment variables or the metastore database, you can enable
an alternative secrets backend to retrieve connections,
In addition to retrieving connections & variables from environment variables or the metastore database, you can enable
an alternative secrets backend to retrieve Airflow connections or Airflow variables,
such as :ref:`AWS SSM Parameter Store <ssm_parameter_store_secrets>`,
:ref:`Hashicorp Vault Secrets<hashicorp_vault_secrets>` or you can :ref:`roll your own <roll_your_own_secrets_backend>`.

Search path
^^^^^^^^^^^
When looking up a connection, by default Airflow will search environment variables first and metastore
When looking up a connection/variable, by default Airflow will search environment variables first and metastore
database second.

If you enable an alternative secrets backend, it will be searched first, followed by environment variables,
Expand Down Expand Up @@ -81,7 +81,7 @@ of the connection object.
Hashicorp Vault Secrets Backend
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To enable Hashicorp vault to retrieve connection, specify :py:class:`~airflow.contrib.secrets.hashicorp_vault.VaultBackend`
To enable Hashicorp vault to retrieve Airflow connection/variable, specify :py:class:`~airflow.contrib.secrets.hashicorp_vault.VaultBackend`
as the ``backend`` in ``[secrets]`` section of ``airflow.cfg``.

Here is a sample configuration:
Expand All @@ -90,7 +90,7 @@ Here is a sample configuration:
[secrets]
backend = airflow.contrib.secrets.hashicorp_vault.VaultBackend
backend_kwargs = {"connections_path": "connections", "mount_point": "airflow", "url": "http://127.0.0.1:8200"}
backend_kwargs = {"connections_path": "connections", "variables_path": "variables", "mount_point": "airflow", "url": "http://127.0.0.1:8200"}
The default KV version engine is ``2``, pass ``kv_engine_version: 1`` in ``backend_kwargs`` if you use
KV Secrets Engine Version ``1``.
Expand All @@ -105,14 +105,18 @@ key to ``backend_kwargs``:
export VAULT_ADDR="http://127.0.0.1:8200"
Storing and Retrieving Connections
""""""""""""""""""""""""""""""""""

If you have set ``connections_path`` as ``connections`` and ``mount_point`` as ``airflow``, then for a connection id of
``smtp_default``, you would want to store your secret as:

.. code-block:: bash
vault kv put airflow/connections/smtp_default conn_uri=smtps://user:[email protected]:465
Note that the ``key`` is ``conn_uri``, ``value`` is ``postgresql://airflow:airflow@host:5432/airflow`` and
Note that the ``Key`` is ``conn_uri``, ``Value`` is ``postgresql://airflow:airflow@host:5432/airflow`` and
``mount_point`` is ``airflow``.

You can make a ``mount_point`` for ``airflow`` as follows:
Expand Down Expand Up @@ -140,7 +144,39 @@ Verify that you can get the secret from ``vault``:
conn_uri smtps://user:[email protected]:465
The value of the Vault key must be the :ref:`connection URI representation <generating_connection_uri>`
of the connection object.
of the connection object to get connection.

Storing and Retrieving Variables
""""""""""""""""""""""""""""""""

If you have set ``variables_path`` as ``variables`` and ``mount_point`` as ``airflow``, then for a variable with
``hello`` as key, you would want to store your secret as:

.. code-block:: bash
vault kv put airflow/variables/hello value=world
Verify that you can get the secret from ``vault``:

.. code-block:: console
❯ vault kv get airflow/variables/hello
====== Metadata ======
Key Value
--- -----
created_time 2020-03-28T02:10:54.301784Z
deletion_time n/a
destroyed false
version 1
==== Data ====
Key Value
--- -----
value world
Note that the secret ``Key`` is ``value``, and secret ``Value`` is ``world`` and
``mount_point`` is ``airflow``.


.. _secrets_manager_backend:

Expand Down
88 changes: 88 additions & 0 deletions tests/contrib/secrets/test_hashicorp_vault.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,94 @@ def test_get_conn_uri_non_existent_key(self, mock_hvac):
mount_point='airflow', path='connections/test_mysql')
self.assertEqual([], test_client.get_connections(conn_id="test_mysql"))

@mock.patch("airflow.contrib.secrets.hashicorp_vault.hvac")
def test_get_variable_value(self, mock_hvac):
mock_client = mock.MagicMock()
mock_hvac.Client.return_value = mock_client
mock_client.secrets.kv.v2.read_secret_version.return_value = {
'request_id': '2d48a2ad-6bcb-e5b6-429d-da35fdf31f56',
'lease_id': '',
'renewable': False,
'lease_duration': 0,
'data': {'data': {'value': 'world'},
'metadata': {'created_time': '2020-03-28T02:10:54.301784Z',
'deletion_time': '',
'destroyed': False,
'version': 1}},
'wrap_info': None,
'warnings': None,
'auth': None
}

kwargs = {
"variables_path": "variables",
"mount_point": "airflow",
"auth_type": "token",
"url": "http://127.0.0.1:8200",
"token": "s.7AU0I51yv1Q1lxOIg1F3ZRAS"
}

test_client = VaultBackend(**kwargs)
returned_uri = test_client.get_variable("hello")
self.assertEqual('world', returned_uri)

@mock.patch("airflow.contrib.secrets.hashicorp_vault.hvac")
def test_get_variable_value_engine_version_1(self, mock_hvac):
mock_client = mock.MagicMock()
mock_hvac.Client.return_value = mock_client
mock_client.secrets.kv.v1.read_secret.return_value = {
'request_id': '182d0673-618c-9889-4cba-4e1f4cfe4b4b',
'lease_id': '',
'renewable': False,
'lease_duration': 2764800,
'data': {'value': 'world'},
'wrap_info': None,
'warnings': None,
'auth': None}

kwargs = {
"variables_path": "variables",
"mount_point": "airflow",
"auth_type": "token",
"url": "http://127.0.0.1:8200",
"token": "s.7AU0I51yv1Q1lxOIg1F3ZRAS",
"kv_engine_version": 1
}

test_client = VaultBackend(**kwargs)
returned_uri = test_client.get_variable("hello")
mock_client.secrets.kv.v1.read_secret.assert_called_once_with(
mount_point='airflow', path='variables/hello')
self.assertEqual('world', returned_uri)

@mock.patch.dict('os.environ', {
'AIRFLOW_VAR_HELLO': 'world',
})
@mock.patch("airflow.contrib.secrets.hashicorp_vault.hvac")
def test_get_variable_value_non_existent_key(self, mock_hvac):
"""
Test that if the key with connection ID is not present in Vault, VaultClient.get_connections
should return None
"""
mock_client = mock.MagicMock()
mock_hvac.Client.return_value = mock_client
# Response does not contain the requested key
mock_client.secrets.kv.v2.read_secret_version.side_effect = InvalidPath()

kwargs = {
"variables_path": "variables",
"mount_point": "airflow",
"auth_type": "token",
"url": "http://127.0.0.1:8200",
"token": "s.7AU0I51yv1Q1lxOIg1F3ZRAS"
}

test_client = VaultBackend(**kwargs)
self.assertIsNone(test_client.get_variable("hello"))
mock_client.secrets.kv.v2.read_secret_version.assert_called_once_with(
mount_point='airflow', path='variables/hello')
self.assertIsNone(test_client.get_variable("hello"))

@mock.patch("airflow.contrib.secrets.hashicorp_vault.hvac")
def test_auth_failure_raises_error(self, mock_hvac):
mock_client = mock.MagicMock()
Expand Down

0 comments on commit 9f481d3

Please sign in to comment.