Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[orch] change Consumer class to support multiple values for the same key #1184

Merged
merged 4 commits into from
Jan 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cfgmgr/vlanmgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -447,8 +447,9 @@ void VlanMgr::processUntaggedVlanMembers(string vlan, const string &members)
vector<FieldValueTuple> fvVector;
FieldValueTuple t("tagging_mode", "untagged");
fvVector.push_back(t);
consumer.m_toSync[member_key] = make_tuple(member_key, SET_COMMAND, fvVector);
SWSS_LOG_DEBUG("%s", (dumpTuple(consumer, consumer.m_toSync[member_key])).c_str());
KeyOpFieldsValuesTuple tuple = make_tuple(member_key, SET_COMMAND, fvVector);
consumer.addToSync(tuple);
SWSS_LOG_DEBUG("%s", (dumpTuple(consumer, tuple)).c_str());
}
/*
* There is pending task from consumer pipe, in this case just skip it.
Expand Down
82 changes: 62 additions & 20 deletions orchagent/orch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,66 @@ vector<Selectable *> Orch::getSelectables()
return selectables;
}

size_t Consumer::addToSync(std::deque<KeyOpFieldsValuesTuple> &entries)
void Consumer::addToSync(const KeyOpFieldsValuesTuple &entry)
{
SWSS_LOG_ENTER();

/* Nothing popped */
if (entries.empty())

string key = kfvKey(entry);
string op = kfvOp(entry);

/* Record incoming tasks */
if (gSwssRecord)
{
return 0;
Orch::recordTuple(*this, entry);
}

for (auto& entry: entries)
/*
* m_toSync is a multimap which will allow one key with multiple values,
* Also, the order of the key-value pairs whose keys compare equivalent
* is the order of insertion and does not change. (since C++11)
*/

/* If a new task comes we directly put it into getConsumerTable().m_toSync map */
if (m_toSync.find(key) == m_toSync.end())
{
string key = kfvKey(entry);
string op = kfvOp(entry);
m_toSync.emplace(key, entry);
}

/* Record incoming tasks */
if (gSwssRecord)
/* if a DEL task comes, we overwrite the old key */
else if (op == DEL_COMMAND)
{
m_toSync.erase(key);
m_toSync.emplace(key, entry);
}
else
{
/*
* Now we are trying to add the key-value with SET.
* We maintain maximun two values per key.
* In case there is one key-value, it should be DEL or SET
* In case there are two key-value pairs, it should be DEL then SET
* The code logic is following:
* We iterate the values with the key, we skip the value with DEL and then
* check if that was the only one (I,E, the iter pointer now points to end or next key),
* in such case, we insert the key-value with SET.
* If there was a SET already (I,E, the pointer still points to the same key), we combine the kfv.
*/
auto ret = m_toSync.equal_range(key);
auto iter = ret.first;
for (; iter != ret.second; ++iter)
{
Orch::recordTuple(*this, entry);
auto old_op = kfvOp(iter->second);
if (old_op == SET_COMMAND)
break;
}

/* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */
if (m_toSync.find(key) == m_toSync.end() || op == DEL_COMMAND)
if (iter == ret.second)
{
m_toSync[key] = entry;
m_toSync.emplace(key, entry);
}
/* If an old task is still there, we combine the old task with new task */
else
{
KeyOpFieldsValuesTuple existing_data = m_toSync[key];
KeyOpFieldsValuesTuple existing_data = iter->second;

auto new_values = kfvFieldsValues(entry);
auto existing_values = kfvFieldsValues(existing_data);
Expand All @@ -118,9 +148,21 @@ size_t Consumer::addToSync(std::deque<KeyOpFieldsValuesTuple> &entries)
}
existing_values.push_back(FieldValueTuple(field, value));
}
m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
iter->second = KeyOpFieldsValuesTuple(key, op, existing_values);
}
}

}

size_t Consumer::addToSync(const std::deque<KeyOpFieldsValuesTuple> &entries)
{
SWSS_LOG_ENTER();

for (auto& entry: entries)
{
addToSync(entry);
}

return entries.size();
}

Expand Down Expand Up @@ -186,7 +228,7 @@ void Consumer::drain()
m_orch->doTask(*this);
}

