Skip to content

Commit

Permalink
[BACKPORT 2.20][#18890] CDCSDK: Forward CreateCDCStream requests for …
Browse files Browse the repository at this point in the history
…CDCSDK streams to YB-Master

Summary:
**Backport Description**
The merge was clean but the revision contains a few more changes that are taken from the https://phorge.dev.yugabyte.com/D30234.

These manual changes are required since in the master branch, we earlier declared the flag as an Autoflag and later changed it to a TEST flag. In this 2.20 branch, we want to start with a TEST flag itself.

The following manual changes were made:
1. xrepl_catalog_manager.cc - Added `ValidateCDCSDKRequestProperties` function. Updated the code to how it was after https://phorge.dev.yugabyte.com/D30234.
2. src/yb/master/master_xrepl-test.cc - Minor conflicts due to the flag names `ysql_yb_enable_replication_commands` (AutoFlag) and `TEST_ysql_yb_enable_replication_commands` (renamed TEST flag)

**Original Description**
Original commit: b1727f8 / D29420
Now that yb-master supports creating CDCSDK streams atomically, forward the CreateCDCStream requests to yb-master.

This change is guarded by the auto flag `ysql_yb_enable_replication_commands` since we need to ensure that yb-master has the upgraded code before forwarding the request to
it.

To be able to do this, this diff temporarily removes the restriction that each CreateCDCStream request for a PGSQL database must also have a replication slot name. This restriction will be added back once we've ensured that YSQL layer is the only client making the requests. This change is needed for the consistent snapshot project which will go before the publication/replication slot API.

Also fixed an issue with the implementation in yb-master. The `last_replication_time` for entries in the cdc_state_table do not need to be populated for CDCSDK streams as that is used to know whether a client has started streaming or not. It affects our handling of tablet splits.

**Upgrade/Rollback safety:**
This change adds a new error code field to an enum.

This does not need an autoflag since this is a yb-admin - cdc service RPC. Reference ("When do I not need an AutoFlag?" section): https://docs.google.com/document/d/1aFM0NPimXyrFoTGnnspjaaxZdeRBESG332zwpFsPh3A/edit#heading=h.cx5lth8w95ye
Jira: DB-7751

Test Plan:
Jenkins: test regex: .*CDCSDK.*

New test

```
./yb_build.sh --cxx-test cdcsdk_ysql-test --gtest_filter "*TestCDCStreamCreationDisabledDuringUpgrade*"
./yb_build.sh --cxx-test master_xrepl-test --gtest_filter MasterTestXRepl.TestCreateCDCStreamForNamespaceDisabled
```

Existing CDCSDK tests

Reviewers: skumar, asrinivasan, hsunder, xCluster

Reviewed By: skumar

Subscribers: bogdan, ycdcxcluster, ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D31022
  • Loading branch information
dr0pdb committed Dec 14, 2023
1 parent f3c7647 commit 4c48938
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 222 deletions.
88 changes: 9 additions & 79 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ DECLARE_int32(rpc_workers_limit);
DECLARE_int64(cdc_intent_retention_ms);
DECLARE_bool(enable_xcluster_auto_flag_validation);

DECLARE_bool(TEST_ysql_yb_enable_replication_commands);

METRIC_DEFINE_entity(cdc);

METRIC_DEFINE_entity(cdcsdk);
Expand Down Expand Up @@ -971,87 +973,15 @@ Status CDCServiceImpl::CreateCDCStreamForNamespace(
// Generate a stream id by calling CreateCDCStream, and also setup the stream in the master.
std::unordered_map<std::string, std::string> options = GetCreateCDCStreamOptions(req);

// Filter out tables with PK
master::NamespaceIdentifierPB ns_identifier;
ns_identifier.set_id(ns_id);
auto table_list = VERIFY_RESULT_OR_SET_CODE(
client()->ListUserTables(ns_identifier), CDCError(CDCErrorPB::INTERNAL_ERROR));
std::vector<client::YBTableName> required_tables;
for (const auto& table_iter : table_list) {
std::shared_ptr<client::YBTable> table;

RETURN_NOT_OK_SET_CODE(
client()->OpenTable(table_iter.table_id(), &table), CDCError(CDCErrorPB::TABLE_NOT_FOUND));

// internally if any of the table doesn't have a primary key, then do not create
// a CDC stream ID for that table
if (!YsqlTableHasPrimaryKey(table->schema())) {
LOG(WARNING) << "Skipping CDC stream creation on " << table->name().table_name()
<< " because it does not have a primary key";
continue;
}

// We don't allow CDC on YEDIS and tables without a primary key.
if (req->record_format() != CDCRecordFormat::WAL) {
RETURN_NOT_OK_SET_CODE(CheckCdcCompatibility(table), CDCError(CDCErrorPB::INVALID_REQUEST));
}

required_tables.push_back(table_iter);
}

bool set_active = required_tables.empty();
// Forward request to master directly since we support creating CDCSDK stream for a namespace
// atomically in master now.
// If FLAGS_TEST_ysql_yb_enable_replication_commands, populate the namespace id in the newly added
// namespace_id field, otherwise use the table_id as done before.
bool populate_namespace_id_as_table_id = !FLAGS_TEST_ysql_yb_enable_replication_commands;
xrepl::StreamId db_stream_id = VERIFY_RESULT_OR_SET_CODE(
client()->CreateCDCStream(ns_id, options, set_active), CDCError(CDCErrorPB::INTERNAL_ERROR));

options.erase(kIdType);

std::vector<TableId> table_ids;
std::vector<xrepl::StreamId> stream_ids;
std::vector<CDCStateTableEntry> entries_to_insert;

for (const auto& table_iter : required_tables) {
// We only change the stream's state to "ACTIVE", while we are inserting the last table for the
// stream.
bool set_active = table_iter == required_tables.back();
const xrepl::StreamId stream_id = VERIFY_RESULT_OR_SET_CODE(
client()->CreateCDCStream(table_iter.table_id(), options, set_active, db_stream_id),
CDCError(CDCErrorPB::INTERNAL_ERROR));

creation_state.created_cdc_streams.push_back(stream_id);

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
RETURN_NOT_OK_SET_CODE(
client()->GetTabletsFromTableId(table_iter.table_id(), 0, &tablets),
CDCError(CDCErrorPB::TABLE_NOT_FOUND));

// For each tablet, create a row in cdc_state table containing the generated stream id, and
// the op id as max in the logs.
for (const auto& tablet : tablets) {
InitNewTabletStreamEntry(
db_stream_id,
tablet.tablet_id(),
&creation_state.producer_entries_modified,
&entries_to_insert);
}
stream_ids.push_back(std::move(stream_id));
table_ids.push_back(table_iter.table_id());
}

// Add stream to cache.
AddStreamMetadataToCache(
db_stream_id,
std::make_shared<StreamMetadata>(
ns_id, table_ids, req->record_type(), req->record_format(), req->source_type(),
req->checkpoint_type(), StreamModeTransactional(req->transactional())));

RETURN_NOT_OK_SET_CODE(
cdc_state_table_->InsertEntries(entries_to_insert), CDCError(CDCErrorPB::INTERNAL_ERROR));

client()->CreateCDCSDKStreamForNamespace(ns_id, options, populate_namespace_id_as_table_id),
CDCError(CDCErrorPB::INTERNAL_ERROR));
resp->set_db_stream_id(db_stream_id.ToString());

// Clear creation_state so no changes are reversed by scope_exit since we succeeded.
creation_state.Clear();

return Status::OK();
}

