Skip to content

Commit

Permalink
Merge branch 'master' into rustin-patch-sink-useless
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 authored Feb 16, 2022
2 parents e3e1c39 + 2f4b706 commit dded477
Show file tree
Hide file tree
Showing 52 changed files with 1,504 additions and 459 deletions.
9 changes: 0 additions & 9 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,15 +261,6 @@ func (m *mounterImpl) unmarshalRowKVEntry(tableInfo *model.TableInfo, rawKey []b
return nil, errors.Trace(err)
}

if base.Delete && !m.enableOldValue && (tableInfo.PKIsHandle || tableInfo.IsCommonHandle) {
handleColIDs, fieldTps, _ := tableInfo.GetRowColInfos()
preRow, err = tablecodec.DecodeHandleToDatumMap(recordID, handleColIDs, fieldTps, m.tz, nil)
if err != nil {
return nil, errors.Trace(err)
}
preRowExist = true
}

base.RecordID = recordID
return &rowKVEntry{
baseKVEntry: base,
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ import (
)

// newBlackHoleSink creates a black hole sink
func newBlackHoleSink(ctx context.Context, opts map[string]string) *blackHoleSink {
func newBlackHoleSink(ctx context.Context) *blackHoleSink {
return &blackHoleSink{
statistics: NewStatistics(ctx, "blackhole", opts),
statistics: NewStatistics(ctx, "blackhole"),
}
}

Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/buffer_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestFlushTable(t *testing.T) {

ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
b := newBufferSink(newBlackHoleSink(ctx, make(map[string]string)), 5, make(chan drawbackMsg))
b := newBufferSink(newBlackHoleSink(ctx), 5, make(chan drawbackMsg))
go b.run(ctx, make(chan error))

require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
Expand Down Expand Up @@ -82,7 +82,7 @@ func TestFlushFailed(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(newBlackHoleSink(ctx, make(map[string]string)), 5, make(chan drawbackMsg))
b := newBufferSink(newBlackHoleSink(ctx), 5, make(chan drawbackMsg))
go b.run(ctx, make(chan error))

checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func newMqSink(
resolvedNotifier: notifier,
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ", opts),
statistics: NewStatistics(ctx, "MQ"),

role: role,
id: changefeedID,
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func newMySQLSink(
filter: filter,
cyclic: sinkCyclic,
txnCache: common.NewUnresolvedTxnCache(),
statistics: NewStatistics(ctx, "mysql", opts),
statistics: NewStatistics(ctx, "mysql"),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
metricBucketSizeCounters: metricBucketSizeCounters,
errCh: make(chan error, 1),
Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newMySQLSink4Test(ctx context.Context, t *testing.T) *mysqlSink {
return &mysqlSink{
txnCache: common.NewUnresolvedTxnCache(),
filter: f,
statistics: NewStatistics(ctx, "test", make(map[string]string)),
statistics: NewStatistics(ctx, "test"),
params: params,
}
}
Expand Down
4 changes: 3 additions & 1 deletion cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ func (k *kafkaSaramaProducer) Close() error {
zap.String("changefeed", k.id), zap.Any("role", k.role))
}

k.metricsMonitor.Cleanup()

// adminClient should be closed last, since `metricsMonitor` would use it when `Cleanup`.
start = time.Now()
if err := k.admin.Close(); err != nil {
log.Warn("close kafka cluster admin with error", zap.Error(err),
Expand All @@ -254,7 +257,6 @@ func (k *kafkaSaramaProducer) Close() error {
zap.String("changefeed", k.id), zap.Any("role", k.role))
}

k.metricsMonitor.Cleanup()
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func init() {
// register blackhole sink
sinkIniterMap["blackhole"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
return newBlackHoleSink(ctx, opts), nil
return newBlackHoleSink(ctx), nil
}

// register mysql sink
Expand Down
13 changes: 6 additions & 7 deletions cdc/sink/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,12 @@ const (
)

// NewStatistics creates a statistics
func NewStatistics(ctx context.Context, name string, opts map[string]string) *Statistics {
statistics := &Statistics{name: name, lastPrintStatusTime: time.Now()}
if cid, ok := opts[OptChangefeedID]; ok {
statistics.changefeedID = cid
}
if cid, ok := opts[OptCaptureAddr]; ok {
statistics.captureAddr = cid
func NewStatistics(ctx context.Context, name string) *Statistics {
statistics := &Statistics{
name: name,
captureAddr: util.CaptureAddrFromCtx(ctx),
changefeedID: util.ChangefeedIDFromCtx(ctx),
lastPrintStatusTime: time.Now(),
}
statistics.metricExecTxnHis = execTxnHistogram.WithLabelValues(statistics.captureAddr, statistics.changefeedID)
statistics.metricExecDDLHis = execDDLHistogram.WithLabelValues(statistics.captureAddr, statistics.changefeedID)
Expand Down
1 change: 1 addition & 0 deletions dm/_utils/terror_gen/errors_release.txt
Original file line number Diff line number Diff line change
Expand Up @@ -479,6 +479,7 @@ ErrWorkerDDLLockOpNotFound,[code=40075:class=dm-worker:scope=internal:level=high
ErrWorkerTLSConfigNotValid,[code=40076:class=dm-worker:scope=internal:level=high], "Message: TLS config not valid, Workaround: Please check the `ssl-ca`, `ssl-cert` and `ssl-key` config in worker configuration file."
ErrWorkerFailConnectMaster,[code=40077:class=dm-worker:scope=internal:level=high], "Message: cannot join with master endpoints: %v, error: %v, Workaround: Please check network connection of worker and check worker name is unique."
ErrWorkerRelayConfigChanging,[code=40079:class=dm-worker:scope=internal:level=low], "Message: relay config of worker %s is changed too frequently, last relay source %s:, new relay source %s, Workaround: Please try again later"
ErrWorkerRouteTableDupMatch,[code=40080:class=dm-worker:scope=internal:level=high], "Message: table %s.%s matches more than one rule, Workaround: please check the route rules in the task config"
ErrTracerParseFlagSet,[code=42001:class=dm-tracer:scope=internal:level=medium], "Message: parse dm-tracer config flag set"
ErrTracerConfigTomlTransform,[code=42002:class=dm-tracer:scope=internal:level=medium], "Message: config toml transform, Workaround: Please check the configuration file has correct TOML format."
ErrTracerConfigInvalidFlag,[code=42003:class=dm-tracer:scope=internal:level=medium], "Message: '%s' is an invalid flag"
Expand Down
4 changes: 2 additions & 2 deletions dm/checker/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tiflow/dm/pkg/dumpling"
fr "github.com/pingcap/tiflow/dm/pkg/func-rollback"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
onlineddl "github.com/pingcap/tiflow/dm/syncer/online-ddl-tools"
Expand All @@ -43,7 +44,6 @@ import (
column "github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/dumpling/export"
"github.com/pingcap/tidb/parser/mysql"
"go.uber.org/atomic"
Expand Down Expand Up @@ -137,7 +137,7 @@ func (c *Checker) Init(ctx context.Context) (err error) {
if err != nil {
return terror.ErrTaskCheckGenBAList.Delegate(err)
}
r, err := router.NewTableRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules)
r, err := router.NewRouter(instance.cfg.CaseSensitive, instance.cfg.RouteRules)
if err != nil {
return terror.ErrTaskCheckGenTableRouter.Delegate(err)
}
Expand Down
4 changes: 2 additions & 2 deletions dm/dm/config/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ import (
bf "github.com/pingcap/tidb-tools/pkg/binlog-filter"
"github.com/pingcap/tidb-tools/pkg/column-mapping"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/dumpling"
"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/router"
"github.com/pingcap/tiflow/dm/pkg/terror"
"github.com/pingcap/tiflow/dm/pkg/utils"
)
Expand Down Expand Up @@ -426,7 +426,7 @@ func (c *SubTaskConfig) Adjust(verifyDecryptPassword bool) error {
if _, err := filter.New(c.CaseSensitive, c.BAList); err != nil {
return terror.ErrConfigGenBAList.Delegate(err)
}
if _, err := router.NewTableRouter(c.CaseSensitive, c.RouteRules); err != nil {
if _, err := router.NewRouter(c.CaseSensitive, c.RouteRules); err != nil {
return terror.ErrConfigGenTableRouter.Delegate(err)
}
// NewMapping will fill arguments with the default values.
Expand Down
2 changes: 2 additions & 0 deletions dm/dm/unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Unit interface {
// Close shuts down the process and closes the unit, after that can not call Process to resume
// The implementation should not block for a long time.
Close()
// Kill shuts down the process and closes the unit without graceful.
Kill()
// Pause does some cleanups and the unit can be resumed later. The caller will make sure Process has returned.
// The implementation should not block for a long time.
Pause()
Expand Down
2 changes: 1 addition & 1 deletion dm/dm/worker/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ import (
func (t *testServer) testConidtionHub(c *C, s *Server) {
// test condition hub
c.Assert(GetConditionHub(), NotNil)
c.Assert(GetConditionHub().w, DeepEquals, s.getWorker(true))
c.Assert(GetConditionHub().w, DeepEquals, s.getSourceWorker(true))
}
3 changes: 2 additions & 1 deletion dm/dm/worker/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func (s *Server) KeepAlive() {
failpoint.Label("bypass")

// TODO: report the error.
err := s.stopWorker("", true)
// when lost keepalive, stop the worker without graceful. this is to fix https://github.com/pingcap/tiflow/issues/3737
err := s.stopSourceWorker("", true, false)
if err != nil {
log.L().Error("fail to stop worker", zap.Error(err))
return // return if failed to stop the worker.
Expand Down
Loading

0 comments on commit dded477

Please sign in to comment.