Skip to content

Commit

Permalink
feat(ecs): Ensure ECS containers have a logging configuration specifi…
Browse files Browse the repository at this point in the history
…ed (#5234)

Co-authored-by: Sergio Garcia <[email protected]>
  • Loading branch information
MarioRgzLpz and sergargar authored Sep 30, 2024
1 parent 3db541a commit 30e3fd9
Show file tree
Hide file tree
Showing 12 changed files with 247 additions and 16 deletions.
4 changes: 4 additions & 0 deletions prowler/providers/aws/services/ecs/ecs_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ def _describe_task_definition(self, task_definition):
),
user=container.get("user", ""),
environment=environment,
log_driver=container.get("logConfiguration", {}).get(
"logDriver", ""
),
)
)
task_definition.pid_mode = response["taskDefinition"].get("pidMode", "")
Expand Down Expand Up @@ -173,6 +176,7 @@ class ContainerDefinition(BaseModel):
readonly_rootfilesystem: bool = False
user: str
environment: list[ContainerEnvVariable]
log_driver: Optional[str]


class TaskDefinition(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def execute(self):
report.resource_arn = task_definition.arn
report.resource_tags = task_definition.tags
report.status = "PASS"
report.status_extended = f"ECS task definition {task_definition.name} does not have containers with write access to the root filesystems."
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} does not have containers with write access to the root filesystems."

failed_containers = []
for container in task_definition.container_definitions:
Expand All @@ -21,7 +21,7 @@ def execute(self):
failed_containers.append(container.name)

if failed_containers:
report.status_extended = f"ECS task definition {task_definition.name} has containers with write access to the root filesystem: {', '.join(failed_containers)}"
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has containers with write access to the root filesystem: {', '.join(failed_containers)}"
findings.append(report)

return findings
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def execute(self):
report.resource_arn = task_definition.arn
report.resource_tags = task_definition.tags
report.status = "PASS"
report.status_extended = f"ECS task definition {task_definition.name} does not have host network mode."
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} does not have host network mode."
failed_containers = []
if task_definition.network_mode == "host":
for container in task_definition.container_definitions:
Expand All @@ -23,8 +23,8 @@ def execute(self):
failed_containers.append(container.name)

if failed_containers:
report.status_extended = f"ECS task definition {task_definition.name} has containers with host network mode and non-privileged containers running as root or with no user specified: {', '.join(failed_containers)}"
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has containers with host network mode and non-privileged containers running as root or with no user specified: {', '.join(failed_containers)}"
else:
report.status_extended = f"ECS task definition {task_definition.name} has host network mode but no containers running as root or with no user specified."
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has host network mode but no containers running as root or with no user specified."
findings.append(report)
return findings
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "ecs_task_definitions_logging_enabled",
"CheckTitle": "ECS task definitions containers should have a logging configuration",
"CheckType": [
"Software and Configuration Checks/AWS Security Best Practices"
],
"ServiceName": "ecs",
"SubServiceName": "",
"ResourceIdTemplate": "arn:aws:ecs:{region}:{account-id}:task-definition/{task-definition-name}",
"Severity": "high",
"ResourceType": "AwsEcsTaskDefinition",
"Description": "This control checks if the latest active Amazon ECS task definition has a logging configuration specified. The control fails if the task definition doesn't have the logConfiguration property defined or if the value for logDriver is null in at least one container definition.",
"Risk": "Without a logging configuration, important data may be lost, making it difficult to troubleshoot issues, monitor performance, and ensure compliance with auditing requirements.",
"RelatedUrl": "https://docs.aws.amazon.com/config/latest/developerguide/ecs-task-definition-log-configuration.html",
"Remediation": {
"Code": {
"CLI": "aws ecs register-task-definition --family <task-family> --container-definitions '[{\"name\":\"<container-name>\",\"image\":\"<image>\",\"logConfiguration\":{\"logDriver\":\"awslogs\",\"options\":{\"awslogs-group\":\"<log-group>\",\"awslogs-region\":\"<region>\",\"awslogs-stream-prefix\":\"ecs\"}}}]'",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/ecs-controls.html#ecs-9",
"Terraform": ""
},
"Recommendation": {
"Text": "Define a logging configuration in the ECS task definition to ensure important data is captured and available for debugging, monitoring, and auditing purposes.",
"Url": "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_awslogs.html#specify-log-config"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.ecs.ecs_client import ecs_client


class ecs_task_definitions_logging_enabled(Check):
def execute(self):
findings = []
for task_definition in ecs_client.task_definitions.values():
report = Check_Report_AWS(self.metadata())
report.region = task_definition.region
report.resource_id = f"{task_definition.name}:{task_definition.revision}"
report.resource_arn = task_definition.arn
report.resource_tags = task_definition.tags
report.status = "PASS"
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} containers have logging configured."
failed_containers = []
for container in task_definition.container_definitions:
if not container.log_driver:
report.status = "FAIL"
failed_containers.append(container.name)

if failed_containers:
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has containers running with no logging configuration: {', '.join(failed_containers)}"

findings.append(report)
return findings
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ def execute(self):
report.resource_arn = task_definition.arn
report.resource_tags = task_definition.tags
report.status = "PASS"
report.status_extended = f"ECS task definition {task_definition.name} does not have privileged containers."
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} does not have privileged containers."
failed_containers = []
for container in task_definition.container_definitions:
if container.privileged:
report.status = "FAIL"
failed_containers.append(container.name)

