Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aws): add new check organizations_opt_out_ai_services_policy #5152

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"Provider": "aws",
"CheckID": "organizations_opt_out_ai_services_policy",
"CheckTitle": "Ensure that AWS Organizations opt-out of AI services policy is enabled.",
"CheckType": [],
"ServiceName": "organizations",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:service::account-id:organization/organization-id",
"Severity": "low",
"ResourceType": "Other",
"Description": "This control checks whether the AWS Organizations opt-out of AI services policy is enabled. The control fails if the policy is not enabled.",
"Risk": "By default, AWS may be using your data to train its AI models. This may include data from your AWS CloudTrail logs, AWS Config rules, and AWS GuardDuty findings. If you opt out of AI services, AWS will not use your data to train its AI models.",
"RelatedUrl": "https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_policies_ai-opt-out_all.html",
"Remediation": {
"Code": {
"CLI": "aws organizations enable-policy-type --root-id <root-id> --policy-type AI_SERVICES_OPT_OUT {'services': {'default': {'opt_out_policy': {'@@assign': 'optOut'}}}}",
"NativeIaC": "",
"Other": "",
"Terraform": ""
},
"Recommendation": {
"Text": "Artificial Intelligence (AI) services opt-out policies enable you to control whether AWS AI services can store and use your content. Enable the AWS Organizations opt-out of AI services policy.",
"Url": "https://docs.aws.amazon.com/organizations/latest/userguide/disable-policy-type.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.organizations.organizations_client import (
organizations_client,
)


class organizations_opt_out_ai_services_policy(Check):
def execute(self):
findings = []

for org in organizations_client.organizations:
report = Check_Report_AWS(self.metadata())
report.resource_id = org.id
report.resource_arn = org.arn
report.region = organizations_client.region
report.status = "FAIL"
report.status_extended = (
"AWS Organizations is not in-use for this AWS Account."
)
if org.status == "ACTIVE":
report.status_extended = f"AWS Organization {org.id} has not opted out of all AI services, granting consent for AWS to access its data."
if org.policies is not None: # Access Denied to list_policies
for policy in org.policies.get("AISERVICES_OPT_OUT_POLICY", []):
if (
policy.content.get("services", {})
.get("default", {})
.get("opt_out_policy", {})
.get("@@assign")
== "optOut"
):
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has opted out of all AI services, not granting consent for AWS to access its data."
break

findings.append(report)

return findings
Original file line number Diff line number Diff line change
Expand Up @@ -16,91 +16,81 @@ def execute(self):
report.resource_id = org.id
report.resource_arn = org.arn
report.region = organizations_client.region
report.status = "FAIL"
report.status_extended = (
"AWS Organizations is not in-use for this AWS Account."
)

if org.status == "ACTIVE":
if org.policies is None:
# Access Denied to list_policies
continue
if not org.policies:
report.status = "FAIL"
report.status_extended = (
f"AWS Organization {org.id} has no SCP policies."
)
else:
# We use this flag if we find a statement that is restricting regions but not all the configured ones:
is_region_restricted_statement = False
report.status_extended = (
f"AWS Organizations {org.id} does not have SCP policies."
)
# We use this flag if we find a statement that is restricting regions but not all the configured ones:
is_region_restricted_statement = False

for policy in org.policies:
# We only check SCP policies here
if policy.type != "SERVICE_CONTROL_POLICY":
continue
for policy in org.policies.get("SERVICE_CONTROL_POLICY", []):

# Statements are not always list
statements = policy.content.get("Statement")
if type(policy.content["Statement"]) is not list:
statements = [policy.content.get("Statement")]
# Statements are not always list
statements = policy.content.get("Statement")
if type(policy.content["Statement"]) is not list:
statements = [policy.content.get("Statement")]

for statement in statements:
# Deny if Condition = {"StringNotEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
statement.get("Effect") == "Deny"
and "Condition" in statement
and "StringNotEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringNotEquals"]
for statement in statements:
# Deny if Condition = {"StringNotEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
statement.get("Effect") == "Deny"
and "Condition" in statement
and "StringNotEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringNotEquals"]
):
if all(
region
in statement["Condition"]["StringNotEquals"][
"aws:RequestedRegion"
]
for region in organizations_enabled_regions
):
if all(
region
in statement["Condition"]["StringNotEquals"][
"aws:RequestedRegion"
]
for region in organizations_enabled_regions
):
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has SCP policy {policy.id} restricting all configured regions found."
findings.append(report)
return findings
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies {policy.id} restricting some AWS Regions, but not all the configured ones, please check config."
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has SCP policy {policy.id} restricting all configured regions found."
findings.append(report)
return findings
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies {policy.id} restricting some AWS Regions, but not all the configured ones, please check config."

# Allow if Condition = {"StringEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
policy.content.get("Statement") == "Allow"
and "Condition" in statement
and "StringEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringEquals"]
# Allow if Condition = {"StringEquals": {"aws:RequestedRegion": [region1, region2]}}
if (
policy.content.get("Statement") == "Allow"
and "Condition" in statement
and "StringEquals" in statement["Condition"]
and "aws:RequestedRegion"
in statement["Condition"]["StringEquals"]
):
if all(
region
in statement["Condition"]["StringEquals"][
"aws:RequestedRegion"
]
for region in organizations_enabled_regions
):
if all(
region
in statement["Condition"]["StringEquals"][
"aws:RequestedRegion"
]
for region in organizations_enabled_regions
):
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has SCP policy {policy.id} restricting all configured regions found."
findings.append(report)
return findings
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies {policy.id} restricting some AWS Regions, but not all the configured ones, please check config."
# All defined regions are restricted, we exit here, no need to continue.
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has SCP policy {policy.id} restricting all configured regions found."
findings.append(report)
return findings
else:
# Regions are restricted, but not the ones defined, we keep this finding, but we continue analyzing:
is_region_restricted_statement = True
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies {policy.id} restricting some AWS Regions, but not all the configured ones, please check config."

