[BACKPORT 2.12] [CDCSDK] [#9019] CDC SDK Client API and Java Console Subscriber

Summary:
Original commits:
- d294abf / D13836
- 6b15b16 / D15860
- cf5fead / D16057
- 5408e30 / D15989
- 53afee99e7cfe912fa7ec579ce470e999701a074 / D16176

Github Master Ticket: #9019

Design Document: https://docs.google.com/document/d/1_xZqU5UgzCu1W--kci3ajU7_iYXXHMQvudmybDI-Xsk/edit
Functional Spec: https://docs.google.com/document/u/2/d/1nHuzHQ-qYVPbKi2dqo_drzSXMq00h7w5oi0JDf0GD1U/edit#heading=h.jmqfs7jgvvg8

This is the client-side change that exposes APIs to be consumed by CDC consumers. Currently, these APIs are not public and are intended to be consumed by our Debezium connector. For testing, we have written a console subscriber.
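At a high level, a consumer drives the service through a create-stream call followed by repeated `GetChanges` RPCs. The following is only a sketch of that polling loop, pieced together from the proto fields exercised in the integration test further down; the include paths, proxy wiring, and error handling are assumptions, not part of this diff.

```cpp
#include "yb/cdc/cdc_service.proxy.h"   // assumed location of the generated service proxy
#include "yb/rpc/rpc_controller.h"      // assumed include path

using namespace yb;

// Sketch: poll a single tablet of a CDCSDK stream and resume from the returned checkpoint.
Status PollTablet(cdc::CDCServiceProxy* cdc_proxy,
                  const std::string& stream_id,
                  const std::string& tablet_id) {
  cdc::CDCSDKCheckpointPB checkpoint;  // Zero-initialized: the very first call starts from the beginning.
  while (true) {
    cdc::GetChangesRequestPB req;
    cdc::GetChangesResponsePB resp;
    req.set_stream_id(stream_id);
    req.set_tablet_id(tablet_id);
    *req.mutable_from_cdc_sdk_checkpoint() = checkpoint;

    rpc::RpcController rpc;
    RETURN_NOT_OK(cdc_proxy->GetChanges(req, &resp, &rpc));
    if (resp.has_error()) {
      return StatusFromPB(resp.error().status());
    }

    for (const auto& record : resp.cdc_sdk_proto_records()) {
      // Hand each DDL/DML record to the downstream consumer (e.g. the Debezium connector).
      LOG(INFO) << "Received record with op: " << record.row_message().op();
    }
    checkpoint = resp.cdc_sdk_checkpoint();  // Resume point for the next call.
  }
}
```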

[CDCSDK][#11679] Add missing license headers to Java files

The following files have missing license headers:
1. `CreateCDCStreamRequest.java`
2. `GetCheckpointRequest.java`
3. `GetCheckpointResponse.java`
4. `GetDBStreamInfoRequest.java`
5. `GetDBStreamInfoResponse.java`
6. `SetCheckpointRequest.java`
7. `SetCheckpointResponse.java`

This diff adds these missing headers.

[#11779][CDCSDK] Add option to send a DDL record based on a flag value in GetChangesRequest

Previously, if some data had already been consumed for a stream ID and a client later came up with the same stream ID and requested changes, it would receive only the changes.

The problem this caused for `Debezium` was that when the connector was restarted, it received the changes directly, without any DDL record. The DDL record is essential for Debezium because it is used to build the schema info for the columns; when it was absent, this led to a `NullPointerException` on the client side, effectively crashing the connector.
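For illustration, this is roughly how a consumer would populate the request after a restart. It is only a sketch using the `GetChangesRequestPB` fields exercised in the test below; the helper name and restart bookkeeping are hypothetical.

```cpp
// Hedged sketch: on the first GetChanges call after a (re)start, ask the server to
// prepend a DDL record so the consumer can rebuild per-column schema info before
// processing DML records. Subsequent calls leave the flag at its default (false).
void PrepareRequestAfterRestart(yb::cdc::GetChangesRequestPB* req,
                                const std::string& stream_id,
                                const std::string& tablet_id,
                                const yb::cdc::CDCSDKCheckpointPB& persisted_checkpoint,
                                bool first_call_after_restart) {
  req->set_stream_id(stream_id);
  req->set_tablet_id(tablet_id);
  *req->mutable_from_cdc_sdk_checkpoint() = persisted_checkpoint;
  // New in this change: request schema info explicitly instead of relying on this
  // being the very first call ever made for the stream/tablet pair.
  req->set_need_schema_info(first_call_after_restart);
}
```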

[#11729][DocDB][xCluster] Fix for replication not working if user upgrades to a branch with CDCSDK code changes

With the CDCSDK changes, we have separate `source_type` values, i.e. `XCLUSTER` for xCluster replication and `CDCSDK` for the new changes. Similarly, there is another option, `checkpoint_type`, which can have `IMPLICIT` and `EXPLICIT` values.

If a replication stream had been created before upgrading, it was unable to continue replication after upgrading to the latest version, since the `source_type` and `checkpoint_type` options were missing from it; these options were only introduced with the CDCSDK changes.
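To illustrate the intended end state: after this fix, a stream created before the upgrade is treated as if it had been created with the pre-CDCSDK defaults. A hedged check of that state might look like the following; the `GetStreamOptions` helper is hypothetical, while the constants and default values are the ones added in this diff.

```cpp
// Illustration only: metadata loaded for a stream created before the upgrade now carries
// both options, backfilled with defaults that match the old xCluster-only behavior.
auto options = GetStreamOptions(legacy_stream_id);  // hypothetical helper returning the options map
CHECK_EQ(options[cdc::kSourceType],
         CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER));    // "XCLUSTER"
CHECK_EQ(options[cdc::kCheckpointType],
         CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT));  // "IMPLICIT"
```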

Test Fixes for 2.12

Test Plan:
Jenkins: skip
Unit tests in Java for the APIs and CDC behavior.
We have done some long-running testing with applications.
We have also run the YB sample apps with CDC enabled on the table and verified that all the events are received.

Tested the complete CDC-to-Debezium pipeline with the specified change.

Command to run test:
`ybd --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNeedSchemaInfoFlag`

* Manually tested with a custom build on dev portal

Reviewers: nicolas, bogdan, ybase, rahuldesirazu, sdash, iamoncar, zyu, jhe, mkantimath, sergei

Reviewed By: sergei

Differential Revision: https://phabricator.dev.yugabyte.com/D16251
Isha Amoncar authored and suranjan committed Mar 29, 2022
1 parent b952e40 commit c8c3516
Showing 85 changed files with 8,713 additions and 451 deletions.
25 changes: 23 additions & 2 deletions ent/src/yb/cdc/cdc_service.cc
@@ -256,11 +256,15 @@ class CDCServiceImpl::Impl {
it->last_streamed_op_id = op_id;
}

std::shared_ptr<Schema> GetOrAddSchema(const ProducerTabletInfo& producer_tablet) {
std::shared_ptr<Schema> GetOrAddSchema(const ProducerTabletInfo &producer_tablet,
const bool need_schema_info) {
std::lock_guard<decltype(mutex_)> l(mutex_);
auto it = cdc_state_metadata_.find(producer_tablet);

if (it != cdc_state_metadata_.end()) {
if (need_schema_info) {
it->current_schema = std::make_shared<Schema>();
}
return it->current_schema;
}
CDCStateMetadataInfo info = CDCStateMetadataInfo {
@@ -645,6 +649,20 @@ CHECKED_STATUS VerifyArg(const SetCDCCheckpointRequestPB& req) {
return Status::OK();
}

// This function is to handle the upgrade scenario where the DB is upgraded from a version
// without CDCSDK changes to the one with it. So in case, some required options are missing,
// the default values will be added for the same.
void AddDefaultOptionsIfMissing(std::unordered_map<std::string, std::string>* options) {
if ((*options).find(cdc::kSourceType) == (*options).end()) {
(*options).emplace(cdc::kSourceType, CDCRequestSource_Name(cdc::CDCRequestSource::XCLUSTER));
}

if ((*options).find(cdc::kCheckpointType) == (*options).end()) {
(*options).emplace(cdc::kCheckpointType,
CDCCheckpointType_Name(cdc::CDCCheckpointType::IMPLICIT));
}
}

} // namespace

template <class ReqType, class RespType>
@@ -1101,7 +1119,7 @@ void CDCServiceImpl::GetChanges(const GetChangesRequestPB* req,
std::string commit_timestamp;
OpId last_streamed_op_id;

auto cached_schema = impl_->GetOrAddSchema(producer_tablet);
auto cached_schema = impl_->GetOrAddSchema(producer_tablet, req->need_schema_info());
s = cdc::GetChangesForCDCSDK(
req->stream_id(), req->tablet_id(), cdc_sdk_op_id, record, tablet_peer, mem_tracker,
&msgs_holder, resp, &commit_timestamp, &cached_schema,
@@ -2108,6 +2126,9 @@ Result<std::shared_ptr<StreamMetadata>> CDCServiceImpl::GetStream(const std::str
RETURN_NOT_OK(client()->GetCDCStream(stream_id, &ns_id, &object_ids, &options));

auto stream_metadata = std::make_shared<StreamMetadata>();

AddDefaultOptionsIfMissing(&options);

for (const auto& option : options) {
if (option.first == kRecordType) {
SCHECK(CDCRecordType_Parse(option.second, &stream_metadata->record_type),
103 changes: 102 additions & 1 deletion ent/src/yb/integration-tests/cdcsdk_ysql-test.cc
@@ -123,7 +123,15 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
return resp.cluster_config().cluster_uuid();
}

// the range is exclusive of end i.e. [start, end)
Status DropDB(Cluster* cluster) {
const std::string db_name = "testdatabase";
RETURN_NOT_OK(CreateDatabase(&test_cluster_, db_name, true));
auto conn = VERIFY_RESULT(cluster->ConnectToDB(db_name));
RETURN_NOT_OK(conn.ExecuteFormat("DROP DATABASE $0", kNamespaceName));
return Status::OK();
}

// The range is exclusive of end i.e. [start, end)
void WriteRows(uint32_t start, uint32_t end, Cluster* cluster) {
auto conn = EXPECT_RESULT(cluster->ConnectToDB(kNamespaceName));
LOG(INFO) << "Writing " << end - start << " row(s)";
@@ -159,6 +167,18 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
change_req->mutable_from_cdc_sdk_checkpoint()->set_write_id(0);
}

void PrepareChangeRequest(
GetChangesRequestPB* change_req, const CDCStreamId& stream_id,
const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets,
const CDCSDKCheckpointPB& cp) {
change_req->set_stream_id(stream_id);
change_req->set_tablet_id(tablets.Get(0).tablet_id());
change_req->mutable_from_cdc_sdk_checkpoint()->set_index(cp.index());
change_req->mutable_from_cdc_sdk_checkpoint()->set_term(cp.term());
change_req->mutable_from_cdc_sdk_checkpoint()->set_key(cp.key());
change_req->mutable_from_cdc_sdk_checkpoint()->set_write_id(cp.write_id());
}

void PrepareSetCheckpointRequest(
SetCDCCheckpointRequestPB* set_checkpoint_req,
const CDCStreamId stream_id,
@@ -253,6 +273,44 @@ class CDCSDKYsqlTest : public CDCSDKTestBase {
LOG(INFO) << "Got " << ins_count << " insert records";
ASSERT_EQ(expected_records_size, ins_count);
}

Result<GetChangesResponsePB> VerifyIfDDLRecordPresent(
const CDCStreamId& stream_id,
const google::protobuf::RepeatedPtrField<master::TabletLocationsPB>& tablets,
bool expect_ddl_record, bool is_first_call, const CDCSDKCheckpointPB* cp = nullptr) {
GetChangesRequestPB req;
GetChangesResponsePB resp;

if (cp == nullptr) {
PrepareChangeRequest(&req, stream_id, tablets);
} else {
PrepareChangeRequest(&req, stream_id, tablets, *cp);
}

// The default value for need_schema_info is false.
if (expect_ddl_record) {
req.set_need_schema_info(true);
}

RpcController get_changes_rpc;
RETURN_NOT_OK(cdc_proxy_->GetChanges(req, &resp, &get_changes_rpc));

if (resp.has_error()) {
return StatusFromPB(resp.error().status());
}

auto record = resp.cdc_sdk_proto_records(0);

// If it's the first call to GetChanges, we will get a DDL record irrespective of the
// value of need_schema_info.
if (is_first_call || expect_ddl_record) {
EXPECT_EQ(record.row_message().op(), RowMessage::DDL);
} else {
EXPECT_NE(record.row_message().op(), RowMessage::DDL);
}

return resp;
}
};

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestBaseFunctions)) {
@@ -328,6 +386,49 @@ TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(MultiRowInsertion)) {
LOG(INFO) << "Got " << ins_count << " insert records";
ASSERT_EQ(expected_records_size, ins_count);
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(DropDatabase)) {
ASSERT_OK(SetUpWithParams(3, 1, false));
CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream());
ASSERT_OK(DropDB(&test_cluster_));
}

TEST_F(CDCSDKYsqlTest, YB_DISABLE_TEST_IN_TSAN(TestNeedSchemaInfoFlag)) {
ASSERT_OK(SetUpWithParams(1, 1, false));

auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(
table, 0, &tablets, /* partition_list_version = */ nullptr));

std::string table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));
CDCStreamId stream_id = ASSERT_RESULT(CreateDBStream());

ASSERT_OK(SetInitialCheckpoint(stream_id, tablets));

// This will write one row with PK = 0.
WriteRows(0 /* start */, 1 /* end */, &test_cluster_);

// This is the first call to GetChanges, we will get a DDL record.
GetChangesResponsePB resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, false,
true));

// Write another row to the database with PK = 1.
WriteRows(1 /* start */, 2 /* end */, &test_cluster_);

// We will not get any DDL record here since this is not the first call and the flag
// need_schema_info is also unset.
resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, false, false,
&resp.cdc_sdk_checkpoint()));

// Write another row to the database with PK = 2.
WriteRows(2 /* start */, 3 /* end */, &test_cluster_);

// We will get a DDL record since we have enabled the need_schema_info flag.
resp = ASSERT_RESULT(VerifyIfDDLRecordPresent(stream_id, tablets, true, false,
&resp.cdc_sdk_checkpoint()));
}
} // namespace enterprise
} // namespace cdc
} // namespace yb
38 changes: 36 additions & 2 deletions ent/src/yb/master/catalog_manager_ent.cc
@@ -209,6 +209,34 @@ class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {
public:
explicit CDCStreamLoader(CatalogManager* catalog_manager) : catalog_manager_(catalog_manager) {}

void AddDefaultValuesIfMissing(const SysCDCStreamEntryPB& metadata,
CDCStreamInfo::WriteLock* l) {
bool source_type_present = false;
bool checkpoint_type_present = false;

// Iterate over all the options to check if checkpoint_type and source_type are present.
for (auto option : metadata.options()) {
if (option.key() == cdc::kSourceType) {
source_type_present = true;
}
if (option.key() == cdc::kCheckpointType) {
checkpoint_type_present = true;
}
}

if (!source_type_present) {
auto source_type_opt = l->mutable_data()->pb.add_options();
source_type_opt->set_key(cdc::kSourceType);
source_type_opt->set_value(cdc::CDCRequestSource_Name(cdc::XCLUSTER));
}

if (!checkpoint_type_present) {
auto checkpoint_type_opt = l->mutable_data()->pb.add_options();
checkpoint_type_opt->set_key(cdc::kCheckpointType);
checkpoint_type_opt->set_value(cdc::CDCCheckpointType_Name(cdc::IMPLICIT));
}
}

Status Visit(const CDCStreamId& stream_id, const SysCDCStreamEntryPB& metadata)
REQUIRES(catalog_manager_->mutex_) {
DCHECK(!ContainsKey(catalog_manager_->cdc_stream_map_, stream_id))
@@ -244,6 +272,10 @@ class CDCStreamLoader : public Visitor<PersistentCDCStreamInfo> {
auto l = stream->LockForWrite();
l.mutable_data()->pb.CopyFrom(metadata);

// If no source_type and checkpoint_type is present, that means the stream was created in
// a previous version where these options were not present.
AddDefaultValuesIfMissing(metadata, &l);

// If the table has been deleted, then mark this stream as DELETING so it can be deleted by the
// catalog manager background thread. Otherwise if this stream is missing an entry
// for state, then mark its state as Active.
@@ -2505,7 +2537,8 @@ std::vector<scoped_refptr<CDCStreamInfo>> CatalogManager::FindCDCStreamsForTable
for (const auto& entry : cdc_stream_map_) {
auto ltm = entry.second->LockForRead();
// for xCluster the first entry will be the table_id
if (ltm->table_id().Get(0) == table_id && !ltm->started_deleting()) {
if (!ltm->table_id().empty() && ltm->table_id().Get(0) == table_id &&
!ltm->started_deleting()) {
streams.push_back(entry.second);
}
}
@@ -2983,7 +3016,8 @@ Status CatalogManager::ListCDCStreams(const ListCDCStreamsRequestPB* req,
continue;
}

if (filter_table && table->id() != entry.second->table_id().Get(0)) {
if (filter_table && !entry.second->table_id().empty() &&
table->id() != entry.second->table_id().Get(0)) {
continue; // Skip deleting/deleted streams and streams from other tables.
}

46 changes: 43 additions & 3 deletions java/yb-cdc/pom.xml
@@ -28,16 +28,17 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
<version>2.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.10.2</version>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.5</version>
</dependency>
<dependency>
<groupId>org.yb</groupId>
@@ -69,10 +70,12 @@
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.2</version>
</dependency>
<dependency>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -83,6 +86,33 @@
<artifactId>gson</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.23</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>3.4.5</version>
</dependency>
<!-- dependency for YCQL driver -->
<dependency>
<groupId>com.yugabyte</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.6.0-yb-6</version>
</dependency>
<!-- dependency for smart jdbc driver yugabyte -->
<dependency>
<groupId>com.yugabyte</groupId>
<artifactId>jdbc-yugabytedb</artifactId>
<version>42.3.0-beta.1</version>
</dependency>
<dependency>
<groupId>${junit.groupId}</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -114,6 +144,7 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<id>copy-dependencies</id>
@@ -134,13 +165,14 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<finalName>yb-cdc-connector</finalName>
<appendAssemblyId>false</appendAssemblyId>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>org.yb.cdc.Main</mainClass>
<mainClass>org.yb.cdc.CDCConsoleSubscriber</mainClass>
</manifest>
</archive>
<descriptorRefs>
@@ -166,6 +198,14 @@
<preparationGoals>clean verify</preparationGoals>
</configuration>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.0.2</version>
</plugin>
</plugins>
</build>
</project>