Skip to content

Commit

Permalink
proxy,ctl: add partition list #491
Browse files Browse the repository at this point in the history
[summary]
proxy add partition list
ctl add partition list

[test case]
src/proxy/ddl_test.go
src/ctl/v1/shard_test.go

[patch codecov]
src/proxy/ddl.go 93.6%
src/ctl/v1/shard.go 89%
  • Loading branch information
andyli029 authored and BohuTANG committed Nov 9, 2019
1 parent c3c0dc1 commit 3ead074
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 40 deletions.
46 changes: 35 additions & 11 deletions src/ctl/v1/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package v1

import (
"net/http"
"regexp"
"strconv"
"strings"

Expand All @@ -19,6 +20,26 @@ import (
"github.com/xelabs/go-mysqlstack/xlog"
)

var (
subtable = regexp.MustCompile("_[0-9]{4}$")
)

// SubTableToTable used to determine from is subtable or not; if it is, get the table from the subtable.
func SubTableToTable(from string) (isSub bool, to string) {
isSub = false
to = ""

Suffix := subtable.FindAllStringSubmatch(from, -1)
lenSuffix := len(Suffix)
if lenSuffix == 0 {
return
}

isSub = true
to = strings.TrimSuffix(from, Suffix[0][lenSuffix-1])
return
}

// ShardzHandler impl.
func ShardzHandler(log *xlog.Log, proxy *proxy.Proxy) rest.HandlerFunc {
f := func(w rest.ResponseWriter, r *rest.Request) {
Expand Down Expand Up @@ -138,7 +159,7 @@ func shardBalanceAdviceHandler(log *xlog.Log, proxy *proxy.Proxy, w rest.Respons

var tableSize float64
var database, table string
router := proxy.Router()
route := proxy.Router()
for _, row := range qr.Rows {
db := string(row[0].Raw())
tbl := string(row[1].Raw())
Expand All @@ -152,17 +173,20 @@ func shardBalanceAdviceHandler(log *xlog.Log, proxy *proxy.Proxy, w rest.Respons

// Make sure the table is small enough.
if (min.size + tblSize) < (max.size - tblSize) {
// Filter the global and single table.
shardKey, err := router.ShardKey(db, tbl)
if err == nil && shardKey == "" {
log.Warning("api.v1.balance.advice.skip.global.and.single.table")
continue
isSub, t := SubTableToTable(tbl)
if isSub {
partitionType, err := route.PartitionType(db, t)
// The advice table just hash, Filter the global/single/list table.
if err == nil && route.IsPartitionHash(partitionType) {
//Find the advice table.
database = db
table = tbl
tableSize = tblSize
break
}
}
// Find the advice table.
database = db
table = tbl
tableSize = tblSize
break

log.Warning("api.v1.balance.advice.skip.table[%v]", tbl)
}
}

Expand Down
62 changes: 50 additions & 12 deletions src/ctl/v1/shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
fakedbs, proxy, cleanup := proxy.MockProxy(log)
defer cleanup()
address := proxy.Address()

rdbs := &sqltypes.Result{
Fields: []*querypb.Field{
Expand Down Expand Up @@ -139,12 +140,12 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) {
Rows: [][]sqltypes.Value{
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("test")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00001")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_0001")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("6144")),
},
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("test")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00002")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_0002")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2048")),
},
},
Expand All @@ -154,10 +155,29 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) {
{
fakedbs.AddQuery("show databases", rdbs)
fakedbs.AddQuery("create database if not exists `test`", &sqltypes.Result{})
fakedbs.AddQueryPattern("create .*", &sqltypes.Result{})
fakedbs.AddQuerys("select round((sum(data_length) + sum(index_length)) / 1024/ 1024, 0) as sizeinmb from information_schema.tables", r10, r11)
fakedbs.AddQuery("SELECT table_schema, table_name, ROUND((SUM(data_length+index_length)) / 1024/ 1024, 0) AS sizeMB FROM information_schema.TABLES GROUP BY table_name HAVING SUM(data_length + index_length)>10485760 ORDER BY (data_length + index_length) DESC", r2)
}

// create database.
{
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
query := "create database test"
_, err = client.FetchAll(query, -1)
assert.Nil(t, err)
}

// create test table.
{
client, err := driver.NewConn("mock", "mock", address, "", "utf8")
assert.Nil(t, err)
query := "create table test.t1(id int, b int) partition by hash(id)"
_, err = client.FetchAll(query, -1)
assert.Nil(t, err)
}

{
api := rest.NewApi()
router, _ := rest.MakeRouter(
Expand All @@ -171,7 +191,7 @@ func TestCtlV1ShardBalanceAdvice1(t *testing.T) {

got := recorded.Recorder.Body.String()
log.Debug(got)
assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"test","table":"t1_00002","tablesize":2048`))
assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"test","table":"t1_0002","tablesize":2048`))
}
}

Expand Down Expand Up @@ -244,7 +264,7 @@ func TestCtlV1ShardBalanceAdviceGlobal(t *testing.T) {
Rows: [][]sqltypes.Value{
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00002")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_0002")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("6144")),
},
{
Expand Down Expand Up @@ -383,11 +403,11 @@ func TestCtlV1ShardBalanceAdviceSingle(t *testing.T) {
}
}

func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) {
func TestCtlV1ShardBalanceAdviceList(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
fakedbs, proxy, cleanup := proxy.MockProxy(log)
defer cleanup()
proxy.Router().AddForTest("sbtest", router.MockTableGConfig())
proxy.Router().AddForTest("sbtest", router.MockTableListConfig())

rdbs := &sqltypes.Result{
Fields: []*querypb.Field{
Expand Down Expand Up @@ -452,13 +472,13 @@ func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) {
Rows: [][]sqltypes.Value{
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("G")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2048")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00002")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("6144")),
},
{
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("sbtest")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("t1_00001")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2096")),
sqltypes.MakeTrusted(querypb.Type_VARCHAR, []byte("L_0000")),
sqltypes.MakeTrusted(querypb.Type_DECIMAL, []byte("2048")),
},
},
}
Expand All @@ -467,6 +487,7 @@ func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) {
{
fakedbs.AddQuery("show databases", rdbs)
fakedbs.AddQuery("create database if not exists `sbtest`", &sqltypes.Result{})
fakedbs.AddQueryPattern("create .*", &sqltypes.Result{})
fakedbs.AddQuerys("select round((sum(data_length) + sum(index_length)) / 1024/ 1024, 0) as sizeinmb from information_schema.tables", r10, r11)
fakedbs.AddQuery("SELECT table_schema, table_name, ROUND((SUM(data_length+index_length)) / 1024/ 1024, 0) AS sizeMB FROM information_schema.TABLES GROUP BY table_name HAVING SUM(data_length + index_length)>10485760 ORDER BY (data_length + index_length) DESC", r2)
}
Expand All @@ -483,8 +504,7 @@ func TestCtlV1ShardBalanceAdviceGlobal1(t *testing.T) {
recorded.CodeIs(200)

got := recorded.Recorder.Body.String()
log.Debug(got)
assert.True(t, strings.Contains(got, `"to-datasize":3072,"to-user":"mock","to-password":"pwd","database":"sbtest","table":"t1_00001","tablesize":2096`))
assert.Equal(t, "null", got)
}
}

