Skip to content

Commit

Permalink
feat(certbot dns plugin): remove basic auth authentication and clean …
Browse files Browse the repository at this point in the history
…up [DNS-1270]
  • Loading branch information
zak905 committed Apr 15, 2024
1 parent 71f1d28 commit 0df37a7
Showing 1 changed file with 9 additions and 37 deletions.
46 changes: 9 additions & 37 deletions certbot_dns_ionos/ionos.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,20 @@
import logging

import requests
from base64 import b64encode

from certbot import errors
from certbot.plugins import dns_common
from certbot.plugins.dns_common import CredentialsConfiguration

logger = logging.getLogger(__name__)

dns_api_base_url = "https://dns.de-fra.ionos.com"
dns_api_base_url = "https://dev.dns.de-fra.ionos.com"


class Authenticator(dns_common.DNSAuthenticator):
"""DNS Authenticator for IONOS
This Authenticator uses the IONOS API to fulfill a dns-01 challenge.
"""

def __init__(self, *args, **kwargs):
super(Authenticator, self).__init__(*args, **kwargs)
self.credentials = None
Expand All @@ -40,28 +37,10 @@ def more_info(self): # pylint: disable=missing-docstring,no-self-use
def _setup_credentials(self):
self.credentials = self._configure_credentials(
"credentials",
"IONOS API credentials INI file. Either basic authentication (username and password)"
+ "or a token should be provided.",
None,
validator=self._validate_credentials
"IONOS API credentials INI file. Only Bearer token authentication is supported",
{"token":"access token for the IONOS API"},
)

def _validate_credentials(self, credentials: CredentialsConfiguration) -> None:
token = credentials.conf('token')
username = credentials.conf('username')
password = credentials.conf('password')

if token:
if username or password:
raise errors.PluginError('{}: ionos_username and ionos_password are '
'not needed when using ionos_token'
.format(credentials.confobj.filename))
elif not username or not password:
raise errors.PluginError('{}: ionos_username and ionos_password are required when '
'ionos_token is not provided'
.format(credentials.confobj.filename))


def _perform(self, domain, validation_name, validation):
self._get_ionos_client().add_txt_record(
domain, validation_name, validation
Expand All @@ -74,9 +53,7 @@ def _cleanup(self, domain, validation_name, validation):

def _get_ionos_client(self):
return _IONOSClient(
self.credentials.conf("token"),
self.credentials.conf("username"),
self.credentials.conf("password"),
self.credentials.conf("token")
)


Expand All @@ -85,12 +62,9 @@ class _IONOSClient(object):
Encapsulates all communication with the IONOS Cloud DNS API.
"""

def __init__(self, token, username, password):
def __init__(self, token:str):
logger.debug("creating IONOS Client")
if token:
self.headers = {"Authorization": "Bearer " + token}
else:
self.headers = {"Authorization": "Basic " + b64encode(username+":"+password)}
self.headers = {"Authorization": "Bearer " + token}


def _handle_response(self, resp: requests.Response):
Expand All @@ -105,9 +79,6 @@ def _handle_response(self, resp: requests.Response):
"API response with non JSON: {0}".format(resp.text)
)

def _get_url(self, action):
return "{0}?{1}".format(self.endpoint, action)

def add_txt_record(self, domain, record_name, record_content):
"""
Add a TXT record using the supplied information.
Expand All @@ -117,13 +88,14 @@ def add_txt_record(self, domain, record_name, record_content):
:param str record_content: The record content (typically the challenge validation).
:raises certbot.errors.PluginError: if an error occurs communicating with the IONOS API
"""
print("getting started")
zone_id = self._find_zone_id(domain)
if zone_id is None:
raise errors.PluginError("Domain not known")
logger.debug("domain found: %s with id: %s", domain, zone_id)
record = self.get_existing_txt_acme_record(zone_id, record_name)
record_properties = record["properties"]
if record is not None:
record_properties = record["properties"]
if record_properties["content"] == record_content:
logger.info("already there, id {0}".format(record["id"]))
return
Expand Down Expand Up @@ -151,8 +123,8 @@ def del_txt_record(self, domain, record_name, record_content):
raise errors.PluginError("Domain not known")
logger.debug("domain found: %s with id: %s", domain, zone_id)
record = self.get_existing_txt_acme_record(zone_id, record_name)
record_properties = record["properties"]
if record is not None:
record_properties = record["properties"]
if record_properties["content"] == record_content:
logger.debug("delete TXT record: %s", record["id"])
self._handle_response(
Expand Down

0 comments on commit 0df37a7

Please sign in to comment.