Skip to content

Commit

Permalink
Merge branch 'master' into warn-large-gap
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-srebot committed Mar 12, 2021
2 parents f45e824 + 4f0e07a commit a30673f
Show file tree
Hide file tree
Showing 12 changed files with 173 additions and 14 deletions.
10 changes: 9 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,15 @@ FAILPOINT := bin/failpoint-ctl
FAILPOINT_ENABLE := $$(echo $(FAILPOINT_DIR) | xargs $(FAILPOINT) enable >/dev/null)
FAILPOINT_DISABLE := $$(find $(FAILPOINT_DIR) | xargs $(FAILPOINT) disable >/dev/null)

RELEASE_VERSION ?= $(shell git describe --tags --dirty="-dev")
RELEASE_VERSION := v5.0.0-master
ifneq ($(shell git rev-parse --abbrev-ref HEAD | egrep '^release-[0-9]\.[0-9].*$$|^HEAD$$'),)
# If we are in release branch, use tag version.
RELEASE_VERSION := $(shell git describe --tags --dirty="-dirty")
else ifneq ($(shell git status --porcelain),)
# Add -dirty if the working tree is dirty for non release branch.
RELEASE_VERSION := $(RELEASE_VERSION)-dirty
endif

LDFLAGS += -X "$(CDC_PKG)/pkg/version.ReleaseVersion=$(RELEASE_VERSION)"
LDFLAGS += -X "$(CDC_PKG)/pkg/version.BuildTS=$(shell date -u '+%Y-%m-%d %H:%M:%S')"
LDFLAGS += -X "$(CDC_PKG)/pkg/version.GitHash=$(shell git rev-parse HEAD)"
Expand Down
3 changes: 3 additions & 0 deletions cdc/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"sync"
"time"

