lightning: avoid retry write rows and record duplicate rows in tidb backend #30179

Merged
merged 9 commits on Nov 29, 2021
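
This PR changes the TiDB backend so that a batch INSERT which fails with a non-retryable error is no longer retried blindly: if the configured type-error budget still has room, the batch is downgraded to row-by-row writes so the offending rows can be located and recorded; otherwise the error is returned immediately. Below is a minimal, self-contained Go sketch of that pattern, using hypothetical names (writer, writeBatch, writeRowByRow) rather than the backend's actual API:

// Standalone illustration of the pattern this PR adds (hypothetical types and
// names, not the backend's actual code): retry a failed batch write only for
// retryable errors, downgrade to row-by-row writes while a type-error budget
// remains, and otherwise return the error immediately.
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

var errRetryable = errors.New("retryable")

type writer struct {
	typeErrorsRemain atomic.Int64 // how many bad rows we may still record
}

func (w *writer) writeBatch(rows []int) error {
	for _, r := range rows {
		if r < 0 {
			return fmt.Errorf("bad row %d", r) // non-retryable
		}
	}
	return nil
}

func (w *writer) writeRowByRow(rows []int) error {
	for _, r := range rows {
		if r >= 0 {
			continue // this row writes fine
		}
		if w.typeErrorsRemain.Add(-1) < 0 {
			return fmt.Errorf("bad row %d: error budget exhausted", r)
		}
		fmt.Printf("recorded bad row %d, skipping\n", r)
	}
	return nil
}

func (w *writer) writeRows(rows []int) error {
	const maxRetry = 3
	for i := 0; i < maxRetry; i++ {
		err := w.writeBatch(rows)
		switch {
		case err == nil:
			return nil
		case errors.Is(err, errRetryable):
			continue // retry the whole batch
		case w.typeErrorsRemain.Load() > 0:
			// Non-retryable: redo row by row to locate and record bad rows.
			return w.writeRowByRow(rows)
		default:
			return err
		}
	}
	return errors.New("batch write reached max retry and still failed")
}

func main() {
	w := &writer{}
	w.typeErrorsRemain.Store(10)
	fmt.Println(w.writeRows([]int{1, 2, -3, 4})) // <nil>; bad row recorded
}

In the real backend, the budget corresponds to ErrorManager.TypeErrorsRemain() and the row-by-row path is WriteRowsToDB, as shown in the diff below.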
10 changes: 8 additions & 2 deletions br/pkg/lightning/backend/tidb/tidb.go
@@ -351,7 +351,10 @@ func EncodeRowForRecord(encTable table.Table, sqlMode mysql.SQLMode, row []types
}
resRow, err := enc.Encode(log.L(), row, 0, columnPermutation, "", 0)
if err != nil {
return fmt.Sprintf("/* ERROR: %s */", err)
// If encoding fails, fall back to recording the raw input strings.
// Ignore the error: it can only occur when the datum type is unknown, which cannot happen here.
datumStr, _ := types.DatumsToString(row, true)
return datumStr
}
return resRow.(tidbRow).insertStmt
}
@@ -435,14 +438,17 @@ rowLoop:
continue rowLoop
case utils.IsRetryableError(err):
// retry next loop
default:
case be.errorMgr.TypeErrorsRemain() > 0:
// WriteBatchRowsToDB failed in batch mode and cannot be retried,
// so redo the write row by row to locate the offending row (and skip it correctly in the future).
if err = be.WriteRowsToDB(ctx, tableName, columnNames, r); err != nil {
// A non-nil error means we reached the max error count in non-batch mode.
// For now we treat maxErrorCount as always 0, so we simply return if any error occurs.
return errors.Annotatef(err, "[%s] write rows reach max error count %d", tableName, 0)
}
continue rowLoop
default:
return err
}
}
return errors.Annotatef(err, "[%s] batch write rows reach max retry %d and still failed", tableName, writeRowsMaxRetryTimes)
144 changes: 131 additions & 13 deletions br/pkg/lightning/backend/tidb/tidb_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/errormanager"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/parser/charset"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
@@ -408,7 +409,33 @@ func TestFetchRemoteTableModels_4_x_auto_random(t *testing.T) {
}, tableInfos)
}

func TestWriteRowsErrorDowngrading(t *testing.T) {
func TestWriteRowsErrorNoRetry(t *testing.T) {
t.Parallel()
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
defer s.TearDownTest(t)

// batch insert, fail and rollback.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(3),(4),(5)\\E").
WillReturnError(nonRetryableError)

// with error recording disabled, we should not expect row-by-row retry statements.
ignoreBackend := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup,
errormanager.New(s.dbHandle, &config.Config{}),
)
dataRows := encodeRowsTiDB(t, ignoreBackend, s.tbl)
ctx := context.Background()
engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.Error(t, err)
require.False(t, utils.IsRetryableError(err), "err: %v", err)
}

func TestWriteRowsErrorDowngradingAll(t *testing.T) {
t.Parallel()
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
@@ -439,13 +466,77 @@ func TestWriteRowsErrorDowngrading(t *testing.T) {
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the fourth row will exceed the error threshold, so this error won't be recorded
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "10.csv", int64(0), nonRetryableError.Error(), "(4)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(5)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "11.csv", int64(0), nonRetryableError.Error(), "(5)").
WillReturnResult(driver.ResultNoRows)

// enable error recording with a threshold high enough that every failed row is retried and recorded.
ignoreBackend := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup,
errormanager.New(s.dbHandle, &config.Config{
App: config.Lightning{
TaskInfoSchemaName: "tidb_lightning_errors",
MaxError: config.MaxError{
Type: *atomic.NewInt64(10),
},
},
}),
)
dataRows := encodeRowsTiDB(t, ignoreBackend, s.tbl)
ctx := context.Background()
engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.NoError(t, err)
}

func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
t.Parallel()
nonRetryableError := sql.ErrNoRows
s := createMysqlSuite(t)
defer s.TearDownTest(t)
// First, batch insert, fail and rollback.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1),(2),(3),(4),(5)\\E").
WillReturnError(nonRetryableError)
// Then, insert row-by-row due to the non-retryable error.
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(1)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "7.csv", int64(0), nonRetryableError.Error(), "(1)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(2)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), nonRetryableError.Error(), "(2)").
WillReturnResult(driver.ResultNoRows)
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(3)\\E").
WillReturnError(nonRetryableError)
s.mockDB.
ExpectExec("INSERT INTO `tidb_lightning_errors`\\.type_error_v1.*").
WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "9.csv", int64(0), nonRetryableError.Error(), "(3)").
WillReturnResult(driver.ResultNoRows)
// the fourth row will exceed the error threshold, so this error won't be recorded
s.mockDB.
ExpectExec("\\QINSERT INTO `foo`.`bar`(`a`) VALUES(4)\\E").
WillReturnError(nonRetryableError)

ignoreBackend := tidb.NewTiDBBackend(s.dbHandle, config.ErrorOnDup,
errormanager.New(s.dbHandle, &config.Config{
@@ -457,15 +548,27 @@ func TestWriteRowsErrorDowngrading(t *testing.T) {
},
}),
)
dataRows := encodeRowsTiDB(t, ignoreBackend, s.tbl)
ctx := context.Background()
engine, err := ignoreBackend.OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1)
require.NoError(t, err)
writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.Error(t, err)
st, err := writer.Close(ctx)
require.NoError(t, err)
require.Nil(t, st)
}

dataRows := ignoreBackend.MakeEmptyRows()
func encodeRowsTiDB(t *testing.T, b backend.Backend, tbl table.Table) kv.Rows {
dataRows := b.MakeEmptyRows()
dataChecksum := verification.MakeKVChecksum(0, 0, 0)
indexRows := ignoreBackend.MakeEmptyRows()
indexRows := b.MakeEmptyRows()
indexChecksum := verification.MakeKVChecksum(0, 0, 0)
logger := log.L()

encoder, err := ignoreBackend.NewEncoder(s.tbl, &kv.SessionOptions{})
encoder, err := b.NewEncoder(tbl, &kv.SessionOptions{})
require.NoError(t, err)
row, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(1),
@@ -501,12 +604,27 @@ func TestWriteRowsErrorDowngrading(t *testing.T) {
require.NoError(t, err)

row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum)
return dataRows
}

writer, err := engine.LocalWriter(ctx, nil)
require.NoError(t, err)
err = writer.WriteRows(ctx, []string{"a"}, dataRows)
require.Error(t, err)
st, err := writer.Close(ctx)
require.NoError(t, err)
require.Nil(t, st)
func TestEncodeRowForRecord(t *testing.T) {
t.Parallel()
s := createMysqlSuite(t)

// for a correct row, encoding will produce a correct result
row := tidb.EncodeRowForRecord(s.tbl, mysql.ModeStrictTransTables, []types.Datum{
types.NewIntDatum(5),
types.NewStringDatum("test test"),
types.NewBinaryLiteralDatum(types.NewBinaryLiteralFromUint(0xabcdef, 6)),
}, []int{0, -1, -1, -1, -1, -1, -1, -1, 1, 2, -1, -1, -1, -1})
require.Equal(t, row, "(5,'test test',x'000000abcdef')")

// the following row will result in a column count mismatch error, therefore the encode
// result will fall back to a comma-separated string list.
row = tidb.EncodeRowForRecord(s.tbl, mysql.ModeStrictTransTables, []types.Datum{
types.NewIntDatum(5),
types.NewStringDatum("test test"),
types.NewBinaryLiteralDatum(types.NewBinaryLiteralFromUint(0xabcdef, 6)),
}, []int{0, -1, -1, -1, -1, -1, -1, -1, 1, 2, 3, -1, -1, -1})
require.Equal(t, row, "(5, \"test test\", \x00\x00\x00\xab\xcd\xef)")
}
4 changes: 4 additions & 0 deletions br/pkg/lightning/errormanager/errormanager.go
@@ -112,6 +112,10 @@ type ErrorManager struct {
dupResolution config.DuplicateResolutionAlgorithm
}

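// TypeErrorsRemain returns the remaining number of type errors that can still be recorded.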
func (em *ErrorManager) TypeErrorsRemain() int64 {
return em.remainingError.Type.Load()
}

// New creates a new error manager.
func New(db *sql.DB, cfg *config.Config) *ErrorManager {
em := &ErrorManager{
1 change: 0 additions & 1 deletion br/pkg/lightning/restore/restore_test.go
@@ -1489,7 +1489,6 @@ func (s *chunkRestoreSuite) TestEncodeLoopColumnsMismatch(c *C) {
}

func (s *chunkRestoreSuite) TestEncodeLoopIgnoreColumnsCSV(c *C) {
log.InitLogger(&log.Config{}, "error")
cases := []struct {
s string
ignoreColumns []*config.IgnoreColumns