string Consumer::dumpTuple(KeyOpFieldsValuesTuple &tuple)
string Consumer::dumpTuple(const KeyOpFieldsValuesTuple &tuple)
{
string s = getTableName() + getConsumerTable()->getTableNameSeparator() + kfvKey(tuple)
+ "|" + kfvOp(tuple);
Expand Down Expand Up @@ -412,7 +454,7 @@ void Orch::logfileReopen()
}
}

void Orch::recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple)
void Orch::recordTuple(Consumer &consumer, const KeyOpFieldsValuesTuple &tuple)
{
string s = consumer.dumpTuple(tuple);

Expand All @@ -426,7 +468,7 @@ void Orch::recordTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple)
}
}

string Orch::dumpTuple(Consumer &consumer, KeyOpFieldsValuesTuple &tuple)
string Orch::dumpTuple(Consumer &consumer, const KeyOpFieldsValuesTuple &tuple)
{
string s = consumer.dumpTuple(tuple);
return s;
Expand Down
17 changes: 11 additions & 6 deletions orchagent/orch.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ typedef std::pair<std::string, sai_object_id_t> object_map_pair;

typedef std::map<std::string, object_map*> type_map;
typedef std::pair<std::string, object_map*> type_map_pair;
typedef std::map<std::string, swss::KeyOpFieldsValuesTuple> SyncMap;

// Use multimap to support multiple OpFieldsValues for the same key (e,g, DEL and SET)
// The order of the key-value pairs whose keys compare equivalent is the order of
// insertion and does not change. (since C++11)
typedef std::multimap<std::string, swss::KeyOpFieldsValuesTuple> SyncMap;

typedef std::pair<std::string, int> table_name_with_pri_t;

Expand Down Expand Up @@ -131,7 +135,7 @@ class Consumer : public Executor {
return getConsumerTable()->getDbId();
}

std::string dumpTuple(swss::KeyOpFieldsValuesTuple &tuple);
std::string dumpTuple(const swss::KeyOpFieldsValuesTuple &tuple);
void dumpPendingTasks(std::vector<std::string> &ts);

size_t refillToSync();
Expand All @@ -143,9 +147,10 @@ class Consumer : public Executor {
// TODO: hide?
SyncMap m_toSync;

protected:
void addToSync(const swss::KeyOpFieldsValuesTuple &entry);

// Returns: the number of entries added to m_toSync
size_t addToSync(std::deque<swss::KeyOpFieldsValuesTuple> &entries);
size_t addToSync(const std::deque<swss::KeyOpFieldsValuesTuple> &entries);
zhenggen-xu marked this conversation as resolved.
Show resolved Hide resolved
};

typedef std::map<std::string, std::shared_ptr<Executor>> ConsumerMap;
Expand Down Expand Up @@ -193,14 +198,14 @@ class Orch
virtual void doTask(swss::SelectableTimer &timer) { }

/* TODO: refactor recording */
static void recordTuple(Consumer &consumer, swss::KeyOpFieldsValuesTuple &tuple);
static void recordTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple);

void dumpPendingTasks(std::vector<std::string> &ts);
protected:
ConsumerMap m_consumerMap;

static void logfileReopen();
std::string dumpTuple(Consumer &consumer, swss::KeyOpFieldsValuesTuple &tuple);
std::string dumpTuple(Consumer &consumer, const swss::KeyOpFieldsValuesTuple &tuple);
ref_resolve_status resolveFieldRefValue(type_map&, const std::string&, swss::KeyOpFieldsValuesTuple&, sai_object_id_t&);
bool parseIndexRange(const std::string &input, sai_uint32_t &range_low, sai_uint32_t &range_high);
bool parseReference(type_map &type_maps, std::string &ref, std::string &table_name, std::string &object_name);
Expand Down
4 changes: 2 additions & 2 deletions orchagent/routeorch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ std::string RouteOrch::getLinkLocalEui64Addr(void)

uint8_t eui64_interface_id[EUI64_INTF_ID_LEN];
char ipv6_ll_addr[INET6_ADDRSTRLEN] = {0};

