From f3c7647dee91e5548234ef862c4ca2d100b3fac8 Mon Sep 17 00:00:00 2001 From: Vaibhav Kushwaha Date: Wed, 13 Dec 2023 11:25:49 +0530 Subject: [PATCH] [BACKPORT 2.20][#18479] CDCSDK: Report tablet split as soon as we detect it Summary: Original commit: c30499b8b62645bc113a5a721c78ddfd1fb7bfe8 / D28415 For CDCSDK, whenever a tablet split occurs, it is reported to the client as soon as we read the WAL records and hit a `SPLIT_OP`, the client then retrieves the children tablets and polls on them. This diff changes the mechanism so that now we check in every `GetChanges` call whether a tablet has been split, if it has we communicate the split to the client as soon as we detect the tablet being polled is split. Now when the children tablets are being polled, we can have duplicate data since the WAL is duplicated. To overcome that, while iterating over the records in the WAL, we retrieve the hash_key of the key being read and check if it is within the bounds of the tablet being polled, if it is then we stream the record, otherwise we ignore. **Summary of changes in this diff:** 1. Changed the time at which split is reported to client. Earlier it was reported when we hit the `SPLIT_OP` in WAL, now we report it as soon as we detect it on the tablet. 2. Modified the logic to get the children tablets using `tablet_peer` rather than iterating over the complete list. 3. Added and modified tests to comply with the new changes. **2.20 specific changes:** Few conflicts were there while cherry picking the original commit because of D30724, they were resolved in the files `cdcsdk_tablet_split-test` and `cdcsdk_consistent_stream-test`. Jira: DB-7447 Test Plan: Run tests to ensure no regression. Reviewers: skumar, asrinivasan, stiwary, jhe Reviewed By: skumar Subscribers: ycdcxcluster Tags: #jenkins-ready Differential Revision: https://phorge.dev.yugabyte.com/D30999 --- .../java/org/yb/cdc/ysql/Test3IntCol.java | 2 - src/yb/cdc/cdc_service.cc | 34 +- src/yb/cdc/cdcsdk_producer.cc | 63 +++- .../cdcsdk_consistent_stream-test.cc | 59 +--- .../cdcsdk_tablet_split-test.cc | 331 +++++++++++------- .../cdcsdk_ysql_test_base.cc | 162 ++++++++- .../integration-tests/cdcsdk_ysql_test_base.h | 18 +- 7 files changed, 460 insertions(+), 209 deletions(-) diff --git a/java/yb-cdc/src/test/java/org/yb/cdc/ysql/Test3IntCol.java b/java/yb-cdc/src/test/java/org/yb/cdc/ysql/Test3IntCol.java index 1a04b8a57503..f5d7b149469a 100644 --- a/java/yb-cdc/src/test/java/org/yb/cdc/ysql/Test3IntCol.java +++ b/java/yb-cdc/src/test/java/org/yb/cdc/ysql/Test3IntCol.java @@ -192,8 +192,6 @@ public void testLongRunningScript() { // begin; // update test set b=b+1, c=c+1 where a=1; // end; - new ExpectedRecord3Proto(-1, -1, -1, CdcService.RowMessage.Op.BEGIN), - new ExpectedRecord3Proto(-1, -1, -1, CdcService.RowMessage.Op.COMMIT), // begin; // insert into test values(11,12,13); diff --git a/src/yb/cdc/cdc_service.cc b/src/yb/cdc/cdc_service.cc index 766fbe882868..49eb9e595e39 100644 --- a/src/yb/cdc/cdc_service.cc +++ b/src/yb/cdc/cdc_service.cc @@ -530,26 +530,27 @@ class CDCServiceImpl::Impl { Status AddEntriesForChildrenTabletsOnSplitOp( const ProducerTabletInfo& info, - const std::array& tablets, - const OpId& split_op_id) { + const std::array& tablets, + const OpId& children_op_id) { std::lock_guard l(mutex_); + for (const auto& tablet : tablets) { ProducerTabletInfo producer_info{ - info.replication_group_id, info.stream_id, tablet->tablet_id()}; + info.replication_group_id, info.stream_id, tablet}; tablet_checkpoints_.emplace(TabletCheckpointInfo{ .producer_tablet_info = producer_info, .cdc_state_checkpoint = TabletCheckpoint{ - .op_id = split_op_id, .last_update_time = {}, .last_active_time = {}}, + .op_id = children_op_id, .last_update_time = {}, .last_active_time = {}}, .sent_checkpoint = TabletCheckpoint{ - .op_id = split_op_id, .last_update_time = {}, .last_active_time = {}}, + .op_id = children_op_id, .last_update_time = {}, .last_active_time = {}}, .mem_tracker = nullptr, }); cdc_state_metadata_.emplace(CDCStateMetadataInfo{ .producer_tablet_info = producer_info, .commit_timestamp = {}, - .last_streamed_op_id = split_op_id, + .last_streamed_op_id = children_op_id, .schema_details_map = {}, .mem_tracker = nullptr, }); @@ -1755,6 +1756,8 @@ void CDCServiceImpl::GetChanges( } // This specific error indicates that a tablet split occured on the tablet. if (status.IsTabletSplit()) { + LOG(INFO) << "Updating children tablets on detected split on tablet " + << producer_tablet.tablet_id; status = UpdateChildrenTabletsOnSplitOpForCDCSDK(producer_tablet); RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context); @@ -3980,13 +3983,15 @@ void CDCServiceImpl::IsBootstrapRequired( Status CDCServiceImpl::UpdateChildrenTabletsOnSplitOpForCDCSDK(const ProducerTabletInfo& info) { auto tablets = VERIFY_RESULT(GetTablets(info.stream_id, true /* ignore_errors */)); + + // Initializing the children to 0.0 to prevent garbage collection on them. const OpId& children_op_id = OpId(); - std::array children_tablets; + std::array children; uint found_children = 0; for (auto const& tablet : tablets) { if (tablet.has_split_parent_tablet_id() && tablet.split_parent_tablet_id() == info.tablet_id) { - children_tablets[found_children] = &tablet; + children[found_children] = tablet.tablet_id(); found_children += 1; if (found_children == 2) { @@ -3994,15 +3999,16 @@ Status CDCServiceImpl::UpdateChildrenTabletsOnSplitOpForCDCSDK(const ProducerTab } } } - LOG_IF(DFATAL, found_children != 2) - << "Could not find the two split children for the tablet: " << info.tablet_id; - // Add the entries for the children tablets in 'cdc_state_metadata_' and 'tablet_checkpoints_'. + RSTATUS_DCHECK( + found_children == 2, InternalError, + Format("Could not find the two split children for tablet: $0", info.tablet_id)); + RETURN_NOT_OK_SET_CODE( - impl_->AddEntriesForChildrenTabletsOnSplitOp(info, children_tablets, children_op_id), + impl_->AddEntriesForChildrenTabletsOnSplitOp(info, children, children_op_id), CDCError(CDCErrorPB::INTERNAL_ERROR)); - VLOG(1) << "Added entries for children tablets: " << children_tablets[0]->tablet_id() << " and " - << children_tablets[1]->tablet_id() << ", of parent tablet: " << info.tablet_id + VLOG(1) << "Added entries for children tablets: " << children[0] << " and " + << children[1] << ", of parent tablet: " << info.tablet_id << ", to 'cdc_state_metadata_' and 'tablet_checkpoints_'"; return Status::OK(); diff --git a/src/yb/cdc/cdcsdk_producer.cc b/src/yb/cdc/cdcsdk_producer.cc index e5c691e74e64..41994ed706aa 100644 --- a/src/yb/cdc/cdcsdk_producer.cc +++ b/src/yb/cdc/cdcsdk_producer.cc @@ -43,10 +43,14 @@ #include "yb/tablet/tablet_metadata.h" #include "yb/tablet/tablet_peer.h" #include "yb/tablet/tablet.h" +#include "yb/tablet/tablet_types.pb.h" #include "yb/tablet/transaction_participant.h" #include "yb/util/flags.h" #include "yb/util/logging.h" +#include "yb/util/opid.h" +#include "yb/util/status.h" +#include "yb/util/status_format.h" using std::string; @@ -1050,6 +1054,8 @@ Status PopulateCDCSDKWriteRecord( // We'll use DocDB key hash to identify the records that belong to the same row. Slice prev_key; + uint32_t records_added = 0; + bool colocated = tablet_ptr->metadata()->colocated(); Schema schema = Schema(); SchemaVersion schema_version = std::numeric_limits::max(); @@ -1088,6 +1094,16 @@ Status PopulateCDCSDKWriteRecord( row_message->op() == RowMessage_Op_UPDATE)) { Slice sub_doc_key = key; dockv::SubDocKey decoded_key; + + // With tablet splits we will end up reading records from this tablet's ancestors - + // only process records that are in this tablet's key range. + const auto& key_bounds = tablet_ptr->key_bounds(); + if (!key_bounds.IsWithinBounds(key)) { + VLOG(1) << "Key for the read record is not within tablet bounds, skipping the key: " + << primary_key.data(); + continue; + } + RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, dockv::HybridTimeRequired::kFalse)); if (colocated) { colocation_id = decoded_key.doc_key().colocation_id(); @@ -1135,6 +1151,7 @@ Status PopulateCDCSDKWriteRecord( // Write pair contains record for different row. Create a new CDCRecord in this case. proto_record = resp->add_cdc_sdk_proto_records(); + ++records_added; row_message = proto_record->mutable_row_message(); modified_columns.clear(); row_message->set_pgschema_name(schema.SchemaName()); @@ -1267,6 +1284,14 @@ Status PopulateCDCSDKWriteRecord( } if (FLAGS_cdc_populate_end_markers_transactions) { + // If there are no records added, we do not need to populate the begin-commit block + // and we should return from here. + if (records_added == 0 && !resp->mutable_cdc_sdk_proto_records()->empty()) { + VLOG(2) << "Removing the added BEGIN record because there are no other records to add"; + resp->mutable_cdc_sdk_proto_records()->RemoveLast(); + return Status::OK(); + } + FillCommitRecordForSingleShardTransaction( OpId(msg->id().term(), msg->id().index()), tablet_peer, resp, msg->hybrid_time()); } @@ -2162,7 +2187,6 @@ Status GetChangesForCDCSDK( // previously declared 'checkpoint' or the 'from_op_id'. bool checkpoint_updated = false; bool report_tablet_split = false; - OpId split_op_id = OpId::Invalid(); bool snapshot_operation = false; bool pending_intents = false; int wal_segment_index = GetWalSegmentIndex(wal_segment_index_req); @@ -2278,6 +2302,16 @@ Status GetChangesForCDCSDK( bool saw_non_actionable_message = false; std::unordered_set streamed_txns; + if (tablet_ptr->metadata()->tablet_data_state() == tablet::TABLET_DATA_SPLIT_COMPLETED) { + // This indicates that the tablet being polled has been split and in this case we should + // tell the client immediately about the split. + LOG(INFO) << "Tablet split detected for tablet " << tablet_id + << ", moving to children tablets immediately"; + + return STATUS_FORMAT( + TabletSplit, "Tablet split detected on $0", tablet_id); + } + // It's possible that a batch of messages in read_ops after fetching from // 'ReadReplicatedMessagesForCDC' , will not have any actionable messages. In which case we // keep retrying by fetching the next batch, until either we get an actionable message or reach @@ -2543,7 +2577,8 @@ Status GetChangesForCDCSDK( saw_split_op = true; // We first verify if a split has indeed occured succesfully by checking if there are - // two children tablets for the tablet. + // two children tablets for the tablet. This check also verifies if the SPLIT_OP + // belongs to the current tablet if (!(VerifyTabletSplitOnParentTablet(table_id, tablet_id, client))) { // We could verify the tablet split succeeded. This is possible when the child tablets // of a split are not running yet. @@ -2566,8 +2601,7 @@ Status GetChangesForCDCSDK( } else { // If 'GetChangesForCDCSDK' was called with the OpId just before the SplitOp's // record, and if there is no more data to stream and we can notify the client - // about the split and update the checkpoint. At this point, we will store the - // split_op_id. + // about the split and update the checkpoint. LOG(INFO) << "Found SPLIT_OP record with OpId: " << op_id << ", for parent tablet: " << tablet_id << ", and if we did not see any other records we will report the tablet " @@ -2578,7 +2612,7 @@ Status GetChangesForCDCSDK( &next_checkpoint_index, all_checkpoints, &checkpoint, last_streamed_op_id, &safe_hybrid_time_resp, &wal_segment_index); checkpoint_updated = true; - split_op_id = op_id; + report_tablet_split = true; } } } break; @@ -2622,12 +2656,14 @@ Status GetChangesForCDCSDK( } } - // If the split_op_id is equal to the checkpoint i.e the OpId of the last actionable message, we - // know that after the split there are no more actionable messages, and this confirms that the - // SPLIT OP was succesfull. - if (!snapshot_operation && split_op_id.term == checkpoint.term() && - split_op_id.index == checkpoint.index()) { - report_tablet_split = true; + // If the GetChanges call is not for snapshot and then we know that a split has indeed been + // successful then we should report the split to the client. + if (!snapshot_operation && report_tablet_split) { + LOG(INFO) << "Tablet split detected for tablet " << tablet_id + << ", moving to children tablets immediately"; + return STATUS_FORMAT( + TabletSplit, "Tablet split detected on $0", tablet_id + ); } if (consumption) { @@ -2657,11 +2693,6 @@ Status GetChangesForCDCSDK( last_streamed_op_id->ToPB(resp->mutable_checkpoint()->mutable_op_id()); } - if (report_tablet_split) { - return STATUS_FORMAT( - TabletSplit, "Tablet Split on tablet: $0, no more records to stream", tablet_id); - } - // We do not populate SAFEPOINT records in two scenarios: // 1. When we are streaming batches of a large transaction // 2. When we are streaming snapshot records diff --git a/src/yb/integration-tests/cdcsdk_consistent_stream-test.cc b/src/yb/integration-tests/cdcsdk_consistent_stream-test.cc index f4c71ac552e6..d938ff655c38 100644 --- a/src/yb/integration-tests/cdcsdk_consistent_stream-test.cc +++ b/src/yb/integration-tests/cdcsdk_consistent_stream-test.cc @@ -10,6 +10,7 @@ // or implied. See the License for the specific language governing permissions and limitations // under the License. +#include "yb/cdc/cdc_service.pb.h" #include "yb/integration-tests/cdcsdk_ysql_test_base.h" #include "yb/util/test_macros.h" @@ -618,57 +619,19 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKConsistentStreamWithTab ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_first_split, nullptr)); ASSERT_EQ(tablets_after_first_split.size(), 2); - // The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in - // that order. - const int expected_count_1[] = { - 1, - 2 * num_batches * inserts_per_batch, - 0, - 0, - 0, - 0, - 2 * num_batches + num_batches * inserts_per_batch, - 2 * num_batches + num_batches * inserts_per_batch, - }; - const int expected_count_2[] = {3, 4 * num_batches * inserts_per_batch, 0, 0, 0, 0}; - int count[] = {0, 0, 0, 0, 0, 0, 0, 0}; + const int64 expected_total_records = 4 * num_batches * inserts_per_batch; - auto parent_get_changes = GetAllPendingChangesFromCdc(stream_id, tablets); - for (size_t i = 0; i < parent_get_changes.records.size(); i++) { - auto record = parent_get_changes.records[i]; - UpdateRecordCount(record, count); - } + std::map tablet_to_checkpoint; + std::map> records_by_tablet; + int64 total_received_records = ASSERT_RESULT(GetChangeRecordCount( + stream_id, table, tablets, tablet_to_checkpoint, expected_total_records, + false /* explicit_checkpointing */, records_by_tablet)); - CheckRecordsConsistency(parent_get_changes.records); - for (int i = 0; i < 8; i++) { - ASSERT_EQ(expected_count_1[i], count[i]); - } - - // Wait until the 'cdc_parent_tablet_deletion_task_' has run. - SleepFor(MonoDelta::FromSeconds(2)); - - auto get_tablets_resp = - ASSERT_RESULT(GetTabletListToPollForCDC(stream_id, table_id, tablets[0].tablet_id())); - for (const auto& tablet_checkpoint_pair : get_tablets_resp.tablet_checkpoint_pairs()) { - auto new_tablet = tablet_checkpoint_pair.tablet_locations(); - auto new_checkpoint = tablet_checkpoint_pair.cdc_sdk_checkpoint(); - - google::protobuf::RepeatedPtrField tablets; - auto tablet_ptr = tablets.Add(); - tablet_ptr->CopyFrom(new_tablet); - - auto child_get_changes = GetAllPendingChangesFromCdc(stream_id, tablets, &new_checkpoint); - vector child_plus_parent = parent_get_changes.records; - for (size_t i = 0; i < child_get_changes.records.size(); i++) { - auto record = child_get_changes.records[i]; - child_plus_parent.push_back(record); - UpdateRecordCount(record, count); - } - CheckRecordsConsistency(child_plus_parent); - } + ASSERT_EQ(expected_total_records, total_received_records); - for (int i = 0; i < 6; i++) { - ASSERT_EQ(expected_count_2[i], count[i]); + for (auto iter = records_by_tablet.begin(); iter != records_by_tablet.end(); ++iter) { + LOG(INFO) << "Checking records consistency for tablet " << iter->first; + CheckRecordsConsistency(iter->second); } } diff --git a/src/yb/integration-tests/cdcsdk_tablet_split-test.cc b/src/yb/integration-tests/cdcsdk_tablet_split-test.cc index b5a53bf9f07b..05261a3eb6e1 100644 --- a/src/yb/integration-tests/cdcsdk_tablet_split-test.cc +++ b/src/yb/integration-tests/cdcsdk_tablet_split-test.cc @@ -11,9 +11,14 @@ // under the License. #include "yb/cdc/cdc_service.pb.h" +#include "yb/gutil/dynamic_annotations.h" +#include "yb/gutil/integral_types.h" #include "yb/integration-tests/cdcsdk_ysql_test_base.h" #include "yb/cdc/cdc_state_table.h" +#include "yb/util/monotime.h" +#include "yb/util/result.h" +#include "yb/util/test_macros.h" namespace yb { namespace cdc { @@ -284,14 +289,12 @@ TEST_F( new_checkpoint.set_index(data.op_id.index); // Initiate a tablet split request, since there are around 5000 rows in the table/ tablet, it will - // take some time for the child tablets to be in tunning state. + // take some time for the child tablets to be in running state. ASSERT_OK(SplitTablet(tablets.Get(0).tablet_id(), &test_cluster_)); - // Verify that we did not get the tablet split error in the first 'GetChanges' call - auto change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &new_checkpoint)); - // Keep calling 'GetChange' until we get an error for the tablet split, this will only happen // after both the child tablets are in running state. + GetChangesResponsePB change_resp; ASSERT_OK(WaitFor( [&]() -> Result { auto result = GetChangesFromCDC(stream_id, tablets, &change_resp.cdc_sdk_checkpoint()); @@ -345,14 +348,24 @@ TEST_F( // We must still be able to get the remaining records from the parent tablet even after master is // restarted. - change_resp_1 = - ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); - ASSERT_GE(change_resp_1.cdc_sdk_proto_records_size(), 200); - LOG(INFO) << "Number of records after restart: " << change_resp_1.cdc_sdk_proto_records_size(); - // Now that there are no more records to stream, further calls of 'GetChangesFromCDC' to the same - // tablet should fail. + // Since split is complete at this stage, parent tablet will report an error + // and we should be able to get the records from the children. ASSERT_NOK(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); + + // Get children tablets. + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, /* partition_list_version =*/nullptr)); + ASSERT_EQ(2, tablets.size()); + + GetChangesResponsePB child_resp_1 = + ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint(), 0)); + GetChangesResponsePB child_resp_2 = + ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint(), 1)); + + ASSERT_GE( + child_resp_1.cdc_sdk_proto_records_size() + child_resp_2.cdc_sdk_proto_records_size(), 200); + LOG(INFO) << "Number of records after restart: " << child_resp_1.cdc_sdk_proto_records_size() + + child_resp_2.cdc_sdk_proto_records_size(); } TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestGetChangesOnChildrenOnSplit)) { @@ -439,14 +452,18 @@ TEST_F( LOG(INFO) << "All nodes restarted"; SleepFor(MonoDelta::FromSeconds(10)); - change_resp_1 = - ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); - ASSERT_GE(change_resp_1.cdc_sdk_proto_records_size(), 200); - LOG(INFO) << "Number of records after restart: " << change_resp_1.cdc_sdk_proto_records_size(); - - // Now that there are no more records to stream, further calls of 'GetChangesFromCDC' to the same - // tablet should fail. - ASSERT_NOK(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); + google::protobuf::RepeatedPtrField tablets_after_split; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_split, nullptr)); + + GetChangesResponsePB resp_1 = + ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_after_split, + &change_resp_1.cdc_sdk_checkpoint(), 0)); + GetChangesResponsePB resp_2 = + ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_after_split, + &change_resp_1.cdc_sdk_checkpoint(), 1)); + ASSERT_GE(resp_1.cdc_sdk_proto_records_size() + resp_2.cdc_sdk_proto_records_size(), 200); + LOG(INFO) << "Number of records after restart: " + << resp_1.cdc_sdk_proto_records_size() + resp_2.cdc_sdk_proto_records_size(); } TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestGetChangesMultipleStreamsTabletSplit)) { @@ -492,22 +509,39 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestGetChangesMultipleStre WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); - change_resp_1 = - ASSERT_RESULT(GetChangesFromCDC(stream_id_1, tablets, &change_resp_1.cdc_sdk_checkpoint())); - ASSERT_GE(change_resp_1.cdc_sdk_proto_records_size(), 100); - LOG(INFO) << "Number of records on first stream after split: " - << change_resp_1.cdc_sdk_proto_records_size(); + // Get new tablets. + google::protobuf::RepeatedPtrField tablets_after_split; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_split, nullptr)); - // Now that there are no more records to stream, further calls of 'GetChangesFromCDC' to the same - // tablet should fail. ASSERT_NOK(GetChangesFromCDC(stream_id_1, tablets, &change_resp_1.cdc_sdk_checkpoint())); - // Calling GetChanges on stream 2 should still return around 200 records. - change_resp_2 = - ASSERT_RESULT(GetChangesFromCDC(stream_id_2, tablets, &change_resp_2.cdc_sdk_checkpoint())); - ASSERT_GE(change_resp_2.cdc_sdk_proto_records_size(), 200); + GetChangesResponsePB stream_1_resp_1 = + ASSERT_RESULT(GetChangesFromCDC(stream_id_1, tablets_after_split, + &change_resp_1.cdc_sdk_checkpoint(), 0)); + GetChangesResponsePB stream_1_resp_2 = + ASSERT_RESULT(GetChangesFromCDC(stream_id_1, tablets_after_split, + &change_resp_1.cdc_sdk_checkpoint(), 1)); + ASSERT_GE( + stream_1_resp_1.cdc_sdk_proto_records_size() + stream_1_resp_2.cdc_sdk_proto_records_size(), + 100); + LOG(INFO) << "Number of records on first stream after split: " + << stream_1_resp_1.cdc_sdk_proto_records_size() + + stream_1_resp_2.cdc_sdk_proto_records_size(); + + ASSERT_NOK(GetChangesFromCDC(stream_id_1, tablets, &change_resp_1.cdc_sdk_checkpoint())); - ASSERT_NOK(GetChangesFromCDC(stream_id_2, tablets, &change_resp_2.cdc_sdk_checkpoint())); + GetChangesResponsePB stream_2_resp_1 = + ASSERT_RESULT(GetChangesFromCDC(stream_id_2, tablets_after_split, + &change_resp_2.cdc_sdk_checkpoint(), 0)); + GetChangesResponsePB stream_2_resp_2 = + ASSERT_RESULT(GetChangesFromCDC(stream_id_2, tablets_after_split, + &change_resp_2.cdc_sdk_checkpoint(), 1)); + ASSERT_GE( + stream_2_resp_1.cdc_sdk_proto_records_size() + stream_2_resp_2.cdc_sdk_proto_records_size(), + 200); + LOG(INFO) << "Number of records on first stream after split: " + << stream_2_resp_1.cdc_sdk_proto_records_size() + + stream_2_resp_2.cdc_sdk_proto_records_size(); } TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestSetCDCCheckpointAfterTabletSplit)) { @@ -823,16 +857,24 @@ TEST_F( WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); - change_resp_1 = - ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); - ASSERT_GE(change_resp_1.cdc_sdk_proto_records_size(), 200); - LOG(INFO) << "Number of records after restart: " << change_resp_1.cdc_sdk_proto_records_size(); - - // Now that there are no more records to stream, further calls of 'GetChangesFromCDC' to the same - // tablet should fail. + // Call on parent tablet should fail since that has been split. ASSERT_NOK(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); LOG(INFO) << "The tablet split error is now communicated to the client."; + google::protobuf::RepeatedPtrField tablets_after_split; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_split, nullptr)); + + GetChangesResponsePB resp_1 = + ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_after_split, + &change_resp_1.cdc_sdk_checkpoint(), 0)); + GetChangesResponsePB resp_2 = + ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_after_split, + &change_resp_1.cdc_sdk_checkpoint(), 1)); + + ASSERT_GE(resp_1.cdc_sdk_proto_records_size() + resp_2.cdc_sdk_proto_records_size(), 200); + LOG(INFO) << "Number of records after restart: " + << resp_1.cdc_sdk_proto_records_size() + resp_2.cdc_sdk_proto_records_size(); + auto get_tablets_resp = ASSERT_RESULT(GetTabletListToPollForCDC(stream_id, table_id, tablets[0].tablet_id())); ASSERT_EQ(get_tablets_resp.tablet_checkpoint_pairs().size(), 2); @@ -844,9 +886,6 @@ TEST_F( // Wait until the 'cdc_parent_tablet_deletion_task_' has run. SleepFor(MonoDelta::FromSeconds(2)); - google::protobuf::RepeatedPtrField tablets_after_split; - ASSERT_OK(test_client()->GetTablets( - table, 0, &tablets_after_split, /* partition_list_version =*/nullptr)); bool saw_row_child_one = false; bool saw_row_child_two = false; @@ -1036,7 +1075,7 @@ TEST_F( GetChangesResponsePB change_resp_1 = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); - ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); + ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); ASSERT_OK(test_client()->FlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); @@ -1057,10 +1096,8 @@ TEST_F( WaitUntilSplitIsSuccesful(tablets_after_first_split.Get(0).tablet_id(), table, 3); - change_resp_1 = - ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); - ASSERT_GE(change_resp_1.cdc_sdk_proto_records_size(), 200); - LOG(INFO) << "Number of records after restart: " << change_resp_1.cdc_sdk_proto_records_size(); + // GetChanges call would fail since the tablet is split. + ASSERT_NOK(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); // We are calling: "GetTabletListToPollForCDC" when the tablet split on the parent tablet has // still not been communicated to the client. Hence we should get only the original parent tablet. @@ -1071,10 +1108,6 @@ TEST_F( ASSERT_EQ(tablet_id, tablets[0].tablet_id()); } - // Now that there are no more records to stream, further calls of 'GetChangesFromCDC' to the same - // tablet should fail. - ASSERT_NOK(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); - // Wait until the 'cdc_parent_tablet_deletion_task_' has run. SleepFor(MonoDelta::FromSeconds(2)); @@ -1101,10 +1134,17 @@ TEST_F( } ASSERT_TRUE(saw_first_child && saw_second_child); - change_resp_1 = ASSERT_RESULT( - GetChangesFromCDC(stream_id, tablets_after_first_split, &change_resp_1.cdc_sdk_checkpoint())); - ASSERT_NOK( - GetChangesFromCDC(stream_id, tablets_after_first_split, &change_resp_1.cdc_sdk_checkpoint())); + // These calls will return errors since the tablet have been split further. + ASSERT_NOK(GetChangesFromCDC(stream_id, tablets_after_first_split, + &change_resp_1.cdc_sdk_checkpoint(), 0)); + + + std::map tablet_to_checkpoint; + tablet_to_checkpoint[tablets.Get(0).tablet_id()] = change_resp_1.cdc_sdk_checkpoint(); + + int64 received_records = ASSERT_RESULT(GetChangeRecordCount( + stream_id, table, tablets, tablet_to_checkpoint, 400)); + ASSERT_EQ(received_records, 400); } TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestTabletSplitOnAddedTableForCDC)) { @@ -1150,22 +1190,18 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestTabletSplitOnAddedTabl change_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint())); - ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true, 2, "test_table_1")); + ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true, 2, "test_table_1")); ASSERT_OK(test_client()->FlushTables( {table_2_id}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); WaitUntilSplitIsSuccesful(tablets_2.Get(0).tablet_id(), table_2); - // Verify GetChanges returns records even after tablet split, i.e tablets of the newly added table - // are hidden instead of being deleted. - change_resp = - ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint())); - ASSERT_GE(change_resp.cdc_sdk_proto_records_size(), 200); + std::map tablet_to_checkpoint; + int64 received_records = ASSERT_RESULT(GetChangeRecordCount( + stream_id, table_2, tablets_2, tablet_to_checkpoint, 200)); - // Now that all the required records have been streamed, verify that the tablet split error is - // reported. - ASSERT_NOK(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint())); + ASSERT_EQ(received_records, 200); } TEST_F( @@ -1224,15 +1260,13 @@ TEST_F( ASSERT_OK(test_cluster_.mini_cluster_->CompactTablets()); WaitUntilSplitIsSuccesful(tablets_2.Get(0).tablet_id(), table_2); - // Verify GetChanges returns records even after tablet split, i.e tablets of the newly added table - // are hidden instead of being deleted. - change_resp = - ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint())); - ASSERT_GE(change_resp.cdc_sdk_proto_records_size(), 200); + std::map tablet_to_checkpoint; + tablet_to_checkpoint[tablets_2.Get(0).tablet_id()] = change_resp.cdc_sdk_checkpoint(); + int64 record_count = ASSERT_RESULT(GetChangeRecordCount( + stream_id, table_2, tablets_2, tablet_to_checkpoint, 199)); - // Now that all the required records have been streamed, verify that the tablet split error is - // reported. - ASSERT_NOK(GetChangesFromCDC(stream_id, tablets_2, &change_resp.cdc_sdk_checkpoint())); + // Verify the count of records for the table. + ASSERT_EQ(record_count, 199); } TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestTabletSplitDuringSnapshot)) { @@ -1320,7 +1354,7 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestTransactionCommitAfter auto conn = ASSERT_RESULT(test_cluster_.ConnectToDB(kNamespaceName)); ASSERT_OK(conn.Execute("BEGIN")); - ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); + ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); ASSERT_OK(test_client()->FlushTables( {table}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); @@ -1373,12 +1407,13 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestTransactionCommitAfter change_resp_1 = *result; } - uint32_t child1_record_count = GetTotalNumRecordsInTablet( - stream_id, first_tablet_after_split, &change_resp_1.cdc_sdk_checkpoint()); - uint32_t child2_record_count = GetTotalNumRecordsInTablet( - stream_id, second_tablet_after_split, &change_resp_1.cdc_sdk_checkpoint()); + std::map tablet_to_checkpoint; + tablet_to_checkpoint[tablets.Get(0).tablet_id()] = change_resp_1.cdc_sdk_checkpoint(); + + int64 received_records = ASSERT_RESULT(GetChangeRecordCount( + stream_id, table, tablets, tablet_to_checkpoint, 400)); - ASSERT_GE(child1_record_count + child2_record_count, 200); + ASSERT_EQ(received_records, 400); } TEST_F( @@ -1502,7 +1537,7 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestRecordCountsAfterMulti ASSERT_FALSE(resp.has_error()); TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); - ASSERT_OK(WriteRows(1, 200, &test_cluster_)); + ASSERT_OK(WriteRows(0, 200, &test_cluster_)); ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); @@ -1529,26 +1564,96 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestRecordCountsAfterMulti ASSERT_OK(WriteRows(600, 1000, &test_cluster_)); ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); - const int expected_total_records = 3006; - const int expected_total_splits = 4; - // The array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in that - // order. - const int expected_records_count[] = {9, 999, 0, 0, 0, 0, 999, 999}; + const int expected_total_records = 1000; + std::map tablet_to_checkpoint; + int64 total_records = ASSERT_RESULT(GetChangeRecordCount( + stream_id, table, tablets, tablet_to_checkpoint, expected_total_records)); + + LOG(INFO) << "Got " << total_records << " records"; + ASSERT_EQ(expected_total_records, total_records); +} + +TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestSplitAfterSplit)) { + ANNOTATE_UNPROTECTED_WRITE(FLAGS_update_min_cdc_indices_interval_secs) = 1; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_aborted_intent_cleanup_ms) = 1000; + ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_parent_tablet_deletion_task_retry_secs) = 1; + + google::protobuf::RepeatedPtrField tablets; + ASSERT_OK(SetUpWithParams(1, 1, false)); + const uint32_t num_tablets = 1; + + auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName, num_tablets)); + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr)); + ASSERT_EQ(tablets.size(), num_tablets); + + xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStream(IMPLICIT)); + auto resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets)); + ASSERT_FALSE(resp.has_error()); + + TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); + ASSERT_OK(WriteRows(0, 200, &test_cluster_)); + ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + + WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); - int total_records = 0; - int total_splits = 0; - int record_count[] = {0, 0, 0, 0, 0, 0, 0, 0}; + ASSERT_OK(WriteRows(200, 400, &test_cluster_)); + ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); - GetRecordsAndSplitCount( - stream_id, tablets[0].tablet_id(), table_id, record_count, &total_records, &total_splits); + google::protobuf::RepeatedPtrField tablets_after_first_split; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_first_split, nullptr)); + ASSERT_EQ(tablets_after_first_split.size(), 2); - for (int i = 0; i < 8; i++) { - ASSERT_EQ(expected_records_count[i], record_count[i]); + WaitUntilSplitIsSuccesful(tablets_after_first_split.Get(0).tablet_id(), table, 3); + WaitUntilSplitIsSuccesful(tablets_after_first_split.Get(1).tablet_id(), table, 4); + + // Don't insert anything, just split the tablets further + ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); + + google::protobuf::RepeatedPtrField tablets_after_third_split; + ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_third_split, nullptr)); + ASSERT_EQ(tablets_after_third_split.size(), 4); + + WaitUntilSplitIsSuccesful(tablets_after_third_split.Get(1).tablet_id(), table, 5); + + std::map checkpoint_map; + int64 received_records = + ASSERT_RESULT(GetChangeRecordCount(stream_id, table, tablets, checkpoint_map, 400)); + + ASSERT_EQ(received_records, 400); + + google::protobuf::RepeatedPtrField final_tablets; + ASSERT_OK(test_client()->GetTablets(table, 0, &final_tablets, nullptr)); + + std::unordered_set expected_tablet_ids; + for (int i = 0; i < final_tablets.size(); ++i) { + expected_tablet_ids.insert(final_tablets.Get(i).tablet_id()); } - LOG(INFO) << "Got " << total_records << " records and " << total_splits << " tablet splits"; - ASSERT_EQ(expected_total_records, total_records); - ASSERT_EQ(expected_total_splits, total_splits); + // Verify that the cdc_state has only current set of children tablets. + CDCStateTable cdc_state_table(test_client()); + ASSERT_OK(WaitFor( + [&]() -> Result { + Status s; + std::unordered_set tablets_found; + for (auto row_result : VERIFY_RESULT(cdc_state_table.GetTableRange( + CDCStateTableEntrySelector().IncludeCheckpoint(), &s))) { + RETURN_NOT_OK(row_result); + auto& row = *row_result; + if (row.key.stream_id == stream_id && !expected_tablet_ids.contains(row.key.tablet_id)) { + // Still have a tablet left over from a dropped table. + return false; + } + if (row.key.stream_id == stream_id) { + tablets_found.insert(row.key.tablet_id); + } + } + RETURN_NOT_OK(s); + LOG(INFO) << "tablets found: " << AsString(tablets_found) + << ", expected tablets: " << AsString(expected_tablet_ids); + return expected_tablet_ids == tablets_found; + }, + MonoDelta::FromSeconds(60), "Waiting for stream metadata cleanup.")); } TEST_F( @@ -1572,7 +1677,7 @@ TEST_F( ASSERT_FALSE(resp.has_error()); TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); - ASSERT_OK(WriteRows(1, 200, &test_cluster_)); + ASSERT_OK(WriteRows(0, 200, &test_cluster_)); ASSERT_OK(test_client()->FlushTables({table.table_id()}, false, 100, false)); WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); @@ -1584,26 +1689,13 @@ TEST_F( ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_first_split, nullptr)); ASSERT_EQ(tablets_after_first_split.size(), 2); - const int expected_total_records = 1200; - const int expected_total_splits = 1; - // The array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in that - // order. - const int expected_records_count[] = {3, 399, 0, 0, 0, 0, 399, 399}; - - int total_records = 0; - int total_splits = 0; - int record_count[] = {0, 0, 0, 0, 0, 0, 0, 0}; - - GetRecordsAndSplitCount( - stream_id, tablets[0].tablet_id(), table_id, record_count, &total_records, &total_splits); - - for (int i = 0; i < 8; i++) { - ASSERT_EQ(expected_records_count[i], record_count[i]); - } + const int expected_total_records = 400; + std::map tablet_to_checkpoint; + int64 total_records = ASSERT_RESULT(GetChangeRecordCount( + stream_id, table, tablets, tablet_to_checkpoint, expected_total_records)); - LOG(INFO) << "Got " << total_records << " records and " << total_splits << " tablet splits"; + LOG(INFO) << "Got " << total_records << " records"; ASSERT_EQ(expected_total_records, total_records); - ASSERT_EQ(expected_total_splits, total_splits); } TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestGetTabletListToPollForCDCWithTabletId)) { @@ -1625,7 +1717,7 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestGetTabletListToPollFor GetChangesResponsePB change_resp_1 = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets)); TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName)); - ASSERT_OK(WriteRowsHelper(1, 200, &test_cluster_, true)); + ASSERT_OK(WriteRowsHelper(0, 200, &test_cluster_, true)); ASSERT_OK(test_client()->FlushTables( {table.table_id()}, /* add_indexes = */ false, /* timeout_secs = */ 30, /* is_compaction = */ true)); @@ -1635,16 +1727,13 @@ TEST_F(CDCSDKTabletSplitTest, YB_DISABLE_TEST_IN_TSAN(TestGetTabletListToPollFor WaitUntilSplitIsSuccesful(tablets.Get(0).tablet_id(), table); - ASSERT_OK(WaitForGetChangesToFetchRecords( - &change_resp_1, stream_id, tablets, 199, &change_resp_1.cdc_sdk_checkpoint())); - - ASSERT_GE(change_resp_1.cdc_sdk_proto_records_size(), 200); - LOG(INFO) << "Number of records after restart: " << change_resp_1.cdc_sdk_proto_records_size(); + auto cp = change_resp_1.cdc_sdk_checkpoint(); + std::map tablet_to_checkpoint; + tablet_to_checkpoint[tablets.Get(0).tablet_id()] = cp; - // Now that there are no more records to stream, further calls of 'GetChangesFromCDC' to the same - // tablet should fail. - ASSERT_NOK(GetChangesFromCDC(stream_id, tablets, &change_resp_1.cdc_sdk_checkpoint())); - LOG(INFO) << "The tablet split error is now communicated to the client."; + int64 record_count = ASSERT_RESULT(GetChangeRecordCount( + stream_id, table, tablets, tablet_to_checkpoint, 200)); + ASSERT_EQ(record_count, 200); auto get_tablets_resp = ASSERT_RESULT(GetTabletListToPollForCDC(stream_id, table_id, tablets[0].tablet_id())); diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc index b487235a4654..1fb2d94bb40f 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.cc +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.cc @@ -11,11 +11,22 @@ // under the License. #include "yb/integration-tests/cdcsdk_ysql_test_base.h" +#include +#include +#include +#include +#include +#include "yb/cdc/cdc_fwd.h" #include "yb/cdc/cdc_service.pb.h" #include "yb/cdc/cdc_state_table.h" +#include "yb/client/yb_table_name.h" +#include "yb/common/entity_ids_types.h" #include "yb/master/catalog_manager.h" +#include "yb/master/master_client.pb.h" +#include "yb/rpc/rpc_controller.h" +#include "yb/util/status.h" namespace yb { namespace cdc { @@ -239,11 +250,15 @@ namespace cdc { Status CDCSDKYsqlTest::WriteRowsHelper( uint32_t start, uint32_t end, Cluster* cluster, bool flag, uint32_t num_cols, - const char* const table_name, const vector& optional_cols_name) { + const char* const table_name, const vector& optional_cols_name, + const bool transaction_enabled) { auto conn = VERIFY_RESULT(cluster->ConnectToDB(kNamespaceName)); LOG(INFO) << "Writing " << end - start << " row(s) within transaction"; - RETURN_NOT_OK(conn.Execute("BEGIN")); + if (transaction_enabled) { + RETURN_NOT_OK(conn.Execute("BEGIN")); + } + for (uint32_t i = start; i < end; ++i) { if (!optional_cols_name.empty()) { std::stringstream columns_name; @@ -271,11 +286,15 @@ namespace cdc { RETURN_NOT_OK(conn.ExecuteFormat(statement, table_name)); } } - if (flag) { - RETURN_NOT_OK(conn.Execute("COMMIT")); - } else { - RETURN_NOT_OK(conn.Execute("ABORT")); + + if (transaction_enabled) { + if (flag) { + RETURN_NOT_OK(conn.Execute("COMMIT")); + } else { + RETURN_NOT_OK(conn.Execute("ABORT")); + } } + return Status::OK(); } @@ -848,7 +867,11 @@ namespace cdc { change_req->set_tablet_id(tablet_id); change_req->mutable_from_cdc_sdk_checkpoint()->set_term(cp.term()); change_req->mutable_from_cdc_sdk_checkpoint()->set_index(cp.index()); - change_req->mutable_from_cdc_sdk_checkpoint()->set_key(cp.key()); + + if (cp.has_key()) { + change_req->mutable_from_cdc_sdk_checkpoint()->set_key(cp.key()); + } + change_req->mutable_from_cdc_sdk_checkpoint()->set_write_id(cp.write_id()); } @@ -1351,6 +1374,131 @@ namespace cdc { return change_resp; } + Result CDCSDKYsqlTest::GetChangeRecordCount( + const xrepl::StreamId& stream_id, + const YBTableName& table, + const google::protobuf::RepeatedPtrField& tablets, + std::map tablet_to_checkpoint, + const int64 expected_total_records, + bool explicit_checkpointing_enabled, + std::map> records) { + std::vector tablet_ids; + std::map explicit_checkpoints; + for (int i = 0; i < tablets.size(); ++i) { + tablet_ids.push_back(tablets.Get(i).tablet_id()); + } + + GetChangesRequestPB change_req; + GetChangesResponsePB change_resp; + + CDCSDKCheckpointPB explicit_checkpoint; + explicit_checkpoint.set_term(0); + explicit_checkpoint.set_index(0); + explicit_checkpoint.set_write_id(0); + explicit_checkpoint.set_key(""); + + int64 total_record_count = 0; + + RETURN_NOT_OK(WaitFor([&]() -> Result { + for (uint32_t i = 0; i < tablet_ids.size(); ++i) { + auto cp = tablet_to_checkpoint.find(tablet_ids[i]); + + if (cp == tablet_to_checkpoint.end()) { + PrepareChangeRequest(&change_req, stream_id, tablet_ids[i], i); + } else { + PrepareChangeRequest(&change_req, stream_id, tablet_ids[i], cp->second, i); + } + + // If the stream is configured for explicit checkpointing, then we will populate the + // explicit_cdc_sdk_checkpoint field as well. + auto iter = explicit_checkpoints.find(tablet_ids[i]); + CDCSDKCheckpointPB explicit_cp; + if (explicit_checkpointing_enabled) { + if (iter == explicit_checkpoints.end()) { + change_req.mutable_explicit_cdc_sdk_checkpoint()->CopyFrom(explicit_checkpoint); + } else { + explicit_cp = iter->second; + change_req.mutable_explicit_cdc_sdk_checkpoint()->CopyFrom(explicit_cp); + + } + } + + rpc::RpcController get_changes_rpc; + + LOG(INFO) << "Calling GetChanges on " << tablet_ids[i] << " with " + << change_req.from_cdc_sdk_checkpoint().term() << ":" + << change_req.from_cdc_sdk_checkpoint().index(); + auto status = cdc_proxy_->GetChanges(change_req, &change_resp, &get_changes_rpc); + + if (status.ok() && !change_resp.has_error()) { + // Process the records here. + for (auto record : change_resp.cdc_sdk_proto_records()) { + if (IsDMLRecord(record)) { + ++total_record_count; + } + + if (record.row_message().op() != RowMessage::DDL) { + records[tablet_ids[i]].push_back(record); + } + + if (explicit_checkpointing_enabled) { + explicit_cp.set_term(record.from_op_id().term()); + explicit_cp.set_index(record.from_op_id().index()); + explicit_cp.set_key(record.from_op_id().write_id_key()); + explicit_cp.set_write_id(record.from_op_id().write_id()); + explicit_cp.set_snapshot_time(record.row_message().commit_time() - 1); + } + } + + LOG(INFO) << "Received records for tablet " << tablet_ids[i] << ": " + << change_resp.cdc_sdk_proto_records_size() << " with response checkpoint " + << change_resp.cdc_sdk_checkpoint().term() << ":" + << change_resp.cdc_sdk_checkpoint().index(); + + tablet_to_checkpoint[tablet_ids[i]] = change_resp.cdc_sdk_checkpoint(); + } else { + status = StatusFromPB(change_resp.error().status()); + if (status.IsTabletSplit()) { + LOG(INFO) << "Got a tablet split on tablet " << tablet_ids[i] + << ", fetching new tablets"; + + auto get_tablets_resp = VERIFY_RESULT( + GetTabletListToPollForCDC(stream_id, table.table_id(), tablet_ids[i])); + + VERIFY_EQ(get_tablets_resp.tablet_checkpoint_pairs_size(), 2); + + // Store the opIds for the children tablets. + for (int j = 0; j < get_tablets_resp.tablet_checkpoint_pairs_size(); ++j) { + auto pair = get_tablets_resp.tablet_checkpoint_pairs(j); + tablet_to_checkpoint[pair.tablet_locations().tablet_id()] = pair.cdc_sdk_checkpoint(); + explicit_checkpoints[pair.tablet_locations().tablet_id()] = pair.cdc_sdk_checkpoint(); + + tablet_ids.push_back(pair.tablet_locations().tablet_id()); + + LOG(INFO) << "Assigned from_op_id " << pair.cdc_sdk_checkpoint().term() << ":" + << pair.cdc_sdk_checkpoint().index() << " to child " + << pair.tablet_locations().tablet_id(); + } + + tablet_ids.erase(find(tablet_ids.begin(), tablet_ids.end(), tablet_ids[i])); + + break; + } else { + RETURN_NOT_OK(status); + } + } + } + + LOG(INFO) << "Total records consumed so far: " << total_record_count; + + return total_record_count >= expected_total_records; + }, + MonoDelta::FromSeconds(300), + "Timed out while fetching the changes")); + + return total_record_count; + } + Result CDCSDKYsqlTest::GetChangesFromCDCWithoutRetry( const xrepl::StreamId& stream_id, const google::protobuf::RepeatedPtrField& tablets, diff --git a/src/yb/integration-tests/cdcsdk_ysql_test_base.h b/src/yb/integration-tests/cdcsdk_ysql_test_base.h index 5d578660f79e..b0499a997acb 100644 --- a/src/yb/integration-tests/cdcsdk_ysql_test_base.h +++ b/src/yb/integration-tests/cdcsdk_ysql_test_base.h @@ -14,6 +14,7 @@ #include #include #include +#include #include #include @@ -214,7 +215,8 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { Status WriteRowsHelper( uint32_t start, uint32_t end, Cluster* cluster, bool flag, uint32_t num_cols = 2, - const char* const table_name = kTableName, const vector& optional_cols_name = {}); + const char* const table_name = kTableName, const vector& optional_cols_name = {}, + const bool trasaction_enabled = true); Status CreateTableWithoutPK(Cluster* cluster); @@ -337,6 +339,20 @@ class CDCSDKYsqlTest : public CDCSDKTestBase { const uint64_t cdc_sdk_safe_time, bool bootstrap); + bool IsDMLRecord(const CDCSDKProtoRecordPB& record) { + return record.row_message().op() == RowMessage::INSERT + || record.row_message().op() == RowMessage::UPDATE + || record.row_message().op() == RowMessage::DELETE + || record.row_message().op() == RowMessage::READ; + } + + Result GetChangeRecordCount( + const xrepl::StreamId& stream_id, const YBTableName& table, + const google::protobuf::RepeatedPtrField& tablets, + std::map tablet_to_checkpoint, + const int64 expected_total_records, bool explicit_checkpointing_enabled = false, + std::map> records = {}); + Result SetCDCCheckpoint( const xrepl::StreamId& stream_id, const google::protobuf::RepeatedPtrField& tablets,