Skip to content

Commit

Permalink
proxy,ctl: add partition list radondb#491
Browse files Browse the repository at this point in the history
[summary]
proxy add partition list
ctl add partition list

[test case]
src/proxy/ddl_test.go
src/ctl/v1/shard_test.go

[patch codecov]
src/proxy/ddl.go 93.6%
src/ctl/v1/shard.go 88.3%
  • Loading branch information
andyli029 committed Nov 8, 2019
1 parent 0cf4b9e commit 40ab01b
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 38 deletions.
45 changes: 34 additions & 11 deletions src/ctl/v1/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package v1

import (
"net/http"
"regexp"
"strconv"
"strings"

Expand All @@ -19,6 +20,25 @@ import (
"github.com/xelabs/go-mysqlstack/xlog"
)

var (
subtable = regexp.MustCompile("_[0-9]{4}$")
)

func SubTableToTable(from string) (isSub bool, to string) {
isSub = false
to = ""

Suffix := subtable.FindAllStringSubmatch(from, -1)
lenSuffix := len(Suffix)
if lenSuffix == 0 {
return
}

isSub = true
to = strings.TrimSuffix(from, Suffix[0][lenSuffix-1])
return
}

// ShardzHandler impl.
func ShardzHandler(log *xlog.Log, proxy *proxy.Proxy) rest.HandlerFunc {
f := func(w rest.ResponseWriter, r *rest.Request) {
Expand Down Expand Up @@ -138,7 +158,7 @@ func shardBalanceAdviceHandler(log *xlog.Log, proxy *proxy.Proxy, w rest.Respons

var tableSize float64
var database, table string
router := proxy.Router()
route := proxy.Router()
for _, row := range qr.Rows {
db := string(row[0].Raw())
tbl := string(row[1].Raw())
Expand All @@ -152,17 +172,20 @@ func shardBalanceAdviceHandler(log *xlog.Log, proxy *proxy.Proxy, w rest.Respons

// Make sure the table is small enough.
if (min.size + tblSize) < (max.size - tblSize) {
// Filter the global and single table.
shardKey, err := router.ShardKey(db, tbl)
if err == nil && shardKey == "" {
log.Warning("api.v1.balance.advice.skip.global.and.single.table")
continue
isSub, t := SubTableToTable(tbl)
if isSub {
partitionType, err := route.PartitionType(db, t)
// The advice table just hash, Filter the global/single/list table.
if err == nil && route.IsPartitionHash(partitionType) {
//Find the advice table.
database = db
table = tbl
tableSize = tblSize
break
}
}
// Find the advice table.
database = db
table = tbl
tableSize = tblSize
break

log.Warning("api.v1.balance.advice.skip.table[%v]", tbl)
}
}

Expand Down
163 changes: 154 additions & 9 deletions src/ctl/v1/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ package v1

import (
"router"
"strings"
"testing"

"proxy"
Expand All @@ -22,6 +21,7 @@ import (
querypb "github.com/xelabs/go-mysqlstack/sqlparser/depends/query"
"github.com/xelabs/go-mysqlstack/sqlparser/depends/sqltypes"
"github.com/xelabs/go-mysqlstack/xlog"
"strings"
)

func TestCtlV1Shardz(t *testing.T) {
Expand Down Expand Up @@ -72,9 +72,10 @@ func TestCtlV1Shardz(t *testing.T) {
}

func TestCtlV1ShardBalanceAdvice1(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
log := xlog.NewStdLog(xlog.Level(xlog.WARNING))
fakedbs, proxy, cleanup := proxy.MockProxy(log)
defer cleanup()
address := proxy.Address()

rdbs := &sqltypes.Result{
Fields: []*querypb.Field{
Expand Down Expand Up @@ -139,12 +140,12 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) {
Rows: [][]sqltypes.Value{
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("test")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00001")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_0001")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("6144")),
},
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("test")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00002")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_0002")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2048")),
},
},
Expand All @@ -154,10 +155,29 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) {
{
fakedbs.AddQuery("show databases", rdbs)
fakedbs.AddQuery("create database if not exists `test`", &sqltypes.Result{})
fakedbs.AddQueryPattern("create .*", &sqltypes.Result{})
fakedbs.AddQuerys("select round((sum(data_length) + sum(index_length)) / 1024/ 1024, 0) as sizeinmb from information_schema.tables", r10, r11)
fakedbs.AddQuery("SELECT table_schema, table_name, ROUND((SUM(data_length+index_length)) / 1024/ 1024, 0) AS sizeMB FROM information_schema.TABLES GROUP BY table_name HAVING SUM(data_length + index_length)>10485760 ORDER BY (data_length + index_length) DESC", r2)
}

// create database.
{
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
query := "create database test"
_, err = client.FetchAll(query, -1)
assert.Nil(t, err)
}

// create test table.
{
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
query := "create table test.t1(id int, b int) partition by hash(id)"
_, err = client.FetchAll(query, -1)
assert.Nil(t, err)
}

{
api := rest.NewApi()
router, _ := rest.MakeRouter(
Expand All @@ -171,7 +191,7 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) {

got := recorded.Recorder.Body.String()
log.Debug(got)
assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"test","table":"t1_00002","tablesize":2048`))
assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"test","table":"t1_0002","tablesize":2048`))
}
}

Expand Down Expand Up @@ -244,7 +264,7 @@ func TestCtlV1ShardBalanceAdviceGlobal(t *testing.T) {
Rows: [][]sqltypes.Value{
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00002")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_0002")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("6144")),
},
{
Expand Down Expand Up @@ -384,10 +404,11 @@ func TestCtlV1ShardBalanceAdviceSingle(t *testing.T) {
}

func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
log := xlog.NewStdLog(xlog.Level(xlog.WARNING))
fakedbs, proxy, cleanup := proxy.MockProxy(log)
defer cleanup()
proxy.Router().AddForTest("sbtest", router.MockTableGConfig())
proxy.Router().AddForTest("sbtest", router.MockTableGConfig())

rdbs := &sqltypes.Result{
Fields: []*querypb.Field{
Expand Down Expand Up @@ -457,7 +478,7 @@ func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) {
},
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00001")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("A1_0001")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2096")),
},
},
Expand All @@ -484,7 +505,113 @@ func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) {

got := recorded.Recorder.Body.String()
log.Debug(got)
assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"sbtest","table":"t1_00001","tablesize":2096`))
assert.Equal(t, "null", got)
//assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"sbtest","table":"t1_0001","tablesize":2096`))
}
}