if failed_containers:
report.status_extended = f"ECS task definition {task_definition.name} has privileged containers: {', '.join(failed_containers)}"
report.status_extended = f"ECS task definition {task_definition.name} with revision {task_definition.revision} has privileged containers: {', '.join(failed_containers)}"
findings.append(report)
return findings
1 change: 1 addition & 0 deletions tests/providers/aws/services/ecs/ecs_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ def test_describe_task_definitions(self):
assert ecs.task_definitions[task_arn].network_mode == "host"
assert not ecs.task_definitions[task_arn].container_definitions[0].privileged
assert ecs.task_definitions[task_arn].container_definitions[0].user == ""
assert ecs.task_definitions[task_arn].container_definitions[0].log_driver == ""
assert ecs.task_definitions[task_arn].pid_mode == "host"
assert (
not ecs.task_definitions[task_arn]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def test_task_definition_all_containers_readonly(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} does not have containers with write access to the root filesystems."
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} does not have containers with write access to the root filesystems."
)

def test_task_definition_some_containers_not_readonly(self):
Expand Down Expand Up @@ -100,7 +100,7 @@ def test_task_definition_some_containers_not_readonly(self):
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} has containers with write access to the root filesystem: {CONTAINER_NAME}"
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} has containers with write access to the root filesystem: {CONTAINER_NAME}"
)

def test_task_definition_mixed_containers(self):
Expand Down Expand Up @@ -145,5 +145,5 @@ def test_task_definition_mixed_containers(self):
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} has containers with write access to the root filesystem: {CONTAINER_NAME}"
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} has containers with write access to the root filesystem: {CONTAINER_NAME}"
)
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_task_definition_no_host_network_mode(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} does not have host network mode."
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} does not have host network mode."
)

def test_task_definition_host_mode_container_root_non_privileged(self):
Expand Down Expand Up @@ -104,7 +104,7 @@ def test_task_definition_host_mode_container_root_non_privileged(self):
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} has containers with host network mode and non-privileged containers running as root or with no user specified: {CONTAINER_NAME}"
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} has containers with host network mode and non-privileged containers running as root or with no user specified: {CONTAINER_NAME}"
)

def test_task_definition_host_mode_container_privileged(self):
Expand Down Expand Up @@ -140,7 +140,7 @@ def test_task_definition_host_mode_container_privileged(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} has host network mode but no containers running as root or with no user specified."
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} has host network mode but no containers running as root or with no user specified."
)

def test_task_definition_host_mode_container_not_root(self):
Expand Down Expand Up @@ -176,5 +176,5 @@ def test_task_definition_host_mode_container_not_root(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} has host network mode but no containers running as root or with no user specified."
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} has host network mode but no containers running as root or with no user specified."
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
from unittest import mock

import botocore

from prowler.providers.aws.services.ecs.ecs_service import (
ContainerDefinition,
TaskDefinition,
)
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)

TASK_NAME = "test-task"
TASK_REVISION = "1"
CONTAINER_NAME = "test-container"
TASK_ARN = f"arn:aws:ecs:{AWS_REGION_US_EAST_1}:{AWS_ACCOUNT_NUMBER}:task-definition/{TASK_NAME}:{TASK_REVISION}"


