binlog: do not decode rows for block table #7622

Merged Nov 24, 2022 (9 commits). The diff below shows changes from 2 commits, not the final merged state.
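The core idea of the PR: for tables that the block-allow list filters out, DM should not pay the cost of decoding row data at all. go-mysql's parser is given a hook that decodes only the rows-event header (enough to identify the table) and decodes the full row payload only when the table is allowed. Below is a minimal sketch of that hook shape, using the API names that appear in this diff (`SetRowsEventDecodeFunc`, `DecodeHeader`, `DecodeData`); the import path and the `shouldSkip` predicate are assumptions for illustration, not code from the PR.

```go
package main

import (
	"fmt"

	"github.com/go-mysql-org/go-mysql/replication"
)

// shouldSkip is a hypothetical stand-in for the block-allow-list lookup.
func shouldSkip(tableID uint64) bool { return false }

func main() {
	p := replication.NewBinlogParser()
	// Decode only the header first (cheap; yields the table ID and name),
	// then decode the potentially large row payload only for allowed tables.
	p.SetRowsEventDecodeFunc(func(re *replication.RowsEvent, data []byte) error {
		pos, err := re.DecodeHeader(data)
		if err != nil {
			return err
		}
		if shouldSkip(re.TableID) {
			return nil // blocked table: leave the rows undecoded
		}
		return re.DecodeData(pos, data)
	})
	fmt.Println("parser configured")
}
```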
8 changes: 5 additions & 3 deletions dm/relay/local_reader.go
@@ -43,9 +43,10 @@ var ErrorMaybeDuplicateEvent = errors.New("truncate binlog file found, event may

// BinlogReaderConfig is the configuration for BinlogReader.
type BinlogReaderConfig struct {
-	RelayDir string
-	Timezone *time.Location
-	Flavor   string
+	RelayDir            string
+	Timezone            *time.Location
+	Flavor              string
+	RowsEventDecodeFunc func(*replication.RowsEvent, []byte) error
}

// BinlogReader is a binlog reader.
@@ -82,6 +83,7 @@ func newBinlogReader(logger log.Logger, cfg *BinlogReaderConfig, relay Process)
parser.SetVerifyChecksum(true)
// use string representation of decimal, to replicate the exact value
parser.SetUseDecimal(false)
+	parser.SetRowsEventDecodeFunc(cfg.RowsEventDecodeFunc)
if cfg.Timezone != nil {
parser.SetTimestampStringLocation(cfg.Timezone)
}
2 changes: 1 addition & 1 deletion dm/syncer/binlogstream/streamer_controller.go
@@ -288,7 +288,7 @@ func (c *StreamerController) resetReplicationSyncer(tctx *tcontext.Context, loca
if c.currentBinlogType == RemoteBinlog {
c.streamProducer = &remoteBinlogReader{replication.NewBinlogSyncer(c.syncCfg), tctx, c.syncCfg.Flavor, c.enableGTID}
Contributor: Do we need to abandon these blocked events while using a remote binlog reader?

Contributor Author: c.syncCfg is created by `subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter) (replication.BinlogSyncerConfig, error)`, which abandons these blocked events.

} else {
-		c.streamProducer = &localBinlogReader{c.relay.NewReader(tctx.L(), &relay.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor}), c.enableGTID}
+		c.streamProducer = &localBinlogReader{c.relay.NewReader(tctx.L(), &relay.BinlogReaderConfig{RelayDir: c.localBinlogDir, Timezone: c.timezone, Flavor: c.syncCfg.Flavor, RowsEventDecodeFunc: c.syncCfg.RowsEventDecodeFunc}), c.enableGTID}
}

c.upstream, err = newLocationStream(c.streamProducer, location)
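As the exchange above notes, the remote path is covered as well: `c.syncCfg` is a `replication.BinlogSyncerConfig` built by `subtaskCfg2BinlogSyncerCfg`, so the same decode hook rides along into `replication.NewBinlogSyncer`. A condensed sketch of that wiring follows; the connection values are placeholders, and the hook here does no filtering, only header-then-data decoding.

```go
package main

import (
	"github.com/go-mysql-org/go-mysql/replication"
)

func main() {
	// Hypothetical hook; in the PR it is built by subtaskCfg2BinlogSyncerCfg.
	decodeFunc := func(re *replication.RowsEvent, data []byte) error {
		pos, err := re.DecodeHeader(data)
		if err != nil {
			return err
		}
		return re.DecodeData(pos, data) // no filtering in this sketch
	}

	cfg := replication.BinlogSyncerConfig{
		ServerID:            101, // assumed values for illustration
		Flavor:              "mysql",
		Host:                "127.0.0.1",
		Port:                3306,
		User:                "root",
		RowsEventDecodeFunc: decodeFunc,
	}
	syncer := replication.NewBinlogSyncer(cfg)
	defer syncer.Close()
}
```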
2 changes: 1 addition & 1 deletion dm/syncer/data_validator.go
@@ -304,7 +304,7 @@ func (v *DataValidator) initialize() error {
return err
}

-	v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone)
+	v.syncCfg, err = subtaskCfg2BinlogSyncerCfg(v.cfg, v.timezone, v.syncer.baList)
if err != nil {
return err
}
39 changes: 6 additions & 33 deletions dm/syncer/syncer.go
@@ -364,7 +364,12 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
return
}

-	s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone)
+	s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList)
+	if err != nil {
+		return terror.ErrSyncerUnitGenBAList.Delegate(err)
+	}
+
+	s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone, s.baList)
if err != nil {
return err
}
@@ -392,11 +397,6 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
s.tctx.L(),
)

-	s.baList, err = filter.New(s.cfg.CaseSensitive, s.cfg.BAList)
-	if err != nil {
-		return terror.ErrSyncerUnitGenBAList.Delegate(err)
-	}

s.binlogFilter, err = bf.NewBinlogEvent(s.cfg.CaseSensitive, s.cfg.FilterRules)
if err != nil {
return terror.ErrSyncerUnitGenBinlogEventFilter.Delegate(err)
@@ -3330,33 +3330,6 @@ func (s *Syncer) checkpointID() string {
return strconv.FormatUint(uint64(s.cfg.ServerID), 10)
}

-// UpdateFromConfig updates config for `From`.
-func (s *Syncer) UpdateFromConfig(cfg *config.SubTaskConfig) error {
-	s.Lock()
-	defer s.Unlock()
-	s.fromDB.BaseDB.Close()
-
-	s.cfg.From = cfg.From
-
-	var err error
-	s.cfg.From.RawDBCfg = config.DefaultRawDBConfig().SetReadTimeout(maxDMLConnectionTimeout)
-	s.fromDB, err = dbconn.NewUpStreamConn(&s.cfg.From)
-	if err != nil {
-		s.tctx.L().Error("fail to create baseConn connection", log.ShortError(err))
-		return err
-	}
-
-	s.syncCfg, err = subtaskCfg2BinlogSyncerCfg(s.cfg, s.timezone)
-	if err != nil {
-		return err
-	}
-
-	if s.streamerController != nil {
-		s.streamerController.UpdateSyncCfg(s.syncCfg, s.fromDB)
-	}
-	return nil
-}

// ShardDDLOperation returns the current pending to handle shard DDL lock operation.
func (s *Syncer) ShardDDLOperation() *pessimism.Operation {
return s.pessimist.PendingOperation()
53 changes: 47 additions & 6 deletions dm/syncer/util.go
@@ -37,11 +37,14 @@ import (
"go.uber.org/zap"
)

-// the time layout for TiDB SHOW DDL statements.
-const timeLayout = "2006-01-02 15:04:05"
-
-// everytime retrieve 10 new rows from TiDB history jobs.
-const linesOfRows = 10
+const (
+	// the time layout for TiDB SHOW DDL statements.
+	timeLayout = "2006-01-02 15:04:05"
+	// everytime retrieve 10 new rows from TiDB history jobs.
+	linesOfRows = 10
+	// max capacity of the block/allow list.
+	maxCapacity = 100000
+)

// getTableByDML gets table from INSERT/UPDATE/DELETE statement.
func getTableByDML(dml ast.DMLNode) (*filter.Table, error) {
@@ -136,7 +139,7 @@ func str2TimezoneOrFromDB(tctx *tcontext.Context, tzStr string, dbCfg *config.DB
return loc, tzStr, nil
}

-func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location) (replication.BinlogSyncerConfig, error) {
+func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Location, baList *filter.Filter) (replication.BinlogSyncerConfig, error) {
var tlsConfig *tls.Config
var err error
if cfg.From.Security != nil {
@@ -153,6 +156,43 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati
}
}

+	var rowsEventDecodeFunc func(*replication.RowsEvent, []byte) error
+	if baList != nil {
+		// we don't track delete table events, so simply reset the cache if it's full
+		allowListCache := make(map[uint64]struct{}, maxCapacity)
+		blockListCache := make(map[uint64]struct{}, maxCapacity)
+
+		rowsEventDecodeFunc = func(re *replication.RowsEvent, data []byte) error {
+			pos, err := re.DecodeHeader(data)
+			if err != nil {
+				return err
+			}
+			if _, ok := blockListCache[re.TableID]; ok {
+				return nil
+			} else if _, ok := allowListCache[re.TableID]; ok {
+				return re.DecodeData(pos, data)
+			}
+
+			tb := &filter.Table{
+				Schema: string(re.Table.Schema),
+				Name:   string(re.Table.Table),
+			}
+			if skipByTable(baList, tb) {
+				if len(blockListCache) >= maxCapacity {
+					blockListCache = make(map[uint64]struct{}, maxCapacity)

Contributor: I would prefer to evict one record rather than reset the whole cache. Maybe an LRU or CLOCK cache is more appropriate. But it might be rare to run out of capacity, so it's okay.

Contributor Author: added a TODO in c5511e7

+				}
+				blockListCache[re.TableID] = struct{}{}
+				return nil
+			}
+
+			if len(allowListCache) >= maxCapacity {
+				allowListCache = make(map[uint64]struct{}, maxCapacity)
+			}
+			allowListCache[re.TableID] = struct{}{}
+			return re.DecodeData(pos, data)
+		}
+	}

syncCfg := replication.BinlogSyncerConfig{
ServerID: cfg.ServerID,
Flavor: cfg.Flavor,
@@ -162,6 +202,7 @@ func subtaskCfg2BinlogSyncerCfg(cfg *config.SubTaskConfig, timezone *time.Locati
Password: cfg.From.Password,
TimestampStringLocation: timezone,
TLSConfig: tlsConfig,
+		RowsEventDecodeFunc:     rowsEventDecodeFunc,
}
// when retry count > 1, go-mysql will retry sync from the previous GTID set in GTID mode,
// which may get duplicate binlog event after retry success. so just set retry count = 1, and task
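A note on the caches above: they key on `re.TableID`, which the upstream server assigns to table-map events and which can keep growing across DDL, so the maps cannot be sized exactly and are simply reset when full. If eviction ever became necessary, the reviewer's LRU suggestion could look roughly like the sketch below — an alternative design for comparison, not what the PR implements (the PR keeps the reset-on-full maps and records a TODO in c5511e7).

```go
package main

import (
	"container/list"
	"fmt"
)

// lruSet is a fixed-capacity set of table IDs with least-recently-used
// eviction — an alternative to resetting the whole map when it fills up.
type lruSet struct {
	cap   int
	order *list.List                // front = most recently used
	items map[uint64]*list.Element  // table ID -> node in order
}

func newLRUSet(capacity int) *lruSet {
	return &lruSet{
		cap:   capacity,
		order: list.New(),
		items: make(map[uint64]*list.Element, capacity),
	}
}

func (s *lruSet) Contains(id uint64) bool {
	if el, ok := s.items[id]; ok {
		s.order.MoveToFront(el)
		return true
	}
	return false
}

func (s *lruSet) Add(id uint64) {
	if el, ok := s.items[id]; ok {
		s.order.MoveToFront(el)
		return
	}
	if s.order.Len() >= s.cap {
		oldest := s.order.Back() // evict one record instead of resetting
		s.order.Remove(oldest)
		delete(s.items, oldest.Value.(uint64))
	}
	s.items[id] = s.order.PushFront(id)
}

func main() {
	s := newLRUSet(2)
	s.Add(1)
	s.Add(2)
	s.Add(3)                                  // evicts table ID 1
	fmt.Println(s.Contains(1), s.Contains(3)) // false true
}
```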
29 changes: 29 additions & 0 deletions dm/tests/binlog_parse/conf/diff_config.toml
@@ -0,0 +1,29 @@
# diff Configuration.

check-thread-count = 4

export-fix-sql = true

check-struct-only = false

[task]
output-dir = "/tmp/ticdc_dm_test/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

target-check-tables = ["binlog_parse.t?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"

[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
4 changes: 4 additions & 0 deletions dm/tests/binlog_parse/conf/dm-master.toml
@@ -0,0 +1,4 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
auto-compaction-retention = "3s"
42 changes: 42 additions & 0 deletions dm/tests/binlog_parse/conf/dm-task.yaml
@@ -0,0 +1,42 @@
---
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"

target-database:
host: "127.0.0.1"
port: 4000
user: "test"
password: "/Q7B9DizNLLTTfiZHv9WoEAKamfpIUs="

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["binlog_parse"]
do-tables:
- db-name: "binlog_parse"
tbl-name: "t1"

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
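With this block-allow list, only `binlog_parse.t1` is replicated; rows events for `binlog_parse.t2` should take the block path in the new decode hook, so their payloads are never decoded. A sketch of how such rules evaluate, assuming tidb-tools' filter package (the source of the `filter.Filter` and `filter.Table` types in the diff); the `skipByTable` helper here is written from scratch to mirror the one the hook calls, and the exact API shape is an assumption.

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb-tools/pkg/filter"
)

// skipByTable reports whether the block-allow list filters the table out,
// mirroring the helper used by the decode hook in dm/syncer.
func skipByTable(baList *filter.Filter, table *filter.Table) bool {
	if baList == nil {
		return false
	}
	return len(baList.ApplyOn([]*filter.Table{table})) == 0
}

func main() {
	// Rules equivalent to the block-allow-list section of dm-task.yaml above.
	rules := &filter.Rules{
		DoDBs: []string{"binlog_parse"},
		DoTables: []*filter.Table{
			{Schema: "binlog_parse", Name: "t1"},
		},
	}
	baList, err := filter.New(false /* case-insensitive */, rules)
	if err != nil {
		panic(err)
	}
	t1 := &filter.Table{Schema: "binlog_parse", Name: "t1"}
	t2 := &filter.Table{Schema: "binlog_parse", Name: "t2"}
	fmt.Println(skipByTable(baList, t1)) // false: rows decoded and replicated
	fmt.Println(skipByTable(baList, t2)) // true: row payload skipped
}
```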
2 changes: 2 additions & 0 deletions dm/tests/binlog_parse/conf/dm-worker1.toml
@@ -0,0 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261"
12 changes: 12 additions & 0 deletions dm/tests/binlog_parse/conf/source1.yaml
@@ -0,0 +1,12 @@
source-id: mysql-replica-01
enable-gtid: true
relay-binlog-name: ''
relay-binlog-gtid: ''
enable-relay: false
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3306
checker:
check-enable: false
3 changes: 3 additions & 0 deletions dm/tests/binlog_parse/data/db1.increment.sql
@@ -0,0 +1,3 @@
use binlog_parse;
insert into t1 (id, created_time) values (3, '2022-01-03 00:00:01'), (4, '2022-01-04 00:00:01');
insert into t2 (id, created_time) values (3, '2022-01-03 00:00:01'), (4, '2022-01-04 00:00:01');
3 changes: 3 additions & 0 deletions dm/tests/binlog_parse/data/db1.increment1.sql
@@ -0,0 +1,3 @@
use binlog_parse;
insert into t1 (id, created_time) values (5, '2022-01-05 00:00:01'), (6, '2022-01-06 00:00:01');
insert into t2 (id, created_time) values (5, '2022-01-05 00:00:01'), (6, '2022-01-06 00:00:01');
7 changes: 7 additions & 0 deletions dm/tests/binlog_parse/data/db1.prepare.sql
@@ -0,0 +1,7 @@
drop database if exists `binlog_parse`;
create database `binlog_parse` character set utf8;
use `binlog_parse`;
create table t1 (id int, created_time timestamp, primary key(`id`)) character set utf8;
create table t2 (id int, created_time timestamp(3), primary key(`id`)) character set utf8;
insert into t1 (id, created_time) values (1, '2022-01-01 00:00:01'), (2, '2022-01-02 00:00:01');
insert into t2 (id, created_time) values (1, '2022-01-01 00:00:01'), (2, '2022-01-02 00:00:01');
67 changes: 67 additions & 0 deletions dm/tests/binlog_parse/run.sh
@@ -0,0 +1,67 @@
#!/bin/bash

set -eu

cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME

function run() {
Contributor: Would you leave a comment to illustrate what this test intends to do?

Contributor: Do you mean this is used to manually test MariaDB? I remember we don't have MariaDB in CI.

Contributor Author (@GMHDBJD, Nov 23, 2022): yes, v10.0

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

echo "prepare data"
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

echo "start task"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $cur/conf/dm-task.yaml --remove-meta"

echo "check full phase"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"validation start test" \
"\"result\": true" 1

echo "prepare incremental data"
run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1

echo "check incremental phase"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_sql_tidb_with_retry "select count(1) from binlog_parse.t1;" "count(1): 4"

# relay error in mariadb:10.0, success in mysql
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-relay -s $SOURCE_ID1 worker1" \
"\"result\": true" 2 \
"\"source\": \"$SOURCE_ID1\"" 1 \
"\"worker\": \"worker1\"" 1
# "TCPReader get relay event with error" 1

echo "prepare incremental data 2"
run_sql_file $cur/data/db1.increment1.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"validation start test" \
"\"result\": true" 1

echo "check incremental phase 2"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

read v1
}

cleanup_data $TEST_NAME
# also cleanup dm processes in case of last run failed
cleanup_process $*
run $*
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
17 changes: 0 additions & 17 deletions dm/worker/subtask.go
@@ -678,23 +678,6 @@ func (st *SubTask) OperateSchema(ctx context.Context, req *pb.OperateWorkerSchem
return syncUnit.OperateSchema(ctx, req)
}

-// UpdateFromConfig updates config for `From`.
-func (st *SubTask) UpdateFromConfig(cfg *config.SubTaskConfig) error {
-	st.Lock()
-	defer st.Unlock()
-
-	if sync, ok := st.currUnit.(*syncer.Syncer); ok {
-		err := sync.UpdateFromConfig(cfg)
-		if err != nil {
-			return err
-		}
-	}
-
-	st.cfg.From = cfg.From
-
-	return nil
-}

// CheckUnit checks whether current unit is sync unit.
func (st *SubTask) CheckUnit() bool {
st.RLock()