diff --git a/src/proxy/ddl.go b/src/proxy/ddl.go index c831fa68..3dcf957c 100644 --- a/src/proxy/ddl.go +++ b/src/proxy/ddl.go @@ -212,7 +212,7 @@ func (spanner *Spanner) handleDDL(session *driver.Session, query string, node *s table := ddl.Table.Name.String() backends := scatter.Backends() shardKey := ddl.PartitionName - tableType := router.TableTypeUnknow + tableType := router.TableTypeUnknown if !checkDatabaseExists(database, route) { return nil, sqldb.NewSQLError(sqldb.ER_BAD_DB_ERROR, database) diff --git a/src/router/compute.go b/src/router/compute.go index 4e608365..f771de30 100644 --- a/src/router/compute.go +++ b/src/router/compute.go @@ -15,6 +15,8 @@ import ( "config" "github.com/pkg/errors" + "github.com/xelabs/go-mysqlstack/sqlparser" + "github.com/xelabs/go-mysqlstack/sqlparser/depends/common" ) // HashUniform used to uniform the hash slots to backends. @@ -120,3 +122,61 @@ func (r *Router) SingleUniform(table string, backends []string) (*config.TableCo }}, }, nil } + +func listMergePartition(partitionDef sqlparser.PartitionOptions) (map[string]string, error) { + partitionMap := make(map[string]string) + for _, onePart := range partitionDef { + row := onePart.Row + valuesNum := len(row) + for i := 0; i < valuesNum; i++ { + key := common.BytesToString(row[i].(*sqlparser.SQLVal).Val) + if _, ok := partitionMap[key]; !ok { + partitionMap[key] = onePart.Backend + } else { + if partitionMap[key] != onePart.Backend { + return nil, errors.New("partition.list.different.backend.with.same.values.") + } + } + } + } + return partitionMap, nil +} + +// ListUniform used to uniform the list table to backends. +func (r *Router) ListUniform(table string, shardkey string, partitionDef sqlparser.PartitionOptions) (*config.TableConfig, error) { + if table == "" { + return nil, errors.New("table.cant.be.null") + } + if shardkey == "" { + return nil, errors.New("shard.key.cant.be.null") + } + + listMap, err := listMergePartition(partitionDef) + if err != nil { + return nil, err + } + + nums := len(listMap) + if nums == 0 { + return nil, errors.New("router.compute.partition.list.is.null") + } + + tableConf := &config.TableConfig{ + Name: table, + ShardType: methodTypeList, + ShardKey: shardkey, + Partitions: make([]*config.PartitionConfig, 0, 16), + } + + i := 0 + for listValue, backend := range listMap { + partConf := &config.PartitionConfig{ + Table: fmt.Sprintf("%s_%04d", table, i), + Backend: backend, + ListValue: listValue, + } + tableConf.Partitions = append(tableConf.Partitions, partConf) + i++ + } + return tableConf, nil +} diff --git a/src/router/compute_test.go b/src/router/compute_test.go index 52707d4b..ac6a8341 100644 --- a/src/router/compute_test.go +++ b/src/router/compute_test.go @@ -15,6 +15,7 @@ import ( "config" "github.com/stretchr/testify/assert" + "github.com/xelabs/go-mysqlstack/sqlparser" "github.com/xelabs/go-mysqlstack/xlog" ) @@ -355,3 +356,101 @@ func TestRouterComputeSingleError(t *testing.T) { assert.NotNil(t, err) } } + +func TestRouterComputeListError(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + router, cleanup := MockNewRouter(log) + defer cleanup() + + // backends is NULL. + { + assert.NotNil(t, router) + _, err := router.ListUniform("t1", "", sqlparser.PartitionOptions{}) + assert.NotNil(t, err) + } + + // Table is null. + { + assert.NotNil(t, router) + _, err := router.ListUniform("", "i", sqlparser.PartitionOptions{}) + assert.NotNil(t, err) + } + + // different backends with same list value. + { + partitionDef := sqlparser.PartitionOptions{ + &sqlparser.PartitionDefinition{ + Backend: "node1", + Row: sqlparser.ValTuple{sqlparser.NewStrVal([]byte("1"))}, + }, + &sqlparser.PartitionDefinition{ + Backend: "node2", + Row: sqlparser.ValTuple{sqlparser.NewIntVal([]byte("1"))}, + }, + } + + assert.NotNil(t, router) + _, err := router.ListUniform("t1", "i", partitionDef) + assert.NotNil(t, err) + } + + // empty PartitionOptions + { + assert.NotNil(t, router) + _, err := router.ListUniform("t1", "i", sqlparser.PartitionOptions{}) + assert.NotNil(t, err) + } +} + +func TestRouterComputeList1(t *testing.T) { + datas := `{ + "name": "l", + "shardtype": "LIST", + "shardkey": "id", + "partitions": [ + { + "table": "l_0000", + "segment": "", + "backend": "node1", + "listvalue": "2" + }, + { + "table": "l_0001", + "segment": "", + "backend": "node2", + "listvalue": "4" + }, + { + "table": "l_0002", + "segment": "", + "backend": "node3", + "listvalue": "6" + } + ] +}` + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + router, cleanup := MockNewRouter(log) + defer cleanup() + assert.NotNil(t, router) + + partitionDef := sqlparser.PartitionOptions{ + &sqlparser.PartitionDefinition{ + Backend: "node1", + Row: sqlparser.ValTuple{sqlparser.NewStrVal([]byte("2"))}, + }, + &sqlparser.PartitionDefinition{ + Backend: "node2", + Row: sqlparser.ValTuple{sqlparser.NewIntVal([]byte("4"))}, + }, + &sqlparser.PartitionDefinition{ + Backend: "node3", + Row: sqlparser.ValTuple{sqlparser.NewIntVal([]byte("6"))}, + }, + } + + got, err := router.ListUniform("l", "id", partitionDef) + assert.Nil(t, err) + want, err := config.ReadTableConfig(datas) + assert.Nil(t, err) + assert.EqualValues(t, want, got) +} diff --git a/src/router/frm.go b/src/router/frm.go index f5e360b6..6bc6b6bd 100644 --- a/src/router/frm.go +++ b/src/router/frm.go @@ -17,12 +17,13 @@ import ( "config" "github.com/pkg/errors" + "github.com/xelabs/go-mysqlstack/sqlparser" ) const ( - TableTypeSingle = "single" - TableTypeGlobal = "global" - TableTypeUnknow = "unknow" + TableTypeSingle = "single" + TableTypeGlobal = "global" + TableTypeUnknown = "unknown" TableTypePartitionHash = "hash" TableTypePartitionList = "list" @@ -251,6 +252,47 @@ func (r *Router) CreateTable(db, table, shardKey string, tableType string, backe return nil } +func (r *Router) CreateListTable(db, table, shardKey string, tableType string, + partitionDef sqlparser.PartitionOptions, extra *Extra) error { + r.mu.Lock() + defer r.mu.Unlock() + + var err error + var tableConf *config.TableConfig + log := r.log + + switch tableType { + case TableTypePartitionList: + if tableConf, err = r.ListUniform(table, shardKey, partitionDef); err != nil { + return err + } + + default: + err := errors.Errorf("tableType is unsupported: %s", tableType) + return err + } + + if extra != nil { + tableConf.AutoIncrement = extra.AutoIncrement + } + + // add config to router. + if err = r.addTable(db, tableConf); err != nil { + log.Error("frm.create.add.route.error:%v", err) + return err + } + if err = r.writeTableFrmData(db, table, tableConf); err != nil { + log.Error("frm.create.table[db:%v, table:%v].file.error:%+v", db, tableConf.Name, err) + return err + } + + if err = config.UpdateVersion(r.metadir); err != nil { + log.Panicf("frm.create.table.update.version.error:%v", err) + return err + } + return nil +} + func (r *Router) createTable(db, table string, tableConf *config.TableConfig) error { var err error diff --git a/src/router/frm_test.go b/src/router/frm_test.go index 29ddeb3d..5a668274 100644 --- a/src/router/frm_test.go +++ b/src/router/frm_test.go @@ -15,6 +15,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/xelabs/go-mysqlstack/sqlparser" "github.com/xelabs/go-mysqlstack/xlog" ) @@ -82,6 +83,23 @@ func TestFrmTable(t *testing.T) { assert.Nil(t, err) } + // Add list table. + { + partitionDef := sqlparser.PartitionOptions{ + &sqlparser.PartitionDefinition{ + Backend: "node1", + Row: sqlparser.ValTuple{sqlparser.NewStrVal([]byte("2"))}, + }, + &sqlparser.PartitionDefinition{ + Backend: "node2", + Row: sqlparser.ValTuple{sqlparser.NewIntVal([]byte("4"))}, + }, + } + + err := router.CreateListTable("test", "l", "id", TableTypePartitionList, partitionDef, nil) + assert.Nil(t, err) + } + // Add partition table. { backends := []string{"backend1", "backend2"} diff --git a/src/router/list.go b/src/router/list.go new file mode 100644 index 00000000..5ea76b0a --- /dev/null +++ b/src/router/list.go @@ -0,0 +1,140 @@ +/* + * Radon + * + * Copyright 2018-2019 The Radon Authors. + * Code is licensed under the GPLv3. + * + */ + +package router + +import ( + "bytes" + "sort" + + "config" + + "github.com/pkg/errors" + "github.com/xelabs/go-mysqlstack/sqlparser" + "github.com/xelabs/go-mysqlstack/sqlparser/depends/common" + "github.com/xelabs/go-mysqlstack/xlog" +) + +// ListRange for Segment.Range. +type ListRange struct { + str string +} + +// String returns start-end info. +func (r *ListRange) String() string { + return r.str +} + +// Less impl. +func (r *ListRange) Less(b KeyRange) bool { + return false +} + +// List tuple. +type List struct { + log *xlog.Log + + // hash slots + slots int + + // hash method + typ MethodType + + // table config + conf *config.TableConfig + + // Partition map + Segments []Segment `json:",omitempty"` +} + +// NewList creates new list. +func NewList(log *xlog.Log, conf *config.TableConfig) *List { + return &List{ + log: log, + conf: conf, + typ: methodTypeList, + Segments: make([]Segment, 0, 16), + } +} + +// Build used to build list bitmap from schema config +func (list *List) Build() error { + for _, part := range list.conf.Partitions { + partition := Segment{ + Table: part.Table, + Backend: part.Backend, + ListValue: part.ListValue, + Range: &ListRange{ + str: "", + }, + } + // Segments + list.Segments = append(list.Segments, partition) + } + return nil +} + +// Clear used to clean hash partitions +func (list *List) Clear() error { + return nil +} + +// Lookup used to lookup partition(s) through the sharding-key range +// List.Lookup only supports the type uint64/string +func (list *List) Lookup(start *sqlparser.SQLVal, end *sqlparser.SQLVal) ([]Segment, error) { + // if open interval we returns all partitions + if start == nil || end == nil { + return list.Segments, nil + } + + // Check item types. + if start.Type != end.Type { + return nil, errors.Errorf("list.lookup.key.type.must.be.same:[%v!=%v]", start.Type, end.Type) + } + + // List just handle the equal + if bytes.Equal(start.Val, end.Val) { + idx, err := list.GetIndex(start) + if err != nil { + return nil, err + } + return []Segment{list.Segments[idx]}, nil + } + + sort.Sort(Segments(list.Segments)) + return list.Segments, nil +} + +// Type returns the hash type. +func (list *List) Type() MethodType { + return list.typ +} + +// GetIndex returns index based on sqlval. +func (list *List) GetIndex(sqlval *sqlparser.SQLVal) (int, error) { + idx := -1 + valStr := common.BytesToString(sqlval.Val) + for idx, segment := range list.Segments { + if segment.ListValue == valStr { + return idx, nil + } + } + return idx, errors.Errorf("Table has no partition for value %v", valStr) +} + +// GetSegments returns Segments based on index. +func (list *List) GetSegments() []Segment { + return list.Segments +} + +func (list *List) GetSegment(index int) (Segment, error) { + if index >= len(list.Segments) { + return Segment{}, errors.Errorf("single.getsegment.index.[%d].out.of.range", index) + } + return list.Segments[index], nil +} diff --git a/src/router/list_test.go b/src/router/list_test.go new file mode 100644 index 00000000..51fbeeab --- /dev/null +++ b/src/router/list_test.go @@ -0,0 +1,94 @@ +/* + * Radon + * + * Copyright 2018-2019 The Radon Authors. + * Code is licensed under the GPLv3. + * + */ + +package router + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/xelabs/go-mysqlstack/sqlparser" + "github.com/xelabs/go-mysqlstack/xlog" +) + +func TestList(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + list := NewList(log, MockTableSConfig()) + { + err := list.Build() + assert.Nil(t, err) + assert.Equal(t, string(list.Type()), methodTypeList) + } + + { + parts, err := list.Lookup(nil, nil) + assert.Nil(t, err) + assert.Equal(t, 1, len(parts)) + } + + { + err := list.Clear() + assert.Nil(t, err) + } +} + +func TestListLookup(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + list := NewList(log, MockTableListConfig()) + { + err := list.Build() + assert.Nil(t, err) + } + + // int + intVal := sqlparser.NewIntVal([]byte("1")) + { + parts, err := list.Lookup(intVal, intVal) + assert.Nil(t, err) + assert.Equal(t, string(list.Type()), methodTypeList) + assert.Equal(t, 1, len(parts)) + assert.Equal(t, "L_0000", parts[0].Table) + assert.Equal(t, "backend1", parts[0].Backend) + } + intVal = sqlparser.NewIntVal([]byte("2")) + { + _, err := list.Lookup(intVal, intVal) + assert.NotNil(t, err) + } + + // float + floatVal := sqlparser.NewFloatVal([]byte("65536.99999")) + { + _, err := list.Lookup(intVal, floatVal) + assert.NotNil(t, err) + } + + // [nil, endKey] + { + parts, err := list.Lookup(nil, intVal) + assert.Nil(t, err) + assert.Equal(t, 3, len(parts)) + } + + // [nil, nil] + { + parts, err := list.Lookup(nil, nil) + assert.Nil(t, err) + assert.Equal(t, 3, len(parts)) + } + + // [start, end) + { + s := sqlparser.NewIntVal([]byte("16")) + e := sqlparser.NewIntVal([]byte("17")) + + parts, err := list.Lookup(s, e) + assert.Nil(t, err) + assert.Equal(t, 3, len(parts)) + } +} diff --git a/src/router/mock.go b/src/router/mock.go index f3b6d12f..96337d32 100644 --- a/src/router/mock.go +++ b/src/router/mock.go @@ -368,7 +368,67 @@ func MockTableSConfig() *config.TableConfig { } } -// MockTableAConfig config. +// MockTableListConfig config, list shardtype. +func MockTableListConfig() *config.TableConfig { + mock := &config.TableConfig{ + Name: "L", + ShardType: "LIST", + ShardKey: "id", + Partitions: make([]*config.PartitionConfig, 0, 16), + } + L101 := &config.PartitionConfig{ + Table: "L_0000", + Segment: "", + Backend: "backend1", + ListValue: "1", + } + L102 := &config.PartitionConfig{ + Table: "L_0001", + Segment: "", + Backend: "backend2", + ListValue: "5", + } + L103 := &config.PartitionConfig{ + Table: "L_0002", + Segment: "", + Backend: "backend2", + ListValue: "6", + } + mock.Partitions = append(mock.Partitions, L101, L102, L103) + return mock +} + +// MockTableList1Config config, list shardtype. +func MockTableList1Config() *config.TableConfig { + mock := &config.TableConfig{ + Name: "L1", + ShardType: "LIST", + ShardKey: "id", + Partitions: make([]*config.PartitionConfig, 0, 16), + } + L101 := &config.PartitionConfig{ + Table: "L1_0000", + Segment: "", + Backend: "backend1", + ListValue: "1", + } + L102 := &config.PartitionConfig{ + Table: "L1_0001", + Segment: "", + Backend: "backend2", + ListValue: "5", + } + L103 := &config.PartitionConfig{ + Table: "L1_0002", + Segment: "", + Backend: "backend2", + ListValue: "6", + } + mock.Partitions = append(mock.Partitions, L101, L102, L103) + return mock +} + +// MockTableRConfig config. func MockTableRConfig() *config.TableConfig { mock := &config.TableConfig{ Name: "R", diff --git a/src/router/partition.go b/src/router/partition.go index 435c3186..b4def38e 100644 --- a/src/router/partition.go +++ b/src/router/partition.go @@ -40,6 +40,9 @@ type Segment struct { Backend string `json:",omitempty"` // key range of this segment. Range KeyRange `json:",omitempty"` + + // partition list value. + ListValue string `json:",omitempty"` } // Partition interface. diff --git a/src/router/router.go b/src/router/router.go index 8d3ef527..da07d51d 100644 --- a/src/router/router.go +++ b/src/router/router.go @@ -125,6 +125,12 @@ func (r *Router) addTable(db string, tbl *config.TableConfig) error { return err } table.Partition = single + case methodTypeList: + list := NewList(r.log, tbl) + if err := list.Build(); err != nil { + return err + } + table.Partition = list default: return errors.Errorf("router.unsupport.shardtype:[%v]", tbl.ShardType) } @@ -195,7 +201,7 @@ func (r *Router) clear() { r.Schemas = make(map[string]*Schema) } -// DatabaseACL used to check wheather the database is a system database. +// DatabaseACL used to check whether the database is a system database. func (r *Router) DatabaseACL(database string) error { if ok := r.dbACL.Allow(database); !ok { r.log.Warning("router.database.acl.check.fail[db:%s]", database) @@ -204,11 +210,16 @@ func (r *Router) DatabaseACL(database string) error { return nil } -// IsSystemDB used to check wheather the database is a system database. +// IsSystemDB used to check whether the database is a system database. func (r *Router) IsSystemDB(database string) bool { return r.dbACL.IsSystemDB(database) } +// IsPartitionHash used to check whether the partitionType is hash. +func (r *Router) IsPartitionHash(partitionType MethodType) bool { + return partitionType == methodTypeHash +} + func (r *Router) getTable(database string, tableName string) (*Table, error) { var ok bool var schema *Schema diff --git a/src/router/router_test.go b/src/router/router_test.go index 2734cde5..670dca44 100644 --- a/src/router/router_test.go +++ b/src/router/router_test.go @@ -329,6 +329,9 @@ func TestRouterPartitionType(t *testing.T) { partitionType, err := router.PartitionType("sbtest", "A") assert.Nil(t, err) assert.EqualValues(t, methodTypeHash, partitionType) + + isHash := router.IsPartitionHash(methodTypeHash) + assert.Equal(t, true, isHash) } } @@ -507,7 +510,7 @@ func TestRouterGetSegments(t *testing.T) { router, cleanup := MockNewRouter(log) defer cleanup() assert.NotNil(t, router) - err := router.AddForTest("sbtest", MockTableGConfig(), MockTableMConfig(), MockTableSConfig()) + err := router.AddForTest("sbtest", MockTableGConfig(), MockTableMConfig(), MockTableSConfig(), MockTableListConfig()) assert.Nil(t, err) // hash. { @@ -551,6 +554,18 @@ func TestRouterGetSegments(t *testing.T) { assert.Nil(t, err) assert.Equal(t, 1, len(segments)) } + //list. + { + segments, err := router.GetSegments("sbtest", "L", []int{0}) + assert.Nil(t, err) + assert.Equal(t, 1, len(segments)) + } + //list all segments. + { + segments, err := router.GetSegments("sbtest", "L", []int{}) + assert.Nil(t, err) + assert.Equal(t, 3, len(segments)) + } } func TestRouterGetSegmentsError(t *testing.T) { @@ -642,3 +657,16 @@ func TestRouterGetRenameTableConfig(t *testing.T) { _, err = router.getRenameTableConfig("sbtest", "A", "A") assert.NotNil(t, err) } + +func TestRouterIsPartitionHash(t *testing.T) { + log := xlog.NewStdLog(xlog.Level(xlog.PANIC)) + router, cleanup := MockNewRouter(log) + defer cleanup() + + // sbtest with tables. + err := router.AddForTest("sbtest", MockTableMConfig()) + assert.Nil(t, err) + + isHash := router.IsPartitionHash(methodTypeHash) + assert.Equal(t, true, isHash) +} diff --git a/src/router/types.go b/src/router/types.go index a2fe622d..fd129850 100644 --- a/src/router/types.go +++ b/src/router/types.go @@ -16,4 +16,5 @@ const ( methodTypeHash = "HASH" methodTypeGlobal = "GLOBAL" methodTypeSingle = "SINGLE" + methodTypeList = "LIST" ) diff --git a/src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/ddl_test.go b/src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/ddl_test.go index bce75729..69796d0d 100644 --- a/src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/ddl_test.go +++ b/src/vendor/github.com/xelabs/go-mysqlstack/sqlparser/ddl_test.go @@ -263,8 +263,8 @@ func TestDDL1(t *testing.T) { " `id` int primary key,\n" + " `name` varchar(10)\n" + ") PARTITION BY LIST(c1) (" + - "PARTITION p0 VALUES IN (1,4,7)," + - "PARTITION p1 VALUES IN (2,5,8) )", + "PARTITION p0 VALUES IN (1,4,7)," + + "PARTITION p1 VALUES IN (2,5,8) )", output: "create table test.t (\n" + " `id` int primary key,\n" + " `name` varchar(10)\n" +