Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(aws): handle none type attributes #5216

Merged
merged 3 commits into from
Sep 30, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def execute(self):
report.resource_tags = rest_api.tags
if rest_api.public_endpoint:
report.status = "FAIL"
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} is internet accesible."
report.status_extended = f"API Gateway {rest_api.name} ID {rest_api.id} is internet accessible."
else:
report.status = "PASS"
report.status_extended = (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ def execute(self):
report.resource_tags = repository.tags
report.status = "PASS"
report.status_extended = (
f"Repository {repository.name} is not publicly accesible."
f"Repository {repository.name} is not publicly accessible."
)
if is_policy_public(repository.policy):
report.status = "FAIL"
report.status_extended = f"Repository {repository.name} policy may allow anonymous users to perform actions (Principal: '*')."
if repository.policy:
if is_policy_public(repository.policy):
report.status = "FAIL"
report.status_extended = (
f"Repository {repository.name} is publicly accessible."
)

findings.append(report)

Expand Down
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you please add tests for the following lines:

  • 74
  • 82 to 85
  • 88 to 90

If we are changing the logic of the check we should cover those scenarios.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jfagoagas the logic of the check was not changed, only the following line was added:
if org.policies is not None: # Access denied to list policies

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes but even with that I think those lines are not being tested, you should add a test to verify when org.policies is None, don't you?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, test was added!

Original file line number Diff line number Diff line change
Expand Up @@ -12,86 +12,87 @@
)

for org in organizations_client.organizations:
report = Check_Report_AWS(self.metadata())
report.resource_id = org.id
report.resource_arn = org.arn
report.region = organizations_client.region
report.status = "FAIL"
report.status_extended = (
"AWS Organizations is not in-use for this AWS Account."
)

if org.status == "ACTIVE":
if org.policies is not None: # Access denied to list policies
report = Check_Report_AWS(self.metadata())
report.resource_id = org.id
report.resource_arn = org.arn
report.region = organizations_client.region
report.status = "FAIL"
report.status_extended = (
f"AWS Organizations {org.id} does not have SCP policies."
"AWS Organizations is not in-use for this AWS Account."
)
# We use this flag if we find a statement that is restricting regions but not all the configured ones:
is_region_restricted_statement = False

for policy in org.policies.get("SERVICE_CONTROL_POLICY", []):
if org.status == "ACTIVE":
report.status_extended = (
f"AWS Organizations {org.id} does not have SCP policies."
)
# We use this flag if we find a statement that is restricting regions but not all the configured ones:
is_region_restricted_statement = False

for policy in org.policies.get("SERVICE_CONTROL_POLICY", []):

# Statements are not always list
statements = policy.content.get("Statement")
if type(policy.content["Statement"]) is not list:
statements = [policy.content.get("Statement")]
# Statements are not always list
statements = policy.content.get("Statement")
if type(policy.content["Statement"]) is not list:
statements = [policy.content.get("Statement")]

for statement in statements:
# Deny if Condition = {"StringNotEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
statement.get("Effect") == "Deny"
and "Condition" in statement
and "StringNotEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringNotEquals"]
):
if all(
region
in statement["Condition"]["StringNotEquals"][
"aws:RequestedRegion"
]
for region in organizations_enabled_regions
for statement in statements:
# Deny if Condition = {"StringNotEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
statement.get("Effect") == "Deny"
and "Condition" in statement
and "StringNotEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringNotEquals"]
):
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has SCP policy {policy.id} restricting all configured regions found."
findings.append(report)
return findings
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies {policy.id} restricting some AWS Regions, but not all the configured ones, please check config."
if all(
region
in statement["Condition"]["StringNotEquals"][
"aws:RequestedRegion"
]
for region in organizations_enabled_regions
):
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has SCP policy {policy.id} restricting all configured regions found."
findings.append(report)
return findings
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies {policy.id} restricting some AWS Regions, but not all the configured ones, please check config."

# Allow if Condition = {"StringEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
policy.content.get("Statement") == "Allow"
and "Condition" in statement
and "StringEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringEquals"]
):
if all(
region
in statement["Condition"]["StringEquals"][
"aws:RequestedRegion"
]
for region in organizations_enabled_regions
# Allow if Condition = {"StringEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
policy.content.get("Statement") == "Allow"
and "Condition" in statement
and "StringEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringEquals"]
):
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has SCP policy {policy.id} restricting all configured regions found."
findings.append(report)
return findings
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies {policy.id} restricting some AWS Regions, but not all the configured ones, please check config."
if all(

Check warning on line 74 in prowler/providers/aws/services/organizations/organizations_scp_check_deny_regions/organizations_scp_check_deny_regions.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/organizations/organizations_scp_check_deny_regions/organizations_scp_check_deny_regions.py#L74

Added line #L74 was not covered by tests
region
in statement["Condition"]["StringEquals"][
"aws:RequestedRegion"
]
for region in organizations_enabled_regions
):
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has SCP policy {policy.id} restricting all configured regions found."
findings.append(report)
return findings

