- Introduction
- Motivation or Background
- Detailed Design
- Test Design
- Impacts & Risks
- Investigation & Alternatives
- Unresolved Questions
This document provides a complete design for implementing a Pulsar sink for TiCDC. The Pulsar sink distributes the DML change records and DDL events generated by TiCDC.
Incorporating Pulsar into TiCDC expands the downstream MQ distribution channels. Users want to output TiDB events to Pulsar because they can reuse Pulsar clusters shared with other applications, and because Pulsar is easy to scale horizontally.
To stay consistent with the other MQ sinks, we give priority to protocols already supported by the Kafka sink:
- CanalJSON
- Canal
- Maxwell
CanalJSON protocol sample (for more information, please refer to https://docs.pingcap.com/tidb/dev/ticdc-canal-json):
```json
{
    "id": 0,
    "database": "test",
    "table": "",
    "pkNames": null,
    "isDdl": true,
    "type": "QUERY",
    "es": 1639633094670,
    "ts": 1639633095489,
    "sql": "drop database if exists test",
    "sqlType": null,
    "mysqlType": null,
    "data": null,
    "old": null,
    "_tidb": { // TiDB extension field
        "commitTs": 163963309467037594
    }
}
```
- Ensure that events are sent to Pulsar in order of increasing commit-ts.
- Ensure that there are no incomplete inner-table transactions in Pulsar.
- Ensure that every event is sent to Pulsar at least once.
Client library: https://github.com/apache/pulsar-client-go, version v0.10.0, which requires Golang 1.18+.
One difference between Pulsar and Kafka is that a Pulsar producer must be bound to a single topic, while a Kafka producer need not be.
```go
type ClientOptions struct {
	// Configure the service URL for the Pulsar service.
	// This parameter is required.
	URL string

	// Timeout for the establishment of a TCP connection (default: 5 seconds).
	ConnectionTimeout time.Duration

	// Set the operation timeout (default: 30 seconds).
	// Producer-create, subscribe and unsubscribe operations will be retried
	// until this interval, after which the operation will be marked as failed.
	OperationTimeout time.Duration

	// Configure the ping send and check interval (default: 30 seconds).
	KeepAliveInterval time.Duration

	// Configure the authentication provider (default: no authentication).
	// Example: `Authentication: NewAuthenticationToken("token")`
	Authentication

	// Add custom labels to all the metrics reported by this client instance.
	CustomMetricsLabels map[string]string

	// Specify the metrics registerer used to register metrics.
	// Default: prometheus.DefaultRegisterer
	MetricsRegisterer prometheus.Registerer
}
```
Main notes:
- URL: the broker address, such as pulsar://127.0.0.1:6650.
- Authentication: we only support token, token-from-file, and account-with-password.
- MetricsRegisterer: we initialize the Pulsar MetricsRegisterer with prometheus.NewRegistry() from the tiflow project's cdc/server/metrics.go.
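For illustration, a minimal sketch of constructing the client from these options, with the registry wiring described above; the URL and token values are placeholders, not values used by tiflow:

```go
package main

import (
	"time"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/prometheus/client_golang/prometheus"
)

// newPulsarClient sketches how the sink could build a client from ClientOptions.
func newPulsarClient() (pulsar.Client, error) {
	// Dedicated registry, as in cdc/server/metrics.go.
	registry := prometheus.NewRegistry()
	return pulsar.NewClient(pulsar.ClientOptions{
		URL:               "pulsar://127.0.0.1:6650", // placeholder address
		ConnectionTimeout: 5 * time.Second,
		OperationTimeout:  30 * time.Second,
		Authentication:    pulsar.NewAuthenticationToken("token"), // placeholder token
		MetricsRegisterer: registry,
	})
}
```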
```go
type ProducerOptions struct {
	// Topic specifies the topic this producer will be publishing on.
	// This argument is required when constructing the producer.
	Topic string

	// Properties specifies a set of application-defined properties for the
	// producer. These properties will be visible in the topic stats.
	Properties map[string]string

	// ……… others
}
```
Because each producer is bound to one topic, we must cache a producer per topic on the client. Every changefeed's Pulsar client holds a producer map of type map[string]pulsar.Producer, where the key is the topic name and the value is the corresponding producer.
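A minimal sketch of such a cache, with a hypothetical GetProducer helper (synchronization omitted for brevity):

```go
import "github.com/apache/pulsar-client-go/pulsar"

// pulsarProducers is a hypothetical per-changefeed producer cache.
type pulsarProducers struct {
	client    pulsar.Client
	producers map[string]pulsar.Producer // key: topic name
}

// GetProducer returns the cached producer for a topic, creating one on first
// use, since every Pulsar producer is bound to a single topic.
func (p *pulsarProducers) GetProducer(topic string) (pulsar.Producer, error) {
	if producer, ok := p.producers[topic]; ok {
		return producer, nil
	}
	producer, err := p.client.CreateProducer(pulsar.ProducerOptions{Topic: topic})
	if err != nil {
		return nil, err
	}
	p.producers[topic] = producer
	return producer, nil
}
```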
```go
type ProducerMessage struct {
	// Payload for the message
	Payload []byte

	// Value and payload is mutually exclusive, `Value interface{}` for schema message.
	Value interface{}

	// Key sets the key of the message for routing policy
	Key string

	// OrderingKey sets the ordering key of the message
	OrderingKey string

	// ……… others no use
}
```
- Payload: carries the actual binary data.
- Value: Value and Payload are mutually exclusive; Value is used for schema messages.
- Key: the optional key associated with the message (particularly useful for things like topic compaction).
- OrderingKey: sets the ordering key of the message. It serves the same purpose as Key for us, so we do not use it.
- Use authentication-token from the sink-uri to authenticate to the Pulsar server with a token.
- Use basic-user-name and basic-password from the sink-uri to authenticate with a username and password.
- Use token-from-file from the sink-uri to authenticate with a token read from a file.
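A minimal sketch of mapping these sink-uri parameters onto the client's Authentication provider; buildAuthentication is a hypothetical helper, while the constructors are pulsar-client-go APIs:

```go
import "github.com/apache/pulsar-client-go/pulsar"

// buildAuthentication picks an authentication provider from sink-uri parameters.
func buildAuthentication(token, tokenFile, userName, password string) (pulsar.Authentication, error) {
	switch {
	case token != "":
		return pulsar.NewAuthenticationToken(token), nil
	case tokenFile != "":
		return pulsar.NewAuthenticationTokenFromFile(tokenFile), nil
	case userName != "" && password != "":
		return pulsar.NewAuthenticationBasic(userName, password)
	default:
		return nil, nil // no authentication configured
	}
}
```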
- We support routing events to different topics and partitions via the changefeed dispatcher configuration; refer to the Pulsar Topic Rule below.
- You can set the message key to any string. By default we set no key, and the event is dispatched to a partition by the hash algorithm.
```toml
dispatchers = [
    { matcher = ['test1.*', 'test2.*'], topic = "Topic expression 1", partition = "table" },
    { matcher = ['test6.*'], topic = "Topic expression 2", partition = "ts" },
]
```
The topic expression syntax is legal if it meets the following conditions:
1. {schema} and {table} identify the database name and the table name respectively; they are required fields (see the sketch after this list).
2. Pulsar supports "(persistent|non-persistent)://tenant/namespace/topic" as a topic name. The tenant, namespace and topic must be separated by two slashes, such as "tenant/namespace/topic".
3. If an event does not match any topic expression, it is sent to the default topic, which is the topic in the sink-uri.
4. partition = "xxx" chooses one of the following (refer to https://docs.pingcap.com/tidb/dev/ticdc-sink-to-kafka#customize-the-rules-for-topic-and-partition-dispatchers-of-kafka-sink):
   - default: when multiple unique indexes (including the primary key) exist or the Old Value feature is enabled, events are dispatched in the table mode; when only one unique index (or the primary key) exists, events are dispatched in the index-value mode.
   - ts: use the commitTs of the row change to hash and dispatch events.
   - index-value: use the value of the primary key or the unique index of the table to hash and dispatch events.
   - table: use the schema name of the table and the table name to hash and dispatch events.
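For illustration, a minimal sketch of expanding a topic expression for a row event under the {schema}/{table} placeholder semantics above; the real dispatcher in tiflow handles more cases:

```go
import "strings"

// expandTopic substitutes the schema and table names into a topic expression.
func expandTopic(expr, schema, table string) string {
	topic := strings.ReplaceAll(expr, "{schema}", schema)
	return strings.ReplaceAll(topic, "{table}", table)
}

// Example: expandTopic("persistent://public/default/{schema}_{table}", "test1", "orders")
// returns "persistent://public/default/test1_orders".
```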
We implement the DDLProducer interface:
- It finds a producer by topic name, sends the event to Pulsar, and reports some metrics; see the sketch after this list.
- The partitionNum argument is not used and does nothing, because the partition count can only be set on the Pulsar server.
- Close closes every producer.
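A minimal sketch of this send path, reusing the hypothetical producer cache above; DDL events are sent synchronously so their order is preserved:

```go
import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar"
)

// syncSendMessage looks up the topic's producer and sends the event,
// blocking until the broker acks; metrics reporting is elided.
func (p *pulsarProducers) syncSendMessage(
	ctx context.Context, topic string, msg *pulsar.ProducerMessage,
) error {
	producer, err := p.GetProducer(topic)
	if err != nil {
		return err
	}
	_, err = producer.Send(ctx, msg)
	return err
}
```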
We implement the DMLProducer interface:
- It finds a producer by topic name, sets a callback function on the Pulsar producer client, sends the event to Pulsar, and reports some metrics; see the sketch after this list.
- The partitionNum argument is not used and does nothing, because the partition count can only be set on the Pulsar server.
- Close closes every producer.
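A minimal sketch of the asynchronous DML send path with a completion callback, again using the hypothetical cache; metrics reporting is elided:

```go
import (
	"context"

	"github.com/apache/pulsar-client-go/pulsar"
)

// asyncSendMessage sends an event without blocking; the callback fires when
// the broker acks or the send fails, which is where metrics would be reported.
func (p *pulsarProducers) asyncSendMessage(ctx context.Context, topic string,
	msg *pulsar.ProducerMessage, onDone func(error)) error {
	producer, err := p.GetProducer(topic)
	if err != nil {
		return err
	}
	producer.SendAsync(ctx, msg,
		func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
			onDone(err)
		})
	return nil
}
```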
The Pulsar client exposes metrics through a prometheus.Registerer. The following client metrics are reported:
pulsar_client_bytes_published
pulsar_client_bytes_received
pulsar_client_connections_closed
pulsar_client_connections_establishment_errors
pulsar_client_connections_handshake_errors
pulsar_client_connections_opened
pulsar_client_lookup_count
pulsar_client_messages_published
pulsar_client_messages_received
pulsar_client_partitioned_topic_metadata_count
pulsar_client_producer_errors
pulsar_client_producer_latency_seconds_bucket
pulsar_client_producer_latency_seconds_count
pulsar_client_producer_latency_seconds_sum
pulsar_client_producer_pending_bytes
pulsar_client_producer_pending_messages
pulsar_client_producer_rpc_latency_seconds_bucket
pulsar_client_producer_rpc_latency_seconds_count
pulsar_client_producer_rpc_latency_seconds_sum
pulsar_client_producers_closed
pulsar_client_producers_opened
pulsar_client_producers_partitions_active
pulsar_client_producers_reconnect_failure
pulsar_client_producers_reconnect_max_retry
pulsar_client_readers_closed
pulsar_client_readers_opened
pulsar_client_rpc_count
Sink-URI
When creating a changefeed, the user can specify the sink-uri like this:

```
cdc cli changefeed create --sink-uri="${scheme}://${address}/${topic-name}?protocol=${protocol}&pulsar-version=${pulsar-version}&authentication-token=${authentication-token}"
```
Example:
```
cdc cli changefeed create --server=http://127.0.0.1:8300 \
    --sink-uri="pulsar://127.0.0.1:6650/persistent://public/default/test?protocol=canal-json&pulsar-version=v2.10.0&authentication-token=eyJhbGciOiJSUzIxxxxxxxxxxxxxxxxx"
```
Pulsar sink is a new feature, so tests focus on functional tests, scenario tests, and benchmarks.
- Regular unit testing and integration testing cover the correctness of data replication using the canal/maxwell/canal-json protocols.
- Run stability and chaos tests under different workloads and verify that:
  - The upstream and downstream data are consistent.
  - Throughput and latency are stable for most scenarios.
  - The sink is compatible with other features.
Pulsar sink is a new feature, so there should be no upgrade or downgrade compatibility issues.
Perform benchmark tests under common scenarios, big data scenarios, multi-table scenarios, and wide table scenarios with different parameters.
N/A
N/A
N/A