diff --git a/pkg/lightning/backend/backend.go b/pkg/lightning/backend/backend.go index 22300462c..763f4fa98 100644 --- a/pkg/lightning/backend/backend.go +++ b/pkg/lightning/backend/backend.go @@ -23,7 +23,6 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/parser/model" - "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/types" "go.uber.org/zap" @@ -298,7 +297,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID } // OpenEngine opens an engine with the given table name and engine ID. -func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) { +func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32, ts uint64) (*OpenedEngine, error) { tag, engineUUID := MakeUUID(tableName, engineID) logger := makeLogger(tag, engineUUID) @@ -327,7 +326,7 @@ func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int uuid: engineUUID, }, tableName: tableName, - ts: oracle.ComposeTS(time.Now().Unix()*1000, 0), + ts: ts, }, nil } diff --git a/pkg/lightning/backend/backend_test.go b/pkg/lightning/backend/backend_test.go index 64297af24..377df2a9a 100644 --- a/pkg/lightning/backend/backend_test.go +++ b/pkg/lightning/backend/backend_test.go @@ -9,6 +9,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" "github.com/pingcap/parser/mysql" + "github.com/pingcap/tidb/store/tikv/oracle" kv "github.com/pingcap/br/pkg/lightning/backend" "github.com/pingcap/br/pkg/lightning/mock" @@ -18,6 +19,7 @@ type backendSuite struct { controller *gomock.Controller mockBackend *mock.MockBackend backend kv.Backend + ts uint64 } var _ = Suite(&backendSuite{}) @@ -29,6 +31,7 @@ func (s *backendSuite) setUpTest(c *C) { s.controller = gomock.NewController(c) s.mockBackend = mock.NewMockBackend(s.controller) s.backend = kv.MakeBackend(s.mockBackend) + s.ts = oracle.ComposeTS(time.Now().Unix()*1000, 0) } func (s *backendSuite) tearDownTest() { @@ -58,7 +61,7 @@ func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) { Return(nil). After(importCall) - engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1) + engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) closedEngine, err := engine.Close(ctx) c.Assert(err, IsNil) @@ -134,7 +137,7 @@ func (s *backendSuite) TestWriteEngine(c *C) { AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2). Return(nil) - engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1) + engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) err = engine.WriteRows(ctx, []string{"c1", "c2"}, rows1) c.Assert(err, IsNil) @@ -155,7 +158,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) { writer.EXPECT().Close().Return(nil) s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(writer, nil) - engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1) + engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) err = engine.WriteRows(ctx, nil, emptyRows) c.Assert(err, IsNil) @@ -170,7 +173,7 @@ func (s *backendSuite) TestOpenEngineFailed(c *C) { s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()). Return(errors.New("fake unrecoverable open error")) - _, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1) + _, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts) c.Assert(err, ErrorMatches, "fake unrecoverable open error") } @@ -189,7 +192,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) { Return(errors.Annotate(context.Canceled, "fake unrecoverable write error")) mockWriter.EXPECT().Close().Return(nil) - engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1) + engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) err = engine.WriteRows(ctx, nil, rows) c.Assert(err, ErrorMatches, "fake unrecoverable write error.*") @@ -210,7 +213,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) { MinTimes(1) mockWriter.EXPECT().Close().Return(nil).MinTimes(1) - engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1) + engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts) c.Assert(err, IsNil) err = engine.WriteRows(ctx, nil, rows) c.Assert(err, ErrorMatches, ".*fake recoverable write batch error") diff --git a/pkg/lightning/backend/importer_test.go b/pkg/lightning/backend/importer_test.go index 5819e2da9..f7f67a29a 100644 --- a/pkg/lightning/backend/importer_test.go +++ b/pkg/lightning/backend/importer_test.go @@ -73,7 +73,7 @@ func (s *importerSuite) setUpTest(c *C) { Return(nil, nil) var err error - s.engine, err = importer.OpenEngine(s.ctx, "`db`.`table`", -1) + s.engine, err = importer.OpenEngine(s.ctx, "`db`.`table`", -1, 0) c.Assert(err, IsNil) } diff --git a/pkg/lightning/backend/tidb_test.go b/pkg/lightning/backend/tidb_test.go index acb87b9ad..c485716cd 100644 --- a/pkg/lightning/backend/tidb_test.go +++ b/pkg/lightning/backend/tidb_test.go @@ -79,7 +79,7 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) { ctx := context.Background() logger := log.L() - engine, err := s.backend.OpenEngine(ctx, "`foo`.`bar`", 1) + engine, err := s.backend.OpenEngine(ctx, "`foo`.`bar`", 1, 0) c.Assert(err, IsNil) dataRows := s.backend.MakeEmptyRows() @@ -126,7 +126,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) { logger := log.L() ignoreBackend := kv.NewTiDBBackend(s.dbHandle, config.IgnoreOnDup) - engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1) + engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1, 0) c.Assert(err, IsNil) dataRows := ignoreBackend.MakeEmptyRows() @@ -164,7 +164,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) { logger := log.L() ignoreBackend := kv.NewTiDBBackend(s.dbHandle, config.ErrorOnDup) - engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1) + engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1, 0) c.Assert(err, IsNil) dataRows := ignoreBackend.MakeEmptyRows() diff --git a/pkg/lightning/mock/backend.go b/pkg/lightning/mock/backend.go index 8f69935c3..25ab7427f 100644 --- a/pkg/lightning/mock/backend.go +++ b/pkg/lightning/mock/backend.go @@ -8,6 +8,9 @@ package mock import ( context "context" + reflect "reflect" + time "time" + gomock "github.com/golang/mock/gomock" uuid "github.com/google/uuid" backend "github.com/pingcap/br/pkg/lightning/backend" @@ -16,34 +19,32 @@ import ( model "github.com/pingcap/parser/model" table "github.com/pingcap/tidb/table" types "github.com/pingcap/tidb/types" - reflect "reflect" - time "time" ) -// MockBackend is a mock of AbstractBackend interface +// MockBackend is a mock of AbstractBackend interface. type MockBackend struct { ctrl *gomock.Controller recorder *MockBackendMockRecorder } -// MockBackendMockRecorder is the mock recorder for MockBackend +// MockBackendMockRecorder is the mock recorder for MockBackend. type MockBackendMockRecorder struct { mock *MockBackend } -// NewMockBackend creates a new mock instance +// NewMockBackend creates a new mock instance. func NewMockBackend(ctrl *gomock.Controller) *MockBackend { mock := &MockBackend{ctrl: ctrl} mock.recorder = &MockBackendMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockBackend) EXPECT() *MockBackendMockRecorder { return m.recorder } -// CheckRequirements mocks base method +// CheckRequirements mocks base method. func (m *MockBackend) CheckRequirements(arg0 context.Context) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CheckRequirements", arg0) @@ -51,13 +52,13 @@ func (m *MockBackend) CheckRequirements(arg0 context.Context) error { return ret0 } -// CheckRequirements indicates an expected call of CheckRequirements +// CheckRequirements indicates an expected call of CheckRequirements. func (mr *MockBackendMockRecorder) CheckRequirements(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckRequirements", reflect.TypeOf((*MockBackend)(nil).CheckRequirements), arg0) } -// CleanupEngine mocks base method +// CleanupEngine mocks base method. func (m *MockBackend) CleanupEngine(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CleanupEngine", arg0, arg1) @@ -65,25 +66,25 @@ func (m *MockBackend) CleanupEngine(arg0 context.Context, arg1 uuid.UUID) error return ret0 } -// CleanupEngine indicates an expected call of CleanupEngine +// CleanupEngine indicates an expected call of CleanupEngine. func (mr *MockBackendMockRecorder) CleanupEngine(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CleanupEngine", reflect.TypeOf((*MockBackend)(nil).CleanupEngine), arg0, arg1) } -// Close mocks base method +// Close mocks base method. func (m *MockBackend) Close() { m.ctrl.T.Helper() m.ctrl.Call(m, "Close") } -// Close indicates an expected call of Close +// Close indicates an expected call of Close. func (mr *MockBackendMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockBackend)(nil).Close)) } -// CloseEngine mocks base method +// CloseEngine mocks base method. func (m *MockBackend) CloseEngine(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "CloseEngine", arg0, arg1) @@ -91,13 +92,13 @@ func (m *MockBackend) CloseEngine(arg0 context.Context, arg1 uuid.UUID) error { return ret0 } -// CloseEngine indicates an expected call of CloseEngine +// CloseEngine indicates an expected call of CloseEngine. func (mr *MockBackendMockRecorder) CloseEngine(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseEngine", reflect.TypeOf((*MockBackend)(nil).CloseEngine), arg0, arg1) } -// EngineFileSizes mocks base method +// EngineFileSizes mocks base method. func (m *MockBackend) EngineFileSizes() []backend.EngineFileSize { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "EngineFileSizes") @@ -105,13 +106,13 @@ func (m *MockBackend) EngineFileSizes() []backend.EngineFileSize { return ret0 } -// EngineFileSizes indicates an expected call of EngineFileSizes +// EngineFileSizes indicates an expected call of EngineFileSizes. func (mr *MockBackendMockRecorder) EngineFileSizes() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "EngineFileSizes", reflect.TypeOf((*MockBackend)(nil).EngineFileSizes)) } -// FetchRemoteTableModels mocks base method +// FetchRemoteTableModels mocks base method. func (m *MockBackend) FetchRemoteTableModels(arg0 context.Context, arg1 string) ([]*model.TableInfo, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FetchRemoteTableModels", arg0, arg1) @@ -120,13 +121,13 @@ func (m *MockBackend) FetchRemoteTableModels(arg0 context.Context, arg1 string) return ret0, ret1 } -// FetchRemoteTableModels indicates an expected call of FetchRemoteTableModels +// FetchRemoteTableModels indicates an expected call of FetchRemoteTableModels. func (mr *MockBackendMockRecorder) FetchRemoteTableModels(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRemoteTableModels", reflect.TypeOf((*MockBackend)(nil).FetchRemoteTableModels), arg0, arg1) } -// FlushAllEngines mocks base method +// FlushAllEngines mocks base method. func (m *MockBackend) FlushAllEngines(arg0 context.Context) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FlushAllEngines", arg0) @@ -134,13 +135,13 @@ func (m *MockBackend) FlushAllEngines(arg0 context.Context) error { return ret0 } -// FlushAllEngines indicates an expected call of FlushAllEngines +// FlushAllEngines indicates an expected call of FlushAllEngines. func (mr *MockBackendMockRecorder) FlushAllEngines(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushAllEngines", reflect.TypeOf((*MockBackend)(nil).FlushAllEngines), arg0) } -// FlushEngine mocks base method +// FlushEngine mocks base method. func (m *MockBackend) FlushEngine(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "FlushEngine", arg0, arg1) @@ -148,13 +149,13 @@ func (m *MockBackend) FlushEngine(arg0 context.Context, arg1 uuid.UUID) error { return ret0 } -// FlushEngine indicates an expected call of FlushEngine +// FlushEngine indicates an expected call of FlushEngine. func (mr *MockBackendMockRecorder) FlushEngine(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FlushEngine", reflect.TypeOf((*MockBackend)(nil).FlushEngine), arg0, arg1) } -// ImportEngine mocks base method +// ImportEngine mocks base method. func (m *MockBackend) ImportEngine(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ImportEngine", arg0, arg1) @@ -162,13 +163,13 @@ func (m *MockBackend) ImportEngine(arg0 context.Context, arg1 uuid.UUID) error { return ret0 } -// ImportEngine indicates an expected call of ImportEngine +// ImportEngine indicates an expected call of ImportEngine. func (mr *MockBackendMockRecorder) ImportEngine(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportEngine", reflect.TypeOf((*MockBackend)(nil).ImportEngine), arg0, arg1) } -// LocalWriter mocks base method +// LocalWriter mocks base method. func (m *MockBackend) LocalWriter(arg0 context.Context, arg1 uuid.UUID, arg2 int64) (backend.EngineWriter, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "LocalWriter", arg0, arg1, arg2) @@ -177,13 +178,13 @@ func (m *MockBackend) LocalWriter(arg0 context.Context, arg1 uuid.UUID, arg2 int return ret0, ret1 } -// LocalWriter indicates an expected call of LocalWriter +// LocalWriter indicates an expected call of LocalWriter. func (mr *MockBackendMockRecorder) LocalWriter(arg0, arg1, arg2 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LocalWriter", reflect.TypeOf((*MockBackend)(nil).LocalWriter), arg0, arg1, arg2) } -// MakeEmptyRows mocks base method +// MakeEmptyRows mocks base method. func (m *MockBackend) MakeEmptyRows() backend.Rows { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "MakeEmptyRows") @@ -191,13 +192,13 @@ func (m *MockBackend) MakeEmptyRows() backend.Rows { return ret0 } -// MakeEmptyRows indicates an expected call of MakeEmptyRows +// MakeEmptyRows indicates an expected call of MakeEmptyRows. func (mr *MockBackendMockRecorder) MakeEmptyRows() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MakeEmptyRows", reflect.TypeOf((*MockBackend)(nil).MakeEmptyRows)) } -// NewEncoder mocks base method +// NewEncoder mocks base method. func (m *MockBackend) NewEncoder(arg0 table.Table, arg1 *backend.SessionOptions) (backend.Encoder, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "NewEncoder", arg0, arg1) @@ -206,13 +207,13 @@ func (m *MockBackend) NewEncoder(arg0 table.Table, arg1 *backend.SessionOptions) return ret0, ret1 } -// NewEncoder indicates an expected call of NewEncoder +// NewEncoder indicates an expected call of NewEncoder. func (mr *MockBackendMockRecorder) NewEncoder(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "NewEncoder", reflect.TypeOf((*MockBackend)(nil).NewEncoder), arg0, arg1) } -// OpenEngine mocks base method +// OpenEngine mocks base method. func (m *MockBackend) OpenEngine(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "OpenEngine", arg0, arg1) @@ -220,13 +221,13 @@ func (m *MockBackend) OpenEngine(arg0 context.Context, arg1 uuid.UUID) error { return ret0 } -// OpenEngine indicates an expected call of OpenEngine +// OpenEngine indicates an expected call of OpenEngine. func (mr *MockBackendMockRecorder) OpenEngine(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenEngine", reflect.TypeOf((*MockBackend)(nil).OpenEngine), arg0, arg1) } -// ResetEngine mocks base method +// ResetEngine mocks base method. func (m *MockBackend) ResetEngine(arg0 context.Context, arg1 uuid.UUID) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ResetEngine", arg0, arg1) @@ -234,13 +235,13 @@ func (m *MockBackend) ResetEngine(arg0 context.Context, arg1 uuid.UUID) error { return ret0 } -// ResetEngine indicates an expected call of ResetEngine +// ResetEngine indicates an expected call of ResetEngine. func (mr *MockBackendMockRecorder) ResetEngine(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ResetEngine", reflect.TypeOf((*MockBackend)(nil).ResetEngine), arg0, arg1) } -// RetryImportDelay mocks base method +// RetryImportDelay mocks base method. func (m *MockBackend) RetryImportDelay() time.Duration { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "RetryImportDelay") @@ -248,13 +249,13 @@ func (m *MockBackend) RetryImportDelay() time.Duration { return ret0 } -// RetryImportDelay indicates an expected call of RetryImportDelay +// RetryImportDelay indicates an expected call of RetryImportDelay. func (mr *MockBackendMockRecorder) RetryImportDelay() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RetryImportDelay", reflect.TypeOf((*MockBackend)(nil).RetryImportDelay)) } -// ShouldPostProcess mocks base method +// ShouldPostProcess mocks base method. func (m *MockBackend) ShouldPostProcess() bool { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ShouldPostProcess") @@ -262,48 +263,48 @@ func (m *MockBackend) ShouldPostProcess() bool { return ret0 } -// ShouldPostProcess indicates an expected call of ShouldPostProcess +// ShouldPostProcess indicates an expected call of ShouldPostProcess. func (mr *MockBackendMockRecorder) ShouldPostProcess() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ShouldPostProcess", reflect.TypeOf((*MockBackend)(nil).ShouldPostProcess)) } -// MockEncoder is a mock of Encoder interface +// MockEncoder is a mock of Encoder interface. type MockEncoder struct { ctrl *gomock.Controller recorder *MockEncoderMockRecorder } -// MockEncoderMockRecorder is the mock recorder for MockEncoder +// MockEncoderMockRecorder is the mock recorder for MockEncoder. type MockEncoderMockRecorder struct { mock *MockEncoder } -// NewMockEncoder creates a new mock instance +// NewMockEncoder creates a new mock instance. func NewMockEncoder(ctrl *gomock.Controller) *MockEncoder { mock := &MockEncoder{ctrl: ctrl} mock.recorder = &MockEncoderMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockEncoder) EXPECT() *MockEncoderMockRecorder { return m.recorder } -// Close mocks base method +// Close mocks base method. func (m *MockEncoder) Close() { m.ctrl.T.Helper() m.ctrl.Call(m, "Close") } -// Close indicates an expected call of Close +// Close indicates an expected call of Close. func (mr *MockEncoderMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEncoder)(nil).Close)) } -// Encode mocks base method +// Encode mocks base method. func (m *MockEncoder) Encode(arg0 log.Logger, arg1 []types.Datum, arg2 int64, arg3 []int) (backend.Row, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Encode", arg0, arg1, arg2, arg3) @@ -312,36 +313,36 @@ func (m *MockEncoder) Encode(arg0 log.Logger, arg1 []types.Datum, arg2 int64, ar return ret0, ret1 } -// Encode indicates an expected call of Encode +// Encode indicates an expected call of Encode. func (mr *MockEncoderMockRecorder) Encode(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Encode", reflect.TypeOf((*MockEncoder)(nil).Encode), arg0, arg1, arg2, arg3) } -// MockRows is a mock of Rows interface +// MockRows is a mock of Rows interface. type MockRows struct { ctrl *gomock.Controller recorder *MockRowsMockRecorder } -// MockRowsMockRecorder is the mock recorder for MockRows +// MockRowsMockRecorder is the mock recorder for MockRows. type MockRowsMockRecorder struct { mock *MockRows } -// NewMockRows creates a new mock instance +// NewMockRows creates a new mock instance. func NewMockRows(ctrl *gomock.Controller) *MockRows { mock := &MockRows{ctrl: ctrl} mock.recorder = &MockRowsMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockRows) EXPECT() *MockRowsMockRecorder { return m.recorder } -// Clear mocks base method +// Clear mocks base method. func (m *MockRows) Clear() backend.Rows { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Clear") @@ -349,13 +350,13 @@ func (m *MockRows) Clear() backend.Rows { return ret0 } -// Clear indicates an expected call of Clear +// Clear indicates an expected call of Clear. func (mr *MockRowsMockRecorder) Clear() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Clear", reflect.TypeOf((*MockRows)(nil).Clear)) } -// SplitIntoChunks mocks base method +// SplitIntoChunks mocks base method. func (m *MockRows) SplitIntoChunks(arg0 int) []backend.Rows { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SplitIntoChunks", arg0) @@ -363,71 +364,71 @@ func (m *MockRows) SplitIntoChunks(arg0 int) []backend.Rows { return ret0 } -// SplitIntoChunks indicates an expected call of SplitIntoChunks +// SplitIntoChunks indicates an expected call of SplitIntoChunks. func (mr *MockRowsMockRecorder) SplitIntoChunks(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SplitIntoChunks", reflect.TypeOf((*MockRows)(nil).SplitIntoChunks), arg0) } -// MockRow is a mock of Row interface +// MockRow is a mock of Row interface. type MockRow struct { ctrl *gomock.Controller recorder *MockRowMockRecorder } -// MockRowMockRecorder is the mock recorder for MockRow +// MockRowMockRecorder is the mock recorder for MockRow. type MockRowMockRecorder struct { mock *MockRow } -// NewMockRow creates a new mock instance +// NewMockRow creates a new mock instance. func NewMockRow(ctrl *gomock.Controller) *MockRow { mock := &MockRow{ctrl: ctrl} mock.recorder = &MockRowMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockRow) EXPECT() *MockRowMockRecorder { return m.recorder } -// ClassifyAndAppend mocks base method +// ClassifyAndAppend mocks base method. func (m *MockRow) ClassifyAndAppend(arg0 *backend.Rows, arg1 *verification.KVChecksum, arg2 *backend.Rows, arg3 *verification.KVChecksum) { m.ctrl.T.Helper() m.ctrl.Call(m, "ClassifyAndAppend", arg0, arg1, arg2, arg3) } -// ClassifyAndAppend indicates an expected call of ClassifyAndAppend +// ClassifyAndAppend indicates an expected call of ClassifyAndAppend. func (mr *MockRowMockRecorder) ClassifyAndAppend(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClassifyAndAppend", reflect.TypeOf((*MockRow)(nil).ClassifyAndAppend), arg0, arg1, arg2, arg3) } -// MockEngineWriter is a mock of EngineWriter interface +// MockEngineWriter is a mock of EngineWriter interface. type MockEngineWriter struct { ctrl *gomock.Controller recorder *MockEngineWriterMockRecorder } -// MockEngineWriterMockRecorder is the mock recorder for MockEngineWriter +// MockEngineWriterMockRecorder is the mock recorder for MockEngineWriter. type MockEngineWriterMockRecorder struct { mock *MockEngineWriter } -// NewMockEngineWriter creates a new mock instance +// NewMockEngineWriter creates a new mock instance. func NewMockEngineWriter(ctrl *gomock.Controller) *MockEngineWriter { mock := &MockEngineWriter{ctrl: ctrl} mock.recorder = &MockEngineWriterMockRecorder{mock} return mock } -// EXPECT returns an object that allows the caller to indicate expected use +// EXPECT returns an object that allows the caller to indicate expected use. func (m *MockEngineWriter) EXPECT() *MockEngineWriterMockRecorder { return m.recorder } -// AppendRows mocks base method +// AppendRows mocks base method. func (m *MockEngineWriter) AppendRows(arg0 context.Context, arg1 string, arg2 []string, arg3 uint64, arg4 backend.Rows) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AppendRows", arg0, arg1, arg2, arg3, arg4) @@ -435,13 +436,13 @@ func (m *MockEngineWriter) AppendRows(arg0 context.Context, arg1 string, arg2 [] return ret0 } -// AppendRows indicates an expected call of AppendRows +// AppendRows indicates an expected call of AppendRows. func (mr *MockEngineWriterMockRecorder) AppendRows(arg0, arg1, arg2, arg3, arg4 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AppendRows", reflect.TypeOf((*MockEngineWriter)(nil).AppendRows), arg0, arg1, arg2, arg3, arg4) } -// Close mocks base method +// Close mocks base method. func (m *MockEngineWriter) Close() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Close") @@ -449,7 +450,7 @@ func (m *MockEngineWriter) Close() error { return ret0 } -// Close indicates an expected call of Close +// Close indicates an expected call of Close. func (mr *MockEngineWriterMockRecorder) Close() *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEngineWriter)(nil).Close)) diff --git a/pkg/lightning/restore/checksum.go b/pkg/lightning/restore/checksum.go index 369b20a18..ec6c03912 100644 --- a/pkg/lightning/restore/checksum.go +++ b/pkg/lightning/restore/checksum.go @@ -266,7 +266,11 @@ func newTiKVChecksumManager(client kv.Client, pdClient pd.Client, distSQLScanCon } func (e *tikvChecksumManager) checksumDB(ctx context.Context, tableInfo *TidbTableInfo) (*RemoteChecksum, error) { - executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(time.Now().Unix()*1000, 0)). + physicalTS, logicalTS, err := e.manager.pdClient.GetTS(ctx) + if err != nil { + return nil, errors.Annotate(err, "fetch tso from pd failed") + } + executor, err := checksum.NewExecutorBuilder(tableInfo.Core, oracle.ComposeTS(physicalTS, logicalTS)). SetConcurrency(e.distSQLScanConcurrency). Build() if err != nil { diff --git a/pkg/lightning/restore/checksum_test.go b/pkg/lightning/restore/checksum_test.go index 02cfdfb39..25c73caa9 100644 --- a/pkg/lightning/restore/checksum_test.go +++ b/pkg/lightning/restore/checksum_test.go @@ -244,6 +244,10 @@ func (c *testPDClient) currentSafePoint() uint64 { return 0 } +func (c *testPDClient) GetTS(ctx context.Context) (int64, int64, error) { + return time.Now().Unix(), 0, nil +} + func (c *testPDClient) UpdateServiceGCSafePoint(ctx context.Context, serviceID string, ttl int64, safePoint uint64) (uint64, error) { if !strings.HasPrefix(serviceID, "lightning") { panic("service ID must start with 'lightning'") diff --git a/pkg/lightning/restore/restore.go b/pkg/lightning/restore/restore.go index b73b91b2c..5e15ce117 100644 --- a/pkg/lightning/restore/restore.go +++ b/pkg/lightning/restore/restore.go @@ -29,6 +29,7 @@ import ( sstpb "github.com/pingcap/kvproto/pkg/import_sstpb" "github.com/pingcap/parser/model" "github.com/pingcap/tidb/meta/autoid" + "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/table/tables" "github.com/pingcap/tidb/util/collate" @@ -163,6 +164,9 @@ type RestoreController struct { diskQuotaLock sync.RWMutex diskQuotaState int32 + + // commit ts for local and importer backend + ts uint64 } func NewRestoreController( @@ -241,6 +245,21 @@ func NewRestoreControllerWithPauser( return nil, errors.New("unknown backend: " + cfg.TikvImporter.Backend) } + var ts uint64 + if cfg.TikvImporter.Backend == config.BackendLocal || cfg.TikvImporter.Backend == config.BackendImporter { + pdController, err := pdutil.NewPdController(ctx, cfg.TiDB.PdAddr, tls.TLSConfig(), tls.ToPDSecurityOption()) + if err != nil { + return nil, errors.Trace(err) + } + defer pdController.Close() + + physical, logical, err := pdController.GetPDClient().GetTS(ctx) + if err != nil { + return nil, errors.Trace(err) + } + ts = oracle.ComposeTS(physical, logical) + } + rc := &RestoreController{ cfg: cfg, dbMetas: dbMetas, @@ -261,6 +280,7 @@ func NewRestoreControllerWithPauser( closedEngineLimit: worker.NewPool(ctx, cfg.App.TableConcurrency*2, "closed-engine"), store: s, + ts: ts, } return rc, nil @@ -1265,7 +1285,7 @@ func (t *TableRestore) restoreEngines(ctx context.Context, rc *RestoreController indexWorker := rc.indexWorkers.Apply() defer rc.indexWorkers.Recycle(indexWorker) - indexEngine, err := rc.backend.OpenEngine(ctx, t.tableName, indexEngineID) + indexEngine, err := rc.backend.OpenEngine(ctx, t.tableName, indexEngineID, rc.ts) if err != nil { return errors.Trace(err) } @@ -1414,7 +1434,7 @@ func (t *TableRestore) restoreEngine( logTask := t.logger.With(zap.Int32("engineNumber", engineID)).Begin(zap.InfoLevel, "encode kv data and write") - dataEngine, err := rc.backend.OpenEngine(ctx, t.tableName, engineID) + dataEngine, err := rc.backend.OpenEngine(ctx, t.tableName, engineID, rc.ts) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/lightning/restore/restore_test.go b/pkg/lightning/restore/restore_test.go index ef04845f6..d64475d4a 100644 --- a/pkg/lightning/restore/restore_test.go +++ b/pkg/lightning/restore/restore_test.go @@ -840,11 +840,11 @@ func (s *chunkRestoreSuite) TestDeliverLoopEmptyData(c *C) { AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()). Return(nil).AnyTimes() - dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0) + dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0) c.Assert(err, IsNil) dataWriter, err := dataEngine.LocalWriter(ctx, 2048) c.Assert(err, IsNil) - indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1) + indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1, 0) c.Assert(err, IsNil) indexWriter, err := indexEngine.LocalWriter(ctx, 2048) c.Assert(err, IsNil) @@ -877,9 +877,9 @@ func (s *chunkRestoreSuite) TestDeliverLoop(c *C) { mockWriter := mock.NewMockEngineWriter(controller) mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(2048)).Return(mockWriter, nil).AnyTimes() - dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0) + dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0) c.Assert(err, IsNil) - indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1) + indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1, 0) c.Assert(err, IsNil) dataWriter, err := dataEngine.LocalWriter(ctx, 2048) @@ -1092,9 +1092,9 @@ func (s *chunkRestoreSuite) TestRestore(c *C) { mockClient.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil, nil) mockClient.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil, nil) - dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0) + dataEngine, err := importer.OpenEngine(ctx, s.tr.tableName, 0, 0) c.Assert(err, IsNil) - indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1) + indexEngine, err := importer.OpenEngine(ctx, s.tr.tableName, -1, 0) c.Assert(err, IsNil) dataWriter, err := dataEngine.LocalWriter(ctx, 2048) c.Assert(err, IsNil)