Expand Down
28 changes: 28 additions & 0 deletions src/yb/client/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1448,6 +1448,34 @@ void YBClient::CreateCDCStream(
data_->CreateCDCStream(this, table_id, options, transactional, deadline, callback);
}

Result<xrepl::StreamId> YBClient::CreateCDCSDKStreamForNamespace(
const NamespaceId& namespace_id,
const std::unordered_map<std::string, std::string>& options,
bool populate_namespace_id_as_table_id,
const ReplicationSlotName& replication_slot_name) {
CreateCDCStreamRequestPB req;

if (populate_namespace_id_as_table_id) {
req.set_table_id(namespace_id);
} else {
req.set_namespace_id(namespace_id);
}

req.mutable_options()->Reserve(narrow_cast<int>(options.size()));
for (const auto& option : options) {
auto new_option = req.add_options();
new_option->set_key(option.first);
new_option->set_value(option.second);
}
if (!replication_slot_name.empty()) {
req.set_cdcsdk_ysql_replication_slot_name(replication_slot_name.ToString());
}

CreateCDCStreamResponsePB resp;
CALL_SYNC_LEADER_MASTER_RPC_EX(Replication, req, resp, CreateCDCStream);
return xrepl::StreamId::FromString(resp.stream_id());
}

Status YBClient::GetCDCStream(
const xrepl::StreamId& stream_id,
NamespaceId* ns_id,
Expand Down
5 changes: 5 additions & 0 deletions src/yb/client/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,11 @@ class YBClient {
cdc::StreamModeTransactional transactional,
CreateCDCStreamCallback callback);

Result<xrepl::StreamId> CreateCDCSDKStreamForNamespace(
const NamespaceId& namespace_id, const std::unordered_map<std::string, std::string>& options,
bool populate_namespace_id_as_table_id = false,
const ReplicationSlotName& replication_slot_name = ReplicationSlotName(""));

// Delete multiple CDC streams.
Status DeleteCDCStream(
const std::vector<xrepl::StreamId>& streams,
Expand Down
4 changes: 4 additions & 0 deletions src/yb/integration-tests/cdcsdk_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ DECLARE_bool(hide_pg_catalog_table_creation_logs);
DECLARE_bool(master_auto_run_initdb);
DECLARE_int32(pggate_rpc_timeout_secs);
DECLARE_bool(cdc_populate_safepoint_record);
DECLARE_bool(TEST_ysql_yb_enable_replication_commands);

namespace yb {
using client::YBClient;
Expand Down Expand Up @@ -94,6 +95,9 @@ class CDCSDKTestBase : public YBTest {

ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_check_broadcast_address) = false;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_flush_rocksdb_on_shutdown) = false;

ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_ysql_yb_enable_replication_commands) = true;

google::SetVLOGLevel("cdc*", 4);
}

