From 057997256da3d7273a3a86f8edf8e1be222fec2e Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Mon, 16 May 2022 22:16:03 +0800 Subject: [PATCH 1/9] support alias for dispatcher --- pkg/config/replica_config.go | 4 ++-- pkg/config/replica_config_test.go | 26 +++++++++++++++++++++++--- pkg/config/sink.go | 18 +++++++++++++++--- 3 files changed, 40 insertions(+), 8 deletions(-) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index 409586c7dbc..2952f216b8a 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -117,8 +117,8 @@ func (c *replicaConfig) fillFromV1(v1 *outdated.ReplicaConfigV1) { } for _, dispatch := range v1.Sink.DispatchRules { c.Sink.DispatchRules = append(c.Sink.DispatchRules, &DispatchRule{ - Matcher: []string{fmt.Sprintf("%s.%s", dispatch.Schema, dispatch.Name)}, - PartitionRule: dispatch.Rule, + Matcher: []string{fmt.Sprintf("%s.%s", dispatch.Schema, dispatch.Name)}, + DispatcherRule: dispatch.Rule, }) } } diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 75003eb8579..528b13535fc 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -77,9 +77,9 @@ func TestReplicaConfigOutDated(t *testing.T) { conf.Mounter.WorkerNum = 3 conf.Sink.Protocol = "open-protocol" conf.Sink.DispatchRules = []*DispatchRule{ - {Matcher: []string{"a.b"}, PartitionRule: "r1"}, - {Matcher: []string{"a.c"}, PartitionRule: "r2"}, - {Matcher: []string{"a.d"}, PartitionRule: "r2"}, + {Matcher: []string{"a.b"}, DispatcherRule: "r1"}, + {Matcher: []string{"a.c"}, DispatcherRule: "r2"}, + {Matcher: []string{"a.d"}, DispatcherRule: "r2"}, } require.Equal(t, conf, conf2) } @@ -94,4 +94,24 @@ func TestReplicaConfigValidate(t *testing.T) { conf.Sink.Protocol = "canal" conf.EnableOldValue = false require.Regexp(t, ".*canal protocol requires old value to be enabled.*", conf.Validate()) + + conf = GetDefaultReplicaConfig() + conf.Sink.DispatchRules = []*DispatchRule{ + {Matcher: []string{"a.b"}, DispatcherRule: "d1", PartitionRule: "r1"}, + } + require.Regexp(t, ".*dispatcher and partition cannot be configured both.*", conf.Validate()) + + // Correct sink configuration. + conf = GetDefaultReplicaConfig() + conf.Sink.DispatchRules = []*DispatchRule{ + {Matcher: []string{"a.b"}, DispatcherRule: "d1"}, + {Matcher: []string{"a.c"}, PartitionRule: "p1"}, + {Matcher: []string{"a.d"}}, + } + err := conf.Validate() + require.Nil(t, err) + rules := conf.Sink.DispatchRules + require.Equal(t, "d1", rules[0].PartitionRule) + require.Equal(t, "p1", rules[1].PartitionRule) + require.Equal(t, "", rules[2].PartitionRule) } diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 36aa9554454..e46645bf05d 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -19,6 +19,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/log" cerror "github.com/pingcap/tiflow/pkg/errors" + "go.uber.org/zap" ) // DefaultMaxMessageBytes sets the default value for max-message-bytes @@ -40,9 +41,10 @@ type SinkConfig struct { // DispatchRule represents partition rule for a table type DispatchRule struct { - Matcher []string `toml:"matcher" json:"matcher"` - PartitionRule string `toml:"dispatcher" json:"dispatcher"` - TopicRule string `toml:"topic" json:"topic"` + Matcher []string `toml:"matcher" json:"matcher"` + DispatcherRule string `toml:"dispatcher" json:"dispatcher"` + PartitionRule string `toml:"partition" json:"partition"` + TopicRule string `toml:"topic" json:"topic"` } // ColumnSelector represents a column selector for a table. @@ -62,6 +64,16 @@ func (s *SinkConfig) validate(enableOldValue bool) error { } } } + for _, rule := range s.DispatchRules { + if rule.DispatcherRule != "" && rule.PartitionRule != "" { + log.Error("dispatcher and partition cannot be configured both", zap.Any("rule", rule)) + return cerror.WrapError(cerror.ErrSinkInvalidConfig, + errors.New(fmt.Sprintf("dispatcher and partition cannot be configured both for rule:%v", rule))) + } + if rule.DispatcherRule != "" { + rule.PartitionRule = rule.DispatcherRule + } + } return nil } From 6d95c51f1e87174788ae9634a86177d9813a16f9 Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Mon, 16 May 2022 23:26:08 +0800 Subject: [PATCH 2/9] modify ut --- cdc/sink/mysql/txn_cache.go | 4 +- pkg/cmd/util/changefeed.toml | 2 +- pkg/cmd/util/helper_test.go | 149 ++++++++++++++++-------------- pkg/config/replica_config_test.go | 6 +- pkg/config/sink.go | 4 +- 5 files changed, 90 insertions(+), 75 deletions(-) diff --git a/cdc/sink/mysql/txn_cache.go b/cdc/sink/mysql/txn_cache.go index 84c334ba300..c332bb2d6f8 100644 --- a/cdc/sink/mysql/txn_cache.go +++ b/cdc/sink/mysql/txn_cache.go @@ -121,7 +121,9 @@ func (c *unresolvedTxnCache) Resolved( func splitResolvedTxn( resolvedTsMap *sync.Map, unresolvedTxns map[model.TableID][]*txnsWithTheSameCommitTs, -) (checkpointTsMap map[model.TableID]uint64, resolvedRowsMap map[model.TableID][]*model.SingleTableTxn) { +) (checkpointTsMap map[model.TableID]uint64, + resolvedRowsMap map[model.TableID][]*model.SingleTableTxn, +) { var ( ok bool txnsLength int diff --git a/pkg/cmd/util/changefeed.toml b/pkg/cmd/util/changefeed.toml index 9c305a4bd35..41202b58e04 100644 --- a/pkg/cmd/util/changefeed.toml +++ b/pkg/cmd/util/changefeed.toml @@ -27,7 +27,7 @@ worker-num = 16 # For MQ Sinks, you can configure event distribution rules through dispatchers # Dispatchers support default, ts, rowid and table dispatchers = [ - { matcher = ['test1.*', 'test2.*'], dispatcher = "ts", topic = "hello_{schema}" }, + { matcher = ['test1.*', 'test2.*'], partition = "ts", topic = "hello_{schema}" }, { matcher = ['test3.*', 'test4.*'], dispatcher = "rowid", topic = "{schema}_world" }, ] # 对于 MQ 类的 Sink,可以通过 column-selectors 配置 column 选择器 diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index f0f1d2571f1..3b649dfec98 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -16,24 +16,17 @@ package util import ( "bytes" "fmt" + "io/ioutil" "os" "path/filepath" "testing" - "github.com/pingcap/check" "github.com/pingcap/tiflow/pkg/config" - "github.com/pingcap/tiflow/pkg/util/testleak" "github.com/spf13/cobra" + "github.com/stretchr/testify/require" ) -func TestSuite(t *testing.T) { check.TestingT(t) } - -type utilsSuite struct{} - -var _ = check.Suite(&utilsSuite{}) - -func (s *utilsSuite) TestProxyFields(c *check.C) { - defer testleak.AfterTest(c)() +func TestProxyFields(t *testing.T) { revIndex := map[string]int{ "http_proxy": 0, "https_proxy": 1, @@ -46,65 +39,74 @@ func (s *utilsSuite) TestProxyFields(c *check.C) { // Each bit of the mask decided whether this index of `envs` would be set. for mask := 0; mask <= 0b111; mask++ { for _, env := range envs { - c.Assert(os.Unsetenv(env), check.IsNil) + require.Nil(t, os.Unsetenv(env)) } for i := 0; i < 3; i++ { if (1< Date: Mon, 16 May 2022 23:46:24 +0800 Subject: [PATCH 3/9] modify a failed ut --- cdc/model/changefeed_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index c23b9b9609d..db137be82af 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -149,8 +149,8 @@ func TestFillV1(t *testing.T) { }, Sink: &config.SinkConfig{ DispatchRules: []*config.DispatchRule{ - {Matcher: []string{"test.tbl3"}, PartitionRule: "ts"}, - {Matcher: []string{"test.tbl4"}, PartitionRule: "rowid"}, + {Matcher: []string{"test.tbl3"}, DispatcherRule: "ts"}, + {Matcher: []string{"test.tbl4"}, DispatcherRule: "rowid"}, }, }, Cyclic: &config.CyclicConfig{ From bfcdc7db2c3448f7d2fae23027a074f7aa6e0218 Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Tue, 17 May 2022 10:14:20 +0800 Subject: [PATCH 4/9] address comments --- docs/swagger/docs.go | 4 ++++ docs/swagger/swagger.json | 4 ++++ docs/swagger/swagger.yaml | 5 +++++ pkg/config/sink.go | 9 +++++++-- 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index b62428fb2f4..34ed6c7c8aa 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -731,6 +731,10 @@ var doc = `{ "type": "string" } }, + "partition": { + "description": "PartitionRule is an alias added for DispatcherRule to mitigate confusions.\nIn the future release, the DispatcherRule is expected to be removed .", + "type": "string" + }, "topic": { "type": "string" } diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 3d6a3e3ab25..ebe252d105f 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -712,6 +712,10 @@ "type": "string" } }, + "partition": { + "description": "PartitionRule is an alias added for DispatcherRule to mitigate confusions.\nIn the future release, the DispatcherRule is expected to be removed .", + "type": "string" + }, "topic": { "type": "string" } diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index e9f59cebcbe..82909bfc7de 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -18,6 +18,11 @@ definitions: items: type: string type: array + partition: + description: |- + PartitionRule is an alias added for DispatcherRule to mitigate confusions. + In the future release, the DispatcherRule is expected to be removed . + type: string topic: type: string type: object diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 8b15111913b..80bb910f2c2 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -43,8 +43,10 @@ type SinkConfig struct { type DispatchRule struct { Matcher []string `toml:"matcher" json:"matcher"` DispatcherRule string `toml:"dispatcher" json:"dispatcher"` - PartitionRule string `toml:"partition" json:"partition"` - TopicRule string `toml:"topic" json:"topic"` + // PartitionRule is an alias added for DispatcherRule to mitigate confusions. + // In the future release, the DispatcherRule is expected to be removed . + PartitionRule string `toml:"partition" json:"partition"` + TopicRule string `toml:"topic" json:"topic"` } // ColumnSelector represents a column selector for a table. @@ -71,6 +73,9 @@ func (s *SinkConfig) validate(enableOldValue bool) error { errors.New(fmt.Sprintf("dispatcher and partition cannot be "+ "configured both for rule:%v", rule))) } + // We only use PartitionRule in our codebase to represent partition dispatching rule. + // So when DispatcherRule is not empty, we assign its value to PartitionRule + // and clear itself. if rule.DispatcherRule != "" { rule.PartitionRule = rule.DispatcherRule rule.DispatcherRule = "" From 1912554c54d073238ce71f4d450877297ad9fa4f Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Tue, 17 May 2022 10:19:08 +0800 Subject: [PATCH 5/9] modify some code comments --- pkg/config/sink.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/config/sink.go b/pkg/config/sink.go index 80bb910f2c2..7c049187f5a 100644 --- a/pkg/config/sink.go +++ b/pkg/config/sink.go @@ -73,9 +73,9 @@ func (s *SinkConfig) validate(enableOldValue bool) error { errors.New(fmt.Sprintf("dispatcher and partition cannot be "+ "configured both for rule:%v", rule))) } - // We only use PartitionRule in our codebase to represent partition dispatching rule. - // So when DispatcherRule is not empty, we assign its value to PartitionRule - // and clear itself. + // After `validate()` is called, we only use PartitionRule to represent a partition + // dispatching rule. So when DispatcherRule is not empty, we assign its + // value to PartitionRule and clear itself. if rule.DispatcherRule != "" { rule.PartitionRule = rule.DispatcherRule rule.DispatcherRule = "" From d74ed03be561848441fce249d1b7848736e68f66 Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Tue, 17 May 2022 10:34:02 +0800 Subject: [PATCH 6/9] address comments --- pkg/cmd/util/helper_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 3b649dfec98..915ff9d2120 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -186,9 +186,9 @@ func TestAndWriteExampleReplicaTOML(t *testing.T) { IgnoreTxnStartTs: []uint64{1, 2}, Rules: []string{"*.*", "!test.*"}, }, cfg.Filter) - require.Equal(t, cfg.Mounter, &config.MounterConfig{ + require.Equal(t, &config.MounterConfig{ WorkerNum: 16, - }) + }, cfg.Mounter) err = cfg.Validate() require.Nil(t, err) require.Equal(t, &config.SinkConfig{ From 99aa777edf9722c372720d76cf1fe6ab8e3e7349 Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Tue, 17 May 2022 11:50:14 +0800 Subject: [PATCH 7/9] address comments --- pkg/cmd/util/helper_test.go | 2 +- pkg/config/sink.go | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/pkg/cmd/util/helper_test.go b/pkg/cmd/util/helper_test.go index 915ff9d2120..61ae730d1cb 100644 --- a/pkg/cmd/util/helper_test.go +++ b/pkg/cmd/util/helper_test.go @@ -51,7 +51,7 @@ func TestProxyFields(t *testing.T) { for _, field := range findProxyFields() { idx, ok := revIndex[field.Key] require.True(t, ok) - require.NotEqual(t, (1< Date: Tue, 17 May 2022 14:25:56 +0800 Subject: [PATCH 8/9] address comments --- pkg/cmd/util/main_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) create mode 100644 pkg/cmd/util/main_test.go diff --git a/pkg/cmd/util/main_test.go b/pkg/cmd/util/main_test.go new file mode 100644 index 00000000000..63a5c992512 --- /dev/null +++ b/pkg/cmd/util/main_test.go @@ -0,0 +1,24 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package util + +import ( + "testing" + + "github.com/pingcap/tiflow/pkg/leakutil" +) + +func TestMain(m *testing.M) { + leakutil.SetUpLeakTest(m) +} From 781913d92d271470b9d6580f106d79b21139ebbd Mon Sep 17 00:00:00 2001 From: zhaoxinyu Date: Tue, 17 May 2022 14:52:30 +0800 Subject: [PATCH 9/9] regenerate docs --- docs/swagger/docs.go | 1 + docs/swagger/swagger.json | 1 + docs/swagger/swagger.yaml | 1 + 3 files changed, 3 insertions(+) diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 34ed6c7c8aa..b097e17f6f7 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -723,6 +723,7 @@ var doc = `{ "type": "object", "properties": { "dispatcher": { + "description": "Deprecated, please use PartitionRule.", "type": "string" }, "matcher": { diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index ebe252d105f..9f50eb12882 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -704,6 +704,7 @@ "type": "object", "properties": { "dispatcher": { + "description": "Deprecated, please use PartitionRule.", "type": "string" }, "matcher": { diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index 82909bfc7de..1d0ac4b317b 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -13,6 +13,7 @@ definitions: config.DispatchRule: properties: dispatcher: + description: Deprecated, please use PartitionRule. type: string matcher: items: