Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor, meta: Allocate auto id for global temporary tables #24506

Merged
merged 8 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1590,3 +1590,120 @@ func (s *testSuite10) TestBinaryLiteralInsertToSet(c *C) {
tk.MustExec("insert into bintest(h) values(0x61)")
tk.MustQuery("select * from bintest").Check(testkit.Rows("a"))
}

var _ = SerialSuites(&testSuite13{&baseTestSuite{}})

type testSuite13 struct {
*baseTestSuite
}

func (s *testSuite13) TestGlobalTempTableAutoInc(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec("drop table if exists temp_test")
tk.MustExec("create global temporary table temp_test(id int primary key auto_increment) on commit delete rows")
defer tk.MustExec("drop table if exists temp_test")

// Data is cleared after transaction auto commits.
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select * from temp_test").Check(testkit.Rows())

// Data is not cleared inside a transaction.
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select * from temp_test").Check(testkit.Rows("1"))
tk.MustExec("commit")

// AutoID allocator is cleared.
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select * from temp_test").Check(testkit.Rows("1"))
// Test whether auto-inc is incremental
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("1", "2"))
tk.MustExec("commit")

// multi-value insert
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("1", "2"))
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("1", "2", "3", "4"))
tk.MustExec("commit")

// rebase
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(10)")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("10", "11"))
tk.MustExec("insert into temp_test(id) values(20), (30)")
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select id from temp_test order by id").Check(testkit.Rows("10", "11", "20", "30", "31", "32"))
tk.MustExec("commit")
}

func (s *testSuite13) TestGlobalTempTableRowID(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec("drop table if exists temp_test")
tk.MustExec("create global temporary table temp_test(id int) on commit delete rows")
defer tk.MustExec("drop table if exists temp_test")

// Data is cleared after transaction auto commits.
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select _tidb_rowid from temp_test").Check(testkit.Rows())

// Data is not cleared inside a transaction.
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select _tidb_rowid from temp_test").Check(testkit.Rows("1"))
tk.MustExec("commit")

// AutoID allocator is cleared.
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select _tidb_rowid from temp_test").Check(testkit.Rows("1"))
// Test whether row id is incremental
tk.MustExec("insert into temp_test(id) values(0)")
tk.MustQuery("select _tidb_rowid from temp_test order by _tidb_rowid").Check(testkit.Rows("1", "2"))
tk.MustExec("commit")

// multi-value insert
tk.MustExec("begin")
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select _tidb_rowid from temp_test order by _tidb_rowid").Check(testkit.Rows("1", "2"))
tk.MustExec("insert into temp_test(id) values(0), (0)")
tk.MustQuery("select _tidb_rowid from temp_test order by _tidb_rowid").Check(testkit.Rows("1", "2", "3", "4"))
tk.MustExec("commit")
}

func (s *testSuite13) TestGlobalTempTableParallel(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
tk.MustExec("drop table if exists temp_test")
tk.MustExec("create global temporary table temp_test(id int primary key auto_increment) on commit delete rows")
defer tk.MustExec("drop table if exists temp_test")

threads := 8
loops := 1
wg := sync.WaitGroup{}
wg.Add(threads)

insertFunc := func() {
defer wg.Done()
newTk := testkit.NewTestKitWithInit(c, s.store)
newTk.MustExec("begin")
for i := 0; i < loops; i++ {
newTk.MustExec("insert temp_test value(0)")
newTk.MustExec("insert temp_test value(0), (0)")
}
maxID := strconv.Itoa(loops * 3)
AilinKid marked this conversation as resolved.
Show resolved Hide resolved
newTk.MustQuery("select max(id) from temp_test").Check(testkit.Rows(maxID))
newTk.MustExec("commit")
}

for i := 0; i < threads; i++ {
go insertFunc()
}
wg.Wait()
}
24 changes: 23 additions & 1 deletion sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
pumpcli "github.com/pingcap/tidb-tools/tidb-binlog/pump_client"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/rowcodec"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tableutil"
"github.com/pingcap/tidb/util/timeutil"
"github.com/twmb/murmur3"
atomic2 "go.uber.org/atomic"
Expand Down Expand Up @@ -174,7 +176,9 @@ type TransactionContext struct {
// TableDeltaMap lock to prevent potential data race
tdmLock sync.Mutex

GlobalTemporaryTables map[int64]struct{}
// GlobalTemporaryTables is used to store transaction-specific information for global temporary tables.
// It can also be stored in sessionCtx with local temporary tables, but it's easier to clean this data after transaction ends.
GlobalTemporaryTables map[int64]tableutil.TempTable
}

