Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for Google Secret Manager for issue 543 #578

Merged
merged 5 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
243 changes: 243 additions & 0 deletions plugins/lookup/gcp_secret_manager.py
Copy link

@tze-dev tze-dev Jun 22, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for implementing this - it's awesome to see this feature being worked on.

In terms of the code for this lookup, is there a reason of not leveraging ansible_collections.google.cloud.plugins.module_utils.gcp_utils as a helper for handling the authentication workflow? IMO it would simplify the code and also gain the benefit of being able to use the new OAUTH token as well - recently added by this PR - #574.

I have used it in my private Collection and tested working fine. Code snippet here - you can add the additional env handling, but the heavy lifting of the authentication workflows and API requests are taken care of by gcp_utils

try:
    import os
    import requests
    import json
    import base64
except ImportError:
    pass

try:
    from ansible_collections.google.cloud.plugins.module_utils.gcp_utils import (
        GcpSession,
    )
    HAS_GOOGLE_CLOUD_COLLECTION = True
except ImportError:
    HAS_GOOGLE_CLOUD_COLLECTION = False

display = Display()

class GcpMockModule(object):
    def __init__(self, params):
        self.params = params

    def fail_json(self, *args, **kwargs):
        raise AnsibleError(kwargs["msg"])

    def raise_for_status(self, response):
        try:
            response.raise_for_status()
        except getattr(requests.exceptions, "RequestException"):
            self.fail_json(msg="GCP returned error: %s" % response.json())


class GcpSecretLookup:
    def run(self, variables=None, **kwargs):
        params = {
            "project": kwargs.get("project", None),
            "secret": kwargs.get("secret", None),
            "version": kwargs.get("version", "latest"),
            "auth_kind": kwargs.get("auth_kind", None),
            "service_account_file": kwargs.get("service_account_file", None),
            "service_account_email": kwargs.get("service_account_email", None),
            "access_token": kwargs.get("access_token", None), # added for https://github.com/ansible-collections/google.cloud/pull/574
            "scopes": kwargs.get("scopes", None),
        }
        if not params["scopes"]:
            params["scopes"] = ["https://www.googleapis.com/auth/cloud-platform"]
        fake_module = GcpMockModule(params)
        result = self.get_secret(fake_module)
        return [base64.b64decode(result)]

    def get_secret(self, module):
        auth = GcpSession(module, "secretmanager")
        url = "https://secretmanager.googleapis.com/v1/projects/{project}/secrets/{secret}/versions/{version}:access".format(
            **module.params
        )
        response = auth.get(url)
        return response.json()['payload']['data']

class LookupModule(LookupBase):
    def run(self, terms, variables=None, **kwargs):
        if not HAS_GOOGLE_CLOUD_COLLECTION:
            raise AnsibleError(
                "gcp_secret lookup needs a supported version of the google.cloud collection installed. Use `ansible-galaxy collection install google.cloud` to install it"
            )
        return GcpSecretLookup().run(terms, variables=variables, **kwargs)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much for this suggestion. I'll integrate these changes.

Original file line number Diff line number Diff line change
@@ -0,0 +1,243 @@
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

