Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(formating and typing): add more type hints [DNS-1270] #6

Merged
merged 1 commit into from
Apr 10, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 19 additions & 17 deletions certbot_dns_ionos/ionos.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from certbot import errors
from certbot.plugins import dns_common
from typing import Any

logger = logging.getLogger(__name__)

Expand All @@ -24,34 +25,35 @@ def __init__(self, *args, **kwargs):
self.credentials = None

@classmethod
def add_parser_arguments(cls, add): # pylint: disable=arguments-differ
def add_parser_arguments(cls, add):
super(Authenticator, cls).add_parser_arguments(
add, default_propagation_seconds=120
)
add("credentials", help="credentials INI file.")

def more_info(self): # pylint: disable=missing-docstring,no-self-use
def more_info(self) -> str:
return (
"This plugin configures a DNS TXT record to respond to a dns-01"
+ " challenge using the IONOS REST API."
)

def _setup_credentials(self):
def _setup_credentials(self) -> None:
self.credentials = self._configure_credentials(
"credentials",
"IONOS API credentials INI file. Only Bearer token"
+ " authentication is supported",
{"token": "access token for the IONOS API"},
)

def _perform(self, domain, validation_name, validation):
self._get_ionos_client().add_txt_record(domain, validation_name, validation)

def _cleanup(self, domain, validation_name, validation):
self._get_ionos_client().del_txt_record(domain, validation_name, validation)
def _perform(self, domain, validation_name, validation) -> None:
_IONOSClient(self.credentials.conf("token")).add_txt_record(
domain, validation_name, validation
)

def _get_ionos_client(self):
return _IONOSClient(self.credentials.conf("token"))
def _cleanup(self, domain, validation_name, validation) -> None:
_IONOSClient(self.credentials.conf("token")).del_txt_record(
domain, validation_name, validation
)


class _IONOSClient(object):
Expand All @@ -63,7 +65,7 @@ def __init__(self, token: str):
logger.debug("creating IONOS Client")
self.headers = {"Authorization": f"Bearer {token}"}

def _handle_response(self, resp: requests.Response):
def _handle_response(self, resp: requests.Response) -> Any:
if resp.status_code != 200 and resp.status_code != 202:
raise errors.PluginError(
"Received non OK status from IONOS API {0}".format(resp.status_code)
Expand All @@ -73,7 +75,7 @@ def _handle_response(self, resp: requests.Response):
except json.decoder.JSONDecodeError:
raise errors.PluginError("API response with non JSON: {0}".format(resp.text))

def add_txt_record(self, domain, record_name: str, record_content):
def add_txt_record(self, domain: str, record_name: str, record_content: str):
"""
Add a TXT record using the supplied information.

Expand Down Expand Up @@ -106,7 +108,7 @@ def add_txt_record(self, domain, record_name: str, record_content):
logger.info("insert new txt record")
self._insert_txt_record(zone_id, record_name_without_domain, record_content)

def del_txt_record(self, domain, record_name: str, record_content: str):
def del_txt_record(self, domain: str, record_name: str, record_content: str):
"""
Delete a TXT record using the supplied information.
:param str domain: The domain to use to look up the managed zone.
Expand Down Expand Up @@ -135,7 +137,7 @@ def del_txt_record(self, domain, record_name: str, record_content: str):
)
)

def _insert_txt_record(self, zone_id, record_name, record_content):
def _insert_txt_record(self, zone_id: str, record_name: str, record_content: str):
new_record = {
"properties": {
"name": record_name,
Expand All @@ -153,7 +155,7 @@ def _insert_txt_record(self, zone_id, record_name, record_content):
)
logger.debug("create with payload: %s", new_record)

def _update_txt_record(self, zone_id, record):
def _update_txt_record(self, zone_id: str, record: dict):
self._handle_response(
requests.put(
f"{dns_api_base_url}/zones/{zone_id}/records/{record.get('id')}",
Expand All @@ -163,7 +165,7 @@ def _update_txt_record(self, zone_id, record):
)
logger.debug("update with payload: %s", record)

def _find_zone_id(self, domain):
def _find_zone_id(self, domain: str) -> str | None:
"""
Find the zone for a given domain.

Expand All @@ -188,7 +190,7 @@ def _find_zone_id(self, domain):

return None

def get_existing_txt_acme_record(self, zone_id, record_name):
def get_existing_txt_acme_record(self, zone_id: str, record_name: str) -> Any | None:
"""
Get existing TXT records for the record name.

Expand Down