// GetShard returns the shard prefix for the next `count` rowids.
Expand Down Expand Up @@ -1456,6 +1460,24 @@ func (s *SessionVars) LazyCheckKeyNotExists() bool {
return s.PresumeKeyNotExists || (s.TxnCtx.IsPessimistic && !s.StmtCtx.DupKeyAsWarning)
}

// GetTemporaryTable returns a TempTable by tableInfo.
func (s *SessionVars) GetTemporaryTable(tblInfo *model.TableInfo) tableutil.TempTable {
if tblInfo.TempTableType == model.TempTableGlobal {
if s.TxnCtx.GlobalTemporaryTables == nil {
s.TxnCtx.GlobalTemporaryTables = make(map[int64]tableutil.TempTable)
}
globalTempTables := s.TxnCtx.GlobalTemporaryTables
globalTempTable, ok := globalTempTables[tblInfo.ID]
if !ok {
globalTempTable = tableutil.TempTableFromMeta(tblInfo)
globalTempTables[tblInfo.ID] = globalTempTable
}
return globalTempTable
}
// TODO: check local temporary tables
return nil
}

// special session variables.
const (
SQLModeVar = "sql_mode"
Expand Down
73 changes: 60 additions & 13 deletions table/tables/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
tikvstore "github.com/pingcap/tidb/store/tikv/kv"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
Expand All @@ -46,6 +47,7 @@ import (
"github.com/pingcap/tidb/util/generatedexpr"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/stringutil"
"github.com/pingcap/tidb/util/tableutil"
"github.com/pingcap/tipb/go-binlog"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
Expand Down Expand Up @@ -322,8 +324,8 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
sh := memBuffer.Staging()
defer memBuffer.Cleanup(sh)

if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal {
addTemporaryTableID(sctx, meta.ID)
if m := t.Meta(); m.TempTableType == model.TempTableGlobal {
addTemporaryTable(sctx, m)
}

var colIDs, binlogColIDs []int64
Expand Down Expand Up @@ -588,12 +590,9 @@ func TryGetCommonPkColumns(tbl table.Table) []*table.Column {
return pkCols
}

func addTemporaryTableID(sctx sessionctx.Context, id int64) {
txnCtx := sctx.GetSessionVars().TxnCtx
if txnCtx.GlobalTemporaryTables == nil {
txnCtx.GlobalTemporaryTables = make(map[int64]struct{})
}
txnCtx.GlobalTemporaryTables[id] = struct{}{}
func addTemporaryTable(sctx sessionctx.Context, tblInfo *model.TableInfo) {
tempTable := sctx.GetSessionVars().GetTemporaryTable(tblInfo)
tempTable.SetModified(true)
}

// AddRecord implements table.Table AddRecord interface.
Expand All @@ -608,8 +607,8 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
fn.ApplyOn(&opt)
}

if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal {
addTemporaryTableID(sctx, meta.ID)
if m := t.Meta(); m.TempTableType == model.TempTableGlobal {
addTemporaryTable(sctx, m)
}

var ctx context.Context
Expand Down Expand Up @@ -1010,8 +1009,8 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
return err
}

if meta := t.Meta(); meta.TempTableType == model.TempTableGlobal {
addTemporaryTableID(ctx, meta.ID)
if m := t.Meta(); m.TempTableType == model.TempTableGlobal {
addTemporaryTable(ctx, m)
}

// The table has non-public column and this column is doing the operation of "modify/change column".
Expand Down Expand Up @@ -1370,7 +1369,14 @@ func OverflowShardBits(recordID int64, shardRowIDBits uint64, typeBitsLength uin

// Allocators implements table.Table Allocators interface.
func (t *TableCommon) Allocators(ctx sessionctx.Context) autoid.Allocators {
if ctx == nil || ctx.GetSessionVars().IDAllocator == nil {
if ctx == nil {
return t.allocs
} else if ctx.GetSessionVars().IDAllocator == nil {
// Use an independent allocator for global temporary tables.
if t.meta.TempTableType == model.TempTableGlobal {
alloc := ctx.GetSessionVars().GetTemporaryTable(t.meta).GetAutoIDAllocator()
return autoid.Allocators{alloc}
}
return t.allocs
}

Expand Down Expand Up @@ -1498,6 +1504,7 @@ func getDuplicateErrorHandleString(t table.Table, handle kv.Handle, row []types.
func init() {
table.TableFromMeta = TableFromMeta
table.MockTableFromMeta = MockTableFromMeta
tableutil.TempTableFromMeta = TempTableFromMeta
}

// sequenceCommon cache the sequence value.
Expand Down Expand Up @@ -1763,3 +1770,43 @@ func BuildTableScanFromInfos(tableInfo *model.TableInfo, columnInfos []*model.Co
}
return tsExec
}

// TemporaryTable is used to store transaction-specific or session-specific information for global / local temporary tables.
// For example, stats and autoID should have their own copies of data, instead of being shared by all sessions.
type TemporaryTable struct {
// Whether it's modified in this transaction.
modified bool
djshow832 marked this conversation as resolved.
Show resolved Hide resolved
// The stats of this table. So far it's always pseudo stats.
stats *statistics.Table
// The autoID allocator of this table.
autoIDAllocator autoid.Allocator
}

// TempTableFromMeta builds a TempTable from model.TableInfo.
func TempTableFromMeta(tblInfo *model.TableInfo) tableutil.TempTable {
return &TemporaryTable{
modified: false,
stats: statistics.PseudoTable(tblInfo),
autoIDAllocator: autoid.NewAllocatorFromTempTblInfo(tblInfo),
}
}

// GetAutoIDAllocator is implemented from TempTable.GetAutoIDAllocator.
func (t *TemporaryTable) GetAutoIDAllocator() autoid.Allocator {
return t.autoIDAllocator
}

// SetModified is implemented from TempTable.SetModified.
func (t *TemporaryTable) SetModified(modified bool) {
t.modified = modified
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved
}

// GetModified is implemented from TempTable.GetModified.
func (t *TemporaryTable) GetModified() bool {
return t.modified
}

// GetStats is implemented from TempTable.GetStats.
func (t *TemporaryTable) GetStats() interface{} {
return t.stats
}
40 changes: 40 additions & 0 deletions util/tableutil/tableutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package tableutil

import (
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/meta/autoid"
)

// TempTable is used to store transaction-specific or session-specific information for global / local temporary tables.
// For example, stats and autoID should have their own copies of data, instead of being shared by all sessions.
type TempTable interface {
// GetAutoIDAllocator gets the autoID allocator of this table.
GetAutoIDAllocator() autoid.Allocator

// SetModified sets that the table is modified.
SetModified(bool)

// GetModified queries whether the table is modified.
GetModified() bool

// The stats of this table (*statistics.Table).
// Define the return type as interface{} here to avoid cycle imports.
GetStats() interface{}
}

// TempTableFromMeta builds a TempTable from *model.TableInfo.
// Currently, it is assigned to tables.TempTableFromMeta in tidb package's init function.
var TempTableFromMeta func(tblInfo *model.TableInfo) TempTable