Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#3845
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
sdojjy authored and ti-chi-bot committed Dec 20, 2021
1 parent 3a46657 commit ac4fbc8
Show file tree
Hide file tree
Showing 3 changed files with 291 additions and 31 deletions.
37 changes: 36 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,20 @@ package owner

import (
"context"
"strings"
"sync"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
<<<<<<< HEAD
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/tidb/sessionctx/binloginfo"
=======
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/format"
timodel "github.com/pingcap/tidb/parser/model"
>>>>>>> 7ccaad224 (ticdc/owner: Fix ddl special comment syntax error (#3845))
"github.com/pingcap/tiflow/cdc/model"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -422,7 +429,11 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
if err != nil {
return false, errors.Trace(err)
}
ddlEvent.Query = binloginfo.AddSpecialComment(ddlEvent.Query)
ddlEvent.Query, err = addSpecialComment(ddlEvent.Query)
if err != nil {
return false, errors.Trace(err)
}

c.ddlEventCache = ddlEvent
}
if job.BinlogInfo.TableInfo != nil && c.schema.IsIneligibleTableID(job.BinlogInfo.TableInfo.ID) {
Expand Down Expand Up @@ -480,3 +491,27 @@ func (c *changefeed) updateStatus(currentTs int64, barrierTs model.Ts) {
func (c *changefeed) Close() {
c.releaseResources()
}

// addSpecialComment translate tidb feature to comment
func addSpecialComment(ddlQuery string) (string, error) {
stms, _, err := parser.New().ParseSQL(ddlQuery)
if err != nil {
return "", errors.Trace(err)
}
if len(stms) != 1 {
log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery))
}
var sb strings.Builder
// translate TiDB feature to special comment
restoreFlags := format.RestoreTiDBSpecialComment
// escape the keyword
restoreFlags |= format.RestoreNameBackQuotes
// upper case keyword
restoreFlags |= format.RestoreKeyWordUppercase
// wrap string with single quote
restoreFlags |= format.RestoreStringSingleQuotes
if err = stms[0].Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil {
return "", errors.Trace(err)
}
return sb.String(), nil
}
221 changes: 221 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,11 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
<<<<<<< HEAD
c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create database test1")
=======
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "CREATE DATABASE `test1`")
>>>>>>> 7ccaad224 (ticdc/owner: Fix ddl special comment syntax error (#3845))

// executing the ddl finished
mockAsyncSink.ddlDone = true
Expand All @@ -286,7 +290,11 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
<<<<<<< HEAD
c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)")
=======
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "CREATE TABLE `test1`.`test1` (`id` INT PRIMARY KEY)")
>>>>>>> 7ccaad224 (ticdc/owner: Fix ddl special comment syntax error (#3845))

// executing the ddl finished
mockAsyncSink.ddlDone = true
Expand Down Expand Up @@ -353,3 +361,216 @@ func (s *changefeedSuite) TestFinished(c *check.C) {
c.Assert(state.Status.CheckpointTs, check.Equals, state.Info.TargetTs)
c.Assert(state.Info.State, check.Equals, model.StateFinished)
}
<<<<<<< HEAD
=======

func (s *changefeedSuite) TestRemoveChangefeed(c *check.C) {
defer testleak.AfterTest(c)()

baseCtx, cancel := context.WithCancel(context.Background())
ctx := cdcContext.NewContext4Test(baseCtx, true)
info := ctx.ChangefeedVars().Info
dir := c.MkDir()
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
Info: info,
})
testChangefeedReleaseResource(c, ctx, cancel, dir, true /*expectedInitialized*/)
}

func (s *changefeedSuite) TestRemovePausedChangefeed(c *check.C) {
defer testleak.AfterTest(c)()

baseCtx, cancel := context.WithCancel(context.Background())
ctx := cdcContext.NewContext4Test(baseCtx, true)
info := ctx.ChangefeedVars().Info
info.State = model.StateStopped
dir := c.MkDir()
info.Config.Consistent = &config.ConsistentConfig{
Level: "eventual",
Storage: filepath.Join("nfs://", dir),
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: ctx.ChangefeedVars().ID,
Info: info,
})
testChangefeedReleaseResource(c, ctx, cancel, dir, false /*expectedInitialized*/)
}

func testChangefeedReleaseResource(
c *check.C,
ctx cdcContext.Context,
cancel context.CancelFunc,
redoLogDir string,
expectedInitialized bool,
) {
cf, state, captures, tester := createChangefeed4Test(ctx, c)

// pre check
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()

// initialize
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
c.Assert(cf.initialized, check.Equals, expectedInitialized)

// remove changefeed from state manager by admin job
cf.feedStateManager.PushAdminJob(&model.AdminJob{
CfID: cf.id,
Type: model.AdminRemove,
})
// changefeed tick will release resources
err := cf.tick(ctx, state, captures)
c.Assert(err, check.IsNil)
cancel()
// check redo log dir is deleted
_, err = os.Stat(redoLogDir)
c.Assert(os.IsNotExist(err), check.IsTrue)
}

