Skip to content

Commit

Permalink
Merge branch 'master' into filterBinlog
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD committed Nov 17, 2022
2 parents eccd29a + ebe15d2 commit d060d48
Show file tree
Hide file tree
Showing 180 changed files with 3,424 additions and 8,425 deletions.
17 changes: 9 additions & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ FAIL_ON_STDOUT := awk '{ print } END { if (NR > 0) { exit 1 } }'

CURDIR := $(shell pwd)
path_to_add := $(addsuffix /bin,$(subst :,/bin:,$(GOPATH)))
export PATH := $(CURDIR)/bin:$(path_to_add):$(PATH)
export PATH := $(CURDIR)/bin:$(CURDIR)/tools/bin:$(path_to_add):$(PATH)

SHELL := /usr/bin/env bash

Expand Down Expand Up @@ -77,7 +77,7 @@ MAKE_FILES = $(shell find . \( -name 'Makefile' -o -name '*.mk' \) -print)

RELEASE_VERSION =
ifeq ($(RELEASE_VERSION),)
RELEASE_VERSION := v6.4.0-master
RELEASE_VERSION := v6.5.0-master
release_version_regex := ^v[0-9]\..*$$
release_branch_regex := "^release-[0-9]\.[0-9].*$$|^HEAD$$|^.*/*tags/v[0-9]\.[0-9]\..*$$"
ifneq ($(shell git rev-parse --abbrev-ref HEAD | grep -E $(release_branch_regex)),)
Expand Down Expand Up @@ -147,7 +147,7 @@ storage_consumer:
install:
go install ./...

unit_test: check_failpoint_ctl generate_mock generate-msgp-code generate-protobuf
unit_test: check_failpoint_ctl generate_mock go-generate generate-protobuf
mkdir -p "$(TEST_DIR)"
$(FAILPOINT_ENABLE)
@export log_level=error;\
Expand Down Expand Up @@ -233,7 +233,7 @@ clean_integration_test_containers: ## Clean MySQL and Kafka integration test con
integration_test_storage: check_third_party_binary
tests/integration_tests/run.sh storage "$(CASE)" "$(START_AT)"

fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci generate_mock generate-msgp-code
fmt: tools/bin/gofumports tools/bin/shfmt tools/bin/gci generate_mock go-generate
@echo "run gci (format imports)"
tools/bin/gci write $(FILES) 2>&1 | $(FAIL_ON_STDOUT)
@echo "run gofumports"
Expand Down Expand Up @@ -272,9 +272,10 @@ ifneq ($(shell echo $(RELEASE_VERSION) | grep master),)
@./scripts/check-diff-line-width.sh
endif

generate-msgp-code: tools/bin/msgp
@echo "generate-msgp-code"
./scripts/generate-msgp-code.sh
go-generate: ## Run go generate on all packages.
go-generate: tools/bin/msgp tools/bin/stringer tools/bin/mockery
@echo "go generate"
@go generate ./...

generate-protobuf: ## Generate code from protobuf files.
generate-protobuf: tools/bin/protoc tools/bin/protoc-gen-gogofaster \
Expand All @@ -300,7 +301,7 @@ check-static: tools/bin/golangci-lint
check: check-copyright fmt check-static tidy terror_check errdoc \
check-merge-conflicts check-ticdc-dashboard check-diff-line-width \
swagger-spec check-makefiles check_engine_integration_test
@git --no-pager diff --exit-code || echo "Please add changed files!"
@git --no-pager diff --exit-code || (echo "Please add changed files!" && false)

integration_test_coverage: tools/bin/gocovmerge tools/bin/goveralls
tools/bin/gocovmerge "$(TEST_DIR)"/cov.* | grep -vE ".*.pb.go|$(CDC_PKG)/testing_utils/.*|$(CDC_PKG)/cdc/entry/schema_test_helper.go|$(CDC_PKG)/cdc/sink/simple_mysql_tester.go|.*.__failpoint_binding__.go" > "$(TEST_DIR)/all_cov.out"
Expand Down
14 changes: 1 addition & 13 deletions cdc/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,22 +679,10 @@ func (h *OpenAPI) GetProcessor(c *gin.Context) {
}
status, captureExist := statuses[captureID]