/* Link-local IPv6 address autogenerated by kernel with eui64 interface-id
* derived from the MAC address of the host interface.
*/
Expand Down Expand Up @@ -406,7 +406,7 @@ void RouteOrch::doTask(Consumer& consumer)
vector<FieldValueTuple> v;
key = vrf + i.first.to_string();
auto x = KeyOpFieldsValuesTuple(key, DEL_COMMAND, v);
consumer.m_toSync[key] = x;
consumer.addToSync(x);
}
}
m_resync = true;
Expand Down
1 change: 1 addition & 0 deletions tests/mock_tests/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ LDADD_GTEST = -L/usr/src/gtest
tests_SOURCES = aclorch_ut.cpp \
portsorch_ut.cpp \
saispy_ut.cpp \
consumer_ut.cpp \
ut_saihelper.cpp \
mock_orchagent_main.cpp \
mock_dbconnector.cpp \
Expand Down
61 changes: 5 additions & 56 deletions tests/mock_tests/aclorch_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,54 +23,6 @@ namespace aclorch_test
{
using namespace std;

size_t consumerAddToSync(Consumer *consumer, const deque<KeyOpFieldsValuesTuple> &entries)
{
/* Nothing popped */
if (entries.empty())
{
return 0;
}

for (auto &entry : entries)
{
string key = kfvKey(entry);
string op = kfvOp(entry);

/* If a new task comes or if a DEL task comes, we directly put it into getConsumerTable().m_toSync map */
if (consumer->m_toSync.find(key) == consumer->m_toSync.end() || op == DEL_COMMAND)
{
consumer->m_toSync[key] = entry;
}
/* If an old task is still there, we combine the old task with new task */
else
{
KeyOpFieldsValuesTuple existing_data = consumer->m_toSync[key];

auto new_values = kfvFieldsValues(entry);
auto existing_values = kfvFieldsValues(existing_data);

for (auto it : new_values)
{
string field = fvField(it);
string value = fvValue(it);

auto iu = existing_values.begin();
while (iu != existing_values.end())
{
string ofield = fvField(*iu);
if (field == ofield)
iu = existing_values.erase(iu);
else
iu++;
}
existing_values.push_back(FieldValueTuple(field, value));
}
consumer->m_toSync[key] = KeyOpFieldsValuesTuple(key, op, existing_values);
}
}
return entries.size();
}

struct AclTestBase : public ::testing::Test
{
vector<int32_t *> m_s32list_pool;
Expand Down Expand Up @@ -199,8 +151,7 @@ namespace aclorch_test
auto consumer = unique_ptr<Consumer>(new Consumer(
new swss::ConsumerStateTable(config_db, CFG_ACL_TABLE_TABLE_NAME, 1, 1), m_aclOrch, CFG_ACL_TABLE_TABLE_NAME));

consumerAddToSync(consumer.get(), entries);

consumer->addToSync(entries);
static_cast<Orch *>(m_aclOrch)->doTask(*consumer);
}

Expand All @@ -209,8 +160,7 @@ namespace aclorch_test
auto consumer = unique_ptr<Consumer>(new Consumer(
new swss::ConsumerStateTable(config_db, CFG_ACL_RULE_TABLE_NAME, 1, 1), m_aclOrch, CFG_ACL_RULE_TABLE_NAME));

consumerAddToSync(consumer.get(), entries);

consumer->addToSync(entries);
static_cast<Orch *>(m_aclOrch)->doTask(*consumer);
}

Expand Down Expand Up @@ -381,8 +331,7 @@ namespace aclorch_test
auto consumer = unique_ptr<Consumer>(new Consumer(
new swss::ConsumerStateTable(m_app_db.get(), APP_PORT_TABLE_NAME, 1, 1), gPortsOrch, APP_PORT_TABLE_NAME));

consumerAddToSync(consumer.get(), { { "PortInitDone", EMPTY_PREFIX, { { "", "" } } } });

consumer->addToSync({ { "PortInitDone", EMPTY_PREFIX, { { "", "" } } } });
static_cast<Orch *>(gPortsOrch)->doTask(*consumer.get());
}

Expand Down Expand Up @@ -628,7 +577,7 @@ namespace aclorch_test
// consistency validation with CRM
bool validateResourceCountWithCrm(const AclOrch *aclOrch, CrmOrch *crmOrch)
{
// Verify ACL Tables
// Verify ACL Tables
auto const &resourceMap = Portal::CrmOrchInternal::getResourceMap(crmOrch);
uint32_t crm_acl_table_cnt = 0;
for (auto const &kv : resourceMap.at(CrmResourceType::CRM_ACL_TABLE).countersMap)
Expand All @@ -642,7 +591,7 @@ namespace aclorch_test
<< ") and AclOrch " << Portal::AclOrchInternal::getAclTables(aclOrch).size();
return false;
}


// Verify ACL Rules
//
Expand Down
Loading