Skip to content

Commit

Permalink
fix tests + some linting
Browse files Browse the repository at this point in the history
  • Loading branch information
LuminatiHD committed May 23, 2024
1 parent f9e80f1 commit 0e4c8d1
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 25 deletions.
47 changes: 29 additions & 18 deletions plugins/modules/system_high_availability_settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def check_hasync_node(config: OPNsenseModuleConfig):
When an opnsense instance is created, the hasync block does not exist at all.
This function checks if the opnsense/hasync exists in the tree. If not, it
adds that parent node with the default settings (pfsyncinterface=LAN
remote_system all None)
synchronize_config_to_ip, remote_system_username and remote_system_password all None)
Args:
config (OPNsenseModuleConfig): The configuration for the opnsense firewall
"""
Expand All @@ -116,7 +116,9 @@ def check_hasync_node(config: OPNsenseModuleConfig):
)
# default settings when nothing is selected
synchronize_interface(config, "lan")
remote_system_synchronization(config, None, None, None)
config.set(value=None, setting="synchronize_config_to_ip")
config.set(value=None, setting="remote_system_username")
config.set(value=None, setting="remote_system_password")


def synchronize_states(config: OPNsenseModuleConfig, setting: bool):
Expand All @@ -129,20 +131,22 @@ def synchronize_states(config: OPNsenseModuleConfig, setting: bool):
if setting and config.get("synchronize_states") is None:
config.set(value="on", setting="synchronize_states")
elif not setting and config.get("synchronize_states") is not None:
config._config_xml_tree.find("hasync").remove(config.get("synchronize_states"))
config.get("hasync").remove(config.get("synchronize_states"))


def get_configured_interface_with_descr(config: OPNsenseModuleConfig) -> Dict[str, str]:
def get_configured_interface_with_descr() -> Dict[str, str]:
"""
Get all interfaces that are allowed to be used for synchronize_interface
Args:
config (OPNsenseModuleConfig): The configuration for the opnsense firewall
"""
# https://github.com/opnsense/core/blob/7d212f3e5d9eb2456acf2165987dd850cd78c710/src/etc/inc/util.inc#L822
# load requirements
php_requirements = ["/usr/local/etc/inc/interfaces.inc",
"/usr/local/etc/inc/util.inc",
"/usr/local/etc/inc/config.inc"]
php_requirements = [
"/usr/local/etc/inc/interfaces.inc",
"/usr/local/etc/inc/util.inc",
"/usr/local/etc/inc/config.inc",
]
php_command = """
foreach (get_configured_interface_with_descr() as $key => $item) {
echo $key.':'.$item.',';
Expand Down Expand Up @@ -184,7 +188,7 @@ def synchronize_interface(config: OPNsenseModuleConfig, sync_interface: str):
it will utilize this interface for communication.
"""
interfaces = {"lo0": "Loopback"}
interfaces.update(get_configured_interface_with_descr(config))
interfaces.update(get_configured_interface_with_descr())
for ident, desc in interfaces.items():
if sync_interface.lower() in (ident.lower(), desc.lower()):
config.set(ident, "synchronize_interface")
Expand All @@ -205,7 +209,7 @@ def synchronize_peer_ip(config: OPNsenseModuleConfig, peer_ip: str):
if peer_ip:
config.set(value=peer_ip, setting="synchronize_peer_ip")
elif not peer_ip and config.get("synchronize_peer_ip") is not None:
config._config_xml_tree.find("hasync").remove(config.get("synchronize_peer_ip"))
config.get("hasync").remove(config.get("synchronize_peer_ip"))


def remote_system_synchronization(
Expand All @@ -231,7 +235,7 @@ def remote_system_synchronization(
config.set(value=password, setting="remote_system_password")


def plugins_xmlrpc_sync(config: OPNsenseModuleConfig) -> Dict[str, str]:
def plugins_xmlrpc_sync() -> Dict[str, str]:
"""
Get all services on the firewall which can even be synced
"""
Expand All @@ -252,16 +256,20 @@ def plugins_xmlrpc_sync(config: OPNsenseModuleConfig) -> Dict[str, str]:
# check for stderr
if result.get("stderr"):
raise OPNSenseGetInterfacesError("error encounterd while getting interfaces")
allowed_services = dict(service.split(",") for service in result.get("stdout_lines"))
allowed_services = dict(
service.split(",") for service in result.get("stdout_lines")
)
return allowed_services


def services_get_lookup_name(service: str):
return service.lower().replace("/ ", "").replace(":", "").replace(" ", "_")


def services_to_synchronize(config: OPNsenseModuleConfig, sync_services: List[str]):
allowed_services = plugins_xmlrpc_sync(config)
"""
Handler function for the setting services_to_synchronize.
Args:
config (OPNsenseModuleConfig): The configuration for the opnsense firewall
sync_services (List[str]): A list of services that should be synchronized
"""
allowed_services = plugins_xmlrpc_sync()
if isinstance(sync_services, str):
sync_services = [sync_services]
for service in sync_services:
Expand All @@ -284,8 +292,11 @@ def services_to_synchronize(config: OPNsenseModuleConfig, sync_services: List[st
config.get("hasync").append(xml_elem)
for service_id, service_description in allowed_services.items():
service_xml_elem = config.get("hasync").find(f"synchronize{service_id}")
if (service_id not in sync_services and service_description not in sync_services and
service_xml_elem is not None):
if (
service_id not in sync_services
and service_description not in sync_services
and service_xml_elem is not None
):
config.get("hasync").remove(service_xml_elem)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@

TEST_VERSION_MAP = {
"OPNsense Test": {
"system_high_availability_settings": {
"system_high_availability_settings": {
# Add other mappings here
"hasync": "hasync",
"synchronize_states": "hasync/pfsyncenabled",
Expand All @@ -52,7 +52,7 @@
"/usr/local/etc/inc/util.inc",
"/usr/local/etc/inc/config.inc",
"/usr/local/etc/inc/plugins.inc",
]
],
},
}
}
Expand Down Expand Up @@ -182,7 +182,7 @@ def test_synchronize_interface_failure(
with pytest.raises(OPNSenseGetInterfacesError) as excinfo:
_ = synchronize_interface(sample_config, "LAN")
assert (
"error encounterd while getting interfaces, less than one interface available"
"error encountered while getting interfaces, less than one interface available"
in str(excinfo.value)
)

Expand All @@ -202,7 +202,7 @@ def test_synchronize_interface_success(
):
with pytest.raises(OPNSenseGetInterfacesError) as excinfo:
_ = synchronize_interface(sample_config, "LAN")
assert "error encounterd while getting interfaces" in str(excinfo.value)
assert "error encountered while getting interfaces" in str(excinfo.value)


@patch(
Expand Down Expand Up @@ -240,12 +240,19 @@ def test_remote_system_synchronization(mocked_version_utils: MagicMock, sample_c
"ansible_collections.puzzle.opnsense.plugins.module_utils.opnsense_utils.run_command",
return_value={
"stdout_lines": [
"aliases,Aliases", "authservers,Auth Servers", "captiveportal,Captive Portal", "certs,Certificates"
], "stderr": ""},
"aliases,Aliases",
"authservers,Auth Servers",
"captiveportal,Captive Portal",
"certs,Certificates",
],
"stderr": "",
},
)
@patch.dict(in_dict=VERSION_MAP, values=TEST_VERSION_MAP, clear=True)
@pytest.mark.parametrize("sample_config", [XML_CONFIG], indirect=True)
def test_services_to_synchronize(mocked_version_utils: MagicMock, mocked_command_out: MagicMock, sample_config):
def test_services_to_synchronize(
mocked_version_utils: MagicMock, mocked_command_out: MagicMock, sample_config
):
for _ in range(2):
services = ["Aliases", "Auth Servers", "Captive Portal", "Certificates"]
services_to_synchronize(sample_config, services)
Expand Down

0 comments on commit 0e4c8d1

Please sign in to comment.