Expand Down
17 changes: 17 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7057,5 +7057,22 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestUnrelatedTableDropUponTserver
GetChangesFromCDCWithoutRetry(stream_id, tablets, &change_resp_2.cdc_sdk_checkpoint()));
}

void TestStreamCreationViaCDCService(CDCSDKYsqlTest* test_class, bool enable_replication_commands) {
ASSERT_OK(test_class->SetUpWithParams(
/*replication_factor=*/3, /*num_masters=*/1, /*colocated=*/false));
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_ysql_yb_enable_replication_commands) =
enable_replication_commands;

ASSERT_OK(test_class->CreateDBStream());
}

TEST_F(CDCSDKYsqlTest, TestCDCStreamCreationViaCDCServiceWithReplicationCommandsEnabled) {
TestStreamCreationViaCDCService(this, /* enable_replication_commands */ true);
}

TEST_F(CDCSDKYsqlTest, TestCDCStreamCreationViaCDCServiceWithReplicationCommandsDisabled) {
TestStreamCreationViaCDCService(this, /* enable_replication_commands */ false);
}

} // namespace cdc
} // namespace yb
10 changes: 4 additions & 6 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,6 @@ constexpr int32_t kInvalidClusterConfigVersion = 0;

YB_DEFINE_ENUM(
CreateNewCDCStreamMode,
// Only populate the namespace_id. It is only used by CDCSDK while creating a stream from
// cdc_service. The caller is expected to populate table_ids in subsequent requests.
// This should not be needed after we tackle #18890.
(kNamespaceId)
// Only populate the table_id. It is only used by xCluster.
(kXClusterTableIds)
// Populate the namespace_id and a list of table ids. It is only used by CDCSDK.
Expand Down Expand Up @@ -1409,6 +1405,10 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
const google::protobuf::RepeatedPtrField<std::string>& table_ids, const NamespaceId& ns_id)
REQUIRES(mutex_);

Status ValidateCDCSDKRequestProperties(
const CreateCDCStreamRequestPB& req, const std::string& source_type_option_value,
const std::string& id_type_option_value);

// Process the newly created tables that are relevant to existing CDCSDK streams.
Status ProcessNewTablesForCDCSDKStreams(
const TableStreamIdsMap& table_to_unprocessed_streams_map, const LeaderEpoch& epoch);
Expand Down Expand Up @@ -2673,8 +2673,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
const std::vector<TableId>& table_ids, const std::optional<const NamespaceId>& namespace_id,
CreateCDCStreamResponsePB* resp, const LeaderEpoch& epoch);

Status AddTableIdToCDCStream(const CreateCDCStreamRequestPB& req) EXCLUDES(mutex_);

Status SetWalRetentionForTable(
const TableId& table_id, rpc::RpcContext* rpc, const LeaderEpoch& epoch);
Status BackfillMetadataForCDC(
Expand Down
44 changes: 22 additions & 22 deletions src/yb/master/master_xrepl-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,9 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceCql) {
ASSERT_EQ(1, list_resp.streams_size());
}

TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceInvalidDuplicationSlotName) {
TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceDisabled) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_ysql_yb_enable_replication_commands) = false;

CreateNamespaceResponsePB create_namespace_resp;
ASSERT_OK(CreatePgsqlNamespace(kNamespaceName, kPgsqlNamespaceId, &create_namespace_resp));
auto ns_id = create_namespace_resp.id();
Expand All @@ -394,60 +396,58 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceInvalidDuplicationSlotNam
ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema));
}

ASSERT_RESULT(CreateCDCStreamForNamespace(ns_id, kPgReplicationSlotName));

