From d8f9971f18a3217eef92f81c9e009259ded8c712 Mon Sep 17 00:00:00 2001 From: gruebel Date: Wed, 9 Nov 2022 23:14:01 +0100 Subject: [PATCH] only parse valid tag key-pairs in CloudFormation --- checkov/cloudformation/cfn_utils.py | 9 +++++++-- tests/cloudformation/runner/test_runner.py | 21 +++++++++++++-------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/checkov/cloudformation/cfn_utils.py b/checkov/cloudformation/cfn_utils.py index 2bc4be2d611..ea8537150b5 100644 --- a/checkov/cloudformation/cfn_utils.py +++ b/checkov/cloudformation/cfn_utils.py @@ -17,6 +17,7 @@ from checkov.common.models.consts import YAML_COMMENT_MARK CF_POSSIBLE_ENDINGS = frozenset([".yml", ".yaml", ".json", ".template"]) +TAG_FIELD_NAMES = ("Key", "Value") def get_resource_tags(entity: Dict[StrNode, DictNode], registry: Registry = cfn_registry) -> Optional[Dict[str, str]]: @@ -43,8 +44,12 @@ def get_resource_tags(entity: Dict[StrNode, DictNode], registry: Registry = cfn_ def parse_entity_tags(tags: Union[ListNode, Dict[str, Any]]) -> Optional[Dict[str, str]]: - if isinstance(tags, ListNode): - tag_dict = {get_entity_value_as_string(tag["Key"]): get_entity_value_as_string(tag["Value"]) for tag in tags} + if isinstance(tags, list): + tag_dict = { + get_entity_value_as_string(tag["Key"]): get_entity_value_as_string(tag["Value"]) + for tag in tags + if all(field in tag for field in TAG_FIELD_NAMES) + } return tag_dict elif isinstance(tags, dict): tag_dict = { diff --git a/tests/cloudformation/runner/test_runner.py b/tests/cloudformation/runner/test_runner.py index 532eacb537a..43ee0a4dfce 100644 --- a/tests/cloudformation/runner/test_runner.py +++ b/tests/cloudformation/runner/test_runner.py @@ -183,20 +183,25 @@ def test_get_tags(self): entity = {resource_name: resource} entity_tags = cfn_utils.get_resource_tags(entity) - self.assertIsNone(entity_tags) + self.assertDictEqual( + entity_tags, + { + "Name": "TF-FulfillmentServer", + "terraform-server-tag-key": "terraform-server-tag-value", + } + ) resource_name = 'EKSClusterNodegroup' resource = definitions['Resources'][resource_name] entity = {resource_name: resource} entity_tags = cfn_utils.get_resource_tags(entity) - self.assertEqual(len(entity_tags), 1) - tags = { - 'Name': '{\'Ref\': \'ClusterName\'}-EKS-{\'Ref\': \'NodeGroupName\'}' - } - - for name, value in tags.items(): - self.assertEqual(entity_tags[name], value) + self.assertDictEqual( + entity_tags, + { + 'Name': '{\'Ref\': \'ClusterName\'}-EKS-{\'Ref\': \'NodeGroupName\'}', + } + ) def test_wrong_check_imports(self): wrong_imports = ["arm", "dockerfile", "helm", "kubernetes", "serverless", "terraform"]