"github.com/pingcap/ticdc/pkg/version"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -124,6 +126,7 @@ func NewCapture(
info := &model.CaptureInfo{
ID: id,
AdvertiseAddr: advertiseAddr,
Version: version.ReleaseVersion,
}
processorManager := processor.NewManager(pdCli, credential, info)
log.Info("creating capture", zap.String("capture-id", id), util.ZapFieldCapture(ctx))
Expand Down
1 change: 1 addition & 0 deletions cdc/model/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
type CaptureInfo struct {
ID CaptureID `json:"id"`
AdvertiseAddr string `json:"address"`
Version string `json:"version"`
}

// Marshal using json.Marshal.
Expand Down
5 changes: 3 additions & 2 deletions cdc/model/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,12 @@ func (s *captureSuite) TestMarshalUnmarshal(c *check.C) {
info := &CaptureInfo{
ID: "9ff52aca-aea6-4022-8ec4-fbee3f2c7890",
AdvertiseAddr: "127.0.0.1:8300",
Version: "dev",
}
expected := []byte(`{"id":"9ff52aca-aea6-4022-8ec4-fbee3f2c7890","address":"127.0.0.1:8300"}`)
expected := `{"id":"9ff52aca-aea6-4022-8ec4-fbee3f2c7890","address":"127.0.0.1:8300","version":"dev"}`
data, err := info.Marshal()
c.Assert(err, check.IsNil)
c.Assert(data, check.DeepEquals, expected)
c.Assert(string(data), check.Equals, expected)
decodedInfo := &CaptureInfo{}
err = decodedInfo.Unmarshal(data)
c.Assert(err, check.IsNil)
Expand Down
3 changes: 2 additions & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
)

// SortEngine is the sorter engine
type SortEngine string
type SortEngine = string

// sort engines
const (
Expand Down Expand Up @@ -85,6 +85,7 @@ type ChangeFeedInfo struct {

SyncPointEnabled bool `json:"sync-point-enabled"`
SyncPointInterval time.Duration `json:"sync-point-interval"`
CreatorVersion string `json:"creator-version"`
}

var changeFeedIDRe *regexp.Regexp = regexp.MustCompile(`^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$`)
Expand Down
4 changes: 4 additions & 0 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,10 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
p.sendError(cerror.ErrUnknownSortEngine.GenWithStackByArgs(p.changefeed.Engine))
return nil
}
failpoint.Inject("ProcessorAddTableError", func() {
p.sendError(errors.New("processor add table injected error"))
failpoint.Return(nil)
})
go func() {
err := sorter.Run(ctx)
if errors.Cause(err) != context.Canceled {
Expand Down
4 changes: 4 additions & 0 deletions cdc/processor/pipeline/sorter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/puller"
psorter "github.com/pingcap/ticdc/cdc/puller/sorter"
Expand Down Expand Up @@ -73,6 +74,9 @@ func (n *sorterNode) Init(ctx pipeline.NodeContext) error {
default:
return cerror.ErrUnknownSortEngine.GenWithStackByArgs(n.sortEngine)
}
failpoint.Inject("ProcessorAddTableError", func() {
failpoint.Return(errors.New("processor add table injected error"))
})
n.wg.Go(func() error {
ctx.Throw(errors.Trace(sorter.Run(stdCtx)))
return nil
Expand Down
40 changes: 34 additions & 6 deletions cmd/client_changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"strings"
"time"

"github.com/pingcap/ticdc/pkg/version"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/log"
Expand Down Expand Up @@ -239,7 +241,7 @@ func newQueryChangefeedCommand() *cobra.Command {
return command
}

func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate bool, credential *security.Credential) (*model.ChangeFeedInfo, error) {
func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate bool, credential *security.Credential, captureInfos []*model.CaptureInfo) (*model.ChangeFeedInfo, error) {
if isCreate {
if sinkURI == "" {
return nil, errors.New("Creating chengfeed without a sink-uri")
Expand All @@ -261,8 +263,20 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate
return nil, err
}
}

cdcClusterVer, err := version.GetTiCDCClusterVersion(captureInfos)
if err != nil {
return nil, errors.Trace(err)
}
cfg := config.GetDefaultReplicaConfig()

sortEngineFlag := cmd.Flag("sort-engine")
if cdcClusterVer == version.TiCDCClusterVersion4_0 {
cfg.EnableOldValue = false
if !sortEngineFlag.Changed {
sortEngine = model.SortInMemory
}
log.Warn("The TiCDC cluster is built from 4.0-release branch, the old-value and unified-sorter are disabled by default.")
}
if len(configFile) > 0 {
if err := strictDecodeFile(configFile, "cdc", cfg); err != nil {
return nil, err
Expand Down Expand Up @@ -320,18 +334,24 @@ func verifyChangefeedParamers(ctx context.Context, cmd *cobra.Command, isCreate
}
}
}
switch sortEngine {
case model.SortUnified, model.SortInMemory, model.SortInFile:
default:
return nil, errors.Errorf("Creating chengfeed with an invalid sort engine(%s), `%s`,`%s` and `%s` are optional.", sortEngine, model.SortUnified, model.SortInMemory, model.SortInFile)
}
info := &model.ChangeFeedInfo{
SinkURI: sinkURI,
Opts: make(map[string]string),
CreateTime: time.Now(),
StartTs: startTs,
TargetTs: targetTs,
Config: cfg,
Engine: model.SortEngine(sortEngine),
Engine: sortEngine,
SortDir: sortDir,
State: model.StateNormal,
SyncPointEnabled: syncPointEnabled,
SyncPointInterval: syncPointInterval,
CreatorVersion: version.ReleaseVersion,
}

if info.Engine != model.SortInMemory && (info.SortDir == ".") {
Expand Down Expand Up @@ -412,7 +432,7 @@ func changefeedConfigVariables(command *cobra.Command) {
command.PersistentFlags().StringVar(&sinkURI, "sink-uri", "", "sink uri")
command.PersistentFlags().StringVar(&configFile, "config", "", "Path of the configuration file")
command.PersistentFlags().StringSliceVar(&opts, "opts", nil, "Extra options, in the `key=value` format")
command.PersistentFlags().StringVar(&sortEngine, "sort-engine", "unified", "sort engine used for data sort")
command.PersistentFlags().StringVar(&sortEngine, "sort-engine", model.SortUnified, "sort engine used for data sort")
command.PersistentFlags().StringVar(&sortDir, "sort-dir", defaultSortDir, "directory used for data sort")
command.PersistentFlags().StringVar(&timezone, "tz", "SYSTEM", "timezone used when checking sink uri (changefeed timezone is determined by cdc server)")
command.PersistentFlags().Uint64Var(&cyclicReplicaID, "cyclic-replica-id", 0, "(Expremental) Cyclic replication replica ID of changefeed")
Expand All @@ -434,7 +454,11 @@ func newCreateChangefeedCommand() *cobra.Command {
id = uuid.New().String()
}

info, err := verifyChangefeedParamers(ctx, cmd, true /* isCreate */, getCredential())
_, captureInfos, err := cdcEtcdCli.GetCaptures(ctx)
if err != nil {
return err
}
info, err := verifyChangefeedParamers(ctx, cmd, true /* isCreate */, getCredential(), captureInfos)
if err != nil {
return err
}
Expand Down Expand Up @@ -475,7 +499,11 @@ func newUpdateChangefeedCommand() *cobra.Command {
return err
}

info, err := verifyChangefeedParamers(ctx, cmd, false /* isCreate */, getCredential())
_, captureInfos, err := cdcEtcdCli.GetCaptures(ctx)
if err != nil {
return err
}
info, err := verifyChangefeedParamers(ctx, cmd, false /* isCreate */, getCredential(), captureInfos)
if err != nil {
return err
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/client_changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"path/filepath"

"github.com/pingcap/check"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/spf13/cobra"
)
Expand All @@ -32,6 +33,7 @@ func (s *clientChangefeedSuite) TestVerifyChangefeedParams(c *check.C) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cmd := &cobra.Command{}
changefeedConfigVariables(cmd)

dir := c.MkDir()
path := filepath.Join(dir, "config.toml")
Expand All @@ -42,12 +44,18 @@ enable-old-value = false
c.Assert(err, check.IsNil)

sinkURI = "blackhole:///?protocol=maxwell"
info, err := verifyChangefeedParamers(ctx, cmd, false /* isCreate */, nil)
info, err := verifyChangefeedParamers(ctx, cmd, false /* isCreate */, nil, nil)
c.Assert(err, check.IsNil)
c.Assert(info.Config.EnableOldValue, check.IsTrue)
c.Assert(info.SortDir, check.Equals, defaultSortDir)

sinkURI = ""
_, err = verifyChangefeedParamers(ctx, cmd, true /* isCreate */, nil)
_, err = verifyChangefeedParamers(ctx, cmd, true /* isCreate */, nil, nil)
c.Assert(err, check.NotNil)

sinkURI = "blackhole:///"
info, err = verifyChangefeedParamers(ctx, cmd, false /* isCreate */, nil, []*model.CaptureInfo{{Version: "4.0.0"}})
c.Assert(err, check.IsNil)
c.Assert(info.Config.EnableOldValue, check.IsFalse)
c.Assert(info.Engine, check.Equals, model.SortInMemory)
}
39 changes: 39 additions & 0 deletions pkg/version/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"regexp"
"strings"

"github.com/pingcap/ticdc/cdc/model"

"github.com/coreos/go-semver/semver"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -135,3 +137,40 @@ func CheckStoreVersion(ctx context.Context, client pd.Client, storeID uint64) er
}
return nil
}

// TiCDCClusterVersion is the version of TiCDC cluster
type TiCDCClusterVersion string

// ticdc cluster version
const (
TiCDCClusterVersionUnknown TiCDCClusterVersion = "Unknown"
TiCDCClusterVersion4_0 TiCDCClusterVersion = "4.0.X"
TiCDCClusterVersion5_0 TiCDCClusterVersion = "5.0.X"
)

// GetTiCDCClusterVersion returns the version of ticdc cluster
func GetTiCDCClusterVersion(captureInfos []*model.CaptureInfo) (TiCDCClusterVersion, error) {
if len(captureInfos) == 0 {
return TiCDCClusterVersionUnknown, nil
}
var minVer *semver.Version
for _, captureInfo := range captureInfos {
var ver *semver.Version
var err error
if captureInfo.Version != "" {
ver, err = semver.NewVersion(removeVAndHash(captureInfo.Version))
} else {
ver = semver.New("4.0.1")
}
if err != nil {
return TiCDCClusterVersionUnknown, cerror.WrapError(cerror.ErrNewSemVersion, err)
}
if minVer == nil || ver.Compare(*minVer) < 0 {
minVer = ver
}
}
if minVer.Major < 5 {
return TiCDCClusterVersion4_0, nil
}
return TiCDCClusterVersion5_0, nil
}
59 changes: 59 additions & 0 deletions pkg/version/check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/coreos/go-semver/semver"
"github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/util/testleak"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/pkg/tempurl"
Expand Down Expand Up @@ -153,3 +154,61 @@ func (s *checkSuite) TestReleaseSemver(c *check.C) {
c.Assert(ReleaseSemver(), check.Equals, cs.releaseSemver, check.Commentf("%v", cs))
}
}

func (s *checkSuite) TestGetTiCDCClusterVersion(c *check.C) {
defer testleak.AfterTest(c)()
testCases := []struct {
captureInfos []*model.CaptureInfo
expected TiCDCClusterVersion
}{
{
captureInfos: []*model.CaptureInfo{},
expected: TiCDCClusterVersionUnknown,
},
{
captureInfos: []*model.CaptureInfo{
{ID: "capture1", Version: ""},
{ID: "capture2", Version: ""},
{ID: "capture3", Version: ""},
},
expected: TiCDCClusterVersion4_0,
},
{
captureInfos: []*model.CaptureInfo{
{ID: "capture1", Version: "5.0.1"},
{ID: "capture2", Version: "4.0.7"},
{ID: "capture3", Version: "5.0.0-rc"},
},
expected: TiCDCClusterVersion4_0,
},
{
captureInfos: []*model.CaptureInfo{
{ID: "capture1", Version: "5.0.0-rc"},
},
expected: TiCDCClusterVersion5_0,
},
{
captureInfos: []*model.CaptureInfo{
{ID: "capture1", Version: "5.0.0"},
},
expected: TiCDCClusterVersion5_0,
},
{
captureInfos: []*model.CaptureInfo{
{ID: "capture1", Version: "4.1.0"},
},
expected: TiCDCClusterVersion4_0,
},
{
captureInfos: []*model.CaptureInfo{
{ID: "capture1", Version: "4.0.10"},
},
expected: TiCDCClusterVersion4_0,
},
}
for _, tc := range testCases {
ver, err := GetTiCDCClusterVersion(tc.captureInfos)
c.Assert(err, check.IsNil)
c.Assert(ver, check.Equals, tc.expected)
}
}
7 changes: 5 additions & 2 deletions tests/processor_err_chan/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,14 @@ function run() {
run_sql "CREATE table processor_err_chan.t$i (id int primary key auto_increment)" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
done

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/ProcessorAddTableError=1*return(true)' # old processor
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/processor/pipeline/ProcessorAddTableError=1*return(true)' # new processor
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" --sort-engine=abc-engine 2>&1|tail -n2|head -n1|awk '{print $2}')

changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

retry_time=10
ensure $retry_time check_changefeed_mark_stopped $pd_addr $changefeed_id "[CDC:ErrUnknownSortEngine]unknown sort engine abc-engine"
ensure $retry_time check_changefeed_mark_stopped $pd_addr $changefeed_id "processor add table injected error"

cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI"
for i in $(seq 1 10); do
Expand Down

0 comments on commit a30673f

Please sign in to comment.