From e0f8a357c43807e7a3bf66e14b385c2d243f05c6 Mon Sep 17 00:00:00 2001 From: Stephen Sun <5379172+stephenxs@users.noreply.github.com> Date: Fri, 22 Apr 2022 00:47:37 +0800 Subject: [PATCH] Fix checkReplyType failed issue via recreating xcvr_table_helper on forking subprocess (#255) * Fix message interleaving issue via recreating xcvr_table_helper on forking subprocess Signed-off-by: Stephen Sun * Address comments: change xcvr_table_helper to class member Signed-off-by: Stephen Sun * Fix a typo Signed-off-by: Stephen Sun --- sonic-xcvrd/tests/test_xcvrd.py | 37 ++++++++++++----- sonic-xcvrd/xcvrd/xcvrd.py | 74 +++++++++++++++++---------------- 2 files changed, 65 insertions(+), 46 deletions(-) diff --git a/sonic-xcvrd/tests/test_xcvrd.py b/sonic-xcvrd/tests/test_xcvrd.py index 3fa41ae92d05..619ef59c4593 100644 --- a/sonic-xcvrd/tests/test_xcvrd.py +++ b/sonic-xcvrd/tests/test_xcvrd.py @@ -149,7 +149,7 @@ def test_post_port_sfp_info_to_db(self): @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) - @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd._wrapper_get_transceiver_info', MagicMock(return_value={'type': '22.75', 'vendor_rev': '0.5', 'serial': '0.7', @@ -219,19 +219,21 @@ def test_post_port_sfp_dom_info_to_db(self): port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) port_mapping.handle_port_change_event(port_change_event) stop_event = threading.Event() - post_port_sfp_dom_info_to_db(True, port_mapping, stop_event) + xcvr_table_helper = XcvrTableHelper() + post_port_sfp_dom_info_to_db(True, port_mapping, xcvr_table_helper, stop_event) @patch('xcvrd.xcvrd_utilities.port_mapping.PortMapping.logical_port_name_to_physical_port_list', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd.platform_sfputil', MagicMock(return_value=[0])) @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=True)) @patch('xcvrd.xcvrd._wrapper_is_replaceable', MagicMock(return_value=True)) - @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) def test_init_port_sfp_status_tbl(self): port_mapping = PortMapping() port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) port_mapping.handle_port_change_event(port_change_event) stop_event = threading.Event() - init_port_sfp_status_tbl(port_mapping, stop_event) + xcvr_table_helper = XcvrTableHelper() + init_port_sfp_status_tbl(port_mapping, xcvr_table_helper, stop_event) def test_get_media_settings_key(self): xcvr_info_dict = { @@ -544,10 +546,11 @@ def test_CmisManagerTask_task_worker(self, mock_chassis): task.task_worker() assert mock_xcvr_api.tx_disable_channel.call_count == 2 - @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) def test_DomInfoUpdateTask_handle_port_change_event(self): port_mapping = PortMapping() task = DomInfoUpdateTask(port_mapping) + task.xcvr_table_helper = XcvrTableHelper() port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) task.on_port_config_change(port_change_event) assert task.port_mapping.logical_port_list.count('Ethernet0') @@ -571,7 +574,7 @@ def test_DomInfoUpdateTask_task_run_stop(self): task.task_stop() assert not task.task_thread.is_alive() - @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd_utilities.sfp_status_helper.detect_port_in_error_status') @patch('xcvrd.xcvrd.post_port_dom_info_to_db') @patch('xcvrd.xcvrd.post_port_dom_threshold_info_to_db') @@ -587,6 +590,7 @@ def test_DomInfoUpdateTask_task_worker(self, mock_select, mock_sub_table, mock_p port_mapping = PortMapping() task = DomInfoUpdateTask(port_mapping) + task.xcvr_table_helper = XcvrTableHelper() task.task_stopping_event.wait = MagicMock(side_effect=[False, True]) mock_detect_error.return_value = True task.task_worker() @@ -603,7 +607,7 @@ def test_DomInfoUpdateTask_task_worker(self, mock_select, mock_sub_table, mock_p assert mock_post_dom_info.call_count == 1 @patch('xcvrd.xcvrd._wrapper_get_presence', MagicMock(return_value=False)) - @patch('xcvrd.xcvrd.xcvr_table_helper') + @patch('xcvrd.xcvrd.XcvrTableHelper') def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): mock_table = MagicMock() mock_table.get = MagicMock(return_value=(False, None)) @@ -614,6 +618,7 @@ def test_SfpStateUpdateTask_handle_port_change_event(self, mock_table_helper): port_mapping = PortMapping() retry_eeprom_set = set() task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) + task.xcvr_table_helper = XcvrTableHelper() port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) wait_time = 5 while wait_time > 0: @@ -650,12 +655,19 @@ def test_SfpStateUpdateTask_task_run_stop(self): task.task_stop() assert wait_until(5, 1, lambda: task.task_process.is_alive() is False) - @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd.post_port_sfp_info_to_db') def test_SfpStateUpdateTask_retry_eeprom_reading(self, mock_post_sfp_info): + mock_table = MagicMock() + mock_table.get = MagicMock(return_value=(False, None)) + port_mapping = PortMapping() retry_eeprom_set = set() task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) + task.xcvr_table_helper = XcvrTableHelper() + task.xcvr_table_helper.get_intf_tbl = MagicMock(return_value=mock_table) + task.xcvr_table_helper.get_dom_tbl = MagicMock(return_value=mock_table) + task.xcvr_table_helper.get_app_port_tbl = MagicMock(return_value=mock_table) task.retry_eeprom_reading() assert mock_post_sfp_info.call_count == 0 @@ -693,7 +705,7 @@ def test_SfpStateUpdateTask_mapping_event_from_change_event(self): assert task._mapping_event_from_change_event(True, port_dict) == NORMAL_EVENT @patch('time.sleep', MagicMock()) - @patch('xcvrd.xcvrd.xcvr_table_helper', MagicMock()) + @patch('xcvrd.xcvrd.XcvrTableHelper', MagicMock()) @patch('xcvrd.xcvrd._wrapper_soak_sfp_insert_event', MagicMock()) @patch('xcvrd.xcvrd_utilities.port_mapping.subscribe_port_config_change', MagicMock(return_value=(None, None))) @patch('xcvrd.xcvrd_utilities.port_mapping.handle_port_config_change', MagicMock()) @@ -710,6 +722,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ port_mapping = PortMapping() retry_eeprom_set = set() task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) + task.xcvr_table_helper = XcvrTableHelper() stop_event = multiprocessing.Event() sfp_error_event = multiprocessing.Event() mock_change_event.return_value = (True, {0: 0}, {}) @@ -792,7 +805,7 @@ def test_SfpStateUpdateTask_task_worker(self, mock_updata_status, mock_post_sfp_ assert mock_updata_status.call_count == 1 assert mock_del_dom.call_count == 1 - @patch('xcvrd.xcvrd.xcvr_table_helper') + @patch('xcvrd.xcvrd.XcvrTableHelper') @patch('xcvrd.xcvrd._wrapper_get_presence') @patch('xcvrd.xcvrd.notify_media_setting') @patch('xcvrd.xcvrd.post_port_dom_threshold_info_to_db') @@ -819,6 +832,10 @@ class MockTable: port_mapping = PortMapping() retry_eeprom_set = set() task = SfpStateUpdateTask(port_mapping, retry_eeprom_set) + task.xcvr_table_helper = XcvrTableHelper() + task.xcvr_table_helper.get_status_tbl = mock_table_helper.get_status_tbl + task.xcvr_table_helper.get_intf_tbl = mock_table_helper.get_intf_tbl + task.xcvr_table_helper.get_dom_tbl = mock_table_helper.get_dom_tbl port_change_event = PortChangeEvent('Ethernet0', 1, 0, PortChangeEvent.PORT_ADD) task.port_mapping.handle_port_change_event(port_change_event) # SFP information is in the DB, copy the SFP information for the newly added logical port diff --git a/sonic-xcvrd/xcvrd/xcvrd.py b/sonic-xcvrd/xcvrd/xcvrd.py index e8ce64343f09..12ef0be3af03 100644 --- a/sonic-xcvrd/xcvrd/xcvrd.py +++ b/sonic-xcvrd/xcvrd/xcvrd.py @@ -86,8 +86,6 @@ platform_sfputil = None # Global chassis object based on new platform api platform_chassis = None -# Global xcvr table helper -xcvr_table_helper = None # Global logger instance for helper functions and classes # TODO: Refactor so that we only need the logger inherited @@ -496,7 +494,7 @@ def post_port_dom_info_to_db(logical_port_name, port_mapping, table, stop_event= # Update port dom/sfp info in db -def post_port_sfp_dom_info_to_db(is_warm_start, port_mapping, stop_event=threading.Event()): +def post_port_sfp_dom_info_to_db(is_warm_start, port_mapping, xcvr_table_helper, stop_event=threading.Event()): # Connect to STATE_DB and create transceiver dom/sfp info tables transceiver_dict = {} retry_eeprom_set = set() @@ -803,7 +801,7 @@ def delete_port_from_status_table(logical_port_name, status_tbl): # Init TRANSCEIVER_STATUS table -def init_port_sfp_status_tbl(port_mapping, stop_event=threading.Event()): +def init_port_sfp_status_tbl(port_mapping, xcvr_table_helper, stop_event=threading.Event()): # Init TRANSCEIVER_STATUS table logical_port_list = port_mapping.logical_port_list for logical_port_name in logical_port_list: @@ -1121,6 +1119,8 @@ def test_datapath_state(self, api, channel, states): return done def task_worker(self): + self.xcvr_table_helper = XcvrTableHelper() + self.log_notice("Starting...") # APPL_DB for CONFIG updates, and STATE_DB for insertion/removal @@ -1341,6 +1341,7 @@ def __init__(self, port_mapping): self.port_mapping = copy.deepcopy(port_mapping) def task_worker(self): + self.xcvr_table_helper = XcvrTableHelper() helper_logger.log_info("Start DOM monitoring loop") dom_info_cache = {} dom_th_info_cache = {} @@ -1362,9 +1363,9 @@ def task_worker(self): helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - if not sfp_status_helper.detect_port_in_error_status(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index)): - post_port_dom_info_to_db(logical_port_name, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), self.task_stopping_event, dom_info_cache=dom_info_cache) - post_port_dom_threshold_info_to_db(logical_port_name, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index), self.task_stopping_event, dom_th_info_cache=dom_th_info_cache) + if not sfp_status_helper.detect_port_in_error_status(logical_port_name, self.xcvr_table_helper.get_status_tbl(asic_index)): + post_port_dom_info_to_db(logical_port_name, self.port_mapping, self.xcvr_table_helper.get_dom_tbl(asic_index), self.task_stopping_event, dom_info_cache=dom_info_cache) + post_port_dom_threshold_info_to_db(logical_port_name, self.port_mapping, self.xcvr_table_helper.get_dom_tbl(asic_index), self.task_stopping_event, dom_th_info_cache=dom_th_info_cache) helper_logger.log_info("Stop DOM monitoring loop") @@ -1396,7 +1397,7 @@ def on_remove_logical_port(self, port_change_event): del_port_sfp_dom_info_from_db(port_change_event.port_name, self.port_mapping, None, - xcvr_table_helper.get_dom_tbl(port_change_event.asic_id)) + self.xcvr_table_helper.get_dom_tbl(port_change_event.asic_id)) # Process wrapper class to update sfp state info periodically @@ -1442,6 +1443,8 @@ def _mapping_event_from_change_event(self, status, port_dict): return event def task_worker(self, stopping_event, sfp_error_event): + self.xcvr_table_helper = XcvrTableHelper() + helper_logger.log_info("Start SFP monitoring loop") transceiver_dict = {} @@ -1600,29 +1603,29 @@ def task_worker(self, stopping_event, sfp_error_event): helper_logger.log_info("Got SFP inserted event") # A plugin event will clear the error state. update_port_transceiver_status_table( - logical_port, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_INSERTED) + logical_port, self.xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_INSERTED) helper_logger.log_info("receive plug in and update port sfp status table.") - rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict) + rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict) # If we didn't get the sfp info, assuming the eeprom is not ready, give a try again. if rc == SFP_EEPROM_NOT_READY: helper_logger.log_warning("SFP EEPROM is not ready. One more try...") time.sleep(TIME_FOR_SFP_READY_SECS) - rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict) + rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict) if rc == SFP_EEPROM_NOT_READY: # If still failed to read EEPROM, put it to retry set self.retry_eeprom_set.add(logical_port) if rc != SFP_EEPROM_NOT_READY: - post_port_dom_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index)) - post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index)) - notify_media_setting(logical_port, transceiver_dict, xcvr_table_helper.get_app_port_tbl(asic_index), self.port_mapping) + post_port_dom_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_dom_tbl(asic_index)) + post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_dom_tbl(asic_index)) + notify_media_setting(logical_port, transceiver_dict, self.xcvr_table_helper.get_app_port_tbl(asic_index), self.port_mapping) transceiver_dict.clear() elif value == sfp_status_helper.SFP_STATUS_REMOVED: helper_logger.log_info("Got SFP removed event") update_port_transceiver_status_table( - logical_port, xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_REMOVED) + logical_port, self.xcvr_table_helper.get_status_tbl(asic_index), sfp_status_helper.SFP_STATUS_REMOVED) helper_logger.log_info("receive plug out and pdate port sfp status table.") - del_port_sfp_dom_info_from_db(logical_port, self.port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index)) + del_port_sfp_dom_info_from_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_intf_tbl(asic_index), self.xcvr_table_helper.get_dom_tbl(asic_index)) else: try: error_bits = int(value) @@ -1639,12 +1642,12 @@ def task_worker(self, stopping_event, sfp_error_event): # Add error info to database # Any existing error will be replaced by the new one. - update_port_transceiver_status_table(logical_port, xcvr_table_helper.get_status_tbl(asic_index), value, '|'.join(error_descriptions)) + update_port_transceiver_status_table(logical_port, self.xcvr_table_helper.get_status_tbl(asic_index), value, '|'.join(error_descriptions)) helper_logger.log_info("Receive error update port sfp status table.") # In this case EEPROM is not accessible. The DOM info will be removed since it can be out-of-date. # The interface info remains in the DB since it is static. if sfp_status_helper.is_error_block_eeprom_reading(error_bits): - del_port_sfp_dom_info_from_db(logical_port, None, xcvr_table_helper.get_dom_tbl(asic_index)) + del_port_sfp_dom_info_from_db(logical_port, None, self.xcvr_table_helper.get_dom_tbl(asic_index)) except (TypeError, ValueError) as e: helper_logger.log_error("Got unrecognized event {}, ignored".format(value)) @@ -1716,9 +1719,9 @@ def on_remove_logical_port(self, port_change_event): # but it is necessary because TRANSCEIVER_DOM_INFO is also updated in this sub process when a new SFP is inserted. del_port_sfp_dom_info_from_db(port_change_event.port_name, self.port_mapping, - xcvr_table_helper.get_intf_tbl(port_change_event.asic_id), - xcvr_table_helper.get_dom_tbl(port_change_event.asic_id)) - delete_port_from_status_table(port_change_event.port_name, xcvr_table_helper.get_status_tbl(port_change_event.asic_id)) + self.xcvr_table_helper.get_intf_tbl(port_change_event.asic_id), + self.xcvr_table_helper.get_dom_tbl(port_change_event.asic_id)) + delete_port_from_status_table(port_change_event.port_name, self.xcvr_table_helper.get_status_tbl(port_change_event.asic_id)) # The logical port has been removed, no need retry EEPROM reading if port_change_event.port_name in self.retry_eeprom_set: @@ -1746,9 +1749,9 @@ def on_add_logical_port(self, port_change_event): logical_port_event_dict = {} sfp_status = None sibling_port = None - status_tbl = xcvr_table_helper.get_status_tbl(port_change_event.asic_id) - int_tbl = xcvr_table_helper.get_intf_tbl(port_change_event.asic_id) - dom_tbl = xcvr_table_helper.get_dom_tbl(port_change_event.asic_id) + status_tbl = self.xcvr_table_helper.get_status_tbl(port_change_event.asic_id) + int_tbl = self.xcvr_table_helper.get_intf_tbl(port_change_event.asic_id) + dom_tbl = self.xcvr_table_helper.get_dom_tbl(port_change_event.asic_id) physical_port_list = self.port_mapping.logical_port_name_to_physical_port_list(port_change_event.port_name) # Try to find a logical port with same physical index in DB @@ -1812,7 +1815,7 @@ def on_add_logical_port(self, port_change_event): else: post_port_dom_info_to_db(port_change_event.port_name, self.port_mapping, dom_tbl) post_port_dom_threshold_info_to_db(port_change_event.port_name, self.port_mapping, dom_tbl) - notify_media_setting(port_change_event.port_name, transceiver_dict, xcvr_table_helper.get_app_port_tbl(port_change_event.asic_id), self.port_mapping) + notify_media_setting(port_change_event.port_name, transceiver_dict, self.xcvr_table_helper.get_app_port_tbl(port_change_event.asic_id), self.port_mapping) else: status = sfp_status_helper.SFP_STATUS_REMOVED if not status else status logical_port_event_dict[port_change_event.port_name] = status @@ -1837,11 +1840,11 @@ def retry_eeprom_reading(self): retry_success_set = set() for logical_port in self.retry_eeprom_set: asic_index = self.port_mapping.get_asic_id_for_logical_port(logical_port) - rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict) + rc = post_port_sfp_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_intf_tbl(asic_index), transceiver_dict) if rc != SFP_EEPROM_NOT_READY: - post_port_dom_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index)) - post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, xcvr_table_helper.get_dom_tbl(asic_index)) - notify_media_setting(logical_port, transceiver_dict, xcvr_table_helper.get_app_port_tbl(asic_index), self.port_mapping) + post_port_dom_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_dom_tbl(asic_index)) + post_port_dom_threshold_info_to_db(logical_port, self.port_mapping, self.xcvr_table_helper.get_dom_tbl(asic_index)) + notify_media_setting(logical_port, transceiver_dict, self.xcvr_table_helper.get_app_port_tbl(asic_index), self.port_mapping) transceiver_dict.clear() retry_success_set.add(logical_port) # Update retry EEPROM set @@ -1910,7 +1913,6 @@ def load_media_settings(self): def init(self): global platform_sfputil global platform_chassis - global xcvr_table_helper self.log_info("Start daemon init...") @@ -1942,7 +1944,7 @@ def init(self): swsscommon.SonicDBConfig.initializeGlobalConfig() # Initialize xcvr table helper - xcvr_table_helper = XcvrTableHelper() + self.xcvr_table_helper = XcvrTableHelper() if is_fast_reboot_enabled(): self.log_info("Skip loading media_settings.json in case of fast-reboot") @@ -1956,7 +1958,7 @@ def init(self): # Make sure this daemon started after all port configured self.log_info("Wait for port config is done") - for namespace in xcvr_table_helper.namespaces: + for namespace in self.xcvr_table_helper.namespaces: self.wait_for_port_config_done(namespace) @@ -1964,11 +1966,11 @@ def init(self): # Post all the current interface dom/sfp info to STATE_DB self.log_info("Post all port DOM/SFP info to DB") - retry_eeprom_set = post_port_sfp_dom_info_to_db(is_warm_start, port_mapping_data, self.stop_event) + retry_eeprom_set = post_port_sfp_dom_info_to_db(is_warm_start, port_mapping_data, self.xcvr_table_helper, self.stop_event) # Init port sfp status table self.log_info("Init port sfp status table") - init_port_sfp_status_tbl(port_mapping_data, self.stop_event) + init_port_sfp_status_tbl(port_mapping_data, self.xcvr_table_helper, self.stop_event) return port_mapping_data, retry_eeprom_set @@ -1986,8 +1988,8 @@ def deinit(self): helper_logger.log_warning("Got invalid asic index for {}, ignored".format(logical_port_name)) continue - del_port_sfp_dom_info_from_db(logical_port_name, port_mapping_data, xcvr_table_helper.get_intf_tbl(asic_index), xcvr_table_helper.get_dom_tbl(asic_index)) - delete_port_from_status_table(logical_port_name, xcvr_table_helper.get_status_tbl(asic_index)) + del_port_sfp_dom_info_from_db(logical_port_name, port_mapping_data, self.xcvr_table_helper.get_intf_tbl(asic_index), self.xcvr_table_helper.get_dom_tbl(asic_index)) + delete_port_from_status_table(logical_port_name, self.xcvr_table_helper.get_status_tbl(asic_index)) del globals()['platform_chassis']