Skip to content

Commit

Permalink
[dash] Improve dash orchagent ZMQ code. (sonic-net#2836)
Browse files Browse the repository at this point in the history
Improve dash orchagent code.

**What I did**
Move ZmqOrch related code to new files.
Remove unused code.
Change ZmqOrch::doTask() to use ConsumerBase as parameter.
Fix orchagent start parameter parse bug.
Fix dash orch can't receive data when ZMQ disabled issue.

**Why I did it**
Improve dash orchagent code.
HLD: https://github.com/sonic-net/SONiC/blob/master/doc/sonic-swss-common/ZMQ%20producer-consumer%20state%20table%20design.md?plain=1

**How I verified it**
Pass all UT.

**Details if related**
  • Loading branch information
liuh-80 authored Jul 6, 2023
1 parent c3ede76 commit 982a341
Show file tree
Hide file tree
Showing 15 changed files with 131 additions and 124 deletions.
1 change: 1 addition & 0 deletions orchagent/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ orchagent_SOURCES = \
srv6orch.cpp \
response_publisher.cpp \
nvgreorch.cpp \
zmqorch.cpp \
dash/dashorch.cpp \
dash/dashrouteorch.cpp \
dash/dashvnetorch.cpp \
Expand Down
2 changes: 1 addition & 1 deletion orchagent/dash/dashaclorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ DashAclOrch::DashAclOrch(DBConnector *db, const vector<string> &tables, DashOrch
assert(m_dash_orch);
}

void DashAclOrch::doTask(ZmqConsumer &consumer)
void DashAclOrch::doTask(ConsumerBase &consumer)
{
SWSS_LOG_ENTER();

Expand Down
3 changes: 2 additions & 1 deletion orchagent/dash/dashaclorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include <dbconnector.h>
#include <bulker.h>
#include <orch.h>
#include "zmqorch.h"
#include "zmqserver.h"

#include "dashorch.h"
Expand Down Expand Up @@ -73,7 +74,7 @@ class DashAclOrch : public ZmqOrch
DashAclOrch(swss::DBConnector *db, const std::vector<std::string> &tables, DashOrch *dash_orch, swss::ZmqServer *zmqServer);

private:
void doTask(ZmqConsumer &consumer);
void doTask(ConsumerBase &consumer);

task_process_status taskUpdateDashAclIn(
const std::string &key,
Expand Down
10 changes: 5 additions & 5 deletions orchagent/dash/dashorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ bool DashOrch::removeApplianceEntry(const string& appliance_id)
return true;
}

void DashOrch::doTaskApplianceTable(ZmqConsumer& consumer)
void DashOrch::doTaskApplianceTable(ConsumerBase& consumer)
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -218,7 +218,7 @@ bool DashOrch::removeRoutingTypeEntry(const string& routing_type)
return true;
}

void DashOrch::doTaskRoutingTypeTable(ZmqConsumer& consumer)
void DashOrch::doTaskRoutingTypeTable(ConsumerBase& consumer)
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -529,7 +529,7 @@ bool DashOrch::removeEni(const string& eni)
return true;
}

void DashOrch::doTaskEniTable(ZmqConsumer& consumer)
void DashOrch::doTaskEniTable(ConsumerBase& consumer)
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -624,7 +624,7 @@ bool DashOrch::removeQosEntry(const string& qos_name)
return true;
}

void DashOrch::doTaskQosTable(ZmqConsumer& consumer)
void DashOrch::doTaskQosTable(ConsumerBase& consumer)
{
auto it = consumer.m_toSync.begin();
while (it != consumer.m_toSync.end())
Expand Down Expand Up @@ -684,7 +684,7 @@ void DashOrch::doTaskQosTable(ZmqConsumer& consumer)
}
}

void DashOrch::doTask(ZmqConsumer& consumer)
void DashOrch::doTask(ConsumerBase& consumer)
{
SWSS_LOG_ENTER();

Expand Down
12 changes: 6 additions & 6 deletions orchagent/dash/dashorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
#include "ipprefix.h"
#include "macaddress.h"
#include "timer.h"
#include "dashorch.h"
#include "zmqorch.h"
#include "zmqserver.h"

struct ApplianceEntry
Expand Down Expand Up @@ -64,11 +64,11 @@ class DashOrch : public ZmqOrch
RoutingTypeTable routing_type_entries_;
EniTable eni_entries_;
QosTable qos_entries_;
void doTask(ZmqConsumer &consumer);
void doTaskApplianceTable(ZmqConsumer &consumer);
void doTaskRoutingTypeTable(ZmqConsumer &consumer);
void doTaskEniTable(ZmqConsumer &consumer);
void doTaskQosTable(ZmqConsumer &consumer);
void doTask(ConsumerBase &consumer);
void doTaskApplianceTable(ConsumerBase &consumer);
void doTaskRoutingTypeTable(ConsumerBase &consumer);
void doTaskEniTable(ConsumerBase &consumer);
void doTaskQosTable(ConsumerBase &consumer);
bool addApplianceEntry(const std::string& appliance_id, const ApplianceEntry &entry);
bool removeApplianceEntry(const std::string& appliance_id);
bool addRoutingTypeEntry(const std::string& routing_type, const RoutingTypeEntry &entry);
Expand Down
6 changes: 3 additions & 3 deletions orchagent/dash/dashrouteorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ bool DashRouteOrch::removeOutboundRoutingPost(const string& key, const OutboundR
return true;
}

void DashRouteOrch::doTaskRouteTable(ZmqConsumer& consumer)
void DashRouteOrch::doTaskRouteTable(ConsumerBase& consumer)
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -481,7 +481,7 @@ bool DashRouteOrch::removeInboundRoutingPost(const string& key, const InboundRou
return true;
}

void DashRouteOrch::doTaskRouteRuleTable(ZmqConsumer& consumer)
void DashRouteOrch::doTaskRouteRuleTable(ConsumerBase& consumer)
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -626,7 +626,7 @@ void DashRouteOrch::doTaskRouteRuleTable(ZmqConsumer& consumer)
}
}

