diff --git a/sonic_platform_base/sonic_ssd/ssd_emmc.py b/sonic_platform_base/sonic_ssd/ssd_emmc.py new file mode 100644 index 0000000000..dfa7036f37 --- /dev/null +++ b/sonic_platform_base/sonic_ssd/ssd_emmc.py @@ -0,0 +1,51 @@ +# +# ssd_emmc.py +# +# Implementation of SSD Utility API for eMMC. +# It reads eMMC health, model, firmware, and serial from /sys/block/*. +# + +try: + import os + from .ssd_base import SsdBase +except ImportError as e: + raise ImportError(str(e) + "- required module not found") + + +class EmmcUtil(SsdBase): + def __init__(self, diskdev): + self.diskdev = diskdev + self.path = os.path.join('/sys/block', os.path.basename(diskdev)) + + def _read_device_entry(self, entry, default=None): + path = os.path.join(self.path, 'device', entry) + try: + with open(path) as f: + return f.read().rstrip() + except OSError: + return default + + def _is_slc(self): + return bool(self._read_device_entry('enhanced_area_offset')) + + def get_health(self): + data = self._read_device_entry('life_time') + if data is None: + raise NotImplementedError + value = int(data.split()[0 if self._is_slc() else 1], 0) + return float(100 - (10 * (value - 1))) + + def get_temperature(self): + return 'N/A' + + def get_model(self): + return self._read_device_entry('name') + + def get_firmware(self): + return self._read_device_entry('fwrev') + + def get_serial(self): + return self._read_device_entry('serial') + + def get_vendor_output(self): + return '' diff --git a/tests/ssd_emmc_test.py b/tests/ssd_emmc_test.py new file mode 100644 index 0000000000..d8a5fefbb8 --- /dev/null +++ b/tests/ssd_emmc_test.py @@ -0,0 +1,40 @@ +import sys +if sys.version_info.major == 3: + from unittest.mock import mock_open, patch +else: + from mock import mock_open, patch + +from sonic_platform_base.sonic_ssd.ssd_emmc import EmmcUtil + +mocked_files = { + '/sys/block/emmctest/device/enhanced_area_offset': '0', + '/sys/block/emmctest/device/life_time': '0x02 0x02', + '/sys/block/emmctest/device/name': 'Test eMMC device', + '/sys/block/emmctest/device/fwrev': '0xAA00000000000000', + '/sys/block/emmctest/device/serial': '0xabcdefef' +} + + +def build_mocked_sys_fs_open(files): + mocks = dict([(fname, mock_open(read_data=cnt).return_value) + for fname, cnt in files.items()]) + + def mopen(fname): + if fname in mocks: + return mocks[fname] + else: + raise FileNotFoundError(fname) + return mopen + + +class TestSsdEMMC: + + @patch('builtins.open', new=build_mocked_sys_fs_open(mocked_files)) + def test_check(self, *args): + util = EmmcUtil('emmctest') + + assert (util.get_health() == 90.0) + assert (util.get_temperature() == 'N/A') + assert (util.get_model() == 'Test eMMC device') + assert (util.get_firmware() == '0xAA00000000000000') + assert (util.get_serial() == '0xabcdefef')