Commit

infoschema: migrate dataForTables and dataForPartitions from package `infoschema` to `executor` #15034 #15037 (#15208)

zhaox1n authored Mar 9, 2020
1 parent 9627132 commit 1f5fb46
Showing 5 changed files with 473 additions and 470 deletions.
2 changes: 2 additions & 0 deletions executor/builder.go
@@ -1410,6 +1410,8 @@ func (b *executorBuilder) buildMemTable(v *plannercore.PhysicalMemTable) Executor
case strings.ToLower(infoschema.TableSchemata),
strings.ToLower(infoschema.TableTiDBIndexes),
strings.ToLower(infoschema.TableViews),
strings.ToLower(infoschema.TableTables),
strings.ToLower(infoschema.TablePartitions),
strings.ToLower(infoschema.TableEngines),
strings.ToLower(infoschema.TableCollations),
strings.ToLower(infoschema.TableCharacterSets),
351 changes: 351 additions & 0 deletions executor/infoschema_reader.go
@@ -17,14 +17,18 @@ import (
"context"
"fmt"
"sort"
"sync"
"time"

"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/sqlexec"
)

type memtableRetriever struct {
@@ -52,6 +56,10 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context)
switch e.table.Name.O {
case infoschema.TableSchemata:
e.rows = dataForSchemata(sctx, dbs)
case infoschema.TableTables:
e.rows, err = dataForTables(sctx, dbs)
case infoschema.TablePartitions:
e.rows, err = dataForPartitions(sctx, dbs)
case infoschema.TableTiDBIndexes:
e.rows, err = dataForIndexes(sctx, dbs)
case infoschema.TableViews:
@@ -101,6 +109,134 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context)
return rows, nil
}

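// getRowCountAllTable returns a map from physical table ID to row count,
// read from mysql.stats_meta via the restricted SQL executor.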
func getRowCountAllTable(ctx sessionctx.Context) (map[int64]uint64, error) {
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL("select table_id, count from mysql.stats_meta")
if err != nil {
return nil, err
}
rowCountMap := make(map[int64]uint64, len(rows))
for _, row := range rows {
tableID := row.GetInt64(0)
rowCnt := row.GetUint64(1)
rowCountMap[tableID] = rowCnt
}
return rowCountMap, nil
}

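// tableHistID identifies a column histogram by table ID and histogram ID.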
type tableHistID struct {
tableID int64
histID int64
}

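// getColLengthAllTables returns the accumulated column size per
// (table ID, histogram ID), read from mysql.stats_histograms.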
func getColLengthAllTables(ctx sessionctx.Context) (map[tableHistID]uint64, error) {
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL("select table_id, hist_id, tot_col_size from mysql.stats_histograms where is_index = 0")
if err != nil {
return nil, err
}
colLengthMap := make(map[tableHistID]uint64, len(rows))
for _, row := range rows {
tableID := row.GetInt64(0)
histID := row.GetInt64(1)
totalSize := row.GetInt64(2)
if totalSize < 0 {
totalSize = 0
}
colLengthMap[tableHistID{tableID: tableID, histID: histID}] = uint64(totalSize)
}
return colLengthMap, nil
}

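// getDataAndIndexLength estimates the data and index length of a physical
// table: fixed-width columns are sized as rowCount * storage width, while
// variable-width columns use the total column size from the stats histograms.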
func getDataAndIndexLength(info *model.TableInfo, physicalID int64, rowCount uint64, columnLengthMap map[tableHistID]uint64) (uint64, uint64) {
columnLength := make(map[string]uint64, len(info.Columns))
for _, col := range info.Columns {
if col.State != model.StatePublic {
continue
}
length := col.FieldType.StorageLength()
if length != types.VarStorageLen {
columnLength[col.Name.L] = rowCount * uint64(length)
} else {
length := columnLengthMap[tableHistID{tableID: physicalID, histID: col.ID}]
columnLength[col.Name.L] = length
}
}
dataLength, indexLength := uint64(0), uint64(0)
for _, length := range columnLength {
dataLength += length
}
for _, idx := range info.Indices {
if idx.State != model.StatePublic {
continue
}
for _, col := range idx.Columns {
if col.Length == types.UnspecifiedLength {
indexLength += columnLength[col.Name.L]
} else {
indexLength += rowCount * uint64(col.Length)
}
}
}
return dataLength, indexLength
}

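// statsCache caches the table row counts and column lengths used by
// information_schema tables, so each query does not hit mysql.stats_meta
// and mysql.stats_histograms again.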
type statsCache struct {
mu sync.Mutex
loading bool
modifyTime time.Time
tableRows map[int64]uint64
colLength map[tableHistID]uint64
}

var tableStatsCache = &statsCache{}

// TableStatsCacheExpiry is the expiry time for table stats cache.
var TableStatsCacheExpiry = 3 * time.Second

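// setLoading sets the loading flag under the cache lock.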
func (c *statsCache) setLoading(loading bool) {
c.mu.Lock()
c.loading = loading
c.mu.Unlock()
}

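// get returns the cached statistics, reloading them once the cache is older
// than TableStatsCacheExpiry. The loading flag lets concurrent callers use
// the stale data instead of triggering duplicate reloads.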
func (c *statsCache) get(ctx sessionctx.Context) (map[int64]uint64, map[tableHistID]uint64, error) {
c.mu.Lock()
if time.Since(c.modifyTime) < TableStatsCacheExpiry || c.loading {
tableRows, colLength := c.tableRows, c.colLength
c.mu.Unlock()
return tableRows, colLength, nil
}
c.loading = true
c.mu.Unlock()

tableRows, err := getRowCountAllTable(ctx)
if err != nil {
c.setLoading(false)
return nil, nil, err
}
colLength, err := getColLengthAllTables(ctx)
if err != nil {
c.setLoading(false)
return nil, nil, err
}

c.mu.Lock()
c.loading = false
c.tableRows = tableRows
c.colLength = colLength
c.modifyTime = time.Now()
c.mu.Unlock()
return tableRows, colLength, nil
}

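// getAutoIncrementID returns the next auto-increment ID of the table, i.e.
// the current allocator base plus one.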
func getAutoIncrementID(ctx sessionctx.Context, schema *model.DBInfo, tblInfo *model.TableInfo) (int64, error) {
is := ctx.GetSessionVars().TxnCtx.InfoSchema.(infoschema.InfoSchema)
tbl, err := is.TableByName(schema.Name, tblInfo.Name)
if err != nil {
return 0, err
}
return tbl.Allocator(ctx, autoid.RowIDAllocType).Base() + 1, nil
}

func dataForSchemata(ctx sessionctx.Context, schemas []*model.DBInfo) [][]types.Datum {
checker := privilege.GetPrivilegeManager(ctx)
rows := make([][]types.Datum, 0, len(schemas))
@@ -133,6 +269,221 @@ func dataForSchemata(ctx sessionctx.Context, schemas []*model.DBInfo) [][]types.Datum {
return rows
}

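// dataForTables builds the rows of information_schema.TABLES. Tables the
// current user lacks privileges on are skipped; views get NULL statistics.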
func dataForTables(ctx sessionctx.Context, schemas []*model.DBInfo) ([][]types.Datum, error) {
tableRowsMap, colLengthMap, err := tableStatsCache.get(ctx)
if err != nil {
return nil, err
}

checker := privilege.GetPrivilegeManager(ctx)

var rows [][]types.Datum
createTimeTp := mysql.TypeDatetime
for _, schema := range schemas {
for _, table := range schema.Tables {
collation := table.Collate
if collation == "" {
collation = mysql.DefaultCollationName
}
createTime := types.NewTime(types.FromGoTime(table.GetUpdateTime()), createTimeTp, types.DefaultFsp)

createOptions := ""

if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
continue
}

if !table.IsView() {
if table.GetPartitionInfo() != nil {
createOptions = "partitioned"
}
var autoIncID interface{}
hasAutoIncID, _ := infoschema.HasAutoIncrementColumn(table)
if hasAutoIncID {
autoIncID, err = getAutoIncrementID(ctx, schema, table)
if err != nil {
return nil, err
}
}

var rowCount, dataLength, indexLength uint64
if table.GetPartitionInfo() == nil {
rowCount = tableRowsMap[table.ID]
dataLength, indexLength = getDataAndIndexLength(table, table.ID, rowCount, colLengthMap)
} else {
for _, pi := range table.GetPartitionInfo().Definitions {
rowCount += tableRowsMap[pi.ID]
parDataLen, parIndexLen := getDataAndIndexLength(table, pi.ID, tableRowsMap[pi.ID], colLengthMap)
dataLength += parDataLen
indexLength += parIndexLen
}
}
avgRowLength := uint64(0)
if rowCount != 0 {
avgRowLength = dataLength / rowCount
}

shardingInfo := infoschema.GetShardingInfo(schema, table)
record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.Name.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
"BASE TABLE", // TABLE_TYPE
"InnoDB", // ENGINE
uint64(10), // VERSION
"Compact", // ROW_FORMAT
rowCount, // TABLE_ROWS
avgRowLength, // AVG_ROW_LENGTH
dataLength, // DATA_LENGTH
uint64(0), // MAX_DATA_LENGTH
indexLength, // INDEX_LENGTH
uint64(0), // DATA_FREE
autoIncID, // AUTO_INCREMENT
createTime, // CREATE_TIME
nil, // UPDATE_TIME
nil, // CHECK_TIME
collation, // TABLE_COLLATION
nil, // CHECKSUM
createOptions, // CREATE_OPTIONS
table.Comment, // TABLE_COMMENT
table.ID, // TIDB_TABLE_ID
shardingInfo, // TIDB_ROW_ID_SHARDING_INFO
)
rows = append(rows, record)
} else {
record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.Name.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
"VIEW", // TABLE_TYPE
nil, // ENGINE
nil, // VERSION
nil, // ROW_FORMAT
nil, // TABLE_ROWS
nil, // AVG_ROW_LENGTH
nil, // DATA_LENGTH
nil, // MAX_DATA_LENGTH
nil, // INDEX_LENGTH
nil, // DATA_FREE
nil, // AUTO_INCREMENT
createTime, // CREATE_TIME
nil, // UPDATE_TIME
nil, // CHECK_TIME
nil, // TABLE_COLLATION
nil, // CHECKSUM
nil, // CREATE_OPTIONS
"VIEW", // TABLE_COMMENT
table.ID, // TIDB_TABLE_ID
nil, // TIDB_ROW_ID_SHARDING_INFO
)
rows = append(rows, record)
}
}
}
return rows, nil
}

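// dataForPartitions builds the rows of information_schema.PARTITIONS. A
// non-partitioned table yields a single row with NULL partition attributes;
// a partitioned table yields one row per partition definition.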
func dataForPartitions(ctx sessionctx.Context, schemas []*model.DBInfo) ([][]types.Datum, error) {
tableRowsMap, colLengthMap, err := tableStatsCache.get(ctx)
if err != nil {
return nil, err
}
checker := privilege.GetPrivilegeManager(ctx)
var rows [][]types.Datum
createTimeTp := mysql.TypeDatetime
for _, schema := range schemas {
for _, table := range schema.Tables {
if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.SelectPriv) {
continue
}
createTime := types.NewTime(types.FromGoTime(table.GetUpdateTime()), createTimeTp, types.DefaultFsp)

var rowCount, dataLength, indexLength uint64
if table.GetPartitionInfo() == nil {
rowCount = tableRowsMap[table.ID]
dataLength, indexLength = getDataAndIndexLength(table, table.ID, rowCount, colLengthMap)
avgRowLength := uint64(0)
if rowCount != 0 {
avgRowLength = dataLength / rowCount
}
record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.Name.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
nil, // PARTITION_NAME
nil, // SUBPARTITION_NAME
nil, // PARTITION_ORDINAL_POSITION
nil, // SUBPARTITION_ORDINAL_POSITION
nil, // PARTITION_METHOD
nil, // SUBPARTITION_METHOD
nil, // PARTITION_EXPRESSION
nil, // SUBPARTITION_EXPRESSION
nil, // PARTITION_DESCRIPTION
rowCount, // TABLE_ROWS
avgRowLength, // AVG_ROW_LENGTH
dataLength, // DATA_LENGTH
nil, // MAX_DATA_LENGTH
indexLength, // INDEX_LENGTH
nil, // DATA_FREE
createTime, // CREATE_TIME
nil, // UPDATE_TIME
nil, // CHECK_TIME
nil, // CHECKSUM
nil, // PARTITION_COMMENT
nil, // NODEGROUP
nil, // TABLESPACE_NAME
)
rows = append(rows, record)
} else {
for i, pi := range table.GetPartitionInfo().Definitions {
rowCount = tableRowsMap[pi.ID]
dataLength, indexLength = getDataAndIndexLength(table, pi.ID, tableRowsMap[pi.ID], colLengthMap)

avgRowLength := uint64(0)
if rowCount != 0 {
avgRowLength = dataLength / rowCount
}

var partitionDesc string
if table.Partition.Type == model.PartitionTypeRange {
partitionDesc = pi.LessThan[0]
}

record := types.MakeDatums(
infoschema.CatalogVal, // TABLE_CATALOG
schema.Name.O, // TABLE_SCHEMA
table.Name.O, // TABLE_NAME
pi.Name.O, // PARTITION_NAME
nil, // SUBPARTITION_NAME
i+1, // PARTITION_ORDINAL_POSITION
nil, // SUBPARTITION_ORDINAL_POSITION
table.Partition.Type.String(), // PARTITION_METHOD
nil, // SUBPARTITION_METHOD
table.Partition.Expr, // PARTITION_EXPRESSION
nil, // SUBPARTITION_EXPRESSION
partitionDesc, // PARTITION_DESCRIPTION
rowCount, // TABLE_ROWS
avgRowLength, // AVG_ROW_LENGTH
dataLength, // DATA_LENGTH
uint64(0), // MAX_DATA_LENGTH
indexLength, // INDEX_LENGTH
uint64(0), // DATA_FREE
createTime, // CREATE_TIME
nil, // UPDATE_TIME
nil, // CHECK_TIME
nil, // CHECKSUM
pi.Comment, // PARTITION_COMMENT
nil, // NODEGROUP
nil, // TABLESPACE_NAME
)
rows = append(rows, record)
}
}
}
}
return rows, nil
}

func dataForIndexes(ctx sessionctx.Context, schemas []*model.DBInfo) ([][]types.Datum, error) {
checker := privilege.GetPrivilegeManager(ctx)
var rows [][]types.Datum