make_api_call = botocore.client.BaseClient._make_api_call


def mock_make_api_call(self, operation_name, kwarg):
if operation_name == "ListTaskDefinitions":
return {
"taskDefinitionArns": [
"arn:aws:ecs:eu-west-1:123456789012:task-definition/test-task:1"
]
}
if operation_name == "DescribeTaskDefinition":
return {
"taskDefinition": {
"containerDefinitions": [
{
"name": "test-container",
"image": "test-image",
"environment": [
{"name": "DB_PASSWORD", "value": "pass-12343"},
],
}
],
"networkMode": "host",
"tags": [],
}
}
return make_api_call(self, operation_name, kwarg)


class Test_ecs_task_definitions_logging_enabled:
def test_no_task_definitions(self):
ecs_client = mock.MagicMock
ecs_client.task_definitions = {}

with mock.patch(
"prowler.providers.aws.services.ecs.ecs_service.ECS",
ecs_client,
):
from prowler.providers.aws.services.ecs.ecs_task_definitions_logging_enabled.ecs_task_definitions_logging_enabled import (
ecs_task_definitions_logging_enabled,
)

check = ecs_task_definitions_logging_enabled()
result = check.execute()
assert len(result) == 0

@mock.patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_task_definition_no_logconfiguration(self):

from prowler.providers.aws.services.ecs.ecs_service import ECS

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.ecs.ecs_task_definitions_logging_enabled.ecs_task_definitions_logging_enabled.ecs_client",
new=ECS(aws_provider),
):
from prowler.providers.aws.services.ecs.ecs_task_definitions_logging_enabled.ecs_task_definitions_logging_enabled import (
ecs_task_definitions_logging_enabled,
)

check = ecs_task_definitions_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} has containers running with no logging configuration: {CONTAINER_NAME}"
)

def test_task_definition_no_logdriver(self):
ecs_client = mock.MagicMock
ecs_client.task_definitions = {}
ecs_client.task_definitions[TASK_ARN] = TaskDefinition(
name=TASK_NAME,
arn=TASK_ARN,
revision=TASK_REVISION,
region=AWS_REGION_US_EAST_1,
network_mode="host",
container_definitions=[
ContainerDefinition(
name=CONTAINER_NAME,
privileged=True,
user="root",
environment=[],
log_driver="",
)
],
)

with mock.patch(
"prowler.providers.aws.services.ecs.ecs_service.ECS",
ecs_client,
):
from prowler.providers.aws.services.ecs.ecs_task_definitions_logging_enabled.ecs_task_definitions_logging_enabled import (
ecs_task_definitions_logging_enabled,
)

check = ecs_task_definitions_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} has containers running with no logging configuration: {CONTAINER_NAME}"
)

def test_task_definition_privileged_container(self):
ecs_client = mock.MagicMock
ecs_client.task_definitions = {}
ecs_client.task_definitions[TASK_ARN] = TaskDefinition(
name=TASK_NAME,
arn=TASK_ARN,
revision=TASK_REVISION,
region=AWS_REGION_US_EAST_1,
network_mode="host",
container_definitions=[
ContainerDefinition(
name=CONTAINER_NAME,
privileged=True,
user="root",
environment=[],
log_driver="awslogs",
)
],
)

with mock.patch(
"prowler.providers.aws.services.ecs.ecs_service.ECS",
ecs_client,
):
from prowler.providers.aws.services.ecs.ecs_task_definitions_logging_enabled.ecs_task_definitions_logging_enabled import (
ecs_task_definitions_logging_enabled,
)

check = ecs_task_definitions_logging_enabled()
result = check.execute()
assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} containers have logging configured."
)
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def test_task_definition_no_priviled_container(self):
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} does not have privileged containers."
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} does not have privileged containers."
)

def test_task_definition_privileged_container(self):
Expand Down Expand Up @@ -104,5 +104,5 @@ def test_task_definition_privileged_container(self):
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"ECS task definition {TASK_NAME} has privileged containers: {CONTAINER_NAME}"
== f"ECS task definition {TASK_NAME} with revision {TASK_REVISION} has privileged containers: {CONTAINER_NAME}"
)

0 comments on commit 30e3fd9

Please sign in to comment.