From 4d7deba9354c922d18d75325dcd4255f4096560f Mon Sep 17 00:00:00 2001 From: Kamil Cudnik Date: Thu, 14 Sep 2017 15:47:55 -0700 Subject: [PATCH] [syncd]: Refactor mutexes (#220) --- syncd/syncd.cpp | 30 ++-- syncd/syncd.h | 5 +- syncd/syncd_counters.cpp | 6 +- syncd/syncd_notifications.cpp | 315 +++++++++++++++++++++++++++++----- syncd/syncd_reinit.cpp | 2 + 5 files changed, 298 insertions(+), 60 deletions(-) diff --git a/syncd/syncd.cpp b/syncd/syncd.cpp index aec117689c01..d4349490f2d9 100644 --- a/syncd/syncd.cpp +++ b/syncd/syncd.cpp @@ -5,7 +5,19 @@ #include "swss/tokenize.h" #include -std::mutex g_db_mutex; +/** + * @brief Global mutex for thread synchronization + * + * Purpose of this mutex is to synchronize multiple threads like main thread, + * counters and notifications as well as all operations which require multiple + * Redis DB access. + * + * For example: query DB for next VID id number, and then put map RID and VID + * to Redis. From syncd point of view this entire operation should be atomic + * and no other thread should access DB or make assumption on previous + * information until entire operation will finish. 
+ */ +std::mutex g_mutex; swss::RedisClient *g_redisClient = NULL; @@ -148,7 +160,6 @@ void remove_rid_and_vid_from_local( sai_object_id_t translate_rid_to_vid( _In_ sai_object_id_t rid) { - std::lock_guard lock(g_db_mutex); SWSS_LOG_ENTER(); @@ -277,7 +288,6 @@ void translate_rid_to_vid_list( sai_object_id_t translate_vid_to_rid( _In_ sai_object_id_t vid) { - std::lock_guard lock(g_db_mutex); SWSS_LOG_ENTER(); @@ -401,7 +411,6 @@ void snoop_get_attr( SWSS_LOG_DEBUG("%s", key.c_str()); - std::lock_guard lock(g_db_mutex); g_redisClient->hset(key, attr_id, attr_value); } @@ -589,7 +598,6 @@ void internal_syncd_get_send( // object type and object id, only get status is required // get response will not put any data to table only queue is used - std::lock_guard lock(g_db_mutex); getResponse->set(key, entry, "getresponse"); @@ -707,7 +715,6 @@ sai_status_t handle_generic( std::string str_vid = sai_serialize_object_id(object_id); std::string str_rid = sai_serialize_object_id(real_object_id); - std::lock_guard lock(g_db_mutex); g_redisClient->hset(VIDTORID, str_vid, str_rid); g_redisClient->hset(RIDTOVID, str_rid, str_vid); @@ -741,7 +748,6 @@ sai_status_t handle_generic( std::string str_vid = sai_serialize_object_id(object_id); std::string str_rid = sai_serialize_object_id(rid); - std::lock_guard lock(g_db_mutex); g_redisClient->hdel(VIDTORID, str_vid); g_redisClient->hdel(RIDTOVID, str_rid); @@ -984,7 +990,6 @@ void sendResponse(sai_status_t status) SWSS_LOG_NOTICE("sending response: %s", str_status.c_str()); - std::lock_guard lock(g_db_mutex); getResponse->set(str_status, entry, "notify"); } @@ -1001,7 +1006,6 @@ void clearTempView() // TODO optimize with lua script (this takes ~0.2s now) - std::lock_guard lock(g_db_mutex); for (const auto &key: g_redisClient->keys(pattern)) { @@ -1396,6 +1400,8 @@ sai_status_t processBulkEvent( sai_status_t processEvent(swss::ConsumerTable &consumer) { + std::lock_guard lock(g_mutex); + SWSS_LOG_ENTER(); swss::KeyOpFieldsValuesTuple 
kco; @@ -1799,8 +1805,6 @@ bool handleRestartQuery(swss::NotificationConsumer &restartQuery) bool isVeryFirstRun() { - std::lock_guard lock(g_db_mutex); - SWSS_LOG_ENTER(); // if lane map is not defined in redis db then @@ -1977,6 +1981,8 @@ int main(int argc, char **argv) startCountersThread(options.countersThreadIntervalInSeconds); } + startNotificationsProcessingThread(); + SWSS_LOG_NOTICE("syncd listening for events"); swss::Select s; @@ -2039,5 +2045,7 @@ int main(int argc, char **argv) stop_cli(); + stopNotificationsProcessingThread(); + return EXIT_SUCCESS; } diff --git a/syncd/syncd.h b/syncd/syncd.h index d82f08c3ed3e..6f53c2b9de0d 100644 --- a/syncd/syncd.h +++ b/syncd/syncd.h @@ -62,7 +62,7 @@ extern "C" { extern void exit_and_notify(int status) __attribute__ ((__noreturn__)); -extern std::mutex g_db_mutex; +extern std::mutex g_mutex; extern std::set g_defaultPriorityGroupsRids; extern std::set g_defaultSchedulerGroupsRids; extern std::set g_defaultQueuesRids; @@ -167,4 +167,7 @@ void stop_cli(); sai_status_t applyViewTransition(); sai_status_t syncdApplyView(); +void startNotificationsProcessingThread(); +void stopNotificationsProcessingThread(); + #endif // __SYNCD_H__ diff --git a/syncd/syncd_counters.cpp b/syncd/syncd_counters.cpp index 3da7eb93d9f6..b539d5619016 100644 --- a/syncd/syncd_counters.cpp +++ b/syncd/syncd_counters.cpp @@ -6,9 +6,11 @@ void collectCounters(swss::Table &countersTable, const std::vector &supportedCounters) { // collect counters should be under mutex - // sice configuration can change and we + // since configuration can change and we // don't want that during counters collection + std::lock_guard lock(g_mutex); + SWSS_LOG_ENTER(); uint32_t countersSize = (uint32_t)supportedCounters.size(); @@ -50,8 +52,6 @@ void collectCounters(swss::Table &countersTable, values.push_back(fvt); } - std::lock_guard lock(g_db_mutex); - countersTable.set(strPortId, values, ""); } } diff --git a/syncd/syncd_notifications.cpp 
b/syncd/syncd_notifications.cpp index 6a2a88c5708f..b6495cc9f968 100644 --- a/syncd/syncd_notifications.cpp +++ b/syncd/syncd_notifications.cpp @@ -1,8 +1,13 @@ #include "syncd.h" #include "sairedis.h" -// mutex to protect notification send call -std::mutex g_ntf_mutex; +#include +#include +#include + +// NOTE: all serialized notifications in notifications context +// contain non translated OIDs since translation can generate new +// VID and it will populate redis db void send_notification( _In_ std::string op, @@ -29,11 +34,9 @@ void send_notification( send_notification(op, data, entry); } -void on_switch_state_change( +void process_on_switch_state_change( _In_ sai_switch_oper_status_t switch_oper_status) { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); std::string s = sai_serialize_switch_oper_status(switch_oper_status); @@ -87,8 +90,6 @@ void redisPutFdbEntryToAsicView( const std::string &strField = fvField(e); const std::string &strValue = fvValue(e); - std::lock_guard lock(g_db_mutex); - g_redisClient->hset(key, strField, strValue); } @@ -109,17 +110,13 @@ void redisPutFdbEntryToAsicView( std::string strAttrId = sai_serialize_attr_id(*meta); std::string strAttrValue = sai_serialize_attr_value(*meta, attr); - std::lock_guard lock(g_db_mutex); - g_redisClient->hset(key, strAttrId, strAttrValue); } -void on_fdb_event( +void process_on_fdb_event( _In_ uint32_t count, _In_ sai_fdb_event_notification_data_t *data) { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); SWSS_LOG_DEBUG("fdb event count: %d", count); @@ -143,12 +140,10 @@ void on_fdb_event( send_notification("fdb_event", s); } -void on_port_state_change( +void process_on_port_state_change( _In_ uint32_t count, _In_ sai_port_oper_status_notification_t *data) { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); SWSS_LOG_DEBUG("port notification count: %u", count); @@ -165,12 +160,10 @@ void on_port_state_change( send_notification("port_state_change", s); } -void on_port_event( +void 
process_on_port_event( _In_ uint32_t count, _In_ sai_port_event_notification_t *data) { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); for (uint32_t i = 0; i < count; i++) @@ -185,61 +178,293 @@ void on_port_event( send_notification("port_event", s); } -void on_switch_shutdown_request() +void process_on_switch_shutdown_request() { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); send_notification("switch_shutdown_request", ""); } +void handle_switch_state_change( + _In_ const std::string &data) +{ + SWSS_LOG_ENTER(); + + sai_switch_oper_status_t switch_oper_status; + + sai_deserialize_switch_oper_status(data, switch_oper_status); + + process_on_switch_state_change(switch_oper_status); +} + +void handle_fdb_event( + _In_ const std::string &data) +{ + SWSS_LOG_ENTER(); + + uint32_t count; + sai_fdb_event_notification_data_t *fdbevent = NULL; + + sai_deserialize_fdb_event_ntf(data, count, &fdbevent); + + process_on_fdb_event(count, fdbevent); + + sai_deserialize_free_fdb_event_ntf(count, fdbevent); +} + +void handle_port_state_change( + _In_ const std::string &data) +{ + SWSS_LOG_ENTER(); + + uint32_t count; + sai_port_oper_status_notification_t *portoperstatus = NULL; + + sai_deserialize_port_oper_status_ntf(data, count, &portoperstatus); + + process_on_port_state_change(count, portoperstatus); + + sai_deserialize_free_port_oper_status_ntf(count, portoperstatus); +} + +void handle_port_event( + _In_ const std::string &data) +{ + SWSS_LOG_ENTER(); + + uint32_t count; + sai_port_event_notification_t *portevent = NULL; + + sai_deserialize_port_event_ntf(data, count, &portevent); + + process_on_port_event(count, portevent); + + sai_deserialize_free_port_event_ntf(count, portevent); +} + +void handle_switch_shutdown_request( + _In_ const std::string &data) +{ + SWSS_LOG_ENTER(); + + process_on_switch_shutdown_request(); +} + +void processNotification( + _In_ const swss::KeyOpFieldsValuesTuple &item) +{ + std::lock_guard lock(g_mutex); + + 
SWSS_LOG_ENTER(); + + std::string notification = kfvKey(item); + std::string data = kfvOp(item); + + if (notification == "switch_state_change") + { + handle_switch_state_change(data); + } + else if (notification == "fdb_event") + { + handle_fdb_event(data); + } + else if (notification == "port_state_change") + { + handle_port_state_change(data); + } + else if (notification == "port_event") + { + handle_port_event(data); + } + else if (notification == "switch_shutdown_request") + { + handle_switch_shutdown_request(data); + } + else + { + SWSS_LOG_ERROR("unknow notification: %s", notification.c_str()); + } +} + +// condition variable will be used to notify processing thread +// that some notification arrived + +std::condition_variable cv; +std::mutex queue_mutex; +std::queue ntf_queue; + +void enqueue_notification( + _In_ std::string op, + _In_ std::string data, + _In_ std::vector &entry) +{ + SWSS_LOG_ENTER(); + + SWSS_LOG_INFO("%s %s", op.c_str(), data.c_str()); + + swss::KeyOpFieldsValuesTuple item(op, data, entry); + + // this is notification context, so we need to protect queue + + std::lock_guard lock(queue_mutex); + + ntf_queue.push(item); + + cv.notify_all(); +} + +void enqueue_notification( + _In_ std::string op, + _In_ std::string data) +{ + SWSS_LOG_ENTER(); + + std::vector entry; + + enqueue_notification(op, data, entry); +} + +void on_switch_state_change( + _In_ sai_switch_oper_status_t switch_oper_status) +{ + SWSS_LOG_ENTER(); + + std::string s = sai_serialize_switch_oper_status(switch_oper_status); + + enqueue_notification("switch_state_change", s); +} + +void on_fdb_event( + _In_ uint32_t count, + _In_ sai_fdb_event_notification_data_t *data) +{ + SWSS_LOG_ENTER(); + + std::string s = sai_serialize_fdb_event_ntf(count, data); + + enqueue_notification("fdb_event", s); +} + +void on_port_state_change( + _In_ uint32_t count, + _In_ sai_port_oper_status_notification_t *data) +{ + SWSS_LOG_ENTER(); + + std::string s = 
sai_serialize_port_oper_status_ntf(count, data); + + enqueue_notification("port_state_change", s); +} + +void on_port_event( + _In_ uint32_t count, + _In_ sai_port_event_notification_t *data) +{ + SWSS_LOG_ENTER(); + + std::string s = sai_serialize_port_event_ntf(count, data); + + enqueue_notification("port_event", s); +} + +void on_switch_shutdown_request() +{ + SWSS_LOG_ENTER(); + + enqueue_notification("switch_shutdown_request", ""); +} + void on_packet_event( _In_ const void *buffer, _In_ sai_size_t buffer_size, _In_ uint32_t attr_count, _In_ const sai_attribute_t *attr_list) { - std::lock_guard lock(g_ntf_mutex); - SWSS_LOG_ENTER(); SWSS_LOG_ERROR("not implemented"); +} + +// determine whether notification thread is running + +volatile bool runThread; + +std::mutex ntf_mutex; +std::unique_lock ulock(ntf_mutex); - /* - std::string s; - sai_serialize_primitive(buffer_size, s); +bool tryDequeue( + _Out_ swss::KeyOpFieldsValuesTuple &item) +{ + std::lock_guard lock(queue_mutex); - sai_serialize_buffer(buffer, buffer_size, s); + SWSS_LOG_ENTER(); - std::vector entry; + if (ntf_queue.empty()) + { + return false; + } - entry = SaiAttributeList::serialize_attr_list( - SAI_OBJECT_TYPE_PACKET, - attr_count, - attr_list, - false); + item = ntf_queue.front(); - // since attr_list is const, we can't replace rid's - // we need to create copy of that list + ntf_queue.pop(); - SaiAttributeList copy(SAI_OBJECT_TYPE_PACKET, entry, false); + return true; +} - translate_rid_to_vid_list(SAI_OBJECT_TYPE_PACKET, copy.get_attr_count(), copy.get_attr_list()); +void ntf_process_function() +{ + SWSS_LOG_ENTER(); - entry = SaiAttributeList::serialize_attr_list( - SAI_OBJECT_TYPE_PACKET, - copy.get_attr_count(), - copy.get_attr_list(), - false); + while (runThread) + { + cv.wait(ulock); + + // this is notifications processing thread context, which is different + // from SAI notifications context, we can safely use g_mutex here, + // processing each notification is under same mutex as 
processing main + // events, counters and reinit + + swss::KeyOpFieldsValuesTuple item; + + while (tryDequeue(item)) + { + processNotification(item); + } + } +} + +std::shared_ptr ntf_process_thread; + +void startNotificationsProcessingThread() +{ + SWSS_LOG_ENTER(); + + runThread = true; + + ntf_process_thread = std::make_shared(ntf_process_function); + + ntf_process_thread->detach(); +} + +void stopNotificationsProcessingThread() +{ + SWSS_LOG_ENTER(); + + runThread = false; + + cv.notify_all(); + + if (ntf_process_thread != nullptr) + { + ntf_process_thread->join(); + } - send_notification("packet_event", s, entry); - */ + ntf_process_thread = nullptr; } sai_switch_notification_t switch_notifications { - on_switch_state_change, + on_switch_state_change, on_fdb_event, on_port_state_change, on_port_event, diff --git a/syncd/syncd_reinit.cpp b/syncd/syncd_reinit.cpp index 9c371749f1ee..dfe7f5550c04 100644 --- a/syncd/syncd_reinit.cpp +++ b/syncd/syncd_reinit.cpp @@ -986,6 +986,8 @@ void onSyncdStart(bool warmStart) // id's for ports, this may cause race condition so we need // to use a lock here to prevent that + std::lock_guard lock(g_mutex); + SWSS_LOG_ENTER(); SWSS_LOG_TIMER("on syncd start");