DOCUMENTATION = '''
author:
- Dave Costakos <[email protected]>
name: gcp_secret_manager
short_description: Get Secrets from Google Cloud as a Lookup plugin
description:
- retrieve secret keys in Secret Manager for use in playbooks
- see https://cloud.google.com/iam/docs/service-account-creds for details on creating
credentials for Google Cloud and the format of such credentials
- once a secret value is retreived, it is returned decoded. It is up to the developer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- once a secret value is retreived, it is returned decoded. It is up to the developer
- once a secret value is retrieved, it is returned decoded. It is up to the developer

to maintain secrecy of this value once returned.

options:
key:
description:
- the name of the secret to look up in Secret Manager
type: str
required: True
aliases:
- name
- secret
- secret_id
project:
description:
- The name of the google cloud project
- defaults to OS env variable GCP_PROJECT if not present
type: str
auth_kind:
description:
- the type of authentication to use with Google Cloud (i.e. serviceaccount or machineaccount)
- defaults to OS env variable GCP_AUTH_KIND if not present
type: str
version:
description:
- the version name of your secret to retrieve
type: str
default: latest
required: False
service_account_email:
description:
- email associated with the service account
- defaults to OS env variable GCP_SERVICE_ACCOUNT_EMAIL if not present
type: str
required: False
service_account_file:
description:
- JSON Credential file obtained from Google Cloud
- defaults to OS env variable GCP_SERVICE_ACCOUNT_FILE if not present
- see https://cloud.google.com/iam/docs/service-account-creds for details
type: str
required: False
service_account_info:
description:
- JSON Object representing the contents of a service_account_file obtained from Google Cloud
- defaults to OS env variable GCP_SERVICE_ACCOUNT_INFO if not present
type: jsonarg
required: False
access_token:
description:
- support for GCP Access Token
- defaults to OS env variable GCP_ACCESS_TOKEN if not present
type: str
required: False
on_error:
description:
- how to handle errors
- strict means raise an exception
- warn means warn, and return none
- ignore means just return none
type: str
required: False
choices:
- 'strict'
- 'warn'
- 'ignore'
default: 'strict'
scopes:
description:
- Authenticaiton scopes for Google Secret Manager
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Authenticaiton scopes for Google Secret Manager
- Authentication scopes for Google Secret Manager

type: list
default: ["https://www.googleapis.com/auth/cloud-platform"]
'''

EXAMPLES = '''
- name: Test secret using env variables for credentials
ansible.builtin.debug:
msg: "{{ lookup('google.cloud.gcp_secret_manager', key='secret_key') }}"

- name: Test secret using explicit credentials
ansible.builtin.debug:
msg: "{{ lookup('google.cloud.gcp_secret_manager', key='secret_key', project='project', auth_kind='serviceaccount', service_account_file='file.json') }}"

- name: Test getting specific version of a secret (old version)
ansible.builtin.debug:
msg: "{{ lookup('google.cloud.gcp_secret_manager', key='secret_key', version='1') }}"

- name: Test getting specific version of a secret (new version)
ansible.builtin.debug:
msg: "{{ lookup('google.cloud.gcp_secret_manager', key='secret_key', version='2') }}"
'''

RETURN = '''
_raw:
description: the contents of the secret requested (please use "no_log" to not expose this secret)
type: list
elements: str
'''

################################################################################
# Imports
################################################################################

import os
import base64

from ansible.plugins.lookup import LookupBase
from ansible.errors import AnsibleError
from ansible.utils.display import Display

try:
import requests
HAS_REQUESTS = True
except ImportError:
HAS_REQUESTS = False

try:
from ansible_collections.google.cloud.plugins.module_utils.gcp_utils import (
GcpSession,
)
HAS_GOOGLE_CLOUD_COLLECTION = True
except ImportError:
HAS_GOOGLE_CLOUD_COLLECTION = False


class GcpLookupException(Exception):
pass


class GcpMockModule(object):
def __init__(self, params):
self.params = params

def fail_json(self, *args, **kwargs):
raise AnsibleError(kwargs["msg"])

def raise_for_status(self, response):
try:
response.raise_for_status()
except getattr(requests.exceptions, "RequestException"):
self.fail_json(msg="GCP returned error: %s" % response.json())


class LookupModule(LookupBase):
def run(self, terms=None, variables=None, **kwargs):
self._display = Display()
if not HAS_GOOGLE_CLOUD_COLLECTION:
raise AnsibleError(
"""gcp_secret lookup needs a supported version of the google.cloud
collection installed. Use `ansible-galaxy collection install google.cloud`
to install it"""
)
self.set_options(var_options=variables, direct=kwargs)
params = {
"key": self.get_option("key"),
"version": self.get_option("version"),
"access_token": self.get_option("access_token"),
"scopes": self.get_option("scopes"),
"on_error": self.get_option("on_error")
}

