diff --git a/media/integrate/add-snowflake-sink-connector.png b/media/integrate/add-snowflake-sink-connector.png
new file mode 100644
index 0000000000000..d97029c991b7f
Binary files /dev/null and b/media/integrate/add-snowflake-sink-connector.png differ
diff --git a/media/integrate/authentication.png b/media/integrate/authentication.png
new file mode 100644
index 0000000000000..2c3c04ef5bd13
Binary files /dev/null and b/media/integrate/authentication.png differ
diff --git a/media/integrate/configuration.png b/media/integrate/configuration.png
new file mode 100644
index 0000000000000..81978291821a8
Binary files /dev/null and b/media/integrate/configuration.png differ
diff --git a/media/integrate/confluent-topics.PNG b/media/integrate/confluent-topics.PNG
new file mode 100644
index 0000000000000..47dd95904df87
Binary files /dev/null and b/media/integrate/confluent-topics.PNG differ
diff --git a/media/integrate/credentials.png b/media/integrate/credentials.png
new file mode 100644
index 0000000000000..526813d6e6148
Binary files /dev/null and b/media/integrate/credentials.png differ
diff --git a/media/integrate/data-preview.png b/media/integrate/data-preview.png
new file mode 100644
index 0000000000000..1defe43ddb707
Binary files /dev/null and b/media/integrate/data-preview.png differ
diff --git a/media/integrate/results.png b/media/integrate/results.png
new file mode 100644
index 0000000000000..a96387be350e9
Binary files /dev/null and b/media/integrate/results.png differ
diff --git a/media/integrate/select-from-orders.png b/media/integrate/select-from-orders.png
new file mode 100644
index 0000000000000..55e899ab531e3
Binary files /dev/null and b/media/integrate/select-from-orders.png differ
diff --git a/media/integrate/topic-selection.png b/media/integrate/topic-selection.png
new file mode 100644
index 0000000000000..1cbc1a2ef2e1b
Binary files /dev/null and b/media/integrate/topic-selection.png differ
diff --git a/ticdc/integrate-confluent-using-ticdc.md b/ticdc/integrate-confluent-using-ticdc.md
index 9496e171ba3ad..cb92d92d1cdcd 100644
--- a/ticdc/integrate-confluent-using-ticdc.md
+++ b/ticdc/integrate-confluent-using-ticdc.md
@@ -1,130 +1,268 @@
 ---
-title: Quick Start Guide on Integrating TiDB with Confluent Platform
-summary: Learn how to stream TiDB data to the Confluent Platform using TiCDC.
+title: Integrate Data with Confluent Cloud
+summary: Learn how to stream TiDB data to Confluent Cloud using TiCDC, and how to replicate incremental data to ksqlDB, Snowflake, and SQL Server.
 ---
 
-# Quick Start Guide on Integrating TiDB with Confluent Platform
+# Integrate Data with Confluent Cloud
 
-This document introduces how to integrate TiDB to Confluent Platform using [TiCDC](/ticdc/ticdc-overview.md).
+Confluent is an Apache Kafka-compatible streaming data platform that provides strong data integration capabilities. On this platform, you can access, store, and manage non-stop real-time streaming data.
 
-> **Warning:**
->
-> This is still an experimental feature. Do **NOT** use it in a production environment.
+Starting from TiDB v6.1.0, TiCDC supports replicating incremental data to Confluent in Avro format. This document introduces how to replicate TiDB incremental data to Confluent using [TiCDC](/ticdc/ticdc-overview.md), and how to further replicate the data to ksqlDB, Snowflake, and SQL Server via Confluent Cloud. The organization of this document is as follows:
 
-[Confluent Platform](https://docs.confluent.io/current/platform.html) is a data streaming platform with Apache Kafka at its core. With many official and third-party sink connectors, Confluent Platform enables you to easily connect stream sources to relational or non-relational databases.
+1. Quickly deploy a TiDB cluster with TiCDC included.
+2. Create a changefeed that replicates data from TiDB to Confluent Cloud.
+3. Create Connectors that replicate data from Confluent Cloud to ksqlDB, Snowflake, and SQL Server.
+4. Write data to TiDB using go-tpc, and observe data changes in ksqlDB, Snowflake, and SQL Server.
 
-To integrate TiDB with Confluent Platform, you can use the TiCDC component with the Avro protocol. TiCDC can stream data changes to Kafka in the format that Confluent Platform recognizes. For the detailed integration guide, see the following sections:
+The preceding steps are performed in a lab environment. You can also deploy a cluster in a production environment by referring to these steps.
 
-## Prerequisites
+## Replicate incremental data to Confluent Cloud
 
-> **Note:**
->
-> In this tutorial, the [JDBC sink connector](https://docs.confluent.io/current/connect/kafka-connect-jdbc/sink-connector/index.html#load-the-jdbc-sink-connector) is used to replicate TiDB data to a downstream relational database. To make it simple, **SQLite** is used here as an example.
+### Step 1. Set up the environment
 
-+ Make sure that Zookeeper, Kafka, and Schema Registry are properly installed. It is recommended that you follow the [Confluent Platform Quick Start Guide](https://docs.confluent.io/current/quickstart/ce-quickstart.html#ce-quickstart) to deploy a local test environment.
+1. Deploy a TiDB cluster with TiCDC included.
 
-+ Make sure that JDBC sink connector is installed by running the following command. The result should contain `jdbc-sink`.
-
-    {{< copyable "shell-regular" >}}
-
-    ```shell
-    confluent local services connect connector list
-    ```
+    In a lab or testing environment, you can deploy a TiDB cluster with TiCDC quickly by using TiUP Playground.
+
+    ```shell
+    tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
+    # View cluster status
+    tiup status
+    ```
+
+    If TiUP is not installed yet, refer to [Install TiUP](/tiup/tiup-overview.md#install-tiup). In a production environment, you can deploy TiCDC as instructed in [Deploy TiCDC](/ticdc/deploy-ticdc.md). You can verify that the cluster is ready as shown in the example after this list.
+
+2. Register for Confluent Cloud and create a Confluent cluster.
+
+    Create a Basic cluster and make it accessible via the Internet. For details, see [Quick Start for Confluent Cloud](https://docs.confluent.io/cloud/current/get-started/index.html).
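+The following is a minimal sanity check for the playground cluster deployed in step 1. It is only a sketch: it assumes the default TiUP Playground ports (TiDB on `4000`, PD on `2379`), an empty root password, and that a MySQL client is installed. Adjust the values if your environment differs.
+
+```shell
+# Confirm that TiDB is reachable through the MySQL protocol
+mysql --host 127.0.0.1 --port 4000 -u root -e "SELECT VERSION();"
+# Confirm that a TiCDC capture node is running
+tiup ctl:v6.1.0 cdc capture list --pd="http://127.0.0.1:2379"
+```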
+### Step 2. Create an access key pair
+
+1. Create a cluster API key.
+
+    Sign in to [Confluent Cloud](https://confluent.cloud). Choose **Data integration** > **API keys** > **Create key**. On the **Select scope for API key** page that is displayed, select **Global access**.
+
+    After creation, a key pair file is generated, as shown below.
+
+    ```
+    === Confluent Cloud API key: xxx-xxxxx ===
+
+    API key:
+    L5WWA4GK4NAT2EQV
+
+    API secret:
+    xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+
+    Bootstrap server:
+    xxx-xxxxx.ap-east-1.aws.confluent.cloud:9092
+    ```
+
+2. Record the Schema Registry endpoint.
+
+    In the Confluent Cloud Console, choose **Schema Registry** > **API endpoint** and record the Schema Registry endpoint. The following is an example:
+
+    ```
+    https://yyy-yyyyy.us-east-2.aws.confluent.cloud
+    ```
+
+3. Create a Schema Registry API key.
+
+    In the Confluent Cloud Console, choose **Schema Registry** > **API credentials**. Click **Edit** and then **Create key**.
+
+    After creation, a key pair file is generated, as shown below:
+
+    ```
+    === Confluent Cloud API key: yyy-yyyyy ===
+    API key:
+    7NBH2CAFM2LMGTH7
+    API secret:
+    xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+    ```
+
+    You can also perform this step by using Confluent CLI. For details, see [Connect Confluent CLI to Confluent Cloud Cluster](https://docs.confluent.io/confluent-cli/current/connect.html).
+
+### Step 3. Create a Kafka changefeed
+
+1. Create a changefeed configuration file.
+
+    As required by Avro and Confluent Connector, incremental data of each table must be sent to an independent topic, and a partition must be dispatched for each event based on the primary key value. Therefore, you need to create a changefeed configuration file `changefeed.conf` with the following contents:
+
+    ```
+    [sink]
+    dispatchers = [
+    {matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
+    ]
+    ```
+
+    For detailed description of `dispatchers` in the configuration file, see [Customize the rules for Topic and Partition dispatchers of Kafka Sink](/ticdc/manage-ticdc.md#customize-the-rules-for-topic-and-partition-dispatchers-of-kafka-sink).
+
+2. Create a changefeed to replicate incremental data to Confluent Cloud:
+
+    ```shell
+    tiup ctl:v6.1.0 cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://<broker_endpoint>/ticdc-meta?protocol=avro&replication-factor=3&enable-tls=true&auto-create-topic=true&sasl-mechanism=plain&sasl-user=<broker_api_key>&sasl-password=<broker_api_secret>" --schema-registry="https://<schema_registry_api_key>:<schema_registry_api_secret>@<schema_registry_endpoint>" --changefeed-id="confluent-changefeed" --config changefeed.conf
+    ```
-## Integration procedures
-
-1. Save the following configuration into `jdbc-sink-connector.json`:
-
-    {{< copyable "" >}}
-
-    ```json
-    {
-        "name": "jdbc-sink-connector",
-        "config": {
-            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
-            "tasks.max": "1",
-            "topics": "testdb_test",
-            "connection.url": "sqlite:test.db",
-            "connection.ds.pool.size": 5,
-            "table.name.format": "test",
-            "auto.create": true,
-            "auto.evolve": true
-        }
-    }
-    ```
+
+    You need to replace the values of the following fields with those created or recorded in [Step 2. Create an access key pair](#step-2-create-an-access-key-pair):
+
+    - `<broker_endpoint>`: the Bootstrap server of the Confluent cluster
+    - `<broker_api_key>`: the cluster API key
+    - `<broker_api_secret>`: the cluster API secret
+    - `<schema_registry_api_key>`: the Schema Registry API key
+    - `<schema_registry_api_secret>`: the Schema Registry API secret
+    - `<schema_registry_endpoint>`: the Schema Registry endpoint
+
+    Note that you should encode `<schema_registry_api_secret>` based on [HTML URL Encoding Reference](https://www.w3schools.com/tags/ref_urlencode.asp) before replacing its value (see the encoding example at the end of this step). After you replace all the preceding fields, the command is as follows:
+
+    ```shell
+    tiup ctl:v6.1.0 cdc changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://xxx-xxxxx.ap-east-1.aws.confluent.cloud:9092/ticdc-meta?protocol=avro&replication-factor=3&enable-tls=true&auto-create-topic=true&sasl-mechanism=plain&sasl-user=L5WWA4GK4NAT2EQV&sasl-password=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" --schema-registry="https://7NBH2CAFM2LMGTH7:xxxxxxxxxxxxxxxxxx@yyy-yyyyy.us-east-2.aws.confluent.cloud" --changefeed-id="confluent-changefeed" --config changefeed.conf
+    ```
-
-2. Create an instance of the JDBC sink connector by running the following command (assuming Kafka is listening on `127.0.0.1:8083`):
-
-    {{< copyable "shell-regular" >}}
-
-    ```shell
-    curl -X POST -H "Content-Type: application/json" -d @jdbc-sink-connector.json http://127.0.0.1:8083/connectors
-    ```
+
+    - Run the command to create a changefeed.
+
+        - If the changefeed is successfully created, changefeed information, such as changefeed ID, is displayed, as shown below:
+
+            ```shell
+            Create changefeed successfully!
+            ID: confluent-changefeed
+            Info: {... changefeed info json struct ...}
+            ```
+
+        - If no result is returned after you run the command, check the network connectivity between the server where you run the command and Confluent Cloud. For details, see [Test connectivity to Confluent Cloud](https://docs.confluent.io/cloud/current/networking/testing.html).
-
-3. Deploy TiCDC in one of the following ways. If TiCDC is already deployed, you can skip this step.
-
-    - [Deploy a new TiDB cluster that includes TiCDC using TiUP](/ticdc/deploy-ticdc.md#deploy-a-new-tidb-cluster-that-includes-ticdc-using-tiup)
-    - [Add TiCDC to an existing TiDB cluster using TiUP](/ticdc/deploy-ticdc.md#add-ticdc-to-an-existing-tidb-cluster-using-tiup)
-    - [Add TiCDC to an existing TiDB cluster using binary (not recommended)](/ticdc/deploy-ticdc.md#add-ticdc-to-an-existing-tidb-cluster-using-binary-not-recommended)
-
-    Make sure that your TiDB and TiCDC clusters are healthy before proceeding.
+
+3. After creating the changefeed, run the following command to check the changefeed status:
+
+    ```shell
+    tiup ctl:v6.1.0 cdc changefeed list --pd="http://127.0.0.1:2379"
+    ```
-
-4. Create a `changefeed` by running the `cdc cli` command:
-
-    {{< copyable "shell-regular" >}}
-
-    ```shell
-    ./cdc cli changefeed create --pd="http://127.0.0.1:2379" --sink-uri="kafka://127.0.0.1:9092/testdb_test?protocol=avro" --opts "registry=http://127.0.0.1:8081"
-    ```
-
-    > **Note:**
-    >
-    > Make sure that PD, Kafka, and Schema Registry are running on their respective default ports.
+
+    You can refer to [Manage TiCDC Cluster and Replication Tasks](/ticdc/manage-ticdc.md) to manage the changefeed.
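+The following is one way to URL-encode the Schema Registry API secret mentioned in the note above. It is only a sketch: it assumes that `python3` is available on the server where you run `tiup ctl`, and the secret shown is a made-up placeholder, not a real credential.
+
+```shell
+# Print the URL-encoded form of a secret so that it can be embedded in --schema-registry
+python3 -c 'import urllib.parse, sys; print(urllib.parse.quote(sys.argv[1], safe=""))' 'example+secret/with=symbols'
+# Output: example%2Bsecret%2Fwith%3Dsymbols
+```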
+### Step 4. Write data to generate change logs
+
+After the preceding steps are done, TiCDC sends change logs of incremental data in the TiDB cluster to Confluent Cloud. This section describes how to write data into TiDB to generate change logs.
+
+1. Simulate service workload.
+
+    To generate change logs in a lab environment, you can use go-tpc to write data to the TiDB cluster. Specifically, run the following commands to create a database `tpcc` in the TiDB cluster and use TiUP bench to write data to this new database.
+
+    ```shell
+    tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
+    tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s
+    ```
+
+    For more details about go-tpc, refer to [How to Run TPC-C Test on TiDB](/benchmark/benchmark-tidb-using-tpcc.md).
+
+2. Observe data in Confluent Cloud.
-
-## Test data replication
-
-After TiDB is integrated with Confluent Platform, you can follow the example procedures below to test the data replication.
+
+    ![Confluent topics](/media/integrate/confluent-topics.PNG)
+
+    In the Confluent Cloud Console, click **Topics**. You can see that the target topics have been created and are receiving data. At this time, incremental data of the TiDB database is successfully replicated to Confluent Cloud.
+
+## Integrate data with ksqlDB
-
-1. Create the `testdb` database in your TiDB cluster:
-
-    {{< copyable "sql" >}}
+
+ksqlDB is a database purpose-built for stream processing applications. You can create ksqlDB clusters on Confluent Cloud and access incremental data replicated by TiCDC.
+
+1. Select **ksqlDB** in the Confluent Cloud Console and create a ksqlDB cluster as instructed.
+
+    Wait until the ksqlDB cluster status is **Running**. This process takes several minutes.
+
+2. In the ksqlDB Editor, run the following command to create a stream to access the `tidb_tpcc_orders` topic:
-
-    ```sql
-    CREATE DATABASE IF NOT EXISTS testdb;
-    ```
+
+    ```sql
+    CREATE STREAM orders (o_id INTEGER, o_d_id INTEGER, o_w_id INTEGER, o_c_id INTEGER, o_entry_d STRING, o_carrier_id INTEGER, o_ol_cnt INTEGER, o_all_local INTEGER) WITH (kafka_topic='tidb_tpcc_orders', partitions=3, value_format='AVRO');
+    ```
-
-    Create the `test` table in `testdb`:
-
-    {{< copyable "sql" >}}
+
+3. Run the following command to check the data in the orders stream:
-
-    ```sql
-    USE testdb;
-    CREATE TABLE test (
-        id INT PRIMARY KEY,
-        v TEXT
-    );
-    ```
+
+    ```sql
+    SELECT * FROM ORDERS EMIT CHANGES;
+    ```
+
+    ![Select from orders](/media/integrate/select-from-orders.png)
-
-    > **Note:**
-    >
-    > If you need to change the database name or the table name, change `topics` in `jdbc-sink-connector.json` accordingly.
+
+    You can see that the incremental data has been replicated to ksqlDB, as shown in the preceding figure. Data integration with ksqlDB is done.
+
+## Integrate data with Snowflake
-
-2. Insert data into TiDB:
-
-    {{< copyable "sql" >}}
+
+Snowflake is a cloud-native data warehouse. With Confluent, you can replicate TiDB incremental data to Snowflake by creating Snowflake Sink Connectors.
+
+### Prerequisites
+
+- You have registered and created a Snowflake cluster. See [Getting Started with Snowflake](https://docs.snowflake.com/en/user-guide-getting-started.html).
+- Before connecting to the Snowflake cluster, you have generated a private key for it. See [Key Pair Authentication & Key Pair Rotation](https://docs.snowflake.com/en/user-guide/key-pair-auth.html).
+
+### Integration procedure
-
-    ```sql
-    INSERT INTO test (id, v) values (1, 'a');
-    INSERT INTO test (id, v) values (2, 'b');
-    INSERT INTO test (id, v) values (3, 'c');
-    INSERT INTO test (id, v) values (4, 'd');
-    ```
+
+1. Create a database and a schema in Snowflake.
+
+    In the Snowflake control console, choose **Data** > **Database**. Create a database named `TPCC` and a schema named `TiCDC`.
+
+2. In the Confluent Cloud Console, choose **Data integration** > **Connectors** > **Snowflake Sink**. The page shown below is displayed.
+
+    ![Add snowflake sink connector](/media/integrate/add-snowflake-sink-connector.png)
-
-3. Wait a moment for data to be replicated to the downstream. Then check the downstream for data:
-
-    {{< copyable "shell-regular" >}}
+
+3. Select the topic you want to replicate to Snowflake. Then go to the next page.
+
+    ![Configuration](/media/integrate/configuration.png)
+
+4. Specify the authentication information for connecting to Snowflake. Fill in **Database name** and **Schema name** with the database and schema you created in Step 1. Then go to the next page.
+
+    ![Credentials](/media/integrate/credentials.png)
+
+5. On the **Configuration** page, select `AVRO` for both **Input Kafka record value format** and **Input Kafka record key format**. Then click **Continue**. Wait until the connector is created and the status becomes **Running**, which might take several minutes.
+
+    ![Data preview](/media/integrate/data-preview.png)
+
+6. In the Snowflake console, choose **Data** > **Database** > **TPCC** > **TiCDC**. You can see that TiDB incremental data has been replicated to Snowflake. Data integration with Snowflake is done (see the query example after this procedure).
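+The following query is a quick way to inspect the replicated records in Snowflake. It is only a sketch based on two assumptions: the Snowflake Sink Connector keeps its default table layout (a `RECORD_METADATA` column and a `RECORD_CONTENT` VARIANT column), and the auto-created table is named after the topic, for example `tidb_tpcc_orders`. Adjust the names if your connector is configured differently.
+
+```sql
+USE DATABASE TPCC;
+USE SCHEMA TiCDC;
+-- Each replicated Kafka record is stored as a semi-structured VARIANT value
+SELECT RECORD_CONTENT FROM tidb_tpcc_orders LIMIT 10;
+```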
+## Integrate data with SQL Server
+
+Microsoft SQL Server is a relational database management system (RDBMS) developed by Microsoft. With Confluent, you can replicate TiDB incremental data to SQL Server by creating SQL Server Sink Connectors.
+
+1. Connect to SQL Server and create a database named `tpcc`.
-
-    ```shell
-    sqlite3 test.db
-    sqlite> SELECT * from test;
-    ```
+
+    ```shell
+    [ec2-user@ip-172-1-1-1 bin]$ sqlcmd -S 10.61.43.14,1433 -U admin
+    Password:
+    1> create database tpcc
+    2> go
+    1> select name from master.dbo.sysdatabases
+    2> go
+    name
+    ----------------------------------------------------------------------
+    master
+    tempdb
+    model
+    msdb
+    rdsadmin
+    tpcc
+    (6 rows affected)
+    ```
+
+2. In the Confluent Cloud Console, choose **Data integration** > **Connectors** > **Microsoft SQL Server Sink**. The page shown below is displayed.
+
+    ![Topic selection](/media/integrate/topic-selection.png)
+
+3. Select the topic you want to replicate to SQL Server. Then go to the next page.
+
+    ![Authentication](/media/integrate/authentication.png)
+
+4. Fill in the connection and authentication information. Then go to the next page.
+
+5. On the **Configuration** page, configure the following fields:
+
+    | Field | Value |
+    | :- | :- |
+    | Input Kafka record value format | AVRO |
+    | Insert mode | UPSERT |
+    | Auto create table | true |
+    | Auto add columns | true |
+    | PK mode | record_key |
+    | Input Kafka record key format | AVRO |
+    | Delete on null | true |
+
+6. After configuration, click **Continue**. Wait until the connector status becomes **Running**, which might take several minutes.
+
+    ![Results](/media/integrate/results.png)
+
+7. Connect to SQL Server and observe the data (see the query example after this procedure). You can see that the incremental data has been replicated to SQL Server, as shown in the preceding figure. Data integration with SQL Server is done.
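+The following is a minimal way to check the replicated data from the command line. It assumes the same SQL Server instance and login as in step 1, and that the connector auto-created a table named after the topic, for example `tidb_tpcc_orders`; adjust the names to match your environment.
+
+```shell
+sqlcmd -S 10.61.43.14,1433 -U admin -d tpcc
+1> SELECT COUNT(*) FROM tidb_tpcc_orders;
+2> go
+```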