Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

pkg/lightning: replace local generated ts with tso fetch from pd #850

Merged
merged 3 commits into from
Mar 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
Expand Down Expand Up @@ -298,7 +297,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) {
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32, ts uint64) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
logger := makeLogger(tag, engineUUID)

Expand Down Expand Up @@ -327,7 +326,7 @@ func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int
uuid: engineUUID,
},
tableName: tableName,
ts: oracle.ComposeTS(time.Now().Unix()*1000, 0),
ts: ts,
}, nil
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/store/tikv/oracle"

kv "github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/mock"
Expand All @@ -18,6 +19,7 @@ type backendSuite struct {
controller *gomock.Controller
mockBackend *mock.MockBackend
backend kv.Backend
ts uint64
}

var _ = Suite(&backendSuite{})
Expand All @@ -29,6 +31,7 @@ func (s *backendSuite) setUpTest(c *C) {
s.controller = gomock.NewController(c)
s.mockBackend = mock.NewMockBackend(s.controller)
s.backend = kv.MakeBackend(s.mockBackend)
s.ts = oracle.ComposeTS(time.Now().Unix()*1000, 0)
}

func (s *backendSuite) tearDownTest() {
Expand Down Expand Up @@ -58,7 +61,7 @@ func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) {
Return(nil).
After(importCall)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
closedEngine, err := engine.Close(ctx)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -134,7 +137,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
Return(nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, []string{"c1", "c2"}, rows1)
c.Assert(err, IsNil)
Expand All @@ -155,7 +158,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
writer.EXPECT().Close().Return(nil)
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(writer, nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, nil, emptyRows)
c.Assert(err, IsNil)
Expand All @@ -170,7 +173,7 @@ func (s *backendSuite) TestOpenEngineFailed(c *C) {
s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).
Return(errors.New("fake unrecoverable open error"))

_, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
_, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, ErrorMatches, "fake unrecoverable open error")
}

Expand All @@ -189,7 +192,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
mockWriter.EXPECT().Close().Return(nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
Expand All @@ -210,7 +213,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
MinTimes(1)
mockWriter.EXPECT().Close().Return(nil).MinTimes(1)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/backend/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *importerSuite) setUpTest(c *C) {
Return(nil, nil)

var err error
s.engine, err = importer.OpenEngine(s.ctx, "`db`.`table`", -1)
s.engine, err = importer.OpenEngine(s.ctx, "`db`.`table`", -1, 0)
c.Assert(err, IsNil)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/lightning/backend/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
ctx := context.Background()
logger := log.L()

engine, err := s.backend.OpenEngine(ctx, "`foo`.`bar`", 1)
engine, err := s.backend.OpenEngine(ctx, "`foo`.`bar`", 1, 0)
c.Assert(err, IsNil)

dataRows := s.backend.MakeEmptyRows()
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
logger := log.L()

ignoreBackend := kv.NewTiDBBackend(s.dbHandle, config.IgnoreOnDup)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1, 0)
c.Assert(err, IsNil)

dataRows := ignoreBackend.MakeEmptyRows()
Expand Down Expand Up @@ -164,7 +164,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
logger := log.L()

ignoreBackend := kv.NewTiDBBackend(s.dbHandle, config.ErrorOnDup)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1, 0)
c.Assert(err, IsNil)

dataRows := ignoreBackend.MakeEmptyRows()
Expand Down
6 changes: 5 additions & 1 deletion pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,11 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(time.Now().Unix()*1000, 0)).
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)).
SetConcurrency(e.distSQLScanConcurrency).
Build()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ func (c *testPDClient) currentSafePoint() uint64 {
return 0
}

func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return time.Now().Unix(), 0, nil
}

func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if !strings.HasPrefix(serviceID, "lightning") {
panic("service ID must start with 'lightning'")
Expand Down
24 changes: 22 additions & 2 deletions pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -164,6 +165,9 @@ type RestoreController struct {

diskQuotaLock sync.RWMutex
diskQuotaState int32

// commit ts for local and importer backend
ts uint64
}

func NewRestoreController(
Expand Down Expand Up @@ -242,6 +246,21 @@ func NewRestoreControllerWithPauser(
return nil, errors.New("unknown backend: " + cfg.TikvImporter.Backend)
}

var ts uint64
if cfg.TikvImporter.Backend == config.BackendLocal || cfg.TikvImporter.Backend == config.BackendImporter {
pdController, err := pdutil.NewPdController(ctx, cfg.TiDB.PdAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
if err != nil {
return nil, errors.Trace(err)
}
defer pdController.Close()

physical, logical, err := pdController.GetPDClient().GetTS(ctx)
if err != nil {
return nil, errors.Trace(err)
}
ts = oracle.ComposeTS(physical, logical)
}

rc := &RestoreController{
cfg: cfg,
dbMetas: dbMetas,
Expand All @@ -262,6 +281,7 @@ func NewRestoreControllerWithPauser(
closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"),

store: s,
ts: ts,
}

return rc, nil
Expand Down Expand Up @@ -1266,7 +1286,7 @@ func (t *TableRestore) restoreEngines(ctx context.Context, rc *RestoreController
indexWorker := rc.indexWorkers.Apply()
defer rc.indexWorkers.Recycle(indexWorker)

indexEngine, err := rc.backend.OpenEngine(ctx, t.tableName, indexEngineID)
indexEngine, err := rc.backend.OpenEngine(ctx, t.tableName, indexEngineID, rc.ts)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1415,7 +1435,7 @@ func (t *TableRestore) restoreEngine(

logTask := t.logger.With(zap.Int32("engineNumber", engineID)).Begin(zap.InfoLevel, "encode kv data and write")

dataEngine, err := rc.backend.OpenEngine(ctx, t.tableName, engineID)
dataEngine, err := rc.backend.OpenEngine(ctx, t.tableName, engineID, rc.ts)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,11 +861,11 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData(c *C) {
AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).AnyTimes()

dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0)
dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0)
c.Assert(err, IsNil)
dataWriter, err := dataEngine.LocalWriter(ctx, 2048)
c.Assert(err, IsNil)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1, 0)
c.Assert(err, IsNil)
indexWriter, err := indexEngine.LocalWriter(ctx, 2048)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -898,9 +898,9 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) {
mockWriter := mock.NewMockEngineWriter(controller)
mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(2048)).Return(mockWriter, nil).AnyTimes()

dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0)
dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0)
c.Assert(err, IsNil)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1, 0)
c.Assert(err, IsNil)

dataWriter, err := dataEngine.LocalWriter(ctx, 2048)
Expand Down Expand Up @@ -1113,9 +1113,9 @@ func (s *chunkRestoreSuite) TestRestore(c *C) {
mockClient.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil, nil)
mockClient.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil, nil)

dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0)
dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0)
c.Assert(err, IsNil)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1, 0)
c.Assert(err, IsNil)
dataWriter, err := dataEngine.LocalWriter(ctx, 2048)
c.Assert(err, IsNil)
Expand Down
Loading