Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(networkfirewall): add new check networkfirewall_logging_enabled #5145

Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"Provider": "aws",
"CheckID": "networkfirewall_logging_enabled",
"CheckTitle": "Ensure Network Firewall Logging is Enabled",
"CheckType": [
"Software and Configuration Checks/Industry and Regulatory Standards/NIST 800-53"
],
"ServiceName": "networkfirewall",
MrCloudSec marked this conversation as resolved.
Show resolved Hide resolved
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:network-firewall::account-id:firewall/firewall-name",
"Severity": "medium",
"ResourceType": "AwsNetworkFirewallFirewall",
"Description": "This control checks whether logging is enabled for an AWS Network Firewall firewall. The control fails if logging isn't enabled for at least one log type or if the logging destination doesn't exist.",
"Risk": "Failing to enable logging on an AWS Network Firewall can lead to a lack of visibility into network traffic, making it difficult to monitor and respond to security incidents effectively, which could jeopardize the security and integrity of your infrastructure.",
"RelatedUrl": "https://docs.aws.amazon.com/network-firewall/latest/developerguide/firewall-logging.html",
"Remediation": {
"Code": {
"CLI": "aws network-firewall update-logging-configuration --firewall-arn <firewall-arn> --logging-configuration <configuration>",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/networkfirewall-controls.html#networkfirewall-2",
"Terraform": ""
},
"Recommendation": {
"Text": "Enable logging for your AWS Network Firewall by updating its logging configuration to ensure comprehensive tracking of network traffic and facilitate better incident response and auditing capabilities.",
"Url": "https://docs.aws.amazon.com/network-firewall/latest/developerguide/firewall-update-logging-configuration.html"
}
},
"Categories": [
"logging"
],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.networkfirewall.networkfirewall_client import (
networkfirewall_client,
)
from prowler.providers.aws.services.networkfirewall.networkfirewall_service import (
LogType,
)


class networkfirewall_logging_enabled(Check):
def execute(self):
findings = []
for arn, firewall in networkfirewall_client.network_firewalls.items():
report = Check_Report_AWS(self.metadata())
report.region = firewall.region
report.resource_id = firewall.name
report.resource_arn = arn
report.resource_tags = firewall.tags
report.status = "FAIL"
report.status_extended = f"Network Firewall {firewall.name} does not have logging enabled in any destination."

for configuration in firewall.logging_configuration:
if configuration.log_type in LogType or configuration.log_destination:
report.status = "PASS"
report.status_extended = f"Network Firewall {firewall.name} has logging enabled in at least one destination."
break

findings.append(report)

return findings
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from enum import Enum
from typing import Optional

from pydantic import BaseModel

from prowler.lib.logger import logger
Expand All @@ -17,6 +20,9 @@ def __init__(self, provider):
self.__threading_call__(
self._describe_firewall_policy, self.network_firewalls.values()
)
self.__threading_call__(
self._describe_logging_configuration, self.network_firewalls.values()
)

def _list_firewalls(self, regional_client):
logger.info("Network Firewall - Listing Network Firewalls...")
Expand Down Expand Up @@ -84,8 +90,67 @@ def _describe_firewall_policy(self, network_firewall):
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)

def _describe_logging_configuration(self, network_firewall):
logger.info(
"Network Firewall - Describe Network Firewalls Logging Configuration..."
)
try:
describe_logging_configuration = (
self.regional_clients[network_firewall.region]
.describe_logging_configuration(FirewallArn=network_firewall.arn)
.get("LoggingConfiguration", {})
)
destination_configs = describe_logging_configuration.get(
"LogDestinationConfigs", []
)
network_firewall.logging_configuration = []
if destination_configs:
for log_destination_config in destination_configs:
log_type = LogType(log_destination_config.get("LogType", "FLOW"))
log_destination_type = LogDestinationType(
log_destination_config.get("LogDestinationType", "S3")
)
log_destination = log_destination_config.get("LogDestination", {})
network_firewall.logging_configuration.append(
LoggingConfiguration(
log_type=log_type,
log_destination_type=log_destination_type,
log_destination=log_destination,
)
)
except Exception as error:
logger.error(
f"{error.__class__.__name__}:{error.__traceback__.tb_lineno} -- {error}"
)


class LogType(Enum):
"""Log Type for Network Firewall"""

alert = "ALERT"
flow = "FLOW"
tls = "TLS"


class LogDestinationType(Enum):
"""Log Destination Type for Network Firewall"""

s3 = "S3"
cloudwatch_logs = "CloudWatchLogs"
kinesis_data_firehose = "KinesisDataFirehose"


class LoggingConfiguration(BaseModel):
"""Logging Configuration for Network Firewall"""

log_type: LogType
log_destination_type: LogDestinationType
log_destination: dict = {}


class Firewall(BaseModel):
"""Firewall Model for Network Firewall"""

arn: str
name: str
region: str
Expand All @@ -94,5 +159,6 @@ class Firewall(BaseModel):
tags: list = []
encryption_type: str = None
deletion_protection: bool = False
logging_configuration: Optional[list[LoggingConfiguration]]
stateless_rule_groups: list[str] = []
stateful_rule_groups: list[str] = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
from unittest import mock