Expand Down Expand Up @@ -1275,3 +1295,21 @@ func TestCtlV1Globals(t *testing.T) {
assert.Equal(t, want, got)
}
}

func TestSubTableToTable(t *testing.T) {
testCases := []struct {
in, out string
}{
{"t", ""},
{"t1_0001", "t1"},
{"t2_000", ""},
{"t3_0000_0001", "t3_0000"},
}

for _, test := range testCases {
isSub, table := SubTableToTable(test.in)
if isSub || test.out != "" {
assert.Equal(t, test.out, table)
}
}
}
35 changes: 26 additions & 9 deletions src/proxy/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s
return nil, err
}
tableType = router.TableTypePartitionHash
case sqlparser.PartitionTableList:
if shardKey, err = tryGetShardKey(ddl); err != nil {
return nil, err
}
tableType = router.TableTypePartitionList
case sqlparser.GlobalTableType:
tableType = router.TableTypeGlobal
case sqlparser.SingleTableType:
Expand All @@ -246,18 +251,30 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s
AutoIncrement: autoinc,
}

//TODO: a list of backends
if ddl.TableSpec.Options.Type == sqlparser.SingleTableType && ddl.BackendName != "" {
if isExist := scatter.CheckBackend(ddl.BackendName); !isExist {
log.Error("spanner.ddl.execute[%v].backend.doesn't.exist", query)
return nil, fmt.Errorf("create table distributed by backend '%s' doesn't exist", ddl.BackendName)
}
switch tableType {
case router.TableTypeSingle:
if ddl.BackendName != "" {
// TODO(andy): distributed by a list of backends
if isExist := scatter.CheckBackend(ddl.BackendName); !isExist {
log.Error("spanner.ddl.execute[%v].backend.doesn't.exist", query)
return nil, fmt.Errorf("create table distributed by backend '%s' doesn't exist", ddl.BackendName)
}

assignedBackends := []string{ddl.BackendName}
if err := route.CreateTable(database, table, shardKey, tableType, assignedBackends, extra); err != nil {
assignedBackends := []string{ddl.BackendName}
if err := route.CreateTable(database, table, shardKey, tableType, assignedBackends, extra); err != nil {
return nil, err
}
} else {
if err := route.CreateTable(database, table, shardKey, tableType, backends, extra); err != nil {
return nil, err
}
}
case router.TableTypePartitionList:
if err := route.CreateListTable(database, table, shardKey, tableType, ddl.PartitionOptions, extra); err != nil {
return nil, err
}
} else {

default:
if err := route.CreateTable(database, table, shardKey, tableType, backends, extra); err != nil {
return nil, err
}
Expand Down
22 changes: 20 additions & 2 deletions src/proxy/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1142,7 +1142,7 @@ func TestProxyDDLDBPrivilegeN(t *testing.T) {
}
}

func TestProxyDDLGlobalSingleNormal(t *testing.T) {
func TestProxyDDLGlobalSingleNormalList(t *testing.T) {
log := xlog.NewStdLog(xlog.Level(xlog.PANIC))
fakedbs, proxy, cleanup := MockProxy(log)
defer cleanup()
Expand Down Expand Up @@ -1172,17 +1172,33 @@ func TestProxyDDLGlobalSingleNormal(t *testing.T) {
"CREATE TABLE t5(a int ,b int, primary key(a))",
"CREATE TABLE t6(a int ,b int, primary key(a, b))",
"create table t7(a int, b int unique)",

// partition list
"CREATE TABLE l(a int primary key,b int ) partition by list(a)(" +
"PARTITION backend1 VALUES IN (1)," +
"PARTITION backend2 VALUES IN (2));",
"CREATE TABLE l(a int primary key,b int ) partition by list(a)(" +
"PARTITION backend1 VALUES IN (1)," +
"PARTITION backend2 VALUES IN (2));",
"CREATE TABLE l(a int primary key,b int ) partition by list(b)(" +
"PARTITION backend1 VALUES IN (1)," +
"PARTITION backend2 VALUES IN (2));",
}

results := []string{
"",
"",
"",
"single.table.not.impl.yet (errno 1105) (sqlstate HY000)",
"",
"The unique/primary constraint shoule be defined or add 'PARTITION BY HASH' to mandatory indication (errno 1105) (sqlstate HY000)",
"",
"The unique/primary constraint shoule be defined or add 'PARTITION BY HASH' to mandatory indication (errno 1105) (sqlstate HY000)",
"",

// partition list
"",
"router.add.db[test].table[l].exists (errno 1105) (sqlstate HY000)",
"The unique/primary constraint should be only defined on the sharding key column[b] (errno 1105) (sqlstate HY000)",
}

for i, query := range querys {
Expand Down Expand Up @@ -1333,6 +1349,8 @@ func TestProxyDDLCreateTableDistributed(t *testing.T) {

queryErrs := []string{
"create table t1(a int, b int) distributed by (node0)",
//querys have created it.
"create table t1(a int, b int) distributed by (backend0)",
}

for _, query := range querys {
Expand Down
12 changes: 6 additions & 6 deletions src/router/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (r *ListRange) Less(b KeyRange) bool {
return false
}

// List tuple.
// List ...
type List struct {
log *xlog.Log

Expand All @@ -58,7 +58,7 @@ func NewList(log *xlog.Log, conf *config.TableConfig) *List {
}
}

// Build used to build list bitmap from schema config
// Build used to build list bitmap from schema config.
func (list *List) Build() error {
for _, part := range list.conf.Partitions {
partition := Segment{
Expand All @@ -75,15 +75,15 @@ func (list *List) Build() error {
return nil
}

// Clear used to clean partitions
// Clear used to clean partitions.
func (list *List) Clear() error {
return nil
}

// Lookup used to lookup partition(s) through the sharding-key range
// List.Lookup only supports the type uint64/string
// List.Lookup only supports the type uint64/string.
func (list *List) Lookup(start *sqlparser.SQLVal, end *sqlparser.SQLVal) ([]Segment, error) {
// if open interval we returns all partitions
// if open interval we returns all partitions.
if start == nil || end == nil {
return list.Segments, nil
}
Expand All @@ -93,7 +93,7 @@ func (list *List) Lookup(start *sqlparser.SQLVal, end *sqlparser.SQLVal) ([]Segm
return nil, errors.Errorf("list.lookup.key.type.must.be.same:[%v!=%v]", start.Type, end.Type)
}

// List just handle the equal
// List just handle the equal.
if bytes.Equal(start.Val, end.Val) {
idx, err := list.GetIndex(start)
if err != nil {
Expand Down

0 comments on commit 3ead074

Please sign in to comment.