CreateCDCStreamRequestPB req;
CreateCDCStreamResponsePB resp;
req.set_namespace_id(ns_id);
req.set_cdcsdk_ysql_replication_slot_name(kPgReplicationSlotName); // Use the same name again.
req.set_cdcsdk_ysql_replication_slot_name(kPgReplicationSlotName);
AddKeyValueToCreateCDCStreamRequestOption(&req, cdc::kIdType, cdc::kNamespaceId);
AddKeyValueToCreateCDCStreamRequestOption(
&req, cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::CDCSDK));

ASSERT_OK(proxy_replication_->CreateCDCStream(req, &resp, ResetAndGetController()));
SCOPED_TRACE(resp.DebugString());
ASSERT_TRUE(resp.has_error());
ASSERT_EQ(MasterErrorPB::OBJECT_ALREADY_PRESENT, resp.error().code());
ASSERT_NE(
resp.error().status().message().find(
"CDC stream with the given replication slot name already exists"),
"Creation of CDCSDK stream with a replication slot name is disallowed"),
std::string::npos)
<< resp.error().status().message();

auto list_resp = ASSERT_RESULT(ListCDCStreams());
ASSERT_EQ(1, list_resp.streams_size());
}

TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceInvalidIdTypeOption) {
TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceInvalidDuplicationSlotName) {
CreateNamespaceResponsePB create_namespace_resp;
CreateCDCStreamRequestPB req;
CreateCDCStreamResponsePB resp;

ASSERT_OK(CreatePgsqlNamespace(kNamespaceName, kPgsqlNamespaceId, &create_namespace_resp));
auto ns_id = create_namespace_resp.id();

for (auto i = 0; i < num_tables; ++i) {
ASSERT_OK(CreatePgsqlTable(ns_id, Format("cdc_table_$0", i), kTableIds[i], kTableSchema));
}

ASSERT_RESULT(CreateCDCStreamForNamespace(ns_id, kPgReplicationSlotName));

// Not setting kIdType option, treated as kTableId by default.
CreateCDCStreamRequestPB req;
CreateCDCStreamResponsePB resp;
req.set_namespace_id(ns_id);
req.set_cdcsdk_ysql_replication_slot_name(kPgReplicationSlotName);
req.set_cdcsdk_ysql_replication_slot_name(kPgReplicationSlotName); // Use the same name again.
AddKeyValueToCreateCDCStreamRequestOption(&req, cdc::kIdType, cdc::kNamespaceId);
AddKeyValueToCreateCDCStreamRequestOption(
&req, cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::CDCSDK));

ASSERT_OK(proxy_replication_->CreateCDCStream(req, &resp, ResetAndGetController()));
SCOPED_TRACE(resp.DebugString());
ASSERT_TRUE(resp.has_error());
ASSERT_EQ(MasterErrorPB::INVALID_REQUEST, resp.error().code());
ASSERT_EQ(MasterErrorPB::OBJECT_ALREADY_PRESENT, resp.error().code());
ASSERT_NE(
resp.error().status().message().find(
"Invalid id_type in options. Expected to be NAMESPACEID"),
"CDC stream with the given replication slot name already exists"),
std::string::npos)
<< resp.error().status().message();

auto list_resp = ASSERT_RESULT(ListCDCStreams());
ASSERT_EQ(0, list_resp.streams_size());
ASSERT_EQ(1, list_resp.streams_size());
}

TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceMissingReplicationSlotName) {
TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceInvalidIdTypeOption) {
CreateNamespaceResponsePB create_namespace_resp;
CreateCDCStreamRequestPB req;
CreateCDCStreamResponsePB resp;
Expand All @@ -456,9 +456,9 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceMissingReplicationSlotNam
auto ns_id = create_namespace_resp.id();


// Not populating cdcsdk_ysql_replication_slot_name.
// Not setting kIdType option, treated as kTableId by default.
req.set_namespace_id(ns_id);
AddKeyValueToCreateCDCStreamRequestOption(&req, cdc::kIdType, cdc::kNamespaceId);
req.set_cdcsdk_ysql_replication_slot_name(kPgReplicationSlotName);
AddKeyValueToCreateCDCStreamRequestOption(
&req, cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::CDCSDK));

Expand All @@ -468,7 +468,7 @@ TEST_F(MasterTestXRepl, TestCreateCDCStreamForNamespaceMissingReplicationSlotNam
ASSERT_EQ(MasterErrorPB::INVALID_REQUEST, resp.error().code());
ASSERT_NE(
resp.error().status().message().find(
"cdcsdk_ysql_replication_slot_name is required for YSQL databases"),
"Invalid id_type in options. Expected to be NAMESPACEID"),
std::string::npos)
<< resp.error().status().message();

Expand Down
Loading

0 comments on commit 4c48938

Please sign in to comment.