func TestCtlV1ShardBalanceAdviceList(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
fakedbs, proxy, cleanup := proxy.MockProxy(log)
defer cleanup()
proxy.Router().AddForTest("sbtest", router.MockTableListConfig())

rdbs := &sqltypes.Result{
Fields: []*querypb.Field{
{
Name: "Databases",
Type: querypb.Type_VARCHAR,
},
},
Rows: [][]sqltypes.Value{
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")),
},
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("information_schema")),
},
},
}

r10 := &sqltypes.Result{
Fields: []*querypb.Field{
{
Name: "SizeInMB",
Type: querypb.Type_DECIMAL,
},
},
Rows: [][]sqltypes.Value{
{
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("8192")),
},
},
}

r11 := &sqltypes.Result{
Fields: []*querypb.Field{
{
Name: "SizeInMB",
Type: querypb.Type_DECIMAL,
},
},
Rows: [][]sqltypes.Value{
{
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("3072")),
},
},
}

r2 := &sqltypes.Result{
Fields: []*querypb.Field{
{
Name: "table_schema",
Type: querypb.Type_VARCHAR,
},
{
Name: "table_name",
Type: querypb.Type_VARCHAR,
},
{
Name: "sizeMB",
Type: querypb.Type_DECIMAL,
},
},
Rows: [][]sqltypes.Value{
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00002")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("6144")),
},
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("L_0000")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2048")),
},
},
}

