Skip to content

Commit

Permalink
feat(networkfirewall): add new check networkfirewall_logging_enabled (
Browse files Browse the repository at this point in the history
#5145)

Co-authored-by: Sergio Garcia <[email protected]>
  • Loading branch information
HugoPBrito and MrCloudSec authored Oct 2, 2024
1 parent b2151e2 commit ff10108
Show file tree
Hide file tree
Showing 6 changed files with 328 additions and 0 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "networkfirewall_logging_enabled",
"CheckTitle": "Ensure Network Firewall Logging is Enabled",
"CheckType": [
"Software and Configuration Checks/Industry and Regulatory Standards/NIST 800-53"
],
"ServiceName": "network-firewall",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:network-firewall::account-id:firewall/firewall-name",
"Severity": "medium",
"ResourceType": "AwsNetworkFirewallFirewall",
"Description": "This control checks whether logging is enabled for an AWS Network Firewall firewall. The control fails if logging isn't enabled for at least one log type or if the logging destination doesn't exist.",
"Risk": "Failing to enable logging on an AWS Network Firewall can lead to a lack of visibility into network traffic, making it difficult to monitor and respond to security incidents effectively, which could jeopardize the security and integrity of your infrastructure.",
"RelatedUrl": "https://docs.aws.amazon.com/network-firewall/latest/developerguide/firewall-logging.html",
"Remediation": {
"Code": {
"CLI": "aws network-firewall update-logging-configuration --firewall-arn <firewall-arn> --logging-configuration <configuration>",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/networkfirewall-controls.html#networkfirewall-2",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable logging for your AWS Network Firewall by updating its logging configuration to ensure comprehensive tracking of network traffic and facilitate better incident response and auditing capabilities.",
"Url": "https://docs.aws.amazon.com/network-firewall/latest/developerguide/firewall-update-logging-configuration.html"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.networkfirewall.networkfirewall_client import (
networkfirewall_client,
)


class networkfirewall_logging_enabled(Check):
def execute(self):
findings = []
for arn, firewall in networkfirewall_client.network_firewalls.items():
report = Check_Report_AWS(self.metadata())
report.region = firewall.region
report.resource_id = firewall.name
report.resource_arn = arn
report.resource_tags = firewall.tags
report.status = "FAIL"
report.status_extended = (
f"Network Firewall {firewall.name} does not have logging enabled."
)

for configuration in firewall.logging_configuration:
if configuration.log_type or configuration.log_destination:
report.status = "PASS"
report.status_extended = (
f"Network Firewall {firewall.name} has logging enabled."
)
break

findings.append(report)

return findings
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from enum import Enum
from typing import Optional

from pydantic import BaseModel

from prowler.lib.logger import logger
Expand All @@ -17,6 +20,9 @@ def __init__(self, provider):
self.__threading_call__(
self._describe_firewall_policy, self.network_firewalls.values()
)
self.__threading_call__(
self._describe_logging_configuration, self.network_firewalls.values()
)

def _list_firewalls(self, regional_client):
logger.info("Network Firewall - Listing Network Firewalls...")
Expand Down Expand Up @@ -84,8 +90,67 @@ def _describe_firewall_policy(self, network_firewall):
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)

def _describe_logging_configuration(self, network_firewall):
logger.info(
"Network Firewall - Describe Network Firewalls Logging Configuration..."
)
try:
describe_logging_configuration = (
self.regional_clients[network_firewall.region]
.describe_logging_configuration(FirewallArn=network_firewall.arn)
.get("LoggingConfiguration", {})
)
destination_configs = describe_logging_configuration.get(
"LogDestinationConfigs", []
)
network_firewall.logging_configuration = []
if destination_configs:
for log_destination_config in destination_configs:
log_type = LogType(log_destination_config.get("LogType", "FLOW"))
log_destination_type = LogDestinationType(
log_destination_config.get("LogDestinationType", "S3")
)
log_destination = log_destination_config.get("LogDestination", {})
network_firewall.logging_configuration.append(
LoggingConfiguration(
log_type=log_type,
log_destination_type=log_destination_type,
log_destination=log_destination,
)
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)


class LogType(Enum):
"""Log Type for Network Firewall"""

alert = "ALERT"
flow = "FLOW"
tls = "TLS"


class LogDestinationType(Enum):
"""Log Destination Type for Network Firewall"""

s3 = "S3"
cloudwatch_logs = "CloudWatchLogs"
kinesis_data_firehose = "KinesisDataFirehose"


class LoggingConfiguration(BaseModel):
"""Logging Configuration for Network Firewall"""

log_type: LogType
log_destination_type: LogDestinationType
log_destination: dict = {}


class Firewall(BaseModel):
"""Firewall Model for Network Firewall"""

arn: str
name: str
region: str
Expand All @@ -94,5 +159,6 @@ class Firewall(BaseModel):
tags: list = []
encryption_type: str = None
deletion_protection: bool = False
logging_configuration: Optional[list[LoggingConfiguration]]
stateless_rule_groups: list[str] = []
stateful_rule_groups: list[str] = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from unittest import mock

from prowler.providers.aws.services.networkfirewall.networkfirewall_service import (
Firewall,
LogDestinationType,
LoggingConfiguration,
LogType,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider

FIREWALL_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall/my-firewall"
FIREWALL_NAME = "my-firewall"
VPC_ID_PROTECTED = "vpc-12345678901234567"
VPC_ID_UNPROTECTED = "vpc-12345678901234568"
POLICY_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall-policy/my-policy"


class Test_networkfirewall_logging_enabled:
def test_no_networkfirewall(self):
networkfirewall_client = mock.MagicMock
networkfirewall_client.provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1]
)
networkfirewall_client.region = AWS_REGION_US_EAST_1
networkfirewall_client.network_firewalls = {}

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled import (
networkfirewall_logging_enabled,
)

check = networkfirewall_logging_enabled()
result = check.execute()

assert len(result) == 0

def test_networkfirewall_logging_disabled(self):
networkfirewall_client = mock.MagicMock
networkfirewall_client.provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1]
)
networkfirewall_client.region = AWS_REGION_US_EAST_1
networkfirewall_client.network_firewalls = {
FIREWALL_ARN: Firewall(
arn=FIREWALL_ARN,
name=FIREWALL_NAME,
region=AWS_REGION_US_EAST_1,
policy_arn=POLICY_ARN,
vpc_id=VPC_ID_PROTECTED,
tags=[],
encryption_type="CUSTOMER_KMS",
logging_configuration=[],
)
}

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled import (
networkfirewall_logging_enabled,
)

