Skip to content

Commit

Permalink
planner: add simple serializing scheduler (#51866)
Browse files Browse the repository at this point in the history
ref #51664
  • Loading branch information
AilinKid authored Mar 25, 2024
1 parent 639fa00 commit a66a80e
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 33 deletions.
4 changes: 3 additions & 1 deletion pkg/planner/memo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"implementation.go",
"pattern.go",
"task.go",
"task_scheduler.go",
],
importpath = "github.com/pingcap/tidb/pkg/planner/memo",
visibility = ["//visibility:public"],
Expand All @@ -28,11 +29,12 @@ go_test(
"group_test.go",
"main_test.go",
"pattern_test.go",
"task_scheduler_test.go",
"task_test.go",
],
embed = [":memo"],
flaky = True,
shard_count = 24,
shard_count = 25,
deps = [
"//pkg/domain",
"//pkg/expression",
Expand Down
79 changes: 71 additions & 8 deletions pkg/planner/memo/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package memo

import (
"strings"
"sync"
)

Expand All @@ -26,6 +27,14 @@ type Task interface {
desc() string
}

// Stack is abstract definition of task container.(TaskStack is a kind of array stack implementation of it)
type Stack interface {
Push(one Task)
Pop() Task
Empty() bool
Destroy()
}

// TaskStackPool is initialized for memory saving by reusing taskStack.
var TaskStackPool = sync.Pool{
New: func() any {
Expand All @@ -34,30 +43,41 @@ var TaskStackPool = sync.Pool{
}

// TaskStack is used to store the optimizing tasks created before or during the optimizing process.
type TaskStack struct {
type taskStack struct {
tasks []Task
}

func newTaskStack() *TaskStack {
return &TaskStack{
func newTaskStack() *taskStack {
return &taskStack{
tasks: make([]Task, 0, 4),
}
}

// Destroy indicates that when stack itself is useless like in the end of optimizing phase, we can destroy ourselves.
func (ts *TaskStack) Destroy() {
func (ts *taskStack) Destroy() {
// when a taskStack itself is useless, we can destroy itself actively.
clear(ts.tasks)
TaskStackPool.Put(ts)
}

// Desc is used to desc the detail info about current stack state.
// when use customized stack to drive the tasks, the call-chain state is dived in the stack.
func (ts *taskStack) Desc() string {
var str strings.Builder
for _, one := range ts.tasks {
str.WriteString(one.desc())
str.WriteString("\n")
}
return str.String()
}

// Len indicates the length of current stack.
func (ts *TaskStack) Len() int {
func (ts *taskStack) Len() int {
return len(ts.tasks)
}

// Pop indicates to pop one task out of the stack.
func (ts *TaskStack) Pop() Task {
func (ts *taskStack) Pop() Task {
if !ts.Empty() {
tmp := ts.tasks[len(ts.tasks)-1]
ts.tasks = ts.tasks[:len(ts.tasks)-1]
Expand All @@ -67,11 +87,54 @@ func (ts *TaskStack) Pop() Task {
}

// Push indicates to push one task into the stack.
func (ts *TaskStack) Push(one Task) {
func (ts *taskStack) Push(one Task) {
ts.tasks = append(ts.tasks, one)
}

// Empty indicates whether taskStack is empty.
func (ts *TaskStack) Empty() bool {
func (ts *taskStack) Empty() bool {
return ts.Len() == 0
}

// BenchTest required.
func newTaskStackWithCap(c int) *taskStack {
return &taskStack{
tasks: make([]Task, 0, c),
}
}

// TaskStack2 is used to store the optimizing tasks created before or during the optimizing process.
type taskStack2 struct {
tasks []*Task
}

func newTaskStack2WithCap(c int) *taskStack2 {
return &taskStack2{
tasks: make([]*Task, 0, c),
}
}

// Push indicates to push one task into the stack.
func (ts *taskStack2) Push(one Task) {
ts.tasks = append(ts.tasks, &one)
}

// Len indicates the length of current stack.
func (ts *taskStack2) Len() int {
return len(ts.tasks)
}

// Empty indicates whether taskStack is empty.
func (ts *taskStack2) Empty() bool {
return ts.Len() == 0
}

// Pop indicates to pop one task out of the stack.
func (ts *taskStack2) Pop() Task {
if !ts.Empty() {
tmp := ts.tasks[len(ts.tasks)-1]
ts.tasks = ts.tasks[:len(ts.tasks)-1]
return *tmp
}
return nil
}
55 changes: 55 additions & 0 deletions pkg/planner/memo/task_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memo

var _ TaskScheduler = &SimpleTaskScheduler{}

// TaskScheduler is a scheduling interface defined for serializing(single thread)/concurrent(multi thread) running.
type TaskScheduler interface {
ExecuteTasks()
}

// SimpleTaskScheduler is defined for serializing scheduling of memo tasks.
type SimpleTaskScheduler struct {
Err error
SchedulerCtx TaskSchedulerContext
}

// ExecuteTasks implements the interface of TaskScheduler.
func (s *SimpleTaskScheduler) ExecuteTasks() {
stack := s.SchedulerCtx.getStack()
defer func() {
// when step out of the scheduler, if the stack is empty, clean and release it.
if !stack.Empty() {
stack.Destroy()
}
}()
for !stack.Empty() {
// when use customized stack to drive the tasks, the call-chain state is dived in the stack.
task := stack.Pop()
if err := task.execute(); err != nil {
s.Err = err
return
}
}
}

// TaskSchedulerContext is defined for scheduling logic calling, also facilitate interface-oriented coding and testing.
type TaskSchedulerContext interface {
// we exported the Stack interface here rather than the basic stack implementation.
getStack() Stack
// we exported the only one push action to user, Task is an interface definition.
pushTask(task Task)
}
70 changes: 70 additions & 0 deletions pkg/planner/memo/task_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memo

import (
"errors"
"strconv"
"testing"

"github.com/stretchr/testify/require"
)

// TestSchedulerContext is defined to test scheduling logic here.
type TestSchedulerContext struct {
ts *taskStack
}

func (t *TestSchedulerContext) getStack() Stack {
return t.ts
}

func (t *TestSchedulerContext) pushTask(task Task) {
t.ts.Push(task)
}

// TestSchedulerContext is defined to mock special error state in specified task.
type TestTaskImpl2 struct {
a int64
}

func (t *TestTaskImpl2) execute() error {
// mock error at special task
if t.a == 2 {
return errors.New("mock error at task id = 2")
}
return nil
}

func (t *TestTaskImpl2) desc() string {
return strconv.Itoa(int(t.a))
}

func TestSimpleTaskScheduler(t *testing.T) {
testSchedulerContext := &TestSchedulerContext{
newTaskStack(),
}
testScheduler := &SimpleTaskScheduler{
SchedulerCtx: testSchedulerContext,
}
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 1})
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 2})
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 3})

var testTaskScheduler TaskScheduler = testScheduler
testTaskScheduler.ExecuteTasks()
require.NotNil(t, testScheduler.Err)
require.Equal(t, testScheduler.Err.Error(), "mock error at task id = 2")
}
90 changes: 66 additions & 24 deletions pkg/planner/memo/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,45 +51,87 @@ func TestTaskStack(t *testing.T) {

func TestTaskFunctionality(t *testing.T) {
taskTaskPool := TaskStackPool.Get()
require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 0)
require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4)
taskStack := taskTaskPool.(*TaskStack)
taskStack.Push(&TestTaskImpl{a: 1})
taskStack.Push(&TestTaskImpl{a: 2})
one := taskStack.Pop()
require.Equal(t, len(taskTaskPool.(*taskStack).tasks), 0)
require.Equal(t, cap(taskTaskPool.(*taskStack).tasks), 4)
ts := taskTaskPool.(*taskStack)
ts.Push(&TestTaskImpl{a: 1})
ts.Push(&TestTaskImpl{a: 2})
one := ts.Pop()
require.Equal(t, one.desc(), "2")
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "1")
// empty, pop nil.
one = taskStack.Pop()
one = ts.Pop()
require.Nil(t, one)

taskStack.Push(&TestTaskImpl{a: 3})
taskStack.Push(&TestTaskImpl{a: 4})
taskStack.Push(&TestTaskImpl{a: 5})
taskStack.Push(&TestTaskImpl{a: 6})
ts.Push(&TestTaskImpl{a: 3})
ts.Push(&TestTaskImpl{a: 4})
ts.Push(&TestTaskImpl{a: 5})
ts.Push(&TestTaskImpl{a: 6})
// no clean, put it back
TaskStackPool.Put(taskTaskPool)

// require again.
taskTaskPool = TaskStackPool.Get()
require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 4)
require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4)
ts = TaskStackPool.Get().(*taskStack)
require.Equal(t, len(ts.tasks), 4)
require.Equal(t, cap(ts.tasks), 4)
// clean the stack
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "6")
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "5")
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "4")
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "3")
one = taskStack.Pop()
one = ts.Pop()
require.Nil(t, one)

