From ca5e43fde84d6c8b5502f49bc954a957185a4baf Mon Sep 17 00:00:00 2001 From: Marian Pritsak Date: Tue, 26 Sep 2017 11:01:12 +0300 Subject: [PATCH] [PFC WD]: Initial support (#227) * [sycnd]: Support for PFC Watchdog Signed-off-by: marian-pritsak * [syncd]: Use COUNTERS_TABLE in PFC WD Signed-off-by: marian-pritsak * [sai_redis_queue.cpp]: Revert redundant code. Signed-off-by: marian-pritsak * [syncd][PFCWD]: Set polling interval to 50 ms Signed-off-by: marian-pritsak * [sycnd][PFC WD]: Use make_shared instead of new Signed-off-by: marian-pritsak * [sycnd][PFC WD]: Use make_shared instead of new Signed-off-by: marian-pritsak * [syncd][PFC WD]: add missing space Signed-off-by: marian-pritsak * [tests][Makefile.am]: Add PFC WD sources Signed-off-by: marian-pritsak --- meta/saiserialize.cpp | 27 ++++ meta/saiserialize.h | 11 ++ syncd/Makefile.am | 3 +- syncd/syncd.cpp | 77 ++++++++++- syncd/syncd.h | 2 +- syncd/syncd_pfc_watchdog.cpp | 260 +++++++++++++++++++++++++++++++++++ syncd/syncd_pfc_watchdog.h | 65 +++++++++ tests/Makefile.am | 3 +- 8 files changed, 438 insertions(+), 10 deletions(-) create mode 100644 syncd/syncd_pfc_watchdog.cpp create mode 100644 syncd/syncd_pfc_watchdog.h diff --git a/meta/saiserialize.cpp b/meta/saiserialize.cpp index e180efffb..06aca497e 100644 --- a/meta/saiserialize.cpp +++ b/meta/saiserialize.cpp @@ -708,6 +708,14 @@ std::string sai_serialize_port_stat( return sai_serialize_enum(counter, &sai_metadata_enum_sai_port_stat_t); } +std::string sai_serialize_queue_stat( + _In_ const sai_queue_stat_t counter) +{ + SWSS_LOG_ENTER(); + + return sai_serialize_enum(counter, &sai_metadata_enum_sai_queue_stat_t); +} + std::string sai_serialize_switch_oper_status( _In_ sai_object_id_t switch_id, _In_ sai_switch_oper_status_t status) @@ -2701,3 +2709,22 @@ void sai_deserialize_free_port_oper_status_ntf( delete port_oper_status; } + +void sai_deserialize_port_stat( + _In_ const std::string& s, + _Out_ sai_port_stat_t& stat) +{ + SWSS_LOG_ENTER(); + + sai_deserialize_enum(s, &sai_metadata_enum_sai_port_stat_t, (int32_t&)stat); +} + +void sai_deserialize_queue_stat( + _In_ const std::string& s, + _Out_ sai_queue_stat_t& stat) +{ + SWSS_LOG_ENTER(); + + sai_deserialize_enum(s, &sai_metadata_enum_sai_queue_stat_t, (int32_t&)stat); +} + diff --git a/meta/saiserialize.h b/meta/saiserialize.h index 125c74b57..21e44b35c 100644 --- a/meta/saiserialize.h +++ b/meta/saiserialize.h @@ -72,6 +72,9 @@ std::string sai_serialize_common_api( std::string sai_serialize_port_stat( _In_ const sai_port_stat_t counter); +std::string sai_serialize_queue_stat( + _In_ const sai_queue_stat_t counter); + std::string sai_serialize_switch_oper_status( _In_ sai_object_id_t switch_id, _In_ sai_switch_oper_status_t status); @@ -196,4 +199,12 @@ void sai_deserialize_free_port_oper_status_ntf( _In_ uint32_t count, _In_ sai_port_oper_status_notification_t* portoperstatus); +void sai_deserialize_port_stat( + _In_ const std::string& s, + _Out_ sai_port_stat_t& stat); + +void sai_deserialize_queue_stat( + _In_ const std::string& s, + _Out_ sai_queue_stat_t& stat); + #endif // __SAI_SERIALIZE__ diff --git a/syncd/Makefile.am b/syncd/Makefile.am index ae9377c8a..2fff33bcf 100644 --- a/syncd/Makefile.am +++ b/syncd/Makefile.am @@ -20,7 +20,8 @@ syncd_SOURCES = \ syncd_hard_reinit.cpp \ syncd_notifications.cpp \ syncd_counters.cpp \ - syncd_applyview.cpp + syncd_applyview.cpp \ + syncd_pfc_watchdog.cpp syncd_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) $(SAIFLAGS) syncd_LDADD = -lhiredis -lswsscommon $(SAILIB) -lpthread -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -ldl diff --git a/syncd/syncd.cpp b/syncd/syncd.cpp index d82899d6c..ad9bd9075 100644 --- a/syncd/syncd.cpp +++ b/syncd/syncd.cpp @@ -1,6 +1,7 @@ #include "syncd.h" #include "syncd_saiswitch.h" #include "sairedis.h" +#include "syncd_pfc_watchdog.h" #include "swss/tokenize.h" #include @@ -2330,6 +2331,62 @@ sai_status_t processEvent( return status; } +void processPfcWdEvent( + _In_ swss::ConsumerStateTable &consumer) +{ + std::lock_guard lock(g_mutex); + + SWSS_LOG_ENTER(); + + swss::KeyOpFieldsValuesTuple kco; + consumer.pop(kco); + + const auto &key = kfvKey(kco); + const auto &op = kfvOp(kco); + + sai_object_id_t queueVid = SAI_NULL_OBJECT_ID; + sai_deserialize_object_id(key, queueVid); + sai_object_id_t queueId = translate_vid_to_rid(queueVid); + + const auto values = kfvFieldsValues(kco); + for (const auto& valuePair : values) + { + const auto field = fvField(valuePair); + const auto value = fvValue(valuePair); + + if (op == DEL_COMMAND) + { + PfcWatchdog::removeQueue(queueVid); + continue; + } + + auto idStrings = swss::tokenize(value, ','); + + if (field == PFC_WD_PORT_COUNTER_ID_LIST) + { + std::vector portCounterIds; + for (const auto &str : idStrings) + { + sai_port_stat_t stat; + sai_deserialize_port_stat(str, stat); + portCounterIds.push_back(stat); + } + PfcWatchdog::setPortCounterList(queueVid, queueId, portCounterIds); + } + else if (field == PFC_WD_QUEUE_COUNTER_ID_LIST) + { + std::vector queueCounterIds; + for (const auto &str : idStrings) + { + sai_queue_stat_t stat; + sai_deserialize_queue_stat(str, stat); + queueCounterIds.push_back(stat); + } + PfcWatchdog::setQueueCounterList(queueVid, queueId, queueCounterIds); + } + } +} + void printUsage() { std::cout << "Usage: syncd [-N] [-d] [-p profile] [-i interval] [-t [cold|warm|fast]] [-h] [-u] [-S]" << std::endl; @@ -2901,13 +2958,15 @@ int main(int argc, char **argv) } #endif // SAITHRIFT - std::shared_ptr db = std::make_shared(ASIC_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0); + std::shared_ptr dbAsic = std::make_shared(ASIC_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0); std::shared_ptr dbNtf = std::make_shared(ASIC_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0); + std::shared_ptr dbPfcWatchdog = std::make_shared(PFC_WD_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0); - g_redisClient = std::make_shared(db.get()); + g_redisClient = std::make_shared(dbAsic.get()); - std::shared_ptr asicState = std::make_shared(db.get(), ASIC_STATE_TABLE); - std::shared_ptr restartQuery = std::make_shared(db.get(), "RESTARTQUERY"); + std::shared_ptr asicState = std::make_shared(dbAsic.get(), ASIC_STATE_TABLE); + std::shared_ptr restartQuery = std::make_shared(dbAsic.get(), "RESTARTQUERY"); + std::shared_ptr pfcWdState = std::make_shared(dbPfcWatchdog.get(), PFC_WD_STATE_TABLE); /* * At the end we cant use producer consumer concept since if one proces @@ -2915,7 +2974,7 @@ int main(int argc, char **argv) * response queue will also trigger another "response". */ - getResponse = std::make_shared(db.get(), "GETRESPONSE"); + getResponse = std::make_shared(dbAsic.get(), "GETRESPONSE"); notifications = std::make_shared(dbNtf.get(), "NOTIFICATIONS"); g_veryFirstRun = isVeryFirstRun(); @@ -3003,6 +3062,7 @@ int main(int argc, char **argv) s.addSelectable(asicState.get()); s.addSelectable(restartQuery.get()); + s.addSelectable(pfcWdState.get()); SWSS_LOG_NOTICE("starting main loop"); @@ -3027,8 +3087,11 @@ int main(int argc, char **argv) warmRestartHint = handleRestartQuery(*restartQuery); break; } - - if (result == swss::Select::OBJECT) + else if (sel == pfcWdState.get()) + { + processPfcWdEvent(*(swss::ConsumerStateTable*)sel); + } + else if (result == swss::Select::OBJECT) { processEvent(*(swss::ConsumerTable*)sel); } diff --git a/syncd/syncd.h b/syncd/syncd.h index 83a4d3fd7..7b166f87a 100644 --- a/syncd/syncd.h +++ b/syncd/syncd.h @@ -31,6 +31,7 @@ extern "C" { #include "swss/dbconnector.h" #include "swss/producertable.h" #include "swss/consumertable.h" +#include "swss/consumerstatetable.h" #include "swss/notificationconsumer.h" #include "swss/notificationproducer.h" #include "swss/selectableevent.h" @@ -100,7 +101,6 @@ void startCountersThread( _In_ int intervalInSeconds); sai_status_t syncdApplyView(); - void check_notifications_pointers( _In_ uint32_t attr_count, _In_ sai_attribute_t *attr_list); diff --git a/syncd/syncd_pfc_watchdog.cpp b/syncd/syncd_pfc_watchdog.cpp new file mode 100644 index 000000000..f19198158 --- /dev/null +++ b/syncd/syncd_pfc_watchdog.cpp @@ -0,0 +1,260 @@ +#include "syncd_pfc_watchdog.h" +#include "syncd.h" + +#define PFC_WD_POLL_MSECS 50 + +PfcWatchdog::PfcCounterIds::PfcCounterIds( + _In_ sai_object_id_t queue, + _In_ sai_object_id_t port, + _In_ const std::vector &portIds, + _In_ const std::vector &queueIds): + queueId(queue), portId(port), portCounterIds(portIds), queueCounterIds(queueIds) +{ +} + +void PfcWatchdog::setPortCounterList( + _In_ sai_object_id_t queueVid, + _In_ sai_object_id_t queueId, + _In_ const std::vector &counterIds) +{ + SWSS_LOG_ENTER(); + + PfcWatchdog &wd = getInstance(); + + sai_object_id_t portId = queueIdToPortId(queueId); + if (portId == SAI_NULL_OBJECT_ID) + { + return; + } + + auto it = wd.m_counterIdsMap.find(queueVid); + if (it != wd.m_counterIdsMap.end()) + { + (*it).second->portCounterIds = counterIds; + return; + } + + auto pfcCounterIds = std::make_shared(queueId, + portId, + counterIds, + std::vector()); + wd.m_counterIdsMap.emplace(queueVid, pfcCounterIds); + + // Start watchdog thread in case it was not running due to empty counter IDs map + wd.startWatchdogThread(); +} + +void PfcWatchdog::setQueueCounterList( + _In_ sai_object_id_t queueVid, + _In_ sai_object_id_t queueId, + _In_ const std::vector &counterIds) +{ + SWSS_LOG_ENTER(); + + PfcWatchdog &wd = getInstance(); + + sai_object_id_t portId = queueIdToPortId(queueId); + if (portId == SAI_NULL_OBJECT_ID) + { + return; + } + + auto it = wd.m_counterIdsMap.find(queueVid); + if (it != wd.m_counterIdsMap.end()) + { + (*it).second->queueCounterIds = counterIds; + return; + } + + auto pfcCounterIds = std::make_shared(queueId, + portId, + std::vector(), + counterIds); + wd.m_counterIdsMap.emplace(queueVid, pfcCounterIds); + + // Start watchdog thread in case it was not running due to empty counter IDs map + wd.startWatchdogThread(); +} + +void PfcWatchdog::removeQueue( + _In_ sai_object_id_t queueVid) +{ + SWSS_LOG_ENTER(); + + PfcWatchdog &wd = getInstance(); + + auto it = wd.m_counterIdsMap.find(queueVid); + if (it == wd.m_counterIdsMap.end()) + { + SWSS_LOG_ERROR("Trying to remove nonexisting queue counter Ids 0x%lx", queueVid); + return; + } + + wd.m_counterIdsMap.erase(it); + + // Stop watchdog thread if counter IDs map is empty + if (wd.m_counterIdsMap.empty()) + { + wd.endWatchdogThread(); + } +} + +PfcWatchdog::~PfcWatchdog(void) +{ + endWatchdogThread(); +} + +PfcWatchdog::PfcWatchdog(void) +{ +} + +PfcWatchdog& PfcWatchdog::getInstance(void) +{ + static PfcWatchdog wd; + + return wd; +} + +sai_object_id_t PfcWatchdog::queueIdToPortId( + _In_ sai_object_id_t queueId) +{ + SWSS_LOG_ENTER(); + + sai_attribute_t attr = + { + .id = SAI_QUEUE_ATTR_PORT, + .value = + { + .oid = queueId, + } + }; + + sai_status_t status = sai_metadata_sai_queue_api->get_queue_attribute(queueId, 1, &attr); + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to get port Id of queue 0x%lx: %d", queueId, status); + return SAI_NULL_OBJECT_ID; + } + + return attr.value.oid; +} + +void PfcWatchdog::collectCounters( + _In_ swss::Table &countersTable) +{ + SWSS_LOG_ENTER(); + + std::lock_guard lock(g_mutex); + + // Collect stats for every registered queue + for (const auto &kv: m_counterIdsMap) + { + const auto &queueVid = kv.first; + const auto &queueId = kv.second->queueId; + const auto &portId = kv.second->portId; + const auto &portCounterIds = kv.second->portCounterIds; + const auto &queueCounterIds = kv.second->queueCounterIds; + + std::vector portStats(portCounterIds.size()); + std::vector queueStats(queueCounterIds.size()); + + // Get port stats for queue + sai_status_t status = sai_metadata_sai_port_api->get_port_stats( + portId, + static_cast(portCounterIds.size()), + portCounterIds.data(), + portStats.data()); + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to get stats of port 0x%lx: %d", portId, status); + continue; + } + + // Get queue stats + status = sai_metadata_sai_queue_api->get_queue_stats( + queueId, + static_cast(queueCounterIds.size()), + queueCounterIds.data(), + queueStats.data()); + if (status != SAI_STATUS_SUCCESS) + { + SWSS_LOG_ERROR("Failed to get stats of queue 0x%lx: %d", queueVid, status); + continue; + } + + // Push all counter values to a single vector + std::vector values; + + for (size_t i = 0; i != portCounterIds.size(); i++) + { + const std::string &counterName = sai_serialize_port_stat(portCounterIds[i]); + values.emplace_back(counterName, std::to_string(portStats[i])); + } + + for (size_t i = 0; i != queueCounterIds.size(); i++) + { + const std::string &counterName = sai_serialize_queue_stat(queueCounterIds[i]); + values.emplace_back(counterName, std::to_string(queueStats[i])); + } + + // Write counters to DB + std::string queueVidStr = sai_serialize_object_id(queueVid); + + countersTable.set(queueVidStr, values, ""); + } +} + +void PfcWatchdog::pfcWatchdogThread(void) +{ + SWSS_LOG_ENTER(); + + swss::DBConnector db(COUNTERS_DB, swss::DBConnector::DEFAULT_UNIXSOCKET, 0); + swss::Table countersTable(&db, COUNTERS_TABLE); + + while (m_runPfcWatchdogThread) + { + collectCounters(countersTable); + + std::unique_lock lk(m_mtxSleep); + m_cvSleep.wait_for(lk, std::chrono::milliseconds(PFC_WD_POLL_MSECS)); + } +} + +void PfcWatchdog::startWatchdogThread(void) +{ + SWSS_LOG_ENTER(); + + if (m_runPfcWatchdogThread.load() == true) + { + return; + } + + m_runPfcWatchdogThread = true; + + m_pfcWatchdogThread = std::make_shared(&PfcWatchdog::pfcWatchdogThread, this); + + SWSS_LOG_INFO("PFC Watchdog thread started"); +} + +void PfcWatchdog::endWatchdogThread(void) +{ + SWSS_LOG_ENTER(); + + if (m_runPfcWatchdogThread.load() == false) + { + return; + } + + m_runPfcWatchdogThread = false; + + m_cvSleep.notify_all(); + + if (m_pfcWatchdogThread != nullptr) + { + SWSS_LOG_INFO("Wait for PFC Watchdog thread to end"); + + m_pfcWatchdogThread->join(); + } + + SWSS_LOG_INFO("PFC Watchdog thread ended"); +} diff --git a/syncd/syncd_pfc_watchdog.h b/syncd/syncd_pfc_watchdog.h new file mode 100644 index 000000000..4d8c73fb8 --- /dev/null +++ b/syncd/syncd_pfc_watchdog.h @@ -0,0 +1,65 @@ +#ifndef PFC_WATCHDOG_H +#define PFC_WATCHDOG_H + +extern "C" { +#include "sai.h" +} + +#include +#include +#include +#include "swss/table.h" + +class PfcWatchdog +{ + public: + static void setPortCounterList( + _In_ sai_object_id_t queueVid, + _In_ sai_object_id_t queueId, + _In_ const std::vector &counterIds); + static void setQueueCounterList( + _In_ sai_object_id_t queueVid, + _In_ sai_object_id_t queueId, + _In_ const std::vector &counterIds); + static void removeQueue( + _In_ sai_object_id_t queueVid); + + PfcWatchdog( + _In_ const PfcWatchdog&) = delete; + ~PfcWatchdog(void); + + private: + struct PfcCounterIds + { + PfcCounterIds( + _In_ sai_object_id_t queue, + _In_ sai_object_id_t port, + _In_ const std::vector &portIds, + _In_ const std::vector &queueIds); + + sai_object_id_t queueId; + sai_object_id_t portId; + std::vector portCounterIds; + std::vector queueCounterIds; + }; + + PfcWatchdog(void); + static PfcWatchdog& getInstance(void); + static sai_object_id_t queueIdToPortId( + _In_ sai_object_id_t queueVid); + void collectCounters( + _In_ swss::Table &countersTable); + void pfcWatchdogThread(void); + void startWatchdogThread(void); + void endWatchdogThread(void); + + // Key is a queue Virtual ID + std::map> m_counterIdsMap; + + std::atomic_bool m_runPfcWatchdogThread = { false }; + std::shared_ptr m_pfcWatchdogThread = nullptr; + std::mutex m_mtxSleep; + std::condition_variable m_cvSleep; +}; + +#endif diff --git a/tests/Makefile.am b/tests/Makefile.am index e470bdf02..3df592169 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -16,7 +16,8 @@ vssyncd_SOURCES = \ ../syncd/syncd_hard_reinit.cpp \ ../syncd/syncd_notifications.cpp \ ../syncd/syncd_counters.cpp \ - ../syncd/syncd_applyview.cpp + ../syncd/syncd_applyview.cpp \ + ../syncd/syncd_pfc_watchdog.cpp vssyncd_CPPFLAGS = $(DBGFLAGS) $(AM_CPPFLAGS) $(CFLAGS_COMMON) $(SAIFLAGS) vssyncd_LDADD = -lhiredis -lswsscommon $(SAILIB) -lpthread -L$(top_srcdir)/meta/.libs -lsaimetadata -lsaimeta -ldl