ticdc 2 pulsar : add pulsar config from sink-uri & pulsar errors (#9393)
ref #9413
yumchina authored Jul 21, 2023
1 parent 1a67111 commit db5e513
Showing 8 changed files with 1,014 additions and 3 deletions.
335 changes: 335 additions & 0 deletions docs/design/2023-07-04-ticdc-pulsar-sink.md
@@ -0,0 +1,335 @@
# TiCDC Design Documents

- Author(s): [yumchina](https://github.com/yumchina)
- Tracking Issue: https://github.com/pingcap/tiflow/issues/9413

## Table of Contents

- [Introduction](#introduction)
- [Motivation or Background](#motivation-or-background)
- [Detailed Design](#detailed-design)
- [Protocol-support](#protocol-support)
- [Row Order and Transactions](#row-order-and-transactions)
- [Pulsar Client](#pulsar-client)
- [Information](#information)
- [Different from Kafka](#different-from-kafka)
- [Pulsar Client Config](#pulsar-client-config)
- [Pulsar Producer](#pulsar-producer)
- [Producer Message](#producer-message)
- [Pulsar Authentication](#pulsar-authentication)
- [Pulsar Route Rule](#pulsar-route-rule)
- [Pulsar Topic Rule](#pulsar-topic-rule)
- [Produce DDL Event](#produce-ddl-event)
- [SyncSendMessage Method](#syncsendmessage-method)
- [SyncBroadcastMessage Method](#syncbroadcastmessage-method)
- [Close Method](#close-method)
- [Produce DML Event](#produce-dml-event)
- [AsyncSendMessage Method](#asyncsendmessage-method)
- [Close Method](#close-method-1)
- [Pulsar Metrics](#pulsar-metrics)
- [User Interface](#user-interface)
- [Test Design](#test-design)
- [Functional Tests](#functional-tests)
- [Scenario Tests](#scenario-tests)
- [Compatibility Tests](#compatibility-tests)
- [Benchmark Tests](#benchmark-tests)
- [Impacts & Risks](#impacts--risks)
- [Investigation & Alternatives](#investigation--alternatives)
- [Unresolved Questions](#unresolved-questions)

## Introduction


This document describes the design of the Pulsar sink for TiCDC.
The Pulsar sink distributes the DML change records and DDL events generated by TiCDC to Pulsar.


## Motivation or Background

The Pulsar sink is added to TiCDC to expand the set of downstream MQ distribution channels.
Users want to output TiDB events to Pulsar because they can reuse Pulsar machines that are shared with other services,
and because Pulsar is easy to scale horizontally.


## Detailed Design

#### Protocol-support

To stay consistent with the other MQ sinks,
we first support a subset of the protocols that the Kafka sink supports:

__CanalJSON__

__Canal__

__Maxwell__

CanalJSON protocol sample (for more information, refer to https://docs.pingcap.com/tidb/dev/ticdc-canal-json):
```
{
"id": 0,
"database": "test",
"table": "",
"pkNames": null,
"isDdl": true,
"type": "QUERY",
"es": 1639633094670,
"ts": 1639633095489,
"sql": "drop database if exists test",
"sqlType": null,
"mysqlType": null,
"data": null,
"old": null,
"_tidb": { // TiDB extension field
"commitTs": 163963309467037594
}
}
```
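
As an illustration of how a consumer might read such a message, the following is a minimal sketch that decodes the DDL sample above with the Go standard library. The struct name, the selection of fields, and the `decodeCanalJSON` helper are assumptions made for this example; they are not TiCDC's own types. The inline `// TiDB extension field` comment is left out of the payload because it is not valid JSON.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// canalJSONMessage mirrors a subset of the fields in the sample above.
// The type is illustrative only; it is not taken from the TiCDC code base.
type canalJSONMessage struct {
	ID       int64  `json:"id"`
	Database string `json:"database"`
	Table    string `json:"table"`
	IsDDL    bool   `json:"isDdl"`
	Type     string `json:"type"`
	ES       int64  `json:"es"`
	TS       int64  `json:"ts"`
	SQL      string `json:"sql"`
	// TiDB holds the TiDB extension field; it is nil when "_tidb" is absent.
	TiDB *struct {
		CommitTs uint64 `json:"commitTs"`
	} `json:"_tidb"`
}

// decodeCanalJSON parses a single message payload.
func decodeCanalJSON(payload []byte) (*canalJSONMessage, error) {
	var msg canalJSONMessage
	if err := json.Unmarshal(payload, &msg); err != nil {
		return nil, err
	}
	return &msg, nil
}

// sampleDDL is the sample from this section with the inline comment removed.
const sampleDDL = `{"id":0,"database":"test","table":"","pkNames":null,"isDdl":true,
"type":"QUERY","es":1639633094670,"ts":1639633095489,"sql":"drop database if exists test",
"sqlType":null,"mysqlType":null,"data":null,"old":null,"_tidb":{"commitTs":163963309467037594}}`

func main() {
	msg, err := decodeCanalJSON([]byte(sampleDDL))
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.IsDDL, msg.Type, msg.SQL)
}
```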

#### Row Order and Transactions

- Ensure that events are sent to Pulsar in order of increasing commit-ts.
- Ensure that there are no incomplete inner-table transactions in Pulsar.
- Ensure that every event is sent to Pulsar at least once.


#### Pulsar Client
##### Information

- Client library: https://github.com/apache/pulsar-client-go, version v0.10.0
- Requirement: Golang 1.18+

##### Different from Kafka
Unlike Kafka, a producer in the Pulsar client must be bound to a topic; a Kafka producer does not have to be.

##### Pulsar Client Config
```go
type ClientOptions struct {
// Configure the service URL for the Pulsar service.
// This parameter is required
URL string
// Timeout for the establishment of a TCP connection (default: 5 seconds)
ConnectionTimeout time.Duration
// Set the operation timeout (default: 30 seconds)
// Producer-create, subscribe and unsubscribe operations will be retried until this interval, after which the
// operation will be marked as failed
OperationTimeout time.Duration
// Configure the ping send and check interval, default to 30 seconds.
KeepAliveInterval time.Duration
// Configure the authentication provider. (default: no authentication)
// Example: `Authentication: NewAuthenticationToken("token")`
Authentication
// Add custom labels to all the metrics reported by this client instance
CustomMetricsLabels map[string]string
// Specify metric registerer used to register metrics.
// Default prometheus.DefaultRegisterer
MetricsRegisterer prometheus.Registerer
}
```

**Main Notes:**

- URL: for example, `pulsar://127.0.0.1:6650`.
- Authentication: only token, token-from-file, and account-with-password authentication are supported.
- MetricsRegisterer: the Pulsar `MetricsRegisterer` is initialized with the registry created by `prometheus.NewRegistry()` in `cdc/server/metrics.go` of the tiflow project.


#### Pulsar Producer
```go
type ProducerOptions struct {
// Topic specifies the topic this producer will be publishing on.
// This argument is required when constructing the producer.
Topic string

// Properties specifies a set of application defined properties for the producer.
// This properties will be visible in the topic stats
Properties map[string]string

//……… others

}

```

**We must cache the producers for different topics in the client.
Every changefeed's Pulsar client has a producer map of type `map[string]pulsar.Producer`: the key is the topic name and the value is the Pulsar producer for that topic.**
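
The per-topic producer map could be sketched roughly as follows. To keep the example runnable with the standard library only, `Producer` is a reduced stand-in for `pulsar.Producer` (only `Topic` and `Close` are listed), and `producerCache`, `getOrCreate`, `closeAll`, and the `create` hook are hypothetical names introduced for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Producer is a stand-in for pulsar.Producer so the sketch compiles without
// the client library; only the methods used here are listed.
type Producer interface {
	Topic() string
	Close()
}

// fakeProducer is a hypothetical in-memory implementation used for the demo.
type fakeProducer struct {
	topic  string
	closed bool
}

func (p *fakeProducer) Topic() string { return p.topic }
func (p *fakeProducer) Close()        { p.closed = true }

// producerCache keeps one producer per topic for a single changefeed.
type producerCache struct {
	mu        sync.Mutex
	producers map[string]Producer
	// create builds a producer bound to a topic (hypothetical hook).
	create func(topic string) (Producer, error)
}

func newProducerCache(create func(string) (Producer, error)) *producerCache {
	return &producerCache{producers: make(map[string]Producer), create: create}
}

// getOrCreate returns the cached producer for topic, creating it on first use.
func (c *producerCache) getOrCreate(topic string) (Producer, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if p, ok := c.producers[topic]; ok {
		return p, nil
	}
	p, err := c.create(topic)
	if err != nil {
		return nil, err
	}
	c.producers[topic] = p
	return p, nil
}

// closeAll closes every cached producer and empties the map.
func (c *producerCache) closeAll() {
	c.mu.Lock()
	defer c.mu.Unlock()
	for topic, p := range c.producers {
		p.Close()
		delete(c.producers, topic)
	}
}

func main() {
	created := 0
	cache := newProducerCache(func(topic string) (Producer, error) {
		created++
		return &fakeProducer{topic: topic}, nil
	})
	a, _ := cache.getOrCreate("persistent://public/default/test")
	b, _ := cache.getOrCreate("persistent://public/default/test")
	fmt.Println(a == b, created)
	cache.closeAll()
}
```

The mutex is a design choice of this sketch: it assumes several goroutines of one changefeed may ask for producers concurrently.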


##### Producer Message
```go
type ProducerMessage struct {
// Payload for the message
Payload []byte
// Value and payload is mutually exclusive, `Value interface{}` for schema message.
Value interface{}
// Key sets the key of the message for routing policy
Key string
// OrderingKey sets the ordering key of the message
OrderingKey string

// ……… other fields are not used
}
```

- Payload: carries the actual binary data.
- Value: mutually exclusive with Payload; Value is used for schema messages.
- Key: the optional key associated with the message (particularly useful for things like topic compaction).
- OrderingKey: sets the ordering key of the message. It serves the same purpose as Key, so we do not use it.

#### Pulsar Authentication

- Use `authentication-token` from the sink-uri to authenticate to the Pulsar server with a token.
- Use `basic-user-name` and `basic-password` from the sink-uri to authenticate to the Pulsar server with an account and password.
- Use `token-from-file` from the sink-uri to authenticate to the Pulsar server with a token read from a file.
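
The three options above can be read from the sink-uri query string. The sketch below only selects an authentication mode; it does not call the Pulsar client. The parameter names come from this document, while `authConfig`, `parseAuth`, the precedence order (token, then token file, then basic), and the error for a half-specified account are assumptions of this example.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
)

// authConfig describes which authentication mode was selected.
// The type and its field names are hypothetical.
type authConfig struct {
	Mode      string // "none", "token", "token-from-file" or "basic"
	Token     string
	TokenFile string
	User      string
	Password  string
}

// parseAuth picks an authentication mode from the sink-uri query parameters.
// The precedence used here is an assumption, not documented behavior.
func parseAuth(sinkURI string) (authConfig, error) {
	u, err := url.Parse(sinkURI)
	if err != nil {
		return authConfig{}, err
	}
	q := u.Query()
	token := q.Get("authentication-token")
	file := q.Get("token-from-file")
	user, pass := q.Get("basic-user-name"), q.Get("basic-password")

	switch {
	case token != "":
		return authConfig{Mode: "token", Token: token}, nil
	case file != "":
		return authConfig{Mode: "token-from-file", TokenFile: file}, nil
	case user != "" && pass != "":
		return authConfig{Mode: "basic", User: user, Password: pass}, nil
	case user != "" || pass != "":
		return authConfig{}, errors.New("basic-user-name and basic-password must be set together")
	default:
		return authConfig{Mode: "none"}, nil
	}
}

func main() {
	cfg, err := parseAuth("pulsar://127.0.0.1:6650/persistent://public/default/test" +
		"?protocol=canal-json&basic-user-name=root&basic-password=secret")
	fmt.Println(cfg.Mode, cfg.User, err)
}
```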

#### Pulsar Route Rule

- We support routing events to different partitions through the `dispatchers` setting in the changefeed config;
  refer to `Pulsar Topic Rule`.
- You can set the message key to any characters. No key is set by default, in which case the event is sent to a partition chosen by a hash algorithm.

#### Pulsar Topic Rule

```toml
dispatchers = [
    {matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "table"},
    {matcher = ['test6.*'], topic = "Topic expression 2", partition = "ts"}
]
```

The topic expression syntax is legal if it meets the following conditions:

1. `{schema}` and `{table}` respectively identify the database name and table name that need to be matched, and are required fields.
   Pulsar supports `(persistent|non-persistent)://tenant/namespace/topic` as the topic name.
2. The tenant, namespace and topic must be separated by 2 slashes, such as `tenant/namespace/topic`.
3. If no rule matches, the event goes to the default topic, which is the topic in the sink-uri.
4. `partition` selects one of the following dispatchers (refer to https://docs.pingcap.com/tidb/dev/ticdc-sink-to-kafka#customize-the-rules-for-topic-and-partition-dispatchers-of-kafka-sink):
   - `default`: When multiple unique indexes (including the primary key) exist or the Old Value feature is enabled, events are dispatched in the table mode. When only one unique index (or the primary key) exists, events are dispatched in the index-value mode.
   - `ts`: Use the commitTs of the row change to hash and dispatch events.
   - `index-value`: Use the value of the primary key or the unique index of the table to hash and dispatch events.
   - `table`: Use the schema name of the table and the table name to hash and dispatch events.
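
To make the rules above concrete, here is a minimal sketch of how a topic expression could be resolved and how a partition key could be derived for the `table`, `ts`, and `index-value` dispatchers. The function names, the `schema.table` key format, and the comma-joined index values are assumptions for illustration; the `default` dispatcher is left out because it depends on table metadata. The sketch only builds the key, on the assumption that the key is then hashed to a partition on the Pulsar side.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// resolveTopic substitutes {schema} and {table} in a topic expression.
func resolveTopic(expr, schema, table string) string {
	return strings.NewReplacer("{schema}", schema, "{table}", table).Replace(expr)
}

// partitionKey returns the string that would be hashed to choose a partition.
// The key formats are hypothetical; only the inputs follow the rules above.
func partitionKey(mode, schema, table string, commitTs uint64, indexValues []string) (string, error) {
	switch mode {
	case "table":
		// Hash on the schema name and table name.
		return schema + "." + table, nil
	case "ts":
		// Hash on the commitTs of the row change.
		return strconv.FormatUint(commitTs, 10), nil
	case "index-value":
		// Hash on the primary key or unique index values.
		if len(indexValues) == 0 {
			return "", fmt.Errorf("index-value mode needs at least one index column value")
		}
		return strings.Join(indexValues, ","), nil
	default:
		return "", fmt.Errorf("unsupported partition dispatcher %q", mode)
	}
}

func main() {
	topic := resolveTopic("persistent://public/default/{schema}_{table}", "test1", "orders")
	key, err := partitionKey("table", "test1", "orders", 0, nil)
	fmt.Println(topic, key, err)
}
```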


#### Produce DDL Event

We implement the `DDLProducer` interface.

##### SyncSendMessage Method

- Find the producer by topic name.
- Send the event to Pulsar.
- Report some metrics.
- `partitionNum` is not used, because the number of partitions can only be set on the Pulsar server.

##### SyncBroadcastMessage Method

It does nothing.

##### Close Method

Close every producer.


#### Produce DML Event

We implement the `DMLProducer` interface.

##### AsyncSendMessage Method

- Find the producer by topic name.
- Set a callback function on the Pulsar producer client.
- Send the event to Pulsar.
- Report some metrics.
- `partitionNum` is not used, because the number of partitions can only be set on the Pulsar server.

##### Close Method

Close every producer.

#### Pulsar Metrics

The Pulsar client supports reporting metrics through a `prometheus.Registry`.
The Pulsar client exposes the following metrics:

```
pulsar_client_bytes_published
pulsar_client_bytes_received
pulsar_client_connections_closed
pulsar_client_connections_establishment_errors
pulsar_client_connections_handshake_errors
pulsar_client_connections_opened
pulsar_client_lookup_count
pulsar_client_messages_published
pulsar_client_messages_received
pulsar_client_partitioned_topic_metadata_count
pulsar_client_producer_errors
pulsar_client_producer_latency_seconds_bucket
pulsar_client_producer_latency_seconds_count
pulsar_client_producer_latency_seconds_sum
pulsar_client_producer_pending_bytes
pulsar_client_producer_pending_messages
pulsar_client_producer_rpc_latency_seconds_bucket
pulsar_client_producer_rpc_latency_seconds_count
pulsar_client_producer_rpc_latency_seconds_sum
pulsar_client_producers_closed
pulsar_client_producers_opened
pulsar_client_producers_partitions_active
pulsar_client_producers_reconnect_failure
pulsar_client_producers_reconnect_max_retry
pulsar_client_readers_closed
pulsar_client_readers_opened
pulsar_client_rpc_count
```

#### User Interface
**Sink-URI**

When creating a changefeed, the user can specify the sink-uri like this:

`cdc cli changefeed create --sink-uri="${scheme}://${address}/${topic-name}?protocol=${protocol}&pulsar-version=${pulsar-version}&authentication-token=${authentication-token}"`

Example:
```
cdc cli changefeed create --server=http://127.0.0.1:8300 \
  --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/test?protocol=canal-json&pulsar-version=v2.10.0&authentication-token=eyJhbGciOiJSUzIxxxxxxxxxxxxxxxxx"
```
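
As a rough illustration of how the parts of such a sink-uri map onto configuration, the sketch below splits the example above with Go's `net/url`. The `sinkConfig` type and `parseSinkURI` function are hypothetical; the split into broker URL (scheme plus address) and topic (the path without its leading slash) is an assumption about how the URI would be interpreted.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// sinkConfig holds the pieces extracted from a sink-uri (hypothetical type).
type sinkConfig struct {
	BrokerURL     string // ${scheme}://${address}
	Topic         string // ${topic-name}
	Protocol      string
	PulsarVersion string
}

// parseSinkURI splits a sink-uri into broker URL, topic and query options.
func parseSinkURI(raw string) (sinkConfig, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return sinkConfig{}, err
	}
	if u.Scheme == "" || u.Host == "" {
		return sinkConfig{}, errors.New("sink-uri must include a scheme and an address")
	}
	// Everything after the first slash is the topic name, which may itself
	// be a full name such as persistent://public/default/test.
	topic := strings.TrimPrefix(u.Path, "/")
	if topic == "" {
		return sinkConfig{}, errors.New("sink-uri must include a topic name")
	}
	q := u.Query()
	return sinkConfig{
		BrokerURL:     u.Scheme + "://" + u.Host,
		Topic:         topic,
		Protocol:      q.Get("protocol"),
		PulsarVersion: q.Get("pulsar-version"),
	}, nil
}

func main() {
	cfg, err := parseSinkURI("pulsar://127.0.0.1:6650/persistent://public/default/test" +
		"?protocol=canal-json&pulsar-version=v2.10.0")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", cfg)
}
```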

## Test Design

The Pulsar sink is a new feature. For tests, we focus on functional tests, scenario tests, and benchmark tests.

### Functional Tests

- Regular unit tests and integration tests cover the correctness of data replication using the canal, maxwell, and canal-json protocols.

### Scenario Tests

Run stability and chaos tests under different workloads.

- The upstream and downstream data are consistent.
- Throughput and latency are stable for most scenarios.

### Compatibility Tests

#### Compatibility with other features/components

The Pulsar sink should be compatible with other features.

#### Upgrade Downgrade Compatibility

Pulsar sink is a new feature, so there should be no upgrade
or downgrade compatibility issues.

### Benchmark Tests

Perform benchmark tests under common scenarios, big data scenarios, multi-table scenarios, and wide table scenarios with different parameters.

## Impacts & Risks

N/A

## Investigation & Alternatives

N/A

## Unresolved Questions

N/A