Skip to content

Commit

Permalink
[BACKPORT 2.20][#18479] CDCSDK: Report tablet split as soon as we det…
Browse files Browse the repository at this point in the history
…ect it

Summary:
Original commit: c30499b / D28415

For CDCSDK, whenever a tablet split occurs, it is reported to the client as soon as we read the WAL records and hit a `SPLIT_OP`, the client then retrieves the children tablets and polls on them. This diff changes the mechanism so that now we check in every `GetChanges` call whether a tablet has been split, if it has we communicate the split to the client as soon as we detect the tablet being polled is split.

Now when the children tablets are being polled, we can have duplicate data since the WAL is duplicated. To overcome that, while iterating over the records in the WAL, we retrieve the hash_key of the key being read and check if it is within the bounds of the tablet being polled, if it is then we stream the record, otherwise we ignore.

**Summary of changes in this diff:**
1. Changed the time at which split is reported to client. Earlier it was reported when we hit the `SPLIT_OP` in WAL, now we report it as soon as we detect it on the tablet.
2. Modified the logic to get the children tablets using `tablet_peer` rather than iterating over the complete list.
3. Added and modified tests to comply with the new changes.

**2.20 specific changes:**
Few conflicts were there while cherry picking the original commit because of D30724, they were resolved in the files `cdcsdk_tablet_split-test` and `cdcsdk_consistent_stream-test`.

Jira: DB-7447

Test Plan: Run tests to ensure no regression.

Reviewers: skumar, asrinivasan, stiwary, jhe

Reviewed By: skumar

Subscribers: ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D30999
  • Loading branch information
vaibhav-yb committed Dec 14, 2023
1 parent bd6e3ac commit f3c7647
Show file tree
Hide file tree
Showing 7 changed files with 460 additions and 209 deletions.
2 changes: 0 additions & 2 deletions java/yb-cdc/src/test/java/org/yb/cdc/ysql/Test3IntCol.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,6 @@ public void testLongRunningScript() {
// begin;
// update test set b=b+1, c=c+1 where a=1;
// end;
new ExpectedRecord3Proto(-1, -1, -1, CdcService.RowMessage.Op.BEGIN),
new ExpectedRecord3Proto(-1, -1, -1, CdcService.RowMessage.Op.COMMIT),

// begin;
// insert into test values(11,12,13);
Expand Down
34 changes: 20 additions & 14 deletions src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -530,26 +530,27 @@ class CDCServiceImpl::Impl {

Status AddEntriesForChildrenTabletsOnSplitOp(
const ProducerTabletInfo& info,
const std::array<const master::TabletLocationsPB*, 2>& tablets,
const OpId& split_op_id) {
const std::array<TabletId, 2>& tablets,
const OpId& children_op_id) {
std::lock_guard l(mutex_);

for (const auto& tablet : tablets) {
ProducerTabletInfo producer_info{
info.replication_group_id, info.stream_id, tablet->tablet_id()};
info.replication_group_id, info.stream_id, tablet};
tablet_checkpoints_.emplace(TabletCheckpointInfo{
.producer_tablet_info = producer_info,
.cdc_state_checkpoint =
TabletCheckpoint{
.op_id = split_op_id, .last_update_time = {}, .last_active_time = {}},
.op_id = children_op_id, .last_update_time = {}, .last_active_time = {}},
.sent_checkpoint =
TabletCheckpoint{
.op_id = split_op_id, .last_update_time = {}, .last_active_time = {}},
.op_id = children_op_id, .last_update_time = {}, .last_active_time = {}},
.mem_tracker = nullptr,
});
cdc_state_metadata_.emplace(CDCStateMetadataInfo{
.producer_tablet_info = producer_info,
.commit_timestamp = {},
.last_streamed_op_id = split_op_id,
.last_streamed_op_id = children_op_id,
.schema_details_map = {},
.mem_tracker = nullptr,
});
Expand Down Expand Up @@ -1755,6 +1756,8 @@ void CDCServiceImpl::GetChanges(
}
// This specific error indicates that a tablet split occured on the tablet.
if (status.IsTabletSplit()) {
LOG(INFO) << "Updating children tablets on detected split on tablet "
<< producer_tablet.tablet_id;
status = UpdateChildrenTabletsOnSplitOpForCDCSDK(producer_tablet);
RPC_STATUS_RETURN_ERROR(status, resp->mutable_error(), CDCErrorPB::INTERNAL_ERROR, context);

Expand Down Expand Up @@ -3980,29 +3983,32 @@ void CDCServiceImpl::IsBootstrapRequired(

Status CDCServiceImpl::UpdateChildrenTabletsOnSplitOpForCDCSDK(const ProducerTabletInfo& info) {
auto tablets = VERIFY_RESULT(GetTablets(info.stream_id, true /* ignore_errors */));

// Initializing the children to 0.0 to prevent garbage collection on them.
const OpId& children_op_id = OpId();

std::array<const master::TabletLocationsPB*, 2> children_tablets;
std::array<TabletId, 2> children;
uint found_children = 0;
for (auto const& tablet : tablets) {
if (tablet.has_split_parent_tablet_id() && tablet.split_parent_tablet_id() == info.tablet_id) {
children_tablets[found_children] = &tablet;
children[found_children] = tablet.tablet_id();
found_children += 1;

if (found_children == 2) {
break;
}
}
}
LOG_IF(DFATAL, found_children != 2)
<< "Could not find the two split children for the tablet: " << info.tablet_id;

// Add the entries for the children tablets in 'cdc_state_metadata_' and 'tablet_checkpoints_'.
RSTATUS_DCHECK(
found_children == 2, InternalError,
Format("Could not find the two split children for tablet: $0", info.tablet_id));

RETURN_NOT_OK_SET_CODE(
impl_->AddEntriesForChildrenTabletsOnSplitOp(info, children_tablets, children_op_id),
impl_->AddEntriesForChildrenTabletsOnSplitOp(info, children, children_op_id),
CDCError(CDCErrorPB::INTERNAL_ERROR));
VLOG(1) << "Added entries for children tablets: " << children_tablets[0]->tablet_id() << " and "
<< children_tablets[1]->tablet_id() << ", of parent tablet: " << info.tablet_id
VLOG(1) << "Added entries for children tablets: " << children[0] << " and "
<< children[1] << ", of parent tablet: " << info.tablet_id
<< ", to 'cdc_state_metadata_' and 'tablet_checkpoints_'";

return Status::OK();
Expand Down
63 changes: 47 additions & 16 deletions src/yb/cdc/cdcsdk_producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@
#include "yb/tablet/tablet_metadata.h"
#include "yb/tablet/tablet_peer.h"
#include "yb/tablet/tablet.h"
#include "yb/tablet/tablet_types.pb.h"
#include "yb/tablet/transaction_participant.h"

#include "yb/util/flags.h"
#include "yb/util/logging.h"
#include "yb/util/opid.h"
#include "yb/util/status.h"
#include "yb/util/status_format.h"

using std::string;

Expand Down Expand Up @@ -1050,6 +1054,8 @@ Status PopulateCDCSDKWriteRecord(
// We'll use DocDB key hash to identify the records that belong to the same row.
Slice prev_key;

uint32_t records_added = 0;

bool colocated = tablet_ptr->metadata()->colocated();
Schema schema = Schema();
SchemaVersion schema_version = std::numeric_limits<uint32_t>::max();
Expand Down Expand Up @@ -1088,6 +1094,16 @@ Status PopulateCDCSDKWriteRecord(
row_message->op() == RowMessage_Op_UPDATE)) {
Slice sub_doc_key = key;
dockv::SubDocKey decoded_key;

// With tablet splits we will end up reading records from this tablet's ancestors -
// only process records that are in this tablet's key range.
const auto& key_bounds = tablet_ptr->key_bounds();
if (!key_bounds.IsWithinBounds(key)) {
VLOG(1) << "Key for the read record is not within tablet bounds, skipping the key: "
<< primary_key.data();
continue;
}

RETURN_NOT_OK(decoded_key.DecodeFrom(&sub_doc_key, dockv::HybridTimeRequired::kFalse));
if (colocated) {
colocation_id = decoded_key.doc_key().colocation_id();
Expand Down Expand Up @@ -1135,6 +1151,7 @@ Status PopulateCDCSDKWriteRecord(

// Write pair contains record for different row. Create a new CDCRecord in this case.
proto_record = resp->add_cdc_sdk_proto_records();
++records_added;
row_message = proto_record->mutable_row_message();
modified_columns.clear();
row_message->set_pgschema_name(schema.SchemaName());
Expand Down Expand Up @@ -1267,6 +1284,14 @@ Status PopulateCDCSDKWriteRecord(
}

if (FLAGS_cdc_populate_end_markers_transactions) {
// If there are no records added, we do not need to populate the begin-commit block
// and we should return from here.
if (records_added == 0 && !resp->mutable_cdc_sdk_proto_records()->empty()) {
VLOG(2) << "Removing the added BEGIN record because there are no other records to add";
resp->mutable_cdc_sdk_proto_records()->RemoveLast();
return Status::OK();
}

FillCommitRecordForSingleShardTransaction(
OpId(msg->id().term(), msg->id().index()), tablet_peer, resp, msg->hybrid_time());
}
Expand Down Expand Up @@ -2162,7 +2187,6 @@ Status GetChangesForCDCSDK(
// previously declared 'checkpoint' or the 'from_op_id'.
bool checkpoint_updated = false;
bool report_tablet_split = false;
OpId split_op_id = OpId::Invalid();
bool snapshot_operation = false;
bool pending_intents = false;
int wal_segment_index = GetWalSegmentIndex(wal_segment_index_req);
Expand Down Expand Up @@ -2278,6 +2302,16 @@ Status GetChangesForCDCSDK(
bool saw_non_actionable_message = false;
std::unordered_set<std::string> streamed_txns;

if (tablet_ptr->metadata()->tablet_data_state() == tablet::TABLET_DATA_SPLIT_COMPLETED) {
// This indicates that the tablet being polled has been split and in this case we should
// tell the client immediately about the split.
LOG(INFO) << "Tablet split detected for tablet " << tablet_id
<< ", moving to children tablets immediately";

return STATUS_FORMAT(
TabletSplit, "Tablet split detected on $0", tablet_id);
}

// It's possible that a batch of messages in read_ops after fetching from
// 'ReadReplicatedMessagesForCDC' , will not have any actionable messages. In which case we
// keep retrying by fetching the next batch, until either we get an actionable message or reach
Expand Down Expand Up @@ -2543,7 +2577,8 @@ Status GetChangesForCDCSDK(
saw_split_op = true;

// We first verify if a split has indeed occured succesfully by checking if there are
// two children tablets for the tablet.
// two children tablets for the tablet. This check also verifies if the SPLIT_OP
// belongs to the current tablet
if (!(VerifyTabletSplitOnParentTablet(table_id, tablet_id, client))) {
// We could verify the tablet split succeeded. This is possible when the child tablets
// of a split are not running yet.
Expand All @@ -2566,8 +2601,7 @@ Status GetChangesForCDCSDK(
} else {
// If 'GetChangesForCDCSDK' was called with the OpId just before the SplitOp's
// record, and if there is no more data to stream and we can notify the client
// about the split and update the checkpoint. At this point, we will store the
// split_op_id.
// about the split and update the checkpoint.
LOG(INFO) << "Found SPLIT_OP record with OpId: " << op_id
<< ", for parent tablet: " << tablet_id
<< ", and if we did not see any other records we will report the tablet "
Expand All @@ -2578,7 +2612,7 @@ Status GetChangesForCDCSDK(
&next_checkpoint_index, all_checkpoints, &checkpoint, last_streamed_op_id,
&safe_hybrid_time_resp, &wal_segment_index);
checkpoint_updated = true;
split_op_id = op_id;
report_tablet_split = true;
}
}
} break;
Expand Down Expand Up @@ -2622,12 +2656,14 @@ Status GetChangesForCDCSDK(
}
}

// If the split_op_id is equal to the checkpoint i.e the OpId of the last actionable message, we
// know that after the split there are no more actionable messages, and this confirms that the
// SPLIT OP was succesfull.
if (!snapshot_operation && split_op_id.term == checkpoint.term() &&
split_op_id.index == checkpoint.index()) {
report_tablet_split = true;
// If the GetChanges call is not for snapshot and then we know that a split has indeed been
// successful then we should report the split to the client.
if (!snapshot_operation && report_tablet_split) {
LOG(INFO) << "Tablet split detected for tablet " << tablet_id
<< ", moving to children tablets immediately";
return STATUS_FORMAT(
TabletSplit, "Tablet split detected on $0", tablet_id
);
}

if (consumption) {
Expand Down Expand Up @@ -2657,11 +2693,6 @@ Status GetChangesForCDCSDK(
last_streamed_op_id->ToPB(resp->mutable_checkpoint()->mutable_op_id());
}

if (report_tablet_split) {
return STATUS_FORMAT(
TabletSplit, "Tablet Split on tablet: $0, no more records to stream", tablet_id);
}

// We do not populate SAFEPOINT records in two scenarios:
// 1. When we are streaming batches of a large transaction
// 2. When we are streaming snapshot records
Expand Down
59 changes: 11 additions & 48 deletions src/yb/integration-tests/cdcsdk_consistent_stream-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
// or implied. See the License for the specific language governing permissions and limitations
// under the License.

#include "yb/cdc/cdc_service.pb.h"
#include "yb/integration-tests/cdcsdk_ysql_test_base.h"
#include "yb/util/test_macros.h"

Expand Down Expand Up @@ -618,57 +619,19 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestCDCSDKConsistentStreamWithTab
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets_after_first_split, nullptr));
ASSERT_EQ(tablets_after_first_split.size(), 2);

// The count array stores counts of DDL, INSERT, UPDATE, DELETE, READ, TRUNCATE, BEGIN, COMMIT in
// that order.
const int expected_count_1[] = {
1,
2 * num_batches * inserts_per_batch,
0,
0,
0,
0,
2 * num_batches + num_batches * inserts_per_batch,
2 * num_batches + num_batches * inserts_per_batch,
};
const int expected_count_2[] = {3, 4 * num_batches * inserts_per_batch, 0, 0, 0, 0};
int count[] = {0, 0, 0, 0, 0, 0, 0, 0};
const int64 expected_total_records = 4 * num_batches * inserts_per_batch;

auto parent_get_changes = GetAllPendingChangesFromCdc(stream_id, tablets);
for (size_t i = 0; i < parent_get_changes.records.size(); i++) {
auto record = parent_get_changes.records[i];
UpdateRecordCount(record, count);
}
std::map<TabletId, CDCSDKCheckpointPB> tablet_to_checkpoint;
std::map<TabletId, std::vector<CDCSDKProtoRecordPB>> records_by_tablet;
int64 total_received_records = ASSERT_RESULT(GetChangeRecordCount(
stream_id, table, tablets, tablet_to_checkpoint, expected_total_records,
false /* explicit_checkpointing */, records_by_tablet));

CheckRecordsConsistency(parent_get_changes.records);
for (int i = 0; i < 8; i++) {
ASSERT_EQ(expected_count_1[i], count[i]);
}

// Wait until the 'cdc_parent_tablet_deletion_task_' has run.
SleepFor(MonoDelta::FromSeconds(2));

auto get_tablets_resp =
ASSERT_RESULT(GetTabletListToPollForCDC(stream_id, table_id, tablets[0].tablet_id()));
for (const auto& tablet_checkpoint_pair : get_tablets_resp.tablet_checkpoint_pairs()) {
auto new_tablet = tablet_checkpoint_pair.tablet_locations();
auto new_checkpoint = tablet_checkpoint_pair.cdc_sdk_checkpoint();

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
auto tablet_ptr = tablets.Add();
tablet_ptr->CopyFrom(new_tablet);

auto child_get_changes = GetAllPendingChangesFromCdc(stream_id, tablets, &new_checkpoint);
vector<CDCSDKProtoRecordPB> child_plus_parent = parent_get_changes.records;
for (size_t i = 0; i < child_get_changes.records.size(); i++) {
auto record = child_get_changes.records[i];
child_plus_parent.push_back(record);
UpdateRecordCount(record, count);
}
CheckRecordsConsistency(child_plus_parent);
}
ASSERT_EQ(expected_total_records, total_received_records);

for (int i = 0; i < 6; i++) {
ASSERT_EQ(expected_count_2[i], count[i]);
for (auto iter = records_by_tablet.begin(); iter != records_by_tablet.end(); ++iter) {
LOG(INFO) << "Checking records consistency for tablet " << iter->first;
CheckRecordsConsistency(iter->second);
}
}

Expand Down
Loading

0 comments on commit f3c7647

Please sign in to comment.