Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
pkg/lightning: replace local generated ts with tso fetch from pd (#850)
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Mar 12, 2021
1 parent 3833d43 commit 8a3f5e2
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 90 deletions.
5 changes: 2 additions & 3 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
Expand Down Expand Up @@ -298,7 +297,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) {
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32, ts uint64) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
logger := makeLogger(tag, engineUUID)

Expand Down Expand Up @@ -327,7 +326,7 @@ func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int
uuid: engineUUID,
},
tableName: tableName,
ts: oracle.ComposeTS(time.Now().Unix()*1000, 0),
ts: ts,
}, nil
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/store/tikv/oracle"

kv "github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/mock"
Expand All @@ -18,6 +19,7 @@ type backendSuite struct {
controller *gomock.Controller
mockBackend *mock.MockBackend
backend kv.Backend
ts uint64
}

var _ = Suite(&backendSuite{})
Expand All @@ -29,6 +31,7 @@ func (s *backendSuite) setUpTest(c *C) {
s.controller = gomock.NewController(c)
s.mockBackend = mock.NewMockBackend(s.controller)
s.backend = kv.MakeBackend(s.mockBackend)
s.ts = oracle.ComposeTS(time.Now().Unix()*1000, 0)
}

func (s *backendSuite) tearDownTest() {
Expand Down Expand Up @@ -58,7 +61,7 @@ func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) {
Return(nil).
After(importCall)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
closedEngine, err := engine.Close(ctx)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -134,7 +137,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
Return(nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, []string{"c1", "c2"}, rows1)
c.Assert(err, IsNil)
Expand All @@ -155,7 +158,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
writer.EXPECT().Close().Return(nil)
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(writer, nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, nil, emptyRows)
c.Assert(err, IsNil)
Expand All @@ -170,7 +173,7 @@ func (s *backendSuite) TestOpenEngineFailed(c *C) {
s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).
Return(errors.New("fake unrecoverable open error"))

_, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
_, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, ErrorMatches, "fake unrecoverable open error")
}

Expand All @@ -189,7 +192,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
mockWriter.EXPECT().Close().Return(nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
Expand All @@ -210,7 +213,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
MinTimes(1)
mockWriter.EXPECT().Close().Return(nil).MinTimes(1)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/backend/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *importerSuite) setUpTest(c *C) {
Return(nil, nil)

var err error
s.engine, err = importer.OpenEngine(s.ctx, "`db`.`table`", -1)
s.engine, err = importer.OpenEngine(s.ctx, "`db`.`table`", -1, 0)
c.Assert(err, IsNil)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/lightning/backend/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
ctx := context.Background()
logger := log.L()

engine, err := s.backend.OpenEngine(ctx, "`foo`.`bar`", 1)
engine, err := s.backend.OpenEngine(ctx, "`foo`.`bar`", 1, 0)
c.Assert(err, IsNil)

dataRows := s.backend.MakeEmptyRows()
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
logger := log.L()

ignoreBackend := kv.NewTiDBBackend(s.dbHandle, config.IgnoreOnDup)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1, 0)
c.Assert(err, IsNil)

dataRows := ignoreBackend.MakeEmptyRows()
Expand Down Expand Up @@ -164,7 +164,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
logger := log.L()

ignoreBackend := kv.NewTiDBBackend(s.dbHandle, config.ErrorOnDup)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1, 0)
c.Assert(err, IsNil)

dataRows := ignoreBackend.MakeEmptyRows()
Expand Down
6 changes: 5 additions & 1 deletion pkg/lightning/restore/checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,11 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon
}

func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) {
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(time.Now().Unix()*1000, 0)).
physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx)
if err != nil {
return nil, errors.Annotate(err, "fetch tso from pd failed")
}
executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)).
SetConcurrency(e.distSQLScanConcurrency).
Build()
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/lightning/restore/checksum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,10 @@ func (c *testPDClient) currentSafePoint() uint64 {
return 0
}

func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) {
return time.Now().Unix(), 0, nil
}

func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) {
if !strings.HasPrefix(serviceID, "lightning") {
panic("service ID must start with 'lightning'")
Expand Down
24 changes: 22 additions & 2 deletions pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
sstpb "github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/util/collate"
Expand Down Expand Up @@ -164,6 +165,9 @@ type RestoreController struct {

diskQuotaLock sync.RWMutex
diskQuotaState int32

// commit ts for local and importer backend
ts uint64
}

func NewRestoreController(
Expand Down Expand Up @@ -242,6 +246,21 @@ func NewRestoreControllerWithPauser(
return nil, errors.New("unknown backend: " + cfg.TikvImporter.Backend)
}

var ts uint64
if cfg.TikvImporter.Backend == config.BackendLocal || cfg.TikvImporter.Backend == config.BackendImporter {
pdController, err := pdutil.NewPdController(ctx, cfg.TiDB.PdAddr, tls.TLSConfig(), tls.ToPDSecurityOption())
if err != nil {
return nil, errors.Trace(err)
}
defer pdController.Close()

physical, logical, err := pdController.GetPDClient().GetTS(ctx)
if err != nil {
return nil, errors.Trace(err)
}
ts = oracle.ComposeTS(physical, logical)
}

rc := &RestoreController{
cfg: cfg,
dbMetas: dbMetas,
Expand All @@ -262,6 +281,7 @@ func NewRestoreControllerWithPauser(
closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"),

store: s,
ts: ts,
}

return rc, nil
Expand Down Expand Up @@ -1266,7 +1286,7 @@ func (t *TableRestore) restoreEngines(ctx context.Context, rc *RestoreController
indexWorker := rc.indexWorkers.Apply()
defer rc.indexWorkers.Recycle(indexWorker)

indexEngine, err := rc.backend.OpenEngine(ctx, t.tableName, indexEngineID)
indexEngine, err := rc.backend.OpenEngine(ctx, t.tableName, indexEngineID, rc.ts)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -1415,7 +1435,7 @@ func (t *TableRestore) restoreEngine(

logTask := t.logger.With(zap.Int32("engineNumber", engineID)).Begin(zap.InfoLevel, "encode kv data and write")

dataEngine, err := rc.backend.OpenEngine(ctx, t.tableName, engineID)
dataEngine, err := rc.backend.OpenEngine(ctx, t.tableName, engineID, rc.ts)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -861,11 +861,11 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData(c *C) {
AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(nil).AnyTimes()

dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0)
dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0)
c.Assert(err, IsNil)
dataWriter, err := dataEngine.LocalWriter(ctx, 2048)
c.Assert(err, IsNil)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1, 0)
c.Assert(err, IsNil)
indexWriter, err := indexEngine.LocalWriter(ctx, 2048)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -898,9 +898,9 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) {
mockWriter := mock.NewMockEngineWriter(controller)
mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(2048)).Return(mockWriter, nil).AnyTimes()

dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0)
dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0)
c.Assert(err, IsNil)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1, 0)
c.Assert(err, IsNil)

dataWriter, err := dataEngine.LocalWriter(ctx, 2048)
Expand Down Expand Up @@ -1113,9 +1113,9 @@ func (s *chunkRestoreSuite) TestRestore(c *C) {
mockClient.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil, nil)
mockClient.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil, nil)

dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0)
dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0)
c.Assert(err, IsNil)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1)
indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1, 0)
c.Assert(err, IsNil)
dataWriter, err := dataEngine.LocalWriter(ctx, 2048)
c.Assert(err, IsNil)
Expand Down
Loading

0 comments on commit 8a3f5e2

Please sign in to comment.