diff --git a/sonic-psud/scripts/psud b/sonic-psud/scripts/psud index 287565f67bf5..bb7ed567ff03 100644 --- a/sonic-psud/scripts/psud +++ b/sonic-psud/scripts/psud @@ -1,4 +1,4 @@ -#!/usr/bin/env python2 +#!/usr/bin/env python3 """ psud @@ -9,27 +9,32 @@ The loop interval is PSU_INFO_UPDATE_PERIOD_SECS in seconds. """ -try: - import os - import signal - import sys - import threading - from datetime import datetime - from sonic_py_common import daemon_base, logger - - # If unit testing is occurring, mock swsscommon and module_base - if os.getenv("PSUD_UNIT_TESTING") == "1": - from tests import mock_swsscommon as swsscommon - else: - from swsscommon import swsscommon -except ImportError as e: - raise ImportError(str(e) + " - required module not found") +import os +import signal +import sys +import threading +from datetime import datetime + +from sonic_py_common import daemon_base, logger + +# If unit testing is occurring, mock swsscommon and module_base +if os.getenv("PSUD_UNIT_TESTING") == "1": + from tests.mock_platform import MockPsu as Psu + from tests import mock_swsscommon as swsscommon +else: + from sonic_platform.psu import Psu + from swsscommon import swsscommon # # Constants ==================================================================== # +# TODO: Once we no longer support Python 2, we can eliminate this and get the +# name using the 'name' field (e.g., `signal.SIGINT.name`) starting with Python 3.5 +SIGNALS_TO_NAMES_DICT = dict((getattr(signal, n), n) \ + for n in dir(signal) if n.startswith('SIG') and '_' not in n ) + SYSLOG_IDENTIFIER = "psud" PLATFORM_SPECIFIC_MODULE_NAME = "psuutil" @@ -91,7 +96,7 @@ def _wrapper_get_num_psus(): return platform_psuutil.get_num_psus() -def _wrapper_get_psus_presence(psu_index): +def _wrapper_get_psu_presence(psu_index): if platform_chassis is not None: try: return platform_chassis.get_psu(psu_index - 1).get_presence() @@ -100,7 +105,7 @@ def _wrapper_get_psus_presence(psu_index): return platform_psuutil.get_psu_presence(psu_index) -def _wrapper_get_psus_status(psu_index): +def _wrapper_get_psu_status(psu_index): if platform_chassis is not None: try: return platform_chassis.get_psu(psu_index - 1).get_powergood_status() @@ -120,9 +125,9 @@ def get_psu_key(psu_index): def psu_db_update(psu_tbl, psu_num): for psu_index in range(1, psu_num + 1): fvs = swsscommon.FieldValuePairs([(PSU_INFO_PRESENCE_FIELD, - 'true' if _wrapper_get_psus_presence(psu_index) else 'false'), + 'true' if _wrapper_get_psu_presence(psu_index) else 'false'), (PSU_INFO_STATUS_FIELD, - 'true' if _wrapper_get_psus_status(psu_index) else 'false')]) + 'true' if _wrapper_get_psu_status(psu_index) else 'false')]) psu_tbl.set(get_psu_key(psu_index), fvs) @@ -251,22 +256,20 @@ class PsuChassisInfo(logger.Logger): self.master_status_good = master_status_good - return True - - def _set_psu_master_led(self, master_status): + # Update the PSU master status LED try: - try: - if os.getenv("PSUD_UNIT_TESTING") == "1": - from tests.mock_platform import MockPsu as Psu - else: - from sonic_platform.psu import Psu - except ImportError as e: - raise ImportError(str(e) + " - required module not found") - - color = Psu.STATUS_LED_COLOR_GREEN if master_status else Psu.STATUS_LED_COLOR_RED + color = Psu.STATUS_LED_COLOR_GREEN if master_status_good else Psu.STATUS_LED_COLOR_RED Psu.set_status_master_led(color) - except NotImplementedError as e: - pass + except NotImplementedError: + self.log_warning("set_status_master_led() not implemented") + + log_on_status_changed(self, self.master_status_good, + 'PSU supplied power warning cleared: supplied power is back to normal.', + 'PSU supplied power warning: {}W supplied-power less than {}W consumed-power'.format( + self.total_supplied_power, self.total_consumed_power) + ) + + return True # PSU status =================================================================== # @@ -365,7 +368,7 @@ class DaemonPsud(daemon_base.DaemonBase): self.log_info("Caught SIGTERM - exiting...") self.stop.set() else: - self.log_warning("Caught unhandled signal '" + sig + "'") + self.log_warning("Caught unhandled signal '{}'".format(SIGNALS_TO_NAMES_DICT[sig])) # Run daemon def run(self): @@ -410,9 +413,8 @@ class DaemonPsud(daemon_base.DaemonBase): self.update_psu_data(psu_tbl) self._update_led_color(psu_tbl) - if platform_chassis is not None and platform_chassis.is_modular_chassis(): + if platform_chassis and platform_chassis.is_modular_chassis(): self.update_psu_chassis_info(chassis_tbl) - self.update_master_led_color(chassis_tbl) self.first_run = False @@ -439,7 +441,7 @@ class DaemonPsud(daemon_base.DaemonBase): def _update_single_psu_data(self, index, psu, psu_tbl): name = get_psu_key(index) - presence = _wrapper_get_psus_presence(index) + presence = _wrapper_get_psu_presence(index) power_good = False voltage = None voltage_high_threshold = None @@ -450,7 +452,7 @@ class DaemonPsud(daemon_base.DaemonBase): power = None is_replaceable = try_get(psu.is_replaceable, False) if presence: - power_good = _wrapper_get_psus_status(index) + power_good = _wrapper_get_psu_status(index) voltage = try_get(psu.get_voltage) voltage_high_threshold = try_get(psu.get_voltage_high_threshold) voltage_low_threshold = try_get(psu.get_voltage_low_threshold) @@ -542,17 +544,17 @@ class DaemonPsud(daemon_base.DaemonBase): :return: """ psu_name = get_psu_key(psu_index) - presence = _wrapper_get_psus_presence(psu_index) + presence = _wrapper_get_psu_presence(psu_index) fan_list = psu.get_all_fans() for index, fan in enumerate(fan_list): fan_name = try_get(fan.get_name, '{} FAN {}'.format(psu_name, index + 1)) - direction = try_get(fan.get_direction) if presence else NOT_AVAILABLE - speed = try_get(fan.get_speed) if presence else NOT_AVAILABLE + direction = try_get(fan.get_direction, NOT_AVAILABLE) if presence else NOT_AVAILABLE + speed = try_get(fan.get_speed, NOT_AVAILABLE) if presence else NOT_AVAILABLE status = "True" if presence else "False" fvs = swsscommon.FieldValuePairs( [(FAN_INFO_PRESENCE_FIELD, str(presence)), - (FAN_INFO_STATUS_FIELD, str(status)), - (FAN_INFO_DIRECTION_FIELD, str(direction)), + (FAN_INFO_STATUS_FIELD, status), + (FAN_INFO_DIRECTION_FIELD, direction), (FAN_INFO_SPEED_FIELD, str(speed)), (FAN_INFO_TIMESTAMP_FIELD, datetime.now().strftime('%Y%m%d %H:%M:%S')) ]) @@ -562,22 +564,16 @@ class DaemonPsud(daemon_base.DaemonBase): try: color = psu.STATUS_LED_COLOR_GREEN if psu_status.is_ok() else psu.STATUS_LED_COLOR_RED psu.set_status_led(color) - except NotImplementedError as e: - pass + except NotImplementedError: + self.log_warning("set_status_led() not implemented") def _update_led_color(self, psu_tbl): if not platform_chassis: return for index, psu_status in self.psu_status_dict.items(): - try: - fvs = swsscommon.FieldValuePairs([ - ('led_status', str(try_get(psu_status.psu.get_status_led))) - ]) - except Exception as e: - self.log_warning('Failed to get led status for psu {}'.format(index)) - fvs = swsscommon.FieldValuePairs([ - ('led_status', NOT_AVAILABLE) + fvs = swsscommon.FieldValuePairs([ + ('led_status', str(try_get(psu_status.psu.get_status_led, NOT_AVAILABLE))) ]) psu_tbl.set(get_psu_key(index), fvs) self._update_psu_fan_led_status(psu_status.psu, index) @@ -587,15 +583,9 @@ class DaemonPsud(daemon_base.DaemonBase): fan_list = psu.get_all_fans() for index, fan in enumerate(fan_list): fan_name = try_get(fan.get_name, '{} FAN {}'.format(psu_name, index + 1)) - try: - fvs = swsscommon.FieldValuePairs([ - (FAN_INFO_LED_STATUS_FIELD, str(try_get(fan.get_status_led))) - ]) - except Exception as e: - self.log_warning('Failed to get led status for fan {}'.format(fan_name)) - fvs = swsscommon.FieldValuePairs([ - (FAN_INFO_LED_STATUS_FIELD, NOT_AVAILABLE) - ]) + fvs = swsscommon.FieldValuePairs([ + (FAN_INFO_LED_STATUS_FIELD, str(try_get(fan.get_status_led, NOT_AVAILABLE))) + ]) self.fan_tbl.set(fan_name, fvs) def update_psu_chassis_info(self, chassis_tbl): @@ -606,25 +596,14 @@ class DaemonPsud(daemon_base.DaemonBase): self.psu_chassis_info = PsuChassisInfo(SYSLOG_IDENTIFIER, platform_chassis) self.psu_chassis_info.run_power_budget(chassis_tbl) - - def update_master_led_color(self, chassis_tbl): - if not platform_chassis or not self.psu_chassis_info: - return - - psu_chassis_info = self.psu_chassis_info - if psu_chassis_info.update_master_status(): - log_on_status_changed(self, psu_chassis_info.master_status_good, - 'PSU supplied power warning cleared: supplied power is back to normal.', - 'PSU supplied power warning: {}W supplied-power less than {}W consumed-power'.format( - psu_chassis_info.total_supplied_power, psu_chassis_info.total_consumed_power) - ) - psu_chassis_info._set_psu_master_led(psu_chassis_info.master_status_good) + self.psu_chassis_info.update_master_status() # # Main ========================================================================= # + def main(): psud = DaemonPsud(SYSLOG_IDENTIFIER) psud.run() diff --git a/sonic-psud/tests/mock_platform.py b/sonic-psud/tests/mock_platform.py index 878203037648..d1530d666a66 100644 --- a/sonic-psud/tests/mock_platform.py +++ b/sonic-psud/tests/mock_platform.py @@ -30,17 +30,21 @@ def get_serial(self): class MockPsu(MockDevice): + master_led_color = MockDevice.STATUS_LED_COLOR_OFF - psu_master_led_color = MockDevice.STATUS_LED_COLOR_OFF - - def __init__(self, psu_presence, psu_status, psu_name): - self.name = psu_name + def __init__(self, presence, status, name, position_in_parent): + self.name = name self.presence = True - self.psu_status = psu_status + self.status = status self.status_led_color = self.STATUS_LED_COLOR_OFF + self.position_in_parent = position_in_parent + self._fan_list = [] + + def get_all_fans(self): + return self._fan_list def get_powergood_status(self): - return self.psu_status + return self.status def set_status_led(self, color): self.status_led_color = color @@ -50,7 +54,10 @@ def get_status_led(self): return self.status_led_color def set_status(self, status): - self.psu_status = status + self.status = status + + def get_position_in_parent(self): + return self.position_in_parent def set_maximum_supplied_power(self, supplied_power): self.max_supplied_power = supplied_power @@ -60,11 +67,11 @@ def get_maximum_supplied_power(self): @classmethod def set_status_master_led(cls, color): - cls.psu_master_led_color = color + cls.master_led_color = color @classmethod def get_status_master_led(cls): - return cls.psu_master_led_color + return cls.master_led_color class MockFanDrawer(MockDevice): @@ -86,6 +93,31 @@ def get_maximum_consumed_power(self): return self.max_consumed_power +class MockFan(MockDevice): + FAN_DIRECTION_INTAKE = "intake" + FAN_DIRECTION_EXHAUST = "exhaust" + + def __init__(self, name, direction, speed=50): + self.name = name + self.direction = direction + self.speed = speed + self.status_led_color = self.STATUS_LED_COLOR_OFF + + def get_direction(self): + return self.direction + + def get_speed(self): + return self.speed + + def set_status_led(self, color): + self.status_led_color = color + return True + + def get_status_led(self): + return self.status_led_color + + + class MockModule(MockDevice): def __init__(self, module_presence, module_status, module_name): self.name = module_name @@ -106,7 +138,6 @@ def get_maximum_consumed_power(self): class MockChassis: - def __init__(self): self.psu_list = [] self.fan_drawer_list = [] diff --git a/sonic-psud/tests/mock_swsscommon.py b/sonic-psud/tests/mock_swsscommon.py index 7ceabf43724d..ba8c9d1b262f 100644 --- a/sonic-psud/tests/mock_swsscommon.py +++ b/sonic-psud/tests/mock_swsscommon.py @@ -20,9 +20,28 @@ def get(self, key): return None -class FieldValuePairs(dict): - def __init__(self, len): - self.fv_dict = {} +class FieldValuePairs: + fv_dict = {} - def __setitem__(self, key, val_tuple): - self.fv_dict[val_tuple[0]] = val_tuple[1] + def __init__(self, tuple_list): + if isinstance(tuple_list, list) and isinstance(tuple_list[0], tuple): + self.fv_dict = dict(tuple_list) + + def __setitem__(self, key, kv_tuple): + self.fv_dict[kv_tuple[0]] = kv_tuple[1] + + def __getitem__(self, key): + return self.fv_dict[key] + + def __eq__(self, other): + if not isinstance(other, FieldValuePairs): + # don't attempt to compare against unrelated types + return NotImplemented + + return self.fv_dict == other.fv_dict + + def __repr__(self): + return repr(self.fv_dict) + + def __str__(self): + return repr(self.fv_dict) diff --git a/sonic-psud/tests/test_DaemonPsud.py b/sonic-psud/tests/test_DaemonPsud.py new file mode 100644 index 000000000000..c905686e490b --- /dev/null +++ b/sonic-psud/tests/test_DaemonPsud.py @@ -0,0 +1,352 @@ +import datetime +import os +import sys +from imp import load_source + +import pytest + +# TODO: Clean this up once we no longer need to support Python 2 +if sys.version_info.major == 3: + from unittest import mock +else: + import mock +from sonic_py_common import daemon_base + +from . import mock_platform +from . import mock_swsscommon + +SYSLOG_IDENTIFIER = 'psud_test' +NOT_AVAILABLE = 'N/A' + +daemon_base.db_connect = mock.MagicMock() + +test_path = os.path.dirname(os.path.abspath(__file__)) +modules_path = os.path.dirname(test_path) +scripts_path = os.path.join(modules_path, "scripts") +sys.path.insert(0, modules_path) + +os.environ["PSUD_UNIT_TESTING"] = "1" +load_source('psud', scripts_path + '/psud') +import psud + +class TestDaemonPsud(object): + """ + Test cases to cover functionality in DaemonPsud class + """ + + def test_signal_handler(self): + daemon_psud = psud.DaemonPsud(SYSLOG_IDENTIFIER) + daemon_psud.stop.set = mock.MagicMock() + daemon_psud.log_info = mock.MagicMock() + daemon_psud.log_warning = mock.MagicMock() + + # Test SIGHUP + daemon_psud.signal_handler(psud.signal.SIGHUP, None) + assert daemon_psud.log_info.call_count == 1 + daemon_psud.log_info.assert_called_with("Caught SIGHUP - ignoring...") + assert daemon_psud.log_warning.call_count == 0 + assert daemon_psud.stop.set.call_count == 0 + + # Test SIGINT + daemon_psud.log_info.reset_mock() + daemon_psud.log_warning.reset_mock() + daemon_psud.stop.set.reset_mock() + daemon_psud.signal_handler(psud.signal.SIGINT, None) + assert daemon_psud.log_info.call_count == 1 + daemon_psud.log_info.assert_called_with("Caught SIGINT - exiting...") + assert daemon_psud.log_warning.call_count == 0 + assert daemon_psud.stop.set.call_count == 1 + + # Test SIGTERM + daemon_psud.log_info.reset_mock() + daemon_psud.log_warning.reset_mock() + daemon_psud.stop.set.reset_mock() + daemon_psud.signal_handler(psud.signal.SIGTERM, None) + assert daemon_psud.log_info.call_count == 1 + daemon_psud.log_info.assert_called_with("Caught SIGTERM - exiting...") + assert daemon_psud.log_warning.call_count == 0 + assert daemon_psud.stop.set.call_count == 1 + + # Test an unhandled signal + daemon_psud.log_info.reset_mock() + daemon_psud.log_warning.reset_mock() + daemon_psud.stop.set.reset_mock() + daemon_psud.signal_handler(psud.signal.SIGUSR1, None) + assert daemon_psud.log_warning.call_count == 1 + daemon_psud.log_warning.assert_called_with("Caught unhandled signal 'SIGUSR1'") + assert daemon_psud.log_info.call_count == 0 + assert daemon_psud.stop.set.call_count == 0 + + def test_update_psu_data(self): + mock_psu1 = mock_platform.MockPsu(True, True, "PSU 1", 0) + mock_psu2 = mock_platform.MockPsu(True, True, "PSU 2", 1) + mock_psu_tbl = mock.MagicMock() + + daemon_psud = psud.DaemonPsud(SYSLOG_IDENTIFIER) + daemon_psud._update_single_psu_data = mock.MagicMock() + daemon_psud.log_warning = mock.MagicMock() + + # Test platform_chassis is None + psud.platform_chassis = None + daemon_psud.update_psu_data(mock_psu_tbl) + assert daemon_psud._update_single_psu_data.call_count == 0 + assert daemon_psud.log_warning.call_count == 0 + + # Test with mocked platform_chassis + psud.platform_chassis = mock_platform.MockChassis() + psud.platform_chassis.psu_list = [mock_psu1, mock_psu2] + daemon_psud.update_psu_data(mock_psu_tbl) + assert daemon_psud._update_single_psu_data.call_count == 2 + daemon_psud._update_single_psu_data.assert_called_with(2, mock_psu2, mock_psu_tbl) + assert daemon_psud.log_warning.call_count == 0 + daemon_psud.log_warning = mock.MagicMock() + + daemon_psud._update_single_psu_data.reset_mock() + + # Test _update_single_psu_data() throws exception + daemon_psud._update_single_psu_data.side_effect = Exception("Test message") + daemon_psud.update_psu_data(mock_psu_tbl) + assert daemon_psud._update_single_psu_data.call_count == 2 + daemon_psud._update_single_psu_data.assert_called_with(2, mock_psu2, mock_psu_tbl) + assert daemon_psud.log_warning.call_count == 2 + daemon_psud.log_warning.assert_called_with("Failed to update PSU data - Test message") + + def test_set_psu_led(self): + mock_logger = mock.MagicMock() + mock_psu = mock_platform.MockPsu(True, True, "PSU 1", 0) + psu_status = psud.PsuStatus(mock_logger, mock_psu) + + daemon_psud = psud.DaemonPsud(SYSLOG_IDENTIFIER) + + psu_status.presence = True + psu_status.power_good = True + psu_status.voltage_good = True + psu_status.temperature_good = True + daemon_psud._set_psu_led(mock_psu, psu_status) + assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN + + psu_status.presence = False + daemon_psud._set_psu_led(mock_psu, psu_status) + assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_RED + + psu_status.presence = True + daemon_psud._set_psu_led(mock_psu, psu_status) + assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN + + psu_status.power_good = False + daemon_psud._set_psu_led(mock_psu, psu_status) + assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_RED + + psu_status.power_good = True + daemon_psud._set_psu_led(mock_psu, psu_status) + assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN + + psu_status.voltage_good = False + daemon_psud._set_psu_led(mock_psu, psu_status) + assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_RED + + psu_status.voltage_good = True + daemon_psud._set_psu_led(mock_psu, psu_status) + assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN + + psu_status.temperature_good = False + daemon_psud._set_psu_led(mock_psu, psu_status) + assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_RED + + psu_status.temperature_good = True + daemon_psud._set_psu_led(mock_psu, psu_status) + assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN + + # Test set_status_led not implemented + mock_psu.set_status_led = mock.MagicMock(side_effect = NotImplementedError) + daemon_psud.log_warning = mock.MagicMock() + daemon_psud._set_psu_led(mock_psu, psu_status) + assert daemon_psud.log_warning.call_count == 1 + daemon_psud.log_warning.assert_called_with("set_status_led() not implemented") + + def test_update_led_color(self): + mock_psu = mock_platform.MockPsu(True, True, "PSU 1", 0) + mock_psu_tbl = mock.MagicMock() + mock_logger = mock.MagicMock() + psu_status = psud.PsuStatus(mock_logger, mock_psu) + + daemon_psud = psud.DaemonPsud(SYSLOG_IDENTIFIER) + daemon_psud._update_psu_fan_led_status = mock.MagicMock() + + # If psud.platform_chassis is None, _update_psu_fan_led_status() should do nothing + psud.platform_chassis = None + daemon_psud._update_led_color(mock_psu_tbl) + assert mock_psu_tbl.set.call_count == 0 + assert daemon_psud._update_psu_fan_led_status.call_count == 0 + + psud.platform_chassis = mock_platform.MockChassis() + daemon_psud.psu_status_dict[1] = psu_status + expected_fvp = psud.swsscommon.FieldValuePairs([('led_status', mock_platform.MockPsu.STATUS_LED_COLOR_OFF)]) + daemon_psud._update_led_color(mock_psu_tbl) + assert mock_psu_tbl.set.call_count == 1 + mock_psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) + assert daemon_psud._update_psu_fan_led_status.call_count == 1 + daemon_psud._update_psu_fan_led_status.assert_called_with(mock_psu, 1) + + mock_psu_tbl.set.reset_mock() + daemon_psud._update_psu_fan_led_status.reset_mock() + + mock_psu.set_status_led(mock_platform.MockPsu.STATUS_LED_COLOR_GREEN) + expected_fvp = psud.swsscommon.FieldValuePairs([('led_status', mock_platform.MockPsu.STATUS_LED_COLOR_GREEN)]) + daemon_psud._update_led_color(mock_psu_tbl) + assert mock_psu_tbl.set.call_count == 1 + mock_psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) + assert daemon_psud._update_psu_fan_led_status.call_count == 1 + daemon_psud._update_psu_fan_led_status.assert_called_with(mock_psu, 1) + + mock_psu_tbl.set.reset_mock() + daemon_psud._update_psu_fan_led_status.reset_mock() + + mock_psu.set_status_led(mock_platform.MockPsu.STATUS_LED_COLOR_RED) + expected_fvp = psud.swsscommon.FieldValuePairs([('led_status', mock_platform.MockPsu.STATUS_LED_COLOR_RED)]) + daemon_psud._update_led_color(mock_psu_tbl) + assert mock_psu_tbl.set.call_count == 1 + mock_psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) + assert daemon_psud._update_psu_fan_led_status.call_count == 1 + daemon_psud._update_psu_fan_led_status.assert_called_with(mock_psu, 1) + + mock_psu_tbl.set.reset_mock() + daemon_psud._update_psu_fan_led_status.reset_mock() + + # Test exception handling + mock_psu.get_status_led = mock.Mock(side_effect = NotImplementedError) + expected_fvp = psud.swsscommon.FieldValuePairs([('led_status', psud.NOT_AVAILABLE)]) + daemon_psud._update_led_color(mock_psu_tbl) + assert mock_psu_tbl.set.call_count == 1 + mock_psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) + assert daemon_psud._update_psu_fan_led_status.call_count == 1 + daemon_psud._update_psu_fan_led_status.assert_called_with(mock_psu, 1) + + def test_update_psu_fan_led_status(self): + mock_fan = mock_platform.MockFan("PSU 1 Test Fan 1", mock_platform.MockFan.FAN_DIRECTION_INTAKE) + mock_psu = mock_platform.MockPsu(True, True, "PSU 1", 0) + mock_psu._fan_list = [mock_fan] + mock_logger = mock.MagicMock() + + psud.platform_chassis = mock_platform.MockChassis() + + daemon_psud = psud.DaemonPsud(SYSLOG_IDENTIFIER) + daemon_psud.fan_tbl = mock.MagicMock() + + expected_fvp = psud.swsscommon.FieldValuePairs([(psud.FAN_INFO_LED_STATUS_FIELD, mock_platform.MockFan.STATUS_LED_COLOR_OFF)]) + daemon_psud._update_psu_fan_led_status(mock_psu, 1) + assert daemon_psud.fan_tbl.set.call_count == 1 + daemon_psud.fan_tbl.set.assert_called_with("PSU 1 Test Fan 1", expected_fvp) + + daemon_psud.fan_tbl.set.reset_mock() + + # Test Fan.get_status_led not implemented + mock_fan.get_status_led = mock.Mock(side_effect = NotImplementedError) + expected_fvp = psud.swsscommon.FieldValuePairs([(psud.FAN_INFO_LED_STATUS_FIELD, psud.NOT_AVAILABLE)]) + daemon_psud._update_psu_fan_led_status(mock_psu, 1) + assert daemon_psud.fan_tbl.set.call_count == 1 + daemon_psud.fan_tbl.set.assert_called_with("PSU 1 Test Fan 1", expected_fvp) + + daemon_psud.fan_tbl.set.reset_mock() + + # Test Fan.get_name not implemented + mock_fan.get_name = mock.Mock(side_effect = NotImplementedError) + daemon_psud._update_psu_fan_led_status(mock_psu, 1) + assert daemon_psud.fan_tbl.set.call_count == 1 + daemon_psud.fan_tbl.set.assert_called_with("PSU 1 FAN 1", expected_fvp) + + @mock.patch('psud.PsuChassisInfo', mock.MagicMock()) + def test_update_psu_chassis_info(self): + daemon_psud = psud.DaemonPsud(SYSLOG_IDENTIFIER) + + # If psud.platform_chassis is None, update_psu_chassis_info() should do nothing + psud.platform_chassis = None + daemon_psud.psu_chassis_info = None + daemon_psud.update_psu_chassis_info(None) + assert daemon_psud.psu_chassis_info is None + + # Now we mock platform_chassis, so that daemon_psud.psu_chassis_info should be instantiated and run_power_budget() should be called + psud.platform_chassis = mock_platform.MockChassis() + daemon_psud.update_psu_chassis_info(None) + assert daemon_psud.psu_chassis_info is not None + assert daemon_psud.psu_chassis_info.run_power_budget.call_count == 1 + daemon_psud.psu_chassis_info.run_power_budget.assert_called_with(None) + + def test_update_psu_entity_info(self): + mock_psu1 = mock_platform.MockPsu(True, True, "PSU 1", 0) + mock_psu2 = mock_platform.MockPsu(True, True, "PSU 2", 1) + + daemon_psud = psud.DaemonPsud(SYSLOG_IDENTIFIER) + daemon_psud._update_single_psu_entity_info = mock.MagicMock() + + # If psud.platform_chassis is None, _update_psu_entity_info() should do nothing + psud.platform_chassis = None + daemon_psud._update_psu_entity_info() + assert daemon_psud._update_single_psu_entity_info.call_count == 0 + + psud.platform_chassis = mock_platform.MockChassis() + psud.platform_chassis.psu_list = [mock_psu1] + daemon_psud._update_psu_entity_info() + assert daemon_psud._update_single_psu_entity_info.call_count == 1 + daemon_psud._update_single_psu_entity_info.assert_called_with(1, mock_psu1) + + daemon_psud._update_single_psu_entity_info.reset_mock() + psud.platform_chassis.psu_list = [mock_psu1, mock_psu2] + daemon_psud._update_psu_entity_info() + assert daemon_psud._update_single_psu_entity_info.call_count == 2 + daemon_psud._update_single_psu_entity_info.assert_called_with(2, mock_psu2) + + # Test behavior if _update_single_psu_entity_info raises an exception + daemon_psud._update_single_psu_entity_info.reset_mock() + daemon_psud._update_single_psu_entity_info.side_effect = Exception("Test message") + daemon_psud.log_warning = mock.MagicMock() + psud.platform_chassis.psu_list = [mock_psu1] + daemon_psud._update_psu_entity_info() + assert daemon_psud._update_single_psu_entity_info.call_count == 1 + daemon_psud._update_single_psu_entity_info.assert_called_with(1, mock_psu1) + assert daemon_psud.log_warning.call_count == 1 + daemon_psud.log_warning.assert_called_with("Failed to update PSU data - Test message") + + @mock.patch('psud.try_get', mock.MagicMock(return_value=0)) + def test_update_single_psu_entity_info(self): + mock_psu1 = mock_platform.MockPsu(True, True, "PSU 1", 0) + + expected_fvp = psud.swsscommon.FieldValuePairs( + [('position_in_parent', '0'), + ('parent_name', psud.CHASSIS_INFO_KEY), + ]) + + daemon_psud = psud.DaemonPsud(SYSLOG_IDENTIFIER) + daemon_psud.phy_entity_tbl = mock.MagicMock() + + daemon_psud._update_single_psu_entity_info(0, mock_psu1) + daemon_psud.phy_entity_tbl.set.assert_called_with('PSU 0', expected_fvp) + + @mock.patch('psud.datetime') + def test_update_psu_fan_data(self, mock_datetime): + fake_time = datetime.datetime(2021, 1, 1, 12, 34, 56) + mock_datetime.now.return_value = fake_time + + mock_fan = mock_platform.MockFan("PSU 1 Test Fan 1", mock_platform.MockFan.FAN_DIRECTION_INTAKE) + mock_psu1 = mock_platform.MockPsu(True, True, "PSU 1", 0) + mock_psu1._fan_list = [mock_fan] + mock_logger = mock.MagicMock() + + psud.platform_chassis = mock_platform.MockChassis() + psud.platform_chassis.psu_list = [mock_psu1] + + daemon_psud = psud.DaemonPsud(SYSLOG_IDENTIFIER) + daemon_psud.fan_tbl = mock.MagicMock() + + expected_fvp = psud.swsscommon.FieldValuePairs( + [(psud.FAN_INFO_PRESENCE_FIELD, "True"), + (psud.FAN_INFO_STATUS_FIELD, "True"), + (psud.FAN_INFO_DIRECTION_FIELD, mock_fan.get_direction()), + (psud.FAN_INFO_SPEED_FIELD, str(mock_fan.get_speed())), + (psud.FAN_INFO_TIMESTAMP_FIELD, fake_time.strftime('%Y%m%d %H:%M:%S')) + ]) + daemon_psud._update_psu_fan_data(mock_psu1, 1) + assert daemon_psud.fan_tbl.set.call_count == 1 + daemon_psud.fan_tbl.set.assert_called_with("PSU 1 Test Fan 1", expected_fvp) + + daemon_psud.fan_tbl.set.reset_mock() diff --git a/sonic-psud/tests/test_PsuChassisInfo.py b/sonic-psud/tests/test_PsuChassisInfo.py new file mode 100644 index 000000000000..2757f3e557bb --- /dev/null +++ b/sonic-psud/tests/test_PsuChassisInfo.py @@ -0,0 +1,265 @@ +import os +import sys +from imp import load_source + +import pytest + +# TODO: Clean this up once we no longer need to support Python 2 +if sys.version_info.major == 3: + from unittest import mock +else: + import mock +from sonic_py_common import daemon_base + +from . import mock_swsscommon +from .mock_platform import MockChassis, MockPsu, MockFanDrawer, MockModule + +SYSLOG_IDENTIFIER = 'test_PsuChassisInfo' +NOT_AVAILABLE = 'N/A' + +daemon_base.db_connect = mock.MagicMock() + +test_path = os.path.dirname(os.path.abspath(__file__)) +modules_path = os.path.dirname(test_path) +scripts_path = os.path.join(modules_path, "scripts") +sys.path.insert(0, modules_path) + +os.environ["PSUD_UNIT_TESTING"] = "1" +load_source('psud', scripts_path + '/psud') +import psud + +CHASSIS_INFO_TABLE = 'CHASSIS_INFO' +CHASSIS_INFO_KEY_TEMPLATE = 'chassis {}' + +CHASSIS_INFO_POWER_CONSUMER_FIELD = 'Consumed Power {}' +CHASSIS_INFO_POWER_SUPPLIER_FIELD = 'Supplied Power {}' +CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD = 'Total Consumed Power' +CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD = 'Total Supplied Power' +CHASSIS_INFO_POWER_KEY_TEMPLATE = 'chassis_power_budget {}' + + +@pytest.fixture(scope="class") +def mock_log_methods(): + psud.PsuChassisInfo.log_notice = mock.MagicMock() + psud.PsuChassisInfo.log_warning = mock.MagicMock() + yield + psud.PsuChassisInfo.log_notice.reset() + psud.PsuChassisInfo.log_warning.reset() + + +@pytest.mark.usefixtures("mock_log_methods") +class TestPsuChassisInfo(object): + """ + Test cases to cover functionality in PsuChassisInfo class + """ + def test_update_master_status(self): + chassis = MockChassis() + chassis_info = psud.PsuChassisInfo(SYSLOG_IDENTIFIER, chassis) + + # Test good values while in bad state + chassis_info.total_supplied_power = 510.0 + chassis_info.total_consumed_power = 350.0 + chassis_info.master_status_good = False + ret = chassis_info.update_master_status() + assert ret == True + assert chassis_info.master_status_good == True + + # Test good values while in good state + ret = chassis_info.update_master_status() + assert ret == False + assert chassis_info.master_status_good == True + + # Test unknown total_supplied_power (0.0) + chassis_info.total_supplied_power = 0.0 + chassis_info.master_status_good = False + ret = chassis_info.update_master_status() + assert ret == False + assert chassis_info.master_status_good == True + + # Test unknown total_consumed_power (0.0) + chassis_info.total_supplied_power = 510.0 + chassis_info.total_consumed_power = 0.0 + chassis_info.master_status_good = False + ret = chassis_info.update_master_status() + assert ret == False + assert chassis_info.master_status_good == True + + # Test bad values while in good state + chassis_info.total_supplied_power = 300.0 + chassis_info.total_consumed_power = 350.0 + chassis_info.master_status_good = True + ret = chassis_info.update_master_status() + assert ret == True + assert chassis_info.master_status_good == False + + # Test bad values while in good state + ret = chassis_info.update_master_status() + assert ret == False + assert chassis_info.master_status_good == False + + # Test set_status_master_led not implemented + with mock.patch.object(MockPsu, "set_status_master_led", mock.Mock(side_effect = NotImplementedError)): + chassis_info.total_supplied_power = 510.0 + chassis_info.total_consumed_power = 350.0 + chassis_info.master_status_good = False + chassis_info.log_warning = mock.MagicMock() + ret = chassis_info.update_master_status() + assert ret == True + assert chassis_info.master_status_good == True + chassis_info.log_warning.assert_called_with("set_status_master_led() not implemented") + + def test_supplied_power(self): + chassis = MockChassis() + psu1 = MockPsu(True, True, "PSU 1", 0) + psu1_power = 510.0 + psu1.set_maximum_supplied_power(psu1_power) + chassis.psu_list.append(psu1) + + psu2 = MockPsu(True, True, "PSU 2", 1) + psu2_power = 800.0 + psu2.set_maximum_supplied_power(psu2_power) + chassis.psu_list.append(psu2) + + psu3 = MockPsu(True, True, "PSU 3", 2) + psu3_power = 350.0 + psu3.set_maximum_supplied_power(psu3_power) + chassis.psu_list.append(psu3) + + total_power = psu1_power + psu2_power + psu3_power + state_db = daemon_base.db_connect("STATE_DB") + chassis_tbl = mock_swsscommon.Table(state_db, CHASSIS_INFO_TABLE) + chassis_info = psud.PsuChassisInfo(SYSLOG_IDENTIFIER, chassis) + chassis_info.run_power_budget(chassis_tbl) + fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) + + # Check if supplied power is recorded in DB + assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) + + # Check if psu1 is not present + psu1.set_presence(False) + total_power = psu2_power + psu3_power + chassis_info.run_power_budget(chassis_tbl) + fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) + assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) + + # Check if psu2 status is NOT_OK + psu2.set_status(False) + total_power = psu3_power + chassis_info.run_power_budget(chassis_tbl) + fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) + assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) + + def test_consumed_power(self): + chassis = MockChassis() + fan_drawer1 = MockFanDrawer(True, True, "FanDrawer 1") + fan_drawer1_power = 510.0 + fan_drawer1.set_maximum_consumed_power(fan_drawer1_power) + chassis.fan_drawer_list.append(fan_drawer1) + + module1 = MockFanDrawer(True, True, "Module 1") + module1_power = 700.0 + module1.set_maximum_consumed_power(module1_power) + chassis.module_list.append(module1) + + total_power = fan_drawer1_power + module1_power + state_db = daemon_base.db_connect("STATE_DB") + chassis_tbl = mock_swsscommon.Table(state_db, CHASSIS_INFO_TABLE) + chassis_info = psud.PsuChassisInfo(SYSLOG_IDENTIFIER, chassis) + chassis_info.run_power_budget(chassis_tbl) + fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) + + # Check if supplied power is recorded in DB + assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) + + # Check if fan_drawer1 present + fan_drawer1.set_presence(False) + total_power = module1_power + chassis_info.run_power_budget(chassis_tbl) + fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) + assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) + + # Check if module1 present + fan_drawer1.set_presence(True) + module1.set_presence(False) + total_power = fan_drawer1_power + chassis_info.run_power_budget(chassis_tbl) + fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) + assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) + + + def test_power_budget(self): + chassis = MockChassis() + psu = MockPsu(True, True, "PSU 1", 0) + psu1_power = 510.0 + psu.set_maximum_supplied_power(psu1_power) + chassis.psu_list.append(psu) + + fan_drawer1 = MockFanDrawer(True, True, "FanDrawer 1") + fan_drawer1_power = 510.0 + fan_drawer1.set_maximum_consumed_power(fan_drawer1_power) + chassis.fan_drawer_list.append(fan_drawer1) + + module1 = MockFanDrawer(True, True, "Module 1") + module1_power = 700.0 + module1.set_maximum_consumed_power(module1_power) + chassis.module_list.append(module1) + + state_db = daemon_base.db_connect("STATE_DB") + chassis_tbl = mock_swsscommon.Table(state_db, CHASSIS_INFO_TABLE) + chassis_info = psud.PsuChassisInfo(SYSLOG_IDENTIFIER, chassis) + + # Check if supplied_power < consumed_power + chassis_info.run_power_budget(chassis_tbl) + chassis_info.update_master_status() + fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) + + assert float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) < float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) + assert chassis_info.master_status_good == False + assert MockPsu.get_status_master_led() == MockPsu.STATUS_LED_COLOR_RED + + # Add a PSU + psu = MockPsu(True, True, "PSU 2", 1) + psu2_power = 800.0 + psu.set_maximum_supplied_power(psu2_power) + chassis.psu_list.append(psu) + + # Check if supplied_power > consumed_power + chassis_info.run_power_budget(chassis_tbl) + chassis_info.update_master_status() + fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) + + assert float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) > float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) + assert chassis_info.master_status_good == True + assert MockPsu.get_status_master_led() == MockPsu.STATUS_LED_COLOR_GREEN + + + def test_get_psu_key(self): + assert psud.get_psu_key(0) == psud.PSU_INFO_KEY_TEMPLATE.format(0) + assert psud.get_psu_key(1) == psud.PSU_INFO_KEY_TEMPLATE.format(1) + + + def test_try_get(self): + # Test a proper, working callback + GOOD_CALLBACK_RETURN_VALUE = "This is a test" + + def callback1(): + return GOOD_CALLBACK_RETURN_VALUE + + ret = psud.try_get(callback1) + assert ret == GOOD_CALLBACK_RETURN_VALUE + + # Ensure try_get returns default value if callback returns None + DEFAULT_VALUE = "Default value" + + def callback2(): + return None + + ret = psud.try_get(callback2, default=DEFAULT_VALUE) + assert ret == DEFAULT_VALUE + + # Ensure try_get returns default value if callback returns None + def callback3(): + raise NotImplementedError + + ret = psud.try_get(callback3, default=DEFAULT_VALUE) + assert ret == DEFAULT_VALUE diff --git a/sonic-psud/tests/test_PsuStatus.py b/sonic-psud/tests/test_PsuStatus.py index e96da0c9398c..e55384d8d239 100644 --- a/sonic-psud/tests/test_PsuStatus.py +++ b/sonic-psud/tests/test_PsuStatus.py @@ -4,9 +4,9 @@ # TODO: Clean this up once we no longer need to support Python 2 if sys.version_info.major == 3: - from unittest.mock import MagicMock + from unittest import mock else: - from mock import MagicMock + import mock from .mock_platform import MockPsu @@ -17,7 +17,7 @@ os.environ["PSUD_UNIT_TESTING"] = "1" load_source('psud', scripts_path + '/psud') -from psud import * +import psud class TestPsuStatus(object): @@ -26,10 +26,10 @@ class TestPsuStatus(object): """ def test_set_presence(self): - mock_logger = MagicMock() - mock_psu = MockPsu(True, True, "PSU 1") + mock_logger = mock.MagicMock() + mock_psu = MockPsu(True, True, "PSU 1", 0) - psu_status = PsuStatus(mock_logger, mock_psu) + psu_status = psud.PsuStatus(mock_logger, mock_psu) assert psu_status.presence == False # Test toggling presence to True @@ -48,10 +48,10 @@ def test_set_presence(self): assert psu_status.presence == False def test_set_power_good(self): - mock_logger = MagicMock() - mock_psu = MockPsu(True, True, "PSU 1") + mock_logger = mock.MagicMock() + mock_psu = MockPsu(True, True, "PSU 1", 0) - psu_status = PsuStatus(mock_logger, mock_psu) + psu_status = psud.PsuStatus(mock_logger, mock_psu) assert psu_status.power_good == False # Test toggling power_good to True @@ -75,10 +75,10 @@ def test_set_power_good(self): assert psu_status.power_good == False def test_set_voltage(self): - mock_logger = MagicMock() - mock_psu = MockPsu(True, True, "PSU 1") + mock_logger = mock.MagicMock() + mock_psu = MockPsu(True, True, "PSU 1", 0) - psu_status = PsuStatus(mock_logger, mock_psu) + psu_status = psud.PsuStatus(mock_logger, mock_psu) assert psu_status.voltage_good == False # Pass in a good voltage @@ -142,10 +142,10 @@ def test_set_voltage(self): assert psu_status.voltage_good == True def test_set_temperature(self): - mock_logger = MagicMock() - mock_psu = MockPsu(True, True, "PSU 1") + mock_logger = mock.MagicMock() + mock_psu = MockPsu(True, True, "PSU 1", 0) - psu_status = PsuStatus(mock_logger, mock_psu) + psu_status = psud.PsuStatus(mock_logger, mock_psu) assert psu_status.temperature_good == False # Pass in a good temperature @@ -192,10 +192,10 @@ def test_set_temperature(self): assert psu_status.temperature_good == True def test_is_ok(self): - mock_logger = MagicMock() - mock_psu = MockPsu(True, True, "PSU 1") + mock_logger = mock.MagicMock() + mock_psu = MockPsu(True, True, "PSU 1", 0) - psu_status = PsuStatus(mock_logger, mock_psu) + psu_status = psud.PsuStatus(mock_logger, mock_psu) psu_status.presence = True psu_status.power_good = True psu_status.voltage_good = True diff --git a/sonic-psud/tests/test_psud.py b/sonic-psud/tests/test_psud.py index d5ffa56a7a64..a8d36fd84ac9 100644 --- a/sonic-psud/tests/test_psud.py +++ b/sonic-psud/tests/test_psud.py @@ -2,19 +2,22 @@ import sys from imp import load_source +import pytest + # TODO: Clean this up once we no longer need to support Python 2 if sys.version_info.major == 3: - from unittest.mock import Mock, MagicMock, patch + from unittest import mock else: - from mock import Mock, MagicMock, patch + import mock from sonic_py_common import daemon_base +from . import mock_swsscommon from .mock_platform import MockChassis, MockPsu, MockFanDrawer, MockModule SYSLOG_IDENTIFIER = 'psud_test' NOT_AVAILABLE = 'N/A' -daemon_base.db_connect = MagicMock() +daemon_base.db_connect = mock.MagicMock() test_path = os.path.dirname(os.path.abspath(__file__)) modules_path = os.path.dirname(test_path) @@ -23,237 +26,148 @@ os.environ["PSUD_UNIT_TESTING"] = "1" load_source('psud', scripts_path + '/psud') -from psud import * - -CHASSIS_INFO_TABLE = 'CHASSIS_INFO' -CHASSIS_INFO_KEY_TEMPLATE = 'chassis {}' - -CHASSIS_INFO_POWER_CONSUMER_FIELD = 'Consumed Power {}' -CHASSIS_INFO_POWER_SUPPLIER_FIELD = 'Supplied Power {}' -CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD = 'Total Consumed Power' -CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD = 'Total Supplied Power' -CHASSIS_INFO_POWER_KEY_TEMPLATE = 'chassis_power_budget {}' - - -def setup_function(): - PsuChassisInfo.log_notice = MagicMock() - PsuChassisInfo.log_warning = MagicMock() - - -def teardown_function(): - PsuChassisInfo.log_notice.reset() - PsuChassisInfo.log_warning.reset() - -# Test cases to cover functionality in PsuChassisInfo class - - -def test_psuchassis_check_psu_supplied_power(): - chassis = MockChassis() - psu1 = MockPsu(True, True, "PSU 1") - psu1_power = 510.0 - psu1.set_maximum_supplied_power(psu1_power) - chassis.psu_list.append(psu1) - - psu2 = MockPsu(True, True, "PSU 2") - psu2_power = 800.0 - psu2.set_maximum_supplied_power(psu2_power) - chassis.psu_list.append(psu2) - - psu3 = MockPsu(True, True, "PSU 3") - psu3_power = 350.0 - psu3.set_maximum_supplied_power(psu3_power) - chassis.psu_list.append(psu3) - - total_power = psu1_power + psu2_power + psu3_power - state_db = daemon_base.db_connect("STATE_DB") - chassis_tbl = swsscommon.Table(state_db, CHASSIS_INFO_TABLE) - chassis_info = PsuChassisInfo(SYSLOG_IDENTIFIER, chassis) - chassis_info.run_power_budget(chassis_tbl) - fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) - - # Check if supplied power is recorded in DB - assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) - - # Check if psu1 is not present - psu1.set_presence(False) - total_power = psu2_power + psu3_power - chassis_info.run_power_budget(chassis_tbl) - fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) - assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) - - # Check if psu2 status is NOT_OK - psu2.set_status(False) - total_power = psu3_power - chassis_info.run_power_budget(chassis_tbl) - fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) - assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) - - -def test_psuchassis_check_consumed_power(): - chassis = MockChassis() - fan_drawer1 = MockFanDrawer(True, True, "FanDrawer 1") - fan_drawer1_power = 510.0 - fan_drawer1.set_maximum_consumed_power(fan_drawer1_power) - chassis.fan_drawer_list.append(fan_drawer1) - - module1 = MockFanDrawer(True, True, "Module 1") - module1_power = 700.0 - module1.set_maximum_consumed_power(module1_power) - chassis.module_list.append(module1) - - total_power = fan_drawer1_power + module1_power - state_db = daemon_base.db_connect("STATE_DB") - chassis_tbl = swsscommon.Table(state_db, CHASSIS_INFO_TABLE) - chassis_info = PsuChassisInfo(SYSLOG_IDENTIFIER, chassis) - chassis_info.run_power_budget(chassis_tbl) - fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) - - # Check if supplied power is recorded in DB - assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) - - # Check if fan_drawer1 present - fan_drawer1.set_presence(False) - total_power = module1_power - chassis_info.run_power_budget(chassis_tbl) - fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) - assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) - - # Check if module1 present - fan_drawer1.set_presence(True) - module1.set_presence(False) - total_power = fan_drawer1_power - chassis_info.run_power_budget(chassis_tbl) - fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) - assert total_power == float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) - - -def test_psuchassis_check_power_budget(): - chassis = MockChassis() - psu = MockPsu(True, True, "PSU 1") - psu1_power = 510.0 - psu.set_maximum_supplied_power(psu1_power) - chassis.psu_list.append(psu) - - fan_drawer1 = MockFanDrawer(True, True, "FanDrawer 1") - fan_drawer1_power = 510.0 - fan_drawer1.set_maximum_consumed_power(fan_drawer1_power) - chassis.fan_drawer_list.append(fan_drawer1) - - module1 = MockFanDrawer(True, True, "Module 1") - module1_power = 700.0 - module1.set_maximum_consumed_power(module1_power) - chassis.module_list.append(module1) - - state_db = daemon_base.db_connect("STATE_DB") - chassis_tbl = swsscommon.Table(state_db, CHASSIS_INFO_TABLE) - chassis_info = PsuChassisInfo(SYSLOG_IDENTIFIER, chassis) - - # Check if supplied_power < consumed_power - chassis_info.run_power_budget(chassis_tbl) - if chassis_info.update_master_status(): - chassis_info._set_psu_master_led(chassis_info.master_status_good) - fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) - - assert float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) < float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) - assert chassis_info.master_status_good == False - assert MockPsu.get_status_master_led() == MockPsu.STATUS_LED_COLOR_RED - - # Add a PSU - psu = MockPsu(True, True, "PSU 2") - psu2_power = 800.0 - psu.set_maximum_supplied_power(psu2_power) - chassis.psu_list.append(psu) - - # Check if supplied_power > consumed_power - chassis_info.run_power_budget(chassis_tbl) - if chassis_info.update_master_status(): - chassis_info._set_psu_master_led(chassis_info.master_status_good) - fvs = chassis_tbl.get(CHASSIS_INFO_POWER_KEY_TEMPLATE.format(1)) - - assert float(fvs[CHASSIS_INFO_TOTAL_POWER_SUPPLIED_FIELD]) > float(fvs[CHASSIS_INFO_TOTAL_POWER_CONSUMED_FIELD]) - assert chassis_info.master_status_good == True - assert MockPsu.get_status_master_led() == MockPsu.STATUS_LED_COLOR_GREEN - - -def test_get_psu_key(): - assert get_psu_key(0) == PSU_INFO_KEY_TEMPLATE.format(0) - assert get_psu_key(1) == PSU_INFO_KEY_TEMPLATE.format(1) - - -def test_try_get(): - # Test a proper, working callback - GOOD_CALLBACK_RETURN_VALUE = "This is a test" - - def callback1(): - return GOOD_CALLBACK_RETURN_VALUE - - ret = try_get(callback1) - assert ret == GOOD_CALLBACK_RETURN_VALUE - - # Ensure try_get returns default value if callback returns None - DEFAULT_VALUE = "Default value" - - def callback2(): - return None - - ret = try_get(callback2, default=DEFAULT_VALUE) - assert ret == DEFAULT_VALUE - - # Ensure try_get returns default value if callback returns None - def callback3(): - raise NotImplementedError - - ret = try_get(callback3, default=DEFAULT_VALUE) - assert ret == DEFAULT_VALUE - - -class TestDaemonPsud(object): - """ - Test cases to cover functionality in DaemonPsud class - """ - - def test_set_psu_led(self): - mock_logger = MagicMock() - mock_psu = MockPsu(True, True, "PSU 1") - psu_status = PsuStatus(mock_logger, mock_psu) - - daemon_psud = DaemonPsud(SYSLOG_IDENTIFIER) - - psu_status.presence = True - psu_status.power_good = True - psu_status.voltage_good = True - psu_status.temperature_good = True - daemon_psud._set_psu_led(mock_psu, psu_status) - assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN - - psu_status.presence = False - daemon_psud._set_psu_led(mock_psu, psu_status) - assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_RED - - psu_status.presence = True - daemon_psud._set_psu_led(mock_psu, psu_status) - assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN - - psu_status.power_good = False - daemon_psud._set_psu_led(mock_psu, psu_status) - assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_RED - - psu_status.power_good = True - daemon_psud._set_psu_led(mock_psu, psu_status) - assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN - - psu_status.voltage_good = False - daemon_psud._set_psu_led(mock_psu, psu_status) - assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_RED - - psu_status.voltage_good = True - daemon_psud._set_psu_led(mock_psu, psu_status) - assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN - - psu_status.temperature_good = False - daemon_psud._set_psu_led(mock_psu, psu_status) - assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_RED - - psu_status.temperature_good = True - daemon_psud._set_psu_led(mock_psu, psu_status) - assert mock_psu.get_status_led() == mock_psu.STATUS_LED_COLOR_GREEN +import psud + +@mock.patch('psud.platform_chassis', mock.MagicMock()) +@mock.patch('psud.platform_psuutil', mock.MagicMock()) +def test_wrapper_get_num_psus(): + # Test new platform API is available and implemented + psud._wrapper_get_num_psus() + assert psud.platform_chassis.get_num_psus.call_count == 1 + assert psud.platform_psuutil.get_num_psus.call_count == 0 + + # Test new platform API is available but not implemented + psud.platform_chassis.get_num_psus.side_effect = NotImplementedError + psud._wrapper_get_num_psus() + assert psud.platform_chassis.get_num_psus.call_count == 2 + assert psud.platform_psuutil.get_num_psus.call_count == 1 + + # Test new platform API not available + psud.platform_chassis = None + psud._wrapper_get_num_psus() + assert psud.platform_psuutil.get_num_psus.call_count == 2 + + +@mock.patch('psud.platform_chassis', mock.MagicMock()) +@mock.patch('psud.platform_psuutil', mock.MagicMock()) +def test_wrapper_get_psu_presence(): + # Test new platform API is available + psud._wrapper_get_psu_presence(1) + assert psud.platform_chassis.get_psu(0).get_presence.call_count == 1 + assert psud.platform_psuutil.get_psu_presence.call_count == 0 + + # Test new platform API is available but not implemented + psud.platform_chassis.get_psu(0).get_presence.side_effect = NotImplementedError + psud._wrapper_get_psu_presence(1) + assert psud.platform_chassis.get_psu(0).get_presence.call_count == 2 + assert psud.platform_psuutil.get_psu_presence.call_count == 1 + + # Test new platform API not available + psud.platform_chassis = None + psud._wrapper_get_psu_presence(1) + assert psud.platform_psuutil.get_psu_presence.call_count == 2 + psud.platform_psuutil.get_psu_presence.assert_called_with(1) + + +@mock.patch('psud.platform_chassis', mock.MagicMock()) +@mock.patch('psud.platform_psuutil', mock.MagicMock()) +def test_wrapper_get_psu_status(): + # Test new platform API is available + psud._wrapper_get_psu_status(1) + assert psud.platform_chassis.get_psu(0).get_powergood_status.call_count == 1 + assert psud.platform_psuutil.get_psu_status.call_count == 0 + + # Test new platform API is available but not implemented + psud.platform_chassis.get_psu(0).get_powergood_status.side_effect = NotImplementedError + psud._wrapper_get_psu_status(1) + assert psud.platform_chassis.get_psu(0).get_powergood_status.call_count == 2 + assert psud.platform_psuutil.get_psu_status.call_count == 1 + + # Test new platform API not available + psud.platform_chassis = None + psud._wrapper_get_psu_status(1) + assert psud.platform_psuutil.get_psu_status.call_count == 2 + psud.platform_psuutil.get_psu_status.assert_called_with(1) + + +@mock.patch('psud._wrapper_get_psu_presence', mock.MagicMock()) +@mock.patch('psud._wrapper_get_psu_status', mock.MagicMock()) +def test_psu_db_update(): + psu_tbl = mock.MagicMock() + + psud._wrapper_get_psu_presence.return_value = True + psud._wrapper_get_psu_status.return_value = True + expected_fvp = psud.swsscommon.FieldValuePairs( + [(psud.PSU_INFO_PRESENCE_FIELD, 'true'), + (psud.PSU_INFO_STATUS_FIELD, 'true'), + ]) + psud.psu_db_update(psu_tbl, 1) + assert psu_tbl.set.call_count == 1 + psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) + + psu_tbl.set.reset_mock() + + psud._wrapper_get_psu_presence.return_value = False + psud._wrapper_get_psu_status.return_value = True + expected_fvp = psud.swsscommon.FieldValuePairs( + [(psud.PSU_INFO_PRESENCE_FIELD, 'false'), + (psud.PSU_INFO_STATUS_FIELD, 'true'), + ]) + psud.psu_db_update(psu_tbl, 1) + assert psu_tbl.set.call_count == 1 + psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) + + psu_tbl.set.reset_mock() + + psud._wrapper_get_psu_presence.return_value = True + psud._wrapper_get_psu_status.return_value = False + expected_fvp = psud.swsscommon.FieldValuePairs( + [(psud.PSU_INFO_PRESENCE_FIELD, 'true'), + (psud.PSU_INFO_STATUS_FIELD, 'false'), + ]) + psud.psu_db_update(psu_tbl, 1) + assert psu_tbl.set.call_count == 1 + psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) + + psu_tbl.set.reset_mock() + + psud._wrapper_get_psu_presence.return_value = False + psud._wrapper_get_psu_status.return_value = False + expected_fvp = psud.swsscommon.FieldValuePairs( + [(psud.PSU_INFO_PRESENCE_FIELD, 'false'), + (psud.PSU_INFO_STATUS_FIELD, 'false'), + ]) + psud.psu_db_update(psu_tbl, 1) + assert psu_tbl.set.call_count == 1 + psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(1), expected_fvp) + + psu_tbl.set.reset_mock() + + psud._wrapper_get_psu_presence.return_value = True + psud._wrapper_get_psu_status.return_value = True + expected_fvp = psud.swsscommon.FieldValuePairs( + [(psud.PSU_INFO_PRESENCE_FIELD, 'true'), + (psud.PSU_INFO_STATUS_FIELD, 'true'), + ]) + psud.psu_db_update(psu_tbl, 32) + assert psu_tbl.set.call_count == 32 + psu_tbl.set.assert_called_with(psud.PSU_INFO_KEY_TEMPLATE.format(32), expected_fvp) + + +def test_log_on_status_changed(): + normal_log = "Normal log message" + abnormal_log = "Abnormal log message" + + mock_logger = mock.MagicMock() + + psud.log_on_status_changed(mock_logger, True, normal_log, abnormal_log) + assert mock_logger.log_notice.call_count == 1 + assert mock_logger.log_warning.call_count == 0 + mock_logger.log_notice.assert_called_with(normal_log) + + mock_logger.log_notice.reset_mock() + + psud.log_on_status_changed(mock_logger, False, normal_log, abnormal_log) + assert mock_logger.log_notice.call_count == 0 + assert mock_logger.log_warning.call_count == 1 + mock_logger.log_warning.assert_called_with(abnormal_log)