Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*(dm): support MySQL 8.0 collations #4949

Merged
merged 16 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions dm/dm/master/election_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
cfg2.AdvertisePeerUrls = cfg2.PeerUrls
cfg2.Join = cfg1.MasterAddr // join to an existing cluster

// imitate fail to start scheduler/pessimism/optimism
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/FailToStartLeader", `return("dm-master-2")`), check.IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/FailToStartLeader")

s2 = NewServer(cfg2)
c.Assert(s2.Start(ctx), check.IsNil)
// wait the second master ready
Expand All @@ -92,11 +97,6 @@ func (t *testElectionSuite) TestFailToStartLeader(c *check.C) {
c.Assert(s1.ClusterID(), check.Greater, uint64(0))
c.Assert(s2.ClusterID(), check.Equals, uint64(0))

// fail to start scheduler/pessimism/optimism
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/dm/master/FailToStartLeader", `return("dm-master-2")`), check.IsNil)
//nolint:errcheck
defer failpoint.Disable("github.com/pingcap/tiflow/dm/dm/master/FailToStartLeader")

s1.election.Resign()
time.Sleep(1 * time.Second)

Expand Down
4 changes: 2 additions & 2 deletions dm/dm/master/shardddl/pessimist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -981,10 +981,10 @@ func (t *testPessimistSuite) TestMeetEtcdCompactError() {
done, _, err = pessimism.PutOperationDeleteExistInfo(t.etcdTestCli, op12c, i12)
require.NoError(t.T(), err)
require.True(t.T(), done)
require.True(t.T(), utils.WaitSomething(30, 100*time.Millisecond, func() bool {
require.Eventually(t.T(), func() bool {
_, ok := p.Locks()[ID1]
return !ok
}))
}, 5*time.Second, 100*time.Millisecond)
require.Len(t.T(), p.Locks(), 0)

cancel2()
Expand Down
9 changes: 9 additions & 0 deletions dm/pkg/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/mockstore"
"go.uber.org/zap"
Expand Down Expand Up @@ -201,6 +202,8 @@ func NewTracker(ctx context.Context, task string, sessionCfg map[string]string,
return nil, err
}
}
// skip DDL test https://github.com/pingcap/tidb/pull/33079
se.SetValue(sessionctx.QueryString, "skip")
Ehco1996 marked this conversation as resolved.
Show resolved Hide resolved

// TiDB will unconditionally create an empty "test" schema.
// This interferes with MySQL/MariaDB upstream which such schema does not
Expand Down Expand Up @@ -333,6 +336,7 @@ func IsTableNotExists(err error) bool {

// Reset drops all tables inserted into this tracker.
func (tr *Tracker) Reset() error {
tr.se.SetValue(sessionctx.QueryString, "skip")
allDBs := tr.dom.InfoSchema().AllSchemaNames()
ddl := tr.dom.DDL()
for _, db := range allDBs {
Expand All @@ -359,6 +363,7 @@ func (tr *Tracker) Close() error {

// DropTable drops a table from this tracker.
func (tr *Tracker) DropTable(table *filter.Table) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
tableIdent := ast.Ident{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
Expand All @@ -368,6 +373,7 @@ func (tr *Tracker) DropTable(table *filter.Table) error {

// DropIndex drops an index from this tracker.
func (tr *Tracker) DropIndex(table *filter.Table, index string) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
tableIdent := ast.Ident{
Schema: model.NewCIStr(table.Schema),
Name: model.NewCIStr(table.Name),
Expand All @@ -377,6 +383,7 @@ func (tr *Tracker) DropIndex(table *filter.Table, index string) error {

// CreateSchemaIfNotExists creates a SCHEMA of the given name if it did not exist.
func (tr *Tracker) CreateSchemaIfNotExists(db string) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
dbName := model.NewCIStr(db)
if tr.dom.InfoSchema().SchemaExists(dbName) {
return nil
Expand All @@ -399,6 +406,7 @@ func cloneTableInfo(ti *model.TableInfo) *model.TableInfo {

// CreateTableIfNotExists creates a TABLE of the given name if it did not exist.
func (tr *Tracker) CreateTableIfNotExists(table *filter.Table, ti *model.TableInfo) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
schemaName := model.NewCIStr(table.Schema)
tableName := model.NewCIStr(table.Name)
ti = cloneTableInfo(ti)
Expand All @@ -407,6 +415,7 @@ func (tr *Tracker) CreateTableIfNotExists(table *filter.Table, ti *model.TableIn
}

func (tr *Tracker) BatchCreateTableIfNotExist(tablesToCreate map[string]map[string]*model.TableInfo) error {
tr.se.SetValue(sessionctx.QueryString, "skip")
for schema, tableNameInfo := range tablesToCreate {
var cloneTis []*model.TableInfo
for table, ti := range tableNameInfo {
Expand Down
4 changes: 2 additions & 2 deletions dm/tests/all_mode/data/db2.prepare.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ create table t2 (
id int NOT NULL AUTO_INCREMENT,
name varchar(20),
ts timestamp,
PRIMARY KEY (id));;
PRIMARY KEY (id));
insert into t2 (name, ts) values ('Arya', now()), ('Bran', '2021-05-11 10:01:05'), ('Sansa', NULL);

-- test block-allow-list
drop database if exists `ignore_db`;
create database `ignore_db`;
use `ignore_db`;
create table `ignore_table`(id int);
create table `ignore_table`(id int);
29 changes: 29 additions & 0 deletions dm/tests/new_collation_off/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.
# sync-diff-inspector settings used by check_sync_diff to compare the
# upstream MySQL data against the downstream TiDB for this test case.

# number of goroutines used to compare data chunks
check-thread-count = 4

# emit SQL statements that would repair any detected inconsistency
export-fix-sql = true

# false: compare row data as well, not only table structures
check-struct-only = false

[task]
# where the inspector writes its report and fix-SQL files
output-dir = "/tmp/ticdc_dm_test/output"

# upstream data source(s) to read from
source-instances = ["mysql2"]

# downstream instance to compare against
target-instance = "tidb0"

# check every table in the new_collation_off schema
target-check-tables = ["new_collation_off.?*"]

[data-sources]
[data-sources.mysql2]
host = "127.0.0.1"
port = 3307
user = "root"
password = "123456"

[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
5 changes: 5 additions & 0 deletions dm/tests/new_collation_off/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Master Configuration.
# DM-master settings for the new_collation_off integration test.
name = "master1"
# listen on all interfaces at the default DM-master port
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
# keep the embedded etcd compaction window short so the test stays small/fast
auto-compaction-retention = "3s"
41 changes: 41 additions & 0 deletions dm/tests/new_collation_off/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
---
# DM task for the new_collation_off integration test: replicate the
# `new_collation_off` schema from source mysql-replica-02 into the
# downstream TiDB in "all" mode (full dump + incremental sync).
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true

# downstream TiDB the task writes into
target-database:
  host: "127.0.0.1"
  port: 4000
  user: "root"
  password: ""

mysql-instances:
  - source-id: "mysql-replica-02"
    block-allow-list: "instance"
    mydumper-config-name: "global"
    loader-config-name: "global"
    syncer-config-name: "global"

# only replicate the schema this test exercises
block-allow-list:
  instance:
    do-dbs: ["new_collation_off"]

# full-dump (dumpling/mydumper) settings
mydumpers:
  global:
    threads: 4
    chunk-filesize: 0
    skip-tz-utc: true
    statement-size: 100
    extra-args: ""

# full-import (loader) settings
loaders:
  global:
    pool-size: 16
    dir: "./dumped_data"

# incremental-replication (syncer) settings
syncers:
  global:
    worker-count: 16
    batch: 100
2 changes: 2 additions & 0 deletions dm/tests/new_collation_off/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# DM-worker configuration: register this worker with the DM-master above.
name = "worker1"
join = "127.0.0.1:8261"
11 changes: 11 additions & 0 deletions dm/tests/new_collation_off/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Upstream MySQL source definition for the new_collation_off test.
# GTID-based replication with a local relay log enabled.
source-id: mysql-replica-02
# empty flavor: let DM auto-detect MySQL vs MariaDB
flavor: ''
enable-gtid: true
enable-relay: true
relay-binlog-name: ''
relay-binlog-gtid: ''
from:
  host: 127.0.0.1
  user: root
  # NOTE(review): looks like a dmctl-encrypted password, not plaintext — verify
  password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
  port: 3307
1 change: 1 addition & 0 deletions dm/tests/new_collation_off/conf/tidb-config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
new_collations_enabled_on_first_bootstrap = false
10 changes: 10 additions & 0 deletions dm/tests/new_collation_off/data/db2.increment.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
-- Incremental workload replayed after the DM task has started: verifies that
-- binlog events referencing a MySQL 8.0-only collation (utf8mb4_0900_as_cs)
-- replicate even when the downstream has new collations disabled.
use `new_collation_off`;

insert into t1 (id, name) values (2, 'Bob');

-- DDL arriving through the binlog also carries the 8.0-only collation
create table t2 (
    id int PRIMARY KEY,
    name varchar(20) COLLATE utf8mb4_0900_as_cs
);

insert into t2 (id, name) values (3, 'Charlie');
9 changes: 9 additions & 0 deletions dm/tests/new_collation_off/data/db2.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Initial upstream data created before the DM task starts: the table uses
-- utf8mb4_0900_as_cs, a collation that only exists in MySQL 8.0.
drop database if exists `new_collation_off`;
create database `new_collation_off`;
use `new_collation_off`;
create table t1 (
    id int PRIMARY KEY,
    name varchar(20) COLLATE utf8mb4_0900_as_cs
);

insert into t1 (id, name) values (1, 'Alice');
51 changes: 51 additions & 0 deletions dm/tests/new_collation_off/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#!/bin/bash
#
# Integration test: replicate tables using a MySQL 8.0 collation
# (utf8mb4_0900_as_cs) into a downstream TiDB that was bootstrapped
# with the new collation framework turned OFF.

set -eu

cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $cur/../_utils/test_prepare

WORK_DIR=$TEST_DIR/$TEST_NAME

API_VERSION="v1alpha1"

# this case will change downstream TiDB not to use new collation. Following cases
# should turn on new collation if they need.
function run() {
	# stop any TiDB left over from a previous case so ours owns port 4000
	pkill -hup tidb-server 2>/dev/null || true
	wait_process_exit tidb-server

	# clean unistore data
	rm -rf /tmp/tidb

	# start a TiDB with off new-collation
	run_tidb_server 4000 $TIDB_PASSWORD $cur/conf/tidb-config.toml
	sleep 2

	# bring up one DM-master and one DM-worker and wait until both answer RPC
	run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
	check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

	# register the upstream MySQL source, then load the pre-task data
	dmctl_operate_source create $cur/conf/source2.yaml $SOURCE_ID2
	run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
	check_contains 'Query OK, 1 row affected'

	run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
		"start-task $cur/conf/dm-task.yaml" \
		"\"result\": true" 2

	# replay incremental changes (including DDL with an 8.0-only collation)
	run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

	echo "check data"
	# compare upstream and downstream with sync-diff-inspector
	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

cleanup_data new_collation_off
cleanup_process

run $*

cleanup_process

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
1 change: 1 addition & 0 deletions dm/tests/others_integration_2.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ tracker_ignored_ddl
extend_column
shardddl_optimistic
gbk
new_collation_off
Loading