importinto: convert encode and sort part into operator (#46532)
ref #42930, ref #46258
D3Hunter authored Sep 8, 2023
1 parent 83e3565 commit 729746f
Showing 8 changed files with 304 additions and 18 deletions.
14 changes: 13 additions & 1 deletion disttask/importinto/BUILD.bazel
@@ -4,6 +4,7 @@ go_library(
name = "importinto",
srcs = [
"dispatcher.go",
"encode_and_sort_operator.go",
"job.go",
"planner.go",
"proto.go",
@@ -32,6 +33,7 @@ go_library(
"//disttask/framework/scheduler",
"//disttask/framework/scheduler/execute",
"//disttask/framework/storage",
"//disttask/operator",
"//domain/infosync",
"//errno",
"//executor/asyncloaddata",
@@ -40,9 +42,12 @@
"//meta/autoid",
"//parser/ast",
"//parser/mysql",
"//resourcemanager/pool/workerpool",
"//resourcemanager/util",
"//sessionctx",
"//sessionctx/variable",
"//table/tables",
"//util",
"//util/dbterror/exeerrors",
"//util/etcd",
"//util/logutil",
@@ -63,21 +68,25 @@ go_test(
timeout = "short",
srcs = [
"dispatcher_test.go",
"encode_and_sort_operator_test.go",
"planner_test.go",
"subtask_executor_test.go",
"wrapper_test.go",
],
embed = [":importinto"],
flaky = True,
race = "on",
shard_count = 5,
shard_count = 6,
deps = [
"//br/pkg/lightning/checkpoints",
"//br/pkg/lightning/mydump",
"//br/pkg/lightning/verification",
"//disttask/framework/mock/execute",
"//disttask/framework/planner",
"//disttask/framework/proto",
"//disttask/framework/scheduler/execute",
"//disttask/framework/storage",
"//disttask/operator",
"//domain/infosync",
"//executor/importer",
"//meta/autoid",
@@ -86,9 +95,12 @@
"//testkit",
"//util/logutil",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_stretchr_testify//suite",
"@com_github_tikv_client_go_v2//util",
"@org_uber_go_mock//gomock",
"@org_uber_go_zap//:zap",
],
)
134 changes: 134 additions & 0 deletions disttask/importinto/encode_and_sort_operator.go
@@ -0,0 +1,134 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importinto

import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/disttask/operator"
"github.com/pingcap/tidb/resourcemanager/pool/workerpool"
"github.com/pingcap/tidb/resourcemanager/util"
tidbutil "github.com/pingcap/tidb/util"
"go.uber.org/atomic"
"go.uber.org/zap"
)

// encodeAndSortOperator is an operator that encodes and sorts data.
// it processes the data of one subtask, i.e. one engine, which contains many
// data chunks; each chunk is a data file or a part of one.
// we don't split encoding and sorting into chunk-level operators; instead they
// are parallelized inside this operator.
type encodeAndSortOperator struct {
*operator.AsyncOperator[*importStepMinimalTask, workerpool.None]
wg tidbutil.WaitGroupWrapper
firstErr atomic.Error

ctx context.Context
cancel context.CancelFunc

logger *zap.Logger
errCh chan error
}

var _ operator.Operator = (*encodeAndSortOperator)(nil)
var _ operator.WithSource[*importStepMinimalTask] = (*encodeAndSortOperator)(nil)
var _ operator.WithSink[workerpool.None] = (*encodeAndSortOperator)(nil)

func newEncodeAndSortOperator(ctx context.Context, concurrency int, logger *zap.Logger) *encodeAndSortOperator {
subCtx, cancel := context.WithCancel(ctx)
op := &encodeAndSortOperator{
ctx: subCtx,
cancel: cancel,
logger: logger,
errCh: make(chan error),
}
pool := workerpool.NewWorkerPool(
"encodeAndSortOperator",
util.ImportInto,
concurrency,
func() workerpool.Worker[*importStepMinimalTask, workerpool.None] {
return &chunkWorker{
ctx: subCtx,
op: op,
}
},
)
op.AsyncOperator = operator.NewAsyncOperator(subCtx, pool)
return op
}

func (op *encodeAndSortOperator) Open() error {
op.wg.Run(func() {
for err := range op.errCh {
if op.firstErr.CompareAndSwap(nil, err) {
op.cancel()
} else {
if errors.Cause(err) != context.Canceled {
op.logger.Error("error on encode and sort", zap.Error(err))
}
}
}
})
return op.AsyncOperator.Open()
}

func (op *encodeAndSortOperator) Close() error {
// TODO: handle the close error after we separate the wait part from the close part.
// right now AsyncOperator.Close always returns nil, so it's ok to ignore it.
// nolint:errcheck
op.AsyncOperator.Close()
op.cancel()
close(op.errCh)
op.wg.Wait()
// see comments on interface definition, this Close is actually WaitAndClose.
return op.firstErr.Load()
}

func (*encodeAndSortOperator) String() string {
return "encodeAndSortOperator"
}

func (op *encodeAndSortOperator) hasError() bool {
return op.firstErr.Load() != nil
}

func (op *encodeAndSortOperator) onError(err error) {
op.errCh <- err
}

func (op *encodeAndSortOperator) Done() <-chan struct{} {
return op.ctx.Done()
}

type chunkWorker struct {
ctx context.Context
op *encodeAndSortOperator
}

func (w *chunkWorker) HandleTask(task *importStepMinimalTask, _ func(workerpool.None)) {
if w.op.hasError() {
return
}
// we don't use the input send function since it makes the workflow more complex;
// instead, we send errors to errCh and handle them in the operator.
executor := newImportMinimalTaskExecutor(task)
if err := executor.Run(w.ctx); err != nil {
w.op.onError(err)
}
}

func (*chunkWorker) Close() {
}
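The scheduler.go change later in this commit is what drives this operator. The following is a rough sketch of that wiring, assuming the importinto package and the imports already shown above; the helper name runEncodeAndSort and the tasks slice are illustrative, not part of the commit:

func runEncodeAndSort(ctx context.Context, tasks []*importStepMinimalTask, concurrency int, logger *zap.Logger) error {
	// build the source channel and attach it to the operator
	source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
	op := newEncodeAndSortOperator(ctx, concurrency, logger)
	op.SetSource(source)

	// a single-operator async pipeline; Execute starts the worker pool
	pipeline := operator.NewAsyncPipeline(op)
	if err := pipeline.Execute(); err != nil {
		return err
	}

	// the worker pool doesn't drain its input channel after an error cancels
	// the operator, so stop producing as soon as op.Done() is closed
loop:
	for _, task := range tasks {
		select {
		case source.Channel() <- task:
		case <-op.Done():
			break loop
		}
	}
	source.Finish()

	// Close waits for all workers and returns the first recorded error, if any
	return pipeline.Close()
}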
118 changes: 118 additions & 0 deletions disttask/importinto/encode_and_sort_operator_test.go
@@ -0,0 +1,118 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package importinto

import (
"context"
"os"
"path"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pingcap/errors"
mockexecute "github.com/pingcap/tidb/disttask/framework/mock/execute"
"github.com/pingcap/tidb/disttask/framework/scheduler/execute"
"github.com/pingcap/tidb/disttask/operator"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
)

func TestEncodeAndSortOperator(t *testing.T) {
bak := os.Stdout
logFileName := path.Join(t.TempDir(), "test.log")
file, err := os.OpenFile(logFileName, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644)
require.NoError(t, err)
os.Stdout = file
t.Cleanup(func() {
require.NoError(t, os.Stdout.Close())
os.Stdout = bak
})
logger := zap.NewExample()

ctrl := gomock.NewController(t)
defer ctrl.Finish()
executor := mockexecute.NewMockMiniTaskExecutor(ctrl)
backup := newImportMinimalTaskExecutor
t.Cleanup(func() {
newImportMinimalTaskExecutor = backup
})
newImportMinimalTaskExecutor = func(t *importStepMinimalTask) execute.MiniTaskExecutor {
return executor
}

source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
op := newEncodeAndSortOperator(context.Background(), 3, logger)
op.SetSource(source)
require.NoError(t, op.Open())
require.Greater(t, len(op.String()), 0)

// cancel on error
mockErr := errors.New("mock err")
executor.EXPECT().Run(gomock.Any()).Return(mockErr)
source.Channel() <- &importStepMinimalTask{}
require.Eventually(t, func() bool {
return op.hasError()
}, 3*time.Second, 300*time.Millisecond)
require.Equal(t, mockErr, op.firstErr.Load())
// should not block
<-op.ctx.Done()
require.ErrorIs(t, op.Close(), mockErr)

// cancel on error and log other errors
mockErr2 := errors.New("mock err 2")
source = operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
op = newEncodeAndSortOperator(context.Background(), 2, logger)
op.SetSource(source)
executor1 := mockexecute.NewMockMiniTaskExecutor(ctrl)
executor2 := mockexecute.NewMockMiniTaskExecutor(ctrl)
var id atomic.Int32
newImportMinimalTaskExecutor = func(t *importStepMinimalTask) execute.MiniTaskExecutor {
if id.Add(1) == 1 {
return executor1
}
return executor2
}
var wg sync.WaitGroup
wg.Add(2)
// wait until both executors start running, otherwise the worker pool would be cancelled before the second one runs.
executor1.EXPECT().Run(gomock.Any()).DoAndReturn(func(context.Context) error {
wg.Done()
wg.Wait()
return mockErr2
})
executor2.EXPECT().Run(gomock.Any()).DoAndReturn(func(context.Context) error {
wg.Done()
wg.Wait()
// wait until the error from executor1 has been processed
require.Eventually(t, func() bool {
return op.hasError()
}, 3*time.Second, 300*time.Millisecond)
return errors.New("mock error should be logged")
})
require.NoError(t, op.Open())
// send 2 tasks
source.Channel() <- &importStepMinimalTask{}
source.Channel() <- &importStepMinimalTask{}
// should not block
<-op.ctx.Done()
require.ErrorIs(t, op.Close(), mockErr2)
require.NoError(t, os.Stdout.Sync())
content, err := os.ReadFile(logFileName)
require.NoError(t, err)
require.Contains(t, string(content), "mock error should be logged")
}
34 changes: 22 additions & 12 deletions disttask/importinto/scheduler.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/disttask/framework/scheduler"
"github.com/pingcap/tidb/disttask/framework/scheduler/execute"
"github.com/pingcap/tidb/disttask/operator"
"github.com/pingcap/tidb/executor/asyncloaddata"
"github.com/pingcap/tidb/executor/importer"
"github.com/pingcap/tidb/meta/autoid"
@@ -98,7 +99,7 @@ func (s *importStepExecutor) SplitSubtask(ctx context.Context, subtask *proto.Su
if err != nil {
return nil, err
}
s.logger.Info("split subtask", zap.Int32("engine-id", subtaskMeta.ID))
s.logger.Info("split and run subtask", zap.Int32("engine-id", subtaskMeta.ID))

dataEngine, err := s.tableImporter.OpenDataEngine(ctx, subtaskMeta.ID)
if err != nil {
@@ -123,15 +124,31 @@
}
s.sharedVars.Store(subtaskMeta.ID, sharedVars)

miniTask := make([]proto.MinimalTask, 0, len(subtaskMeta.Chunks))
source := operator.NewSimpleDataChannel(make(chan *importStepMinimalTask))
op := newEncodeAndSortOperator(ctx, int(s.taskMeta.Plan.ThreadCnt), s.logger)
op.SetSource(source)
pipeline := operator.NewAsyncPipeline(op)
if err = pipeline.Execute(); err != nil {
return nil, err
}

outer:
for _, chunk := range subtaskMeta.Chunks {
miniTask = append(miniTask, &importStepMinimalTask{
// TODO: the current workerpool impl doesn't drain the input channel, it just
// returns on context cancel (i.e. when an error happened), so we add this select.
select {
case source.Channel() <- &importStepMinimalTask{
Plan: s.taskMeta.Plan,
Chunk: chunk,
SharedVars: sharedVars,
})
}:
case <-op.Done():
break outer
}
}
return miniTask, nil
source.Finish()

return nil, pipeline.Close()
}

func (s *importStepExecutor) OnFinished(ctx context.Context, subtaskMetaBytes []byte) ([]byte, error) {
@@ -266,14 +283,7 @@ func (*importScheduler) GetSubtaskExecutor(_ context.Context, task *proto.Task,

func (*importScheduler) GetMiniTaskExecutor(minimalTask proto.MinimalTask, _ string, step int64) (execute.MiniTaskExecutor, error) {
switch step {
case StepImport:
task, ok := minimalTask.(*importStepMinimalTask)
if !ok {
return nil, errors.Errorf("invalid task type %T", minimalTask)
}
return &ImportMinimalTaskExecutor{mTtask: task}, nil
case StepPostProcess:

mTask, ok := minimalTask.(*postProcessStepMinimalTask)
if !ok {
return nil, errors.Errorf("invalid task type %T", minimalTask)