Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add simple serializing scheduler #51866

Merged
merged 8 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion pkg/planner/memo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ go_library(
"implementation.go",
"pattern.go",
"task.go",
"task_scheduler.go",
],
importpath = "github.com/pingcap/tidb/pkg/planner/memo",
visibility = ["//visibility:public"],
Expand All @@ -28,11 +29,12 @@ go_test(
"group_test.go",
"main_test.go",
"pattern_test.go",
"task_scheduler_test.go",
"task_test.go",
],
embed = [":memo"],
flaky = True,
shard_count = 24,
shard_count = 25,
deps = [
"//pkg/domain",
"//pkg/expression",
Expand Down
79 changes: 71 additions & 8 deletions pkg/planner/memo/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package memo

import (
"strings"
"sync"
)

Expand All @@ -26,6 +27,14 @@ type Task interface {
desc() string
}

// Stack is abstract definition of task container.(TaskStack is a kind of array stack implementation of it)
type Stack interface {
Push(one Task)
Pop() Task
Empty() bool
Destroy()
}

// TaskStackPool is initialized for memory saving by reusing taskStack.
var TaskStackPool = sync.Pool{
New: func() any {
Expand All @@ -34,30 +43,41 @@ var TaskStackPool = sync.Pool{
}

// TaskStack is used to store the optimizing tasks created before or during the optimizing process.
type TaskStack struct {
type taskStack struct {
tasks []Task
}

func newTaskStack() *TaskStack {
return &TaskStack{
func newTaskStack() *taskStack {
return &taskStack{
tasks: make([]Task, 0, 4),
}
}

// Destroy indicates that when stack itself is useless like in the end of optimizing phase, we can destroy ourselves.
func (ts *TaskStack) Destroy() {
func (ts *taskStack) Destroy() {
// when a taskStack itself is useless, we can destroy itself actively.
clear(ts.tasks)
TaskStackPool.Put(ts)
}

// Desc is used to desc the detail info about current stack state.
// when use customized stack to drive the tasks, the call-chain state is dived in the stack.
func (ts *taskStack) Desc() string {
var str strings.Builder
for _, one := range ts.tasks {
str.WriteString(one.desc())
str.WriteString("\n")
}
return str.String()
}

// Len indicates the length of current stack.
func (ts *TaskStack) Len() int {
func (ts *taskStack) Len() int {
return len(ts.tasks)
}

// Pop indicates to pop one task out of the stack.
func (ts *TaskStack) Pop() Task {
func (ts *taskStack) Pop() Task {
if !ts.Empty() {
tmp := ts.tasks[len(ts.tasks)-1]
ts.tasks = ts.tasks[:len(ts.tasks)-1]
Expand All @@ -67,11 +87,54 @@ func (ts *TaskStack) Pop() Task {
}

// Push indicates to push one task into the stack.
func (ts *TaskStack) Push(one Task) {
func (ts *taskStack) Push(one Task) {
ts.tasks = append(ts.tasks, one)
}

// Empty indicates whether taskStack is empty.
func (ts *TaskStack) Empty() bool {
func (ts *taskStack) Empty() bool {
return ts.Len() == 0
}

// BenchTest required.
func newTaskStackWithCap(c int) *taskStack {
return &taskStack{
tasks: make([]Task, 0, c),
}
}

// TaskStack2 is used to store the optimizing tasks created before or during the optimizing process.
type taskStack2 struct {
tasks []*Task
}

func newTaskStack2WithCap(c int) *taskStack2 {
return &taskStack2{
tasks: make([]*Task, 0, c),
}
}

// Push indicates to push one task into the stack.
func (ts *taskStack2) Push(one Task) {
ts.tasks = append(ts.tasks, &one)
}

// Len indicates the length of current stack.
func (ts *taskStack2) Len() int {
return len(ts.tasks)
}

// Empty indicates whether taskStack is empty.
func (ts *taskStack2) Empty() bool {
return ts.Len() == 0
}

// Pop indicates to pop one task out of the stack.
func (ts *taskStack2) Pop() Task {
if !ts.Empty() {
tmp := ts.tasks[len(ts.tasks)-1]
ts.tasks = ts.tasks[:len(ts.tasks)-1]
return *tmp
}
return nil
}
55 changes: 55 additions & 0 deletions pkg/planner/memo/task_scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memo

var _ TaskScheduler = &SimpleTaskScheduler{}

// TaskScheduler is a scheduling interface defined for serializing(single thread)/concurrent(multi thread) running.
type TaskScheduler interface {
ExecuteTasks()
}

// SimpleTaskScheduler is defined for serializing scheduling of memo tasks.
type SimpleTaskScheduler struct {
Err error
SchedulerCtx TaskSchedulerContext
}

// ExecuteTasks implements the interface of TaskScheduler.
func (s *SimpleTaskScheduler) ExecuteTasks() {
stack := s.SchedulerCtx.getStack()
defer func() {
// when step out of the scheduler, if the stack is empty, clean and release it.
if !stack.Empty() {
stack.Destroy()
}
}()
for !stack.Empty() {
// when use customized stack to drive the tasks, the call-chain state is dived in the stack.
task := stack.Pop()
if err := task.execute(); err != nil {
s.Err = err
return
}
}
}

// TaskSchedulerContext is defined for scheduling logic calling, also facilitate interface-oriented coding and testing.
type TaskSchedulerContext interface {
// we exported the Stack interface here rather than the basic stack implementation.
getStack() Stack
// we exported the only one push action to user, Task is an interface definition.
pushTask(task Task)
winoros marked this conversation as resolved.
Show resolved Hide resolved
}
70 changes: 70 additions & 0 deletions pkg/planner/memo/task_scheduler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package memo

import (
"errors"
"strconv"
"testing"

"github.com/stretchr/testify/require"
)

// TestSchedulerContext is defined to test scheduling logic here.
type TestSchedulerContext struct {
ts *taskStack
}

func (t *TestSchedulerContext) getStack() Stack {
return t.ts
}

func (t *TestSchedulerContext) pushTask(task Task) {
t.ts.Push(task)
}

// TestSchedulerContext is defined to mock special error state in specified task.
type TestTaskImpl2 struct {
a int64
}

func (t *TestTaskImpl2) execute() error {
// mock error at special task
if t.a == 2 {
return errors.New("mock error at task id = 2")
}
return nil
}

func (t *TestTaskImpl2) desc() string {
return strconv.Itoa(int(t.a))
}

func TestSimpleTaskScheduler(t *testing.T) {
testSchedulerContext := &TestSchedulerContext{
newTaskStack(),
}
testScheduler := &SimpleTaskScheduler{
SchedulerCtx: testSchedulerContext,
}
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 1})
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 2})
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 3})

var testTaskScheduler TaskScheduler = testScheduler
testTaskScheduler.ExecuteTasks()
require.NotNil(t, testScheduler.Err)
require.Equal(t, testScheduler.Err.Error(), "mock error at task id = 2")
}
90 changes: 66 additions & 24 deletions pkg/planner/memo/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,45 +51,87 @@ func TestTaskStack(t *testing.T) {

func TestTaskFunctionality(t *testing.T) {
taskTaskPool := TaskStackPool.Get()
require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 0)
require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4)
taskStack := taskTaskPool.(*TaskStack)
taskStack.Push(&TestTaskImpl{a: 1})
taskStack.Push(&TestTaskImpl{a: 2})
one := taskStack.Pop()
require.Equal(t, len(taskTaskPool.(*taskStack).tasks), 0)
require.Equal(t, cap(taskTaskPool.(*taskStack).tasks), 4)
ts := taskTaskPool.(*taskStack)
ts.Push(&TestTaskImpl{a: 1})
ts.Push(&TestTaskImpl{a: 2})
one := ts.Pop()
require.Equal(t, one.desc(), "2")
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "1")
// empty, pop nil.
one = taskStack.Pop()
one = ts.Pop()
require.Nil(t, one)

taskStack.Push(&TestTaskImpl{a: 3})
taskStack.Push(&TestTaskImpl{a: 4})
taskStack.Push(&TestTaskImpl{a: 5})
taskStack.Push(&TestTaskImpl{a: 6})
ts.Push(&TestTaskImpl{a: 3})
ts.Push(&TestTaskImpl{a: 4})
ts.Push(&TestTaskImpl{a: 5})
ts.Push(&TestTaskImpl{a: 6})
// no clean, put it back
TaskStackPool.Put(taskTaskPool)

// require again.
taskTaskPool = TaskStackPool.Get()
require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 4)
require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4)
ts = TaskStackPool.Get().(*taskStack)
require.Equal(t, len(ts.tasks), 4)
require.Equal(t, cap(ts.tasks), 4)
// clean the stack
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "6")
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "5")
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "4")
one = taskStack.Pop()
one = ts.Pop()
require.Equal(t, one.desc(), "3")
one = taskStack.Pop()
one = ts.Pop()
require.Nil(t, one)

// self destroy.
taskStack.Destroy()
taskTaskPool = TaskStackPool.Get()
require.Equal(t, len(taskTaskPool.(*TaskStack).tasks), 0)
require.Equal(t, cap(taskTaskPool.(*TaskStack).tasks), 4)
ts.Destroy()
ts = TaskStackPool.Get().(*taskStack)
require.Equal(t, len(ts.tasks), 0)
require.Equal(t, cap(ts.tasks), 4)
}

