Skip to content

Commit

Permalink
planner: encapsulate task dir and move test related code to test file (
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid authored Mar 26, 2024
1 parent e7b4d31 commit 240ddf8
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 61 deletions.
6 changes: 1 addition & 5 deletions pkg/planner/memo/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ go_library(
"group_expr.go",
"implementation.go",
"pattern.go",
"task.go",
"task_scheduler.go",
],
importpath = "github.com/pingcap/tidb/pkg/planner/memo",
visibility = ["//visibility:public"],
Expand All @@ -29,12 +27,10 @@ go_test(
"group_test.go",
"main_test.go",
"pattern_test.go",
"task_scheduler_test.go",
"task_test.go",
],
embed = [":memo"],
flaky = True,
shard_count = 25,
shard_count = 22,
deps = [
"//pkg/domain",
"//pkg/expression",
Expand Down
24 changes: 24 additions & 0 deletions pkg/planner/task/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "task",
srcs = [
"task.go",
"task_scheduler.go",
],
importpath = "github.com/pingcap/tidb/pkg/planner/task",
visibility = ["//visibility:public"],
)

go_test(
name = "task_test",
timeout = "short",
srcs = [
"task_scheduler_test.go",
"task_test.go",
],
embed = [":task"],
flaky = True,
shard_count = 3,
deps = ["@com_github_stretchr_testify//require"],
)
47 changes: 6 additions & 41 deletions pkg/planner/memo/task.go → pkg/planner/task/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memo
package task

import (
"strings"
"sync"
)

// Task is an interface defined for all type of optimizing work: exploring, implementing, deriving-stats, join-reordering and so on.
// Task is an interface defined for all type of optimizing work: exploring, implementing,
// deriving-stats, join-reordering and so on.
type Task interface {
// task self executing logic
execute() error
Expand All @@ -35,8 +36,8 @@ type Stack interface {
Destroy()
}

// TaskStackPool is initialized for memory saving by reusing taskStack.
var TaskStackPool = sync.Pool{
// StackTaskPool is initialized for memory saving by reusing taskStack.
var StackTaskPool = sync.Pool{
New: func() any {
return newTaskStack()
},
Expand All @@ -57,7 +58,7 @@ func newTaskStack() *taskStack {
func (ts *taskStack) Destroy() {
// when a taskStack itself is useless, we can destroy itself actively.
clear(ts.tasks)
TaskStackPool.Put(ts)
StackTaskPool.Put(ts)
}

// Desc is used to desc the detail info about current stack state.
Expand Down Expand Up @@ -102,39 +103,3 @@ func newTaskStackWithCap(c int) *taskStack {
tasks: make([]Task, 0, c),
}
}

// TaskStack2 is used to store the optimizing tasks created before or during the optimizing process.
type taskStack2 struct {
tasks []*Task
}

func newTaskStack2WithCap(c int) *taskStack2 {
return &taskStack2{
tasks: make([]*Task, 0, c),
}
}

// Push indicates to push one task into the stack.
func (ts *taskStack2) Push(one Task) {
ts.tasks = append(ts.tasks, &one)
}

// Len indicates the length of current stack.
func (ts *taskStack2) Len() int {
return len(ts.tasks)
}

// Empty indicates whether taskStack is empty.
func (ts *taskStack2) Empty() bool {
return ts.Len() == 0
}

// Pop indicates to pop one task out of the stack.
func (ts *taskStack2) Pop() Task {
if !ts.Empty() {
tmp := ts.tasks[len(ts.tasks)-1]
ts.tasks = ts.tasks[:len(ts.tasks)-1]
return *tmp
}
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memo
package task

var _ TaskScheduler = &SimpleTaskScheduler{}
var _ Scheduler = &SimpleTaskScheduler{}

// TaskScheduler is a scheduling interface defined for serializing(single thread)/concurrent(multi thread) running.
type TaskScheduler interface {
// Scheduler is a scheduling interface defined for serializing(single thread)/concurrent(multi thread) running.
type Scheduler interface {
ExecuteTasks()
}

// SimpleTaskScheduler is defined for serializing scheduling of memo tasks.
type SimpleTaskScheduler struct {
Err error
SchedulerCtx TaskSchedulerContext
SchedulerCtx SchedulerContext
}

// ExecuteTasks implements the interface of TaskScheduler.
Expand All @@ -46,8 +46,8 @@ func (s *SimpleTaskScheduler) ExecuteTasks() {
}
}

// TaskSchedulerContext is defined for scheduling logic calling, also facilitate interface-oriented coding and testing.
type TaskSchedulerContext interface {
// SchedulerContext is defined for scheduling logic calling, also facilitate interface-oriented coding and testing.
type SchedulerContext interface {
// we exported the Stack interface here rather than the basic stack implementation.
getStack() Stack
// we exported the only one push action to user, Task is an interface definition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memo
package task

import (
"errors"
Expand Down Expand Up @@ -63,7 +63,7 @@ func TestSimpleTaskScheduler(t *testing.T) {
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 2})
testScheduler.SchedulerCtx.pushTask(&TestTaskImpl2{a: 3})

var testTaskScheduler TaskScheduler = testScheduler
var testTaskScheduler Scheduler = testScheduler
testTaskScheduler.ExecuteTasks()
require.NotNil(t, testScheduler.Err)
require.Equal(t, testScheduler.Err.Error(), "mock error at task id = 2")
Expand Down
48 changes: 42 additions & 6 deletions pkg/planner/memo/task_test.go → pkg/planner/task/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package memo
package task

import (
"strconv"
Expand Down Expand Up @@ -50,7 +50,7 @@ func TestTaskStack(t *testing.T) {
}

func TestTaskFunctionality(t *testing.T) {
taskTaskPool := TaskStackPool.Get()
taskTaskPool := StackTaskPool.Get()
require.Equal(t, len(taskTaskPool.(*taskStack).tasks), 0)
require.Equal(t, cap(taskTaskPool.(*taskStack).tasks), 4)
ts := taskTaskPool.(*taskStack)
Expand All @@ -69,10 +69,10 @@ func TestTaskFunctionality(t *testing.T) {
ts.Push(&TestTaskImpl{a: 5})
ts.Push(&TestTaskImpl{a: 6})
// no clean, put it back
TaskStackPool.Put(taskTaskPool)
StackTaskPool.Put(taskTaskPool)

// require again.
ts = TaskStackPool.Get().(*taskStack)
ts = StackTaskPool.Get().(*taskStack)
require.Equal(t, len(ts.tasks), 4)
require.Equal(t, cap(ts.tasks), 4)
// clean the stack
Expand All @@ -89,11 +89,47 @@ func TestTaskFunctionality(t *testing.T) {

// self destroy.
ts.Destroy()
ts = TaskStackPool.Get().(*taskStack)
ts = StackTaskPool.Get().(*taskStack)
require.Equal(t, len(ts.tasks), 0)
require.Equal(t, cap(ts.tasks), 4)
}

// TaskStack2 is used to store the optimizing tasks created before or during the optimizing process.
type taskStackForBench struct {
tasks []*Task
}

func newTaskStackForBenchWithCap(c int) *taskStackForBench {
return &taskStackForBench{
tasks: make([]*Task, 0, c),
}
}

// Push indicates to push one task into the stack.
func (ts *taskStackForBench) Push(one Task) {
ts.tasks = append(ts.tasks, &one)
}

// Len indicates the length of current stack.
func (ts *taskStackForBench) Len() int {
return len(ts.tasks)
}

// Empty indicates whether taskStack is empty.
func (ts *taskStackForBench) Empty() bool {
return ts.Len() == 0
}

// Pop indicates to pop one task out of the stack.
func (ts *taskStackForBench) Pop() Task {
if !ts.Empty() {
tmp := ts.tasks[len(ts.tasks)-1]
ts.tasks = ts.tasks[:len(ts.tasks)-1]
return *tmp
}
return nil
}

// Benchmark result explanation:
// On the right side of the function name, you have four values, 43803,27569 ns/op,24000 B/op and 2000 allocs/op
// The former indicates the total number of times the loop was executed, while the latter is the average amount
Expand All @@ -104,7 +140,7 @@ func TestTaskFunctionality(t *testing.T) {
// BenchmarkTestStack2Pointer-8 42889 27017 ns/op 24000 B/op 2000 allocs/op
// BenchmarkTestStack2Pointer-8 43009 27524 ns/op 24000 B/op 2000 allocs/op
func BenchmarkTestStack2Pointer(b *testing.B) {
stack := newTaskStack2WithCap(1000)
stack := newTaskStackForBenchWithCap(1000)
fill := func() {
for idx := int64(0); idx < 1000; idx++ {
stack.Push(&TestTaskImpl{a: idx})
Expand Down

0 comments on commit 240ddf8

Please sign in to comment.