Merge branch 'master' into fix-paging-sysvar
you06 authored Dec 30, 2021
2 parents 0297c21 + 0ab0dad commit 7f4667d
Showing 47 changed files with 1,787 additions and 666 deletions.
19 changes: 13 additions & 6 deletions br/pkg/lightning/backend/noop/noop.go
@@ -140,7 +140,7 @@ func (b noopBackend) ResetEngine(ctx context.Context, engineUUID uuid.UUID) erro

// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
func (b noopBackend) LocalWriter(context.Context, *backend.LocalWriterConfig, uuid.UUID) (backend.EngineWriter, error) {
-	return noopWriter{}, nil
+	return Writer{}, nil
}

func (b noopBackend) CollectLocalDuplicateRows(ctx context.Context, tbl table.Table, tableName string, opts *kv.SessionOptions) (bool, error) {
@@ -174,16 +174,23 @@ func (r noopRow) Size() uint64 {
func (r noopRow) ClassifyAndAppend(*kv.Rows, *verification.KVChecksum, *kv.Rows, *verification.KVChecksum) {
}

-type noopWriter struct{}
+// Writer defines a local writer that does nothing.
+type Writer struct{}

-func (w noopWriter) AppendRows(context.Context, string, []string, kv.Rows) error {
+func (w Writer) AppendRows(context.Context, string, []string, kv.Rows) error {
	return nil
}

-func (w noopWriter) IsSynced() bool {
+func (w Writer) IsSynced() bool {
	return true
}

-func (w noopWriter) Close(context.Context) (backend.ChunkFlushStatus, error) {
-	return nil, nil
+func (w Writer) Close(context.Context) (backend.ChunkFlushStatus, error) {
+	return trueStatus{}, nil
}
+
+type trueStatus struct{}
+
+func (s trueStatus) Flushed() bool {
+	return true
+}
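Renaming the unexported `noopWriter` to `Writer` makes the no-op engine writer usable from other packages (the restore test below stubs `LocalWriter` with `noop.Writer{}`), and `Close` now returns a concrete flush status instead of a nil interface value. A compile-time sketch of the contracts involved, written as if inside package `noop` and assuming only the interface names visible in this diff:

```go
// Compile-time assertions (a sketch): the exported Writer satisfies
// backend.EngineWriter, and trueStatus satisfies the flush-status
// interface its Close method returns.
var (
	_ backend.EngineWriter     = Writer{}
	_ backend.ChunkFlushStatus = trueStatus{}
)
```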
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/config.go
@@ -101,7 +101,7 @@ const (
)

var (
-	supportedStorageTypes = []string{"file", "local", "s3", "noop", "gcs"}
+	supportedStorageTypes = []string{"file", "local", "s3", "noop", "gcs", "gs"}

	DefaultFilter = []string{
		"*.*",
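`gs://` is Google Cloud Storage's alternate URL scheme, so both spellings of a GCS source directory now pass validation. A rough sketch of the kind of scheme check this list feeds — `checkScheme` is a hypothetical name, and plain local paths (no scheme) are handled separately by the real `CheckAndAdjustFilePath`:

```go
package config

import (
	"fmt"
	"net/url"
)

// checkScheme is a hypothetical illustration: reject a URL-style
// source directory whose scheme is not a supported storage type.
func checkScheme(rawURL string) error {
	u, err := url.Parse(rawURL)
	if err != nil {
		return err
	}
	for _, t := range supportedStorageTypes {
		if u.Scheme == t {
			return nil
		}
	}
	return fmt.Errorf("unsupported storage type %q", u.Scheme)
}
```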
27 changes: 27 additions & 0 deletions br/pkg/lightning/config/config_test.go
@@ -156,6 +156,33 @@ func (s *configTestSuite) TestAdjustInvalidBackend(c *C) {
	c.Assert(err, ErrorMatches, "invalid config: unsupported `tikv-importer\\.backend` \\(no_such_backend\\)")
}

+func (s *configTestSuite) TestCheckAndAdjustFilePath(c *C) {
+	tmpDir := c.MkDir()
+	// use slashPath in the URL to be compatible with Windows
+	slashPath := filepath.ToSlash(tmpDir)
+
+	cfg := config.NewConfig()
+	cases := []string{
+		tmpDir,
+		".",
+		"file://" + slashPath,
+		"local://" + slashPath,
+		"s3://bucket_name",
+		"s3://bucket_name/path/to/dir",
+		"gcs://bucketname/path/to/dir",
+		"gs://bucketname/path/to/dir",
+		"noop:///",
+	}
+
+	for _, testCase := range cases {
+		cfg.Mydumper.SourceDir = testCase
+
+		err := cfg.CheckAndAdjustFilePath()
+		c.Assert(err, IsNil)
+	}
+
+}
+
func (s *configTestSuite) TestAdjustFileRoutePath(c *C) {
	cfg := config.NewConfig()
	assignMinimalLegalValue(cfg)
89 changes: 88 additions & 1 deletion br/pkg/lightning/restore/restore_test.go
@@ -534,6 +534,93 @@ func (s *tableRestoreSuite) TestPopulateChunks(c *C) {
	s.cfg.Mydumper.CSV.Header = false
}

+type errorLocalWriter struct{}
+
+func (w errorLocalWriter) AppendRows(context.Context, string, []string, kv.Rows) error {
+	return errors.New("mock write rows failed")
+}
+
+func (w errorLocalWriter) IsSynced() bool {
+	return true
+}
+
+func (w errorLocalWriter) Close(context.Context) (backend.ChunkFlushStatus, error) {
+	return nil, nil
+}
+
+func (s *tableRestoreSuite) TestRestoreEngineFailed(c *C) {
+	ctx := context.Background()
+	ctrl := gomock.NewController(c)
+	mockBackend := mock.NewMockBackend(ctrl)
+	rc := &Controller{
+		cfg:            s.cfg,
+		pauser:         DeliverPauser,
+		ioWorkers:      worker.NewPool(ctx, 1, "io"),
+		regionWorkers:  worker.NewPool(ctx, 10, "region"),
+		store:          s.store,
+		backend:        backend.MakeBackend(mockBackend),
+		errorSummaries: makeErrorSummaries(log.L()),
+		saveCpCh:       make(chan saveCp, 1),
+		diskQuotaLock:  newDiskQuotaLock(),
+	}
+	defer close(rc.saveCpCh)
+	go func() {
+		for cp := range rc.saveCpCh {
+			cp.waitCh <- nil
+		}
+	}()
+
+	cp := &checkpoints.TableCheckpoint{
+		Engines: make(map[int32]*checkpoints.EngineCheckpoint),
+	}
+	err := s.tr.populateChunks(ctx, rc, cp)
+	c.Assert(err, IsNil)
+
+	tbl, err := tables.TableFromMeta(kv.NewPanickingAllocators(0), s.tableInfo.Core)
+	c.Assert(err, IsNil)
+	_, indexUUID := backend.MakeUUID("`db`.`table`", -1)
+	_, dataUUID := backend.MakeUUID("`db`.`table`", 0)
+	realBackend := tidb.NewTiDBBackend(nil, "replace", nil)
+	mockBackend.EXPECT().OpenEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+	mockBackend.EXPECT().OpenEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+	mockBackend.EXPECT().CloseEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+	mockBackend.EXPECT().NewEncoder(gomock.Any(), gomock.Any()).
+		Return(realBackend.NewEncoder(tbl, &kv.SessionOptions{})).
+		AnyTimes()
+	mockBackend.EXPECT().MakeEmptyRows().Return(realBackend.MakeEmptyRows()).AnyTimes()
+	mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), dataUUID).Return(noop.Writer{}, nil)
+	mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), indexUUID).
+		Return(nil, errors.New("mock open index local writer failed"))
+	openedIdxEngine, err := rc.backend.OpenEngine(ctx, nil, "`db`.`table`", -1)
+	c.Assert(err, IsNil)
+
+	// opening the index-engine local writer fails, so restoreEngine should return that error directly
+	_, err = s.tr.restoreEngine(ctx, rc, openedIdxEngine, 0, cp.Engines[0])
+	c.Assert(err, ErrorMatches, "mock open index local writer failed")
+
+	localWriter := func(ctx context.Context, cfg *backend.LocalWriterConfig, engineUUID uuid.UUID) (backend.EngineWriter, error) {
+		time.Sleep(20 * time.Millisecond)
+		select {
+		case <-ctx.Done():
+			return nil, errors.New("mock open index local writer failed after ctx.Done")
+		default:
+			return noop.Writer{}, nil
+		}
+	}
+	mockBackend.EXPECT().OpenEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+	mockBackend.EXPECT().OpenEngine(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil)
+	mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), dataUUID).Return(errorLocalWriter{}, nil).AnyTimes()
+	mockBackend.EXPECT().LocalWriter(gomock.Any(), gomock.Any(), indexUUID).
+		DoAndReturn(localWriter).AnyTimes()
+
+	openedIdxEngine, err = rc.backend.OpenEngine(ctx, nil, "`db`.`table`", -1)
+	c.Assert(err, IsNil)
+
+	// opening the index writer fails only after writing rows has failed; the write-rows error should be returned
+	_, err = s.tr.restoreEngine(ctx, rc, openedIdxEngine, 0, cp.Engines[0])
+	c.Assert(err, ErrorMatches, "mock write rows failed")
+}
+
func (s *tableRestoreSuite) TestPopulateChunksCSVHeader(c *C) {
	fakeDataDir := c.MkDir()
	store, err := storage.NewLocalStorage(fakeDataDir)
@@ -1091,7 +1178,7 @@ func (s *tableRestoreSuite) TestTableRestoreMetrics(c *C) {
	chunkPending := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending))
	chunkFinished := metric.ReadCounter(metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending))
	c.Assert(chunkPending-chunkPendingBase, Equals, float64(7))
-	c.Assert(chunkFinished-chunkFinishedBase, Equals, chunkPending)
+	c.Assert(chunkFinished-chunkFinishedBase, Equals, chunkPending-chunkPendingBase)

	engineFinished := metric.ReadCounter(metric.ProcessedEngineCounter.WithLabelValues("imported", metric.TableResultSuccess))
	c.Assert(engineFinished-engineFinishedBase, Equals, float64(8))
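Note that the second half of the test wires the index writer through gomock's `DoAndReturn` instead of a canned `Return`, so the mocked `LocalWriter` can check at call time whether the failed data writer has already cancelled the shared context.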
25 changes: 17 additions & 8 deletions br/pkg/lightning/restore/table_restore.go
@@ -462,6 +462,11 @@ func (tr *TableRestore) restoreEngine(
		}
	}()

+	setError := func(err error) {
+		chunkErr.Set(err)
+		cancel()
+	}
+
	// Restore table data
	for chunkIndex, chunk := range cp.Chunks {
		if chunk.Chunk.Offset >= chunk.Chunk.EndOffset {
@@ -500,27 +505,32 @@
		// 4. flush kvs data (into tikv node)
		cr, err := newChunkRestore(ctx, chunkIndex, rc.cfg, chunk, rc.ioWorkers, rc.store, tr.tableInfo)
		if err != nil {
-			return nil, errors.Trace(err)
+			setError(err)
+			break
		}
		var remainChunkCnt float64
		if chunk.Chunk.Offset < chunk.Chunk.EndOffset {
			remainChunkCnt = float64(chunk.Chunk.EndOffset-chunk.Chunk.Offset) / float64(chunk.Chunk.EndOffset-chunk.Key.Offset)
			metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Add(remainChunkCnt)
		}

-		restoreWorker := rc.regionWorkers.Apply()
-		wg.Add(1)
-
		dataWriter, err := dataEngine.LocalWriter(ctx, dataWriterCfg)
		if err != nil {
-			return nil, errors.Trace(err)
+			cr.close()
+			setError(err)
+			break
		}

		indexWriter, err := indexEngine.LocalWriter(ctx, &backend.LocalWriterConfig{})
		if err != nil {
-			return nil, errors.Trace(err)
+			_, _ = dataWriter.Close(ctx)
+			cr.close()
+			setError(err)
+			break
		}

+		restoreWorker := rc.regionWorkers.Apply()
+		wg.Add(1)
		go func(w *worker.Worker, cr *chunkRestore) {
			// Restore a chunk.
			defer func() {
@@ -555,8 +565,7 @@
				}
			} else {
				metric.ChunkCounter.WithLabelValues(metric.ChunkStateFailed).Add(remainChunkCnt)
-				chunkErr.Set(err)
-				cancel()
+				setError(err)
			}
		}(restoreWorker, cr)
	}
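The `setError` closure changes the failure mode of the chunk loop: instead of returning immediately and abandoning goroutines that are already restoring chunks, the loop records the first error, cancels the shared context, and breaks, so control still reaches the function's final wait on the WaitGroup. A self-contained sketch of this first-error-wins pattern, with a simplified `onceError` standing in for the value behind `chunkErr` (names here are illustrative, not the repo's):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// onceError keeps only the first error it is given, mirroring the
// role chunkErr plays in restoreEngine.
type onceError struct {
	mu  sync.Mutex
	err error
}

func (o *onceError) Set(err error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	if o.err == nil {
		o.err = err
	}
}

func (o *onceError) Get() error {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.err
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var chunkErr onceError
	setError := func(err error) {
		chunkErr.Set(err)
		cancel() // tell the remaining workers to stop cooperatively
	}

	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			select {
			case <-ctx.Done():
				return // another chunk already failed; skip the work
			default:
			}
			if i == 2 {
				setError(errors.New("mock chunk failure"))
			}
		}(i)
	}
	wg.Wait() // drain in-flight chunks before reporting the first error
	fmt.Println(chunkErr.Get())
}
```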
2 changes: 2 additions & 0 deletions br/pkg/lightning/restore/tidb.go
@@ -111,6 +111,8 @@ func DBFromConfig(ctx context.Context, dsn config.DBStore) (*sql.DB, error) {
		"tidb_opt_write_row_id": "1",
		// always set auto-commit to ON
		"autocommit": "1",
+		// always set transaction mode to optimistic
+		"tidb_txn_mode": "optimistic",
	}

	if dsn.Vars != nil {
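These pairs are applied as session variables on every connection Lightning opens to the TiDB server. As a sketch of how such settings are typically attached to a DSN with `github.com/go-sql-driver/mysql` (illustrative; not necessarily how `DBFromConfig` builds its connection):

```go
import (
	"database/sql"

	"github.com/go-sql-driver/mysql"
)

// openTiDB shows the driver's Params mechanism; the address and user
// are assumptions for illustration only.
func openTiDB() (*sql.DB, error) {
	cfg := mysql.NewConfig()
	cfg.User = "root"
	cfg.Net = "tcp"
	cfg.Addr = "127.0.0.1:4000"
	cfg.Params = map[string]string{
		"autocommit":    "1",
		"tidb_txn_mode": "optimistic",
	}
	return sql.Open("mysql", cfg.FormatDSN())
}
```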
7 changes: 7 additions & 0 deletions ddl/partition.go
@@ -187,6 +187,13 @@ func (w *worker) onAddTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (v
func alterTablePartitionBundles(t *meta.Meta, tblInfo *model.TableInfo, addingDefinitions []model.PartitionDefinition) ([]*placement.Bundle, error) {
	var bundles []*placement.Bundle

+	// tblInfo does not include the added partitions, so add them first
+	tblInfo = tblInfo.Clone()
+	p := *tblInfo.Partition
+	p.Definitions = append([]model.PartitionDefinition{}, p.Definitions...)
+	p.Definitions = append(p.Definitions, addingDefinitions...)
+	tblInfo.Partition = &p
+
	// bundle for table should be recomputed because it includes some default configs for partitions
	tblBundle, err := placement.NewTableBundle(t, tblInfo)
	if err != nil {
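The clone-plus-copy sequence exists so the cached `model.TableInfo` is never mutated in place: `Clone` copies the struct, the `Definitions` slice gets a fresh backing array, and only then are the pending partitions appended. A minimal sketch of the copy-on-append idiom with plain types (names illustrative):

```go
// copyAppend grows a private copy instead of the shared slice, so a
// caller holding the original never observes the appended elements.
func copyAppend(defs []string, adding ...string) []string {
	out := append([]string{}, defs...) // fresh backing array
	return append(out, adding...)
}
```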
8 changes: 4 additions & 4 deletions ddl/placement_policy_test.go
@@ -1511,15 +1511,15 @@ func (s *testDBSuite6) TestAddPartitionWithPlacement(c *C) {
	policy, ok := tk.Se.GetInfoSchema().(infoschema.InfoSchema).PolicyByName(model.NewCIStr("p1"))
	c.Assert(ok, IsTrue)

-	tk.MustExec(`CREATE TABLE tp (id INT) PARTITION BY RANGE (id) (
+	tk.MustExec(`CREATE TABLE tp (id INT) FOLLOWERS=1 PARTITION BY RANGE (id) (
		PARTITION p0 VALUES LESS THAN (100),
		PARTITION p1 VALUES LESS THAN (1000)
	);`)
	defer tk.MustExec("drop table tp")
	tk.MustQuery("show create table tp").Check(testkit.Rows("" +
		"tp CREATE TABLE `tp` (\n" +
		"  `id` int(11) DEFAULT NULL\n" +
-		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] FOLLOWERS=1 */\n" +
		"PARTITION BY RANGE (`id`)\n" +
		"(PARTITION `p0` VALUES LESS THAN (100),\n" +
		" PARTITION `p1` VALUES LESS THAN (1000))"))
@@ -1534,7 +1534,7 @@ func (s *testDBSuite6) TestAddPartitionWithPlacement(c *C) {
	tk.MustQuery("show create table tp").Check(testkit.Rows("" +
		"tp CREATE TABLE `tp` (\n" +
		"  `id` int(11) DEFAULT NULL\n" +
-		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] FOLLOWERS=1 */\n" +
		"PARTITION BY RANGE (`id`)\n" +
		"(PARTITION `p0` VALUES LESS THAN (100),\n" +
		" PARTITION `p1` VALUES LESS THAN (1000),\n" +
@@ -1563,7 +1563,7 @@ func (s *testDBSuite6) TestAddPartitionWithPlacement(c *C) {
	tk.MustQuery("show create table tp").Check(testkit.Rows("" +
		"tp CREATE TABLE `tp` (\n" +
		"  `id` int(11) DEFAULT NULL\n" +
-		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin\n" +
+		") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] FOLLOWERS=1 */\n" +
		"PARTITION BY RANGE (`id`)\n" +
		"(PARTITION `p0` VALUES LESS THAN (100),\n" +
		" PARTITION `p1` VALUES LESS THAN (1000),\n" +
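The `/*T![placement] ... */` fragment is TiDB's versioned-comment syntax: MySQL and older TiDB versions parse it as an ordinary comment, while TiDB versions that recognize the `placement` feature execute the options inside it.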
6 changes: 6 additions & 0 deletions distsql/request_builder.go
@@ -138,6 +138,12 @@ func (builder *RequestBuilder) SetPartitionsAndHandles(handles []kv.Handle) *Req
	return builder
}

+// SetIsolationLevel sets "IsolationLevel" for "kv.Request".
+func (builder *RequestBuilder) SetIsolationLevel(level kv.IsoLevel) *RequestBuilder {
+	builder.Request.IsolationLevel = level
+	return builder
+}
+
const estimatedRegionRowCount = 100000

// SetDAGRequest sets the request type to "ReqTypeDAG" and construct request data.
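`SetIsolationLevel` follows the same chainable-setter convention as the surrounding builder methods. A self-contained sketch of that pattern with simplified types (not TiDB's actual structs):

```go
package main

import "fmt"

// IsoLevel is a simplified stand-in for kv.IsoLevel.
type IsoLevel int

const (
	SI IsoLevel = iota // snapshot isolation
	RC                 // read committed
)

type Request struct{ IsolationLevel IsoLevel }

type RequestBuilder struct{ Request Request }

// SetIsolationLevel mirrors the new method: mutate the embedded
// request, then return the builder so calls can chain.
func (b *RequestBuilder) SetIsolationLevel(level IsoLevel) *RequestBuilder {
	b.Request.IsolationLevel = level
	return b
}

func main() {
	req := (&RequestBuilder{}).SetIsolationLevel(RC).Request
	fmt.Println(req.IsolationLevel == RC) // true
}
```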
2 changes: 2 additions & 0 deletions domain/sysvar_cache.go
@@ -243,6 +243,8 @@ func (do *Domain) checkEnableServerGlobalVar(name, sVal string) {
		storekv.StoreLimit.Store(val)
	case variable.TiDBPersistAnalyzeOptions:
		variable.PersistAnalyzeOptions.Store(variable.TiDBOptOn(sVal))
+	case variable.TiDBEnableColumnTracking:
+		variable.EnableColumnTracking.Store(variable.TiDBOptOn(sVal))
	}
	if err != nil {
		logutil.BgLogger().Error(fmt.Sprintf("load global variable %s error", name), zap.Error(err))
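`checkEnableServerGlobalVar` fans freshly loaded global values out into process-wide atomics so every session observes the change without re-reading storage. A simplified, self-contained sketch of that publish step (the real `TiDBOptOn` and `EnableColumnTracking` live in `sessionctx/variable`):

```go
package main

import (
	"fmt"
	"strings"
	"sync/atomic"
)

// optOn mimics the spirit of variable.TiDBOptOn: interpret an
// ON/OFF-style system-variable value as a boolean.
func optOn(val string) bool {
	switch strings.ToUpper(val) {
	case "ON", "1", "TRUE":
		return true
	}
	return false
}

// enableColumnTracking stands in for variable.EnableColumnTracking.
var enableColumnTracking atomic.Value

func main() {
	enableColumnTracking.Store(optOn("ON"))
	fmt.Println(enableColumnTracking.Load().(bool)) // true
}
```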
21 changes: 21 additions & 0 deletions executor/adapter.go
@@ -500,6 +500,27 @@ func (a *ExecStmt) handleNoDelay(ctx context.Context, e Executor, isPessimistic
	return false, nil, nil
}

+func isNoResultPlan(p plannercore.Plan) bool {
+	if p.Schema().Len() == 0 {
+		return true
+	}
+
+	// Currently this is only for the "DO" statement. Take "DO 1, @a=2;" as an example:
+	// the Projection has two expressions and two columns in the schema, but we should
+	// not return the result of the two expressions.
+	switch raw := p.(type) {
+	case *plannercore.LogicalProjection:
+		if raw.CalculateNoDelay {
+			return true
+		}
+	case *plannercore.PhysicalProjection:
+		if raw.CalculateNoDelay {
+			return true
+		}
+	}
+	return false
+}
+
// getMaxExecutionTime get the max execution timeout value.
func getMaxExecutionTime(sctx sessionctx.Context) uint64 {
	if sctx.GetSessionVars().StmtCtx.HasMaxExecutionTime {
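Two cases count as "no result" here: a plan whose schema is empty, and a projection marked `CalculateNoDelay`, which is evaluated for its side effects (such as `@a=2` in `DO 1, @a=2;`) rather than for rows to return.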
… (diffs for the remaining 36 changed files are not rendered here)