from prowler.providers.aws.services.networkfirewall.networkfirewall_service import (
Firewall,
LogDestinationType,
LoggingConfiguration,
LogType,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider

FIREWALL_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall/my-firewall"
FIREWALL_NAME = "my-firewall"
VPC_ID_PROTECTED = "vpc-12345678901234567"
VPC_ID_UNPROTECTED = "vpc-12345678901234568"
POLICY_ARN = "arn:aws:network-firewall:us-east-1:123456789012:firewall-policy/my-policy"


class Test_networkfirewall_logging_enabled:
def test_no_networkfirewall(self):
networkfirewall_client = mock.MagicMock
networkfirewall_client.provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1]
)
networkfirewall_client.region = AWS_REGION_US_EAST_1
networkfirewall_client.network_firewalls = {}

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled import (
networkfirewall_logging_enabled,
)

check = networkfirewall_logging_enabled()
result = check.execute()

assert len(result) == 0

def test_networkfirewall_logging_disabled(self):
networkfirewall_client = mock.MagicMock
networkfirewall_client.provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1]
)
networkfirewall_client.region = AWS_REGION_US_EAST_1
networkfirewall_client.network_firewalls = {
FIREWALL_ARN: Firewall(
arn=FIREWALL_ARN,
name=FIREWALL_NAME,
region=AWS_REGION_US_EAST_1,
policy_arn=POLICY_ARN,
vpc_id=VPC_ID_PROTECTED,
tags=[],
encryption_type="CUSTOMER_KMS",
logging_configuration=[],
)
}

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled import (
networkfirewall_logging_enabled,
)

check = networkfirewall_logging_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"Network Firewall {FIREWALL_NAME} does not have logging enabled in any destination."
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == FIREWALL_NAME
assert result[0].resource_tags == []
assert result[0].resource_arn == FIREWALL_ARN

def test_networkfirewall_logging_enabled(self):
networkfirewall_client = mock.MagicMock
networkfirewall_client.provider = set_mocked_aws_provider(
[AWS_REGION_US_EAST_1]
)
networkfirewall_client.region = AWS_REGION_US_EAST_1
networkfirewall_client.network_firewalls = {
FIREWALL_ARN: Firewall(
arn=FIREWALL_ARN,
name=FIREWALL_NAME,
region=AWS_REGION_US_EAST_1,
policy_arn=POLICY_ARN,
vpc_id=VPC_ID_PROTECTED,
tags=[],
encryption_type="CUSTOMER_KMS",
logging_configuration=[
LoggingConfiguration(
log_type=LogType.flow,
log_destination_type=LogDestinationType.s3,
log_destination={"bucket_name": "my-bucket"},
)
],
),
}

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled.networkfirewall_client",
new=networkfirewall_client,
):
# Test Check
from prowler.providers.aws.services.networkfirewall.networkfirewall_logging_enabled.networkfirewall_logging_enabled import (
networkfirewall_logging_enabled,
)

check = networkfirewall_logging_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"Network Firewall {FIREWALL_NAME} has logging enabled in at least one destination."
)
assert result[0].region == AWS_REGION_US_EAST_1
assert result[0].resource_id == FIREWALL_NAME
assert result[0].resource_tags == []
assert result[0].resource_arn == FIREWALL_ARN
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
from unittest import mock
from unittest.mock import patch

import botocore

from prowler.providers.aws.services.networkfirewall.networkfirewall_service import (
Firewall,
LogDestinationType,
LoggingConfiguration,
LogType,
NetworkFirewall,
)
from tests.providers.aws.utils import AWS_REGION_US_EAST_1, set_mocked_aws_provider
Expand Down Expand Up @@ -86,6 +91,51 @@ def test_list_firewalls(self):
)
assert networkfirewall.network_firewalls[FIREWALL_ARN].name == FIREWALL_NAME

def test_describe_logging_configuration(self):
networkfirewall = mock.MagicMock
networkfirewall.provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
networkfirewall.region = AWS_REGION_US_EAST_1
networkfirewall.network_firewalls = {
FIREWALL_ARN: Firewall(
arn=FIREWALL_ARN,
name=FIREWALL_NAME,
region=AWS_REGION_US_EAST_1,
policy_arn=POLICY_ARN,
vpc_id=VPC_ID,
tags=[{"Key": "test_tag", "Value": "test_value"}],
encryption_type="CUSTOMER_KMS",
logging_configuration=[
LoggingConfiguration(
log_type=LogType.flow,
log_destination_type=LogDestinationType.s3,
log_destination={
"bucket_name": "my-bucket",
},
)
],
)
}
assert len(networkfirewall.network_firewalls) == 1
assert (
networkfirewall.network_firewalls[FIREWALL_ARN].region
== AWS_REGION_US_EAST_1
)
assert networkfirewall.network_firewalls[FIREWALL_ARN].name == FIREWALL_NAME
assert networkfirewall.network_firewalls[FIREWALL_ARN].policy_arn == POLICY_ARN
assert networkfirewall.network_firewalls[FIREWALL_ARN].vpc_id == VPC_ID
assert networkfirewall.network_firewalls[FIREWALL_ARN].tags == [
{"Key": "test_tag", "Value": "test_value"}
]
assert networkfirewall.network_firewalls[
FIREWALL_ARN
].logging_configuration == [
LoggingConfiguration(
log_type=LogType.flow,
log_destination_type=LogDestinationType.s3,
log_destination={"bucket_name": "my-bucket"},
)
]

def test_describe_firewall(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
networkfirewall = NetworkFirewall(aws_provider)
Expand Down
Loading