// self destroy.
taskStack.Destroy()
taskTaskPool = TaskStackPool.Get()
require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 0)
require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4)
ts.Destroy()
ts = TaskStackPool.Get().(*taskStack)
require.Equal(t, len(ts.tasks), 0)
require.Equal(t, cap(ts.tasks), 4)
}

// Benchmark result explanation:
// On the right side of the function name, you have four values, 43803,27569 ns/op,24000 B/op and 2000 allocs/op
// The former indicates the total number of times the loop was executed, while the latter is the average amount
// of time each iteration took to complete, expressed in nanoseconds per operation. The third is the costed Byte
// of each op, the last one is number of allocs of each op.

// BenchmarkTestStack2Pointer-8 43802 27569 ns/op 24000 B/op 2000 allocs/op
// BenchmarkTestStack2Pointer-8 42889 27017 ns/op 24000 B/op 2000 allocs/op
// BenchmarkTestStack2Pointer-8 43009 27524 ns/op 24000 B/op 2000 allocs/op
func BenchmarkTestStack2Pointer(b *testing.B) {
stack := newTaskStack2WithCap(1000)
fill := func() {
for idx := int64(0); idx < 1000; idx++ {
stack.Push(&TestTaskImpl{a: idx})
}
for idx := int64(0); idx < 1000; idx++ {
stack.Pop()
}
}
for i := 0; i < b.N; i++ {
fill()
}
}

// BenchmarkTestStackInterface-8 108644 10736 ns/op 8000 B/op 1000 allocs/op
// BenchmarkTestStackInterface-8 110587 10756 ns/op 8000 B/op 1000 allocs/op
// BenchmarkTestStackInterface-8 109136 10850 ns/op 8000 B/op 1000 allocs/op
func BenchmarkTestStackInterface(b *testing.B) {
stack := newTaskStackWithCap(1000)
fill := func() {
for idx := int64(0); idx < 1000; idx++ {
stack.Push(&TestTaskImpl{a: idx})
}
for idx := int64(0); idx < 1000; idx++ {
stack.Pop()
}
}
for i := 0; i < b.N; i++ {
fill()
}
}
2 changes: 2 additions & 0 deletions pkg/planner/property/logical_property.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ type LogicalProperty struct {
Schema *expression.Schema
MaxOneRow bool
}

// todo: ScalarProperty: usedColumns in current scalar expr, null reject, cor-related, subq contained and so on

0 comments on commit a66a80e

Please sign in to comment.