// fakedbs.
{
fakedbs.AddQuery("show databases", rdbs)
fakedbs.AddQuery("create database if not exists `sbtest`", &sqltypes.Result{})
fakedbs.AddQueryPattern("create .*", &sqltypes.Result{})
fakedbs.AddQuerys("select round((sum(data_length) + sum(index_length)) / 1024/ 1024, 0) as sizeinmb from information_schema.tables", r10, r11)
fakedbs.AddQuery("SELECT table_schema, table_name, ROUND((SUM(data_length+index_length)) / 1024/ 1024, 0) AS sizeMB FROM information_schema.TABLES GROUP BY table_name HAVING SUM(data_length + index_length)>10485760 ORDER BY (data_length + index_length) DESC", r2)
}

{
api := rest.NewApi()
router, _ := rest.MakeRouter(
rest.Get("/v1/shard/balanceadvice", ShardBalanceAdviceHandler(log, proxy)),
)
api.SetApp(router)
handler := api.MakeHandler()

recorded := test.RunRequest(t, handler, test.MakeSimpleRequest("GET", "http://localhost/v1/shard/balanceadvice", nil))
recorded.CodeIs(200)

got := recorded.Recorder.Body.String()
assert.Equal(t, "null", got)
}
}

Expand Down Expand Up @@ -1275,3 +1402,21 @@ func TestCtlV1Globals(t *testing.T) {
assert.Equal(t, want, got)
}
}

func TestSubTableToTable(t *testing.T) {
testCases := []struct {
in, out string
}{
{"t", ""},
{"t1_0001", "t1"},
{"t2_000", ""},
{"t2_0000_00001", "t2_0000"},
}

for _, test := range testCases {
isSub, table := SubTableToTable(test.in)
if isSub {
assert.Equal(t, test.out, table)
}
}
}
1 change: 0 additions & 1 deletion src/planner/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1085,7 +1085,6 @@ func TestGenerateFieldQuery(t *testing.T) {
assert.Equal(t, want, got)
}


func TestSelectPlanList(t *testing.T) {
querys := []string{
"select 1, sum(a),avg(a),a,b from sbtest.L where id>1 group by a,b order by a desc limit 10 offset 100",
Expand Down
35 changes: 26 additions & 9 deletions src/proxy/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s
return nil, err
}
tableType = router.TableTypePartitionHash
case sqlparser.PartitionTableList:
if shardKey, err = tryGetShardKey(ddl); err != nil {
return nil, err
}
tableType = router.TableTypePartitionList
case sqlparser.GlobalTableType:
tableType = router.TableTypeGlobal
case sqlparser.SingleTableType:
Expand All @@ -246,18 +251,30 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s
AutoIncrement: autoinc,
}

//TODO: a list of backends
if ddl.TableSpec.Options.Type == sqlparser.SingleTableType && ddl.BackendName != "" {
if isExist := scatter.CheckBackend(ddl.BackendName); !isExist {
log.Error("spanner.ddl.execute[%v].backend.doesn't.exist", query)
return nil, fmt.Errorf("create table distributed by backend '%s' doesn't exist", ddl.BackendName)
}
switch tableType {
case router.TableTypeSingle:
if ddl.BackendName != "" {
// TODO(andy): distributed by a list of backends
if isExist := scatter.CheckBackend(ddl.BackendName); !isExist {
log.Error("spanner.ddl.execute[%v].backend.doesn't.exist", query)
return nil, fmt.Errorf("create table distributed by backend '%s' doesn't exist", ddl.BackendName)
}

assignedBackends := []string{ddl.BackendName}
if err := route.CreateTable(database, table, shardKey, tableType, assignedBackends, extra); err != nil {
assignedBackends := []string{ddl.BackendName}
if err := route.CreateTable(database, table, shardKey, tableType, assignedBackends, extra); err != nil {
return nil, err
}
} else {
if err := route.CreateTable(database, table, shardKey, tableType, backends, extra); err != nil {
return nil, err
}
}
case router.TableTypePartitionList:
if err := route.CreateListTable(database, table, shardKey, tableType, ddl.PartitionOptions, extra); err != nil {
return nil, err
}
} else {

default:
if err := route.CreateTable(database, table, shardKey, tableType, backends, extra); err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit 40ab01b

Please sign in to comment.