Skip to content

Commit

Permalink
*(ticdc): enable new scheduler by default (#4836)
Browse files Browse the repository at this point in the history
ref #4126
  • Loading branch information
liuzix authored Mar 11, 2022
1 parent ce83af5 commit f4dc7d6
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 6 deletions.
3 changes: 3 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (*changefeed, *
}, func() DDLSink {
return &mockDDLSink{}
})
cf.newScheduler = func(ctx cdcContext.Context, startTs uint64) (scheduler, error) {
return newSchedulerV1(), nil
}
state := orchestrator.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(t, state, nil)
state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
Expand Down
7 changes: 7 additions & 0 deletions cdc/scheduler/info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ func (s *BaseScheduleDispatcher) GetTaskStatuses() (map[model.CaptureID]*model.T
}
}

// Fill empty entries for those captures with no tables.
for captureID := range s.captures {
if _, exists := ret[captureID]; !exists {
ret[captureID] = &model.TaskStatus{}
}
}

return ret, nil
}

Expand Down
12 changes: 8 additions & 4 deletions pkg/cmd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,8 @@ func TestParseCfg(t *testing.T) {
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
EnableDBSorter: false,
EnableDBSorter: false,
EnableNewScheduler: true,
DB: &config.DBConfig{
Count: 8,
Concurrency: 128,
Expand Down Expand Up @@ -321,7 +322,8 @@ server-worker-pool-size = 16
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
EnableDBSorter: false,
EnableDBSorter: false,
EnableNewScheduler: true,
DB: &config.DBConfig{
Count: 5,
Concurrency: 6,
Expand Down Expand Up @@ -461,7 +463,8 @@ cert-allowed-cn = ["dd","ee"]
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
EnableDBSorter: false,
EnableDBSorter: false,
EnableNewScheduler: true,
DB: &config.DBConfig{
Count: 8,
Concurrency: 128,
Expand Down Expand Up @@ -520,7 +523,8 @@ unknown3 = 3
TableActor: &config.TableActorConfig{
EventBatchSize: 32,
},
EnableDBSorter: false,
EnableDBSorter: false,
EnableNewScheduler: true,
DB: &config.DBConfig{
Count: 8,
Concurrency: 128,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/config_test_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ const (
"iterator-max-alive-duration": 10000,
"iterator-slow-read-duration": 256
},
"enable-new-scheduler": false,
"enable-new-scheduler": true,
"messages": {
"client-max-batch-interval": 200000000,
"client-max-batch-size": 8388608,
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/server_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ var defaultServerConfig = &ServerConfig{
TableActor: &TableActorConfig{
EventBatchSize: 32,
},
EnableNewScheduler: false,
EnableNewScheduler: true,
// Default leveldb sorter config
EnableDBSorter: false,
DB: &DBConfig{
Expand Down
4 changes: 4 additions & 0 deletions tests/utils/cdc_state_checker/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/orchestrator"
"go.uber.org/zap"
)
Expand All @@ -29,6 +30,9 @@ type cdcMonitReactor struct {

func (r *cdcMonitReactor) Tick(_ context.Context, state orchestrator.ReactorState) (orchestrator.ReactorState, error) {
r.state = state.(*cdcReactorState)
if config.GetGlobalServerConfig().Debug.EnableNewScheduler {
return r.state, nil
}

err := r.verifyTs()
if err != nil {
Expand Down

0 comments on commit f4dc7d6

Please sign in to comment.