void DashRouteOrch::doTask(ZmqConsumer& consumer)
void DashRouteOrch::doTask(ConsumerBase& consumer)
{
SWSS_LOG_ENTER();

Expand Down
7 changes: 4 additions & 3 deletions orchagent/dash/dashrouteorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "macaddress.h"
#include "timer.h"
#include "dashorch.h"
#include "zmqorch.h"
#include "zmqserver.h"

struct OutboundRoutingEntry
Expand Down Expand Up @@ -90,9 +91,9 @@ class DashRouteOrch : public ZmqOrch
EntityBulker<sai_dash_inbound_routing_api_t> inbound_routing_bulker_;
DashOrch *dash_orch_;

void doTask(ZmqConsumer &consumer);
void doTaskRouteTable(ZmqConsumer &consumer);
void doTaskRouteRuleTable(ZmqConsumer &consumer);
void doTask(ConsumerBase &consumer);
void doTaskRouteTable(ConsumerBase &consumer);
void doTaskRouteRuleTable(ConsumerBase &consumer);
bool addOutboundRouting(const std::string& key, OutboundRoutingBulkContext& ctxt);
bool addOutboundRoutingPost(const std::string& key, const OutboundRoutingBulkContext& ctxt);
bool removeOutboundRouting(const std::string& key, OutboundRoutingBulkContext& ctxt);
Expand Down
6 changes: 3 additions & 3 deletions orchagent/dash/dashvnetorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ bool DashVnetOrch::removeVnetPost(const string& vnet_name, const DashVnetBulkCon
return true;
}

void DashVnetOrch::doTaskVnetTable(ZmqConsumer& consumer)
void DashVnetOrch::doTaskVnetTable(ConsumerBase& consumer)
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -628,7 +628,7 @@ bool DashVnetOrch::removeVnetMapPost(const string& key, const VnetMapBulkContext
return true;
}

void DashVnetOrch::doTaskVnetMapTable(ZmqConsumer& consumer)
void DashVnetOrch::doTaskVnetMapTable(ConsumerBase& consumer)
{
SWSS_LOG_ENTER();

Expand Down Expand Up @@ -776,7 +776,7 @@ void DashVnetOrch::doTaskVnetMapTable(ZmqConsumer& consumer)
}
}

