From d553c2419a1480cdaebca390d1680fe9b223e751 Mon Sep 17 00:00:00 2001 From: andy Date: Wed, 6 Nov 2019 19:33:04 +0800 Subject: [PATCH] proxy,ctl: add partition list #491 [summary] proxy add partition list ctl add partition list [test case] src/proxy/ddl_test.go src/ctl/v1/shard_test.go [patch codecov] src/proxy/ddl.go 93.6% src/ctl/v1/shard.go 88.3% --- src/ctl/v1/shard.go | 45 ++++++-- src/ctl/v1/shard_test.go | 162 ++++++++++++++++++++++++++-- src/planner/builder/builder_test.go | 1 - src/proxy/ddl.go | 35 ++++-- src/proxy/ddl_test.go | 22 +++- src/router/list.go | 12 +-- src/router/router.go | 5 + 7 files changed, 244 insertions(+), 38 deletions(-) diff --git a/src/ctl/v1/shard.go b/src/ctl/v1/shard.go index 212391c0..90c20307 100644 --- a/src/ctl/v1/shard.go +++ b/src/ctl/v1/shard.go @@ -10,6 +10,7 @@ package v1 import ( "net/http" + "regexp" "strconv" "strings" @@ -19,6 +20,25 @@ import ( "github.com/xelabs/go-mysqlstack/xlog" ) +var ( + subtable = regexp.MustCompile("_[0-9]{4}$") +) + +func SubTableToTable(from string) (isSub bool, to string) { + isSub = false + to = "" + + Suffix := subtable.FindAllStringSubmatch(from, -1) + lenSuffix := len(Suffix) + if lenSuffix == 0 { + return + } + + isSub = true + to = strings.TrimSuffix(from, Suffix[0][lenSuffix-1]) + return +} + // ShardzHandler impl. func ShardzHandler(log *xlog.Log, proxy *proxy.Proxy) rest.HandlerFunc { f := func(w rest.ResponseWriter, r *rest.Request) { @@ -138,7 +158,7 @@ func shardBalanceAdviceHandler(log *xlog.Log, proxy *proxy.Proxy, w rest.Respons var tableSize float64 var database, table string - router := proxy.Router() + route := proxy.Router() for _, row := range qr.Rows { db := string(row[0].Raw()) tbl := string(row[1].Raw()) @@ -152,17 +172,20 @@ func shardBalanceAdviceHandler(log *xlog.Log, proxy *proxy.Proxy, w rest.Respons // Make sure the table is small enough. if (min.size + tblSize) < (max.size - tblSize) { - // Filter the global and single table. - shardKey, err := router.ShardKey(db, tbl) - if err == nil && shardKey == "" { - log.Warning("api.v1.balance.advice.skip.global.and.single.table") - continue + isSub, t := SubTableToTable(tbl) + if isSub { + partitionType, err := route.PartitionType(db, t) + // The advice table just hash, Filter the global/single/list table. + if err == nil && route.IsPartitionHash(partitionType) { + //Find the advice table. + database = db + table = tbl + tableSize = tblSize + break + } } - // Find the advice table. - database = db - table = tbl - tableSize = tblSize - break + + log.Warning("api.v1.balance.advice.skip.table[%v]", tbl) } } diff --git a/src/ctl/v1/shard_test.go b/src/ctl/v1/shard_test.go index 637ac31a..eb2202cc 100644 --- a/src/ctl/v1/shard_test.go +++ b/src/ctl/v1/shard_test.go @@ -10,7 +10,6 @@ package v1 import ( "router" - "strings" "testing" "proxy" @@ -22,6 +21,7 @@ import ( querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query" "github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes" "github.com/xelabs/go-mysqlstack/xlog" + "strings" ) func TestCtlV1Shardz(t *testing.T) { @@ -72,9 +72,10 @@ func TestCtlV1Shardz(t *testing.T) { } func TestCtlV1ShardBalanceAdvice1(t *testing.T) { - log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + log := xlog.NewStdLog(xlog.Level(xlog.WARNING)) fakedbs, proxy, cleanup := proxy.MockProxy(log) defer cleanup() + address := proxy.Address() rdbs := &sqltypes.Result{ Fields: []*querypb.Field{ @@ -139,12 +140,12 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) { Rows: [][]sqltypes.Value{ { sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("test")), - sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00001")), + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_0001")), sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("6144")), }, { sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("test")), - sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00002")), + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_0002")), sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2048")), }, }, @@ -154,10 +155,29 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) { { fakedbs.AddQuery("show databases", rdbs) fakedbs.AddQuery("create database if not exists `test`", &sqltypes.Result{}) + fakedbs.AddQueryPattern("create .*", &sqltypes.Result{}) fakedbs.AddQuerys("select round((sum(data_length) + sum(index_length)) / 1024/ 1024, 0) as sizeinmb from information_schema.tables", r10, r11) fakedbs.AddQuery("SELECT table_schema, table_name, ROUND((SUM(data_length+index_length)) / 1024/ 1024, 0) AS sizeMB FROM information_schema.TABLES GROUP BY table_name HAVING SUM(data_length + index_length)>10485760 ORDER BY (data_length + index_length) DESC", r2) } + // create database. + { + client, err := driver.NewConn("mock", "mock", address, "", "utf8") + assert.Nil(t, err) + query := "create database test" + _, err = client.FetchAll(query, -1) + assert.Nil(t, err) + } + + // create test table. + { + client, err := driver.NewConn("mock", "mock", address, "", "utf8") + assert.Nil(t, err) + query := "create table test.t1(id int, b int) partition by hash(id)" + _, err = client.FetchAll(query, -1) + assert.Nil(t, err) + } + { api := rest.NewApi() router, _ := rest.MakeRouter( @@ -171,7 +191,7 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) { got := recorded.Recorder.Body.String() log.Debug(got) - assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"test","table":"t1_00002","tablesize":2048`)) + assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"test","table":"t1_0002","tablesize":2048`)) } } @@ -244,7 +264,7 @@ func TestCtlV1ShardBalanceAdviceGlobal(t *testing.T) { Rows: [][]sqltypes.Value{ { sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")), - sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00002")), + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_0002")), sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("6144")), }, { @@ -384,10 +404,11 @@ func TestCtlV1ShardBalanceAdviceSingle(t *testing.T) { } func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) { - log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + log := xlog.NewStdLog(xlog.Level(xlog.WARNING)) fakedbs, proxy, cleanup := proxy.MockProxy(log) defer cleanup() proxy.Router().AddForTest("sbtest", router.MockTableGConfig()) + proxy.Router().AddForTest("sbtest", router.MockTableGConfig()) rdbs := &sqltypes.Result{ Fields: []*querypb.Field{ @@ -457,7 +478,7 @@ func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) { }, { sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")), - sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00001")), + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("A1_0001")), sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2096")), }, }, @@ -484,7 +505,112 @@ func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) { got := recorded.Recorder.Body.String() log.Debug(got) - assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"sbtest","table":"t1_00001","tablesize":2096`)) + assert.Equal(t, "null", got) + } +} + +func TestCtlV1ShardBalanceAdviceList(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + fakedbs, proxy, cleanup := proxy.MockProxy(log) + defer cleanup() + proxy.Router().AddForTest("sbtest", router.MockTableListConfig()) + + rdbs := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "Databases", + Type: querypb.Type_VARCHAR, + }, + }, + Rows: [][]sqltypes.Value{ + { + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")), + }, + { + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("information_schema")), + }, + }, + } + + r10 := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "SizeInMB", + Type: querypb.Type_DECIMAL, + }, + }, + Rows: [][]sqltypes.Value{ + { + sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("8192")), + }, + }, + } + + r11 := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "SizeInMB", + Type: querypb.Type_DECIMAL, + }, + }, + Rows: [][]sqltypes.Value{ + { + sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("3072")), + }, + }, + } + + r2 := &sqltypes.Result{ + Fields: []*querypb.Field{ + { + Name: "table_schema", + Type: querypb.Type_VARCHAR, + }, + { + Name: "table_name", + Type: querypb.Type_VARCHAR, + }, + { + Name: "sizeMB", + Type: querypb.Type_DECIMAL, + }, + }, + Rows: [][]sqltypes.Value{ + { + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")), + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00002")), + sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("6144")), + }, + { + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")), + sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("L_0000")), + sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2048")), + }, + }, + } + + // fakedbs. + { + fakedbs.AddQuery("show databases", rdbs) + fakedbs.AddQuery("create database if not exists `sbtest`", &sqltypes.Result{}) + fakedbs.AddQueryPattern("create .*", &sqltypes.Result{}) + fakedbs.AddQuerys("select round((sum(data_length) + sum(index_length)) / 1024/ 1024, 0) as sizeinmb from information_schema.tables", r10, r11) + fakedbs.AddQuery("SELECT table_schema, table_name, ROUND((SUM(data_length+index_length)) / 1024/ 1024, 0) AS sizeMB FROM information_schema.TABLES GROUP BY table_name HAVING SUM(data_length + index_length)>10485760 ORDER BY (data_length + index_length) DESC", r2) + } + + { + api := rest.NewApi() + router, _ := rest.MakeRouter( + rest.Get("/v1/shard/balanceadvice", ShardBalanceAdviceHandler(log, proxy)), + ) + api.SetApp(router) + handler := api.MakeHandler() + + recorded := test.RunRequest(t, handler, test.MakeSimpleRequest("GET", "http://localhost/v1/shard/balanceadvice", nil)) + recorded.CodeIs(200) + + got := recorded.Recorder.Body.String() + assert.Equal(t, "null", got) } } @@ -1275,3 +1401,21 @@ func TestCtlV1Globals(t *testing.T) { assert.Equal(t, want, got) } } + +func TestSubTableToTable(t *testing.T) { + testCases := []struct { + in, out string + }{ + {"t", ""}, + {"t1_0001", "t1"}, + {"t2_000", ""}, + {"t2_0000_00001", "t2_0000"}, + } + + for _, test := range testCases { + isSub, table := SubTableToTable(test.in) + if isSub { + assert.Equal(t, test.out, table) + } + } +} diff --git a/src/planner/builder/builder_test.go b/src/planner/builder/builder_test.go index e9a6c0e9..bdef7562 100644 --- a/src/planner/builder/builder_test.go +++ b/src/planner/builder/builder_test.go @@ -1085,7 +1085,6 @@ func TestGenerateFieldQuery(t *testing.T) { assert.Equal(t, want, got) } - func TestSelectPlanList(t *testing.T) { querys := []string{ "select 1, sum(a),avg(a),a,b from sbtest.L where id>1 group by a,b order by a desc limit 10 offset 100", diff --git a/src/proxy/ddl.go b/src/proxy/ddl.go index 60df6e62..eff8798e 100644 --- a/src/proxy/ddl.go +++ b/src/proxy/ddl.go @@ -232,6 +232,11 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s return nil, err } tableType = router.TableTypePartitionHash + case sqlparser.PartitionTableList: + if shardKey, err = tryGetShardKey(ddl); err != nil { + return nil, err + } + tableType = router.TableTypePartitionList case sqlparser.GlobalTableType: tableType = router.TableTypeGlobal case sqlparser.SingleTableType: @@ -246,18 +251,30 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s AutoIncrement: autoinc, } - //TODO: a list of backends - if ddl.TableSpec.Options.Type == sqlparser.SingleTableType && ddl.BackendName != "" { - if isExist := scatter.CheckBackend(ddl.BackendName); !isExist { - log.Error("spanner.ddl.execute[%v].backend.doesn't.exist", query) - return nil, fmt.Errorf("create table distributed by backend '%s' doesn't exist", ddl.BackendName) - } + switch tableType { + case router.TableTypeSingle: + if ddl.BackendName != "" { + // TODO(andy): distributed by a list of backends + if isExist := scatter.CheckBackend(ddl.BackendName); !isExist { + log.Error("spanner.ddl.execute[%v].backend.doesn't.exist", query) + return nil, fmt.Errorf("create table distributed by backend '%s' doesn't exist", ddl.BackendName) + } - assignedBackends := []string{ddl.BackendName} - if err := route.CreateTable(database, table, shardKey, tableType, assignedBackends, extra); err != nil { + assignedBackends := []string{ddl.BackendName} + if err := route.CreateTable(database, table, shardKey, tableType, assignedBackends, extra); err != nil { + return nil, err + } + } else { + if err := route.CreateTable(database, table, shardKey, tableType, backends, extra); err != nil { + return nil, err + } + } + case router.TableTypePartitionList: + if err := route.CreateListTable(database, table, shardKey, tableType, ddl.PartitionOptions, extra); err != nil { return nil, err } - } else { + + default: if err := route.CreateTable(database, table, shardKey, tableType, backends, extra); err != nil { return nil, err } diff --git a/src/proxy/ddl_test.go b/src/proxy/ddl_test.go index 7e15e7ab..b2ea9a88 100644 --- a/src/proxy/ddl_test.go +++ b/src/proxy/ddl_test.go @@ -1142,7 +1142,7 @@ func TestProxyDDLDBPrivilegeN(t *testing.T) { } } -func TestProxyDDLGlobalSingleNormal(t *testing.T) { +func TestProxyDDLGlobalSingleNormalList(t *testing.T) { log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) fakedbs, proxy, cleanup := MockProxy(log) defer cleanup() @@ -1172,17 +1172,33 @@ func TestProxyDDLGlobalSingleNormal(t *testing.T) { "CREATE TABLE t5(a int ,b int, primary key(a))", "CREATE TABLE t6(a int ,b int, primary key(a, b))", "create table t7(a int, b int unique)", + + // partition list + "CREATE TABLE l(a int primary key,b int ) partition by list(a)(" + + "PARTITION backend1 VALUES IN (1)," + + "PARTITION backend2 VALUES IN (2));", + "CREATE TABLE l(a int primary key,b int ) partition by list(a)(" + + "PARTITION backend1 VALUES IN (1)," + + "PARTITION backend2 VALUES IN (2));", + "CREATE TABLE l(a int primary key,b int ) partition by list(b)(" + + "PARTITION backend1 VALUES IN (1)," + + "PARTITION backend2 VALUES IN (2));", } results := []string{ "", "", "", - "single.table.not.impl.yet (errno 1105) (sqlstate HY000)", + "", "The unique/primary constraint shoule be defined or add 'PARTITION BY HASH' to mandatory indication (errno 1105) (sqlstate HY000)", "", "The unique/primary constraint shoule be defined or add 'PARTITION BY HASH' to mandatory indication (errno 1105) (sqlstate HY000)", "", + + // partition list + "", + "router.add.db[test].table[l].exists (errno 1105) (sqlstate HY000)", + "The unique/primary constraint should be only defined on the sharding key column[b] (errno 1105) (sqlstate HY000)", } for i, query := range querys { @@ -1333,6 +1349,8 @@ func TestProxyDDLCreateTableDistributed(t *testing.T) { queryErrs := []string{ "create table t1(a int, b int) distributed by (node0)", + //querys have created it. + "create table t1(a int, b int) distributed by (backend0)", } for _, query := range querys { diff --git a/src/router/list.go b/src/router/list.go index 7935f194..d120ee43 100644 --- a/src/router/list.go +++ b/src/router/list.go @@ -34,7 +34,7 @@ func (r *ListRange) Less(b KeyRange) bool { return false } -// List tuple. +// List ... type List struct { log *xlog.Log @@ -58,7 +58,7 @@ func NewList(log *xlog.Log, conf *config.TableConfig) *List { } } -// Build used to build list bitmap from schema config +// Build used to build list bitmap from schema config. func (list *List) Build() error { for _, part := range list.conf.Partitions { partition := Segment{ @@ -75,15 +75,15 @@ func (list *List) Build() error { return nil } -// Clear used to clean partitions +// Clear used to clean partitions. func (list *List) Clear() error { return nil } // Lookup used to lookup partition(s) through the sharding-key range -// List.Lookup only supports the type uint64/string +// List.Lookup only supports the type uint64/string. func (list *List) Lookup(start *sqlparser.SQLVal, end *sqlparser.SQLVal) ([]Segment, error) { - // if open interval we returns all partitions + // if open interval we returns all partitions. if start == nil || end == nil { return list.Segments, nil } @@ -93,7 +93,7 @@ func (list *List) Lookup(start *sqlparser.SQLVal, end *sqlparser.SQLVal) ([]Segm return nil, errors.Errorf("list.lookup.key.type.must.be.same:[%v!=%v]", start.Type, end.Type) } - // List just handle the equal + // List just handle the equal. if bytes.Equal(start.Val, end.Val) { idx, err := list.GetIndex(start) if err != nil { diff --git a/src/router/router.go b/src/router/router.go index da07d51d..2bd57193 100644 --- a/src/router/router.go +++ b/src/router/router.go @@ -220,6 +220,11 @@ func (r *Router) IsPartitionHash(partitionType MethodType) bool { return partitionType == methodTypeHash } +// IsPartitionList used to check whether the partitionType is list. +func (r *Router) IsPartitionList(partitionType MethodType) bool { + return partitionType == methodTypeList +} + func (r *Router) getTable(database string, tableName string) (*Table, error) { var ok bool var schema *Schema