From 591a7d08ea44a82cb667784bc4d6447f68a7ee8f Mon Sep 17 00:00:00 2001
From: fengou1 <85682690+fengou1@users.noreply.github.com>
Date: Mon, 27 Dec 2021 16:21:04 +0800
Subject: [PATCH] br: merge into feature branch (#31045)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* br: fix the integration tests (#30423)
* util, cmd: remove unused filesort (#30438)
* *: update client-go for small backoff time (#30436)
* server: Fix unstable tests with FakeAuthSwitch (#30287)
* dumpling: fix dump failed when sequence exists (#30164)
* *: replace compareDatum by compare (#30421)
* lightning: fix gcs max key limit (#30393)
* expression, parser: add built-in func is_uuid (#30318)
* expression: migrate test-infra to testify for constant_fold_test.go (#30424)
* executor: fix pipelined window invalid memory address (#30418)
* makefile: add gotestsum for verify ci (#29848)
* server: close sql rows to fix unstable test (#30306)
* Makefile: add coverage record for BR and Dumpling (#30457)
* executor: track the mem usage of IndexMergeReader (#30210)
* infosync: close body when ReadAll encounters error (#30462)
* planner: show accessed partition when explain mpp query over partition table (#30367)
* *: Fix use of user identity in SHOW GRANTS + error messages (#30294)
* ddl: add not null flag for auto_increment column (#30477)
* expression: make some unstable test serial (#30323)
* expression: migrate test-infra to testify for constant_propagation_test.go (#30430)
* executor: stable test TestSetDDLReorgBatchSize and TestSetDDLReorgWorkerCnt (#30480)
* statistics, util/ranger: add cardinality estimation trace for `GetRowCountBy...` (#30321)
* *: skip mysql client goroutine leak detection in integration ddl (#30467)
* executor,util: write slow query to slow log no matter what log level (#30461)
* executor: enable index_merge used in transaction. (#29875)
* logutil: add testcase for SlowQueryLogger.MaxDays/MaxSize/MaxBackups (#30316)
* expression: fix data race in builtin_other_vec_generated_test.go (#30503)
* expression: fix data race in the collationInfo (#30490)
* planner/core, session: fix error message of wrong variable scope (#30510)
* lightning: support Re/ReregisterMySQL by different tls name (#30463)
* executor: TestBatchGetandPointGetwithHashPartition test typo (#29669) (#29671)
* mockstore: improve log to avoid panic for nil pointer (#30513)
* *: replace compareDatum by compare, PR 10 (#30456)
* planner: Disable dynamic partition prune mode for all non-autocommit (#27532) (#30505)
* expression: change the log level of a confusing log from warn to debug (#30484)
* br: Check crypter.key valid before backup (#29991)
* *: replace compareDatum by compare, PR 11 (#30465)
* dumpling: fix default column collation with upstream when dump table (#30531)
* server: fix prepared cursor select (#30285)
* executor: HashJoinExec checks the buildError even if the probeSide is empty (#30471)
* parser, expression: follow mysql, increase interval precedence (#30528)
* makefile: set timeout 25m for make race (#30555)
* planner: fix the unstable test TestAnalyzeGlobalStatsWithOpts/2 (#30576)
* expression,types: Adjusts UNIX_TIMESTAMP() for non-existing DST values (#28739) (#30405)
* br: add res.Body.close to avoid leak (#30545)
* lightning: add back integration test lightning_error_summary (#30547)
* sessionctx/variable: small refactor (split large file) (#30511)
* ddl: let `admin cancel ddl jobs` run in a new transaction (#30549)
* *: Retry when placement PutBundles failed (#30590)
* dumpling: delete unit test in github actions (#30562)
* *: support trace plan target='estimation' statement (#30491)
* expression: migrate test-infra to testify for integration_test.go (#30548)
* planner: support trace for min/max eliminate (#30441)
* support min/max trace Signed-off-by: yisaer
* address the comment Signed-off-by: yisaer Co-authored-by: Ti Chi Robot
* br: remove cdclog in br (#30573)
* *: show cmd to check if all needed histograms are loaded (#29672)
* expression: clone repertoire when clone the scalar function (#30602)
* *: use the real StateRemote interface implementation for cached table (#30066)
* *: query failed after add index / timestamp out-of-range (#28424) (#29323)
* planner: implement collecting predicate columns from logical plan (#29878)
* *: show PK name when decoding the clustered index row key (#30623)
* ddl/callback_test.go: migrate test-infra to testify (#30317)
* *: Rename some names of placement ddl operation (#30622)
* executor: fix data race in the index_lookup_hash_join (#30619)
* ddl: remove unnecessary locking when adding an index (#29772)
* server: try to make `TidbTestSuite` more stable (#30643)
* *: Add some PD tests for placement and fix some bug found (#30621)
* *: migrate sync.WaitGroup to util.WaitGroupWrapper (#30644)
* planner: add trace for join eliminate rule (#30343)
* executor: migrate test-infra to testify for executor/shuffle_test.go (#30514)
* planner: make (*AccessPath).OnlyPointRange more succinct (#30520)
* planner: add trace for join reorder (#30394)
* executor: migrate test-infra to testify for executor/union_scan_test.go (#30525)
* expression: make cast return error if cast binary literal to another character set (#30537)
* *: update tikv client (#30670)
* *: update sysutil in go.mod to fix panic when search log (#30523)
* topsql: shouldn't evict the SQL meta, since the evicted SQL can appear on other components (TiKV) TopN records (#27050)
* testify: migrate test-infra to testify for analyze_test.go (#30640)
* util: replace compareDatum by compare, point part (#30575)
* test: make all the tests run in serial (#30692)
* statistics: add mutex for Handle.globalMap and Handle.feedback (#30550)
* executor: fix regular expression in json so that it can match identifiers starting with '$' (#29750)
* util/testkit/testkit.go: fix typo (#30638)
* planner: Introduce a new global variable to control the historical statistics feature (#30646)
* topsql: introduce datasink interface (#30662)
* planner: unify the argument of stats functions to use SessionCtx instead of StatementContext (#30668)
* metrics: fix the Max SafeTS Gap metrics (#30689)
* lightning: Add source dir existence check for s3 (#30674)
* golangci-lint: support durationcheck (#30027)
* executor: fix data race on IndexHashJoin.cancelFunc (#30701)
* sessionctx/variable: change tidb_store_limit to global only (#30522)
* statistics: remove reassignment of Handle.pool in NewHandle (#30675)
* br: fix some unstable unit test cases. (#30716)
* bindinfo: fix the comment typo (#30616)
* server: support decoding prepared string args to character_set_client (#30723)
* expression: fix enum type join binary get wrong result (#30445)
* cmd/explaintest: fix wrong result comparison for explain test (#30717)
* parallel create tables in br
* metrics: fix copr-cache metrics (#30712)
* test: merge executor's serial tests to other tests (#30711)
* statistics: avoid deadlock when create/drop extended stats and analyze at the same time (#30566)
* ddl: add batch create table api Signed-off-by: xhe
* ddl: add unit tests Signed-off-by: xhe
* ddl: fix fmt Signed-off-by: xhe
* ddl: typo Co-authored-by: Arenatlx
* ddl: fix tests Signed-off-by: xhe
* ddl: rename to BatchCreateTableWithInfo Signed-off-by: xhe
* ddl: trace the error Signed-off-by: xhe
* ddl: comments Signed-off-by: xhe
* ddl: cancel the job right Signed-off-by: xhe
* ddl: cancel the job right 2 Signed-off-by: xhe
* ddl: report error if entry too large Signed-off-by: xhe
* ddl: report error when table is duplicated Signed-off-by: xhe
* ddl: go fmt Signed-off-by: xhe
* infoschema: improve batch memory perf Signed-off-by: xhe
* ddl: retain ID Signed-off-by: xhe
* sessionctx: fix the value of analyze_version when upgrading 4.x to 5.… (#30743)
* ddl: reduce log frequency Signed-off-by: xhe
* ddl: fix tests Signed-off-by: xhe
* server: disable socket listener for `basicHTTPHandlerTestSuite` (#30680)
* planner: support the plan cache aware of bindings (#30169)
* planner: fix early set of plan's statisticsTable (#30754)
* *: implement renew write lock lease for cached table (#30206)
* *: Modify placement rule index to reserve some indexes for future work (#30737)
* executor: add a unit test case for unreasonable invoking Close (#30696)
* planner: fix wrong subquery's coercibility (#30750)
* executor: add more testcases for index merge (#30497)
* server: add grpc server config for a suitable behavior (#30774)
* config, charset: make charset config not affected by collation config (#30572)
* lightning: emit tidb log by change FilterCore to only allow matched packages (#30700)
* topsql: a centralized place to generate tipb report data (#30781)
* planner: add trace for partition pruning (#30546)
* planner: refine collation handling for between (#30793)
* test: merge serial tests in bindinfo, expression, parser and statistics (#30749)
* br: update log description for split check (#30763)
* *: replace compareDatum by compare, range part (#30710)
* *: placement policy ref will be converted to direct options when recover or flashback table (#30705)
* ddl: handle the incorrect number of placement followers (#30715)
* ddl: revert "ddl: remove unnecessary locking when adding an index" (#30667)
* br/pkg/task: migrate test-infra to testify (#30605)
* *: fix the flen type datetime for union/case-when/control-funcs (#30588)
* types, util: clean up compareDatum (#30815)
* ddl: add helper function to set and query TiFlash's sync status (#30473)
* dumpling: fix more dumpling log level query template (#30176)
* parser: support `admin flush plan_cache` (#30747)
* topsql: support multiple datasinks (#30808)
* br: update permission, so tikv can write to folder when backup to local (#30396)
* session: fix bootstrap to only persist global variables (#30593) close pingcap/tidb#28667
* docs/design: update collation compatibility issues in charsets doc (#30806)
* executor: improve SET sysvar=DEFAULT handling (#29680) close pingcap/tidb#29670
* br: add error handling for group context cancel when restore file is corrupted (#30190) close pingcap/tidb#30135
* executor: buildWindow cannot call typeInfer twice (#30773) close pingcap/tidb#30402
* *: refactor encoding and uniform usages (#30288)
* lightning: optimize region split check logic (#30428) close pingcap/tidb#30018
* br: ignore mock directory when gcov in br (#30586)
* *: forbid set tiflash replica count for a placement table (#30844) close pingcap/tidb#30741
* execute: don't transform charset in internal sql (#30843) close pingcap/tidb#30789
* planner: update PlanBuilder.windowSpecs when building subquery (#30878) close pingcap/tidb#30804
* br: fix S3 backup endpoint suffix (#30530) close pingcap/tidb#30104
* lightning: make pre-check output message clearer (#30439) close pingcap/tidb#30395
* expression: wrap to_binary and from_binary for cast function's argument (#30706)
* executor: fix bug when using IndexMerge in transaction (#30719) close pingcap/tidb#30685
* ddl: migrate test-infra to testify for ddl/foreign_key_test.go (#30853) close pingcap/tidb#29103
* expression: fix wrong retType for reverse function (#30829) close pingcap/tidb#30809
* planner: support trace topn push down (#30800) ref pingcap/tidb#29661
* github: add issue requirement to pull request template (#30817) close pingcap/tidb#30814
* fix merge issue
* topsql: introduce stmtstats and sql execution count (#30277)
* topsql: add pubsub datasink (#30860)
* executor: fix the incorrect untouch used in optimistic transactions (#30447) close pingcap/tidb#30410
* expression, cmd: let crc32() support gbk (#30900) close pingcap/tidb#30898
* server: Add uptime status var and statistics (#29790) close pingcap/tidb#8842
* br: error log optimization (#29640) close pingcap/tidb#27015
* planner: fix wrong collation when rewrite in condition (#30492) close pingcap/tidb#30486
* planner: add extractor for tikv_region_peers (#30656)
* fix issue that loses tables during restore
* lightning: add back table empty check and add a switch config (#30887) close pingcap/tidb#27919
* br: improve backoff unit test (#30892)
* *: add TxnManager to manage txn in session (#30574)
* *: add TxnManager to manage txn in session
* modify
* add tests
* move failpoint content to a single file
* Makefile: add `t.Parallel` check to ensure tests are run in serial (#30869)
* refactoring code
* refactoring code
* placement: remove isolationlevel (#30859) close pingcap/tidb#30858
* planner: revise the optimize trace output (#30882)
* table: set the datum collation correctly in CastValue() (#30931) close pingcap/tidb#30930
* *: Use TxnManager.GetTxnInfoSchema() to get the txn infoschema (#30934) close pingcap/tidb#30933
* parser: add IsValid() to Encoding to speed up string validation for UTF-8 (#30937) close pingcap/tidb#30936
* planner: rename pstmtPlanCacheXX to PlanCacheXX (#30909)
* table/tables: make CI TestCacheTableBasicReadAndWrite more stable (#30924) close pingcap/tidb#30922
* restore: use new ScatterRegions API (#30899) close pingcap/tidb#30425
* *: when placement conflicts with tiflash, cancel the job (#30945)
* Makefile,tools: make CI great again! (#30828) close pingcap/tidb#30822
* br/pkg/membuf: remove global buffer pool (#29934)
* ddl: add format error for incorrect dict syntax in the placement rule (#30919) close pingcap/tidb#30454
* planner: fix index merge plan when expr cannot be pushed to tikv (#30341) close pingcap/tidb#30200
* executor: display 'show create table' and INFOSCHEMA for cached table correctly (#30951) close pingcap/tidb#30950
* br: extend the timeout for scan region since 3 seconds is not enough (#30889) close pingcap/tidb#30720
* planner: remove bindSQL from planCacheKey to planCacheValue (#30916)
* execution: refine precision of cast as decimal in agg func (#30805)
* *: fix data race in the tikv_client (#30964) close pingcap/tidb#30658
* ddl: migrate test-infra to testify for ddl/db_partition_test.go (#30952) close pingcap/tidb#28635
* planner: fix `AccessPath.TableFilters` got modified unexpectedly (#30966) close pingcap/tidb#30965
* test: merge serial tests in ddl, infoschema, session, store, table, telemetry and types (#30874)
* executor: fix the returned field count of the prepare statement (#30981) close pingcap/tidb#30971
* binlog: allow multiple ddl targets (#30904)
* planner: trace predicate push down (#30902) ref pingcap/tidb#29661
* placement: give default 2 followers for non-sugar syntax (#31000)
* flatten the json output (#30905) Signed-off-by: yisaer Co-authored-by: Ti Chi Robot
* test: control log level with environment variables (#30871)
* planner: add usage of paging copr in optimizer (#30536) close pingcap/tidb#30578
* test: merge serial tests in cmd, planner, server, util (#31003)
* planner: change predicateColumnCollector to columnStatsUsageCollector and collect histogram-needed columns (#30671)
* executor: migrate test-infra to testify for distsql_test.go (#31023) close pingcap/tidb#28574
* remove unnecessary package errors
* reused the retry code from lightning
* refactoring retryable
* ddl: add batch create table api Signed-off-by: xhe
* ddl: add unit tests Signed-off-by: xhe
* ddl: fix fmt Signed-off-by: xhe
* br ddl code
* parallel create tables in br
* ddl: add batch create table api Signed-off-by: xhe
* ddl: add unit tests Signed-off-by: xhe
* ddl: fix fmt Signed-off-by: xhe
* ddl: typo Co-authored-by: Arenatlx
* ddl: fix tests Signed-off-by: xhe
* ddl: rename to BatchCreateTableWithInfo Signed-off-by: xhe
* ddl: trace the error Signed-off-by: xhe
* ddl: comments Signed-off-by: xhe
* ddl: cancel the job right Signed-off-by: xhe
* ddl: cancel the job right 2 Signed-off-by: xhe
* ddl: report error if entry too large Signed-off-by: xhe
* ddl: report error when table is duplicated Signed-off-by: xhe
* ddl: go fmt Signed-off-by: xhe
* infoschema: improve batch memory perf Signed-off-by: xhe
* ddl: retain ID Signed-off-by: xhe
* ddl: reduce log frequency Signed-off-by: xhe
* ddl: fix tests Signed-off-by: xhe
* ddl: remove retainID from the interface Signed-off-by: xhe
* ddl: fix tests Signed-off-by: xhe
* executor: fix rebasing problem Signed-off-by: xhe
* sessionctx: enable IndexMerge by default (#30650) close pingcap/tidb#29597
* br: Enable lint `gosec` in br (#30895) close pingcap/tidb#30699
* planner: support 'admin flush plan cache' (#30370)
* merge from tidb batch_1

Co-authored-by: 3pointer
Co-authored-by: wjHuang
Co-authored-by: Lei Zhao
Co-authored-by: Daniël van Eeden
Co-authored-by: sylzd
Co-authored-by: Jianjun Liao <36503113+Leavrth@users.noreply.github.com>
Co-authored-by: unconsolable
Co-authored-by: tison
Co-authored-by: Shenghui Wu <793703860@qq.com>
Co-authored-by: tangenta
Co-authored-by: Weizhen Wang
Co-authored-by: guo-shaoge
Co-authored-by: Ryan Leung
Co-authored-by: xufei
Co-authored-by: Morgan Tocker
Co-authored-by: Zhou Kunqin <25057648+time-and-fate@users.noreply.github.com>
Co-authored-by: 王超
Co-authored-by: TonsnakeLin <87681388+TonsnakeLin@users.noreply.github.com>
Co-authored-by: Ehco
Co-authored-by: Mattias Jonsson
Co-authored-by: HuaiyuXu <391585975@qq.com>
Co-authored-by: Zak Zhao <57036248+joccau@users.noreply.github.com>
Co-authored-by: WizardXiao <89761062+WizardXiao@users.noreply.github.com>
Co-authored-by: xhe
Co-authored-by: Hangjie Mo
Co-authored-by: Yuanjia Zhang
Co-authored-by: glorv
Co-authored-by: djshow832
Co-authored-by: Chunzhu Li
Co-authored-by: Song Gao
Co-authored-by: Ti Chi Robot
Co-authored-by: Xiaoju Wu
Co-authored-by: xiongjiwei
Co-authored-by: tiancaiamao
Co-authored-by: Yifan Xu <30385241+xuyifangreeneyes@users.noreply.github.com>
Co-authored-by: JmPotato
Co-authored-by: Zach <51114270+zach030@users.noreply.github.com>
Co-authored-by: bb7133
Co-authored-by: lvtu <37565148+tongtongyin@users.noreply.github.com>
Co-authored-by: crazycs
Co-authored-by: znhh6018 <44599853+znhh6018@users.noreply.github.com>
Co-authored-by: eddie lin
Co-authored-by: dongjunduo
Co-authored-by: Zhenchi
Co-authored-by: wangggong <793160615@qq.com>
Co-authored-by: zhangjinpeng1987
Co-authored-by: Jack Yu
Co-authored-by: Arenatlx
Co-authored-by: Yiding Cui
Co-authored-by: Chengpeng Yan <41809508+Reminiscent@users.noreply.github.com>
Co-authored-by: bestwoody <89765764+bestwoody@users.noreply.github.com>
Co-authored-by: Calvin Neo
Co-authored-by: Lynn
Co-authored-by: Zhuhe Fang
Co-authored-by: Mini256
Co-authored-by: Xiang Zhang
Co-authored-by: Yexiang Zhang
Co-authored-by: cfzjywxk
Co-authored-by: db <39407623+IcePigZDB@users.noreply.github.com>
Co-authored-by: 山岚 <36239017+YuJuncen@users.noreply.github.com>
Co-authored-by: Yujie Xia
Co-authored-by: Yilong Li
Co-authored-by: tuuuuuu <83738345+MiaoMiaoGarden@users.noreply.github.com>
Co-authored-by: qupeng
Co-authored-by: you06
---
 .golangci_br.yml | 9 +-
 br/pkg/lightning/backend/kv/sql2kv.go | 4 +-
 br/pkg/lightning/checkpoints/checkpoints.go | 6 +-
 br/pkg/lightning/common/security.go | 2 +-
 br/pkg/lightning/lightning.go | 2 +-
 br/pkg/lightning/restore/meta_manager.go | 11 +-
 br/pkg/mock/mock_cluster.go | 2 +-
 br/pkg/storage/hdfs.go | 1 +
 br/pkg/utils/pprof.go | 2 +-
 cmd/explaintest/r/explain_indexmerge.result | 22 ++-
 domain/domain.go | 19 ++
 executor/simple.go | 35 +++-
 planner/core/common_plans.go | 11 ++
 planner/core/integration_test.go | 4 +
 planner/core/planbuilder.go | 2 +
 planner/core/prepare_test.go | 202 ++++++++++++++++++++
 planner/core/testdata/plan_suite_out.json | 12 +-
 planner/optimize.go | 20 +-
 session/bootstrap.go | 57 +++++-
 session/bootstrap_test.go | 143 ++++++++++++++
 sessionctx/variable/session.go | 5 +-
 sessionctx/variable/sysvar.go | 2 +-
sessionctx/variable/sysvar_test.go | 9 + sessionctx/variable/tidb_vars.go | 1 + table/tables/cache.go | 54 ++---- table/tables/state_remote.go | 102 ++++++---- 26 files changed, 607 insertions(+), 132 deletions(-) diff --git a/.golangci_br.yml b/.golangci_br.yml index 28bbba74f749f..835a88488e7a3 100644 --- a/.golangci_br.yml +++ b/.golangci_br.yml @@ -26,7 +26,6 @@ linters: - exhaustivestruct - exhaustive - godot - - gosec - errorlint - wrapcheck - gomoddirectives @@ -81,3 +80,11 @@ linters-settings: issues: exclude-rules: + - path: br/tests/ + linters: + - gosec + - errcheck + - path: _test\.go + linters: + - gosec + diff --git a/br/pkg/lightning/backend/kv/sql2kv.go b/br/pkg/lightning/backend/kv/sql2kv.go index 45fd0ab664f50..658ad77d51d08 100644 --- a/br/pkg/lightning/backend/kv/sql2kv.go +++ b/br/pkg/lightning/backend/kv/sql2kv.go @@ -79,7 +79,7 @@ func NewTableKVEncoder(tbl table.Table, options *SessionOptions) (Encoder, error for _, col := range cols { if mysql.HasPriKeyFlag(col.Flag) { incrementalBits := autoRandomIncrementBits(col, int(meta.AutoRandomBits)) - autoRandomBits := rand.New(rand.NewSource(options.AutoRandomSeed)).Int63n(1< 0 { - rd := rand.New(rand.NewSource(options.AutoRandomSeed)) + rd := rand.New(rand.NewSource(options.AutoRandomSeed)) // nolint:gosec mask := int64(1)<= 0 { tableName := engine[:index] - engineID, err := strconv.Atoi(engine[index+1:]) + engineID, err := strconv.Atoi(engine[index+1:]) // nolint:gosec if err != nil { return nil, errors.Trace(err) } diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 49358a9aee102..58d8c59966a6b 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -180,7 +180,7 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64 } needAutoID := common.TableHasAutoRowID(m.tr.tableInfo.Core) || m.tr.tableInfo.Core.GetAutoIncrementColInfo() != nil || m.tr.tableInfo.Core.ContainsAutoRandomBits() err = exec.Transact(ctx, "init table allocator base", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, row_id_base, row_id_max, total_kvs_base, total_bytes_base, checksum_base, status from %s WHERE table_id = ? FOR UPDATE", m.tableName) // nolint:gosec rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID) if err != nil { return errors.Trace(err) @@ -381,6 +381,7 @@ func (m *dbTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checks needChecksum = true needRemoteDupe = true err = exec.Transact(ctx, "checksum pre-check", func(ctx context.Context, tx *sql.Tx) error { + // nolint:gosec query := fmt.Sprintf("SELECT task_id, total_kvs_base, total_bytes_base, checksum_base, total_kvs, total_bytes, checksum, status, has_duplicates from %s WHERE table_id = ? FOR UPDATE", m.tableName) rows, err := tx.QueryContext(ctx, query, m.tr.tableInfo.ID) if err != nil { @@ -593,7 +594,7 @@ func (m *dbTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { // avoid override existing metadata if the meta is already inserted. 
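	// The fmt.Sprintf-built queries in this file interpolate only the
	// task-meta table name and the numeric task ID, never user-supplied
	// values (those still go through query placeholders), which is
	// presumably why the gosec SQL-injection warnings are suppressed
	// with nolint above and below.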
exist := false err := exec.Transact(ctx, "check whether this task has started before", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id from %s WHERE task_id = %d", m.tableName, m.taskID) + query := fmt.Sprintf("SELECT task_id from %s WHERE task_id = %d", m.tableName, m.taskID) // nolint:gosec rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -635,7 +636,7 @@ func (m *dbTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(t return errors.Annotate(err, "enable pessimistic transaction failed") } return exec.Transact(ctx, "check tasks exclusively", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state, source_bytes, cluster_avail from %s FOR UPDATE", m.tableName) // nolint:gosec rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task metas failed") @@ -695,7 +696,7 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U paused := false var pausedCfg storedCfgs err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName) // nolint:gosec rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") @@ -821,7 +822,7 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool switchBack := true allFinished := finished err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error { - query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName) + query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName) // nolint:gosec rows, err := tx.QueryContext(ctx, query) if err != nil { return errors.Annotate(err, "fetch task meta failed") diff --git a/br/pkg/mock/mock_cluster.go b/br/pkg/mock/mock_cluster.go index d1ece26505d05..387887a7f1b12 100644 --- a/br/pkg/mock/mock_cluster.go +++ b/br/pkg/mock/mock_cluster.go @@ -207,7 +207,7 @@ func waitUntilServerOnline(addr string, statusPort uint) string { // connect http status statusURL := fmt.Sprintf("http://127.0.0.1:%d/status", statusPort) for retry = 0; retry < retryTime; retry++ { - resp, err := http.Get(statusURL) // nolint:noctx + resp, err := http.Get(statusURL) // nolint:noctx,gosec if err == nil { // Ignore errors. _, _ = io.ReadAll(resp.Body) diff --git a/br/pkg/storage/hdfs.go b/br/pkg/storage/hdfs.go index cbcc24088292f..d2b3d996047ce 100644 --- a/br/pkg/storage/hdfs.go +++ b/br/pkg/storage/hdfs.go @@ -49,6 +49,7 @@ func dfsCommand(args ...string) (*exec.Cmd, error) { } cmd = append(cmd, bin, "dfs") cmd = append(cmd, args...) 
+ //nolint:gosec return exec.Command(cmd[0], cmd[1:]...), nil } diff --git a/br/pkg/utils/pprof.go b/br/pkg/utils/pprof.go index 684d974174d7d..efa25389b80d8 100644 --- a/br/pkg/utils/pprof.go +++ b/br/pkg/utils/pprof.go @@ -11,7 +11,7 @@ import ( // #nosec // register HTTP handler for /debug/pprof "net/http" - _ "net/http/pprof" + _ "net/http/pprof" // nolint:gosec "github.com/pingcap/errors" "github.com/pingcap/failpoint" diff --git a/cmd/explaintest/r/explain_indexmerge.result b/cmd/explaintest/r/explain_indexmerge.result index 83bc89a593e7c..40e0cb4a1159d 100644 --- a/cmd/explaintest/r/explain_indexmerge.result +++ b/cmd/explaintest/r/explain_indexmerge.result @@ -6,19 +6,23 @@ create index td on t (d); load stats 's/explain_indexmerge_stats_t.json'; explain format = 'brief' select * from t where a < 50 or b < 50; id estRows task access object operator info -TableReader 98.00 root data:Selection -└─Selection 98.00 cop[tikv] or(lt(test.t.a, 50), lt(test.t.b, 50)) - └─TableFullScan 5000000.00 cop[tikv] table:t keep order:false +IndexMerge 98.00 root +├─TableRangeScan(Build) 49.00 cop[tikv] table:t range:[-inf,50), keep order:false +├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false +└─TableRowIDScan(Probe) 98.00 cop[tikv] table:t keep order:false explain format = 'brief' select * from t where (a < 50 or b < 50) and f > 100; id estRows task access object operator info -TableReader 98.00 root data:Selection -└─Selection 98.00 cop[tikv] gt(test.t.f, 100), or(lt(test.t.a, 50), lt(test.t.b, 50)) - └─TableFullScan 5000000.00 cop[tikv] table:t keep order:false +IndexMerge 98.00 root +├─TableRangeScan(Build) 49.00 cop[tikv] table:t range:[-inf,50), keep order:false +├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false +└─Selection(Probe) 98.00 cop[tikv] gt(test.t.f, 100) + └─TableRowIDScan 98.00 cop[tikv] table:t keep order:false explain format = 'brief' select * from t where b < 50 or c < 50; id estRows task access object operator info -TableReader 98.00 root data:Selection -└─Selection 98.00 cop[tikv] or(lt(test.t.b, 50), lt(test.t.c, 50)) - └─TableFullScan 5000000.00 cop[tikv] table:t keep order:false +IndexMerge 98.00 root +├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tb(b) range:[-inf,50), keep order:false +├─IndexRangeScan(Build) 49.00 cop[tikv] table:t, index:tc(c) range:[-inf,50), keep order:false +└─TableRowIDScan(Probe) 98.00 cop[tikv] table:t keep order:false set session tidb_enable_index_merge = on; explain format = 'brief' select * from t where a < 50 or b < 50; id estRows task access object operator info diff --git a/domain/domain.go b/domain/domain.go index d3a9664b97fe7..3560b865d2cea 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -50,6 +50,7 @@ import ( "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/statistics/handle" "github.com/pingcap/tidb/telemetry" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/domainutil" @@ -89,6 +90,7 @@ type Domain struct { cancel context.CancelFunc indexUsageSyncLease time.Duration planReplayer *planReplayer + expiredTimeStamp4PC types.Time serverID uint64 serverIDSession *concurrency.Session @@ -335,6 +337,22 @@ func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) { return meta.NewSnapshotMeta(snapshot), nil } +// ExpiredTimeStamp4PC gets expiredTimeStamp4PC from domain. 
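+// It returns the instance-wide expiry watermark written by `admin flush
+// instance plan_cache` via SetExpiredTimeStamp4PC; sessions whose
+// LastUpdateTime4PC is older than this value must drop their cached plans
+// before reuse (see Execute.OptimizePreparedPlan in planner/core/common_plans.go).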
+func (do *Domain) ExpiredTimeStamp4PC() types.Time { + do.m.Lock() + defer do.m.Unlock() + + return do.expiredTimeStamp4PC +} + +// SetExpiredTimeStamp4PC sets the expiredTimeStamp4PC from domain. +func (do *Domain) SetExpiredTimeStamp4PC(time types.Time) { + do.m.Lock() + defer do.m.Unlock() + + do.expiredTimeStamp4PC = time +} + // DDL gets DDL from domain. func (do *Domain) DDL() ddl.DDL { return do.ddl @@ -712,6 +730,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease}, onClose: onClose, renewLeaseCh: make(chan func(), 10), + expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp), } do.SchemaValidator = NewSchemaValidator(ddlLease, do) diff --git a/executor/simple.go b/executor/simple.go index ee6932ad1dc91..388269ce8120e 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" "github.com/pingcap/tidb/sessionctx/variable" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/collate" @@ -160,7 +161,7 @@ func (e *SimpleExec) Next(ctx context.Context, req *chunk.Chunk) (err error) { case *ast.ShutdownStmt: err = e.executeShutdown(x) case *ast.AdminStmt: - err = e.executeAdminReloadStatistics(x) + err = e.executeAdmin(x) } e.done = true return err @@ -1659,6 +1660,16 @@ func asyncDelayShutdown(p *os.Process, delay time.Duration) { } } +func (e *SimpleExec) executeAdmin(s *ast.AdminStmt) error { + switch s.Tp { + case ast.AdminReloadStatistics: + return e.executeAdminReloadStatistics(s) + case ast.AdminFlushPlanCache: + return e.executeAdminFlushPlanCache(s) + } + return nil +} + func (e *SimpleExec) executeAdminReloadStatistics(s *ast.AdminStmt) error { if s.Tp != ast.AdminReloadStatistics { return errors.New("This AdminStmt is not ADMIN RELOAD STATS_EXTENDED") @@ -1668,3 +1679,25 @@ func (e *SimpleExec) executeAdminReloadStatistics(s *ast.AdminStmt) error { } return domain.GetDomain(e.ctx).StatsHandle().ReloadExtendedStatistics() } + +func (e *SimpleExec) executeAdminFlushPlanCache(s *ast.AdminStmt) error { + if s.Tp != ast.AdminFlushPlanCache { + return errors.New("This AdminStmt is not ADMIN FLUSH PLAN_CACHE") + } + if s.StatementScope == ast.StatementScopeGlobal { + return errors.New("Do not support the 'admin flush global scope.'") + } + if !plannercore.PreparedPlanCacheEnabled() { + e.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("The plan cache is disable. So there no need to flush the plan cache")) + return nil + } + now := types.NewTime(types.FromGoTime(time.Now().In(e.ctx.GetSessionVars().StmtCtx.TimeZone)), mysql.TypeTimestamp, 3) + e.ctx.GetSessionVars().LastUpdateTime4PC = now + e.ctx.PreparedPlanCache().DeleteAll() + if s.StatementScope == ast.StatementScopeInstance { + // Record the timestamp. When other sessions want to use the plan cache, + // it will check the timestamp first to decide whether the plan cache should be flushed. 
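+		// Only this domain-level timestamp is written here; each session
+		// notices it lazily in OptimizePreparedPlan by comparing its own
+		// LastUpdateTime4PC and clears its cache then, so no cross-session
+		// broadcast is needed.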
+ domain.GetDomain(e.ctx).SetExpiredTimeStamp4PC(now) + } + return nil +} diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index d42d32e5ea111..8c0911ef43393 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -294,6 +294,16 @@ func (e *Execute) OptimizePreparedPlan(ctx context.Context, sctx sessionctx.Cont } prepared.SchemaVersion = is.SchemaMetaVersion() } + // If the lastUpdateTime less than expiredTimeStamp4PC, + // it means other sessions have executed 'admin flush instance plan_cache'. + // So we need to clear the current session's plan cache. + // And update lastUpdateTime to the newest one. + expiredTimeStamp4PC := domain.GetDomain(sctx).ExpiredTimeStamp4PC() + if prepared.UseCache && expiredTimeStamp4PC.Compare(vars.LastUpdateTime4PC) > 0 { + sctx.PreparedPlanCache().DeleteAll() + prepared.CachedPlan = nil + vars.LastUpdateTime4PC = expiredTimeStamp4PC + } err = e.getPhysicalPlan(ctx, sctx, is, preparedObj) if err != nil { return err @@ -401,6 +411,7 @@ func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context, } stmtCtx.UseCache = prepared.UseCache + var bindSQL string if prepared.UseCache { bindSQL = GetBindSQL4PlanCache(sctx, prepared.Stmt) cacheKey = NewPlanCacheKey(sctx.GetSessionVars(), e.ExecID, prepared.SchemaVersion) diff --git a/planner/core/integration_test.go b/planner/core/integration_test.go index 38048d4d30009..30c5a205ab257 100644 --- a/planner/core/integration_test.go +++ b/planner/core/integration_test.go @@ -2203,6 +2203,10 @@ func (s *testIntegrationSuite) TestOptimizeHintOnPartitionTable(c *C) { partition p1 values less than(11), partition p2 values less than(16));`) tk.MustExec(`insert into t values (1,1,"1"), (2,2,"2"), (8,8,"8"), (11,11,"11"), (15,15,"15")`) + tk.MustExec("set @@tidb_enable_index_merge = off") + defer func() { + tk.MustExec("set @@tidb_enable_index_merge = on") + }() // Create virtual tiflash replica info. 
dom := domain.GetDomain(tk.Se) diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index b830d26da025d..b91db6e0e995a 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -1380,6 +1380,8 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (Plan, return &AdminResetTelemetryID{}, nil case ast.AdminReloadStatistics: return &Simple{Statement: as}, nil + case ast.AdminFlushPlanCache: + return &Simple{Statement: as}, nil default: return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as) } diff --git a/planner/core/prepare_test.go b/planner/core/prepare_test.go index d633896e7b718..637fdf6396cf9 100644 --- a/planner/core/prepare_test.go +++ b/planner/core/prepare_test.go @@ -51,6 +51,208 @@ type testPrepareSuite struct { type testPrepareSerialSuite struct { } +func (s *testPrepareSerialSuite) TestFlushPlanCache(c *C) { + defer testleak.AfterTest(c)() + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + tk := testkit.NewTestKit(c, store) + tk2 := testkit.NewTestKit(c, store) + orgEnable := core.PreparedPlanCacheEnabled() + defer func() { + dom.Close() + err = store.Close() + c.Assert(err, IsNil) + core.SetPreparedPlanCache(orgEnable) + }() + core.SetPreparedPlanCache(true) + tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{ + PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64), + }) + c.Assert(err, IsNil) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("drop table if exists t2") + tk.MustExec("create table t1(id int, a int, b int, key(a))") + tk.MustExec("create table t2(id int, a int, b int, key(a))") + tk.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';") + tk.MustExec("execute stmt1;") + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("prepare stmt2 from 'SELECT * from t1';") + tk.MustExec("execute stmt2;") + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';") + tk.MustExec("execute stmt3;") + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk2.MustExec("use test") + tk2.MustExec("drop table if exists t1") + tk2.MustExec("drop table if exists t2") + tk2.MustExec("create table t1(id int, a int, b int, key(a))") + tk2.MustExec("create table t2(id int, a int, b int, key(a))") + tk2.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';") + tk2.MustExec("execute stmt1;") + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk2.MustExec("prepare stmt2 from 'SELECT * from t1';") + tk2.MustExec("execute stmt2;") + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk2.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';") + tk2.MustExec("execute stmt3;") + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("admin flush session plan_cache;") + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select 
@@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk2.MustExec("admin flush instance plan_cache;") + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + err = tk.ExecToErr("admin flush global plan_cache;") + c.Check(err.Error(), Equals, "Do not support the 'admin flush global scope.'") +} + +func (s *testPrepareSerialSuite) TestFlushPlanCacheWithoutPCEnable(c *C) { + defer testleak.AfterTest(c)() + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + tk := testkit.NewTestKit(c, store) + tk2 := testkit.NewTestKit(c, store) + orgEnable := core.PreparedPlanCacheEnabled() + defer func() { + dom.Close() + err = store.Close() + c.Assert(err, IsNil) + core.SetPreparedPlanCache(orgEnable) + }() + core.SetPreparedPlanCache(false) + tk.Se, err = session.CreateSession4TestWithOpt(store, &session.Opt{ + PreparedPlanCache: kvcache.NewSimpleLRUCache(100, 0.1, math.MaxUint64), + }) + c.Assert(err, IsNil) + + tk.MustExec("use test") + tk.MustExec("drop table if exists t1") + tk.MustExec("drop table if exists t2") + tk.MustExec("create table t1(id int, a int, b int, key(a))") + tk.MustExec("create table t2(id int, a int, b int, key(a))") + tk.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';") + tk.MustExec("execute stmt1;") + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("prepare stmt2 from 'SELECT * from t1';") + tk.MustExec("execute stmt2;") + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';") + tk.MustExec("execute stmt3;") + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("use test") + tk2.MustExec("drop table if exists t1") + tk2.MustExec("drop table if exists t2") + tk2.MustExec("create table t1(id int, a int, b int, key(a))") + tk2.MustExec("create table t2(id int, a int, b int, key(a))") + tk2.MustExec("prepare stmt1 from 'SELECT * from t1,t2 where t1.id = t2.id';") + tk2.MustExec("execute stmt1;") + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("prepare stmt2 from 'SELECT * from t1';") + tk2.MustExec("execute 
stmt2;") + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("prepare stmt3 from 'SELECT * from t1 where id = 1';") + tk2.MustExec("execute stmt3;") + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("admin flush session plan_cache;") + tk.MustQuery("show warnings;").Check(testkit.Rows("Warning 1105 The plan cache is disable. So there no need to flush the plan cache")) + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk2.MustExec("admin flush instance plan_cache;") + tk2.MustQuery("show warnings;").Check(testkit.Rows("Warning 1105 The plan cache is disable. So there no need to flush the plan cache")) + tk2.MustExec("execute stmt1;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt2;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk2.MustExec("execute stmt3;") + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + tk.MustExec("execute stmt1;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt2;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("execute stmt3;") + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + + err = tk.ExecToErr("admin flush global plan_cache;") + c.Check(err.Error(), Equals, "Do not support the 'admin flush global scope.'") +} + func (s *testPrepareSerialSuite) TestPrepareCache(c *C) { defer testleak.AfterTest(c)() store, dom, err := newStoreWithBootstrap() diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index f70d1eb7e5550..013d86e227342 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -286,9 +286,9 @@ }, { "SQL": "select /*+ USE_INDEX_MERGE(t1, c_d_e, f_g) */ * from t where c < 1 or f > 2", - "Best": "TableReader(Table(t)->Sel([or(lt(test.t.c, 1), gt(test.t.f, 2))]))", + "Best": "IndexMergeReader(PartialPlans->[Index(t.c_d_e)[[-inf,1)], Index(t.f)[(2,+inf]]], TablePlan->Table(t))", "HasWarn": true, - "Hints": "use_index(@`sel_1` `test`.`t` )" + "Hints": "use_index_merge(@`sel_1` `t` `c_d_e`, `f`)" }, { "SQL": "select /*+ NO_INDEX_MERGE(), USE_INDEX_MERGE(t, primary, f_g, c_d_e) */ * from t where a < 1 or f > 2", @@ -304,15 +304,15 @@ }, { "SQL": "select /*+ USE_INDEX_MERGE(db2.t) */ * from t where c < 1 or f > 2", - "Best": "TableReader(Table(t)->Sel([or(lt(test.t.c, 1), gt(test.t.f, 2))]))", + "Best": 
"IndexMergeReader(PartialPlans->[Index(t.c_d_e)[[-inf,1)], Index(t.f)[(2,+inf]]], TablePlan->Table(t))", "HasWarn": true, - "Hints": "use_index(@`sel_1` `test`.`t` )" + "Hints": "use_index_merge(@`sel_1` `t` `c_d_e`, `f`)" }, { "SQL": "select /*+ USE_INDEX_MERGE(db2.t, c_d_e, f_g) */ * from t where c < 1 or f > 2", - "Best": "TableReader(Table(t)->Sel([or(lt(test.t.c, 1), gt(test.t.f, 2))]))", + "Best": "IndexMergeReader(PartialPlans->[Index(t.c_d_e)[[-inf,1)], Index(t.f)[(2,+inf]]], TablePlan->Table(t))", "HasWarn": true, - "Hints": "use_index(@`sel_1` `test`.`t` )" + "Hints": "use_index_merge(@`sel_1` `t` `c_d_e`, `f`)" } ] }, diff --git a/planner/optimize.go b/planner/optimize.go index 028474a373e0e..73c2b137a2b38 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -152,23 +152,9 @@ func Optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in if !ok { useBinding = false } - - var ( - bindRecord *bindinfo.BindRecord - scope string - err error - ) - if useBinding { - bindRecord, scope, err = getBindRecord(sctx, stmtNode) - if err != nil || bindRecord == nil || len(bindRecord.Bindings) == 0 { - useBinding = false - } - } - if ok { - // add the extra Limit after matching the bind record - stmtNode = plannercore.TryAddExtraLimit(sctx, stmtNode) - node = stmtNode - + bindRecord, scope, match := matchSQLBinding(sctx, stmtNode) + if !match { + useBinding = false } if ok { // add the extra Limit after matching the bind record diff --git a/session/bootstrap.go b/session/bootstrap.go index 53f2e95c164a5..1fdb93b58ff0e 100644 --- a/session/bootstrap.go +++ b/session/bootstrap.go @@ -538,14 +538,17 @@ const ( version78 = 78 // version79 adds the mysql.table_cache_meta table version79 = 79 - + // version80 fixes the issue https://github.com/pingcap/tidb/issues/25422. + // If the TiDB upgrading from the 4.x to a newer version, we keep the tidb_analyze_version to 1. + version80 = 80 + // version81 insert "tidb_enable_index_merge|off" to mysql.GLOBAL_VARIABLES if there is no tidb_enable_index_merge. + // This will only happens when we upgrade a cluster before 4.0.0 to 4.0.0+. + version81 = 81 ) // currentBootstrapVersion is defined as a variable, so we can modify its value for testing. // please make sure this is the largest version - -var currentBootstrapVersion int64 = version79 - +var currentBootstrapVersion int64 = version81 var ( bootstrapVersion = []func(Session, int64){ @@ -628,7 +631,8 @@ var ( upgradeToVer77, upgradeToVer78, upgradeToVer79, - + upgradeToVer80, + upgradeToVer81, } ) @@ -1634,6 +1638,49 @@ func upgradeToVer79(s Session, ver int64) { doReentrantDDL(s, CreateTableCacheMetaTable) } +func upgradeToVer80(s Session, ver int64) { + if ver >= version80 { + return + } + // Check if tidb_analyze_version exists in mysql.GLOBAL_VARIABLES. + // If not, insert "tidb_analyze_version | 1" since this is the old behavior before we introduce this variable. + ctx := context.Background() + rs, err := s.ExecuteInternal(ctx, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?;", + mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBAnalyzeVersion) + terror.MustNil(err) + req := rs.NewChunk(nil) + err = rs.Next(ctx, req) + terror.MustNil(err) + if req.NumRows() != 0 { + return + } + + mustExecute(s, "INSERT HIGH_PRIORITY IGNORE INTO %n.%n VALUES (%?, %?);", + mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBAnalyzeVersion, 1) +} + +// For users that upgrade TiDB from a pre-4.0 version, we want to disable index merge by default. 
+// This helps minimize query plan regressions. +func upgradeToVer81(s Session, ver int64) { + if ver >= version81 { + return + } + // Check if tidb_enable_index_merge exists in mysql.GLOBAL_VARIABLES. + // If not, insert "tidb_enable_index_merge | off". + ctx := context.Background() + rs, err := s.ExecuteInternal(ctx, "SELECT VARIABLE_VALUE FROM %n.%n WHERE VARIABLE_NAME=%?;", + mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBEnableIndexMerge) + terror.MustNil(err) + req := rs.NewChunk(nil) + err = rs.Next(ctx, req) + terror.MustNil(err) + if req.NumRows() != 0 { + return + } + + mustExecute(s, "INSERT HIGH_PRIORITY IGNORE INTO %n.%n VALUES (%?, %?);", + mysql.SystemDB, mysql.GlobalVariablesTable, variable.TiDBEnableIndexMerge, variable.Off) +} func writeOOMAction(s Session) { comment := "oom-action is `log` by default in v3.0.x, `cancel` by default in v4.0.11+" diff --git a/session/bootstrap_test.go b/session/bootstrap_test.go index 6caf7e702c10b..1b44f58667114 100644 --- a/session/bootstrap_test.go +++ b/session/bootstrap_test.go @@ -928,3 +928,146 @@ func TestAnalyzeVersionUpgradeFrom300To500(t *testing.T) { require.Equal(t, 1, row.Len()) require.Equal(t, "1", row.GetString(0)) } + + +func TestIndexMergeInNewCluster(t *testing.T) { + store, err := mockstore.NewMockStore() + require.NoError(t, err) + // Indicates we are in a new cluster. + require.Equal(t, int64(notBootstrapped), getStoreBootstrapVersion(store)) + dom, err := BootstrapSession(store) + require.NoError(t, err) + defer func() { require.NoError(t, store.Close()) }() + defer dom.Close() + se := createSessionAndSetID(t, store) + + // In a new created cluster(above 5.4+), tidb_enable_index_merge is 1 by default. + mustExec(t, se, "use test;") + r := mustExec(t, se, "select @@tidb_enable_index_merge;") + require.NotNil(t, r) + + ctx := context.Background() + chk := r.NewChunk(nil) + err = r.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 1, chk.NumRows()) + row := chk.GetRow(0) + require.Equal(t, 1, row.Len()) + require.Equal(t, int64(1), row.GetInt64(0)) +} + +func TestIndexMergeUpgradeFrom300To540(t *testing.T) { + ctx := context.Background() + store, _ := createStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + // Upgrade from 3.0.0 to 5.4+. + ver300 := 33 + seV3 := createSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(int64(ver300)) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + mustExec(t, seV3, fmt.Sprintf("update mysql.tidb set variable_value=%d where variable_name='tidb_server_version'", ver300)) + mustExec(t, seV3, fmt.Sprintf("delete from mysql.GLOBAL_VARIABLES where variable_name='%s'", variable.TiDBEnableIndexMerge)) + mustExec(t, seV3, "commit") + unsetStoreBootstrapped(store.UUID()) + ver, err := getBootstrapVersion(seV3) + require.NoError(t, err) + require.Equal(t, int64(ver300), ver) + + // We are now in 3.0.0, check tidb_enable_index_merge shoudle not exist. 
+ res := mustExec(t, seV3, fmt.Sprintf("select * from mysql.GLOBAL_VARIABLES where variable_name='%s'", variable.TiDBEnableIndexMerge)) + chk := res.NewChunk(nil) + err = res.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 0, chk.NumRows()) + + domCurVer, err := BootstrapSession(store) + require.NoError(t, err) + defer domCurVer.Close() + seCurVer := createSessionAndSetID(t, store) + ver, err = getBootstrapVersion(seCurVer) + require.NoError(t, err) + require.Equal(t, currentBootstrapVersion, ver) + + // We are now in 5.x, tidb_enable_index_merge should be off. + res = mustExec(t, seCurVer, "select @@tidb_enable_index_merge") + chk = res.NewChunk(nil) + err = res.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 1, chk.NumRows()) + row := chk.GetRow(0) + require.Equal(t, 1, row.Len()) + require.Equal(t, int64(0), row.GetInt64(0)) +} + +func TestIndexMergeUpgradeFrom400To540(t *testing.T) { + for i := 0; i < 2; i++ { + ctx := context.Background() + store, _ := createStoreAndBootstrap(t) + defer func() { require.NoError(t, store.Close()) }() + + // upgrade from 4.0.0 to 5.4+. + ver400 := 46 + seV4 := createSessionAndSetID(t, store) + txn, err := store.Begin() + require.NoError(t, err) + m := meta.NewMeta(txn) + err = m.FinishBootstrap(int64(ver400)) + require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) + mustExec(t, seV4, fmt.Sprintf("update mysql.tidb set variable_value=%d where variable_name='tidb_server_version'", ver400)) + mustExec(t, seV4, fmt.Sprintf("update mysql.GLOBAL_VARIABLES set variable_value='%s' where variable_name='%s'", variable.Off, variable.TiDBEnableIndexMerge)) + mustExec(t, seV4, "commit") + unsetStoreBootstrapped(store.UUID()) + ver, err := getBootstrapVersion(seV4) + require.NoError(t, err) + require.Equal(t, int64(ver400), ver) + + // We are now in 4.0.0, tidb_enable_index_merge is off. + res := mustExec(t, seV4, fmt.Sprintf("select * from mysql.GLOBAL_VARIABLES where variable_name='%s'", variable.TiDBEnableIndexMerge)) + chk := res.NewChunk(nil) + err = res.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 1, chk.NumRows()) + row := chk.GetRow(0) + require.Equal(t, 2, row.Len()) + require.Equal(t, variable.Off, row.GetString(1)) + + if i == 0 { + // For the first time, We set tidb_enable_index_merge as on. + // And after upgrade to 5.x, tidb_enable_index_merge should remains to be on. + // For the second it should be off. + mustExec(t, seV4, "set global tidb_enable_index_merge = on") + } + + // Upgrade to 5.x. + domCurVer, err := BootstrapSession(store) + require.NoError(t, err) + defer domCurVer.Close() + seCurVer := createSessionAndSetID(t, store) + ver, err = getBootstrapVersion(seCurVer) + require.NoError(t, err) + require.Equal(t, currentBootstrapVersion, ver) + + // We are now in 5.x, tidb_enable_index_merge should be on because we enable it in 4.0.0. 
+ res = mustExec(t, seCurVer, "select @@tidb_enable_index_merge") + chk = res.NewChunk(nil) + err = res.Next(ctx, chk) + require.NoError(t, err) + require.Equal(t, 1, chk.NumRows()) + row = chk.GetRow(0) + require.Equal(t, 1, row.Len()) + if i == 0 { + require.Equal(t, int64(1), row.GetInt64(0)) + } else { + require.Equal(t, int64(0), row.GetInt64(0)) + } + } +} + diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 3b0c8f33402e9..b205bda1de5ab 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -465,7 +465,8 @@ type SessionVars struct { // preparedStmtID is id of prepared statement. preparedStmtID uint32 // PreparedParams params for prepared statements - PreparedParams PreparedParams + PreparedParams PreparedParams + LastUpdateTime4PC types.Time // ActiveRoles stores active roles for current user ActiveRoles []*auth.RoleIdentity @@ -1172,7 +1173,7 @@ func NewSessionVars() *SessionVars { SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish, WaitSplitRegionTimeout: DefWaitSplitRegionTimeout, - enableIndexMerge: false, + enableIndexMerge: DefTiDBEnableIndexMerge, NoopFuncsMode: TiDBOptOnOffWarn(DefTiDBEnableNoopFuncs), replicaRead: kv.ReplicaReadLeader, AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc, diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 3491f28bc73dc..396c93bddba1b 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -537,7 +537,7 @@ var defaultSysVars = []*SysVar{ s.SetEnableCascadesPlanner(TiDBOptOn(val)) return nil }}, - {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableIndexMerge, Value: Off, Type: TypeBool, SetSession: func(s *SessionVars, val string) error { + {Scope: ScopeGlobal | ScopeSession, Name: TiDBEnableIndexMerge, Value: BoolToOnOff(DefTiDBEnableIndexMerge), Type: TypeBool, SetSession: func(s *SessionVars, val string) error { s.SetEnableIndexMerge(TiDBOptOn(val)) return nil }}, diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index e15c9f92b92b8..91b3451f15ed9 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -825,3 +825,12 @@ func TestDefaultCharsetAndCollation(t *testing.T) { require.NoError(t, err) require.Equal(t, val, mysql.DefaultCollationName) } + +func TestIndexMergeSwitcher(t *testing.T) { + vars := NewSessionVars() + vars.GlobalVarsAccessor = NewMockGlobalAccessor4Tests() + val, err := GetSessionOrGlobalSystemVar(vars, TiDBEnableIndexMerge) + require.NoError(t, err) + require.Equal(t, DefTiDBEnableIndexMerge, true) + require.Equal(t, BoolToOnOff(DefTiDBEnableIndexMerge), val) +} diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index ccec9e1b5a8fc..7540000f4fc8b 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -779,6 +779,7 @@ const ( DefTiDBRegardNULLAsPoint = true DefEnablePlacementCheck = true DefTimestamp = "0" + DefTiDBEnableIndexMerge = true ) // Process global variables. diff --git a/table/tables/cache.go b/table/tables/cache.go index a31f5d0780854..7e3eb7c40b9a9 100644 --- a/table/tables/cache.go +++ b/table/tables/cache.go @@ -183,57 +183,31 @@ func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, t } // AddRecord implements the AddRecord method for the table.Table interface. 
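// For cached tables, the write path no longer takes the remote write lock
// inline: AddRecord, UpdateRecord, and RemoveRecord below only register the
// table's StateRemote handle in the transaction context via
// txnCtxAddCachedTable, deferring lock and lease handling to the transaction
// that commits the write.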
diff --git a/table/tables/cache.go b/table/tables/cache.go
index a31f5d0780854..7e3eb7c40b9a9 100644
--- a/table/tables/cache.go
+++ b/table/tables/cache.go
@@ -183,57 +183,31 @@ func (c *cachedTable) UpdateLockForRead(ctx context.Context, store kv.Storage, t
 }
 
 // AddRecord implements the AddRecord method for the table.Table interface.
+func (c *cachedTable) AddRecord(sctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
+    txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle)
+    return c.TableCommon.AddRecord(sctx, r, opts...)
+}
-func (c *cachedTable) AddRecord(ctx sessionctx.Context, r []types.Datum, opts ...table.AddRecordOption) (recordID kv.Handle, err error) {
-    txn, err := ctx.Txn(true)
-    if err != nil {
-        return nil, err
+func txnCtxAddCachedTable(sctx sessionctx.Context, tid int64, handle StateRemote) {
+    txnCtx := sctx.GetSessionVars().TxnCtx
+    if txnCtx.CachedTables == nil {
+        txnCtx.CachedTables = make(map[int64]interface{})
     }
-    now := txn.StartTS()
-    start := time.Now()
-    err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now))
-    if err != nil {
-        return nil, errors.Trace(err)
+    if _, ok := txnCtx.CachedTables[tid]; !ok {
+        txnCtx.CachedTables[tid] = handle
     }
-    ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start)
-    return c.TableCommon.AddRecord(ctx, r, opts...)
-
 }
 
 // UpdateRecord implements table.Table
 func (c *cachedTable) UpdateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, touched []bool) error {
-
-    txn, err := sctx.Txn(true)
-    if err != nil {
-        return err
-    }
-    now := txn.StartTS()
-    start := time.Now()
-    err = c.handle.LockForWrite(ctx, c.Meta().ID, leaseFromTS(now))
-    if err != nil {
-        return errors.Trace(err)
-    }
-    sctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start)
-
+    txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle)
     return c.TableCommon.UpdateRecord(ctx, sctx, h, oldData, newData, touched)
 }
 
 // RemoveRecord implements table.Table RemoveRecord interface.
-
-func (c *cachedTable) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []types.Datum) error {
-    txn, err := ctx.Txn(true)
-    if err != nil {
-        return err
-    }
-    now := txn.StartTS()
-    start := time.Now()
-    err = c.handle.LockForWrite(context.Background(), c.Meta().ID, leaseFromTS(now))
-    if err != nil {
-        return errors.Trace(err)
-    }
-    ctx.GetSessionVars().StmtCtx.WaitLockLeaseTime += time.Since(start)
-    return c.TableCommon.RemoveRecord(ctx, h, r)
-
+func (c *cachedTable) RemoveRecord(sctx sessionctx.Context, h kv.Handle, r []types.Datum) error {
+    txnCtxAddCachedTable(sctx, c.Meta().ID, c.handle)
+    return c.TableCommon.RemoveRecord(sctx, h, r)
 }
 
 func (c *cachedTable) renewLease(ts uint64, op RenewLeaseType, data *cacheData) func() {
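The cache.go change is more than a cleanup: AddRecord, UpdateRecord, and RemoveRecord no longer take the write lock inline (the deleted LockForWrite calls and WaitLockLeaseTime bookkeeping); they merely record the table's StateRemote handle in the transaction context. The lock must therefore be taken later, by commit-path code that is not part of this diff. A hedged sketch of what such a consumer could look like (the function name and call site are assumptions; the types come from the patch):

// Assumed commit hook: lock every cached table touched by this transaction.
func lockCachedTablesForWrite(ctx context.Context, sctx sessionctx.Context) error {
    txnCtx := sctx.GetSessionVars().TxnCtx
    for tid, raw := range txnCtx.CachedTables {
        handle := raw.(StateRemote) // stored as interface{} by txnCtxAddCachedTable
        // The new LockForWrite computes the lease itself and returns it.
        if _, err := handle.LockForWrite(ctx, tid); err != nil {
            return errors.Trace(err)
        }
    }
    return nil
}

Batching the acquisition this way also means a transaction that writes the same cached table many times pays the lock cost once, not once per row.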
diff --git a/table/tables/state_remote.go b/table/tables/state_remote.go
index 0138a01ab94e1..aeddd5b972ab2 100644
--- a/table/tables/state_remote.go
+++ b/table/tables/state_remote.go
@@ -67,9 +67,7 @@ type StateRemote interface {
     LockForRead(ctx context.Context, tid int64, lease uint64) (bool, error)
 
     // LockForWrite try to add a write lock to the table with the specified tableID
-
-    LockForWrite(ctx context.Context, tid int64, lease uint64) error
-
+    LockForWrite(ctx context.Context, tid int64) (uint64, error)
 
     // RenewLease attempt to renew the read / write lock on the table with the specified tableID
     RenewLease(ctx context.Context, tid int64, newTs uint64, op RenewLeaseType) (bool, error)
@@ -134,33 +132,32 @@ func (h *stateRemoteHandle) LockForRead(ctx context.Context, tid int64, ts uint64
     return succ, err
 }
 
-
-func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64, ts uint64) error {
+// LockForWrite tries to add a write lock to the table with the specified tableID, and returns the write lock lease.
+func (h *stateRemoteHandle) LockForWrite(ctx context.Context, tid int64) (uint64, error) {
     h.Lock()
     defer h.Unlock()
+    var ret uint64
     for {
-        waitAndRetry, err := h.lockForWriteOnce(ctx, tid, ts)
+        waitAndRetry, lease, err := h.lockForWriteOnce(ctx, tid)
         if err != nil {
-            return err
+            return 0, err
         }
         if waitAndRetry == 0 {
-
+            ret = lease
             break
         }
         time.Sleep(waitAndRetry)
     }
-
-    return nil
+    return ret, nil
 }
 
-func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64, ts uint64) (waitAndRetry time.Duration, err error) {
-
+func (h *stateRemoteHandle) lockForWriteOnce(ctx context.Context, tid int64) (waitAndRetry time.Duration, ts uint64, err error) {
     err = h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
         lockType, lease, oldReadLease, err := h.loadRow(ctx, tid)
         if err != nil {
             return errors.Trace(err)
         }
-
+        ts = leaseFromTS(now)
         // The lease is outdated, so lock is invalid, clear orphan lock of any kind.
         if now > lease {
             if err := h.updateRow(ctx, tid, "WRITE", ts); err != nil {
@@ -221,38 +218,69 @@ func (h *stateRemoteHandle) RenewLease(ctx context.Context, tid int64, newLease
     h.Lock()
     defer h.Unlock()
 
+    switch op {
+    case RenewReadLease:
+        return h.renewReadLease(ctx, tid, newLease)
+    case RenewWriteLease:
+        return h.renewWriteLease(ctx, tid, newLease)
+    }
+    return false, errors.New("wrong renew lease type")
+}
+
+func (h *stateRemoteHandle) renewReadLease(ctx context.Context, tid int64, newLease uint64) (bool, error) {
     var succ bool
-    if op == RenewReadLease {
-        err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
-            lockType, oldLease, _, err := h.loadRow(ctx, tid)
+    err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
+        lockType, oldLease, _, err := h.loadRow(ctx, tid)
+        if err != nil {
+            return errors.Trace(err)
+        }
+        if now >= oldLease {
+            // The read lock has already expired; fail to renew.
+            return nil
+        }
+        if lockType != CachedTableLockRead {
+            // Not a read lock; fail to renew.
+            return nil
+        }
+
+        if newLease > oldLease { // lease should never decrease!
+            err = h.updateRow(ctx, tid, "READ", newLease)
             if err != nil {
                 return errors.Trace(err)
             }
-            if now >= oldLease {
-                // read lock had already expired, fail to renew
-                return nil
-            }
-            if lockType != CachedTableLockRead {
-                // Not read lock, fail to renew
-                return nil
-            }
+        }
+        succ = true
+        return nil
+    })
+    return succ, err
+}
 
-            if newLease > oldLease { // lease should never decrease!
-                err = h.updateRow(ctx, tid, "READ", newLease)
-                if err != nil {
-                    return errors.Trace(err)
-                }
-            }
-            succ = true
+func (h *stateRemoteHandle) renewWriteLease(ctx context.Context, tid int64, newLease uint64) (bool, error) {
+    var succ bool
+    err := h.runInTxn(ctx, func(ctx context.Context, now uint64) error {
+        lockType, oldLease, _, err := h.loadRow(ctx, tid)
+        if err != nil {
+            return errors.Trace(err)
+        }
+        if now >= oldLease {
+            // The write lock has already expired; fail to renew.
             return nil
-        })
-        return succ, err
-    }
-
-    // TODO: renew for write lease
-    return false, errors.New("not implement yet")
+        }
+        if lockType != CachedTableLockWrite {
+            // Not a write lock; fail to renew.
+            return nil
+        }
+        if newLease > oldLease { // lease should never decrease!
+            err = h.updateRow(ctx, tid, "WRITE", newLease)
+            if err != nil {
+                return errors.Trace(err)
+            }
+        }
+        succ = true
+        return nil
+    })
+    return succ, err
 }
 
 func (h *stateRemoteHandle) beginTxn(ctx context.Context) error {
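With the renew paths split out, RenewLease now handles both lock kinds instead of rejecting writes with "not implement yet". A short usage sketch under the new contract (the helper name is ours; all other identifiers come from the diff):

// Hedged sketch: keep a write lease alive, re-acquiring the lock when renewal fails.
func keepWriteLeaseAlive(ctx context.Context, handle StateRemote, tid int64, now uint64) error {
    succ, err := handle.RenewLease(ctx, tid, leaseFromTS(now), RenewWriteLease)
    if err != nil {
        return errors.Trace(err) // e.g. "wrong renew lease type" for an unknown op
    }
    if !succ {
        // The lease expired or the lock type changed underneath us.
        // LockForWrite retries internally and returns a fresh lease on success.
        if _, err := handle.LockForWrite(ctx, tid); err != nil {
            return errors.Trace(err)
        }
    }
    return nil
}

Note that renewal deliberately refuses to shorten a lease ("lease should never decrease!"), so concurrent renewers can only ever extend it.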