void DashVnetOrch::doTask(ZmqConsumer &consumer)
void DashVnetOrch::doTask(ConsumerBase &consumer)
{
SWSS_LOG_ENTER();

Expand Down
7 changes: 4 additions & 3 deletions orchagent/dash/dashvnetorch.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "ipaddresses.h"
#include "macaddress.h"
#include "timer.h"
#include "zmqorch.h"
#include "zmqserver.h"

struct VnetEntry
Expand Down Expand Up @@ -89,9 +90,9 @@ class DashVnetOrch : public ZmqOrch
EntityBulker<sai_dash_outbound_ca_to_pa_api_t> outbound_ca_to_pa_bulker_;
EntityBulker<sai_dash_pa_validation_api_t> pa_validation_bulker_;

void doTask(ZmqConsumer &consumer);
void doTaskVnetTable(ZmqConsumer &consumer);
void doTaskVnetMapTable(ZmqConsumer &consumer);
void doTask(ConsumerBase &consumer);
void doTaskVnetTable(ConsumerBase &consumer);
void doTaskVnetMapTable(ConsumerBase &consumer);
bool addVnet(const std::string& key, DashVnetBulkContext& ctxt);
bool addVnetPost(const std::string& key, const DashVnetBulkContext& ctxt);
bool removeVnet(const std::string& key, DashVnetBulkContext& ctxt);
Expand Down
2 changes: 1 addition & 1 deletion orchagent/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ int main(int argc, char **argv)
string responsepublisher_rec_filename = "responsepublisher.rec";
int record_type = 3; // Only swss and sairedis recordings enabled by default.

while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:")) != -1)
while ((opt = getopt(argc, argv, "b:m:r:f:j:d:i:hsz:k:q:")) != -1)
{
switch (opt)
{
Expand Down
68 changes: 0 additions & 68 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,30 +66,6 @@ Orch::~Orch()
}
}

ZmqOrch::ZmqOrch(DBConnector *db, const string tableName, int pri, ZmqServer *zmqServer)
: Orch()
{
addConsumer(db, tableName, pri, zmqServer);
}

ZmqOrch::ZmqOrch(DBConnector *db, const vector<string> &tableNames, ZmqServer *zmqServer)
: Orch()
{
for (auto it : tableNames)
{
addConsumer(db, it, default_orch_pri, zmqServer);
}
}

ZmqOrch::ZmqOrch(DBConnector *db, const vector<table_name_with_pri_t> &tableNames_with_pri, ZmqServer *zmqServer)
: Orch()
{
for (const auto& it : tableNames_with_pri)
{
addConsumer(db, it.first, it.second, zmqServer);
}
}

vector<Selectable *> Orch::getSelectables()
{
vector<Selectable *> selectables;
Expand Down Expand Up @@ -295,29 +271,6 @@ void Consumer::drain()
((Orch *)m_orch)->doTask((Consumer&)*this);
}

void ZmqConsumer::execute()
{
// ConsumerBase::execute_impl<swss::ConsumerTableBase>();
SWSS_LOG_ENTER();

size_t update_size = 0;
auto table = static_cast<swss::ZmqConsumerStateTable *>(getSelectable());
do
{
std::deque<KeyOpFieldsValuesTuple> entries;
table->pops(entries);
update_size = addToSync(entries);
} while (update_size != 0);

drain();
}

void ZmqConsumer::drain()
{
if (!m_toSync.empty())
((ZmqOrch *)m_orch)->doTask((ZmqConsumer&)*this);
}

size_t Orch::addExistingData(const string& tableName)
{
auto consumer = dynamic_cast<Consumer *>(getExecutor(tableName));
Expand Down Expand Up @@ -890,27 +843,6 @@ void Orch::addConsumer(DBConnector *db, string tableName, int pri)
}
}

void ZmqOrch::addConsumer(DBConnector *db, string tableName, int pri, ZmqServer *zmqServer)
{
if (db->getDbId() == APPL_DB)
{
if (zmqServer != nullptr)
{
SWSS_LOG_DEBUG("ZmqConsumer initialize for: %s", tableName.c_str());
addExecutor(new ZmqConsumer(new ZmqConsumerStateTable(db, tableName, *zmqServer, gBatchSize, pri), this, tableName));
}
else
{
SWSS_LOG_DEBUG("Consumer initialize for: %s", tableName.c_str());
addExecutor(new Consumer(new ConsumerStateTable(db, tableName, gBatchSize, pri), this, tableName));
}
}
else
{
SWSS_LOG_WARN("ZmqOrch does not support create consumer for db: %d, table: %s", db->getDbId(), tableName.c_str());
}
}

void Orch::addExecutor(Executor* executor)
{
auto inserted = m_consumerMap.emplace(std::piecewise_construct,
Expand Down
30 changes: 0 additions & 30 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,23 +196,6 @@ class Consumer : public ConsumerBase {
void drain() override;
};

class ZmqConsumer : public ConsumerBase {
public:
ZmqConsumer(swss::ZmqConsumerStateTable *select, Orch *orch, const std::string &name)
: ConsumerBase(select, orch, name)
{
}

swss::TableBase *getConsumerTable() const override
{
// ZmqConsumerStateTable is a subclass of TableBase
return static_cast<swss::ZmqConsumerStateTable *>(getSelectable());
}

void execute() override;
void drain() override;
};

typedef std::map<std::string, std::shared_ptr<Executor>> ConsumerMap;

typedef enum
Expand Down Expand Up @@ -293,19 +276,6 @@ class Orch
void addConsumer(swss::DBConnector *db, std::string tableName, int pri = default_orch_pri);
};

class ZmqOrch : public Orch
{
public:
ZmqOrch(swss::DBConnector *db, const std::string tableName, int pri, swss::ZmqServer *zmqServer);
ZmqOrch(swss::DBConnector *db, const std::vector<std::string> &tableNames, swss::ZmqServer *zmqServer);
ZmqOrch(swss::DBConnector *db, const std::vector<table_name_with_pri_t> &tableNameWithPri, swss::ZmqServer *zmqServer);

virtual void doTask(ZmqConsumer &consumer) { };

private:
void addConsumer(swss::DBConnector *db, std::string tableName, int pri, swss::ZmqServer *zmqServer);
};

#include "request_parser.h"

class Orch2 : public Orch
Expand Down
Loading

0 comments on commit 982a341

Please sign in to comment.