Skip to content

Commit

Permalink
lightning: allocate autoid globally when increment-import is enabled (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sleepymole authored Jul 11, 2022
1 parent 58e88e7 commit ae2cd91
Show file tree
Hide file tree
Showing 28 changed files with 439 additions and 87 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/kv/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type panickingAllocator struct {
ty autoid.AllocatorType
}

// NewPanickingAllocator creates a PanickingAllocator shared by all allocation types.
// NewPanickingAllocators creates a PanickingAllocator shared by all allocation types.
func NewPanickingAllocators(base int64) autoid.Allocators {
sharedBase := &base
return autoid.NewAllocators(
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/checkpoints/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
)

type TidbDBInfo struct {
ID int64
Name string
Tables map[string]*TidbTableInfo
}
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ func TableHasAutoRowID(info *model.TableInfo) bool {
return !info.PKIsHandle && !info.IsCommonHandle
}

// TableHasAutoID return whether table has auto generated id.
func TableHasAutoID(info *model.TableInfo) bool {
return TableHasAutoRowID(info) || info.GetAutoIncrementColInfo() != nil || info.ContainsAutoRandomBits()
}

// StringSliceEqual checks if two string slices are equal.
func StringSliceEqual(a, b []string) bool {
if len(a) != len(b) {
Expand Down
17 changes: 1 addition & 16 deletions br/pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/metric"
"github.com/pingcap/tidb/br/pkg/pdutil"
tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/driver"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tipb/go-tipb"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -68,7 +66,7 @@ type ChecksumManager interface {
Checksum(ctx context.Context, tableInfo *checkpoints.TidbTableInfo) (*RemoteChecksum, error)
}

func newChecksumManager(ctx context.Context, rc *Controller) (ChecksumManager, error) {
func newChecksumManager(ctx context.Context, rc *Controller, store kv.Storage) (ChecksumManager, error) {
// if we don't need checksum, just return nil
if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.PostRestore.Checksum == config.OpLevelOff {
return nil, nil
Expand All @@ -89,19 +87,6 @@ func newChecksumManager(ctx context.Context, rc *Controller) (ChecksumManager, e
return nil, errors.Trace(err)
}

// TODO: make tikv.Driver{}.Open use arguments instead of global variables
if tlsOpt.CAPath != "" {
conf := tidbcfg.GetGlobalConfig()
conf.Security.ClusterSSLCA = tlsOpt.CAPath
conf.Security.ClusterSSLCert = tlsOpt.CertPath
conf.Security.ClusterSSLKey = tlsOpt.KeyPath
tidbcfg.StoreGlobalConfig(conf)
}
store, err := driver.TiKVDriver{}.Open(fmt.Sprintf("tikv://%s?disableGC=true", pdAddr))
if err != nil {
return nil, errors.Trace(err)
}

manager = newTiKVChecksumManager(store.GetClient(), pdCli, uint(rc.cfg.TiDB.DistSQLScanConcurrency))
} else {
db, err := rc.tidbGlue.GetDB()
Expand Down
99 changes: 62 additions & 37 deletions br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,16 @@ import (
"database/sql"
"encoding/json"
"fmt"
"strings"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/lightning/backend/tidb"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/log"
verify "github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/pdutil"
"github.com/pingcap/tidb/br/pkg/redact"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -179,7 +178,8 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
if err != nil {
return nil, 0, errors.Annotate(err, "enable pessimistic transaction failed")
}
needAutoID := common.TableHasAutoRowID(m.tr.tableInfo.Core) || m.tr.tableInfo.Core.GetAutoIncrementColInfo() != nil || m.tr.tableInfo.Core.ContainsAutoRandomBits()

needAutoID := common.TableHasAutoID(m.tr.tableInfo.Core)
err = exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error {
rows, err := tx.QueryContext(
ctx,
Expand Down Expand Up @@ -244,44 +244,21 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64

// no enough info are available, fetch row_id max for table
if curStatus == metaStatusInitial {
if needAutoID && maxRowIDMax == 0 {
// NOTE: currently, if a table contains auto_incremental unique key and _tidb_rowid,
// the `show table next_row_id` will returns the unique key field only.
var autoIDField string
for _, col := range m.tr.tableInfo.Core.Columns {
if mysql.HasAutoIncrementFlag(col.GetFlag()) {
autoIDField = col.Name.L
break
} else if mysql.HasPriKeyFlag(col.GetFlag()) && m.tr.tableInfo.Core.AutoRandomBits > 0 {
autoIDField = col.Name.L
break
}
}
if len(autoIDField) == 0 && common.TableHasAutoRowID(m.tr.tableInfo.Core) {
autoIDField = model.ExtraHandleName.L
}
if len(autoIDField) == 0 {
return common.ErrAllocTableRowIDs.GenWithStack("table %s contains auto increment id or _tidb_rowid, but target field not found", m.tr.tableName)
if needAutoID {
// maxRowIDMax is the max row_id that other tasks has allocated, we need to rebase the global autoid base first.
if err := rebaseGlobalAutoID(ctx, maxRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core); err != nil {
return errors.Trace(err)
}

autoIDInfos, err := tidb.FetchTableAutoIDInfos(ctx, tx, m.tr.tableName)
newRowIDBase, newRowIDMax, err = allocGlobalAutoID(ctx, rawRowIDMax, m.tr.kvStore, m.tr.dbInfo.ID, m.tr.tableInfo.Core)
if err != nil {
return errors.Trace(err)
}
found := false
for _, info := range autoIDInfos {
if strings.ToLower(info.Column) == autoIDField {
maxRowIDMax = info.NextID - 1
found = true
break
}
}
if !found {
return common.ErrAllocTableRowIDs.GenWithStack("can't fetch previous auto id base for table %s field '%s'", m.tr.tableName, autoIDField)
}
} else {
// Though we don't need auto ID, we still guarantee that the row ID is unique across all lightning instances.
newRowIDBase = maxRowIDMax
newRowIDMax = newRowIDBase + rawRowIDMax
}
newRowIDBase = maxRowIDMax
newRowIDMax = newRowIDBase + rawRowIDMax

// table contains no data, can skip checksum
if needAutoID && newRowIDBase == 0 && newStatus < metaStatusRestoreStarted {
newStatus = metaStatusRestoreStarted
Expand Down Expand Up @@ -1153,3 +1130,51 @@ func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error {

func (m *singleTaskMetaMgr) Close() {
}

func allocGlobalAutoID(ctx context.Context, n int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoIDBase, autoIDMax int64, err error) {
alloc, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
if err != nil {
return 0, 0, err
}
return alloc.Alloc(ctx, uint64(n), 1, 1)
}

func rebaseGlobalAutoID(ctx context.Context, newBase int64, store kv.Storage, dbID int64, tblInfo *model.TableInfo) error {
alloc, err := getGlobalAutoIDAlloc(store, dbID, tblInfo)
if err != nil {
return err
}
return alloc.Rebase(ctx, newBase, false)
}

func getGlobalAutoIDAlloc(store kv.Storage, dbID int64, tblInfo *model.TableInfo) (autoid.Allocator, error) {
if store == nil {
return nil, errors.New("internal error: kv store should not be nil")
}
if dbID == 0 {
return nil, errors.New("internal error: dbID should not be 0")
}

// We don't need the cache here because we allocate all IDs at once.
// The argument for CustomAutoIncCacheOption is the cache step. step 1 means no cache.
noCache := autoid.CustomAutoIncCacheOption(1)
tblVer := autoid.AllocOptionTableInfoVersion(tblInfo.Version)

hasRowID := common.TableHasAutoRowID(tblInfo)
hasAutoIncID := tblInfo.GetAutoIncrementColInfo() != nil
hasAutoRandID := tblInfo.ContainsAutoRandomBits()

// Current TiDB has some limitations for auto ID.
// 1. Auto increment ID and auto row ID are using the same RowID allocator. See https://github.com/pingcap/tidb/issues/982.
// 2. Auto random column must be a clustered primary key. That is to say, there is no implicit row ID for tables with auto random column.
// 3. There is at most one auto column in a table.
// Therefore, we assume there is only one auto column in a table and use RowID allocator if possible.
switch {
case hasRowID || hasAutoIncID:
return autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoIncColUnsigned(), autoid.RowIDAllocType, noCache, tblVer), nil
case hasAutoRandID:
return autoid.NewAllocator(store, dbID, tblInfo.ID, tblInfo.IsAutoRandomBitColUnsigned(), autoid.AutoRandomType, noCache, tblVer), nil
default:
return nil, errors.Errorf("internal error: table %s has no auto ID", tblInfo.Name)
}
}
44 changes: 39 additions & 5 deletions br/pkg/lightning/restore/meta_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,13 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/store/mockstore"
tmock "github.com/pingcap/tidb/util/mock"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
Expand All @@ -29,7 +33,7 @@ type metaMgrSuite struct {
checksumMgr *testChecksumMgr
}

func newTableRestore(t *testing.T) *TableRestore {
func newTableRestore(t *testing.T, kvStore kv.Storage) *TableRestore {
p := parser.New()
se := tmock.NewContext()

Expand All @@ -47,12 +51,35 @@ func newTableRestore(t *testing.T) *TableRestore {
Name: tb,
Core: tableInfo,
}
dbInfo := &checkpoints.TidbDBInfo{
ID: 1,
Name: schema,
Tables: map[string]*checkpoints.TidbTableInfo{
tb: ti,
},
}

ctx := kv.WithInternalSourceType(context.Background(), "test")
err = kv.RunInNewTxn(ctx, kvStore, false, func(ctx context.Context, txn kv.Transaction) error {
m := meta.NewMeta(txn)
if err := m.CreateDatabase(&model.DBInfo{ID: dbInfo.ID}); err != nil {
return err
}
if err := m.CreateTableOrView(dbInfo.ID, ti.Core); err != nil {
return err
}
return nil
})
require.NoError(t, err)

tableName := common.UniqueTable(schema, tb)
logger := log.With(zap.String("table", tableName))

return &TableRestore{
dbInfo: dbInfo,
tableName: tableName,
tableInfo: ti,
kvStore: kvStore,
logger: logger,
}
}
Expand All @@ -61,18 +88,24 @@ func newMetaMgrSuite(t *testing.T) (*metaMgrSuite, func()) {
db, m, err := sqlmock.New()
require.NoError(t, err)

storePath := t.TempDir()
kvStore, err := mockstore.NewMockStore(mockstore.WithPath(storePath))
require.NoError(t, err)

var s metaMgrSuite
s.mgr = &dbTableMetaMgr{
session: db,
taskID: 1,
tr: newTableRestore(t),
tr: newTableRestore(t, kvStore),
tableName: common.UniqueTable("test", TableMetaTableName),
needChecksum: true,
}
s.mockDB = m
s.checksumMgr = &testChecksumMgr{}

return &s, func() {
require.NoError(t, s.mockDB.ExpectationsWereMet())
require.NoError(t, kvStore.Close())
}
}

Expand Down Expand Up @@ -257,10 +290,11 @@ func (s *metaMgrSuite) prepareMock(rowsVal [][]driver.Value, nextRowID *int64, u
s.mockDB.ExpectQuery("\\QSELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from `test`.`table_meta` WHERE table_id = ? FOR UPDATE\\E").
WithArgs(int64(1)).
WillReturnRows(rows)

if nextRowID != nil {
s.mockDB.ExpectQuery("SHOW TABLE `test`.`t1` NEXT_ROW_ID").
WillReturnRows(sqlmock.NewRows([]string{"DB_NAME", "TABLE_NAME", "COLUMN_NAME", "NEXT_GLOBAL_ROW_ID", "ID_TYPE"}).
AddRow("test", "t1", "_tidb_rowid", *nextRowID, "AUTO_INCREMENT"))
allocs := autoid.NewAllocatorsFromTblInfo(s.mgr.tr.kvStore, s.mgr.tr.dbInfo.ID, s.mgr.tr.tableInfo.Core)
alloc := allocs.Get(autoid.RowIDAllocType)
alloc.ForceRebase(*nextRowID - 1)
}

if len(updateArgs) > 0 {
Expand Down
Loading

0 comments on commit ae2cd91

Please sign in to comment.