Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner, executor: support Change and ChangeExec #9789

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
e94d660
support update pump or drainer status.
aliiohs Mar 19, 2019
249958f
fix
aliiohs Mar 19, 2019
fabd472
fix
aliiohs Mar 19, 2019
32517d1
fix
aliiohs Mar 19, 2019
b984d2d
fix
aliiohs Mar 19, 2019
d6f6000
rm go.sum
aliiohs Mar 19, 2019
e62f63a
fix
aliiohs Mar 19, 2019
dc09862
fix
aliiohs Mar 19, 2019
e2947b1
unless a blank line.
aliiohs Mar 19, 2019
b503dfe
add go.sum
aliiohs Mar 20, 2019
b723f42
Merge branch 'master' into Support_update_pump_or_drainer_status_lastest
WangXiangUSTC Mar 20, 2019
d045a56
fix
aliiohs Mar 20, 2019
180271d
Merge remote-tracking branch 'origin/Support_update_pump_or_drainer_s…
aliiohs Mar 20, 2019
2c5848a
1. use the context from Next.
aliiohs Mar 20, 2019
ef7f7f9
fix
aliiohs Mar 20, 2019
4349d6b
fix
aliiohs Mar 20, 2019
dafee9d
fix
aliiohs Mar 20, 2019
d37a807
fix
aliiohs Mar 20, 2019
b068758
Merge branch 'master' into Support_update_pump_or_drainer_status_lastest
WangXiangUSTC Mar 20, 2019
6511ac2
add metrics counter for 'change'
aliiohs Mar 20, 2019
5bd6683
Merge remote-tracking branch 'origin/Support_update_pump_or_drainer_s…
aliiohs Mar 20, 2019
b857671
Merge branch 'master' into Support_update_pump_or_drainer_status_lastest
WangXiangUSTC Mar 21, 2019
5640a97
merge updateNodeState method into Next
aliiohs Mar 21, 2019
4dfcdf3
Merge remote-tracking branch 'origin/Support_update_pump_or_drainer_s…
aliiohs Mar 21, 2019
e9d2a96
support update pump or drainer status.
aliiohs Mar 19, 2019
2decbc6
fix
aliiohs Mar 19, 2019
7f08429
fix
aliiohs Mar 19, 2019
4fde154
fix
aliiohs Mar 19, 2019
0f43b59
fix
aliiohs Mar 19, 2019
5525a92
fix
aliiohs Mar 19, 2019
46dca32
unless a blank line.
aliiohs Mar 19, 2019
85d1b6d
add go.sum
aliiohs Mar 20, 2019
81fe589
fix
aliiohs Mar 20, 2019
c0821c4
1. use the context from Next.
aliiohs Mar 20, 2019
43a3c53
fix
aliiohs Mar 20, 2019
12cca72
fix
aliiohs Mar 20, 2019
b0b5d93
fix
aliiohs Mar 20, 2019
8c1b00f
fix
aliiohs Mar 20, 2019
458ecce
add metrics counter for 'change'
aliiohs Mar 20, 2019
d61af1a
merge updateNodeState method into Next
aliiohs Mar 21, 2019
5d83bb8
fix
aliiohs Mar 21, 2019
ae18f5c
fix
aliiohs Mar 21, 2019
7fe3ad1
fix
aliiohs Mar 21, 2019
e964c41
format code
WangXiangUSTC Mar 21, 2019
fd18583
Merge branch 'master' into Support_update_pump_or_drainer_status_lastest
WangXiangUSTC Mar 21, 2019
2a92897
Merge branch 'master' into Support_update_pump_or_drainer_status_lastest
WangXiangUSTC Mar 21, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
switch v := p.(type) {
case nil:
return nil
case *plannercore.Change:
return b.buildChange(v)
case *plannercore.CheckTable:
return b.buildCheckTable(v)
case *plannercore.CheckIndex:
Expand Down Expand Up @@ -195,6 +197,12 @@ func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Execu
}
return e
}
func (b *executorBuilder) buildChange(v *plannercore.Change) Executor {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
e := &ChangeExec{
Statement: v.Statement,
}
return e
}