positions, err := h.statusProvider().GetTaskPositions(ctx, changefeedID)
if err != nil {
_ = c.Error(err)
return
}
position, positionsExist := positions[captureID]
// Note: for the case that no tables are attached to a newly created changefeed,
// we just do not report an error.
var processorDetail model.ProcessorDetail
if captureExist && positionsExist {
processorDetail = model.ProcessorDetail{
CheckPointTs: position.CheckPointTs,
ResolvedTs: position.ResolvedTs,
Count: position.Count,
Error: position.Error,
}
if captureExist {
tables := make([]int64, 0)
for tableID := range status.Tables {
tables = append(tables, tableID)
Expand Down
16 changes: 1 addition & 15 deletions cdc/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,6 @@ func (p *mockStatusProvider) GetAllTaskStatuses(ctx context.Context, changefeedI
return args.Get(0).(map[model.CaptureID]*model.TaskStatus), args.Error(1)
}

// GetTaskPositions mocks the status-provider method of the same name for
// API tests: it records the call through testify's mock machinery and
// returns whatever canned map[model.CaptureID]*model.TaskPosition and error
// were configured via On("GetTaskPositions", ...).
// NOTE(review): only ctx is forwarded to Called, so expectations cannot
// discriminate on changefeedID — confirm that is intentional.
func (p *mockStatusProvider) GetTaskPositions(ctx context.Context, changefeedID model.ChangeFeedID) (map[model.CaptureID]*model.TaskPosition, error) {
	args := p.Called(ctx)
	return args.Get(0).(map[model.CaptureID]*model.TaskPosition), args.Error(1)
}

func (p *mockStatusProvider) GetProcessors(ctx context.Context) ([]*model.ProcInfoSnap, error) {
args := p.Called(ctx)
return args.Get(0).([]*model.ProcInfoSnap), args.Error(1)
Expand Down Expand Up @@ -117,11 +112,6 @@ func newStatusProvider() *mockStatusProvider {
statusProvider.On("GetAllTaskStatuses", mock.Anything).
Return(map[model.CaptureID]*model.TaskStatus{captureID: {}}, nil)

statusProvider.On("GetTaskPositions", mock.Anything).
Return(map[model.CaptureID]*model.TaskPosition{
captureID: {Error: &model.RunningError{Message: "test"}},
}, nil)

statusProvider.On("GetAllChangeFeedStatuses", mock.Anything).
Return(map[model.ChangeFeedID]*model.ChangeFeedStatus{
model.ChangeFeedID4Test("ab", "123"): {CheckpointTs: 1},
Expand Down Expand Up @@ -716,10 +706,6 @@ func TestGetProcessor(t *testing.T) {
req, _ := http.NewRequestWithContext(context.Background(), api.method, api.url, nil)
router.ServeHTTP(w, req)
require.Equal(t, 200, w.Code)
processorDetail := &model.ProcessorDetail{}
err := json.NewDecoder(w.Body).Decode(processorDetail)
require.Nil(t, err)
require.Equal(t, "test", processorDetail.Error.Message)

// test get processor fail due to capture ID error
api = testCase{
Expand All @@ -731,7 +717,7 @@ func TestGetProcessor(t *testing.T) {
router.ServeHTTP(w, req)
require.Equal(t, 400, w.Code)
httpError := &model.HTTPError{}
err = json.NewDecoder(w.Body).Decode(httpError)
err := json.NewDecoder(w.Body).Decode(httpError)
require.Nil(t, err)
require.Contains(t, httpError.Error, "capture not exists, non-exist-capture")
}
Expand Down
57 changes: 30 additions & 27 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,21 +182,22 @@ func (c *ReplicaConfig) ToInternalReplicaConfig() *config.ReplicaConfig {
csvConfig = &config.CSVConfig{
Delimiter: c.Sink.CSVConfig.Delimiter,
Quote: c.Sink.CSVConfig.Quote,
Terminator: c.Sink.CSVConfig.Terminator,
NullString: c.Sink.CSVConfig.NullString,
DateSeparator: c.Sink.CSVConfig.DateSeparator,
IncludeCommitTs: c.Sink.CSVConfig.IncludeCommitTs,
}
}

res.Sink = &config.SinkConfig{
DispatchRules: dispatchRules,
Protocol: c.Sink.Protocol,
CSVConfig: csvConfig,
TxnAtomicity: config.AtomicityLevel(c.Sink.TxnAtomicity),
ColumnSelectors: columnSelectors,
SchemaRegistry: c.Sink.SchemaRegistry,
EncoderConcurrency: c.Sink.EncoderConcurrency,
DispatchRules: dispatchRules,
Protocol: c.Sink.Protocol,
CSVConfig: csvConfig,
TxnAtomicity: config.AtomicityLevel(c.Sink.TxnAtomicity),
ColumnSelectors: columnSelectors,
SchemaRegistry: c.Sink.SchemaRegistry,
EncoderConcurrency: c.Sink.EncoderConcurrency,
Terminator: c.Sink.Terminator,
DateSeparator: c.Sink.DateSeparator,
EnablePartitionSeparator: c.Sink.EnablePartitionSeparator,
}
}
return res
Expand Down Expand Up @@ -278,21 +279,22 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
csvConfig = &CSVConfig{
Delimiter: cloned.Sink.CSVConfig.Delimiter,
Quote: cloned.Sink.CSVConfig.Quote,
Terminator: cloned.Sink.CSVConfig.Terminator,
NullString: cloned.Sink.CSVConfig.NullString,
DateSeparator: cloned.Sink.CSVConfig.DateSeparator,
IncludeCommitTs: cloned.Sink.CSVConfig.IncludeCommitTs,
}
}

res.Sink = &SinkConfig{
Protocol: cloned.Sink.Protocol,
SchemaRegistry: cloned.Sink.SchemaRegistry,
DispatchRules: dispatchRules,
CSVConfig: csvConfig,
ColumnSelectors: columnSelectors,
TxnAtomicity: string(cloned.Sink.TxnAtomicity),
EncoderConcurrency: cloned.Sink.EncoderConcurrency,
Protocol: cloned.Sink.Protocol,
SchemaRegistry: cloned.Sink.SchemaRegistry,
DispatchRules: dispatchRules,
CSVConfig: csvConfig,
ColumnSelectors: columnSelectors,
TxnAtomicity: string(cloned.Sink.TxnAtomicity),
EncoderConcurrency: cloned.Sink.EncoderConcurrency,
Terminator: cloned.Sink.Terminator,
DateSeparator: cloned.Sink.DateSeparator,
EnablePartitionSeparator: cloned.Sink.EnablePartitionSeparator,
}
}
if cloned.Consistent != nil {
Expand Down Expand Up @@ -418,23 +420,24 @@ type Table struct {
// SinkConfig represents sink config for a changefeed
// This is a duplicate of config.SinkConfig
type SinkConfig struct {
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
DispatchRules []*DispatchRule `json:"dispatchers,omitempty"`
ColumnSelectors []*ColumnSelector `json:"column_selectors"`
TxnAtomicity string `json:"transaction_atomicity"`
EncoderConcurrency int `json:"encoder_concurrency"`
Protocol string `json:"protocol"`
SchemaRegistry string `json:"schema_registry"`
CSVConfig *CSVConfig `json:"csv"`
DispatchRules []*DispatchRule `json:"dispatchers,omitempty"`
ColumnSelectors []*ColumnSelector `json:"column_selectors"`
TxnAtomicity string `json:"transaction_atomicity"`
EncoderConcurrency int `json:"encoder_concurrency"`
Terminator string `json:"terminator"`
DateSeparator string `json:"date_separator"`
EnablePartitionSeparator bool `json:"enable_partition_separator"`
}

// CSVConfig denotes the csv config
// This is the same as config.CSVConfig
type CSVConfig struct {
Delimiter string `json:"delimiter"`
Quote string `json:"quote"`
Terminator string `json:"terminator"`
NullString string `json:"null"`
DateSeparator string `json:"date_separator"`
IncludeCommitTs bool `json:"include_commit_ts"`
}

Expand Down
31 changes: 15 additions & 16 deletions cdc/capture/capture.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/tiflow/cdc/owner"
"github.com/pingcap/tiflow/cdc/processor"
"github.com/pingcap/tiflow/cdc/processor/pipeline/system"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/factory"
ssystem "github.com/pingcap/tiflow/cdc/sorter/db/system"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
Expand All @@ -36,7 +37,6 @@ import (
"github.com/pingcap/tiflow/pkg/migrate"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/p2p"
sortmgr "github.com/pingcap/tiflow/pkg/sorter/manager"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/pingcap/tiflow/pkg/version"
Expand Down Expand Up @@ -92,12 +92,12 @@ type captureImpl struct {
EtcdClient etcd.CDCEtcdClient
tableActorSystem *system.System

// useEventSortEngine indicates whether to use the new pull based sort engine or
// useSortEngine indicates whether to use the new pull based sort engine or
// the old push based sorter system. the latter will be removed after all sorter
// have been transformed into pull based sort engine.
useEventSortEngine bool
sorterSystem *ssystem.System
sortEngineManager *sortmgr.EventSortEngineManager
useSortEngine bool
sorterSystem *ssystem.System
sortEngineFactory *factory.SortEngineFactory

// MessageServer is the receiver of the messages from the other nodes.
// It should be recreated each time the capture is restarted.
Expand Down Expand Up @@ -130,7 +130,7 @@ func NewCapture(pdEndpoints []string,
etcdClient etcd.CDCEtcdClient,
grpcService *p2p.ServerWrapper,
tableActorSystem *system.System,
sortEngineManager *sortmgr.EventSortEngineManager,
sortEngineMangerFactory *factory.SortEngineFactory,
sorterSystem *ssystem.System,
) Capture {
conf := config.GetGlobalServerConfig()
Expand All @@ -146,9 +146,9 @@ func NewCapture(pdEndpoints []string,
newOwner: owner.NewOwner,
info: &model.CaptureInfo{},

useEventSortEngine: sortEngineManager != nil,
sortEngineManager: sortEngineManager,
sorterSystem: sorterSystem,
useSortEngine: sortEngineMangerFactory != nil,
sortEngineFactory: sortEngineMangerFactory,
sorterSystem: sorterSystem,

migrator: migrate.NewMigrator(etcdClient, pdEndpoints, conf),
}
Expand Down Expand Up @@ -311,14 +311,13 @@ func (c *captureImpl) run(stdCtx context.Context) error {

g, stdCtx := errgroup.WithContext(stdCtx)
ctx := cdcContext.NewContext(stdCtx, &cdcContext.GlobalVars{
CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
TableActorSystem: c.tableActorSystem,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,

CaptureInfo: c.info,
EtcdClient: c.EtcdClient,
TableActorSystem: c.tableActorSystem,
MessageServer: c.MessageServer,
MessageRouter: c.MessageRouter,
SorterSystem: c.sorterSystem,
SortEngineManager: c.sortEngineManager,
SortEngineFactory: c.sortEngineFactory,
})

g.Go(func() error {
Expand Down
3 changes: 0 additions & 3 deletions cdc/capture/capture_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,6 @@ func TestDrainCaptureBySignal(t *testing.T) {
config: config.GetDefaultServerConfig(),
EtcdClient: me,
}

cp.config.Debug.EnableSchedulerV3 = true
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

done := cp.Drain()
Expand Down Expand Up @@ -128,7 +126,6 @@ func TestDrainWaitsOwnerResign(t *testing.T) {
owner: mo,
config: config.GetDefaultServerConfig(),
}
cp.config.Debug.EnableSchedulerV3 = true
require.Equal(t, model.LivenessCaptureAlive, cp.Liveness())

mo.EXPECT().AsyncStop().Do(func() {}).AnyTimes()
Expand Down
14 changes: 14 additions & 0 deletions cdc/entry/mounter_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,3 +130,17 @@ func (m *mounterGroup) AddEvent(ctx context.Context, event *model.PolymorphicEve
return nil
}
}

// MockMountGroup is a no-op mounter-group stand-in used for tests.
// NOTE(review): the name and comments say "MountGroup" while the concrete
// implementation above is mounterGroup — confirm the interface name this is
// meant to satisfy.
type MockMountGroup struct{}

// Run implements MountGroup. It performs no work and returns nil
// immediately; the context is ignored.
func (m *MockMountGroup) Run(ctx context.Context) error {
	return nil
}

// AddEvent implements MountGroup. Instead of mounting the event, it marks
// it finished right away — presumably so consumers waiting on the event are
// released without a real mounter; verify MarkFinished semantics.
func (m *MockMountGroup) AddEvent(ctx context.Context, event *model.PolymorphicEvent) error {
	event.MarkFinished()
	return nil
}
Loading

0 comments on commit d060d48

Please sign in to comment.