// Benchmark result explanation:
// On the right side of the function name, you have four values, 43803,27569 ns/op,24000 B/op and 2000 allocs/op
// The former indicates the total number of times the loop was executed, while the latter is the average amount
// of time each iteration took to complete, expressed in nanoseconds per operation. The third is the costed Byte
// of each op, the last one is number of allocs of each op.

// BenchmarkTestStack2Pointer-8 43802 27569 ns/op 24000 B/op 2000 allocs/op
// BenchmarkTestStack2Pointer-8 42889 27017 ns/op 24000 B/op 2000 allocs/op
// BenchmarkTestStack2Pointer-8 43009 27524 ns/op 24000 B/op 2000 allocs/op
func BenchmarkTestStack2Pointer(b *testing.B) {
stack := newTaskStack2WithCap(1000)
fill := func() {
for idx := int64(0); idx < 1000; idx++ {
stack.Push(&TestTaskImpl{a: idx})
}
for idx := int64(0); idx < 1000; idx++ {
stack.Pop()
}
}
for i := 0; i < b.N; i++ {
fill()
}
}

// BenchmarkTestStackInterface-8 108644 10736 ns/op 8000 B/op 1000 allocs/op
// BenchmarkTestStackInterface-8 110587 10756 ns/op 8000 B/op 1000 allocs/op
// BenchmarkTestStackInterface-8 109136 10850 ns/op 8000 B/op 1000 allocs/op
func BenchmarkTestStackInterface(b *testing.B) {
stack := newTaskStackWithCap(1000)
fill := func() {
for idx := int64(0); idx < 1000; idx++ {
stack.Push(&TestTaskImpl{a: idx})
}
for idx := int64(0); idx < 1000; idx++ {
stack.Pop()
}
}
for i := 0; i < b.N; i++ {
fill()
}
}
2 changes: 2 additions & 0 deletions pkg/planner/property/logical_property.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,5 @@ type LogicalProperty struct {
Schema *expression.Schema
MaxOneRow bool
}

// todo: ScalarProperty: usedColumns in current scalar expr, null reject, cor-related, subq contained and so on