func (s *changefeedSuite) TestAddSpecialComment(c *check.C) {
defer testleak.AfterTest(c)()
testCase := []struct {
input string
result string
}{
{
"create table t1 (id int ) shard_row_id_bits=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) shard_row_id_bits=2 engine=innodb pre_split_regions=2;",
"CREATE TABLE `t1` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ ENGINE = innodb /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int ) pre_split_regions=2 shard_row_id_bits=2;",
"CREATE TABLE `t1` (`id` INT) /*T! PRE_SPLIT_REGIONS = 2 */ /*T! SHARD_ROW_ID_BITS = 2 */",
},
{
"create table t6 (id int ) shard_row_id_bits=2 shard_row_id_bits=3 pre_split_regions=2;",
"CREATE TABLE `t6` (`id` INT) /*T! SHARD_ROW_ID_BITS = 2 */ /*T! SHARD_ROW_ID_BITS = 3 */ /*T! PRE_SPLIT_REGIONS = 2 */",
},
{
"create table t1 (id int primary key auto_random(2));",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM(2) */)",
},
{
"create table t1 (id int primary key auto_random);",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![auto_rand] AUTO_RANDOM */)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 4 ) primary key);",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(4) */ PRIMARY KEY)",
},
{
"create table t1 (id int auto_random ( 3 ) primary key) auto_random_base = 100;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM(3) */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 100 */",
},
{
"create table t1 (id int auto_random primary key) auto_random_base = 50;",
"CREATE TABLE `t1` (`id` INT /*T![auto_rand] AUTO_RANDOM */ PRIMARY KEY) /*T![auto_rand_base] AUTO_RANDOM_BASE = 50 */",
},
{
"create table t1 (id int auto_increment key) auto_id_cache 100;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT PRIMARY KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 100 */",
},
{
"create table t1 (id int auto_increment unique) auto_id_cache 10;",
"CREATE TABLE `t1` (`id` INT AUTO_INCREMENT UNIQUE KEY) /*T![auto_id_cache] AUTO_ID_CACHE = 10 */",
},
{
"create table t1 (id int) auto_id_cache = 5;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) auto_id_cache=5;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int) /*T![auto_id_cache] auto_id_cache=5 */ ;",
"CREATE TABLE `t1` (`id` INT) /*T![auto_id_cache] AUTO_ID_CACHE = 5 */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) clustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int, v int, primary key(a) clustered);",
"CREATE TABLE `t1` (`id` INT,`v` INT,PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */)",
},
{
"create table t1(id int primary key clustered, v int);",
"CREATE TABLE `t1` (`id` INT PRIMARY KEY /*T![clustered_index] CLUSTERED */,`v` INT)",
},
{
"alter table t add primary key(a) clustered;",
"ALTER TABLE `t` ADD PRIMARY KEY(`a`) /*T![clustered_index] CLUSTERED */",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) nonclustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table t1 (id int, a varchar(255), primary key (a, b) /*T![clustered_index] nonclustered */);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255),PRIMARY KEY(`a`, `b`) /*T![clustered_index] NONCLUSTERED */)",
},
{
"create table clustered_test(id int)",
"CREATE TABLE `clustered_test` (`id` INT)",
},
{
"create database clustered_test",
"CREATE DATABASE `clustered_test`",
},
{
"create database clustered",
"CREATE DATABASE `clustered`",
},
{
"create table clustered (id int)",
"CREATE TABLE `clustered` (`id` INT)",
},
{
"create table t1 (id int, a varchar(255) key clustered);",
"CREATE TABLE `t1` (`id` INT,`a` VARCHAR(255) PRIMARY KEY /*T![clustered_index] CLUSTERED */)",
},
{
"alter table t force auto_increment = 12;",
"ALTER TABLE `t` /*T![force_inc] FORCE */ AUTO_INCREMENT = 12",
},
{
"alter table t force, auto_increment = 12;",
"ALTER TABLE `t` FORCE /* AlterTableForce is not supported */ , AUTO_INCREMENT = 12",
},
{
"create table cdc_test (id varchar(10) primary key ,c1 varchar(10)) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin/*!90000 SHARD_ROW_ID_BITS=4 PRE_SPLIT_REGIONS=3 */",
"CREATE TABLE `cdc_test` (`id` VARCHAR(10) PRIMARY KEY,`c1` VARCHAR(10)) ENGINE = InnoDB DEFAULT CHARACTER SET = UTF8MB4 DEFAULT COLLATE = UTF8MB4_BIN /*T! SHARD_ROW_ID_BITS = 4 */ /*T! PRE_SPLIT_REGIONS = 3 */",
},
}
for _, ca := range testCase {
re, err := addSpecialComment(ca.input)
c.Check(err, check.IsNil)
c.Check(re, check.Equals, ca.result)
}
c.Assert(func() {
_, _ = addSpecialComment("alter table t force, auto_increment = 12;alter table t force, auto_increment = 12;")
}, check.Panics, "invalid ddlQuery statement size")
}
>>>>>>> 7ccaad224 (ticdc/owner: Fix ddl special comment syntax error (#3845))
Loading

0 comments on commit ac4fbc8

Please sign in to comment.