
Commit

Merge branch 'release-5.0' into cherry-pick-2249-to-release-5.0
ti-chi-bot authored Jul 19, 2021
2 parents 3725987 + 03f9f2f commit 38f1bb8
Showing 31 changed files with 221 additions and 620 deletions.
3 changes: 3 additions & 0 deletions .golangci.yml
@@ -0,0 +1,3 @@
linters:
enable:
- unconvert
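
For context, unconvert reports type conversions whose operand already has the target type — exactly the pattern cleaned up later in this commit in cdc/model/sink.go, cdc/owner.go, and cdc/server.go. A minimal sketch of code it would flag (an illustrative snippet, not taken from this repo):

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	var v int64 = 42
	// unconvert reports this: v is already an int64, so int64(v) is redundant.
	_ = strconv.FormatInt(int64(v), 10)
	// The cleaned-up form, matching the fixes in cdc/model/sink.go:
	fmt.Println(strconv.FormatInt(v, 10))

	// Likewise, time.Second*10 is already a time.Duration (see cdc/server.go).
	_ = time.Duration(time.Second * 10)
	fmt.Println(time.Second * 10)
}
```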
4 changes: 2 additions & 2 deletions Makefile
@@ -208,9 +208,9 @@ tools/bin/errdoc-gen: tools/check/go.mod
cd tools/check; test -e ../bin/errdoc-gen || \
$(GO) build -o ../bin/errdoc-gen github.com/pingcap/errors/errdoc-gen

tools/bin/golangci-lint: tools/check/go.mod
tools/bin/golangci-lint:
cd tools/check; test -e ../bin/golangci-lint || \
$(GO) build -o ../bin/golangci-lint github.com/golangci/golangci-lint/cmd/golangci-lint
curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b ../bin v1.30.0

failpoint-enable: check_failpoint_ctl
$(FAILPOINT_ENABLE)
21 changes: 14 additions & 7 deletions cdc/kv/region_worker.go
@@ -245,6 +245,9 @@ func (w *regionWorker) handleSingleRegionError(ctx context.Context, err error, s
}

func (w *regionWorker) resolveLock(ctx context.Context) error {
// tikv resolved ts update interval is 1s, so use half of the resolve lock
// interval as the lock penalty threshold.
resolveLockPenalty := 10
resolveLockInterval := 20 * time.Second
failpoint.Inject("kvClientResolveLockInterval", func(val failpoint.Value) {
resolveLockInterval = time.Duration(val.(int)) * time.Second
@@ -300,6 +303,17 @@ func (w *regionWorker) resolveLock(ctx context.Context) error {
zap.Duration("duration", sinceLastResolvedTs), zap.Duration("since last event", sinceLastResolvedTs))
return errReconnect
}
// Only resolve the lock if the resolved-ts stays unchanged for
// more than resolveLockPenalty times.
if rts.ts.penalty < resolveLockPenalty {
if lastResolvedTs > rts.ts.resolvedTs {
rts.ts.resolvedTs = lastResolvedTs
rts.ts.eventTime = time.Now()
rts.ts.penalty = 0
}
w.rtsManager.Upsert(rts)
continue
}
log.Warn("region not receiving resolved event from tikv or resolved ts is not pushing for too long time, try to resolve lock",
zap.Uint64("regionID", rts.regionID),
zap.Stringer("span", state.getRegionSpan()),
@@ -582,13 +596,6 @@ func (w *regionWorker) handleEventEntry(
}
w.metrics.metricPullEventInitializedCounter.Inc()

select {
case w.rtsUpdateCh <- &regionTsInfo{regionID: regionID, ts: newResolvedTsItem(state.sri.ts)}:
default:
// rtsUpdateCh block often means too many regions are suffering
// lock resolve, the kv client status is not very healthy.
log.Warn("region is not upsert into rts manager", zap.Uint64("region-id", regionID))
}
state.initialized = true
w.session.regionRouter.Release(state.sri.rpcCtx.Addr)
cachedEvents := state.matcher.matchCachedRow()
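
The net effect of the new penalty bookkeeping: rather than firing a resolve-lock scan as soon as a region's resolved ts stalls, the worker now waits until the same resolved ts has been reported resolveLockPenalty (10) times. A self-contained sketch of that gating, with a simplified struct standing in for regionTsInfo and the rts manager (the names and loop here are illustrative, not the repo's code):

```go
package main

import "fmt"

// regionTs is a simplified stand-in for the per-region tsItem.
type regionTs struct {
	resolvedTs uint64
	penalty    int
}

// resolveLockPenalty mirrors the threshold in region_worker.go.
const resolveLockPenalty = 10

// observe records the latest resolved ts reported for a region and
// returns true only when a resolve-lock scan should actually run.
func observe(rt *regionTs, latest uint64) bool {
	if latest > rt.resolvedTs {
		// The region is advancing again: reset the penalty.
		rt.resolvedTs = latest
		rt.penalty = 0
		return false
	}
	// The resolved ts is stuck: accumulate penalty before paying
	// the cost of resolving locks.
	rt.penalty++
	return rt.penalty >= resolveLockPenalty
}

func main() {
	rt := &regionTs{resolvedTs: 1000}
	for i := 0; i < 12; i++ {
		if observe(rt, 1000) {
			fmt.Println("resolve locks after", rt.penalty, "stalled observations")
			break
		}
	}
}
```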
8 changes: 7 additions & 1 deletion cdc/kv/resolvedts_heap.go
@@ -22,6 +22,7 @@ type tsItem struct {
sortByEvTime bool
resolvedTs uint64
eventTime time.Time
penalty int
}

func newResolvedTsItem(ts uint64) tsItem {
@@ -92,9 +93,14 @@ func (rm *regionTsManager) Upsert(item *regionTsInfo) {
if old, ok := rm.m[item.regionID]; ok {
// in a single resolved ts manager, the resolved ts of a region must not fall back
if !item.ts.sortByEvTime {
if item.ts.resolvedTs > old.ts.resolvedTs || item.ts.eventTime.After(old.ts.eventTime) {
if item.ts.resolvedTs == old.ts.resolvedTs && item.ts.eventTime.After(old.ts.eventTime) {
old.ts.penalty++
old.ts.eventTime = item.ts.eventTime
heap.Fix(&rm.h, old.index)
} else if item.ts.resolvedTs > old.ts.resolvedTs {
old.ts.resolvedTs = item.ts.resolvedTs
old.ts.eventTime = item.ts.eventTime
old.ts.penalty = 0
heap.Fix(&rm.h, old.index)
}
} else {
29 changes: 29 additions & 0 deletions cdc/kv/resolvedts_heap_test.go
@@ -67,6 +67,35 @@ func (s *rtsHeapSuite) TestRegionTsManagerResolvedTs(c *check.C) {
c.Assert(rts, check.IsNil)
}

func (s *rtsHeapSuite) TestRegionTsManagerPenalty(c *check.C) {
defer testleak.AfterTest(c)()
mgr := newRegionTsManager()
initRegions := []*regionTsInfo{
{regionID: 100, ts: newResolvedTsItem(1000)},
}
for _, rts := range initRegions {
mgr.Upsert(rts)
}
c.Assert(mgr.Len(), check.Equals, 1)

// test that the penalty increases if the resolved ts stays unchanged
for i := 0; i < 6; i++ {
rts := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(1000)}
mgr.Upsert(rts)
}
rts := mgr.Pop()
c.Assert(rts.ts.resolvedTs, check.Equals, uint64(1000))
c.Assert(rts.ts.penalty, check.Equals, 6)

// test that the penalty is reset to zero once the resolved ts advances
mgr.Upsert(rts)
rtsNew := &regionTsInfo{regionID: 100, ts: newResolvedTsItem(2000)}
mgr.Upsert(rtsNew)
rts = mgr.Pop()
c.Assert(rts.ts.penalty, check.DeepEquals, 0)
c.Assert(rts.ts.resolvedTs, check.DeepEquals, uint64(2000))
}

func (s *rtsHeapSuite) TestRegionTsManagerEvTime(c *check.C) {
defer testleak.AfterTest(c)()
mgr := newRegionTsManager()
2 changes: 1 addition & 1 deletion cdc/model/changefeed.go
@@ -192,7 +192,7 @@ func (info *ChangeFeedInfo) Unmarshal(data []byte) error {
return errors.Annotatef(
cerror.WrapError(cerror.ErrMarshalFailed, err), "Marshal data: %v", data)
}
info.Opts[mark.OptCyclicConfig] = string(cyclicCfg)
info.Opts[mark.OptCyclicConfig] = cyclicCfg
}
return nil
}
6 changes: 3 additions & 3 deletions cdc/model/sink.go
@@ -314,19 +314,19 @@ func ColumnValueString(c interface{}) string {
case int32:
data = strconv.FormatInt(int64(v), 10)
case int64:
data = strconv.FormatInt(int64(v), 10)
data = strconv.FormatInt(v, 10)
case uint8:
data = strconv.FormatUint(uint64(v), 10)
case uint16:
data = strconv.FormatUint(uint64(v), 10)
case uint32:
data = strconv.FormatUint(uint64(v), 10)
case uint64:
data = strconv.FormatUint(uint64(v), 10)
data = strconv.FormatUint(v, 10)
case float32:
data = strconv.FormatFloat(float64(v), 'f', -1, 32)
case float64:
data = strconv.FormatFloat(float64(v), 'f', -1, 64)
data = strconv.FormatFloat(v, 'f', -1, 64)
case string:
data = v
case []byte:
4 changes: 3 additions & 1 deletion cdc/owner.go
@@ -148,7 +148,7 @@ const (
// CDCServiceSafePointID is the ID of CDC service in pd.UpdateServiceGCSafePoint.
CDCServiceSafePointID = "ticdc"
// GCSafepointUpdateInterval is the minimal interval at which CDC can update the gc safepoint
GCSafepointUpdateInterval = time.Duration(2 * time.Second)
GCSafepointUpdateInterval = 2 * time.Second
// MinGCSafePointCacheUpdateInterval is the interval at which minGCSafePointCache is updated
MinGCSafePointCacheUpdateInterval = time.Second * 2
)
@@ -977,6 +977,8 @@ func (o *Owner) dispatchJob(ctx context.Context, job model.AdminJob) error {
ownerMaintainTableNumGauge.DeleteLabelValues(cf.id, capture.AdvertiseAddr, maintainTableTypeWip)
}
delete(o.changeFeeds, job.CfID)
changefeedCheckpointTsGauge.DeleteLabelValues(cf.id)
changefeedCheckpointTsLagGauge.DeleteLabelValues(cf.id)
return nil
}

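
This cleanup, like the matching one in cdc/processor.go further down, follows the usual client_golang pattern: when a labeled entity such as a changefeed disappears, call DeleteLabelValues on every metric vector carrying its labels so Prometheus stops exporting stale series. A minimal sketch under that assumption, using a plain GaugeVec with an illustrative metric name rather than ticdc's real metric definitions:

```go
package main

import "github.com/prometheus/client_golang/prometheus"

var checkpointTsGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "demo_changefeed_checkpoint_ts", // illustrative name, not ticdc's
		Help: "checkpoint ts of a changefeed",
	},
	[]string{"changefeed"},
)

// removeChangefeed drops the metric series belonging to a deleted changefeed;
// without this, the last exported value would keep being scraped forever.
func removeChangefeed(id string) {
	checkpointTsGauge.DeleteLabelValues(id)
}

func main() {
	prometheus.MustRegister(checkpointTsGauge)
	checkpointTsGauge.WithLabelValues("cf-1").Set(1000)
	removeChangefeed("cf-1")
}
```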
2 changes: 2 additions & 0 deletions cdc/owner/async_sink.go
@@ -156,9 +156,11 @@ func (s *asyncSinkImpl) EmitCheckpointTs(ctx cdcContext.Context, ts uint64) {
func (s *asyncSinkImpl) EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
ddlFinishedTs := atomic.LoadUint64(&s.ddlFinishedTs)
if ddl.CommitTs <= ddlFinishedTs {
// the DDL event has been executed successfully, so done is true
return true, nil
}
if ddl.CommitTs <= s.ddlSentTs {
// the DDL event is executing and not finished yet, return false
return false, nil
}
select {
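
The two new comments spell out a three-way protocol: CommitTs <= ddlFinishedTs means the DDL already completed (done is true), CommitTs <= ddlSentTs means it is still in flight (done is false, do not resend), and otherwise the event is handed to the async executor. A toy model of that bookkeeping — the field names mirror asyncSinkImpl, but this is a sketch, not the repo's implementation:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// toySink models asyncSinkImpl's bookkeeping: ddlSentTs is the commit ts of
// the last DDL handed to the executor, ddlFinishedTs the last one completed.
type toySink struct {
	ddlSentTs     uint64
	ddlFinishedTs uint64
}

// emitDDL mirrors the three branches of EmitDDLEvent.
func (s *toySink) emitDDL(commitTs uint64) (done bool) {
	finished := atomic.LoadUint64(&s.ddlFinishedTs)
	if commitTs <= finished {
		return true // already executed successfully
	}
	if commitTs <= s.ddlSentTs {
		return false // still executing, don't resend
	}
	s.ddlSentTs = commitTs // queue it for the async executor
	return false
}

func main() {
	s := &toySink{}
	fmt.Println(s.emitDDL(100)) // false: just queued
	fmt.Println(s.emitDDL(100)) // false: in flight
	atomic.StoreUint64(&s.ddlFinishedTs, 100)
	fmt.Println(s.emitDDL(100)) // true: finished
}
```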
16 changes: 9 additions & 7 deletions cdc/owner/changefeed.go
@@ -183,11 +183,9 @@ LOOP:
failpoint.Inject("NewChangefeedNoRetryError", func() {
failpoint.Return(cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs))
})

failpoint.Inject("NewChangefeedRetryError", func() {
failpoint.Return(errors.New("failpoint injected retriable error"))
})

if c.state.Info.Config.CheckGCSafePoint {
err := util.CheckSafetyOfStartTs(ctx, ctx.GlobalVars().PDClient, c.state.ID, checkpointTs)
if err != nil {
@@ -214,7 +212,10 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs)
// Since we wait for checkpoint == ddlJob.FinishTs before executing the DDL,
// when there is a recovery, there is no guarantee that the DDL at the checkpoint
// has been executed. So we need to start the DDL puller from (checkpoint-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
return errors.Trace(err)
}
@@ -262,10 +263,8 @@ func (c *changefeed) preflightCheck(captures map[model.CaptureID]*model.CaptureI
if status == nil {
status = &model.ChangeFeedStatus{
// the changefeed status is nil when the changefeed is just created.
// the txn in start ts is not replicated at that time,
// so the checkpoint ts and resolved ts should less than start ts.
ResolvedTs: c.state.Info.StartTs - 1,
CheckpointTs: c.state.Info.StartTs - 1,
ResolvedTs: c.state.Info.StartTs,
CheckpointTs: c.state.Info.StartTs,
AdminJobType: model.AdminNone,
}
return status, true, nil
@@ -321,6 +320,9 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
case ddlJobBarrier:
ddlResolvedTs, ddlJob := c.ddlPuller.FrontDDL()
if ddlJob == nil || ddlResolvedTs != barrierTs {
if ddlResolvedTs < barrierTs {
return barrierTs, nil
}
c.barriers.Update(ddlJobBarrier, ddlResolvedTs)
return barrierTs, nil
}
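
Three related conventions shift in this file: a fresh changefeed's status now starts at exactly StartTs rather than StartTs-1, the DDL puller starts from checkpointTs-1 so a DDL whose finish ts equals the checkpoint is re-observed after recovery, and the ddlJobBarrier is left untouched when the DDL puller's resolved ts still lags it. A small self-contained illustration of that last, monotonic-barrier guard (simplified types, not the repo's):

```go
package main

import "fmt"

// barrier tracks the ts the changefeed must not advance past.
type barrier struct{ ts uint64 }

// updateFromDDL applies the guard added to handleBarrier: a DDL resolved ts
// that still lags the current barrier must not drag the barrier backward.
func (b *barrier) updateFromDDL(ddlResolvedTs uint64) {
	if ddlResolvedTs < b.ts {
		return // DDL puller has not caught up yet; keep the barrier in place
	}
	b.ts = ddlResolvedTs
}

func main() {
	b := &barrier{ts: 300}
	b.updateFromDDL(200) // ignored: would move the barrier backward
	fmt.Println(b.ts)    // 300
	b.updateFromDDL(450)
	fmt.Println(b.ts) // 450
}
```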
4 changes: 2 additions & 2 deletions cdc/owner/changefeed_test.go
@@ -166,7 +166,7 @@ func (s *changefeedSuite) TestInitialize(c *check.C) {
// initialize
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs-1)
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs)
}

func (s *changefeedSuite) TestHandleError(c *check.C) {
@@ -186,7 +186,7 @@ func (s *changefeedSuite) TestHandleError(c *check.C) {
// handle error
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs-1)
c.Assert(state.Status.CheckpointTs, check.Equals, ctx.ChangefeedVars().Info.StartTs)
c.Assert(state.Info.Error.Message, check.Equals, "fake error")
}

8 changes: 4 additions & 4 deletions cdc/owner/feed_state_manager.go
@@ -23,8 +23,8 @@ import (
"go.uber.org/zap"
)

// feedStateManager manages the feedState of a changefeed
// when the error, admin job happened, the feedStateManager is responsible for controlling the feedState
// feedStateManager manages the ReactorState of a changefeed
// when an error or an admin job occurs, the feedStateManager is responsible for controlling the ReactorState
type feedStateManager struct {
state *model.ChangefeedReactorState
shouldBeRunning bool
@@ -112,15 +112,15 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
m.shouldBeRunning = false
jobsPending = true
m.patchState(model.StateRemoved)
// remove changefeed info and state
// remove changefeed info and status
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return nil, true, nil
})
m.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
return nil, true, nil
})
checkpointTs := m.state.Info.GetCheckpointTs(m.state.Status)
log.Info("the changefeed removed", zap.String("changefeed-id", m.state.ID), zap.Uint64("checkpoint-ts", checkpointTs))
log.Info("the changefeed is removed", zap.String("changefeed-id", m.state.ID), zap.Uint64("checkpoint-ts", checkpointTs))

case model.AdminResume:
switch m.state.Info.State {
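
The removal path relies on the patch convention used throughout ChangefeedReactorState: a patch function that returns a nil value with changed set to true deletes the entry instead of writing it. A toy version of that convention under simplified types (illustrative only — the real state lives in etcd):

```go
package main

import "fmt"

// patchFunc follows the repo's patch signature: it returns the new value,
// whether anything changed, and an error. Returning (nil, true, nil)
// means "delete this entry".
type patchFunc func(old *string) (*string, bool, error)

func applyPatch(store map[string]*string, key string, patch patchFunc) error {
	newVal, changed, err := patch(store[key])
	if err != nil || !changed {
		return err
	}
	if newVal == nil {
		delete(store, key) // nil result deletes, mirroring PatchInfo/PatchStatus
		return nil
	}
	store[key] = newVal
	return nil
}

func main() {
	info := "changefeed-info"
	store := map[string]*string{"cf-1": &info}
	// Equivalent of m.state.PatchInfo(func(...) { return nil, true, nil }):
	_ = applyPatch(store, "cf-1", func(old *string) (*string, bool, error) {
		return nil, true, nil
	})
	fmt.Println(len(store)) // 0: the changefeed info is gone
}
```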
3 changes: 3 additions & 0 deletions cdc/owner/gc_manager.go
@@ -94,6 +94,9 @@ func (m *gcManager) updateGCSafePoint(ctx cdcContext.Context, state *model.Globa
failpoint.Inject("InjectActualGCSafePoint", func(val failpoint.Value) {
actual = uint64(val.(int))
})
if actual == minCheckpointTs {
log.Info("update gc safe point success", zap.Uint64("gcSafePointTs", minCheckpointTs))
}
if actual > minCheckpointTs {
log.Warn("update gc safe point failed, the gc safe point is larger than checkpointTs", zap.Uint64("actual", actual), zap.Uint64("checkpointTs", minCheckpointTs))
}
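
The added branch makes the safepoint contract explicit: the PD call returns the safepoint actually in effect, so actual == minCheckpointTs means the update took hold, while actual > minCheckpointTs means the requested ts has already been garbage-collected. A hedged sketch of checking that contract — updateServiceGCSafePoint below is a toy stand-in for the PD client, whose real semantics live in PD:

```go
package main

import "log"

// updateServiceGCSafePoint is a toy stand-in for the PD client call: PD
// garbage-collects data up to a global safepoint and reports the safepoint
// actually in effect for the service.
func updateServiceGCSafePoint(globalSafePoint, requested uint64) uint64 {
	if requested < globalSafePoint {
		return globalSafePoint
	}
	return requested
}

func main() {
	minCheckpointTs := uint64(1000)
	actual := updateServiceGCSafePoint(1500, minCheckpointTs)
	switch {
	case actual == minCheckpointTs:
		log.Println("update gc safe point success:", minCheckpointTs)
	case actual > minCheckpointTs:
		// Data below `actual` may already be gone; the changefeed cannot
		// safely resume from minCheckpointTs.
		log.Println("gc safe point is larger than checkpointTs:", actual, minCheckpointTs)
	}
}
```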
5 changes: 4 additions & 1 deletion cdc/owner/schema.go
@@ -46,6 +46,9 @@ func newSchemaWrap4Owner(kvStorage tidbkv.Storage, startTs model.Ts, config *con
return nil, errors.Trace(err)
}
}
// We do a snapshot read of the metadata from TiKV at (startTs-1) instead of startTs,
// because the DDL puller might send a DDL at startTs, which would cause schema conflicts if
// the DDL's result is already contained in the snapshot.
schemaSnap, err := entry.NewSingleSchemaSnapshotFromMeta(meta, startTs-1, config.ForceReplicate)
if err != nil {
return nil, errors.Trace(err)
@@ -58,7 +61,7 @@
schemaSnapshot: schemaSnap,
filter: f,
config: config,
ddlHandledTs: startTs - 1,
ddlHandledTs: startTs,
}, nil
}

7 changes: 6 additions & 1 deletion cdc/processor.go
@@ -1335,7 +1335,12 @@ func (p *oldProcessor) stop(ctx context.Context) error {
log.Warn("an error occurred when stopping the processor", zap.Error(err))
errRes = err
}
syncTableNumGauge.WithLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr).Set(0)
resolvedTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
return errRes
}

2 changes: 1 addition & 1 deletion cdc/processor/processor.go
@@ -288,7 +288,7 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
if err != nil {
return errors.Trace(err)
}
opts[mark.OptCyclicConfig] = string(cyclicCfg)
opts[mark.OptCyclicConfig] = cyclicCfg
}
opts[sink.OptChangefeedID] = p.changefeed.ID
opts[sink.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
2 changes: 1 addition & 1 deletion cdc/server.go
@@ -270,7 +270,7 @@ func (s *Server) etcdHealthChecker(ctx context.Context) error {
case <-ticker.C:
for _, pdEndpoint := range s.pdEndpoints {
start := time.Now()
ctx, cancel := context.WithTimeout(ctx, time.Duration(time.Second*10))
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
req, err := http.NewRequestWithContext(
ctx, http.MethodGet, fmt.Sprintf("%s/health", pdEndpoint), nil)
if err != nil {
3 changes: 2 additions & 1 deletion cmd/client.go
@@ -258,7 +258,8 @@ func loop() {
command := newCliCommand()
command.SetArgs(args)
_ = command.ParseFlags(args)
command.SetOutput(os.Stdout)
command.SetOut(os.Stdout)
command.SetErr(os.Stdout)
if err = command.Execute(); err != nil {
command.Println(err)
}
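
cobra deprecated Command.SetOutput in favor of the separate SetOut and SetErr, which is what this change (and the identical one in cmd/client_tso.go below) adopts; both streams are pointed at os.Stdout to keep the old single-stream behavior. A minimal sketch of the pattern:

```go
package main

import (
	"os"

	"github.com/spf13/cobra"
)

func main() {
	cmd := &cobra.Command{
		Use: "demo",
		RunE: func(cmd *cobra.Command, args []string) error {
			cmd.Println("hello") // goes to the writer set via SetOut
			return nil
		},
	}
	// SetOutput(w) is deprecated; set the two streams explicitly.
	cmd.SetOut(os.Stdout)
	cmd.SetErr(os.Stdout) // errors intentionally share stdout, as in cmd/client.go
	if err := cmd.Execute(); err != nil {
		cmd.Println(err)
	}
}
```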
3 changes: 2 additions & 1 deletion cmd/client_tso.go
@@ -45,6 +45,7 @@ func newQueryTsoCommand() *cobra.Command {
return nil
},
}
command.SetOutput(os.Stdout)
command.SetOut(os.Stdout)
command.SetErr(os.Stdout)
return command
}
2 changes: 1 addition & 1 deletion cmd/root.go
@@ -27,7 +27,7 @@ var rootCmd = &cobra.Command{

// Execute runs the root command
func Execute() {
// Ouputs cmd.Print to stdout.
// Outputs cmd.Print to stdout.
rootCmd.SetOut(os.Stdout)
if err := rootCmd.Execute(); err != nil {
rootCmd.Println(err)
7 changes: 5 additions & 2 deletions cmd/server.go
@@ -96,8 +96,11 @@ func runEServer(cmd *cobra.Command, args []string) error {
}

cancel := initCmd(cmd, &logutil.Config{
File: conf.LogFile,
Level: conf.LogLevel,
File: conf.LogFile,
Level: conf.LogLevel,
FileMaxSize: conf.Log.File.MaxSize,
FileMaxDays: conf.Log.File.MaxDays,
FileMaxBackups: conf.Log.File.MaxBackups,
})
defer cancel()
tz, err := util.GetTimezone(conf.TZ)
(The remaining changed files are not shown here.)