check = networkfirewall_logging_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Network Firewall {FIREWALL_NAME} does not have logging enabled."
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == FIREWALL_NAME
assert result[0].resource_tags == []
assert result[0].resource_arn == FIREWALL_ARN

def test_networkfirewall_logging_enabled(self):
networkfirewall_client = mock.MagicMock
networkfirewall_client.provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1]
)
networkfirewall_client.region = AWS_REGION_US_EAST_1
networkfirewall_client.network_firewalls = {
FIREWALL_ARN: Firewall(
arn=FIREWALL_ARN,
name=FIREWALL_NAME,
region=AWS_REGION_US_EAST_1,
policy_arn=POLICY_ARN,
vpc_id=VPC_ID_PROTECTED,
tags=[],
encryption_type="CUSTOMER_KMS",
logging_configuration=[
LoggingConfiguration(
log_type=LogType.flow,
log_destination_type=LogDestinationType.s3,
log_destination={"bucket_name": "my-bucket"},
)
],
),
}

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled import (
networkfirewall_logging_enabled,
)

check = networkfirewall_logging_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Network Firewall {FIREWALL_NAME} has logging enabled."
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == FIREWALL_NAME
assert result[0].resource_tags == []
assert result[0].resource_arn == FIREWALL_ARN
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from unittest import mock
from unittest.mock import patch

import botocore

from prowler.providers.aws.services.networkfirewall.networkfirewall_service import (
Firewall,
LogDestinationType,
LoggingConfiguration,
LogType,
NetworkFirewall,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
Expand Down Expand Up @@ -86,6 +91,51 @@ def test_list_firewalls(self):
)
assert networkfirewall.network_firewalls[FIREWALL_ARN].name == FIREWALL_NAME

def test_describe_logging_configuration(self):
networkfirewall = mock.MagicMock
networkfirewall.provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
networkfirewall.region = AWS_REGION_US_EAST_1
networkfirewall.network_firewalls = {
FIREWALL_ARN: Firewall(
arn=FIREWALL_ARN,
name=FIREWALL_NAME,
region=AWS_REGION_US_EAST_1,
policy_arn=POLICY_ARN,
vpc_id=VPC_ID,
tags=[{"Key": "test_tag", "Value": "test_value"}],
encryption_type="CUSTOMER_KMS",
logging_configuration=[
LoggingConfiguration(
log_type=LogType.flow,
log_destination_type=LogDestinationType.s3,
log_destination={
"bucket_name": "my-bucket",
},
)
],
)
}
assert len(networkfirewall.network_firewalls) == 1
assert (
networkfirewall.network_firewalls[FIREWALL_ARN].region
== AWS_REGION_US_EAST_1
)
assert networkfirewall.network_firewalls[FIREWALL_ARN].name == FIREWALL_NAME
assert networkfirewall.network_firewalls[FIREWALL_ARN].policy_arn == POLICY_ARN
assert networkfirewall.network_firewalls[FIREWALL_ARN].vpc_id == VPC_ID
assert networkfirewall.network_firewalls[FIREWALL_ARN].tags == [
{"Key": "test_tag", "Value": "test_value"}
]
assert networkfirewall.network_firewalls[
FIREWALL_ARN
].logging_configuration == [
LoggingConfiguration(
log_type=LogType.flow,
log_destination_type=LogDestinationType.s3,
log_destination={"bucket_name": "my-bucket"},
)
]

def test_describe_firewall(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
networkfirewall = NetworkFirewall(aws_provider)
Expand Down

0 comments on commit ff10108

Please sign in to comment.