if not is_region_restricted_statement:
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies but don't restrict AWS Regions."

else:
report.status = "FAIL"
report.status_extended = (
"AWS Organizations is not in-use for this AWS Account."
)
if not is_region_restricted_statement:
report.status = "FAIL"
report.status_extended = f"AWS Organization {org.id} has SCP policies but don't restrict AWS Regions."

findings.append(report)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,33 @@
from prowler.lib.scan_filters.scan_filters import is_resource_filtered
from prowler.providers.aws.lib.service.service import AWSService

available_organizations_policies = [
AVAILABLE_ORGANIZATIONS_POLICIES = [
"SERVICE_CONTROL_POLICY",
"TAG_POLICY",
"BACKUP_POLICY",
"AISERVICES_OPT_OUT_POLICY",
]


################## Organizations
class Organizations(AWSService):
def __init__(self, provider):
# Call AWSService's __init__
super().__init__(__class__.__name__, provider)
self.organizations = []
self.policies = []
self.policies = {}
self.delegated_administrators = []
self._describe_organization()

def _describe_organization(self):
logger.info("Organizations - Describe Organization...")

try:
# Check if Organizations is in-use
try:
organization_desc = self.client.describe_organization()["Organization"]
organization_arn = organization_desc.get("Arn")
organization_id = organization_desc.get("Id")
organization_master_id = organization_desc.get("MasterAccountId")
# Fetch policies for organization:
organization_policies = self._list_policies()
# Fetch delegated administrators for organization:
organization_delegated_administrator = (
self._list_delegated_administrators()
)
Expand Down Expand Up @@ -89,23 +85,24 @@ def _describe_organization(self):
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

# I'm using list_policies instead of list_policies_for_target, because the last one only returns "Attached directly" policies but not "Inherited from..." policies.
def _list_policies(self):
logger.info("Organizations - List policies...")

try:
list_policies_paginator = self.client.get_paginator("list_policies")
for policy_type in available_organizations_policies:
policies = {}
for policy_type in AVAILABLE_ORGANIZATIONS_POLICIES:
logger.info(
"Organizations - List policies... - Type: %s",
policy_type,
)
policies[policy_type] = []
for page in list_policies_paginator.paginate(Filter=policy_type):
for policy in page["Policies"]:
policy_id = policy.get("Id")
policy_content = self._describe_policy(policy_id)
policy_targets = self._list_targets_for_policy(policy_id)
self.policies.append(
policies[policy_type].append(
Policy(
arn=policy.get("Arn"),
id=policy_id,
Expand All @@ -118,20 +115,18 @@ def _list_policies(self):

except ClientError as error:
if error.response["Error"]["Code"] == "AccessDeniedException":
self.policies = None
policies = None

except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

finally:
return self.policies
return policies

def _describe_policy(self, policy_id) -> dict:
logger.info("Organizations - Describe policy: %s ...", policy_id)

# This operation can be called only from the organization’s management account or by a member account that is a delegated administrator for an Amazon Web Services service.
try:
policy_content = {}
if policy_id:
Expand All @@ -143,8 +138,7 @@ def _describe_policy(self, policy_id) -> dict:
if isinstance(policy_content, str):
policy_content = json.loads(policy_content)

return policy_content # This could be not be a dict, because json.loads could return a list or a string depending on the content of policy_content object.

return policy_content
except Exception as error:
logger.error(
f"{self.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
Expand All @@ -170,7 +164,7 @@ def _list_targets_for_policy(self, policy_id) -> list:
return []

def _list_delegated_administrators(self):
logger.info("Organizations - List Delegated Administrators")
logger.info("Organizations - List Delegated Administrators...")

try:
list_delegated_administrators_paginator = self.client.get_paginator(
Expand Down Expand Up @@ -225,5 +219,5 @@ class Organization(BaseModel):
id: str
status: str
master_id: str
policies: list[Policy] = None
policies: dict[str, list[Policy]] = {}
delegated_administrators: list[DelegatedAdministrator] = None
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,17 @@ def execute(self):
report.status_extended = (
"AWS Organizations is not in-use for this AWS Account."
)
if org.status == "ACTIVE":
if org.policies is None:
# Access Denied to list_policies
continue
for policy in org.policies:
# We only check SCP policies here
if policy.type != "TAG_POLICY":
continue

report.status_extended = f"AWS Organization {org.id} has tag policies enabled but not attached."

if policy.targets:
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has tag policies enabled and attached to an AWS account."
if org.status == "ACTIVE":
report.status_extended = (
f"AWS Organizations {org.id} does not have tag policies."
)
if org.policies is not None: # Access Denied to list_policies
for policy in org.policies.get("TAG_POLICY", []):
report.status_extended = f"AWS Organization {org.id} has tag policies enabled but not attached."
if policy.targets:
report.status = "PASS"
report.status_extended = f"AWS Organization {org.id} has tag policies enabled and attached to an AWS account."

findings.append(report)

Expand Down
Loading