params['name'] = params['key']

# support GCP_* env variables for some parameters
for param in ["project", "auth_kind", "service_account_file", "service_account_info", "service_account_email", "access_token"]:
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you're now using gcp_utils to handle authentication, this part is no longer needed here. This is because in gcp_utils there is already mechanism handling the fallback to env.

params[param] = self.fallback_from_env(param)

self._display.vvv(msg=f"Module Parameters: {params}")
fake_module = GcpMockModule(params)
result = self.get_secret(fake_module)
return [base64.b64decode(result)]

def fallback_from_env(self, arg):
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per above - this is already being handled by gcp_utils

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once again, thank you for reviewing my code. Much appreciated.

There is indeed that functionality within the GcpModule class (in the init method for example) here . however, in your example, and the one I committed, we don't actually use that class only GcpSession.

I don't believe I can reuse the GcpModule class here in my lookup plugin. I did actually try this for grins and I got this error when attempting to call my super class constructor:
TypeError: AnsibleModule.__init__() got multiple values for argument 'argument_spec'

Not sure this is possible? What do you think?

And once again, thank you so much.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dcostakos you're right. Sorry, I got mixed up between the module and lookup helper as I just created a PR for a new module.
Your env handling is absolutely fine.

Thanks for doing this. Hope your PR gets merged soon, and we can all start using the default GCP collection rather than maintaining separate collections.

if self.get_option(arg):
return self.get_option(arg)
else:
env_name = f"GCP_{arg.upper()}"
if env_name in os.environ:
self.set_option(arg, os.environ[env_name])
return self.get_option(arg)

# set version to the latest version because
# we can't be sure that "latest" is always going
# to be set if secret versions get disabled
# see https://issuetracker.google.com/issues/286489671
def get_latest_version(self, module, auth):
url = "https://secretmanager.googleapis.com/v1/projects/{project}/secrets/{name}/versions?filter=state:ENABLED".format(
**module.params
)
response = auth.get(url)
self._display.vvv(msg=f"List Version Response: {response.status_code} for {response.request.url}: {response.json()}")
if response.status_code != 200:
self.raise_error(module, f"unable to list versions of secret {response.status_code}")
version_list = response.json()
if "versions" in version_list:
return sorted(version_list['versions'], key=lambda d: d['name'])[-1]['name'].split('/')[-1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not return the latest version when you have more than 10. The problem is that it sorts the paths before getting the version, so somepath/12 is smaller than somepath/9, so it always returns 9 as the latest version.

Suggested change
return sorted(version_list['versions'], key=lambda d: d['name'])[-1]['name'].split('/')[-1]
versions_numbers = []
for version in version_list['versions']:
versions_numbers.append(version['name'].split('/')[-1])
return sorted(versions_numbers, key=int)[-1]

else:
self.raise_error(module, f"Unable to list secret versions via {response.request.url}: {response.json()}")

def raise_error(self, module, msg):
if module.params['on_error'] == 'strict':
raise GcpLookupException(msg)
elif module.params['on_error'] == 'warn':
self._display.warning(msg)

return None

def get_secret(self, module):
auth = GcpSession(module, "secretmanager")
if module.params['version'] == "latest":
module.params['calc_version'] = self.get_latest_version(module, auth)
else:
module.params['calc_version'] = module.params['version']

# there was an error listing secret versions
if module.params['calc_version'] is None:
return ''

url = "https://secretmanager.googleapis.com/v1/projects/{project}/secrets/{name}/versions/{calc_version}:access".format(
**module.params
)
response = auth.get(url)
self._display.vvv(msg=f"Response: {response.status_code} for {response.request.url}: {response.json()}")
if response.status_code != 200:
self.raise_error(module, f"Failed to lookup secret value via {response.request.url} {response.status_code}")
return ''

return response.json()['payload']['data']
Loading