-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(azure containerregistry): gather service infos and checks disabl…
…ed admin user (#5191) Co-authored-by: Pedro Martín <[email protected]> Co-authored-by: Rubén De la Torre Vico <[email protected]> Co-authored-by: Sergio <[email protected]>
- Loading branch information
1 parent
da87c0d
commit 234f8c2
Showing
10 changed files
with
1,362 additions
and
935 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
Empty file.
Empty file.
30 changes: 30 additions & 0 deletions
30
...containerregistry_admin_user_disabled/containerregistry_admin_user_disabled.metadata.json
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
{ | ||
"Provider": "azure", | ||
"CheckID": "containerregistry_admin_user_disabled", | ||
"CheckTitle": "Ensure admin user is disabled for Azure Container Registry", | ||
"CheckType": [], | ||
"ServiceName": "containerregistry", | ||
"SubServiceName": "", | ||
"ResourceIdTemplate": "", | ||
"Severity": "high", | ||
"ResourceType": "ContainerRegistry", | ||
"Description": "Ensure that the admin user is disabled and Role-Based Access Control (RBAC) is used instead since it could grant unrestricted access to the registry", | ||
"Risk": "If the admin user is enabled, it may lead to unauthorized access to the container registry and its resources, which could compromise the confidentiality, integrity, and availability of the images stored within.", | ||
"RelatedUrl": "https://learn.microsoft.com/en-us/azure/container-registry/container-registry-authentication?tabs=azure-cli#admin-account", | ||
"Remediation": { | ||
"Code": { | ||
"CLI": "az acr update --name <RegistryName> --resource-group <ResourceGroupName> --admin-enabled false", | ||
"NativeIaC": "", | ||
"Other": "", | ||
"Terraform": "" | ||
}, | ||
"Recommendation": { | ||
"Text": "Disable the admin user on Azure Container Registry through the Azure Portal: 1. Navigate to your Container Registry. 2. In the settings, select 'Access keys'. 3. Ensure the 'Admin user' checkbox is not ticked. For all actions relying on registry access, switch to using Role-Based Access Control.", | ||
"Url": "https://learn.microsoft.com/en-us/azure/container-registry/container-registry-authentication?tabs=azure-cli#admin-account" | ||
} | ||
}, | ||
"Categories": [], | ||
"DependsOn": [], | ||
"RelatedTo": [], | ||
"Notes": "The transition away from using the admin user to RBAC will facilitate a more secure and manageable access model, minimizing the potential risk of unauthorized access to your container images." | ||
} |
27 changes: 27 additions & 0 deletions
27
...erregistry/containerregistry_admin_user_disabled/containerregistry_admin_user_disabled.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
from prowler.lib.check.models import Check, Check_Report_Azure | ||
from prowler.providers.azure.services.containerregistry.containerregistry_client import ( | ||
containerregistry_client, | ||
) | ||
|
||
|
||
class containerregistry_admin_user_disabled(Check): | ||
def execute(self) -> list[Check_Report_Azure]: | ||
findings = [] | ||
|
||
for subscription, registries in containerregistry_client.registries.items(): | ||
for registry_id, container_registry_info in registries.items(): | ||
report = Check_Report_Azure(self.metadata()) | ||
report.subscription = subscription | ||
report.resource_name = container_registry_info.name | ||
report.resource_id = registry_id | ||
report.location = container_registry_info.location | ||
report.status = "FAIL" | ||
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} has its admin user enabled." | ||
|
||
if not container_registry_info.admin_user_enabled: | ||
report.status = "PASS" | ||
report.status_extended = f"Container Registry {container_registry_info.name} from subscription {subscription} has its admin user disabled." | ||
|
||
findings.append(report) | ||
|
||
return findings |
6 changes: 6 additions & 0 deletions
6
prowler/providers/azure/services/containerregistry/containerregistry_client.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
from prowler.providers.azure.services.containerregistry.containerregistry_service import ( | ||
ContainerRegistry, | ||
) | ||
from prowler.providers.common.provider import Provider | ||
|
||
containerregistry_client = ContainerRegistry(Provider.get_global_provider()) |
88 changes: 88 additions & 0 deletions
88
prowler/providers/azure/services/containerregistry/containerregistry_service.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
from dataclasses import dataclass | ||
|
||
from azure.mgmt.containerregistry import ContainerRegistryManagementClient | ||
|
||
from prowler.lib.logger import logger | ||
from prowler.providers.azure.azure_provider import AzureProvider | ||
from prowler.providers.azure.lib.service.service import AzureService | ||
from prowler.providers.azure.services.monitor.monitor_client import monitor_client | ||
from prowler.providers.azure.services.monitor.monitor_service import DiagnosticSetting | ||
|
||
|
||
class ContainerRegistry(AzureService): | ||
def __init__(self, provider: AzureProvider): | ||
super().__init__(ContainerRegistryManagementClient, provider) | ||
self.registries = self._get_container_registries() | ||
|
||
def _get_container_registries(self): | ||
logger.info("Container Registry - Getting registries...") | ||
registries = {} | ||
for subscription, client in self.clients.items(): | ||
try: | ||
registries_list = client.registries.list() | ||
registries.update({subscription: {}}) | ||
|
||
for registry in registries_list: | ||
resource_group = self._get_resource_group(registry.id) | ||
registries[subscription].update( | ||
{ | ||
registry.id: ContainerRegistryInfo( | ||
id=getattr(registry, "id", ""), | ||
name=getattr(registry, "name", ""), | ||
location=getattr(registry, "location", ""), | ||
resource_group=resource_group, | ||
sku=getattr(registry.sku, "name", ""), | ||
login_server=getattr(registry, "login_server", ""), | ||
public_network_access=getattr( | ||
registry, "public_network_access", "" | ||
), | ||
admin_user_enabled=getattr( | ||
registry, "admin_user_enabled", False | ||
), | ||
monitor_diagnostic_settings=self._get_registry_monitor_settings( | ||
registry.name, resource_group, subscription | ||
), | ||
) | ||
} | ||
) | ||
except Exception as error: | ||
logger.error( | ||
f"Subscription name: {subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" | ||
) | ||
return registries | ||
|
||
def _get_resource_group(self, registry_id: str) -> str: | ||
"""Extract resource group from the registry ID.""" | ||
return registry_id.split("/")[4] | ||
|
||
def _get_registry_monitor_settings( | ||
self, registry_name, resource_group, subscription | ||
): | ||
logger.info( | ||
f"Container Registry - Getting monitor diagnostics settings for {registry_name}..." | ||
) | ||
monitor_diagnostics_settings = [] | ||
try: | ||
monitor_diagnostics_settings = monitor_client.diagnostic_settings_with_uri( | ||
self.subscriptions[subscription], | ||
f"subscriptions/{self.subscriptions[subscription]}/resourceGroups/{resource_group}/providers/Microsoft.ContainerRegistry/registries/{registry_name}", | ||
monitor_client.clients[subscription], | ||
) | ||
except Exception as error: | ||
logger.error( | ||
f"Subscription name: {self.subscription} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" | ||
) | ||
return monitor_diagnostics_settings | ||
|
||
|
||
@dataclass | ||
class ContainerRegistryInfo: | ||
id: str | ||
name: str | ||
location: str | ||
resource_group: str | ||
sku: str | ||
login_server: str | ||
public_network_access: str | ||
admin_user_enabled: bool | ||
monitor_diagnostic_settings: list[DiagnosticSetting] = None |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
134 changes: 134 additions & 0 deletions
134
...istry/containerregistry_admin_user_disabled/containerregistry_admin_user_disabled_test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
from unittest import mock | ||
from unittest.mock import MagicMock | ||
from uuid import uuid4 | ||
|
||
from tests.providers.azure.azure_fixtures import ( | ||
AZURE_SUBSCRIPTION_ID, | ||
set_mocked_azure_provider, | ||
) | ||
|
||
|
||
class TestContainerRegistryAdminUserDisabled: | ||
def test_no_container_registries(self): | ||
containerregistry_client = MagicMock() | ||
containerregistry_client.registries = {} | ||
|
||
with mock.patch( | ||
"prowler.providers.common.provider.Provider.get_global_provider", | ||
return_value=set_mocked_azure_provider(), | ||
), mock.patch( | ||
"prowler.providers.azure.services.containerregistry.containerregistry_admin_user_disabled.containerregistry_admin_user_disabled.containerregistry_client", | ||
new=containerregistry_client, | ||
): | ||
from prowler.providers.azure.services.containerregistry.containerregistry_admin_user_disabled.containerregistry_admin_user_disabled import ( | ||
containerregistry_admin_user_disabled, | ||
) | ||
|
||
check = containerregistry_admin_user_disabled() | ||
result = check.execute() | ||
assert len(result) == 0 | ||
|
||
def test_container_registry_admin_user_enabled(self): | ||
containerregistry_client = MagicMock() | ||
registry_id = str(uuid4()) | ||
|
||
with mock.patch( | ||
"prowler.providers.common.provider.Provider.get_global_provider", | ||
return_value=set_mocked_azure_provider(), | ||
), mock.patch( | ||
"prowler.providers.azure.services.containerregistry.containerregistry_admin_user_disabled.containerregistry_admin_user_disabled.containerregistry_client", | ||
new=containerregistry_client, | ||
): | ||
from prowler.providers.azure.services.containerregistry.containerregistry_admin_user_disabled.containerregistry_admin_user_disabled import ( | ||
containerregistry_admin_user_disabled, | ||
) | ||
from prowler.providers.azure.services.containerregistry.containerregistry_service import ( | ||
ContainerRegistryInfo, | ||
) | ||
|
||
containerregistry_client.registries = { | ||
AZURE_SUBSCRIPTION_ID: { | ||
registry_id: ContainerRegistryInfo( | ||
id=registry_id, | ||
name="mock_registry", | ||
location="westeurope", | ||
resource_group="mock_resource_group", | ||
sku="Basic", | ||
login_server="mock_login_server.azurecr.io", | ||
public_network_access="Enabled", | ||
admin_user_enabled=True, | ||
) | ||
} | ||
} | ||
|
||
check = containerregistry_admin_user_disabled() | ||
|
||
result = check.execute() | ||
assert len(result) == 1 | ||
assert result[0].status == "FAIL" | ||
assert ( | ||
result[0].status_extended | ||
== f"Container Registry mock_registry from subscription {AZURE_SUBSCRIPTION_ID} has its admin user enabled." | ||
) | ||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID | ||
assert result[0].resource_name == "mock_registry" | ||
assert ( | ||
result[0].resource_id | ||
== containerregistry_client.registries[AZURE_SUBSCRIPTION_ID][ | ||
registry_id | ||
].id | ||
) | ||
assert result[0].location == "westeurope" | ||
|
||
def test_container_registry_admin_user_disabled(self): | ||
containerregistry_client = mock.MagicMock() | ||
containerregistry_client.registries = {} | ||
|
||
with mock.patch( | ||
"prowler.providers.common.provider.Provider.get_global_provider", | ||
return_value=set_mocked_azure_provider(), | ||
), mock.patch( | ||
"prowler.providers.azure.services.containerregistry.containerregistry_admin_user_disabled.containerregistry_admin_user_disabled.containerregistry_client", | ||
new=containerregistry_client, | ||
): | ||
from prowler.providers.azure.services.containerregistry.containerregistry_admin_user_disabled.containerregistry_admin_user_disabled import ( | ||
containerregistry_admin_user_disabled, | ||
) | ||
from prowler.providers.azure.services.containerregistry.containerregistry_service import ( | ||
ContainerRegistryInfo, | ||
) | ||
|
||
registry_id = "mock_registry_id" | ||
|
||
containerregistry_client.registries = { | ||
AZURE_SUBSCRIPTION_ID: { | ||
registry_id: ContainerRegistryInfo( | ||
id=registry_id, | ||
name="mock_registry", | ||
location="westeurope", | ||
resource_group="mock_resource_group", | ||
sku="Basic", | ||
login_server="mock_login_server.azurecr.io", | ||
public_network_access="Enabled", | ||
admin_user_enabled=False, | ||
) | ||
} | ||
} | ||
|
||
check = containerregistry_admin_user_disabled() | ||
result = check.execute() | ||
assert len(result) == 1 | ||
assert result[0].status == "PASS" | ||
assert ( | ||
result[0].status_extended | ||
== f"Container Registry mock_registry from subscription {AZURE_SUBSCRIPTION_ID} has its admin user disabled." | ||
) | ||
assert result[0].subscription == AZURE_SUBSCRIPTION_ID | ||
assert result[0].resource_name == "mock_registry" | ||
assert ( | ||
result[0].resource_id | ||
== containerregistry_client.registries[AZURE_SUBSCRIPTION_ID][ | ||
registry_id | ||
].id | ||
) | ||
assert result[0].location == "westeurope" |
87 changes: 87 additions & 0 deletions
87
tests/providers/azure/services/containerregistry/containerregistry_service_test.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,87 @@ | ||
from unittest.mock import MagicMock, patch | ||
from uuid import uuid4 | ||
|
||
from tests.providers.azure.azure_fixtures import ( | ||
AZURE_SUBSCRIPTION_ID, | ||
set_mocked_azure_provider, | ||
) | ||
|
||
|
||
class TestContainerRegistryService: | ||
def test_get_container_registry(self): | ||
with patch( | ||
"prowler.providers.common.provider.Provider.get_global_provider", | ||
return_value=set_mocked_azure_provider(), | ||
), patch( | ||
"prowler.providers.azure.services.monitor.monitor_service.Monitor", | ||
new=MagicMock(), | ||
): | ||
from prowler.providers.azure.services.containerregistry.containerregistry_service import ( | ||
ContainerRegistryInfo, | ||
) | ||
|
||
# Initialize ContainerRegistry with the mocked provider | ||
containerregistry_service = MagicMock() | ||
registry_id = str(uuid4()) | ||
containerregistry_service.registries = { | ||
AZURE_SUBSCRIPTION_ID: { | ||
registry_id: ContainerRegistryInfo( | ||
id=registry_id, | ||
name="mock_registry", | ||
location="westeurope", | ||
resource_group="mock_resource_group", | ||
sku="Basic", | ||
login_server="mock_login_server.azurecr.io", | ||
public_network_access="Enabled", | ||
admin_user_enabled=True, | ||
monitor_diagnostic_settings=[ | ||
{ | ||
"id": "id1/id1", | ||
"logs": [ | ||
{ | ||
"category": "ContainerLogs", | ||
"enabled": True, | ||
}, | ||
{ | ||
"category": "AdminLogs", | ||
"enabled": False, | ||
}, | ||
], | ||
"storage_account_name": "mock_storage_account", | ||
"storage_account_id": "mock_storage_account_id", | ||
"name": "mock_diagnostic_setting", | ||
} | ||
], | ||
) | ||
} | ||
} | ||
|
||
# Assertions to check the populated data in the registries | ||
assert len(containerregistry_service.registries[AZURE_SUBSCRIPTION_ID]) == 1 | ||
|
||
registry_info = containerregistry_service.registries[AZURE_SUBSCRIPTION_ID][ | ||
registry_id | ||
] | ||
|
||
assert registry_info.id == registry_id | ||
assert registry_info.name == "mock_registry" | ||
assert registry_info.location == "westeurope" | ||
assert registry_info.resource_group == "mock_resource_group" | ||
assert registry_info.sku == "Basic" | ||
assert registry_info.login_server == "mock_login_server.azurecr.io" | ||
assert registry_info.public_network_access == "Enabled" | ||
assert registry_info.admin_user_enabled is True | ||
assert isinstance(registry_info.monitor_diagnostic_settings, list) | ||
|
||
# Check the properties of monitor diagnostic settings | ||
monitor_setting = registry_info.monitor_diagnostic_settings[0] | ||
assert monitor_setting["id"] == "id1/id1" # Use dictionary access here | ||
assert monitor_setting["storage_account_name"] == "mock_storage_account" | ||
assert monitor_setting["storage_account_id"] == "mock_storage_account_id" | ||
assert monitor_setting["name"] == "mock_diagnostic_setting" | ||
assert len(monitor_setting["logs"]) == 2 | ||
|
||
assert monitor_setting["logs"][0]["category"] == "ContainerLogs" | ||
assert monitor_setting["logs"][0]["enabled"] is True | ||
assert monitor_setting["logs"][1]["category"] == "AdminLogs" | ||
assert monitor_setting["logs"][1]["enabled"] is False |