From 07ef101f12ee7a09eaa1d6c8aa896d88acfeb1d7 Mon Sep 17 00:00:00 2001 From: Stephen Sun Date: Fri, 11 Sep 2020 19:48:00 +0800 Subject: [PATCH] [buffermgr/bufferorch] Support dynamic buffer calculation 1. Extend the CLI options for buffermgrd: -a: asic_table provided, -p: peripheral_table provided The buffermgrd will start with dynamic headroom calculation mode With -a provided Otherwise it will start the legacy mode (pg_headroom_profile looking up) 2. A new class is provided for dynamic buffer calculation while the old one remains. The daemon will instantiate the corresponding class according to the CLI option when it starts. 3. In both mode, the buffermgrd will copy BUFFER_XXX tables from CONFIG_DB to APPL_DB and the bufferorch will consume BUFFER_XXX tables from APPL_DB The following points are for dynamic buffer calculation mode 4. In the dynamic buffer calculation mode, there are 3 lua plugins are provided for vendor-specific operations: - buffer_headroom_.lua, for calculationg headroom size. - buffer_pool_.lua, for calculating buffer pool size. - buffer_check_headroom_.lua, for checking whether headroom exceeds the limit 5. During initialization, The daemon will: - load asic_table and peripheral_table from the given json file, parse them and push them into STATE_DB.ASIC_TABLE and STATE_DB.PERIPHERAL_TABLE respectively - load all plugins - try to load the STATE_DB.BUFFER_MAX_PARAM.mmu_size which is used for updating buffer pool size - a timer will be started for periodic buffer pool size audit 6. 
The daemon will listen to and handle the following tables from CONFIG_DB The tables will be cached internally in the damon for the purpose of saving access time - BUFFER_POOL: - if size is provided: insert the entry to APPL_DB - otherwise: cache them and push to APPL_DB after the size is calculated by lua plugin - BUFFER_PROFILE and BUFFER_PG: - items for ingress lossless headroom need to be cached and handled (according to the design) - other items will be inserted to the APPL_DB directly - PORT_TABLE, for ports' speed and MTU update - CABLE_LENGTH, for ports' cable length 7. Other tables will be copied to APPL_DB directly: - BUFFER_QUEUE - BUFFER_PORT_INGRESS_PROFILE_LIST - BUFFER_PORT_EGRESS_PROFILE_LIST As the names of tables in APPL_DB differ from that in CONFIG_DB, all references should be adjusted accordingly 8. BufferOrch modified accordingly: Consume buffer relavent tables from APPL_DB instead of CONFIG_DB 9. Warm reboot: - db_migrator is responsible for copying the data from CONFIG_DB to APPL_DB if switch is warm-rebooted from an old image to the new image for the first time - no specific handling in the daemon side 10.Provide vstest script Signed-off-by: Stephen Sun --- cfgmgr/Makefile.am | 9 +- cfgmgr/buffer_check_headroom_mellanox.lua | 99 ++ cfgmgr/buffer_check_headroom_vs.lua | 1 + cfgmgr/buffer_headroom_mellanox.lua | 129 ++ cfgmgr/buffer_headroom_vs.lua | 1 + cfgmgr/buffer_pool_mellanox.lua | 193 +++ cfgmgr/buffer_pool_vs.lua | 1 + cfgmgr/buffermgr.cpp | 152 +- cfgmgr/buffermgr.h | 16 +- cfgmgr/buffermgrd.cpp | 160 +- cfgmgr/buffermgrdyn.cpp | 1602 +++++++++++++++++++++ cfgmgr/buffermgrdyn.h | 245 ++++ orchagent/bufferorch.cpp | 109 +- orchagent/bufferorch.h | 6 +- orchagent/orchdaemon.cpp | 14 +- tests/mock_tests/portsorch_ut.cpp | 101 +- tests/test_buffer.py | 180 +++ 17 files changed, 2899 insertions(+), 119 deletions(-) create mode 100644 cfgmgr/buffer_check_headroom_mellanox.lua create mode 120000 cfgmgr/buffer_check_headroom_vs.lua create 
mode 100644 cfgmgr/buffer_headroom_mellanox.lua create mode 120000 cfgmgr/buffer_headroom_vs.lua create mode 100644 cfgmgr/buffer_pool_mellanox.lua create mode 120000 cfgmgr/buffer_pool_vs.lua create mode 100644 cfgmgr/buffermgrdyn.cpp create mode 100644 cfgmgr/buffermgrdyn.h create mode 100644 tests/test_buffer.py diff --git a/cfgmgr/Makefile.am b/cfgmgr/Makefile.am index 001c6f1f99e..077f367f2f4 100644 --- a/cfgmgr/Makefile.am +++ b/cfgmgr/Makefile.am @@ -5,6 +5,13 @@ LIBNL_LIBS = -lnl-genl-3 -lnl-route-3 -lnl-3 bin_PROGRAMS = vlanmgrd teammgrd portmgrd intfmgrd buffermgrd vrfmgrd nbrmgrd vxlanmgrd sflowmgrd natmgrd +cfgmgrdir = $(datadir)/swss + +dist_cfgmgr_DATA = \ + buffer_check_headroom_mellanox.lua \ + buffer_headroom_mellanox.lua \ + buffer_pool_mellanox.lua + if DEBUG DBGFLAGS = -ggdb -DDEBUG else @@ -31,7 +38,7 @@ intfmgrd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) intfmgrd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) intfmgrd_LDADD = -lswsscommon -buffermgrd_SOURCES = buffermgrd.cpp buffermgr.cpp $(top_srcdir)/orchagent/orch.cpp $(top_srcdir)/orchagent/request_parser.cpp shellcmd.h +buffermgrd_SOURCES = buffermgrd.cpp buffermgr.cpp buffermgrdyn.cpp $(top_srcdir)/orchagent/orch.cpp $(top_srcdir)/orchagent/request_parser.cpp shellcmd.h buffermgrd_CFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) buffermgrd_CPPFLAGS = $(DBGFLAGS) $(AM_CFLAGS) $(CFLAGS_COMMON) $(CFLAGS_SAI) buffermgrd_LDADD = -lswsscommon diff --git a/cfgmgr/buffer_check_headroom_mellanox.lua b/cfgmgr/buffer_check_headroom_mellanox.lua new file mode 100644 index 00000000000..2f7251b0051 --- /dev/null +++ b/cfgmgr/buffer_check_headroom_mellanox.lua @@ -0,0 +1,99 @@ +-- KEYS - port name + +local port = KEYS[1] +local profile +local lossless_profile +local lossless_headroom_size +local lossless_profile_name +local accumulative_size = 0 + +local appl_db = "0" +local config_db = "4" +local state_db = "6" + +local ret_true = {} +local 
ret_false = {} +local ret = {} +local default_ret = {} + +table.insert(ret_true, "result:true") +table.insert(ret_false, "result:false") + +-- Fetch the cable length from CONFIG_DB +redis.call('SELECT', config_db) +local cable_length_keys = redis.call('KEYS', 'CABLE_LENGTH*') +if #cable_length_keys == 0 then + return ret_true +end + +-- Check whether cable length exceeds 300m (maximum value in the non-dynamic-buffer solution) +local cable_length_str = redis.call('HGET', cable_length_keys[1], port) +if cable_length_str == nil then + return ret_true +end +local cable_length = tonumber(string.sub(cable_length_str, 1, -2)) +if cable_length > 300 then + default_ret = ret_false +else + default_ret = ret_true +end + +-- Fetch the threshold from STATE_DB +redis.call('SELECT', state_db) + +local max_headroom_size = tonumber(redis.call('HGET', 'BUFFER_MAX_PARAM_TABLE|' .. port, 'max_headroom_size')) +if max_headroom_size == nil then + return default_ret +end + +local asic_keys = redis.call('KEYS', 'ASIC_TABLE*') +local pipeline_delay = tonumber(redis.call('HGET', asic_keys[1], 'pipeline_latency')) +accumulative_size = accumulative_size + 2 * pipeline_delay * 1024 + +-- Fetch all keys in BUFFER_PG according to the port +redis.call('SELECT', appl_db) + +local function get_number_of_pgs(keyname) + local range = string.match(keyname, "Ethernet%d+:([^%s]+)$") + local size + if string.len(range) == 1 then + size = 1 + else + size = 1 + tonumber(string.sub(range, -1)) - tonumber(string.sub(range, 1, 1)) + end + return size +end + +-- Fetch all the PGs, accumulate the sizes +-- Assume there is only one lossless profile configured among all PGs on each port +local pg_keys = redis.call('KEYS', 'BUFFER_PG_TABLE:' .. port .. 
'*') +for i = 1, #pg_keys do + profile = string.sub(redis.call('HGET', pg_keys[i], 'profile'), 2, -2) + if lossless_profile_name ~= nil then + if profile == lossless_profile_name then + accumulative_size = accumulative_size + lossless_headroom_size * get_number_of_pgs(pg_keys[i]) + end + else + lossless_profile = redis.call('HGETALL', profile) + for j = 1, #lossless_profile, 2 do + if lossless_profile[j] == 'xoff' then + lossless_profile_name = profile + end + if lossless_profile[j] == 'size' then + lossless_headroom_size = tonumber(lossless_profile[j+1]) + accumulative_size = lossless_headroom_size * get_number_of_pgs(pg_keys[i]) + end + end + end +end + +if max_headroom_size > accumulative_size then + table.insert(ret, "result:true") +else + table.insert(ret, "result:false") +end + +table.insert(ret, "max headroom:" .. max_headroom_size) +table.insert(ret, "accumulative headroom:" .. accumulative_size) + +return ret diff --git a/cfgmgr/buffer_check_headroom_vs.lua b/cfgmgr/buffer_check_headroom_vs.lua new file mode 120000 index 00000000000..6f38db14b66 --- /dev/null +++ b/cfgmgr/buffer_check_headroom_vs.lua @@ -0,0 +1 @@ +buffer_check_headroom_mellanox.lua \ No newline at end of file diff --git a/cfgmgr/buffer_headroom_mellanox.lua b/cfgmgr/buffer_headroom_mellanox.lua new file mode 100644 index 00000000000..49655944bde --- /dev/null +++ b/cfgmgr/buffer_headroom_mellanox.lua @@ -0,0 +1,129 @@ +-- KEYS - profile name +-- ARGV[1] - port speed +-- ARGV[2] - cable length +-- ARGV[3] - port mtu +-- ARGV[4] - gearbox delay + +-- parameters retried from databases: +-- From CONFIG_DB.LOSSLESS_TRAFFIC_PATTERN +-- small packet percentage: the parameter which is used to control worst case regarding the cell utilization +-- mtu: the mtu of lossless packet +-- From STATE_DB.ASIC_TABLE: +-- cell size: cell_size of the ASIC +-- pipeline_latency: the latency +-- mac_phy_delay: +-- peer_response_time: + +local lossless_mtu +local small_packet_percentage +local cell_size +local 
pipeline_latency +local mac_phy_delay +local peer_response_time + +local port_speed = tonumber(ARGV[1]) +local cable_length = tonumber(string.sub(ARGV[2], 1, -2)) +local port_mtu = tonumber(ARGV[3]) +local gearbox_delay = tonumber(ARGV[4]) + +local appl_db = "0" +local config_db = "4" +local state_db = "6" + +local ret = {} + +if gearbox_delay == nil then + gearbox_delay = 0 +end + +-- Fetch ASIC info from ASIC table in STATE_DB +redis.call('SELECT', state_db) +local asic_keys = redis.call('KEYS', 'ASIC_TABLE*') + +-- Only one key should exist +local asic_table_content = redis.call('HGETALL', asic_keys[1]) +for i = 1, #asic_table_content, 2 do + if asic_table_content[i] == "cell_size" then + cell_size = tonumber(asic_table_content[i+1]) + end + if asic_table_content[i] == "pipeline_latency" then + pipeline_latency = tonumber(asic_table_content[i+1]) * 1024 + end + if asic_table_content[i] == "mac_phy_delay" then + mac_phy_delay = tonumber(asic_table_content[i+1]) * 1024 + end + if asic_table_content[i] == "peer_response_time" then + peer_response_time = tonumber(asic_table_content[i+1]) * 1024 + end +end + +-- Fetch lossless traffic info from CONFIG_DB +redis.call('SELECT', config_db) +local lossless_traffic_keys = redis.call('KEYS', 'LOSSLESS_TRAFFIC_PATTERN*') + +-- Only one key should exist +local lossless_traffic_table_content = redis.call('HGETALL', lossless_traffic_keys[1]) +for i = 1, #lossless_traffic_table_content, 2 do + if lossless_traffic_table_content[i] == "mtu" then + lossless_mtu = tonumber(lossless_traffic_table_content[i+1]) + end + if lossless_traffic_table_content[i] == "small_packet_percentage" then + small_packet_percentage = tonumber(lossless_traffic_table_content[i+1]) + end +end + +-- Fetch DEFAULT_LOSSLESS_BUFFER_PARAMETER from CONFIG_DB +local lossless_traffic_keys = redis.call('KEYS', 'DEFAULT_LOSSLESS_BUFFER_PARAMETER*') + +-- Calculate the headroom information +local speed_of_light = 198000000 +local minimal_packet_size = 64 +local 
cell_occupancy +local worst_case_factor +local propagation_delay +local bytes_on_cable +local bytes_on_gearbox +local xoff_value +local xon_value +local headroom_size +local speed_overhead + +-- Adjustment for 400G +if port_speed == 400000 then + pipeline_latency = 37 + speed_overhead = port_mtu +else + speed_overhead = 0 +end + +if cell_size > 2 * minimal_packet_size then + worst_case_factor = cell_size / minimal_packet_size +else + worst_case_factor = (2 * cell_size) / (1 + cell_size) +end + +cell_occupancy = (100 - small_packet_percentage + small_packet_percentage * worst_case_factor) / 100 + +if (gearbox_delay == 0) then + bytes_on_gearbox = 0 +else + bytes_on_gearbox = port_speed * gearbox_delay / (8 * 1024) +end + +bytes_on_cable = 2 * cable_length * port_speed * 1000000000 / speed_of_light / (8 * 1024) +propagation_delay = port_mtu + bytes_on_cable + 2 * bytes_on_gearbox + mac_phy_delay + peer_response_time + +-- Calculate the xoff and xon and then round up at 1024 bytes +xoff_value = lossless_mtu + propagation_delay * cell_occupancy +xoff_value = math.ceil(xoff_value / 1024) * 1024 +xon_value = pipeline_latency +xon_value = math.ceil(xon_value / 1024) * 1024 + +headroom_size = xoff_value + xon_value + speed_overhead +headroom_size = math.ceil(headroom_size / 1024) * 1024 + +table.insert(ret, "xon" .. ":" .. math.ceil(xon_value)) +table.insert(ret, "xoff" .. ":" .. math.ceil(xoff_value)) +table.insert(ret, "size" .. ":" .. 
math.ceil(headroom_size)) + +return ret diff --git a/cfgmgr/buffer_headroom_vs.lua b/cfgmgr/buffer_headroom_vs.lua new file mode 120000 index 00000000000..0b1d06ba8aa --- /dev/null +++ b/cfgmgr/buffer_headroom_vs.lua @@ -0,0 +1 @@ +buffer_headroom_mellanox.lua \ No newline at end of file diff --git a/cfgmgr/buffer_pool_mellanox.lua b/cfgmgr/buffer_pool_mellanox.lua new file mode 100644 index 00000000000..da88f186c0a --- /dev/null +++ b/cfgmgr/buffer_pool_mellanox.lua @@ -0,0 +1,193 @@ +-- KEYS - None +-- ARGV - None + +local appl_db = "0" +local config_db = "4" +local state_db = "6" + +local lossypg_reserved = 19 * 1024 +local lossypg_reserved_400g = 37 * 1024 +local lossypg_400g = 0 + +local result = {} +local profiles = {} + +local count_up_port = 0 + +local mgmt_pool_size = 256 * 1024 +local egress_mirror_headroom = 10 * 1024 + +local function find_profile(ref) + -- Remove the surrounding square bracket and the find in the list + local name = string.sub(ref, 2, -2) + for i = 1, #profiles, 1 do + if profiles[i][1] == name then + return i + end + end + return 0 +end + +local function iterate_all_items(all_items) + table.sort(all_items) + local prev_port = "None" + local port + local is_up + local fvpairs + local status + local admin_down_ports = 0 + for i = 1, #all_items, 1 do + -- Check whether the port on which pg or tc hosts is admin down + port = string.match(all_items[i], "Ethernet%d+") + if port ~= nil then + if prev_port ~= port then + status = redis.call('HGET', 'PORT_TABLE:'..port, 'admin_status') + prev_port = port + if status == "down" then + is_up = false + else + is_up = true + end + end + if is_up == true then + local range = string.match(all_items[i], "Ethernet%d+:([^%s]+)$") + local profile = redis.call('HGET', all_items[i], 'profile') + local index = find_profile(profile) + local size + if string.len(range) == 1 then + size = 1 + else + size = 1 + tonumber(string.sub(range, -1)) - tonumber(string.sub(range, 1, 1)) + end + profiles[index][2] = 
profiles[index][2] + size + local speed = redis.call('HGET', 'PORT_TABLE:'..port, 'speed') + if speed == '400000' and profile == '[BUFFER_PROFILE_TABLE:ingress_lossy_profile]' then + lossypg_400g = lossypg_400g + size + end + end + end + end +end + +-- Connect to CONFIG_DB +redis.call('SELECT', config_db) + +local ports_table = redis.call('KEYS', 'PORT|*') + +for i = 1, #ports_table do + local status = redis.call('HGET', ports_table[i], 'admin_status') + if status == "up" then + count_up_port = count_up_port + 1 + end +end + +local egress_lossless_pool_size = redis.call('HGET', 'BUFFER_POOL|egress_lossless_pool', 'size') + +-- Switch to APPL_DB +redis.call('SELECT', appl_db) + +-- Fetch names of all profiles and insert them into the look up table +local all_profiles = redis.call('KEYS', 'BUFFER_PROFILE*') +for i = 1, #all_profiles, 1 do + table.insert(profiles, {all_profiles[i], 0}) +end + +-- Fetch all the PGs +local all_pgs = redis.call('KEYS', 'BUFFER_PG*') +local all_tcs = redis.call('KEYS', 'BUFFER_QUEUE*') + +iterate_all_items(all_pgs) +iterate_all_items(all_tcs) + +local statistics = {} + +-- Fetch sizes of all of the profiles, accumulate them +local accumulative_occupied_buffer = 0 +for i = 1, #profiles, 1 do + if profiles[i][1] ~= "BUFFER_PROFILE_TABLE_KEY_SET" and profiles[i][1] ~= "BUFFER_PROFILE_TABLE_DEL_SET" then + local size = tonumber(redis.call('HGET', profiles[i][1], 'size')) + if size ~= nil then + if profiles[i][1] == "BUFFER_PROFILE_TABLE:ingress_lossy_profile" then + size = size + lossypg_reserved + end + if profiles[i][1] == "BUFFER_PROFILE_TABLE:egress_lossy_profile" then + profiles[i][2] = count_up_port + end + if size ~= 0 then + accumulative_occupied_buffer = accumulative_occupied_buffer + size * profiles[i][2] + end + table.insert(statistics, {profiles[i][1], size, profiles[i][2]}) + end + end +end + +-- Extra lossy xon buffer for 400G port +local lossypg_extra_for_400g = (lossypg_reserved_400g - lossypg_reserved) * lossypg_400g 
+accumulative_occupied_buffer = accumulative_occupied_buffer + lossypg_extra_for_400g + +-- Accumulate sizes for egress mirror and management pool +local accumulative_egress_mirror_overhead = count_up_port * egress_mirror_headroom +accumulative_occupied_buffer = accumulative_occupied_buffer + accumulative_egress_mirror_overhead + mgmt_pool_size + +-- Fetch mmu_size +redis.call('SELECT', state_db) +local mmu_size = tonumber(redis.call('HGET', 'BUFFER_MAX_PARAM_TABLE|global', 'mmu_size')) +if mmu_size == nil then + mmu_size = tonumber(egress_lossless_pool_size) +end +local asic_keys = redis.call('KEYS', 'ASIC_TABLE*') +local cell_size = tonumber(redis.call('HGET', asic_keys[1], 'cell_size')) + +-- Align mmu_size at cell size boundary, otherwith the sdk will complain and the syncd will faill +local number_of_cells = math.floor(mmu_size / cell_size) +local ceiling_mmu_size = number_of_cells * cell_size + +-- Switch to CONFIG_DB +redis.call('SELECT', config_db) + +-- Fetch all the pools that need update +local pools_need_update = {} +local ipools = redis.call('KEYS', 'BUFFER_POOL|ingress*') +local ingress_pool_count = 0 +for i = 1, #ipools, 1 do + local size = tonumber(redis.call('HGET', ipools[i], 'size')) + if not size then + table.insert(pools_need_update, ipools[i]) + ingress_pool_count = ingress_pool_count + 1 + end +end + +local epools = redis.call('KEYS', 'BUFFER_POOL|egress*') +for i = 1, #epools, 1 do + local size = redis.call('HGET', epools[i], 'size') + if not size then + table.insert(pools_need_update, epools[i]) + end +end + +local pool_size +if ingress_pool_count == 1 then + pool_size = mmu_size - accumulative_occupied_buffer +else + pool_size = (mmu_size - accumulative_occupied_buffer) / 2 +end + +if pool_size > ceiling_mmu_size then + pool_size = ceiling_mmu_size +end + +for i = 1, #pools_need_update, 1 do + local pool_name = string.match(pools_need_update[i], "BUFFER_POOL|([^%s]+)$") + table.insert(result, pool_name .. ":" .. 
math.ceil(pool_size)) +end + +table.insert(result, "debug:mmu_size:" .. mmu_size) +table.insert(result, "debug:accumulative:" .. accumulative_occupied_buffer) +for i = 1, #statistics do + table.insert(result, "debug:" .. statistics[i][1] .. ":" .. statistics[i][2] .. ":" .. statistics[i][3]) +end +table.insert(result, "debug:extra_400g:" .. (lossypg_reserved_400g - lossypg_reserved) .. ":" .. lossypg_400g) +table.insert(result, "debug:mgmt_pool:" .. mgmt_pool_size) +table.insert(result, "debug:egress_mirror:" .. accumulative_egress_mirror_overhead) + +return result diff --git a/cfgmgr/buffer_pool_vs.lua b/cfgmgr/buffer_pool_vs.lua new file mode 120000 index 00000000000..b5dc777781b --- /dev/null +++ b/cfgmgr/buffer_pool_vs.lua @@ -0,0 +1 @@ +buffer_pool_mellanox.lua \ No newline at end of file diff --git a/cfgmgr/buffermgr.cpp b/cfgmgr/buffermgr.cpp index fbce96e40e6..4f7a94186cd 100644 --- a/cfgmgr/buffermgr.cpp +++ b/cfgmgr/buffermgr.cpp @@ -6,20 +6,28 @@ #include "producerstatetable.h" #include "tokenize.h" #include "ipprefix.h" +#include "timer.h" #include "buffermgr.h" #include "exec.h" #include "shellcmd.h" +#include "warm_restart.h" using namespace std; using namespace swss; -BufferMgr::BufferMgr(DBConnector *cfgDb, DBConnector *stateDb, string pg_lookup_file, const vector &tableNames) : +BufferMgr::BufferMgr(DBConnector *cfgDb, DBConnector *applDb, string pg_lookup_file, const vector &tableNames) : Orch(cfgDb, tableNames), m_cfgPortTable(cfgDb, CFG_PORT_TABLE_NAME), m_cfgCableLenTable(cfgDb, CFG_PORT_CABLE_LEN_TABLE_NAME), m_cfgBufferProfileTable(cfgDb, CFG_BUFFER_PROFILE_TABLE_NAME), m_cfgBufferPgTable(cfgDb, CFG_BUFFER_PG_TABLE_NAME), - m_cfgLosslessPgPoolTable(cfgDb, CFG_BUFFER_POOL_TABLE_NAME) + m_cfgLosslessPgPoolTable(cfgDb, CFG_BUFFER_POOL_TABLE_NAME), + m_applBufferPoolTable(applDb, APP_BUFFER_POOL_TABLE_NAME), + m_applBufferProfileTable(applDb, APP_BUFFER_PROFILE_TABLE_NAME), + m_applBufferPgTable(applDb, APP_BUFFER_PG_TABLE_NAME), + 
m_applBufferQueueTable(applDb, APP_BUFFER_QUEUE_TABLE_NAME), + m_applBufferIngressProfileListTable(applDb, APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME), + m_applBufferEgressProfileListTable(applDb, APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME) { readPgProfileLookupFile(pg_lookup_file); } @@ -183,9 +191,8 @@ task_process_status BufferMgr::doSpeedUpdateTask(string port, string speed) "]"; /* Check if PG Mapping is already then log message and return. */ - m_cfgBufferPgTable.get(buffer_pg_key, fvVector); - + for (auto& prop : fvVector) { if ((fvField(prop) == "profile") && (profile_ref == fvValue(prop))) @@ -194,20 +201,153 @@ task_process_status BufferMgr::doSpeedUpdateTask(string port, string speed) return task_process_status::task_success; } } - + fvVector.clear(); - + fvVector.push_back(make_pair("profile", profile_ref)); m_cfgBufferPgTable.set(buffer_pg_key, fvVector); return task_process_status::task_success; } +void BufferMgr::transformSeperator(string &name) +{ + size_t pos; + while ((pos = name.find("|")) != string::npos) + name.replace(pos, 1, ":"); +} + +void BufferMgr::transformReference(string &name) +{ + auto references = tokenize(name, list_item_delimiter); + int ref_index = 0; + + name = ""; + + for (auto &reference : references) + { + if (ref_index != 0) + name += list_item_delimiter; + ref_index ++; + + auto keys = tokenize(reference, config_db_key_delimiter); + int key_index = 0; + for (auto &key : keys) + { + if (key_index == 0) + name += key + "_TABLE"; + else + name += delimiter + key; + key_index ++; + } + } +} + +/* + * This function copies the data from tables in CONFIG_DB to APPL_DB. 
+ * With dynamically buffer calculation supported, the following tables + * will be moved to APPL_DB from CONFIG_DB because the CONFIG_DB contains + * confgured entries only while APPL_DB contains dynamically generated entries + * - BUFFER_POOL + * - BUFFER_PROFILE + * - BUFFER_PG + * The following tables have to be moved to APPL_DB because they reference + * some entries that have been moved to APPL_DB + * - BUFFER_QUEUE + * - BUFFER_PORT_INGRESS_PROFILE_LIST + * - BUFFER_PORT_EGRESS_PROFILE_LIST + * One thing we need to handle is to transform the separator from | to : + * The following items contain separator: + * - keys of each item + * - pool in BUFFER_PROFILE + * - profile in BUFFER_PG + * - profile_list in BUFFER_PORT_INGRESS_PROFILE_LIST and BUFFER_PORT_EGRESS_PROFILE_LIST + */ +void BufferMgr::doBufferTableTask(Consumer &consumer, ProducerStateTable &applTable) +{ + SWSS_LOG_ENTER(); + + auto it = consumer.m_toSync.begin(); + while (it != consumer.m_toSync.end()) + { + KeyOpFieldsValuesTuple t = it->second; + string key = kfvKey(t); + + //transform the separator in key from "|" to ":" + transformSeperator(key); + + string op = kfvOp(t); + if (op == SET_COMMAND) + { + vector fvVector; + + SWSS_LOG_INFO("Inserting entry %s from CONFIG_DB to APPL_DB", key.c_str()); + + for (auto i : kfvFieldsValues(t)) + { + SWSS_LOG_INFO("Inserting field %s value %s", fvField(i).c_str(), fvValue(i).c_str()); + //transform the separator in values from "|" to ":" + if (fvField(i) == "pool") + transformReference(fvValue(i)); + if (fvField(i) == "profile") + transformReference(fvValue(i)); + if (fvField(i) == "profile_list") + transformReference(fvValue(i)); + fvVector.emplace_back(FieldValueTuple(fvField(i), fvValue(i))); + SWSS_LOG_INFO("Inserting field %s value %s", fvField(i).c_str(), fvValue(i).c_str()); + } + applTable.set(key, fvVector); + } + else if (op == DEL_COMMAND) + { + SWSS_LOG_INFO("Removing entry %s from APPL_DB", key.c_str()); + applTable.del(key); + } + it = 
consumer.m_toSync.erase(it); + } +} + void BufferMgr::doTask(Consumer &consumer) { SWSS_LOG_ENTER(); string table_name = consumer.getTableName(); + if (table_name == CFG_BUFFER_POOL_TABLE_NAME) + { + doBufferTableTask(consumer, m_applBufferPoolTable); + return; + } + + if (table_name == CFG_BUFFER_PROFILE_TABLE_NAME) + { + doBufferTableTask(consumer, m_applBufferProfileTable); + return; + } + + if (table_name == CFG_BUFFER_PG_TABLE_NAME) + { + doBufferTableTask(consumer, m_applBufferPgTable); + return; + } + + if (table_name == CFG_BUFFER_QUEUE_TABLE_NAME) + { + doBufferTableTask(consumer, m_applBufferQueueTable); + return; + } + + if (table_name == CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME) + { + doBufferTableTask(consumer, m_applBufferIngressProfileListTable); + return; + } + + if (table_name == CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME) + { + doBufferTableTask(consumer, m_applBufferEgressProfileListTable); + return; + } + auto it = consumer.m_toSync.begin(); while (it != consumer.m_toSync.end()) { diff --git a/cfgmgr/buffermgr.h b/cfgmgr/buffermgr.h index a8ce998546e..5ad88f8901b 100644 --- a/cfgmgr/buffermgr.h +++ b/cfgmgr/buffermgr.h @@ -13,6 +13,8 @@ namespace swss { #define INGRESS_LOSSLESS_PG_POOL_NAME "ingress_lossless_pool" #define LOSSLESS_PGS "3-4" +#define BUFFERMGR_TIMER_PERIOD 10 + typedef struct{ std::string size; std::string xon; @@ -29,7 +31,7 @@ typedef std::map port_cable_length_t; class BufferMgr : public Orch { public: - BufferMgr(DBConnector *cfgDb, DBConnector *stateDb, std::string pg_lookup_file, const std::vector &tableNames); + BufferMgr(DBConnector *cfgDb, DBConnector *applDb, std::string pg_lookup_file, const std::vector &tableNames); using Orch::doTask; private: @@ -38,6 +40,14 @@ class BufferMgr : public Orch Table m_cfgBufferProfileTable; Table m_cfgBufferPgTable; Table m_cfgLosslessPgPoolTable; + + ProducerStateTable m_applBufferProfileTable; + ProducerStateTable m_applBufferPgTable; + ProducerStateTable m_applBufferPoolTable; + 
ProducerStateTable m_applBufferQueueTable; + ProducerStateTable m_applBufferIngressProfileListTable; + ProducerStateTable m_applBufferEgressProfileListTable; + bool m_pgfile_processed; pg_profile_lookup_t m_pgProfileLookup; @@ -46,6 +56,10 @@ class BufferMgr : public Orch void readPgProfileLookupFile(std::string); task_process_status doCableTask(std::string port, std::string cable_length); task_process_status doSpeedUpdateTask(std::string port, std::string speed); + void doBufferTableTask(Consumer &consumer, ProducerStateTable &applTable); + + void transformSeperator(std::string &name); + void transformReference(std::string &name); void doTask(Consumer &consumer); }; diff --git a/cfgmgr/buffermgrd.cpp b/cfgmgr/buffermgrd.cpp index 9bce96f19c4..eb5122fc6db 100644 --- a/cfgmgr/buffermgrd.cpp +++ b/cfgmgr/buffermgrd.cpp @@ -7,13 +7,17 @@ #include "exec.h" #include "schema.h" #include "buffermgr.h" +#include "buffermgrdyn.h" #include #include +#include "json.h" +#include "json.hpp" using namespace std; using namespace swss; +using json = nlohmann::json; -/* select() function timeout retry time, in millisecond */ +/* SELECT() function timeout retry time, in millisecond */ #define SELECT_TIMEOUT 1000 /* @@ -34,22 +38,84 @@ mutex gDbMutex; void usage() { - cout << "Usage: buffermgrd -l pg_lookup.ini" << endl; - cout << " -l pg_lookup.ini: PG profile look up table file (mandatory)" << endl; - cout << " format: csv" << endl; - cout << " values: 'speed, cable, size, xon, xoff, dynamic_threshold, xon_offset'" << endl; + cout << "Usage: buffermgrd <-l pg_lookup.ini|-a asic_table.json [-p peripheral_table.json]>" << endl; + cout << " -l pg_lookup.ini: PG profile look up table file (mandatory for static mode)" << endl; + cout << " format: csv" << endl; + cout << " values: 'speed, cable, size, xon, xoff, dynamic_threshold, xon_offset'" << endl; + cout << " -a asic_table.json: ASIC-specific parameters definition (mandatory for dynamic mode)" << endl; + cout << " -p 
peripheral_table.json: Peripheral (eg. gearbox) parameters definition (mandatory for dynamic mode)" << endl; +} + +void dump_db_item(KeyOpFieldsValuesTuple &db_item) +{ + SWSS_LOG_DEBUG("db_item: ["); + SWSS_LOG_DEBUG("\toperation: %s", kfvOp(db_item).c_str()); + SWSS_LOG_DEBUG("\thash: %s", kfvKey(db_item).c_str()); + SWSS_LOG_DEBUG("\tfields: ["); + for (auto fv: kfvFieldsValues(db_item)) + SWSS_LOG_DEBUG("\t\tfield: %s value: %s", fvField(fv).c_str(), fvValue(fv).c_str()); + SWSS_LOG_DEBUG("\t]"); + SWSS_LOG_DEBUG("]"); +} + +void write_to_state_db(shared_ptr> db_items_ptr) +{ + DBConnector db("STATE_DB", 0, true); + auto &db_items = *db_items_ptr; + for (auto &db_item : db_items) + { + dump_db_item(db_item); + + string key = kfvKey(db_item); + size_t pos = key.find(":"); + if ((string::npos == pos) || ((key.size() - 1) == pos)) + { + SWSS_LOG_ERROR("Invalid formatted hash:%s\n", key.c_str()); + return; + } + string table_name = key.substr(0, pos); + string key_name = key.substr(pos + 1); + Table stateTable(&db, table_name); + + stateTable.set(key_name, kfvFieldsValues(db_item), SET_COMMAND); + } +} + +shared_ptr> load_json(string file) +{ + try + { + ifstream json(file); + auto db_items_ptr = make_shared>(); + + if (!JSon::loadJsonFromFile(json, *db_items_ptr)) + { + db_items_ptr.reset(); + return nullptr; + } + + return db_items_ptr; + } + catch (...) 
+ { + SWSS_LOG_WARN("Loading file %s failed", file.c_str()); + return nullptr; + } } int main(int argc, char **argv) { int opt; string pg_lookup_file = ""; + string asic_table_file = ""; + string peripherial_table_file = ""; + string json_file = ""; Logger::linkToDbNative("buffermgrd"); SWSS_LOG_ENTER(); SWSS_LOG_NOTICE("--- Starting buffermgrd ---"); - while ((opt = getopt(argc, argv, "l:h")) != -1 ) + while ((opt = getopt(argc, argv, "l:h:a:p:j")) != -1 ) { switch (opt) { @@ -59,32 +125,86 @@ int main(int argc, char **argv) case 'h': usage(); return 1; + case 'a': + asic_table_file = optarg; + break; + case 'p': + peripherial_table_file = optarg; + break; default: /* '?' */ usage(); return EXIT_FAILURE; } } - if (pg_lookup_file.empty()) - { - usage(); - return EXIT_FAILURE; - } - try { - vector cfg_buffer_tables = { - CFG_PORT_TABLE_NAME, - CFG_PORT_CABLE_LEN_TABLE_NAME, - }; + std::vector cfgOrchList; + bool dynamicMode = false; + shared_ptr> db_items_ptr; DBConnector cfgDb("CONFIG_DB", 0); DBConnector stateDb("STATE_DB", 0); + DBConnector applDb("APPL_DB", 0); - BufferMgr buffmgr(&cfgDb, &stateDb, pg_lookup_file, cfg_buffer_tables); + if (!asic_table_file.empty()) + { + // Load the json file containing the SWITCH_TABLE + db_items_ptr = load_json(asic_table_file); + if (nullptr != db_items_ptr) + { + write_to_state_db(db_items_ptr); + db_items_ptr.reset(); + + if (!peripherial_table_file.empty()) + { + //Load the json file containing the PERIPHERIAL_TABLE + db_items_ptr = load_json(peripherial_table_file); + if (nullptr != db_items_ptr) + write_to_state_db(db_items_ptr); + } + + dynamicMode = true; + } + } + + if (dynamicMode) + { + vector buffer_table_connectors = { + TableConnector(&cfgDb, CFG_PORT_TABLE_NAME), + TableConnector(&cfgDb, CFG_PORT_CABLE_LEN_TABLE_NAME), + TableConnector(&cfgDb, CFG_BUFFER_POOL_TABLE_NAME), + TableConnector(&cfgDb, CFG_BUFFER_PROFILE_TABLE_NAME), + TableConnector(&cfgDb, CFG_BUFFER_PG_TABLE_NAME), + TableConnector(&cfgDb, 
CFG_BUFFER_QUEUE_TABLE_NAME), + TableConnector(&cfgDb, CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME), + TableConnector(&cfgDb, CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME), + TableConnector(&cfgDb, CFG_DEFAULT_LOSSLESS_BUFFER_PARAMETER), + TableConnector(&stateDb, STATE_BUFFER_MAXIMUM_VALUE_TABLE) + }; + cfgOrchList.emplace_back(new BufferMgrDynamic(&cfgDb, &stateDb, &applDb, buffer_table_connectors, db_items_ptr)); + } + else if (!pg_lookup_file.empty()) + { + vector cfg_buffer_tables = { + CFG_PORT_TABLE_NAME, + CFG_PORT_CABLE_LEN_TABLE_NAME, + CFG_BUFFER_POOL_TABLE_NAME, + CFG_BUFFER_PROFILE_TABLE_NAME, + CFG_BUFFER_PG_TABLE_NAME, + CFG_BUFFER_QUEUE_TABLE_NAME, + CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, + CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME + }; + cfgOrchList.emplace_back(new BufferMgr(&cfgDb, &applDb, pg_lookup_file, cfg_buffer_tables)); + } + else + { + usage(); + return EXIT_FAILURE; + } - // TODO: add tables in stateDB which interface depends on to monitor list - std::vector cfgOrchList = {&buffmgr}; + auto buffmgr = cfgOrchList[0]; swss::Select s; for (Orch *o : cfgOrchList) @@ -106,7 +226,7 @@ int main(int argc, char **argv) } if (ret == Select::TIMEOUT) { - buffmgr.doTask(); + buffmgr->doTask(); continue; } diff --git a/cfgmgr/buffermgrdyn.cpp b/cfgmgr/buffermgrdyn.cpp new file mode 100644 index 00000000000..65188afc96b --- /dev/null +++ b/cfgmgr/buffermgrdyn.cpp @@ -0,0 +1,1602 @@ +#include +#include +#include +#include "logger.h" +#include "dbconnector.h" +#include "producerstatetable.h" +#include "tokenize.h" +#include "ipprefix.h" +#include "timer.h" +#include "buffermgrdyn.h" +#include "bufferorch.h" +#include "exec.h" +#include "shellcmd.h" +#include "schema.h" +#include "warm_restart.h" + +/* + * Some Tips + * 1. All keys in this file are in format of APPL_DB key. + * Key population: + * On receiving item update from CONFIG_DB: key has been transformed into the format of APPL_DB + * In intermal maps: table name removed from the index + * 2. 
Maintain maps for pools, profiles and PGs in CONFIG_DB and APPL_DB + * 3. Keys of maps in this file don't contain the TABLE_NAME + * 3. + */ +using namespace std; +using namespace swss; + +BufferMgrDynamic::BufferMgrDynamic(DBConnector *cfgDb, DBConnector *stateDb, DBConnector *applDb, const vector &tables, shared_ptr> gearboxInfo = nullptr) : + Orch(tables), + m_applDb(applDb), + m_cfgPortTable(cfgDb, CFG_PORT_TABLE_NAME), + m_cfgCableLenTable(cfgDb, CFG_PORT_CABLE_LEN_TABLE_NAME), + m_cfgBufferProfileTable(cfgDb, CFG_BUFFER_PROFILE_TABLE_NAME), + m_cfgBufferPgTable(cfgDb, CFG_BUFFER_PG_TABLE_NAME), + m_cfgLosslessPgPoolTable(cfgDb, CFG_BUFFER_POOL_TABLE_NAME), + m_cfgDefaultLosslessBufferParam(cfgDb, CFG_DEFAULT_LOSSLESS_BUFFER_PARAMETER), + m_applBufferPoolTable(applDb, APP_BUFFER_POOL_TABLE_NAME), + m_applBufferProfileTable(applDb, APP_BUFFER_PROFILE_TABLE_NAME), + m_applBufferPgTable(applDb, APP_BUFFER_PG_TABLE_NAME), + m_applBufferQueueTable(applDb, APP_BUFFER_QUEUE_TABLE_NAME), + m_applBufferIngressProfileListTable(applDb, APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME), + m_applBufferEgressProfileListTable(applDb, APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME), + m_stateBufferMaximumTable(stateDb, STATE_BUFFER_MAXIMUM_VALUE_TABLE), + m_stateBufferPoolTable(stateDb, STATE_BUFFER_POOL_TABLE_NAME), + m_stateBufferProfileTable(stateDb, STATE_BUFFER_PROFILE_TABLE_NAME), + m_applPortTable(applDb, APP_PORT_TABLE_NAME), + m_portInitDone(false), + m_firstTimeCalculateBufferPool(true) +{ + SWSS_LOG_ENTER(); + + // Initialize the handler map + initTableHandlerMap(); + parseGearboxInfo(gearboxInfo); + + string platform = getenv("ASIC_VENDOR") ? 
getenv("ASIC_VENDOR") : ""; + if (platform == "") + { + SWSS_LOG_ERROR("Platform environment variable is not defined"); + } + + string headroomSha, bufferpoolSha; + string headroomPluginName = "buffer_headroom_" + platform + ".lua"; + string bufferpoolPluginName = "buffer_pool_" + platform + ".lua"; + string checkHeadroomPluginName = "buffer_check_headroom_" + platform + ".lua"; + + try + { + string headroomLuaScript = swss::loadLuaScript(headroomPluginName); + m_headroomSha = swss::loadRedisScript(applDb, headroomLuaScript); + + string bufferpoolLuaScript = swss::loadLuaScript(bufferpoolPluginName); + m_bufferpoolSha = swss::loadRedisScript(applDb, bufferpoolLuaScript); + + string checkHeadroomLuaScript = swss::loadLuaScript(checkHeadroomPluginName); + m_checkHeadroomSha = swss::loadRedisScript(applDb, checkHeadroomLuaScript); + } + catch (...) + { + SWSS_LOG_WARN("Lua scripts for buffer calculation were not loaded successfully"); + } + + // Init timer + auto interv = timespec { .tv_sec = BUFFERMGR_TIMER_PERIOD, .tv_nsec = 0 }; + m_buffermgrPeriodtimer = new SelectableTimer(interv); + auto executor = new ExecutableTimer(m_buffermgrPeriodtimer, this, "PORT_INIT_DONE_POLL_TIMER"); + Orch::addExecutor(executor); + m_buffermgrPeriodtimer->start(); + + // Try fetch mmu size from STATE_DB + // - warm-reboot, the mmuSize should be in the STATE_DB, + // which is done by not removing it from STATE_DB before warm reboot + // - warm-reboot for the first time or cold-reboot, the mmuSize is + // fetched from SAI and then pushed into STATE_DB by orchagent + // This is to accelerate the process of inserting all the buffer pools + // into APPL_DB when the system starts + // In case that the mmuSize isn't available yet at time buffermgrd starts, + // the buffer_pool_.lua should try to fetch that from BUFFER_POOL + m_stateBufferMaximumTable.hget("global", "mmu_size", m_mmuSize); + + // Try fetch default dynamic_th from CONFIG_DB + vector keys; + 
m_cfgDefaultLosslessBufferParam.getKeys(keys); + if (!keys.empty()) + { + m_cfgDefaultLosslessBufferParam.hget(keys[0], "default_dynamic_th", m_defaultThreshold); + } +} + +void BufferMgrDynamic::parseGearboxInfo(shared_ptr> gearboxInfo) +{ + if (nullptr == gearboxInfo) + { + m_supportGearbox = false; + } + else + { + string gearboxModel; + for (auto &kfv : *gearboxInfo) + { + auto table = parseObjectNameFromKey(kfvKey(kfv), 0); + auto key = parseObjectNameFromKey(kfvKey(kfv), 1); + + if (table == STATE_PERIPHERAL_TABLE) + { + for (auto &fv: kfvFieldsValues(kfv)) + { + auto &field = fvField(fv); + auto &value = fvValue(fv); + SWSS_LOG_DEBUG("Processing table %s field:%s, value:%s", table.c_str(), field.c_str(), value.c_str()); + if (field == "gearbox_delay") + m_gearboxDelay[key] = value; + } + } + + if (table == STATE_PORT_PERIPHERAL_TABLE) + { + if (key != "global") + { + SWSS_LOG_ERROR("Port peripheral table: only global gearbox model is supported but got %s", key.c_str()); + continue; + } + + for (auto &fv: kfvFieldsValues(kfv)) + { + auto &field = fvField(fv); + auto &value = fvValue(fv); + SWSS_LOG_DEBUG("Processing table %s field:%s, value:%s", table.c_str(), field.c_str(), value.c_str()); + if (fvField(fv) == "gearbox_model") + gearboxModel = fvValue(fv); + } + } + } + + m_identifyGearboxDelay = m_gearboxDelay[gearboxModel]; + m_supportGearbox = false; + } +} + +void BufferMgrDynamic::initTableHandlerMap() +{ + m_bufferTableHandlerMap.insert(buffer_handler_pair(STATE_BUFFER_MAXIMUM_VALUE_TABLE, &BufferMgrDynamic::handleBufferMaxParam)); + m_bufferTableHandlerMap.insert(buffer_handler_pair(CFG_DEFAULT_LOSSLESS_BUFFER_PARAMETER, &BufferMgrDynamic::handleDefaultLossLessBufferParam)); + m_bufferTableHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_POOL_TABLE_NAME, &BufferMgrDynamic::handleBufferPoolTable)); + m_bufferTableHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_PROFILE_TABLE_NAME, &BufferMgrDynamic::handleBufferProfileTable)); + 
m_bufferTableHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_QUEUE_TABLE_NAME, &BufferMgrDynamic::handleBufferQueueTable)); + m_bufferTableHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_PG_TABLE_NAME, &BufferMgrDynamic::handleBufferPgTable)); + m_bufferTableHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, &BufferMgrDynamic::handleBufferPortIngressProfileListTable)); + m_bufferTableHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME, &BufferMgrDynamic::handleBufferPortEgressProfileListTable)); + m_bufferTableHandlerMap.insert(buffer_handler_pair(CFG_PORT_TABLE_NAME, &BufferMgrDynamic::handlePortTable)); + m_bufferTableHandlerMap.insert(buffer_handler_pair(CFG_PORT_CABLE_LEN_TABLE_NAME, &BufferMgrDynamic::handleCableLenTable)); +} + +// APIs to handle variant kinds of keys + +// Transform key from CONFIG_DB format to APPL_DB format +void BufferMgrDynamic::transformSeperator(string &name) +{ + size_t pos; + while ((pos = name.find("|")) != string::npos) + name.replace(pos, 1, ":"); +} + +void BufferMgrDynamic::transformReference(string &name) +{ + auto references = tokenize(name, list_item_delimiter); + int ref_index = 0; + + name = ""; + + for (auto &reference : references) + { + if (ref_index != 0) + name += list_item_delimiter; + ref_index ++; + + auto keys = tokenize(reference, config_db_key_delimiter); + int key_index = 0; + for (auto &key : keys) + { + if (key_index == 0) + name += key + "_TABLE"; + else + name += delimiter + key; + key_index ++; + } + } +} + +// For string "TABLE_NAME|objectname", returns "objectname" +string BufferMgrDynamic::parseObjectNameFromKey(const string &key, size_t pos = 0) +{ + auto keys = tokenize(key, delimiter); + if (pos >= keys.size()) + { + SWSS_LOG_ERROR("Failed to fetch %lu-th sector of key %s", pos, key.c_str()); + } + return keys[pos]; +} + +// For string "[foo]", returns "foo" +string BufferMgrDynamic::parseObjectNameFromReference(const string &reference) +{ + 
auto objName = reference.substr(1, reference.size() - 2); + return parseObjectNameFromKey(objName, 1); +} + +string BufferMgrDynamic::getDynamicProfileName(const string &speed, const string &cable, const string &mtu, const string &threshold, const string &gearbox_model) +{ + string buffer_profile_key; + + if (mtu == DEFAULT_MTU_STR) + { + buffer_profile_key = "pg_lossless_" + speed + "_" + cable; + } + else + { + buffer_profile_key = "pg_lossless_" + speed + "_" + cable + "_mtu" + mtu; + } + + if (threshold != m_defaultThreshold) + { + buffer_profile_key = buffer_profile_key + "_th" + threshold; + } + + if (!gearbox_model.empty()) + { + buffer_profile_key = buffer_profile_key + "_" + gearbox_model; + } + + return buffer_profile_key + "_profile"; +} + +string BufferMgrDynamic::getPgPoolMode() +{ + return m_bufferPoolLookup[INGRESS_LOSSLESS_PG_POOL_NAME].mode; +} + +// Meta flows which are called by main flows +void BufferMgrDynamic::calculateHeadroomSize(const string &speed, const string &cable, const string &port_mtu, const string &gearbox_model, buffer_profile_t &headroom) +{ + // Call vendor-specific lua plugin to calculate the xon, xoff, xon_offset, size and threshold + vector keys = {}; + vector argv = {}; + + keys.emplace_back(headroom.name); + argv.emplace_back(speed); + argv.emplace_back(cable); + argv.emplace_back(port_mtu); + argv.emplace_back(m_identifyGearboxDelay); + + try + { + auto ret = swss::runRedisScript(*m_applDb, m_headroomSha, keys, argv); + + // The format of the result: + // a list of strings containing key, value pairs with colon as separator + // each is a field of the profile + // "xon:18432" + // "xoff:18432" + // "size:36864" + + for ( auto i : ret) + { + auto pairs = tokenize(i, ':'); + if (pairs[0] == "xon") + headroom.xon = pairs[1]; + if (pairs[0] == "xoff") + headroom.xoff = pairs[1]; + if (pairs[0] == "size") + headroom.size = pairs[1]; + if (pairs[0] == "xon_offset") + headroom.xon_offset = pairs[1]; + } + } + catch (...) 
+ { + SWSS_LOG_WARN("Lua scripts for headroom calculation were not executed successfully"); + } +} + +void BufferMgrDynamic::recalculateSharedBufferPool() +{ + try + { + vector keys = {}; + vector argv = {}; + + auto ret = runRedisScript(*m_applDb, m_bufferpoolSha, keys, argv); + + // The format of the result: + // a list of strings containing key, value pairs with colon as separator + // each is the size of a buffer pool + + for ( auto i : ret) + { + auto pairs = tokenize(i, ':'); + auto poolName = pairs[0]; + + if ("debug" != poolName) + { + auto &pool = m_bufferPoolLookup[pairs[0]]; + + if (pool.total_size == pairs[1]) + continue; + + pool.total_size = pairs[1]; + + if (pool.initialized) + { + updateBufferPoolToDb(poolName, pool); + } + else + { + updateBufferPoolToDb(poolName, pool); + pool.initialized = true; + } + + SWSS_LOG_NOTICE("Buffer pool %s had been updated with new size [%s]", poolName.c_str(), pool.total_size.c_str()); + } + else + { + SWSS_LOG_INFO("Buffer pool debug info %s", i.c_str()); + } + } + } + catch (...) 
+ { + SWSS_LOG_WARN("Lua scripts for buffer calculation were not executed successfully"); + } +} + +void BufferMgrDynamic::checkSharedBufferPoolSize() +{ + // PortInitDone indicates all steps of port initialization has been done + // Only after that does the buffer pool size update starts + if (!m_portInitDone) + { + vector values; + if (m_applPortTable.get("PortInitDone", values)) + { + SWSS_LOG_NOTICE("Buffer pools start to be updated"); + m_portInitDone = true; + } + else + { + if (m_firstTimeCalculateBufferPool) + { + // It's something like a placeholder especially for warm reboot flow + // without all buffer pools created, buffer profiles are unable to be cureated, + // which in turn causes buffer pgs and buffer queues unable to be created, + // which prevents the port from being ready and eventually fails the warm reboot + // After the buffer pools are created for the first time, we won't touch it + // until portInitDone + // Eventually, the correct values will pushed to APPL_DB and then ASIC_DB + recalculateSharedBufferPool(); + m_firstTimeCalculateBufferPool = false; + SWSS_LOG_NOTICE("Buffer pool update defered because port is still under initialization, start polling timer"); + } + + return; + } + } + + if (!m_mmuSize.empty()) + recalculateSharedBufferPool(); +} + +// For buffer pool, only size can be updated on-the-fly +void BufferMgrDynamic::updateBufferPoolToDb(const string &name, const buffer_pool_t &pool) +{ + vector fvVector; + + if (pool.ingress) + fvVector.emplace_back(make_pair("type", "ingress")); + else + fvVector.emplace_back(make_pair("type", "egress")); + + fvVector.emplace_back(make_pair("mode", pool.mode)); + + SWSS_LOG_INFO("Buffer pool %s is initialized", name.c_str()); + + fvVector.emplace_back(make_pair("size", pool.total_size)); + + m_applBufferPoolTable.set(name, fvVector); + + m_stateBufferPoolTable.set(name, fvVector); +} + +void BufferMgrDynamic::updateBufferProfileToDb(const string &name, const buffer_profile_t &profile) +{ + 
vector fvVector; + string mode = getPgPoolMode(); + + if (mode.empty()) + { + // this should never happen if switch initialized properly + SWSS_LOG_ERROR("PG lossless pool is not yet created, creating profile %s failed", name.c_str()); + return; + } + + // profile threshold field name + mode += "_th"; + string pg_pool_reference = string(APP_BUFFER_POOL_TABLE_NAME) + + m_applBufferProfileTable.getTableNameSeparator() + + INGRESS_LOSSLESS_PG_POOL_NAME; + + fvVector.emplace_back(make_pair("xon", profile.xon)); + if (!profile.xon_offset.empty()) { + fvVector.emplace_back(make_pair("xon_offset", profile.xon_offset)); + } + fvVector.emplace_back(make_pair("xoff", profile.xoff)); + fvVector.emplace_back(make_pair("size", profile.size)); + fvVector.emplace_back(make_pair("pool", "[" + pg_pool_reference + "]")); + fvVector.emplace_back(make_pair(mode, profile.threshold)); + + m_applBufferProfileTable.set(name, fvVector); + m_stateBufferProfileTable.set(name, fvVector); +} + +// Database operation +// Set/remove BUFFER_PG table entry +void BufferMgrDynamic::updateBufferPgToDb(const string &key, const string &profile, bool add) +{ + if (add) + { + vector fvVector; + + fvVector.clear(); + + string profile_ref = string("[") + + APP_BUFFER_PROFILE_TABLE_NAME + + m_applBufferPgTable.getTableNameSeparator() + + profile + + "]"; + + fvVector.clear(); + + fvVector.push_back(make_pair("profile", profile_ref)); + m_applBufferPgTable.set(key, fvVector); + } + else + { + m_applBufferPgTable.del(key); + } +} + +// We have to check the headroom ahead of applying them +task_process_status BufferMgrDynamic::allocateProfile(const string &speed, const string &cable, const string &mtu, const string &threshold, const string &gearbox_model, string &profile_name) +{ + // Create record in BUFFER_PROFILE table + + SWSS_LOG_INFO("Allocating new BUFFER_PROFILE %s", profile_name.c_str()); + + // check if profile already exists - if yes - skip creation + auto profileRef = 
m_bufferProfileLookup.find(profile_name); + if (profileRef == m_bufferProfileLookup.end()) + { + auto &profile = m_bufferProfileLookup[profile_name]; + SWSS_LOG_NOTICE("Creating new profile '%s'", profile_name.c_str()); + + string mode = getPgPoolMode(); + if (mode.empty()) + { + SWSS_LOG_NOTICE("BUFFER_PROFILE %s cannot be created because the buffer pool isn't ready", profile_name.c_str()); + return task_process_status::task_need_retry; + } + + // Call vendor-specific lua plugin to calculate the xon, xoff, xon_offset, size + // Pay attention, the threshold can contain valid value + calculateHeadroomSize(speed, cable, mtu, gearbox_model, profile); + + profile.threshold = threshold; + profile.dynamic_calculated = true; + profile.static_configured = false; + profile.name = profile_name; + profile.state = PROFILE_NORMAL; + + updateBufferProfileToDb(profile_name, profile); + + SWSS_LOG_NOTICE("BUFFER_PROFILE %s has been created successfully", profile_name.c_str()); + SWSS_LOG_DEBUG("New profile created %s according to (%s %s %s): xon %s xoff %s size %s", + profile_name.c_str(), + speed.c_str(), cable.c_str(), gearbox_model.c_str(), + profile.xon.c_str(), profile.xoff.c_str(), profile.size.c_str()); + } + else + { + SWSS_LOG_NOTICE("Reusing existing profile '%s'", profile_name.c_str()); + } + + return task_process_status::task_success; +} + +void BufferMgrDynamic::releaseProfile(const string &profile_name) +{ + // Crete record in BUFFER_PROFILE table + // key format is pg_lossless___profile + vector fvVector; + auto &profile = m_bufferProfileLookup[profile_name]; + + if (profile.static_configured) + { + // Check whether the profile is statically configured. + // This means: + // 1. It's a statically configured profile, headroom override + // 2. 
It's dynamically calculated headroom with static threshold (alpha) + // In this case we won't remove the entry from the local cache even if it's dynamically calculated + // because the speed, cable length and cable model are fixed the headroom info will always be valid once calculated. + SWSS_LOG_NOTICE("Unable to release profile %s because it's a static configured profile", profile_name.c_str()); + return; + } + + // Check whether it's referenced anymore by other PGs. + if (!profile.port_pgs.empty()) + { + for (auto &pg : profile.port_pgs) + { + SWSS_LOG_INFO("Unable to release profile %s because it's still referenced by %s (and others)", + profile_name.c_str(), pg.c_str()); + return; + } + } + + profile.port_pgs.clear(); + + m_applBufferProfileTable.del(profile_name); + + m_stateBufferProfileTable.del(profile_name); + + m_bufferProfileLookup.erase(profile_name); + + SWSS_LOG_NOTICE("BUFFER_PROFILE %s has been released successfully", profile_name.c_str()); +} + +bool BufferMgrDynamic::isHeadroomResourceValid(const string &port, buffer_profile_t &profile_info, string lossy_pg_changed) +{ + //port: used to check whether its a split port + //pg_changed: which pg's profile has been changed? + //profile_info: the new profile + + // 1. Get all the pgs associated to the port + auto portPgs = m_portPgLookup[port]; + + // 2. 
Accumulate all the headroom sizes allocated for each pg + // Iterate all the pgs, if + // for each pg in m_portPgLookup[port] do + // if pg is lossless or pg == lossy_pg_changed, take profile_info + // else take m_bufferProfileLookup[pg.profile_name] + + vector keys = {port}; + vector argv = {}; + + try + { + auto ret = runRedisScript(*m_applDb, m_checkHeadroomSha, keys, argv); + + // The format of the result: + // a list of strings containing key, value pairs with colon as separator + // each is the size of a buffer pool + + for ( auto i : ret) + { + auto pairs = tokenize(i, ':'); + if ("result" != pairs[0]) + continue; + + if ("true" != pairs[1]) + { + SWSS_LOG_ERROR("Unable to update profile for port %s. Accumulative headroom size exceeds limit", port.c_str()); + return false; + } + } + } + catch (...) + { + SWSS_LOG_WARN("Lua scripts for buffer calculation were not executed successfully"); + } + + return true; +} + +//Called when speed/cable length updated from CONFIG_DB +// Update buffer profile of a certern PG of a port or all PGs of the port according to its speed, cable_length and mtu +// Called when +// - port's speed, cable_length or mtu updated +// - one buffer pg of port's is set to dynamic calculation +// Args +// port - port name +// speed, cable_length, mtu - port info +// exactly_matched_key - representing a port,pg. when provided, only profile of this key is updated +// Flow +// 1. For each PGs in the port +// a. skip non-dynamically-calculated or non-exactly-matched PGs +// b. allocate/reuse profile according to speed/cable length/mtu +// c. check accumulative headroom size, fail if exceeding per-port limit +// d. profile reference: remove reference to old profile and add reference to new profile +// e. put the old profile to to-be-released profile set +// f. update BUFFER_PG database +// 2. Update port's info: speed, cable length and mtu +// 3. If any of the PGs is updated, recalculate pool size +// 4. 
try release each profile in to-be-released profile set +// Port state migrate: +// PORT_DYNAMIC_HEADROOM if any +task_process_status BufferMgrDynamic::refreshPriorityGroupsForPort(const string &port, const string &speed, const string &cable_length, const string &mtu, const string &exactly_matched_key = "") +{ + port_info_t &portInfo = m_portInfoLookup[port]; + string &gearbox_model = portInfo.gearbox_model; + bool isHeadroomUpdated = false; + buffer_pg_lookup_t &portPgs = m_portPgLookup[port]; + set profilesToBeReleased; + + // Iterate all the lossless PGs configured on this port + for (auto it = portPgs.begin(); it != portPgs.end(); ++it) + { + auto &key = it->first; + if (!exactly_matched_key.empty() && key != exactly_matched_key) + { + SWSS_LOG_DEBUG("Update buffer PGs: key %s doesn't match %s, skipped ", key.c_str(), exactly_matched_key.c_str()); + continue; + } + auto &portPg = it->second; + string newProfile, oldProfile; + + oldProfile = portPg.running_profile_name; + if (!oldProfile.empty()) + { + // Clear old profile + portPg.running_profile_name = ""; + } + + if (portPg.dynamic_calculated) + { + string threshold; + //Calculate new headroom size + if (portPg.static_configured) + { + // static_configured but dynamic_calculated means non-default threshold value + auto &profile = m_bufferProfileLookup[portPg.configured_profile_name]; + threshold = profile.threshold; + } + else + { + threshold = m_defaultThreshold; + } + newProfile = getDynamicProfileName(speed, cable_length, mtu, threshold, gearbox_model); + auto rc = allocateProfile(speed, cable_length, mtu, threshold, gearbox_model, newProfile); + if (task_process_status::task_success != rc) + return rc; + + SWSS_LOG_DEBUG("Handling PG %s port %s, for dynamically calculated profile %s", key.c_str(), port.c_str(), newProfile.c_str()); + } + else + { + newProfile = portPg.configured_profile_name; + + SWSS_LOG_DEBUG("Handling PG %s port %s, for static profile %s", key.c_str(), port.c_str(), newProfile.c_str()); 
+ } + + //Calculate whether accumulative headroom size exceeds the maximum value + //Abort if it does + if (!isHeadroomResourceValid(port, m_bufferProfileLookup[newProfile], key)) + { + SWSS_LOG_ERROR("Update speed (%s) and cable length (%s) for port %s failed, accumulative headroom size exceeds the limit", + speed.c_str(), cable_length.c_str(), port.c_str()); + + releaseProfile(newProfile); + + return task_process_status::task_failed; + } + + if (newProfile != oldProfile) + { + // Need to remove the reference to the old profile + // and create the reference to the new one + m_bufferProfileLookup[oldProfile].port_pgs.erase(key); + m_bufferProfileLookup[newProfile].port_pgs.insert(key); + SWSS_LOG_INFO("Move profile reference for %s from [%s] to [%s]", key.c_str(), oldProfile.c_str(), newProfile.c_str()); + + // buffer pg needs to be updated as well + portPg.running_profile_name = newProfile; + + // Add the old profile to "to be removed" set + if (!oldProfile.empty()) + profilesToBeReleased.insert(oldProfile); + } + + //appl_db Database operation: set item BUFFER_PG|| + updateBufferPgToDb(key, newProfile, true); + isHeadroomUpdated = true; + } + + portInfo.speed = speed; + portInfo.cable_length = cable_length; + portInfo.gearbox_model = gearbox_model; + + if (isHeadroomUpdated) + { + checkSharedBufferPoolSize(); + } + else + { + SWSS_LOG_DEBUG("Nothing to do for port %s since no PG configured on it", port.c_str()); + } + + portInfo.state = PORT_READY; + + //Remove the old profile which is probably not referenced anymore. 
+ //TODO release all profiles in to-be-removed map + if (!profilesToBeReleased.empty()) + { + for (auto &oldProfile : profilesToBeReleased) + { + releaseProfile(oldProfile); + } + } + + return task_process_status::task_success; +} + +// Main flows + +// Update lossless pg on a port after an PG has been installed on the port +// Called when pg updated from CONFIG_DB +// Key format: BUFFER_PG:: +task_process_status BufferMgrDynamic::doUpdatePgTask(const string &pg_key) +{ + auto port = parseObjectNameFromKey(pg_key); + string value; + port_info_t &portInfo = m_portInfoLookup[port]; + auto &bufferPg = m_portPgLookup[port][pg_key]; + task_process_status task_status = task_process_status::task_success; + + switch (portInfo.state) + { + case PORT_READY: + // Not having profile_name but both speed and cable length have been configured for that port + // This is because the first PG on that port is configured after speed, cable length configured + // Just regenerate the profile + task_status = refreshPriorityGroupsForPort(port, portInfo.speed, portInfo.cable_length, portInfo.mtu, pg_key); + if (task_status != task_process_status::task_success) + return task_status; + + break; + + case PORT_INITIALIZING: + if (bufferPg.dynamic_calculated) + { + SWSS_LOG_NOTICE("Skip setting BUFFER_PG for %s because port's info isn't ready for dynamic calculation", pg_key.c_str()); + } + else + { + task_status = refreshPriorityGroupsForPort(port, portInfo.speed, portInfo.cable_length, portInfo.mtu, pg_key); + if (task_status != task_process_status::task_success) + return task_status; + } + break; + + default: + // speed and cable length hasn't been configured + // In that case, we just skip the this update and return success. + // It will be handled after speed and cable length configured. 
+ SWSS_LOG_NOTICE("Skip setting BUFFER_PG for %s because port's info isn't ready for dynamic calculation", pg_key.c_str()); + return task_process_status::task_success; + } + + if (bufferPg.static_configured && bufferPg.dynamic_calculated) + { + auto &profile = m_bufferProfileLookup[bufferPg.configured_profile_name]; + profile.port_pgs.insert(pg_key); + } + + return task_process_status::task_success; +} + +//Remove the currently configured lossless pg +task_process_status BufferMgrDynamic::doRemovePgTask(const string &pg_key) +{ + auto port = parseObjectNameFromKey(pg_key); + auto pgs = parseObjectNameFromKey(pg_key, 1); + auto &bufferPgs = m_portPgLookup[port]; + buffer_pg_t &bufferPg = bufferPgs[pg_key]; + port_info_t &portInfo = m_portInfoLookup[port]; + + // Remove the PG from APPL_DB + string null_str(""); + updateBufferPgToDb(pg_key, null_str, false); + + SWSS_LOG_NOTICE("Remove BUFFER_PG %s (profile %s, %s)", pg_key.c_str(), bufferPg.running_profile_name.c_str(), bufferPg.configured_profile_name.c_str()); + + // recalculate pool size + checkSharedBufferPoolSize(); + + if (!portInfo.speed.empty() && !portInfo.cable_length.empty()) + portInfo.state = PORT_READY; + else + portInfo.state = PORT_INITIALIZING; + SWSS_LOG_NOTICE("try removing the original profile %s", bufferPg.running_profile_name.c_str()); + releaseProfile(bufferPg.running_profile_name); + + if (bufferPg.static_configured && bufferPg.dynamic_calculated) + { + auto &profile = m_bufferProfileLookup[bufferPg.configured_profile_name]; + profile.port_pgs.erase(pg_key); + } + + return task_process_status::task_success; +} + +task_process_status BufferMgrDynamic::doUpdateStaticProfileTask(buffer_profile_t &profile) +{ + const string &profileName = profile.name; + auto &profileToMap = profile.port_pgs; + set portsChecked; + + if (profile.dynamic_calculated) + { + for (auto &key : profileToMap) + { + auto portName = parseObjectNameFromKey(key); + auto &port = m_portInfoLookup[portName]; + + if 
(portsChecked.find(portName) != portsChecked.end()) + continue; + + SWSS_LOG_DEBUG("Checking PG %s for dynamic profile %s", key.c_str(), profileName.c_str()); + portsChecked.insert(portName); + + refreshPriorityGroupsForPort(portName, port.speed, port.cable_length, port.mtu); + } + } + else + { + for (auto &key : profileToMap) + { + auto port = parseObjectNameFromKey(key); + + if (portsChecked.find(port) != portsChecked.end()) + continue; + + SWSS_LOG_DEBUG("Checking PG %s for profile %s", key.c_str(), profileName.c_str()); + + if (!isHeadroomResourceValid(port, profile, key)) + { + // to do: get the value from application database + SWSS_LOG_ERROR("BUFFER_PROFILE %s cannot be updated because %s referencing it violates the resource limitation", + profileName.c_str(), key.c_str()); + return task_process_status::task_failed; + } + + portsChecked.insert(port); + } + + updateBufferProfileToDb(profileName, profile); + } + + checkSharedBufferPoolSize(); + + return task_process_status::task_success; +} + +task_process_status BufferMgrDynamic::handleBufferMaxParam(Consumer &consumer) +{ + auto it = consumer.m_toSync.begin(); + KeyOpFieldsValuesTuple t = it->second; + string op = kfvOp(t); + + if (op == SET_COMMAND) + { + for (auto i : kfvFieldsValues(t)) + { + if (fvField(i) == "mmu_size") + { + m_mmuSize = fvValue(i); + SWSS_LOG_DEBUG("Handling Default Lossless Buffer Param table field mmu_size %s", m_mmuSize.c_str()); + } + } + } + + return task_process_status::task_success; +} + +task_process_status BufferMgrDynamic::handleDefaultLossLessBufferParam(Consumer &consumer) +{ + auto it = consumer.m_toSync.begin(); + KeyOpFieldsValuesTuple t = it->second; + string op = kfvOp(t); + + if (op == SET_COMMAND) + { + for (auto i : kfvFieldsValues(t)) + { + if (fvField(i) == "default_dynamic_th") + { + m_defaultThreshold = fvValue(i); + SWSS_LOG_DEBUG("Handling Buffer Maximum value table field default_dynamic_th value %s", m_defaultThreshold.c_str()); + } + } + } + + return 
task_process_status::task_success; +} + +task_process_status BufferMgrDynamic::handleCableLenTable(Consumer &consumer) +{ + auto it = consumer.m_toSync.begin(); + KeyOpFieldsValuesTuple t = it->second; + string op = kfvOp(t); + + task_process_status task_status = task_process_status::task_success; + int failed_item_count = 0; + if (op == SET_COMMAND) + { + for (auto i : kfvFieldsValues(t)) + { + // receive and cache cable length table + auto &port = fvField(i); + auto &cable_length = fvValue(i); + port_info_t &portInfo = m_portInfoLookup[port]; + string &speed = portInfo.speed; + string &mtu = portInfo.mtu; + + SWSS_LOG_DEBUG("Handling CABLE_LENGTH table field %s length %s", port.c_str(), cable_length.c_str()); + SWSS_LOG_DEBUG("Port Info for %s before handling %s %s %s", + port.c_str(), + portInfo.speed.c_str(), portInfo.cable_length.c_str(), portInfo.gearbox_model.c_str()); + + if (portInfo.cable_length == cable_length) + { + continue; + } + + portInfo.cable_length = cable_length; + if (speed.empty() || mtu.empty()) + { + SWSS_LOG_WARN("Speed or mtu for %s hasn't configured yet, unable to calculate headroom", port.c_str()); + // We don't retry here because it doesn't make sense until the speed is configured. + continue; + } + + SWSS_LOG_INFO("Updating BUFFER_PG for port %s due to cable length updated", port.c_str()); + + //Try updating the buffer information + switch (portInfo.state) + { + case PORT_INITIALIZING: + portInfo.state = PORT_READY; + task_status = refreshPriorityGroupsForPort(port, speed, cable_length, mtu); + break; + + case PORT_READY: + task_status = refreshPriorityGroupsForPort(port, speed, cable_length, mtu); + break; + } + + switch (task_status) + { + case task_process_status::task_need_retry: + return task_status; + case task_process_status::task_failed: + // We shouldn't return directly here. 
Because doing so will cause the following items lost + failed_item_count++; + break; + default: + break; + } + + SWSS_LOG_DEBUG("Port Info for %s after handling speed %s cable %s gb %s", + port.c_str(), + portInfo.speed.c_str(), portInfo.cable_length.c_str(), portInfo.gearbox_model.c_str()); + } + } + + if (failed_item_count > 0) + { + return task_process_status::task_failed; + } + + return task_process_status::task_success; +} + +// A tiny state machine is required for handling the events +// flags: +// speed_updated +// mtu_updated +// admin_status_updated +// flow: +// 1. handle all events in order, record new value if necessary +// 2. if cable length or speed hasn't been configured, return success +// 3. if mtu isn't configured, take the default value +// 4. if speed_updated or mtu_updated, update headroom size +// elif admin_status_updated, update buffer pool size +task_process_status BufferMgrDynamic::handlePortTable(Consumer &consumer) +{ + auto it = consumer.m_toSync.begin(); + KeyOpFieldsValuesTuple t = it->second; + auto &port = kfvKey(t); + string op = kfvOp(t); + bool speed_updated = false, mtu_updated = false, admin_status_updated = false; + + SWSS_LOG_DEBUG("processing command:%s PORT table key %s", op.c_str(), port.c_str()); + + port_info_t &portInfo = m_portInfoLookup[port]; + + SWSS_LOG_DEBUG("Port Info for %s before handling %s %s %s", + port.c_str(), + portInfo.speed.c_str(), portInfo.cable_length.c_str(), portInfo.gearbox_model.c_str()); + + task_process_status task_status = task_process_status::task_success; + + if (op == SET_COMMAND) + { + for (auto i : kfvFieldsValues(t)) + { + if (fvField(i) == "speed") + { + speed_updated = true; + portInfo.speed = fvValue(i); + } + else if (fvField(i) == "mtu") + { + mtu_updated = true; + portInfo.mtu = fvValue(i); + } + else if (fvField(i) == "admin_status") + { + admin_status_updated = true; + } + } + + string &cable_length = portInfo.cable_length; + string &mtu = portInfo.mtu; + string &speed = 
portInfo.speed; + + if (cable_length.empty() || speed.empty()) + { + SWSS_LOG_WARN("Cable length for %s hasn't configured yet, unable to calculate headroom", port.c_str()); + // We don't retry here because it doesn't make sense until the cable length is configured. + return task_process_status::task_success; + } + + if (speed_updated || mtu_updated) + { + SWSS_LOG_INFO("Updating BUFFER_PG for port %s due to speed or port updated", port.c_str()); + + //Try updating the buffer information + switch (portInfo.state) + { + case PORT_INITIALIZING: + portInfo.state = PORT_READY; + if (mtu.empty()) + { + // During initialization, the flow can be + // 1. mtu, cable_length, speed + // 2. cable_length, speed, mtu + // 3. cable_length, speed, but no mtu specified + // it's ok for the 1st case + // but for the 2nd case, we can't wait mtu for calculating the headrooms + // because we can't distinguish case 2 from case 3 which is also legal. + // So once we have speed updated, let's try to calculate the headroom with default mtu. + // The cost is that if the mtu is provided in the following iteration + // the current calculation is of no value and will be replaced. 
+ mtu = DEFAULT_MTU_STR; + } + task_status = refreshPriorityGroupsForPort(port, speed, cable_length, mtu); + break; + + case PORT_READY: + task_status = refreshPriorityGroupsForPort(port, speed, cable_length, mtu); + break; + } + + SWSS_LOG_DEBUG("Port Info for %s after handling speed %s cable %s gb %s", + port.c_str(), + portInfo.speed.c_str(), portInfo.cable_length.c_str(), portInfo.gearbox_model.c_str()); + } + else if (admin_status_updated) + { + SWSS_LOG_INFO("Recalculate shared buffer pool size due to port %s's admin_status updated", port.c_str()); + checkSharedBufferPoolSize(); + } + } + + return task_status; +} + +task_process_status BufferMgrDynamic::handleBufferPoolTable(Consumer &consumer) +{ + SWSS_LOG_ENTER(); + auto it = consumer.m_toSync.begin(); + KeyOpFieldsValuesTuple tuple = it->second; + string &pool = kfvKey(tuple); + string op = kfvOp(tuple); + vector fvVector; + + SWSS_LOG_DEBUG("processing command:%s table BUFFER_POOL key %s", op.c_str(), pool.c_str()); + if (op == SET_COMMAND) + { + // For set command: + // 1. Create the corresponding table entries in APPL_DB + // 2. 
Record the table in the internal cache m_bufferPoolLookup + buffer_pool_t &bufferPool = m_bufferPoolLookup[pool]; + + bufferPool.initialized = false; + bufferPool.dynamic_size = true; + for (auto i = kfvFieldsValues(tuple).begin(); i != kfvFieldsValues(tuple).end(); i++) + { + string &field = fvField(*i); + string &value = fvValue(*i); + + SWSS_LOG_DEBUG("field:%s, value:%s", field.c_str(), value.c_str()); + if (field == buffer_size_field_name) + { + bufferPool.dynamic_size = false; + } + if (field == buffer_pool_xoff_field_name) + { + bufferPool.xoff = value; + } + if (field == buffer_pool_mode_field_name) + { + bufferPool.mode = value; + } + if (field == buffer_pool_type_field_name) + { + bufferPool.ingress = (value == buffer_value_ingress); + } + fvVector.emplace_back(FieldValueTuple(field, value)); + SWSS_LOG_INFO("Inserting BUFFER_POOL table field %s value %s", field.c_str(), value.c_str()); + } + if (!bufferPool.dynamic_size) + { + bufferPool.initialized = true; + m_applBufferPoolTable.set(pool, fvVector); + m_stateBufferPoolTable.set(pool, fvVector); + } + } + else if (op == DEL_COMMAND) + { + // How do we handle dependency? + m_applBufferPoolTable.del(pool); + m_stateBufferPoolTable.del(pool); + m_bufferPoolLookup.erase(pool); + } + else + { + SWSS_LOG_ERROR("Unknown operation type %s", op.c_str()); + return task_process_status::task_invalid_entry; + } + return task_process_status::task_success; +} + +task_process_status BufferMgrDynamic::handleBufferProfileTable(Consumer &consumer) +{ + SWSS_LOG_ENTER(); + auto it = consumer.m_toSync.begin(); + KeyOpFieldsValuesTuple tuple = it->second; + string profileName = kfvKey(tuple); + string op = kfvOp(tuple); + vector fvVector; + + SWSS_LOG_DEBUG("processing command:%s BUFFER_PROFILE table key %s", op.c_str(), profileName.c_str()); + if (op == SET_COMMAND) + { + //For set command: + //1. Create the corresponding table entries in APPL_DB + //2. 
Record the table in the internal cache m_bufferProfileLookup + buffer_profile_t &profileApp = m_bufferProfileLookup[profileName]; + + profileApp.static_configured = true; + if (PROFILE_INITIALIZING == profileApp.state) + { + profileApp.dynamic_calculated = false; + profileApp.lossless = false; + profileApp.name = profileName; + } + for (auto i = kfvFieldsValues(tuple).begin(); i != kfvFieldsValues(tuple).end(); i++) + { + string &field = fvField(*i); + string &value = fvValue(*i); + + SWSS_LOG_DEBUG("field:%s, value:%s", field.c_str(), value.c_str()); + if (field == buffer_pool_field_name) + { + if (!value.empty()) + { + transformReference(value); + auto poolName = parseObjectNameFromReference(value); + auto poolRef = m_bufferPoolLookup.find(poolName); + if (poolRef == m_bufferPoolLookup.end()) + { + SWSS_LOG_WARN("Pool %s hasn't been configured yet, skip", poolName.c_str()); + return task_process_status::task_need_retry; + } + profileApp.pool_name = poolName; + profileApp.ingress = poolRef->second.ingress; + } + else + { + SWSS_LOG_ERROR("Pool for BUFFER_PROFILE %s hasn't been specified", field.c_str()); + return task_process_status::task_failed; + } + } + if (field == buffer_xon_field_name) + { + profileApp.xon = value; + } + if (field == buffer_xoff_field_name) + { + profileApp.xoff = value; + profileApp.lossless = true; + } + if (field == buffer_xon_offset_field_name) + { + profileApp.xon_offset = value; + } + if (field == buffer_size_field_name) + { + profileApp.size = value; + } + if (field == buffer_dynamic_th_field_name) + { + profileApp.threshold = value; + } + if (field == buffer_static_th_field_name) + { + profileApp.threshold = value; + } + if (field == buffer_headroom_type_field_name) + { + profileApp.dynamic_calculated = (value == "dynamic"); + if (profileApp.dynamic_calculated) + { + // For dynamic calculated headroom, user can provide this field only + // We need to supply lossless and ingress + profileApp.lossless = true; + profileApp.ingress = 
true; + } + } + fvVector.emplace_back(FieldValueTuple(field, value)); + SWSS_LOG_INFO("Inserting BUFFER_PROFILE table field %s value %s", field.c_str(), value.c_str()); + } + // don't insert dynamically calculated profiles into APPL_DB + if (profileApp.lossless && profileApp.ingress) + { + if (profileApp.dynamic_calculated) + { + profileApp.state = PROFILE_NORMAL; + SWSS_LOG_NOTICE("BUFFER_PROFILE %s is dynamic calculation so it won't be deployed to APPL_DB until referenced by a port", + profileName.c_str()); + doUpdateStaticProfileTask(profileApp); + } + else + { + profileApp.state = PROFILE_NORMAL; + doUpdateStaticProfileTask(profileApp); + SWSS_LOG_NOTICE("BUFFER_PROFILE %s has been inserted into APPL_DB", profileName.c_str()); + SWSS_LOG_DEBUG("BUFFER_PROFILE %s for headroom override has been stored internally: [pool %s xon %s xoff %s size %s]", + profileName.c_str(), + profileApp.pool_name.c_str(), profileApp.xon.c_str(), profileApp.xoff.c_str(), profileApp.size.c_str()); + } + } + else + { + m_applBufferProfileTable.set(profileName, fvVector); + SWSS_LOG_NOTICE("BUFFER_PROFILE %s has been inserted into APPL_DB directly", profileName.c_str()); + + m_stateBufferProfileTable.set(profileName, fvVector); + m_bufferProfileIgnored.insert(profileName); + } + } + else if (op == DEL_COMMAND) + { + // For del command: + // Check whether it is referenced by port. 
If yes, return "need retry" and exit + // Typically, the referencing occurs when headroom override configured + // Remove it from APPL_DB and internal cache + + auto profileRef = m_bufferProfileLookup.find(profileName); + if (profileRef != m_bufferProfileLookup.end()) + { + auto &profile = profileRef->second; + if (!profile.port_pgs.empty()) + { + // still being referenced + if (profile.static_configured) + { + // For headroom override, we just wait until all reference removed + SWSS_LOG_WARN("BUFFER_PROFILE %s for headroom override is referenced and cannot be removed for now", profileName.c_str()); + return task_process_status::task_need_retry; + } + else + { + SWSS_LOG_ERROR("Try to remove non-static-configured profile %s", profileName.c_str()); + return task_process_status::task_invalid_entry; + } + } + } + + m_applBufferProfileTable.del(profileName); + m_stateBufferProfileTable.del(profileName); + + m_bufferProfileLookup.erase(profileName); + m_bufferProfileIgnored.erase(profileName); + } + else + { + SWSS_LOG_ERROR("Unknown operation type %s", op.c_str()); + return task_process_status::task_invalid_entry; + } + return task_process_status::task_success; +} + +task_process_status BufferMgrDynamic::handleOneBufferPgEntry(const string &key, const string &port, const string &op, const KeyOpFieldsValuesTuple &tuple) +{ + vector fvVector; + buffer_pg_t &bufferPg = m_portPgLookup[port][key]; + + SWSS_LOG_DEBUG("processing command:%s table BUFFER_PG key %s", op.c_str(), key.c_str()); + if (op == SET_COMMAND) + { + bool ignored = false; + bool pureDynamic = true; + // For set command: + // 1. Create the corresponding table entries in APPL_DB + // 2. Record the table in the internal cache m_portPgLookup + // 3. Check whether the profile is ingress or egress + // 4. 
Initialize "profile_name" of buffer_pg_t + + bufferPg.dynamic_calculated = true; + bufferPg.static_configured = false; + for (auto i = kfvFieldsValues(tuple).begin(); i != kfvFieldsValues(tuple).end(); i++) + { + const string &field = fvField(*i); + string value = fvValue(*i); + + SWSS_LOG_DEBUG("field:%s, value:%s", field.c_str(), value.c_str()); + if (field == buffer_profile_field_name && value != "NULL") + { + // Headroom override + pureDynamic = false; + transformReference(value); + string profileName = parseObjectNameFromReference(value); + auto searchRef = m_bufferProfileLookup.find(profileName); + if (searchRef == m_bufferProfileLookup.end()) + { + if (m_bufferProfileIgnored.find(profileName) != m_bufferProfileIgnored.end()) + { + // Referencing an ignored profile, the PG should be ignored as well + ignored = true; + bufferPg.dynamic_calculated = false; + bufferPg.lossless = false; + bufferPg.configured_profile_name = profileName; + } + else + { + // In this case, we shouldn't set the dynamic calculated flag to true + // It will be updated when its profile is configured. 
+ bufferPg.dynamic_calculated = false; + SWSS_LOG_WARN("Profile %s hasn't been configured yet, skip", profileName.c_str()); + return task_process_status::task_need_retry; + } + } + else + { + buffer_profile_t &profileRef = searchRef->second; + bufferPg.dynamic_calculated = profileRef.dynamic_calculated; + bufferPg.configured_profile_name = profileName; + bufferPg.lossless = profileRef.lossless; + } + bufferPg.static_configured = true; + bufferPg.configured_profile_name = profileName; + } + fvVector.emplace_back(FieldValueTuple(field, value)); + SWSS_LOG_INFO("Inserting BUFFER_PG table field %s value %s", field.c_str(), value.c_str()); + } + + if (pureDynamic) + { + // Generic dynamically calculated headroom + bufferPg.dynamic_calculated = true; + bufferPg.lossless = true; + } + + if (!ignored && bufferPg.lossless) + { + doUpdatePgTask(key); + } + else + { + SWSS_LOG_NOTICE("Inserting BUFFER_PG table entry %s into APPL_DB directly", key.c_str()); + m_applBufferPgTable.set(key, fvVector); + } + } + else if (op == DEL_COMMAND) + { + // For del command: + // 1. Removing it from APPL_DB + // 2. 
Update internal caches + string &profileName = bufferPg.running_profile_name; + + m_bufferProfileLookup[profileName].port_pgs.erase(key); + + if (bufferPg.lossless) + { + doRemovePgTask(key); + } + else + { + SWSS_LOG_NOTICE("Removing BUFFER_PG table entry %s from APPL_DB directly", key.c_str()); + m_applBufferPgTable.del(key); + } + + m_portPgLookup[port].erase(key); + SWSS_LOG_DEBUG("Profile %s has been removed from port %s PG %s", profileName.c_str(), port.c_str(), key.c_str()); + if (m_portPgLookup[port].empty()) + { + m_portPgLookup.erase(port); + SWSS_LOG_DEBUG("Profile %s has been removed from port %s on all lossless PG", profileName.c_str(), port.c_str()); + } + } + else + { + SWSS_LOG_ERROR("Unknown operation type %s", op.c_str()); + return task_process_status::task_invalid_entry; + } + + return task_process_status::task_success; +} + +task_process_status BufferMgrDynamic::handleBufferPgTable(Consumer &consumer) +{ + SWSS_LOG_ENTER(); + auto it = consumer.m_toSync.begin(); + KeyOpFieldsValuesTuple tuple = it->second; + string key = kfvKey(tuple); + string op = kfvOp(tuple); + + transformSeperator(key); + string ports = parseObjectNameFromKey(key); + string pgs = parseObjectNameFromKey(key, 1); + auto portsList = tokenize(ports, ','); + + task_process_status rc = task_process_status::task_success; + + if (portsList.size() == 1) + { + rc = handleOneBufferPgEntry(key, ports, op, tuple); + } + else + { + for (auto port : portsList) + { + string singleKey = port + ':' + pgs; + rc = handleOneBufferPgEntry(singleKey, port, op, tuple); + if (rc == task_process_status::task_need_retry) + return rc; + } + } + + return rc; +} + +task_process_status BufferMgrDynamic::handleBufferQueueTable(Consumer &consumer) +{ + return doBufferTableTask(consumer, m_applBufferQueueTable); +} + +task_process_status BufferMgrDynamic::handleBufferPortIngressProfileListTable(Consumer &consumer) +{ + return doBufferTableTask(consumer, m_applBufferIngressProfileListTable); +} + 
+task_process_status BufferMgrDynamic::handleBufferPortEgressProfileListTable(Consumer &consumer) +{ + return doBufferTableTask(consumer, m_applBufferEgressProfileListTable); +} + +/* + * This function copies the data from tables in CONFIG_DB to APPL_DB. + * With dynamic buffer calculation supported, the following tables + * will be moved to APPL_DB from CONFIG_DB because the CONFIG_DB contains + * configured entries only while APPL_DB contains dynamically generated entries + * - BUFFER_POOL + * - BUFFER_PROFILE + * - BUFFER_PG + * The following tables have to be moved to APPL_DB because they reference + * some entries that have been moved to APPL_DB + * - BUFFER_QUEUE + * - BUFFER_PORT_INGRESS_PROFILE_LIST + * - BUFFER_PORT_EGRESS_PROFILE_LIST + * One thing we need to handle is to transform the separator from | to : + * The following items contain separator: + * - keys of each item + * - pool in BUFFER_POOL + * - profile in BUFFER_PG + */ +task_process_status BufferMgrDynamic::doBufferTableTask(Consumer &consumer, ProducerStateTable &applTable) +{ + SWSS_LOG_ENTER(); + + auto it = consumer.m_toSync.begin(); + KeyOpFieldsValuesTuple t = it->second; + string key = kfvKey(t); + const string &name = consumer.getTableName(); + + //transform the separator in key from "|" to ":" + transformSeperator(key); + + string op = kfvOp(t); + if (op == SET_COMMAND) + { + vector fvVector; + + SWSS_LOG_INFO("Inserting entry %s|%s from CONFIG_DB to APPL_DB", name.c_str(), key.c_str()); + + for (auto i : kfvFieldsValues(t)) + { + //transform the separator in values from "|" to ":" + if (fvField(i) == "pool") + transformReference(fvValue(i)); + if (fvField(i) == "profile") + transformReference(fvValue(i)); + if (fvField(i) == "profile_list") + transformReference(fvValue(i)); + fvVector.emplace_back(FieldValueTuple(fvField(i), fvValue(i))); + SWSS_LOG_INFO("Inserting field %s value %s", fvField(i).c_str(), fvValue(i).c_str()); + } + applTable.set(key, fvVector); + } + else if (op == 
DEL_COMMAND) + { + SWSS_LOG_INFO("Removing entry %s from APPL_DB", key.c_str()); + applTable.del(key); + } + + return task_process_status::task_success; +} + +void BufferMgrDynamic::doTask(Consumer &consumer) +{ + SWSS_LOG_ENTER(); + string table_name = consumer.getTableName(); + auto it = consumer.m_toSync.begin(); + + if (m_bufferTableHandlerMap.find(table_name) == m_bufferTableHandlerMap.end()) + { + SWSS_LOG_ERROR("No handler for key:%s found.", table_name.c_str()); + while (it != consumer.m_toSync.end()) + it = consumer.m_toSync.erase(it); + return; + } + + while (it != consumer.m_toSync.end()) + { + auto task_status = (this->*(m_bufferTableHandlerMap[table_name]))(consumer); + switch (task_status) + { + case task_process_status::task_failed: + SWSS_LOG_ERROR("Failed to process table update"); + return; + case task_process_status::task_need_retry: + SWSS_LOG_INFO("Unable to process table update. Will retry..."); + it++; + break; + case task_process_status::task_invalid_entry: + SWSS_LOG_ERROR("Failed to process invalid entry, drop it"); + it = consumer.m_toSync.erase(it); + break; + default: + it = consumer.m_toSync.erase(it); + break; + } + } +} + +void BufferMgrDynamic::doTask(SelectableTimer &timer) +{ + checkSharedBufferPoolSize(); +} diff --git a/cfgmgr/buffermgrdyn.h b/cfgmgr/buffermgrdyn.h new file mode 100644 index 00000000000..f45f10697ab --- /dev/null +++ b/cfgmgr/buffermgrdyn.h @@ -0,0 +1,245 @@ +#ifndef __BUFFMGRDYN__ +#define __BUFFMGRDYN__ + +#include "dbconnector.h" +#include "producerstatetable.h" +#include "orch.h" + +#include +#include +#include + +namespace swss { + +#define INGRESS_LOSSLESS_PG_POOL_NAME "ingress_lossless_pool" +#define DEFAULT_MTU_STR "9100" + +#define BUFFERMGR_TIMER_PERIOD 10 + +typedef struct { + bool ingress; + bool dynamic_size; + bool initialized; + std::string total_size; + std::string mode; + std::string xoff; +} buffer_pool_t; + +// State of the profile. 
+// We maintain state for dynamic profiles only +// INITIALIZING state indicates a profile is under initializing +// NORMAL state indicates that profile is working normally +// A profile's state will be STALE When it is no longer referenced +// It will be removed after having been in this state for 1 minute +// This deferred mechanism yields two benefits +// 1. Avoid frequently removing/re-adding a profile +// 2. Avoid removing profile failure if orchagent handles BUFFER_PG +// and BUFFER_PROFILE tables in an order which differs from which +// buffermgrd calls +typedef enum { + PROFILE_INITIALIZING, + PROFILE_NORMAL +} profile_state_t; + +typedef std::set port_pg_set_t; +typedef struct { + profile_state_t state; + bool dynamic_calculated; + bool static_configured; + bool ingress; + bool lossless; + std::string name; + std::string size; + std::string xon; + std::string xon_offset; + std::string xoff; + std::string threshold; + std::string pool_name; + // port_pgs - stores pgs referencing this profile + // An element will be added or removed when a PG added or removed + port_pg_set_t port_pgs; +} buffer_profile_t; + +typedef struct { + bool lossless; + bool dynamic_calculated; + bool static_configured; + // There will always be a running_profile which derived from: + // - the configured profile for static one, + // - the dynamically generated profile otherwise. 
+ std::string configured_profile_name; + std::string running_profile_name; +} buffer_pg_t; + +typedef enum { + // Port is under initializing, which means its info hasn't been comprehensive for calculating headroom + PORT_INITIALIZING, + // All necessary information for calculating headrom is ready + PORT_READY +} port_state_t; + +typedef struct { + port_state_t state; + std::string speed; + std::string cable_length; + std::string mtu; + std::string gearbox_model; +// std::string profile_name; +} port_info_t; + +//TODO: +//add map to store all configured PGs +//add map to store all configured profiles +//check whether the configure database update is for dynamically lossless, +//if yes, call further function to handle; if no, handle it directly in the callback + +//map from port name to port info +typedef std::map port_info_lookup_t; +//map from port info to profile +//for dynamically calculated profile +typedef std::map buffer_profile_lookup_t; +//map from name to pool +typedef std::map buffer_pool_lookup_t; +//port -> headroom override +typedef std::map headroom_override_t; +//map from pg to info +typedef std::map buffer_pg_lookup_t; +//map from port to all its pgs +typedef std::map port_pg_lookup_t; +//map from gearbox model to gearbox delay +typedef std::map gearbox_delay_t; + +class BufferMgrDynamic : public Orch +{ +public: + BufferMgrDynamic(DBConnector *cfgDb, DBConnector *stateDb, DBConnector *applDb, const std::vector &tables, std::shared_ptr> gearboxInfo); + using Orch::doTask; + +private: + typedef task_process_status (BufferMgrDynamic::*buffer_table_handler)(Consumer& consumer); + typedef std::map buffer_table_handler_map; + typedef std::pair buffer_handler_pair; + + buffer_table_handler_map m_bufferTableHandlerMap; + + bool m_portInitDone; + bool m_firstTimeCalculateBufferPool; + + std::shared_ptr m_applDb = nullptr; + SelectableTimer *m_buffermgrPeriodtimer = nullptr; + + // PORT and CABLE_LENGTH table and caches + Table m_cfgPortTable; + Table 
m_cfgCableLenTable; + // m_portInfoLookup + // key: port name + // updated only when a port's speed and cable length updated + port_info_lookup_t m_portInfoLookup; + + // BUFFER_POOL table and cache + ProducerStateTable m_applBufferPoolTable; + Table m_stateBufferPoolTable; + buffer_pool_lookup_t m_bufferPoolLookup; + + // BUFFER_PROFILE table and caches + ProducerStateTable m_applBufferProfileTable; + Table m_cfgBufferProfileTable; + Table m_stateBufferProfileTable; + // m_bufferProfileLookup - the cache for the following set: + // 1. CFG_BUFFER_PROFILE + // 2. Dynamically calculated headroom info stored in APPL_BUFFER_PROFILE + // key: profile name + buffer_profile_lookup_t m_bufferProfileLookup; + // A set where the ignored profiles are stored. + // A PG that reference an ignored profile should also be ignored. + std::set m_bufferProfileIgnored; + + // BUFFER_PG table and caches + ProducerStateTable m_applBufferPgTable; + Table m_cfgBufferPgTable; + // m_portPgLookup - the cache for CFG_BUFFER_PG and APPL_BUFFER_PG + // 1st level key: port name, 2nd level key: PGs + // Updated in: + // 1. handleBufferPgTable, update from database + // 2. 
refreshPriorityGroupsForPort, speed/cable length updated + port_pg_lookup_t m_portPgLookup; + + // Other tables + Table m_cfgLosslessPgPoolTable; + Table m_cfgDefaultLosslessBufferParam; + + Table m_stateBufferMaximumTable; + + ProducerStateTable m_applBufferQueueTable; + ProducerStateTable m_applBufferIngressProfileListTable; + ProducerStateTable m_applBufferEgressProfileListTable; + + Table m_applPortTable; + + bool m_supportGearbox; + gearbox_delay_t m_gearboxDelay; + std::string m_identifyGearboxDelay; + + // Vendor specific lua plugins for calculating headroom and buffer pool + // Loaded when the buffer manager starts + // Executed whenever the headroom and pool size need to be updated + std::string m_headroomSha; + std::string m_bufferpoolSha; + std::string m_checkHeadroomSha; + + // Parameters for headroom generation + std::string m_mmuSize; + std::string m_defaultThreshold; + + // Initializers + void initTableHandlerMap(); + void parseGearboxInfo(std::shared_ptr> gearboxInfo); + + // Tool functions to parse keys and references + std::string getPgPoolMode(); + void transformSeperator(std::string &name); + void transformReference(std::string &name); + std::string parseObjectNameFromKey(const std::string &key, size_t pos/* = 1*/); + std::string parseObjectNameFromReference(const std::string &reference); + std::string getDynamicProfileName(const std::string &speed, const std::string &cable, const std::string &mtu, const std::string &threshold, const std::string &gearbox_model); + + // APPL_DB table operations + void updateBufferPoolToDb(const std::string &name, const buffer_pool_t &pool); + void updateBufferProfileToDb(const std::string &name, const buffer_profile_t &profile); + void updateBufferPgToDb(const std::string &key, const std::string &profile, bool add); + + // Meta flows + void calculateHeadroomSize(const std::string &speed, const std::string &cable, const std::string &port_mtu, const std::string &gearbox_model, buffer_profile_t &headroom); + void 
checkSharedBufferPoolSize(); + void recalculateSharedBufferPool(); + task_process_status allocateProfile(const std::string &speed, const std::string &cable, const std::string &mtu, const std::string &threshold, const std::string &gearbox_model, std::string &profile_name); + void releaseProfile(const std::string &profile_name); + bool isHeadroomResourceValid(const std::string &port, buffer_profile_t &profile_info, std::string lossy_pg_changed); + + // Main flows + task_process_status refreshPriorityGroupsForPort(const std::string &port, const std::string &speed, const std::string &cable_length, const std::string &mtu, const std::string &exactly_matched_key); + task_process_status doUpdatePgTask(const std::string &pg_key); + task_process_status doRemovePgTask(const std::string &pg_key); + task_process_status doAdminStatusTask(const std::string port, const std::string adminStatus); + task_process_status doUpdateStaticProfileTask(buffer_profile_t &profile); + + // Table update handlers + task_process_status handleBufferMaxParam(Consumer &consumer); + task_process_status handleDefaultLossLessBufferParam(Consumer &consumer); + task_process_status handleCableLenTable(Consumer &consumer); + task_process_status handlePortTable(Consumer &consumer); + task_process_status handleBufferPoolTable(Consumer &consumer); + task_process_status handleBufferProfileTable(Consumer &consumer); + task_process_status handleOneBufferPgEntry(const std::string &key, const std::string &port, const std::string &op, const KeyOpFieldsValuesTuple &tuple); + task_process_status handleBufferPgTable(Consumer &consumer); + task_process_status handleBufferQueueTable(Consumer &consumer); + task_process_status handleBufferPortIngressProfileListTable(Consumer &consumer); + task_process_status handleBufferPortEgressProfileListTable(Consumer &consumer); + task_process_status doBufferTableTask(Consumer &consumer, ProducerStateTable &applTable); + void doTask(Consumer &consumer); + void doTask(SelectableTimer 
&timer); +}; + +} + +#endif /* __BUFFMGRDYN__ */ diff --git a/orchagent/bufferorch.cpp b/orchagent/bufferorch.cpp index 86c4ed11805..aaa5ab2adc9 100644 --- a/orchagent/bufferorch.cpp +++ b/orchagent/bufferorch.cpp @@ -26,40 +26,45 @@ static const vector bufferPoolWatermarkStatIds = }; type_map BufferOrch::m_buffer_type_maps = { - {CFG_BUFFER_POOL_TABLE_NAME, new object_reference_map()}, - {CFG_BUFFER_PROFILE_TABLE_NAME, new object_reference_map()}, - {CFG_BUFFER_QUEUE_TABLE_NAME, new object_reference_map()}, - {CFG_BUFFER_PG_TABLE_NAME, new object_reference_map()}, - {CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, new object_reference_map()}, - {CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME, new object_reference_map()} + {APP_BUFFER_POOL_TABLE_NAME, new object_reference_map()}, + {APP_BUFFER_PROFILE_TABLE_NAME, new object_reference_map()}, + {APP_BUFFER_QUEUE_TABLE_NAME, new object_reference_map()}, + {APP_BUFFER_PG_TABLE_NAME, new object_reference_map()}, + {APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, new object_reference_map()}, + {APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME, new object_reference_map()} }; -BufferOrch::BufferOrch(DBConnector *db, vector &tableNames) : - Orch(db, tableNames), +BufferOrch::BufferOrch(DBConnector *applDb, DBConnector *confDb, DBConnector *stateDb, vector &tableNames) : + Orch(applDb, tableNames), m_flexCounterDb(new DBConnector("FLEX_COUNTER_DB", 0)), m_flexCounterTable(new ProducerTable(m_flexCounterDb.get(), FLEX_COUNTER_TABLE)), m_flexCounterGroupTable(new ProducerTable(m_flexCounterDb.get(), FLEX_COUNTER_GROUP_TABLE)), - m_countersDb(new DBConnector("COUNTERS_DB", 0)) + m_countersDb(new DBConnector("COUNTERS_DB", 0)), + m_stateBufferMaximumValueTable(stateDb, STATE_BUFFER_MAXIMUM_VALUE_TABLE) { SWSS_LOG_ENTER(); initTableHandlers(); - initBufferReadyLists(db); + initBufferReadyLists(confDb); initFlexCounterGroupTable(); + initBufferConstants(); }; void BufferOrch::initTableHandlers() { SWSS_LOG_ENTER(); - 
m_bufferHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_POOL_TABLE_NAME, &BufferOrch::processBufferPool)); - m_bufferHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_PROFILE_TABLE_NAME, &BufferOrch::processBufferProfile)); - m_bufferHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_QUEUE_TABLE_NAME, &BufferOrch::processQueue)); - m_bufferHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_PG_TABLE_NAME, &BufferOrch::processPriorityGroup)); - m_bufferHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, &BufferOrch::processIngressBufferProfileList)); - m_bufferHandlerMap.insert(buffer_handler_pair(CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME, &BufferOrch::processEgressBufferProfileList)); + m_bufferHandlerMap.insert(buffer_handler_pair(APP_BUFFER_POOL_TABLE_NAME, &BufferOrch::processBufferPool)); + m_bufferHandlerMap.insert(buffer_handler_pair(APP_BUFFER_PROFILE_TABLE_NAME, &BufferOrch::processBufferProfile)); + m_bufferHandlerMap.insert(buffer_handler_pair(APP_BUFFER_QUEUE_TABLE_NAME, &BufferOrch::processQueue)); + m_bufferHandlerMap.insert(buffer_handler_pair(APP_BUFFER_PG_TABLE_NAME, &BufferOrch::processPriorityGroup)); + m_bufferHandlerMap.insert(buffer_handler_pair(APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, &BufferOrch::processIngressBufferProfileList)); + m_bufferHandlerMap.insert(buffer_handler_pair(APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME, &BufferOrch::processEgressBufferProfileList)); } void BufferOrch::initBufferReadyLists(DBConnector *db) { + // The motivation of m_ready_list is to get the preconfigured buffer pg and buffer queue items + // from the database when system starts. 
+ // When a buffer pg or queue item is updated, if the item isn't in the m_ready_list SWSS_LOG_ENTER(); Table pg_table(db, CFG_BUFFER_PG_TABLE_NAME); @@ -79,8 +84,6 @@ void BufferOrch::initBufferReadyList(Table& table) // populate the lists with buffer configuration information for (const auto& key: keys) { - m_ready_list[key] = false; - auto tokens = tokenize(key, config_db_key_delimiter); if (tokens.size() != 2) { @@ -88,15 +91,42 @@ void BufferOrch::initBufferReadyList(Table& table) continue; } + // We need transform the key from config db format to appl db format + auto appldb_key = tokens[0] + delimiter + tokens[1]; + m_ready_list[appldb_key] = false; + auto port_names = tokenize(tokens[0], list_item_delimiter); for(const auto& port_name: port_names) { - m_port_ready_list_ref[port_name].push_back(key); + SWSS_LOG_INFO("Item %s has been inserted into ready list", appldb_key.c_str()); + m_port_ready_list_ref[port_name].push_back(appldb_key); } } } +void BufferOrch::initBufferConstants() +{ + sai_status_t status; + sai_attribute_t attr; + + attr.id = SAI_SWITCH_ATTR_TOTAL_BUFFER_SIZE; + + status = sai_switch_api->get_switch_attribute(gSwitchId, 1, &attr); + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to get Maximum memory size, rv:%d", status); + // This is not a mandatory attribute so in case of failure we just return + return; + } + + vector fvVector; + fvVector.emplace_back(make_pair("mmu_size", to_string(attr.value.u64 * 1024))); + m_stateBufferMaximumValueTable.set("global", fvVector); + SWSS_LOG_NOTICE("Got maximum memory size %lu, exposing to %s|global", + attr.value.u64, STATE_BUFFER_MAXIMUM_VALUE_TABLE); +} + void BufferOrch::initFlexCounterGroupTable(void) { string bufferPoolWmPluginName = "watermark_bufferpool.lua"; @@ -171,7 +201,7 @@ void BufferOrch::generateBufferPoolWatermarkCounterIdList(void) // these bits. This assumes the total number of buffer pools to be no greater than 32, which should satisfy all use cases. 
unsigned int noWmClrCapability = 0; unsigned int bitMask = 1; - for (const auto &it : *(m_buffer_type_maps[CFG_BUFFER_POOL_TABLE_NAME])) + for (const auto &it : *(m_buffer_type_maps[APP_BUFFER_POOL_TABLE_NAME])) { sai_status_t status = sai_buffer_api->clear_buffer_pool_stats( it.second.m_saiObjectId, @@ -198,7 +228,7 @@ void BufferOrch::generateBufferPoolWatermarkCounterIdList(void) vector fvTuples; fvTuples.emplace_back(BUFFER_POOL_COUNTER_ID_LIST, statList); bitMask = 1; - for (const auto &it : *(m_buffer_type_maps[CFG_BUFFER_POOL_TABLE_NAME])) + for (const auto &it : *(m_buffer_type_maps[APP_BUFFER_POOL_TABLE_NAME])) { string key = BUFFER_POOL_WATERMARK_STAT_COUNTER_FLEX_COUNTER_GROUP ":" + sai_serialize_object_id(it.second.m_saiObjectId); @@ -229,7 +259,7 @@ const object_reference_map &BufferOrch::getBufferPoolNameOidMap(void) // In the case different Orches are running in // different threads, caller may need to grab a read lock // before calling this function - return *m_buffer_type_maps[CFG_BUFFER_POOL_TABLE_NAME]; + return *m_buffer_type_maps[APP_BUFFER_POOL_TABLE_NAME]; } task_process_status BufferOrch::processBufferPool(Consumer &consumer) @@ -272,7 +302,8 @@ task_process_status BufferOrch::processBufferPool(Consumer &consumer) if (SAI_NULL_OBJECT_ID != sai_object) { - // We should skip the pool type because it's create only when setting a pool's attribute. + // We should skip the pool type when setting a pool's attribute because it's create only + // when setting a pool's attribute. SWSS_LOG_INFO("Skip setting buffer pool type %s for pool %s", type.c_str(), object_name.c_str()); continue; } @@ -299,7 +330,7 @@ task_process_status BufferOrch::processBufferPool(Consumer &consumer) if (SAI_NULL_OBJECT_ID != sai_object) { - // We should skip the pool mode because it's create only when setting a pool's attribute. + // We should skip the pool mode when setting a pool's attribute because it's create only. 
SWSS_LOG_INFO("Skip setting buffer pool mode %s for pool %s", mode.c_str(), object_name.c_str()); continue; } @@ -442,6 +473,13 @@ task_process_status BufferOrch::processBufferProfile(Consumer &consumer) SWSS_LOG_ERROR("Resolving pool reference failed"); return task_process_status::task_failed; } + if (SAI_NULL_OBJECT_ID != sai_object) + { + // We should skip the profile's pool name because it's create only + // when setting a profile's attribute. + SWSS_LOG_INFO("Skip setting buffer profile's pool %s for profile %s", value.c_str(), object_name.c_str()); + continue; + } attr.id = SAI_BUFFER_PROFILE_ATTR_POOL_ID; attr.value.oid = sai_pool; attribs.push_back(attr); @@ -474,7 +512,7 @@ task_process_status BufferOrch::processBufferProfile(Consumer &consumer) { if (SAI_NULL_OBJECT_ID != sai_object) { - // We should skip the profile's threshold type because it's create only when setting a profile's attribute. + // We should skip the profile's threshold type when setting a profile's attribute because it's create only. SWSS_LOG_INFO("Skip setting buffer profile's threshold type for profile %s", object_name.c_str()); } else @@ -492,7 +530,7 @@ task_process_status BufferOrch::processBufferProfile(Consumer &consumer) { if (SAI_NULL_OBJECT_ID != sai_object) { - // We should skip the profile's threshold type because it's create only when setting a profile's attribute. + // We should skip the profile's threshold type when setting a profile's attribute because it's create only. SWSS_LOG_INFO("Skip setting buffer profile's threshold type for profile %s", object_name.c_str()); } else @@ -584,7 +622,7 @@ task_process_status BufferOrch::processQueue(Consumer &consumer) sai_uint32_t range_low, range_high; SWSS_LOG_DEBUG("Processing:%s", key.c_str()); - tokens = tokenize(key, config_db_key_delimiter); + tokens = tokenize(key, delimiter); if (tokens.size() != 2) { SWSS_LOG_ERROR("malformed key:%s. 
Must contain 2 tokens", key.c_str()); @@ -680,7 +718,7 @@ task_process_status BufferOrch::processQueue(Consumer &consumer) for (const auto &port_name : port_names) { if (gPortsOrch->isPortAdminUp(port_name)) { - SWSS_LOG_ERROR("Queue profile '%s' applied after port %s is up", key.c_str(), port_name.c_str()); + SWSS_LOG_WARN("Queue profile '%s' applied after port %s is up", key.c_str(), port_name.c_str()); } } } @@ -704,7 +742,7 @@ task_process_status BufferOrch::processPriorityGroup(Consumer &consumer) sai_uint32_t range_low, range_high; SWSS_LOG_DEBUG("processing:%s", key.c_str()); - tokens = tokenize(key, config_db_key_delimiter); + tokens = tokenize(key, delimiter); if (tokens.size() != 2) { SWSS_LOG_ERROR("malformed key:%s. Must contain 2 tokens", key.c_str()); @@ -801,7 +839,7 @@ task_process_status BufferOrch::processPriorityGroup(Consumer &consumer) for (const auto &port_name : port_names) { if (gPortsOrch->isPortAdminUp(port_name)) { - SWSS_LOG_ERROR("PG profile '%s' applied after port %s is up", key.c_str(), port_name.c_str()); + SWSS_LOG_WARN("PG profile '%s' applied after port %s is up", key.c_str(), port_name.c_str()); } } } @@ -822,9 +860,9 @@ task_process_status BufferOrch::processIngressBufferProfileList(Consumer &consum string op = kfvOp(tuple); SWSS_LOG_DEBUG("processing:%s", key.c_str()); - if (consumer.getTableName() != CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME) + if (consumer.getTableName() != APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME) { - SWSS_LOG_ERROR("Key with invalid table type passed in %s, expected:%s", key.c_str(), CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME); + SWSS_LOG_ERROR("Key with invalid table type passed in %s, expected:%s", key.c_str(), APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME); return task_process_status::task_invalid_entry; } vector port_names = tokenize(key, list_item_delimiter); @@ -934,10 +972,12 @@ void BufferOrch::doTask() // ├── buffer queue // └── buffer pq table - auto pool_consumer = 
getExecutor((CFG_BUFFER_POOL_TABLE_NAME)); + SWSS_LOG_INFO("Handling buffer task"); + + auto pool_consumer = getExecutor((APP_BUFFER_POOL_TABLE_NAME)); pool_consumer->drain(); - auto profile_consumer = getExecutor(CFG_BUFFER_PROFILE_TABLE_NAME); + auto profile_consumer = getExecutor(APP_BUFFER_PROFILE_TABLE_NAME); profile_consumer->drain(); for(auto &it : m_consumerMap) @@ -957,6 +997,7 @@ void BufferOrch::doTask(Consumer &consumer) if (!gPortsOrch->isConfigDone()) { + SWSS_LOG_INFO("Buffer task for %s can't be executed ahead of port config done", consumer.getTableName().c_str()); return; } diff --git a/orchagent/bufferorch.h b/orchagent/bufferorch.h index f19e2b43224..35191db73bd 100644 --- a/orchagent/bufferorch.h +++ b/orchagent/bufferorch.h @@ -27,11 +27,12 @@ const string buffer_profile_field_name = "profile"; const string buffer_value_ingress = "ingress"; const string buffer_value_egress = "egress"; const string buffer_profile_list_field_name = "profile_list"; +const string buffer_headroom_type_field_name= "headroom_type"; class BufferOrch : public Orch { public: - BufferOrch(DBConnector *db, vector &tableNames); + BufferOrch(DBConnector *applDb, DBConnector *confDb, DBConnector *stateDb, vector &tableNames); bool isPortReady(const std::string& port_name) const; static type_map m_buffer_type_maps; void generateBufferPoolWatermarkCounterIdList(void); @@ -48,6 +49,7 @@ class BufferOrch : public Orch void initBufferReadyLists(DBConnector *db); void initBufferReadyList(Table& table); void initFlexCounterGroupTable(void); + void initBufferConstants(); task_process_status processBufferPool(Consumer &consumer); task_process_status processBufferProfile(Consumer &consumer); task_process_status processQueue(Consumer &consumer); @@ -63,6 +65,8 @@ class BufferOrch : public Orch unique_ptr m_flexCounterGroupTable; unique_ptr m_flexCounterTable; + Table m_stateBufferMaximumValueTable; + unique_ptr m_countersDb; bool m_isBufferPoolWatermarkCounterIdListGenerated = false; 
diff --git a/orchagent/orchdaemon.cpp b/orchagent/orchdaemon.cpp index 1682e4a23c0..14bc15b09e0 100644 --- a/orchagent/orchdaemon.cpp +++ b/orchagent/orchdaemon.cpp @@ -170,14 +170,14 @@ bool OrchDaemon::init() QosOrch *qos_orch = new QosOrch(m_configDb, qos_tables); vector buffer_tables = { - CFG_BUFFER_POOL_TABLE_NAME, - CFG_BUFFER_PROFILE_TABLE_NAME, - CFG_BUFFER_QUEUE_TABLE_NAME, - CFG_BUFFER_PG_TABLE_NAME, - CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, - CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME + APP_BUFFER_POOL_TABLE_NAME, + APP_BUFFER_PROFILE_TABLE_NAME, + APP_BUFFER_QUEUE_TABLE_NAME, + APP_BUFFER_PG_TABLE_NAME, + APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, + APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME }; - gBufferOrch = new BufferOrch(m_configDb, buffer_tables); + gBufferOrch = new BufferOrch(m_applDb, m_configDb, m_stateDb, buffer_tables); PolicerOrch *policer_orch = new PolicerOrch(m_configDb, "POLICER"); diff --git a/tests/mock_tests/portsorch_ut.cpp b/tests/mock_tests/portsorch_ut.cpp index 0803c0300f8..1b2400e4762 100644 --- a/tests/mock_tests/portsorch_ut.cpp +++ b/tests/mock_tests/portsorch_ut.cpp @@ -94,9 +94,10 @@ namespace portsorch_test TEST_F(PortsOrchTest, PortReadinessColdBoot) { Table portTable = Table(m_app_db.get(), APP_PORT_TABLE_NAME); - Table pgTable = Table(m_config_db.get(), CFG_BUFFER_PG_TABLE_NAME); - Table profileTable = Table(m_config_db.get(), CFG_BUFFER_PROFILE_TABLE_NAME); - Table poolTable = Table(m_config_db.get(), CFG_BUFFER_POOL_TABLE_NAME); + Table pgTable = Table(m_app_db.get(), APP_BUFFER_PG_TABLE_NAME); + Table pgTableCfg = Table(m_config_db.get(), CFG_BUFFER_PG_TABLE_NAME); + Table profileTable = Table(m_app_db.get(), APP_BUFFER_PROFILE_TABLE_NAME); + Table poolTable = Table(m_app_db.get(), APP_BUFFER_POOL_TABLE_NAME); // Get SAI default ports to populate DB @@ -112,7 +113,7 @@ namespace portsorch_test }); // Create test buffer profile - profileTable.set("test_profile", { { "pool", "[BUFFER_POOL|test_pool]" }, + 
profileTable.set("test_profile", { { "pool", "[BUFFER_POOL_TABLE:test_pool]" }, { "xon", "14832" }, { "xoff", "14832" }, { "size", "35000" }, @@ -121,9 +122,11 @@ namespace portsorch_test // Apply profile on PGs 3-4 all ports for (const auto &it : ports) { - std::ostringstream oss; - oss << it.first << "|3-4"; - pgTable.set(oss.str(), { { "profile", "[BUFFER_PROFILE|test_profile]" } }); + std::ostringstream ossAppl, ossCfg; + ossAppl << it.first << ":3-4"; + pgTable.set(ossAppl.str(), { { "profile", "[BUFFER_PROFILE_TABLE:test_profile]" } }); + ossCfg << it.first << "|3-4"; + pgTableCfg.set(ossCfg.str(), { { "profile", "[BUFFER_PROFILE|test_profile]" } }); } // Create dependencies ... @@ -140,15 +143,15 @@ namespace portsorch_test ASSERT_EQ(gPortsOrch, nullptr); gPortsOrch = new PortsOrch(m_app_db.get(), ports_tables); - vector buffer_tables = { CFG_BUFFER_POOL_TABLE_NAME, - CFG_BUFFER_PROFILE_TABLE_NAME, - CFG_BUFFER_QUEUE_TABLE_NAME, - CFG_BUFFER_PG_TABLE_NAME, - CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, - CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME }; + vector buffer_tables = { APP_BUFFER_POOL_TABLE_NAME, + APP_BUFFER_PROFILE_TABLE_NAME, + APP_BUFFER_QUEUE_TABLE_NAME, + APP_BUFFER_PG_TABLE_NAME, + APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, + APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME }; ASSERT_EQ(gBufferOrch, nullptr); - gBufferOrch = new BufferOrch(m_config_db.get(), buffer_tables); + gBufferOrch = new BufferOrch(m_app_db.get(), m_config_db.get(), m_state_db.get(), buffer_tables); // Populate pot table with SAI ports for (const auto &it : ports) @@ -212,9 +215,9 @@ namespace portsorch_test { Table portTable = Table(m_app_db.get(), APP_PORT_TABLE_NAME); - Table pgTable = Table(m_config_db.get(), CFG_BUFFER_PG_TABLE_NAME); - Table profileTable = Table(m_config_db.get(), CFG_BUFFER_PROFILE_TABLE_NAME); - Table poolTable = Table(m_config_db.get(), CFG_BUFFER_POOL_TABLE_NAME); + Table pgTable = Table(m_app_db.get(), APP_BUFFER_PG_TABLE_NAME); + Table profileTable = 
Table(m_app_db.get(), APP_BUFFER_PROFILE_TABLE_NAME); + Table poolTable = Table(m_app_db.get(), APP_BUFFER_POOL_TABLE_NAME); // Get SAI default ports to populate DB @@ -230,7 +233,7 @@ namespace portsorch_test }); // Create test buffer profile - profileTable.set("test_profile", { { "pool", "[BUFFER_POOL|test_pool]" }, + profileTable.set("test_profile", { { "pool", "[BUFFER_POOL_TABLE:test_pool]" }, { "xon", "14832" }, { "xoff", "14832" }, { "size", "35000" }, @@ -240,8 +243,8 @@ namespace portsorch_test for (const auto &it : ports) { std::ostringstream oss; - oss << it.first << "|3-4"; - pgTable.set(oss.str(), { { "profile", "[BUFFER_PROFILE|test_profile]" } }); + oss << it.first << ":3-4"; + pgTable.set(oss.str(), { { "profile", "[BUFFER_PROFILE_TABLE:test_profile]" } }); } // Populate pot table with SAI ports @@ -269,15 +272,15 @@ namespace portsorch_test ASSERT_EQ(gPortsOrch, nullptr); gPortsOrch = new PortsOrch(m_app_db.get(), ports_tables); - vector buffer_tables = { CFG_BUFFER_POOL_TABLE_NAME, - CFG_BUFFER_PROFILE_TABLE_NAME, - CFG_BUFFER_QUEUE_TABLE_NAME, - CFG_BUFFER_PG_TABLE_NAME, - CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, - CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME }; + vector buffer_tables = { APP_BUFFER_POOL_TABLE_NAME, + APP_BUFFER_PROFILE_TABLE_NAME, + APP_BUFFER_QUEUE_TABLE_NAME, + APP_BUFFER_PG_TABLE_NAME, + APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, + APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME }; ASSERT_EQ(gBufferOrch, nullptr); - gBufferOrch = new BufferOrch(m_config_db.get(), buffer_tables); + gBufferOrch = new BufferOrch(m_app_db.get(), m_config_db.get(), m_state_db.get(), buffer_tables); // warm start, bake fill refill consumer @@ -318,9 +321,9 @@ namespace portsorch_test TEST_F(PortsOrchTest, PfcZeroBufferHandlerLocksPortPgAndQueue) { Table portTable = Table(m_app_db.get(), APP_PORT_TABLE_NAME); - Table pgTable = Table(m_config_db.get(), CFG_BUFFER_PG_TABLE_NAME); - Table profileTable = Table(m_config_db.get(), CFG_BUFFER_PROFILE_TABLE_NAME); - 
Table poolTable = Table(m_config_db.get(), CFG_BUFFER_POOL_TABLE_NAME); + Table pgTable = Table(m_app_db.get(), APP_BUFFER_PG_TABLE_NAME); + Table profileTable = Table(m_app_db.get(), APP_BUFFER_PROFILE_TABLE_NAME); + Table poolTable = Table(m_app_db.get(), APP_BUFFER_POOL_TABLE_NAME); // Get SAI default ports to populate DB auto ports = ut_helper::getInitialSaiPorts(); @@ -339,15 +342,15 @@ namespace portsorch_test ASSERT_EQ(gPortsOrch, nullptr); gPortsOrch = new PortsOrch(m_app_db.get(), ports_tables); - vector buffer_tables = { CFG_BUFFER_POOL_TABLE_NAME, - CFG_BUFFER_PROFILE_TABLE_NAME, - CFG_BUFFER_QUEUE_TABLE_NAME, - CFG_BUFFER_PG_TABLE_NAME, - CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, - CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME }; + vector buffer_tables = { APP_BUFFER_POOL_TABLE_NAME, + APP_BUFFER_PROFILE_TABLE_NAME, + APP_BUFFER_QUEUE_TABLE_NAME, + APP_BUFFER_PG_TABLE_NAME, + APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, + APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME }; ASSERT_EQ(gBufferOrch, nullptr); - gBufferOrch = new BufferOrch(m_config_db.get(), buffer_tables); + gBufferOrch = new BufferOrch(m_app_db.get(), m_config_db.get(), m_state_db.get(), buffer_tables); // Populate port table with SAI ports for (const auto &it : ports) @@ -396,7 +399,7 @@ namespace portsorch_test }); // Create test buffer profile - profileTable.set("test_profile", { { "pool", "[BUFFER_POOL|test_pool]" }, + profileTable.set("test_profile", { { "pool", "[BUFFER_POOL_TABLE:test_pool]" }, { "xon", "14832" }, { "xoff", "14832" }, { "size", "35000" }, @@ -406,8 +409,8 @@ namespace portsorch_test for (const auto &it : ports) { std::ostringstream oss; - oss << it.first << "|3-4"; - pgTable.set(oss.str(), { { "profile", "[BUFFER_PROFILE|test_profile]" } }); + oss << it.first << ":3-4"; + pgTable.set(oss.str(), { { "profile", "[BUFFER_PROFILE_TABLE:test_profile]" } }); } gBufferOrch->addExistingData(&pgTable); gBufferOrch->addExistingData(&poolTable); @@ -416,7 +419,7 @@ namespace portsorch_test 
// process pool, profile and PGs static_cast(gBufferOrch)->doTask(); - auto pgConsumer = static_cast(gBufferOrch->getExecutor(CFG_BUFFER_PG_TABLE_NAME)); + auto pgConsumer = static_cast(gBufferOrch->getExecutor(APP_BUFFER_PG_TABLE_NAME)); pgConsumer->dumpPendingTasks(ts); ASSERT_FALSE(ts.empty()); // PG is skipped ts.clear(); @@ -427,7 +430,7 @@ namespace portsorch_test // process PGs static_cast(gBufferOrch)->doTask(); - pgConsumer = static_cast(gBufferOrch->getExecutor(CFG_BUFFER_PG_TABLE_NAME)); + pgConsumer = static_cast(gBufferOrch->getExecutor(APP_BUFFER_PG_TABLE_NAME)); pgConsumer->dumpPendingTasks(ts); ASSERT_TRUE(ts.empty()); // PG should be proceesed now ts.clear(); @@ -470,15 +473,15 @@ namespace portsorch_test ASSERT_EQ(gPortsOrch, nullptr); gPortsOrch = new PortsOrch(m_app_db.get(), ports_tables); - vector buffer_tables = { CFG_BUFFER_POOL_TABLE_NAME, - CFG_BUFFER_PROFILE_TABLE_NAME, - CFG_BUFFER_QUEUE_TABLE_NAME, - CFG_BUFFER_PG_TABLE_NAME, - CFG_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, - CFG_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME }; + vector buffer_tables = { APP_BUFFER_POOL_TABLE_NAME, + APP_BUFFER_PROFILE_TABLE_NAME, + APP_BUFFER_QUEUE_TABLE_NAME, + APP_BUFFER_PG_TABLE_NAME, + APP_BUFFER_PORT_INGRESS_PROFILE_LIST_NAME, + APP_BUFFER_PORT_EGRESS_PROFILE_LIST_NAME }; ASSERT_EQ(gBufferOrch, nullptr); - gBufferOrch = new BufferOrch(m_config_db.get(), buffer_tables); + gBufferOrch = new BufferOrch(m_app_db.get(), m_config_db.get(), m_state_db.get(), buffer_tables); /* * Next we will prepare some configuration data to be consumed by PortsOrch diff --git a/tests/test_buffer.py b/tests/test_buffer.py new file mode 100644 index 00000000000..1b77990c65e --- /dev/null +++ b/tests/test_buffer.py @@ -0,0 +1,180 @@ +import time +import json +import redis +import pytest + +from pprint import pprint +from swsscommon import swsscommon + + +class TestBufferMgrDyn(object): + def setup_db(self, dvs): + self.initialized = False + self.cableLenTest1 = "23m" + 
self.cableLenTest2 = "29m" + self.speedToTest1 = "50000" + self.speedToTest2 = "10000" + + self.app_db = dvs.get_app_db() + self.config_db = dvs.get_config_db() + self.state_db = dvs.get_state_db() + + fvs = self.config_db.wait_for_entry("PORT", "Ethernet0") + self.originalSpeed = fvs["speed"] + if self.originalSpeed == self.speedToTest1: + self.speedToTest1 = "100000" + elif self.originalSpeed == self.speedToTest2: + self.speedToTest2 = "100000" + elif self.originalSpeed == "": + self.originalSpeed = "100000" + + # Check whether cable length has been configured + fvs = self.config_db.wait_for_entry("CABLE_LENGTH", "AZURE") + self.originalCableLen = fvs["Ethernet0"] + if self.originalCableLen == self.cableLenTest1: + self.cableLenTest1 = "19m" + elif self.originalCableLen == self.cableLenTest2: + self.cableLenTest2 = "19m" + + fvs = {"mmu_size": "12766208"} + self.state_db.create_entry("BUFFER_MAX_PARAM_TABLE", "global", fvs) + + self.initialized = True + + def make_lossless_profile_name(self, speed, cable_length): + return "pg_lossless_" + speed + "_" + cable_length + "_profile" + + def test_changeSpeed(self, dvs, testlog): + self.setup_db(dvs) + + # Change speed to speed1 and verify whether the profile has been updated + dvs.runcmd("config interface speed Ethernet0 " + self.speedToTest1) + + expectedProfile = self.make_lossless_profile_name(self.speedToTest1, self.originalCableLen) + self.app_db.wait_for_entry("BUFFER_PROFILE_TABLE", expectedProfile) + + # Check whether buffer pg aligns + bufferPg = self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + # Remove PG + dvs.runcmd("config interface buffer priority_group lossless remove Ethernet0 3-4") + self.app_db.wait_for_deleted_entry("BUFFER_PG_TABLE", "Ethernet0:3-4") + + # Change speed to speed2 and verify + dvs.runcmd("config interface speed Ethernet0 " + self.speedToTest2) + expectedProfile =
self.make_lossless_profile_name(self.speedToTest2, self.originalCableLen) + + # Re-add another PG + dvs.runcmd("config interface buffer priority_group lossless add Ethernet0 6") + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:6", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + # Revert speed and PG to original value + dvs.runcmd("config interface buffer priority_group remove Ethernet0 6") + self.app_db.wait_for_deleted_entry("BUFFER_PG_TABLE", "Ethernet0:6") + + dvs.runcmd("config interface speed Ethernet0 " + self.originalSpeed) + dvs.runcmd("config interface buffer priority_group add Ethernet0 3-4") + + expectedProfile = self.make_lossless_profile_name(self.originalSpeed, self.originalCableLen) + self.app_db.wait_for_entry("BUFFER_PROFILE_TABLE", expectedProfile) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + def test_changeCableLen(self, dvs, testlog): + self.setup_db(dvs) + + # Change to new cable length + dvs.runcmd("config interface cable_length Ethernet0 " + self.cableLenTest1) + expectedProfile = self.make_lossless_profile_name(self.originalSpeed, self.cableLenTest1) + self.app_db.wait_for_entry("BUFFER_PROFILE_TABLE", expectedProfile) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + # Remove the lossless PGs + dvs.runcmd("config interface buffer priority_group remove Ethernet0") + self.app_db.wait_for_deleted_entry("BUFFER_PG_TABLE", "Ethernet0:3-4") + + # Change to another cable length + dvs.runcmd("config interface cable_length Ethernet0 " + self.cableLenTest2) + # Check whether the old profile has been removed + self.app_db.wait_for_deleted_entry("BUFFER_PROFILE_TABLE", expectedProfile) + + # Re-add lossless PGs + dvs.runcmd("config interface buffer priority_group add Ethernet0 3-4") + expectedProfile = 
self.make_lossless_profile_name(self.originalSpeed, self.cableLenTest2) + # Check the BUFFER_PROFILE_TABLE and BUFFER_PG_TABLE + self.app_db.wait_for_entry("BUFFER_PROFILE_TABLE", expectedProfile) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + # Revert the cable length + dvs.runcmd("config interface cable_length Ethernet0 " + self.originalCableLen) + # Check the BUFFER_PROFILE_TABLE and BUFFER_PG_TABLE + self.app_db.wait_for_deleted_entry("BUFFER_PROFILE_TABLE", expectedProfile) + expectedProfile = self.make_lossless_profile_name(self.originalSpeed, self.originalCableLen) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + def test_MultipleLosslessPg(self, dvs, testlog): + self.setup_db(dvs) + + # Add another lossless PG + dvs.runcmd("config interface buffer priority_group add Ethernet0 6") + expectedProfile = self.make_lossless_profile_name(self.originalSpeed, self.originalCableLen) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:6", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + # change speed and check + dvs.runcmd("config interface speed Ethernet0 " + self.speedToTest1) + expectedProfile = self.make_lossless_profile_name(self.speedToTest1, self.originalCableLen) + self.app_db.wait_for_entry("BUFFER_PROFILE_TABLE", expectedProfile) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:6", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + # change cable length and check + dvs.runcmd("config interface cable_length Ethernet0 " + self.cableLenTest1) + self.app_db.wait_for_deleted_entry("BUFFER_PROFILE_TABLE", expectedProfile) + expectedProfile = self.make_lossless_profile_name(self.speedToTest1, 
self.cableLenTest1) + self.app_db.wait_for_entry("BUFFER_PROFILE_TABLE", expectedProfile) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:6", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + # revert the speed and cable length and check + dvs.runcmd("config interface cable_length Ethernet0 " + self.originalCableLen) + dvs.runcmd("config interface speed Ethernet0 " + self.originalSpeed) + self.app_db.wait_for_deleted_entry("BUFFER_PROFILE_TABLE", expectedProfile) + expectedProfile = self.make_lossless_profile_name(self.originalSpeed, self.originalCableLen) + self.app_db.wait_for_entry("BUFFER_PROFILE_TABLE", expectedProfile) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:6", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + def test_headroomOverride(self, dvs, testlog): + self.setup_db(dvs) + + # Configure static profile + dvs.runcmd("config buffer_profile add profile test -xon 18432 -xoff 16384") + self.app_db.wait_for_exact_match("BUFFER_PROFILE_TABLE", "test", + { "pool" : "[BUFFER_POOL:ingress_lossless_pool]", + "xon" : "18432", + "xoff" : "16384", + "size" : "34816", + "dynamic_th" : "0" + }) + + dvs.runcmd("config interface cable_length Ethernet0 " + self.cableLenTest1) + expectedProfile = self.make_lossless_profile_name(self.originalSpeed, self.cableLenTest1) + self.app_db.wait_for_entry("BUFFER_PROFILE_TABLE", expectedProfile) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + dvs.runcmd("config interface buffer priority_group lossless set Ethernet0 3-4 test") + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": 
"[BUFFER_PROFILE_TABLE:test]"}) + dvs.runcmd("config interface headroom_override add Ethernet0 test 6") + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:6", {"profile": "[BUFFER_PROFILE_TABLE:test]"}) + + dvs.runcmd("config interface buffer priority_group remove Ethernet0") + self.app_db.wait_for_deleted_entry("BUFFER_PG_TABLE", "Ethernet0:3-4") + self.app_db.wait_for_deleted_entry("BUFFER_PG_TABLE", "Ethernet0:6") + + dvs.runcmd("config interface buffer priority_group add Ethernet0 3-4") + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"}) + + dvs.runcmd("config interface cable_length Ethernet0 " + self.originalCableLen) + self.app_db.wait_for_deleted_entry("BUFFER_PROFILE_TABLE", expectedProfile) + expectedProfile = self.make_lossless_profile_name(self.originalSpeed, self.originalCableLen) + self.app_db.wait_for_entry("BUFFER_PROFILE_TABLE", expectedProfile) + self.app_db.wait_for_field_match("BUFFER_PG_TABLE", "Ethernet0:3-4", {"profile": "[BUFFER_PROFILE_TABLE:" + expectedProfile + "]"})