func (b *executorBuilder) buildShowNextRowID(v *plannercore.ShowNextRowID) Executor {
e := &ShowNextRowIDExec{
Expand Down
70 changes: 70 additions & 0 deletions executor/change.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Licensed under the Apache License, Version 2.0 (the "License");
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"context"
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
"fmt"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/tidb-binlog/node"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/util/chunk"
)

// ChangeExec represents a change executor.
type ChangeExec struct {
baseExecutor

Statement *ast.ChangeStmt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove this, and only store kind, state, nodeID.

}

// Next implements the Executor Next interface.
func (e *ChangeExec) Next(ctx context.Context, req *chunk.RecordBatch) error {

WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
stmt := e.Statement
fmt.Println(stmt.NodeID)
cfg := config.GetGlobalConfig()
return updateNodeState(cfg.Path, stmt.NodeType, stmt.NodeID, stmt.State)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
}

// updateNodeState update pump or drainer's state.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not put the implementation of this function in ChangeExec.Next directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

finished

func updateNodeState(urls, kind, nodeID, state string) error {
/*
node's state can be online, pausing, paused, closing and offline.
if the state is one of them, will update the node's state saved in etcd directly.
*/
registry, err := createRegistry(urls)
if err != nil {
return errors.Trace(err)
}

nodes, _, err := registry.Nodes(context.Background(), node.NodePrefix[kind])
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Trace(err)
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
}

for _, n := range nodes {
if n.NodeID != nodeID {
continue
}
switch state {
case node.Online, node.Pausing, node.Paused, node.Closing, node.Offline:
n.State = state
return registry.UpdateNode(context.Background(), node.NodePrefix[kind], n)
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
default:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be checked eagerly. We check it with parser may be better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, we will update parser, and then create another pr to remove these judgement.

return errors.Errorf("state %s is illegal", state)
}
}

return errors.NotFoundf("node %s, id %s from etcd %s", kind, nodeID, urls)
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20190215154024-7f2fc73ef562
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596
github.com/pingcap/parser v0.0.0-20190312024907-3f6280b08c8b
github.com/pingcap/parser v0.0.0-20190318135537-2575f2d0ba4b
github.com/pingcap/pd v2.1.0-rc.4+incompatible
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible
github.com/pingcap/tipb v0.0.0-20190107072121-abbec73437b7
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f h1:5ZfJxyXo8KyX8DgGXC5B7ILL8y51fci/qYz2B4j8iLY=
github.com/StackExchange/wmi v0.0.0-20180725035823-b12b22c5341f/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/aliiohs/parser v0.0.0-20190318091245-2946eb6d5d02 h1:L0NeP/soQ+LDqqmhtFWseqt4+n2uva+bau/ojaBlHo0=
github.com/aliiohs/parser v0.0.0-20190318091245-2946eb6d5d02/go.mod h1:WzwSl32vdxLYkpa8qu1hC3AGxGROygU9lQiy1JAyKYk=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/blacktear23/go-proxyprotocol v0.0.0-20180807104634-af7a81e8dd0d h1:rQlvB2AYWme2bIB18r/SipGiMEVJYE9U0z+MGoU/LtQ=
Expand Down Expand Up @@ -117,6 +119,8 @@ github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596 h1:t2OQTpPJnrPDGlvA+3F
github.com/pingcap/log v0.0.0-20190307075452-bd41d9273596/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20190312024907-3f6280b08c8b h1:NlvTrxqezIJh6CD5Leky12IZ8E/GtpEEmzgNNb34wbw=
github.com/pingcap/parser v0.0.0-20190312024907-3f6280b08c8b/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20190318135537-2575f2d0ba4b h1:cfmy3QgCQkjjsHzw3HgWLZYg62MBLXSRXHN6TcsKhSA=
github.com/pingcap/parser v0.0.0-20190318135537-2575f2d0ba4b/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v2.1.0-rc.4+incompatible h1:/buwGk04aHO5odk/+O8ZOXGs4qkUjYTJ2UpCJXna8NE=
github.com/pingcap/pd v2.1.0-rc.4+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
github.com/pingcap/tidb-tools v2.1.3-0.20190116051332-34c808eef588+incompatible h1:e9Gi/LP9181HT3gBfSOeSBA+5JfemuE4aEAhqNgoE4k=
Expand Down
7 changes: 7 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,13 @@ type CancelDDLJobs struct {
JobIDs []int64
}

// Change represents deallocate plan.
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
type Change struct {
baseSchemaProducer
zz-jason marked this conversation as resolved.
Show resolved Hide resolved

Statement *ast.ChangeStmt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will make the code more concise

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can define it as :
type Change struct {
baseSchemaProducer
*ast.ChangeStmt
}

Thus we can use e.xxx to without stmt := e.Statement

}

// Prepare represents prepare plan.
type Prepare struct {
baseSchemaProducer
Expand Down
6 changes: 6 additions & 0 deletions planner/core/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,15 @@ func (b *PlanBuilder) Build(node ast.Node) (Plan, error) {
return b.buildSimple(node.(ast.StmtNode))
case ast.DDLNode:
return b.buildDDL(x)
case *ast.ChangeStmt:
return b.buildChange(x)
}
return nil, ErrUnsupportedType.GenWithStack("Unsupported type %T", node)
}
func (b *PlanBuilder) buildChange(v *ast.ChangeStmt) (Plan, error) {
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
exe := &Change{Statement: v}
return exe, nil
WangXiangUSTC marked this conversation as resolved.
Show resolved Hide resolved
}

func (b *PlanBuilder) buildExecute(v *ast.ExecuteStmt) (Plan, error) {
vars := make([]expression.Expression, 0, len(v.UsingVars))
Expand Down