Check warning on line 85 in prowler/providers/aws/services/organizations/organizations_scp_check_deny_regions/organizations_scp_check_deny_regions.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/organizations/organizations_scp_check_deny_regions/organizations_scp_check_deny_regions.py#L82-L85

Added lines #L82 - L85 were not covered by tests
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies {policy.id} restricting some AWS Regions, but not all the configured ones, please check config."

Check warning on line 90 in prowler/providers/aws/services/organizations/organizations_scp_check_deny_regions/organizations_scp_check_deny_regions.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/organizations/organizations_scp_check_deny_regions/organizations_scp_check_deny_regions.py#L88-L90

Added lines #L88 - L90 were not covered by tests

if not is_region_restricted_statement:
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies but don't restrict AWS Regions."
if not is_region_restricted_statement:
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies but don't restrict AWS Regions."

findings.append(report)
findings.append(report)

return findings
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,9 @@
except ClientError as error:
if error.response["Error"]["Code"] == "AccessDeniedException":
policies = None
logger.error(

Check warning on line 119 in prowler/providers/aws/services/organizations/organizations_service.py

View check run for this annotation

Codecov / codecov/patch

prowler/providers/aws/services/organizations/organizations_service.py#L119

Added line #L119 was not covered by tests
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

except Exception as error:
logger.error(
Expand Down Expand Up @@ -219,5 +222,5 @@
id: str
status: str
master_id: str
policies: dict[str, list[Policy]] = {}
policies: Optional[dict[str, list[Policy]]] = {}
delegated_administrators: list[DelegatedAdministrator] = None
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def execute(self):
report.resource_tags = topic.tags
report.status = "PASS"
report.status_extended = (
f"SNS topic {topic.name} is not publicly accesible."
f"SNS topic {topic.name} is not publicly accessible."
)
if topic.policy:
for statement in topic.policy["Statement"]:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def test_apigateway_one_public_rest_api(self):
assert len(result) == 1
assert (
result[0].status_extended
== f"API Gateway test-rest-api ID {rest_api['id']} is internet accesible."
== f"API Gateway test-rest-api ID {rest_api['id']} is internet accessible."
)
assert result[0].resource_id == "test-rest-api"
assert (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,50 @@ def test_repository_not_public(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Repository {repository_name} is not publicly accesible."
== f"Repository {repository_name} is not publicly accessible."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn

def test_repository_no_policy(self):
ecr_client = mock.MagicMock
ecr_client.registries = {}
ecr_client.registries[AWS_REGION_EU_WEST_1] = Registry(
id=AWS_ACCOUNT_NUMBER,
region=AWS_REGION_EU_WEST_1,
scan_type="BASIC",
repositories=[
Repository(
name=repository_name,
arn=repository_arn,
region=AWS_REGION_EU_WEST_1,
scan_on_push=True,
policy=None,
images_details=None,
lifecycle_policy=None,
)
],
rules=[],
)

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=set_mocked_aws_provider(),
), mock.patch(
"prowler.providers.aws.services.ecr.ecr_repositories_not_publicly_accessible.ecr_repositories_not_publicly_accessible.ecr_client",
ecr_client,
):
from prowler.providers.aws.services.ecr.ecr_repositories_not_publicly_accessible.ecr_repositories_not_publicly_accessible import (
ecr_repositories_not_publicly_accessible,
)

check = ecr_repositories_not_publicly_accessible()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Repository {repository_name} is not publicly accessible."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
Expand Down Expand Up @@ -165,7 +208,7 @@ def test_repository_public(self):
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Repository {repository_name} policy may allow anonymous users to perform actions (Principal: '*')."
== f"Repository {repository_name} is publicly accessible."
)
assert result[0].resource_id == repository_name
assert result[0].resource_arn == repository_arn
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def test_topic_not_public(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SNS topic {topic_name} is not publicly accesible."
== f"SNS topic {topic_name} is not publicly accessible."
)
assert result[0].resource_id == topic_name
assert result[0].resource_arn == topic_arn
Expand Down Expand Up @@ -167,7 +167,7 @@ def test_topic_no_policy(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"SNS topic {topic_name} is not publicly accesible."
== f"SNS topic {topic_name} is not publicly accessible."
)
assert result[0].resource_id == topic_name
assert result[0].resource_arn == topic_arn
Expand Down