Skip to content

Commit

Permalink
cherry pick pingcap#850 to release-4.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
glorv authored and ti-srebot committed Mar 12, 2021
1 parent 34c2240 commit 3aac8f4
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 90 deletions.
5 changes: 2 additions & 3 deletions pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"go.uber.org/zap"
Expand Down Expand Up @@ -298,7 +297,7 @@ func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) {
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32, ts uint64) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
logger := makeLogger(tag, engineUUID)

Expand Down Expand Up @@ -327,7 +326,7 @@ func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int
uuid: engineUUID,
},
tableName: tableName,
ts: oracle.ComposeTS(time.Now().Unix()*1000, 0),
ts: ts,
}, nil
}

Expand Down
15 changes: 9 additions & 6 deletions pkg/lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
. "github.com/pingcap/check"
"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/store/tikv/oracle"

kv "github.com/pingcap/br/pkg/lightning/backend"
"github.com/pingcap/br/pkg/lightning/mock"
Expand All @@ -18,6 +19,7 @@ type backendSuite struct {
controller *gomock.Controller
mockBackend *mock.MockBackend
backend kv.Backend
ts uint64
}

var _ = Suite(&backendSuite{})
Expand All @@ -29,6 +31,7 @@ func (s *backendSuite) setUpTest(c *C) {
s.controller = gomock.NewController(c)
s.mockBackend = mock.NewMockBackend(s.controller)
s.backend = kv.MakeBackend(s.mockBackend)
s.ts = oracle.ComposeTS(time.Now().Unix()*1000, 0)
}

func (s *backendSuite) tearDownTest() {
Expand Down Expand Up @@ -58,7 +61,7 @@ func (s *backendSuite) TestOpenCloseImportCleanUpEngine(c *C) {
Return(nil).
After(importCall)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
closedEngine, err := engine.Close(ctx)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -134,7 +137,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows2).
Return(nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, []string{"c1", "c2"}, rows1)
c.Assert(err, IsNil)
Expand All @@ -155,7 +158,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
writer.EXPECT().Close().Return(nil)
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(writer, nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, nil, emptyRows)
c.Assert(err, IsNil)
Expand All @@ -170,7 +173,7 @@ func (s *backendSuite) TestOpenEngineFailed(c *C) {
s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).
Return(errors.New("fake unrecoverable open error"))

_, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
_, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, ErrorMatches, "fake unrecoverable open error")
}

Expand All @@ -189,7 +192,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {
Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
mockWriter.EXPECT().Close().Return(nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, "fake unrecoverable write error.*")
Expand All @@ -210,7 +213,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {
MinTimes(1)
mockWriter.EXPECT().Close().Return(nil).MinTimes(1)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1, s.ts)
c.Assert(err, IsNil)
err = engine.WriteRows(ctx, nil, rows)
c.Assert(err, ErrorMatches, ".*fake recoverable write batch error")
Expand Down
2 changes: 1 addition & 1 deletion pkg/lightning/backend/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *importerSuite) setUpTest(c *C) {
Return(nil, nil)

var err error
s.engine, err = importer.OpenEngine(s.ctx, "`db`.`table`", -1)
s.engine, err = importer.OpenEngine(s.ctx, "`db`.`table`", -1, 0)
c.Assert(err, IsNil)
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/lightning/backend/tidb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func (s *mysqlSuite) TestWriteRowsReplaceOnDup(c *C) {
ctx := context.Background()
logger := log.L()

engine, err := s.backend.OpenEngine(ctx, "`foo`.`bar`", 1)
engine, err := s.backend.OpenEngine(ctx, "`foo`.`bar`", 1, 0)
c.Assert(err, IsNil)

dataRows := s.backend.MakeEmptyRows()
Expand Down Expand Up @@ -126,7 +126,7 @@ func (s *mysqlSuite) TestWriteRowsIgnoreOnDup(c *C) {
logger := log.L()

ignoreBackend := kv.NewTiDBBackend(s.dbHandle, config.IgnoreOnDup)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1, 0)
c.Assert(err, IsNil)

dataRows := ignoreBackend.MakeEmptyRows()
Expand Down Expand Up @@ -164,7 +164,7 @@ func (s *mysqlSuite) TestWriteRowsErrorOnDup(c *C) {
logger := log.L()

ignoreBackend := kv.NewTiDBBackend(s.dbHandle, config.ErrorOnDup)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1)
engine, err := ignoreBackend.OpenEngine(ctx, "`foo`.`bar`", 1, 0)
c.Assert(err, IsNil)

dataRows := ignoreBackend.MakeEmptyRows()
Expand Down
Loading

0 comments on commit 3aac8f4

Please sign in to comment.