diff --git a/DEPS.bzl b/DEPS.bzl index aa5579905cc9e..e606164a43f02 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -530,8 +530,8 @@ def go_deps(): name = "com_github_cloudfoundry_gosigar", build_file_proto_mode = "disable", importpath = "github.com/cloudfoundry/gosigar", - sum = "h1:T3MoGdugg1vdHn8Az7wDn7cZ4+QCjZph+eXf2CjSjo4=", - version = "v1.3.4", + sum = "h1:gIc08FbB3QPb+nAQhINIK/qhf5REKkY0FTGgRGXkcVc=", + version = "v1.3.6", ) go_repository( @@ -1120,8 +1120,8 @@ def go_deps(): name = "com_github_fsnotify_fsnotify", build_file_proto_mode = "disable_global", importpath = "github.com/fsnotify/fsnotify", - sum = "h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=", - version = "v1.5.4", + sum = "h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=", + version = "v1.6.0", ) go_repository( name = "com_github_fsouza_fake_gcs_server", @@ -3118,8 +3118,8 @@ def go_deps(): name = "com_github_nxadm_tail", build_file_proto_mode = "disable_global", importpath = "github.com/nxadm/tail", - sum = "h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=", - version = "v1.4.4", + sum = "h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=", + version = "v1.4.8", ) go_repository( name = "com_github_oklog_run", diff --git a/br/pkg/restore/data.go b/br/pkg/restore/data.go index d4254c60adbff..265126b9411af 100644 --- a/br/pkg/restore/data.go +++ b/br/pkg/restore/data.go @@ -302,22 +302,29 @@ func (recovery *Recovery) WaitApply(ctx context.Context) (err error) { // prepare the region for flashback the data, the purpose is to stop region service, put region in flashback state func (recovery *Recovery) PrepareFlashbackToVersion(ctx context.Context, resolveTS uint64, startTS uint64) (err error) { - handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { - stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r) - return stats, err - } + retryErr := utils.WithRetry( + ctx, + func() error { + handler := func(ctx context.Context, r tikvstore.KeyRange) (rangetask.TaskStat, error) { + stats, err := ddl.SendPrepareFlashbackToVersionRPC(ctx, recovery.mgr.GetStorage().(tikv.Storage), resolveTS, startTS, r) + return stats, err + } - runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler) - // Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded. - err = runner.RunOnRange(ctx, []byte(""), []byte("")) - if err != nil { - log.Error("region flashback prepare get error") - return errors.Trace(err) - } - recovery.progress.Inc() - log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions())) + runner := rangetask.NewRangeTaskRunner("br-flashback-prepare-runner", recovery.mgr.GetStorage().(tikv.Storage), int(recovery.concurrency), handler) + // Run prepare flashback on the entire TiKV cluster. Empty keys means the range is unbounded. + err = runner.RunOnRange(ctx, []byte(""), []byte("")) + if err != nil { + log.Warn("region flashback prepare get error") + return errors.Trace(err) + } + log.Info("region flashback prepare complete", zap.Int("regions", runner.CompletedRegions())) + return nil + }, + utils.NewFlashBackBackoffer(), + ) - return nil + recovery.progress.Inc() + return retryErr } // flashback the region data to version resolveTS diff --git a/br/pkg/utils/backoff.go b/br/pkg/utils/backoff.go index 5353c972d24ad..bff2490b56650 100644 --- a/br/pkg/utils/backoff.go +++ b/br/pkg/utils/backoff.go @@ -33,6 +33,11 @@ const ( resetTSRetryTimeExt = 600 resetTSWaitIntervalExt = 500 * time.Millisecond resetTSMaxWaitIntervalExt = 300 * time.Second + + // region heartbeat are 10 seconds by default, if some region has 2 heartbeat missing (15 seconds), it appear to be a network issue between PD and TiKV. + flashbackRetryTime = 3 + flashbackWaitInterval = 3000 * time.Millisecond + flashbackMaxWaitInterval = 15 * time.Second ) // RetryState is the mutable state needed for retrying. @@ -204,3 +209,34 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration { func (bo *pdReqBackoffer) Attempt() int { return bo.attempt } + +type flashbackBackoffer struct { + attempt int + delayTime time.Duration + maxDelayTime time.Duration +} + +// NewBackoffer creates a new controller regulating a truncated exponential backoff. +func NewFlashBackBackoffer() Backoffer { + return &flashbackBackoffer{ + attempt: flashbackRetryTime, + delayTime: flashbackWaitInterval, + maxDelayTime: flashbackMaxWaitInterval, + } +} + +// retry 3 times when prepare flashback failure. +func (bo *flashbackBackoffer) NextBackoff(err error) time.Duration { + bo.delayTime = 2 * bo.delayTime + bo.attempt-- + log.Warn("region may not ready to serve, retry it...", zap.Error(err)) + + if bo.delayTime > bo.maxDelayTime { + return bo.maxDelayTime + } + return bo.delayTime +} + +func (bo *flashbackBackoffer) Attempt() int { + return bo.attempt +} diff --git a/ddl/BUILD.bazel b/ddl/BUILD.bazel index a117b052d0651..48dfa40426b6a 100644 --- a/ddl/BUILD.bazel +++ b/ddl/BUILD.bazel @@ -110,6 +110,7 @@ go_library( "//util/filter", "//util/gcutil", "//util/generic", + "//util/gpool", "//util/gpool/spmc", "//util/hack", "//util/intest", diff --git a/ddl/dist_backfilling.go b/ddl/dist_backfilling.go index 79b1250d0e98c..53d1241444209 100644 --- a/ddl/dist_backfilling.go +++ b/ddl/dist_backfilling.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/table" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/dbterror" + "github.com/pingcap/tidb/util/gpool" "github.com/pingcap/tidb/util/gpool/spmc" "github.com/pingcap/tidb/util/logutil" "go.uber.org/zap" @@ -258,7 +259,7 @@ func GetTasks(d *ddlCtx, sess *session, tbl table.Table, runningJobID int64, con // TODO: add test: if all tidbs can't get the unmark backfill job(a tidb mark a backfill job, other tidbs returned, then the tidb can't handle this job.) if dbterror.ErrDDLJobNotFound.Equal(err) { logutil.BgLogger().Info("no backfill job, handle backfill task finished") - return nil, err + return nil, gpool.ErrProducerClosed } if kv.ErrWriteConflict.Equal(err) { logutil.BgLogger().Info("GetAndMarkBackfillJobsForOneEle failed", zap.Error(err)) diff --git a/distsql/BUILD.bazel b/distsql/BUILD.bazel index 5839f55fbc52c..d737718a2e084 100644 --- a/distsql/BUILD.bazel +++ b/distsql/BUILD.bazel @@ -34,8 +34,8 @@ go_library( "//util/logutil", "//util/memory", "//util/ranger", + "//util/tracing", "//util/trxevents", - "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/metapb", diff --git a/distsql/distsql.go b/distsql/distsql.go index 2c5681e37824e..defbeb85ae74f 100644 --- a/distsql/distsql.go +++ b/distsql/distsql.go @@ -19,7 +19,6 @@ import ( "strconv" "unsafe" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" @@ -31,6 +30,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/tracing" "github.com/pingcap/tidb/util/trxevents" "github.com/pingcap/tipb/go-tipb" "github.com/tikv/client-go/v2/tikvrpc/interceptor" @@ -64,11 +64,8 @@ func DispatchMPPTasks(ctx context.Context, sctx sessionctx.Context, tasks []*kv. // Select sends a DAG request, returns SelectResult. // In kvReq, KeyRanges is required, Concurrency/KeepOrder/Desc/IsolationLevel/Priority are optional. func Select(ctx context.Context, sctx sessionctx.Context, kvReq *kv.Request, fieldTypes []*types.FieldType, fb *statistics.QueryFeedback) (SelectResult, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("distsql.Select", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "distsql.Select") + defer r.End() // For testing purpose. if hook := ctx.Value("CheckSelectRequestHook"); hook != nil { diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 439c6ecd8e7fe..8d0fae3a464f1 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -159,6 +159,7 @@ func (builder *RequestBuilder) SetDAGRequest(dag *tipb.DAGRequest) *RequestBuild if limit != nil && limit.Limit < estimatedRegionRowCount { builder.Request.Concurrency = 1 } + builder.Request.LimitSize = limit.GetLimit() } return builder } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index 74bdf723216f1..1de6b11dd5244 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -676,6 +676,7 @@ func TestScanLimitConcurrency(t *testing.T) { Build() require.NoError(t, err) require.Equal(t, tt.concurrency, actual.Concurrency) + require.Equal(t, actual.LimitSize, tt.limit) }) } } diff --git a/domain/plan_replayer_dump.go b/domain/plan_replayer_dump.go index 5559dd3915b52..c656cb11d56e1 100644 --- a/domain/plan_replayer_dump.go +++ b/domain/plan_replayer_dump.go @@ -108,12 +108,14 @@ func (tne *tableNameExtractor) Leave(in ast.Node) (ast.Node, bool) { tne.err = err return in, true } - tp := tableNamePair{DBName: t.Schema.L, TableName: t.Name.L, IsView: isView} - if tp.DBName == "" { - tp.DBName = tne.curDB.L - } - if _, ok := tne.names[tp]; !ok { - tne.names[tp] = struct{}{} + if t.TableInfo != nil { + tp := tableNamePair{DBName: t.Schema.L, TableName: t.Name.L, IsView: isView} + if tp.DBName == "" { + tp.DBName = tne.curDB.L + } + if _, ok := tne.names[tp]; !ok { + tne.names[tp] = struct{}{} + } } } else if s, ok := in.(*ast.SelectStmt); ok { if s.With != nil && len(s.With.CTEs) > 0 { diff --git a/executor/BUILD.bazel b/executor/BUILD.bazel index 4eaeffdfb3c4f..56c6805fdef21 100644 --- a/executor/BUILD.bazel +++ b/executor/BUILD.bazel @@ -200,6 +200,7 @@ go_library( "//util/tls", "//util/topsql", "//util/topsql/state", + "//util/tracing", "@com_github_burntsushi_toml//:toml", "@com_github_gogo_protobuf//proto", "@com_github_ngaut_pools//:pools", diff --git a/executor/adapter.go b/executor/adapter.go index 145e1938216c2..33b321f60382e 100644 --- a/executor/adapter.go +++ b/executor/adapter.go @@ -25,7 +25,6 @@ import ( "sync/atomic" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" @@ -65,6 +64,7 @@ import ( "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/topsql" topsqlstate "github.com/pingcap/tidb/util/topsql/state" + "github.com/pingcap/tidb/util/tracing" "github.com/prometheus/client_golang/prometheus" tikverr "github.com/tikv/client-go/v2/error" "github.com/tikv/client-go/v2/oracle" @@ -283,12 +283,12 @@ func (a *ExecStmt) GetStmtNode() ast.StmtNode { // PointGet short path for point exec directly from plan, keep only necessary steps func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("ExecStmt.PointGet", opentracing.ChildOf(span.Context())) - span1.LogKV("sql", a.OriginText()) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) + r, ctx := tracing.StartRegionEx(ctx, "ExecStmt.PointGet") + defer r.End() + if r.Span != nil { + r.Span.LogKV("sql", a.OriginText()) } + failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() { sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true) // stale read should not reach here @@ -921,11 +921,8 @@ func (a *ExecStmt) runPessimisticSelectForUpdate(ctx context.Context, e Executor func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, e Executor) (sqlexec.RecordSet, error) { sctx := a.Ctx - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("executor.handleNoDelayExecutor", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "executor.handleNoDelayExecutor") + defer r.End() var err error defer func() { diff --git a/executor/compiler.go b/executor/compiler.go index 9f089eed9bae0..29c024a9991e6 100644 --- a/executor/compiler.go +++ b/executor/compiler.go @@ -18,7 +18,6 @@ import ( "context" "strings" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" @@ -32,6 +31,7 @@ import ( "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/tracing" "go.uber.org/zap" ) @@ -56,11 +56,9 @@ type Compiler struct { // Compile compiles an ast.StmtNode to a physical plan. func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecStmt, err error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("executor.Compile", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "executor.Compile") + defer r.End() + defer func() { r := recover() if r == nil { diff --git a/executor/executor.go b/executor/executor.go index c2fcdaa2d7887..e02abeed16d75 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -19,7 +19,6 @@ import ( "fmt" "math" "runtime/pprof" - "runtime/trace" "strconv" "strings" "sync" @@ -68,6 +67,7 @@ import ( "github.com/pingcap/tidb/util/resourcegrouptag" "github.com/pingcap/tidb/util/topsql" topsqlstate "github.com/pingcap/tidb/util/topsql/state" + "github.com/pingcap/tidb/util/tracing" tikverr "github.com/tikv/client-go/v2/error" tikvstore "github.com/tikv/client-go/v2/kv" tikvutil "github.com/tikv/client-go/v2/util" @@ -314,14 +314,10 @@ func Next(ctx context.Context, e Executor, req *chunk.Chunk) error { if atomic.LoadUint32(&sessVars.Killed) == 1 { return ErrQueryInterrupted } - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan(fmt.Sprintf("%T.Next", e), opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } - if trace.IsEnabled() { - defer trace.StartRegion(ctx, fmt.Sprintf("%T.Next", e)).End() - } + + r, ctx := tracing.StartRegionEx(ctx, fmt.Sprintf("%T.Next", e)) + defer r.End() + if topsqlstate.TopSQLEnabled() && sessVars.StmtCtx.IsSQLAndPlanRegistered.CompareAndSwap(false, true) { registerSQLAndPlanInExecForTopSQL(sessVars) } @@ -1527,11 +1523,8 @@ func init() { s.RewritePhaseInfo.DurationPreprocessSubQuery += time.Since(begin) }(time.Now()) - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("executor.EvalSubQuery", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "executor.EvalSubQuery") + defer r.End() e := newExecutorBuilder(sctx, is, nil) exec := e.build(p) diff --git a/executor/executor_test.go b/executor/executor_test.go index 422956d045abe..b426a42ab7014 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -6398,3 +6398,134 @@ func TestIssue39211(t *testing.T) { tk.MustExec("set @@tidb_enable_null_aware_anti_join=true;") tk.MustQuery("select * from t where (a,b) not in (select a, b from s);").Check(testkit.Rows()) } + +func TestPlanReplayerDumpTPCDS(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec(`create table catalog_sales +( + cs_sold_date_sk int , + cs_sold_time_sk int , + cs_ship_date_sk int , + cs_bill_customer_sk int , + cs_bill_cdemo_sk int , + cs_bill_hdemo_sk int , + cs_bill_addr_sk int , + cs_ship_customer_sk int , + cs_ship_cdemo_sk int , + cs_ship_hdemo_sk int , + cs_ship_addr_sk int , + cs_call_center_sk int , + cs_catalog_page_sk int , + cs_ship_mode_sk int , + cs_warehouse_sk int , + cs_item_sk int not null, + cs_promo_sk int , + cs_order_number int not null, + cs_quantity int , + cs_wholesale_cost decimal(7,2) , + cs_list_price decimal(7,2) , + cs_sales_price decimal(7,2) , + cs_ext_discount_amt decimal(7,2) , + cs_ext_sales_price decimal(7,2) , + cs_ext_wholesale_cost decimal(7,2) , + cs_ext_list_price decimal(7,2) , + cs_ext_tax decimal(7,2) , + cs_coupon_amt decimal(7,2) , + cs_ext_ship_cost decimal(7,2) , + cs_net_paid decimal(7,2) , + cs_net_paid_inc_tax decimal(7,2) , + cs_net_paid_inc_ship decimal(7,2) , + cs_net_paid_inc_ship_tax decimal(7,2) , + cs_net_profit decimal(7,2) , + primary key (cs_item_sk, cs_order_number) +);`) + tk.MustExec(`create table store_sales +( + ss_sold_date_sk int , + ss_sold_time_sk int , + ss_item_sk int not null, + ss_customer_sk int , + ss_cdemo_sk int , + ss_hdemo_sk int , + ss_addr_sk int , + ss_store_sk int , + ss_promo_sk int , + ss_ticket_number int not null, + ss_quantity int , + ss_wholesale_cost decimal(7,2) , + ss_list_price decimal(7,2) , + ss_sales_price decimal(7,2) , + ss_ext_discount_amt decimal(7,2) , + ss_ext_sales_price decimal(7,2) , + ss_ext_wholesale_cost decimal(7,2) , + ss_ext_list_price decimal(7,2) , + ss_ext_tax decimal(7,2) , + ss_coupon_amt decimal(7,2) , + ss_net_paid decimal(7,2) , + ss_net_paid_inc_tax decimal(7,2) , + ss_net_profit decimal(7,2) , + primary key (ss_item_sk, ss_ticket_number) +);`) + tk.MustExec(`create table date_dim +( + d_date_sk int not null, + d_date_id char(16) not null, + d_date date , + d_month_seq int , + d_week_seq int , + d_quarter_seq int , + d_year int , + d_dow int , + d_moy int , + d_dom int , + d_qoy int , + d_fy_year int , + d_fy_quarter_seq int , + d_fy_week_seq int , + d_day_name char(9) , + d_quarter_name char(6) , + d_holiday char(1) , + d_weekend char(1) , + d_following_holiday char(1) , + d_first_dom int , + d_last_dom int , + d_same_day_ly int , + d_same_day_lq int , + d_current_day char(1) , + d_current_week char(1) , + d_current_month char(1) , + d_current_quarter char(1) , + d_current_year char(1) , + primary key (d_date_sk) +);`) + tk.MustQuery(`plan replayer dump explain with ssci as ( +select ss_customer_sk customer_sk + ,ss_item_sk item_sk +from store_sales,date_dim +where ss_sold_date_sk = d_date_sk + and d_month_seq between 1212 and 1212 + 11 +group by ss_customer_sk + ,ss_item_sk), +csci as( + select cs_bill_customer_sk customer_sk + ,cs_item_sk item_sk +from catalog_sales,date_dim +where cs_sold_date_sk = d_date_sk + and d_month_seq between 1212 and 1212 + 11 +group by cs_bill_customer_sk + ,cs_item_sk) + select sum(case when ssci.customer_sk is not null and csci.customer_sk is null then 1 else 0 end) store_only + ,sum(case when ssci.customer_sk is null and csci.customer_sk is not null then 1 else 0 end) catalog_only + ,sum(case when ssci.customer_sk is not null and csci.customer_sk is not null then 1 else 0 end) store_and_catalog +from ssci left join csci on (ssci.customer_sk=csci.customer_sk + and ssci.item_sk = csci.item_sk) +UNION + select sum(case when ssci.customer_sk is not null and csci.customer_sk is null then 1 else 0 end) store_only + ,sum(case when ssci.customer_sk is null and csci.customer_sk is not null then 1 else 0 end) catalog_only + ,sum(case when ssci.customer_sk is not null and csci.customer_sk is not null then 1 else 0 end) store_and_catalog +from ssci right join csci on (ssci.customer_sk=csci.customer_sk + and ssci.item_sk = csci.item_sk) +limit 100;`) +} diff --git a/executor/index_merge_reader.go b/executor/index_merge_reader.go index 509500d597947..c4da6edc9f5cb 100644 --- a/executor/index_merge_reader.go +++ b/executor/index_merge_reader.go @@ -408,7 +408,7 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context, } } }, - handleWorkerPanic(ctx, e.finished, fetchCh, nil, "partialIndexWorker"), + handleWorkerPanic(ctx, e.finished, fetchCh, nil, partialIndexWorkerType), ) }() @@ -516,7 +516,7 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context, } } }, - handleWorkerPanic(ctx, e.finished, fetchCh, nil, "partialTableWorker"), + handleWorkerPanic(ctx, e.finished, fetchCh, nil, partialTableWorkerType), ) }() return nil diff --git a/executor/insert.go b/executor/insert.go index 1a4eb27d3c6f6..9d958d5d8bd2f 100644 --- a/executor/insert.go +++ b/executor/insert.go @@ -21,7 +21,6 @@ import ( "runtime/trace" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/expression" @@ -37,6 +36,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/stringutil" + "github.com/pingcap/tidb/util/tracing" "go.uber.org/zap" ) @@ -116,11 +116,8 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { } func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow) (map[string][]byte, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("prefetchUniqueIndices", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "prefetchUniqueIndices") + defer r.End() nKeys := 0 for _, r := range rows { @@ -148,11 +145,8 @@ func prefetchUniqueIndices(ctx context.Context, txn kv.Transaction, rows []toBeC } func prefetchConflictedOldRows(ctx context.Context, txn kv.Transaction, rows []toBeCheckedRow, values map[string][]byte) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("prefetchConflictedOldRows", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "prefetchConflictedOldRows") + defer r.End() batchKeys := make([]kv.Key, 0, len(rows)) for _, r := range rows { @@ -182,11 +176,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction return nil } - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("prefetchDataCache", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "prefetchDataCache").End() values, err := prefetchUniqueIndices(ctx, txn, rows) if err != nil { return err diff --git a/executor/insert_common.go b/executor/insert_common.go index 21b78b878028b..87938ff9af27b 100644 --- a/executor/insert_common.go +++ b/executor/insert_common.go @@ -22,7 +22,6 @@ import ( "sync" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/tidb/ddl" "github.com/pingcap/tidb/expression" @@ -46,6 +45,7 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/tracing" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" "go.uber.org/zap" ) @@ -1142,11 +1142,7 @@ func (e *InsertValues) batchCheckAndInsert(ctx context.Context, rows [][]types.D replace bool) error { // all the rows will be checked, so it is safe to set BatchCheck = true e.ctx.GetSessionVars().StmtCtx.BatchCheck = true - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("InsertValues.batchCheckAndInsert", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "InsertValues.batchCheckAndInsert").End() start := time.Now() // Get keys need to be checked. toBeCheckedRows, err := getKeysNeedCheck(ctx, e.ctx, e.Table, rows) diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 647d785caeafc..91bc6140b1295 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -17,7 +17,6 @@ package executor import ( "context" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/expression" @@ -34,6 +33,7 @@ import ( "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tidb/util/tracing" ) type memReader interface { @@ -65,11 +65,7 @@ type memIndexReader struct { } func buildMemIndexReader(ctx context.Context, us *UnionScanExec, idxReader *IndexReaderExecutor) *memIndexReader { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("buildMemIndexReader", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "buildMemIndexReader").End() kvRanges := idxReader.kvRanges outputOffset := make([]int, 0, len(us.columns)) for _, col := range idxReader.outputColumns { @@ -90,11 +86,7 @@ func buildMemIndexReader(ctx context.Context, us *UnionScanExec, idxReader *Inde } func (m *memIndexReader) getMemRows(ctx context.Context) ([][]types.Datum, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("memIndexReader.getMemRows", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "memIndexReader.getMemRows").End() tps := make([]*types.FieldType, 0, len(m.index.Columns)+1) cols := m.table.Columns for _, col := range m.index.Columns { @@ -190,11 +182,7 @@ type allocBuf struct { } func buildMemTableReader(ctx context.Context, us *UnionScanExec, tblReader *TableReaderExecutor) *memTableReader { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("buildMemTableReader", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "buildMemTableReader").End() colIDs := make(map[int64]int, len(us.columns)) for i, col := range us.columns { colIDs[col.ID] = i @@ -235,11 +223,7 @@ func buildMemTableReader(ctx context.Context, us *UnionScanExec, tblReader *Tabl // TODO: Try to make memXXXReader lazy, There is no need to decode many rows when parent operator only need 1 row. func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("memTableReader.getMemRows", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "memTableReader.getMemRows").End() mutableRow := chunk.MutRowFromTypes(m.retFieldTypes) resultRows := make([]types.Datum, len(m.columns)) m.offsets = make([]int, len(m.columns)) @@ -490,11 +474,8 @@ type memIndexLookUpReader struct { } func buildMemIndexLookUpReader(ctx context.Context, us *UnionScanExec, idxLookUpReader *IndexLookUpExecutor) *memIndexLookUpReader { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("buildMemIndexLookUpReader", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "buildMemIndexLookUpReader").End() + kvRanges := idxLookUpReader.kvRanges outputOffset := []int{len(idxLookUpReader.index.Columns)} memIdxReader := &memIndexReader{ @@ -527,11 +508,9 @@ func buildMemIndexLookUpReader(ctx context.Context, us *UnionScanExec, idxLookUp } func (m *memIndexLookUpReader) getMemRows(ctx context.Context) ([][]types.Datum, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("memIndexLookUpReader.getMemRows", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "memIndexLookUpReader.getMemRows") + defer r.End() + kvRanges := [][]kv.KeyRange{m.idxReader.kvRanges} tbls := []table.Table{m.table} if m.partitionMode { @@ -604,11 +583,7 @@ type memIndexMergeReader struct { } func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMergeReader *IndexMergeReaderExecutor) *memIndexMergeReader { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("buildMemIndexMergeReader", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "buildMemIndexMergeReader").End() indexCount := len(indexMergeReader.indexes) memReaders := make([]memReader, 0, indexCount) for i := 0; i < indexCount; i++ { @@ -661,11 +636,8 @@ func buildMemIndexMergeReader(ctx context.Context, us *UnionScanExec, indexMerge } func (m *memIndexMergeReader) getMemRows(ctx context.Context) ([][]types.Datum, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("memIndexMergeReader.getMemRows", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "memIndexMergeReader.getMemRows") + defer r.End() tbls := []table.Table{m.table} // [partNum][indexNum][rangeNum] var kvRanges [][][]kv.KeyRange diff --git a/executor/sample.go b/executor/sample.go index e7eb9bd223639..7f64c365d599e 100644 --- a/executor/sample.go +++ b/executor/sample.go @@ -17,7 +17,6 @@ package executor import ( "context" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" @@ -28,6 +27,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" decoder "github.com/pingcap/tidb/util/rowDecoder" + "github.com/pingcap/tidb/util/tracing" "github.com/tikv/client-go/v2/tikv" "golang.org/x/exp/slices" ) @@ -47,10 +47,7 @@ type TableSampleExecutor struct { // Open initializes necessary variables for using this executor. func (e *TableSampleExecutor) Open(ctx context.Context) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("TableSampleExecutor.Open", opentracing.ChildOf(span.Context())) - defer span1.Finish() - } + defer tracing.StartRegion(ctx, "TableSampleExecutor.Open").End() return nil } diff --git a/executor/table_reader.go b/executor/table_reader.go index 984212dcf7328..3e29dfe27b053 100644 --- a/executor/table_reader.go +++ b/executor/table_reader.go @@ -19,7 +19,6 @@ import ( "context" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/distsql" "github.com/pingcap/tidb/domain" @@ -39,6 +38,7 @@ import ( "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/ranger" "github.com/pingcap/tidb/util/stringutil" + "github.com/pingcap/tidb/util/tracing" "github.com/pingcap/tipb/go-tipb" "golang.org/x/exp/slices" ) @@ -135,11 +135,8 @@ func (e *TableReaderExecutor) setDummy() { // Open initializes necessary variables for using this executor. func (e *TableReaderExecutor) Open(ctx context.Context) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("TableReaderExecutor.Open", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "TableReaderExecutor.Open") + defer r.End() failpoint.Inject("mockSleepInTableReaderNext", func(v failpoint.Value) { ms := v.(int) time.Sleep(time.Millisecond * time.Duration(ms)) diff --git a/executor/union_scan.go b/executor/union_scan.go index a23cd8b8c7873..f3a2b82c70812 100644 --- a/executor/union_scan.go +++ b/executor/union_scan.go @@ -19,7 +19,6 @@ import ( "fmt" "runtime/trace" - "github.com/opentracing/opentracing-go" "github.com/pingcap/tidb/expression" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" @@ -30,6 +29,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/collate" + "github.com/pingcap/tidb/util/tracing" ) // UnionScanExec merges the rows from dirty table and the rows from distsql request. @@ -71,11 +71,9 @@ type UnionScanExec struct { // Open implements the Executor Open interface. func (us *UnionScanExec) Open(ctx context.Context) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("UnionScanExec.Open", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "UnionScanExec.Open") + defer r.End() + if err := us.baseExecutor.Open(ctx); err != nil { return err } diff --git a/executor/write.go b/executor/write.go index 363bb097fd02c..10e8b25a53c16 100644 --- a/executor/write.go +++ b/executor/write.go @@ -18,7 +18,6 @@ import ( "context" "strings" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/expression" @@ -34,6 +33,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/collate" "github.com/pingcap/tidb/util/memory" + "github.com/pingcap/tidb/util/tracing" ) var ( @@ -52,11 +52,9 @@ var ( // 2. err (error) : error in the update. func updateRecord(ctx context.Context, sctx sessionctx.Context, h kv.Handle, oldData, newData []types.Datum, modified []bool, t table.Table, onDup bool, memTracker *memory.Tracker, fkChecks []*FKCheckExec, fkCascades []*FKCascadeExec) (bool, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("executor.updateRecord", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "executor.updateRecord") + defer r.End() + sc := sctx.GetSessionVars().StmtCtx changed, handleChanged := false, false // onUpdateSpecified is for "UPDATE SET ts_field = old_value", the diff --git a/expression/builtin_compare.go b/expression/builtin_compare.go index bcb27a1233da7..dec5d06983679 100644 --- a/expression/builtin_compare.go +++ b/expression/builtin_compare.go @@ -1565,17 +1565,33 @@ func (c *compareFunctionClass) refineArgs(ctx sessionctx.Context, args []Express arg0Type, arg1Type := args[0].GetType(), args[1].GetType() arg0IsInt := arg0Type.EvalType() == types.ETInt arg1IsInt := arg1Type.EvalType() == types.ETInt + arg0IsString := arg0Type.EvalType() == types.ETString + arg1IsString := arg1Type.EvalType() == types.ETString arg0, arg0IsCon := args[0].(*Constant) arg1, arg1IsCon := args[1].(*Constant) isExceptional, finalArg0, finalArg1 := false, args[0], args[1] isPositiveInfinite, isNegativeInfinite := false, false - // int non-constant [cmp] non-int constant - if arg0IsInt && !arg0IsCon && !arg1IsInt && arg1IsCon { - if MaybeOverOptimized4PlanCache(ctx, []Expression{arg1}) { - ctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to INT", arg1.String())) + if MaybeOverOptimized4PlanCache(ctx, args) { + // To keep the result be compatible with MySQL, refine `int non-constant str constant` + // here and skip this refine operation in all other cases for safety. + if (arg0IsInt && !arg0IsCon && arg1IsString && arg1IsCon) || (arg1IsInt && !arg1IsCon && arg0IsString && arg0IsCon) { + var reason error + if arg1IsString { + reason = errors.Errorf("skip plan-cache: '%v' may be converted to INT", arg1.String()) + } else { // arg0IsString + reason = errors.Errorf("skip plan-cache: '%v' may be converted to INT", arg0.String()) + } + ctx.GetSessionVars().StmtCtx.SetSkipPlanCache(reason) RemoveMutableConst(ctx, args) + } else { + return args } - + } else if !ctx.GetSessionVars().StmtCtx.UseCache { + // We should remove the mutable constant for correctness, because its value may be changed. + RemoveMutableConst(ctx, args) + } + // int non-constant [cmp] non-int constant + if arg0IsInt && !arg0IsCon && !arg1IsInt && arg1IsCon { arg1, isExceptional = RefineComparedConstant(ctx, *arg0Type, arg1, c.op) // Why check not null flag // eg: int_col > const_val(which is less than min_int32) @@ -1603,11 +1619,6 @@ func (c *compareFunctionClass) refineArgs(ctx sessionctx.Context, args []Express } // non-int constant [cmp] int non-constant if arg1IsInt && !arg1IsCon && !arg0IsInt && arg0IsCon { - if MaybeOverOptimized4PlanCache(ctx, []Expression{arg0}) { - ctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to INT", arg0.String())) - RemoveMutableConst(ctx, args) - } - arg0, isExceptional = RefineComparedConstant(ctx, *arg1Type, arg0, symmetricOp[c.op]) if !isExceptional || (isExceptional && mysql.HasNotNullFlag(arg1Type.GetFlag())) { finalArg0 = arg0 @@ -1625,11 +1636,6 @@ func (c *compareFunctionClass) refineArgs(ctx sessionctx.Context, args []Express } // int constant [cmp] year type if arg0IsCon && arg0IsInt && arg1Type.GetType() == mysql.TypeYear && !arg0.Value.IsNull() { - if MaybeOverOptimized4PlanCache(ctx, []Expression{arg0}) { - ctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to YEAR", arg0.String())) - RemoveMutableConst(ctx, args) - } - adjusted, failed := types.AdjustYear(arg0.Value.GetInt64(), false) if failed == nil { arg0.Value.SetInt64(adjusted) @@ -1638,11 +1644,6 @@ func (c *compareFunctionClass) refineArgs(ctx sessionctx.Context, args []Express } // year type [cmp] int constant if arg1IsCon && arg1IsInt && arg0Type.GetType() == mysql.TypeYear && !arg1.Value.IsNull() { - if MaybeOverOptimized4PlanCache(ctx, []Expression{arg1}) { - ctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to YEAR", arg1.String())) - RemoveMutableConst(ctx, args) - } - adjusted, failed := types.AdjustYear(arg1.Value.GetInt64(), false) if failed == nil { arg1.Value.SetInt64(adjusted) diff --git a/go.mod b/go.mod index ceb153b0e9b84..60fc16f64df2c 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/charithe/durationcheck v0.0.9 github.com/cheggaaa/pb/v3 v3.0.8 github.com/cheynewallace/tabby v1.1.1 - github.com/cloudfoundry/gosigar v1.3.4 + github.com/cloudfoundry/gosigar v1.3.6 github.com/cockroachdb/errors v1.8.1 github.com/cockroachdb/pebble v0.0.0-20210719141320-8c3bd06debb5 github.com/coocood/freecache v1.2.1 diff --git a/go.sum b/go.sum index bd81c37660b2b..8072e16609420 100644 --- a/go.sum +++ b/go.sum @@ -500,8 +500,8 @@ github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWR github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= -github.com/cloudfoundry/gosigar v1.3.4 h1:T3MoGdugg1vdHn8Az7wDn7cZ4+QCjZph+eXf2CjSjo4= -github.com/cloudfoundry/gosigar v1.3.4/go.mod h1:g9r7ETZ1tpvJCT9TpqxO53+5BUZiM2FDSFSENzjK5Z8= +github.com/cloudfoundry/gosigar v1.3.6 h1:gIc08FbB3QPb+nAQhINIK/qhf5REKkY0FTGgRGXkcVc= +github.com/cloudfoundry/gosigar v1.3.6/go.mod h1:lNWstu5g5gw59O09Y+wsMNFzBSnU8a0u+Sfx4dq360E= github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= @@ -633,7 +633,7 @@ github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= -github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= github.com/fsouza/fake-gcs-server v1.19.0 h1:XyaGOlqo+R5sjT03x2ymk0xepaQlgwhRLTT2IopW0zA= github.com/fsouza/fake-gcs-server v1.19.0/go.mod h1:JtXHY/QzHhtyIxsNfIuQ+XgHtRb5B/w8nqbL5O8zqo0= github.com/fzipp/gocyclo v0.3.1/go.mod h1:DJHO6AUmbdqj2ET4Z9iArSuwWgYDRryYt2wASxc7x3E= @@ -1082,8 +1082,8 @@ github.com/ngaut/sync2 v0.0.0-20141008032647-7a24ed77b2ef/go.mod h1:7WjlapSfwQyo github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/nishanths/predeclared v0.2.2 h1:V2EPdZPliZymNAn79T8RkNApBjMmVKh5XRpLm/w98Vk= github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3LMK/HI84Mp280c= -github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= diff --git a/infoschema/cache.go b/infoschema/cache.go index 34cc08eca2231..3ceab0bffb00d 100644 --- a/infoschema/cache.go +++ b/infoschema/cache.go @@ -15,6 +15,7 @@ package infoschema import ( + "fmt" "sort" "sync" @@ -36,22 +37,27 @@ var ( // It only promised to cache the infoschema, if it is newer than all the cached. type InfoCache struct { mu sync.RWMutex - // cache is sorted by SchemaVersion in descending order - cache []InfoSchema - // record SnapshotTS of the latest schema Insert. - maxUpdatedSnapshotTS uint64 + // cache is sorted by both SchemaVersion and timestamp in descending order, assume they have same order + cache []schemaAndTimestamp +} + +type schemaAndTimestamp struct { + infoschema InfoSchema + timestamp int64 } // NewCache creates a new InfoCache. func NewCache(capacity int) *InfoCache { - return &InfoCache{cache: make([]InfoSchema, 0, capacity)} + return &InfoCache{ + cache: make([]schemaAndTimestamp, 0, capacity), + } } // Reset resets the cache. func (h *InfoCache) Reset(capacity int) { h.mu.Lock() defer h.mu.Unlock() - h.cache = make([]InfoSchema, 0, capacity) + h.cache = make([]schemaAndTimestamp, 0, capacity) } // GetLatest gets the newest information schema. @@ -61,18 +67,40 @@ func (h *InfoCache) GetLatest() InfoSchema { getLatestCounter.Inc() if len(h.cache) > 0 { hitLatestCounter.Inc() - return h.cache[0] + return h.cache[0].infoschema } return nil } +// GetSchemaByTimestamp returns the schema used at the specific timestamp +func (h *InfoCache) GetSchemaByTimestamp(ts uint64) (InfoSchema, error) { + h.mu.RLock() + defer h.mu.RUnlock() + return h.getSchemaByTimestampNoLock(ts) +} + +func (h *InfoCache) getSchemaByTimestampNoLock(ts uint64) (InfoSchema, error) { + i := sort.Search(len(h.cache), func(i int) bool { + return uint64(h.cache[i].timestamp) <= ts + }) + if i < len(h.cache) { + return h.cache[i].infoschema, nil + } + + return nil, fmt.Errorf("no schema cached for timestamp %d", ts) +} + // GetByVersion gets the information schema based on schemaVersion. Returns nil if it is not loaded. func (h *InfoCache) GetByVersion(version int64) InfoSchema { h.mu.RLock() defer h.mu.RUnlock() + return h.getByVersionNoLock(version) +} + +func (h *InfoCache) getByVersionNoLock(version int64) InfoSchema { getVersionCounter.Inc() i := sort.Search(len(h.cache), func(i int) bool { - return h.cache[i].SchemaMetaVersion() <= version + return h.cache[i].infoschema.SchemaMetaVersion() <= version }) // `GetByVersion` is allowed to load the latest schema that is less than argument `version`. @@ -93,9 +121,9 @@ func (h *InfoCache) GetByVersion(version int64) InfoSchema { // } // ``` - if i < len(h.cache) && (i != 0 || h.cache[i].SchemaMetaVersion() == version) { + if i < len(h.cache) && (i != 0 || h.cache[i].infoschema.SchemaMetaVersion() == version) { hitVersionCounter.Inc() - return h.cache[i] + return h.cache[i].infoschema } return nil } @@ -108,11 +136,9 @@ func (h *InfoCache) GetBySnapshotTS(snapshotTS uint64) InfoSchema { defer h.mu.RUnlock() getTSCounter.Inc() - if snapshotTS >= h.maxUpdatedSnapshotTS { - if len(h.cache) > 0 { - hitTSCounter.Inc() - return h.cache[0] - } + if schema, err := h.getSchemaByTimestampNoLock(snapshotTS); err == nil { + hitTSCounter.Inc() + return schema } return nil } @@ -125,16 +151,17 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool { defer h.mu.Unlock() version := is.SchemaMetaVersion() + + // assume this is the timestamp order as well i := sort.Search(len(h.cache), func(i int) bool { - return h.cache[i].SchemaMetaVersion() <= version + return h.cache[i].infoschema.SchemaMetaVersion() <= version }) - if h.maxUpdatedSnapshotTS < snapshotTS { - h.maxUpdatedSnapshotTS = snapshotTS - } - // cached entry - if i < len(h.cache) && h.cache[i].SchemaMetaVersion() == version { + if i < len(h.cache) && h.cache[i].infoschema.SchemaMetaVersion() == version { + if h.cache[i].timestamp > int64(snapshotTS) { + h.cache[i].timestamp = int64(snapshotTS) + } return true } @@ -142,12 +169,18 @@ func (h *InfoCache) Insert(is InfoSchema, snapshotTS uint64) bool { // has free space, grown the slice h.cache = h.cache[:len(h.cache)+1] copy(h.cache[i+1:], h.cache[i:]) - h.cache[i] = is + h.cache[i] = schemaAndTimestamp{ + infoschema: is, + timestamp: int64(snapshotTS), + } return true } else if i < len(h.cache) { // drop older schema copy(h.cache[i+1:], h.cache[i:]) - h.cache[i] = is + h.cache[i] = schemaAndTimestamp{ + infoschema: is, + timestamp: int64(snapshotTS), + } return true } // older than all cached schemas, refuse to cache it diff --git a/infoschema/cache_test.go b/infoschema/cache_test.go index 83506bc4794d8..5d6e0e7f4e1b1 100644 --- a/infoschema/cache_test.go +++ b/infoschema/cache_test.go @@ -42,7 +42,7 @@ func TestInsert(t *testing.T) { ic.Insert(is5, 5) require.Equal(t, is5, ic.GetByVersion(5)) require.Equal(t, is2, ic.GetByVersion(2)) - require.Nil(t, ic.GetBySnapshotTS(2)) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) require.Equal(t, is5, ic.GetBySnapshotTS(10)) // older @@ -59,7 +59,7 @@ func TestInsert(t *testing.T) { require.Equal(t, is5, ic.GetByVersion(5)) require.Equal(t, is2, ic.GetByVersion(2)) require.Nil(t, ic.GetByVersion(0)) - require.Nil(t, ic.GetBySnapshotTS(2)) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) require.Equal(t, is6, ic.GetBySnapshotTS(10)) // replace 2, drop 2 @@ -91,7 +91,7 @@ func TestInsert(t *testing.T) { require.Nil(t, ic.GetByVersion(2)) require.Nil(t, ic.GetByVersion(0)) require.Nil(t, ic.GetBySnapshotTS(2)) - require.Nil(t, ic.GetBySnapshotTS(5)) + require.Equal(t, is5, ic.GetBySnapshotTS(5)) require.Equal(t, is6, ic.GetBySnapshotTS(10)) } @@ -129,3 +129,61 @@ func TestGetLatest(t *testing.T) { ic.Insert(is0, 0) require.Equal(t, is2, ic.GetLatest()) } + +func TestGetByTimestamp(t *testing.T) { + ic := infoschema.NewCache(16) + require.NotNil(t, ic) + require.Nil(t, ic.GetLatest()) + + is1 := infoschema.MockInfoSchemaWithSchemaVer(nil, 1) + ic.Insert(is1, 1) + require.Equal(t, is1, ic.GetLatest()) + _, err := ic.GetSchemaByTimestamp(0) + require.NotNil(t, err) + schema, err := ic.GetSchemaByTimestamp(1) + require.Nil(t, err) + require.Equal(t, int64(1), schema.SchemaMetaVersion()) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + schema, err = ic.GetSchemaByTimestamp(2) + require.Nil(t, err) + require.Equal(t, int64(1), schema.SchemaMetaVersion()) + require.Equal(t, is1, ic.GetBySnapshotTS(2)) + + is2 := infoschema.MockInfoSchemaWithSchemaVer(nil, 2) + ic.Insert(is2, 2) + require.Equal(t, is2, ic.GetLatest()) + _, err = ic.GetSchemaByTimestamp(0) + require.NotNil(t, err) + schema, err = ic.GetSchemaByTimestamp(1) + require.Nil(t, err) + require.Equal(t, int64(1), schema.SchemaMetaVersion()) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + schema, err = ic.GetSchemaByTimestamp(2) + require.Nil(t, err) + require.Equal(t, int64(2), schema.SchemaMetaVersion()) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) + schema, err = ic.GetSchemaByTimestamp(3) + require.Nil(t, err) + require.Equal(t, int64(2), schema.SchemaMetaVersion()) + require.Equal(t, is2, ic.GetBySnapshotTS(3)) + + is0 := infoschema.MockInfoSchemaWithSchemaVer(nil, 0) + ic.Insert(is0, 0) + require.Equal(t, is2, ic.GetLatest()) + schema, err = ic.GetSchemaByTimestamp(0) + require.Nil(t, err) + require.Equal(t, int64(0), schema.SchemaMetaVersion()) + require.Equal(t, is0, ic.GetBySnapshotTS(0)) + schema, err = ic.GetSchemaByTimestamp(1) + require.Nil(t, err) + require.Equal(t, int64(1), schema.SchemaMetaVersion()) + require.Equal(t, is1, ic.GetBySnapshotTS(1)) + schema, err = ic.GetSchemaByTimestamp(2) + require.Nil(t, err) + require.Equal(t, int64(2), schema.SchemaMetaVersion()) + require.Equal(t, is2, ic.GetBySnapshotTS(2)) + schema, err = ic.GetSchemaByTimestamp(3) + require.Nil(t, err) + require.Equal(t, int64(2), schema.SchemaMetaVersion()) + require.Equal(t, is2, ic.GetBySnapshotTS(3)) +} diff --git a/kv/kv.go b/kv/kv.go index 9239cc514c7b2..057c22fb7312b 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -573,6 +573,8 @@ type Request struct { StoreBatchSize int // ResourceGroupName is the name of the bind resource group. ResourceGroupName string + // LimitSize indicates whether the request is scan and limit + LimitSize uint64 } // CoprRequestAdjuster is used to check and adjust a copr request according to specific rules. diff --git a/meta/autoid/BUILD.bazel b/meta/autoid/BUILD.bazel index d6bcc1ef94689..0eb6034820160 100644 --- a/meta/autoid/BUILD.bazel +++ b/meta/autoid/BUILD.bazel @@ -23,7 +23,7 @@ go_library( "//util/execdetails", "//util/logutil", "//util/mathutil", - "@com_github_opentracing_opentracing_go//:opentracing-go", + "//util/tracing", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/autoid", diff --git a/meta/autoid/autoid.go b/meta/autoid/autoid.go index 8c5e1d7bc58d8..4cd03bea89219 100644 --- a/meta/autoid/autoid.go +++ b/meta/autoid/autoid.go @@ -23,7 +23,6 @@ import ( "sync" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/autoid" @@ -37,6 +36,7 @@ import ( "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/mathutil" + "github.com/pingcap/tidb/util/tracing" "github.com/tikv/client-go/v2/txnkv/txnsnapshot" tikvutil "github.com/tikv/client-go/v2/util" clientv3 "go.etcd.io/etcd/client/v3" @@ -900,11 +900,7 @@ func (alloc *allocator) alloc4Signed(ctx context.Context, n uint64, increment, o ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta) err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("alloc.alloc4Signed", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "alloc.alloc4Signed").End() if allocatorStats != nil { txn.SetOption(kv.CollectRuntimeStats, allocatorStats.SnapshotRuntimeStats) } @@ -995,11 +991,7 @@ func (alloc *allocator) alloc4Unsigned(ctx context.Context, n uint64, increment, ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnMeta) err := kv.RunInNewTxn(ctx, alloc.store, true, func(ctx context.Context, txn kv.Transaction) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("alloc.alloc4Unsigned", opentracing.ChildOf(span.Context())) - defer span1.Finish() - opentracing.ContextWithSpan(ctx, span1) - } + defer tracing.StartRegion(ctx, "alloc.alloc4Unsigned").End() if allocatorStats != nil { txn.SetOption(kv.CollectRuntimeStats, allocatorStats.SnapshotRuntimeStats) } diff --git a/meta/autoid/autoid_service.go b/meta/autoid/autoid_service.go index 314ac3beef679..8e359f6e523fb 100644 --- a/meta/autoid/autoid_service.go +++ b/meta/autoid/autoid_service.go @@ -20,12 +20,12 @@ import ( "sync" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/autoid" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/tracing" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" "google.golang.org/grpc" @@ -112,11 +112,8 @@ func (d *clientDiscover) GetClient(ctx context.Context) (autoid.AutoIDAllocClien // case increment=1 & offset=1: you can derive the ids like min+1, min+2... max. // case increment=x & offset=y: you firstly need to seek to firstID by `SeekToFirstAutoIDXXX`, then derive the IDs like firstID, firstID + increment * 2... in the caller. func (sp *singlePointAlloc) Alloc(ctx context.Context, n uint64, increment, offset int64) (min int64, max int64, _ error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("autoid.Alloc", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "autoid.Alloc") + defer r.End() if !validIncrementAndOffset(increment, offset) { return 0, 0, errInvalidIncrementAndOffset.GenWithStackByArgs(increment, offset) @@ -186,11 +183,8 @@ func (*singlePointAlloc) AllocSeqCache() (a int64, b int64, c int64, err error) // If allocIDs is true, it will allocate some IDs and save to the cache. // If allocIDs is false, it will not allocate IDs. func (sp *singlePointAlloc) Rebase(ctx context.Context, newBase int64, _ bool) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("autoid.Rebase", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "autoid.Rebase") + defer r.End() return sp.rebase(ctx, newBase, false) } diff --git a/planner/core/BUILD.bazel b/planner/core/BUILD.bazel index 12c2730c59364..cd20248f05d55 100644 --- a/planner/core/BUILD.bazel +++ b/planner/core/BUILD.bazel @@ -199,6 +199,7 @@ go_test( "plan_to_pb_test.go", "planbuilder_test.go", "point_get_plan_test.go", + "predicate_simplification_test.go", "prepare_test.go", "preprocess_test.go", "rule_inject_extra_projection_test.go", diff --git a/planner/core/expression_rewriter.go b/planner/core/expression_rewriter.go index c7af8385d556c..a2d1f242a0ff6 100644 --- a/planner/core/expression_rewriter.go +++ b/planner/core/expression_rewriter.go @@ -1563,7 +1563,7 @@ func (er *expressionRewriter) inToExpression(lLen int, not bool, tp *types.Field continue // no need to refine it } er.sctx.GetSessionVars().StmtCtx.SetSkipPlanCache(errors.Errorf("skip plan-cache: '%v' may be converted to INT", c.String())) - expression.RemoveMutableConst(er.sctx, args) + expression.RemoveMutableConst(er.sctx, []expression.Expression{c}) } args[i], isExceptional = expression.RefineComparedConstant(er.sctx, *leftFt, c, opcode.EQ) if isExceptional { diff --git a/planner/core/plan_cache_test.go b/planner/core/plan_cache_test.go index e3a0d276e9815..8278050681553 100644 --- a/planner/core/plan_cache_test.go +++ b/planner/core/plan_cache_test.go @@ -506,22 +506,3 @@ func TestPlanCacheWithLimit(t *testing.T) { tk.MustExec("execute stmt using @a") tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: limit count more than 10000")) } - -func TestIssue40679(t *testing.T) { - store := testkit.CreateMockStore(t) - tk := testkit.NewTestKit(t, store) - tk.MustExec("use test") - tk.MustExec("create table t (a int, key(a));") - tk.MustExec("prepare st from 'select * from t use index(a) where a < ?'") - tk.MustExec("set @a1=1.1") - tk.MustExec("execute st using @a1") - - tkProcess := tk.Session().ShowProcess() - ps := []*util.ProcessInfo{tkProcess} - tk.Session().SetSessionManager(&testkit.MockSessionManager{PS: ps}) - rows := tk.MustQuery(fmt.Sprintf("explain for connection %d", tkProcess.ID)).Rows() - require.True(t, strings.Contains(rows[1][0].(string), "RangeScan")) // RangeScan not FullScan - - tk.MustExec("execute st using @a1") - tk.MustQuery("show warnings").Check(testkit.Rows("Warning 1105 skip plan-cache: '1.1' may be converted to INT")) -} diff --git a/planner/core/predicate_simplification_test.go b/planner/core/predicate_simplification_test.go new file mode 100644 index 0000000000000..673c793d41013 --- /dev/null +++ b/planner/core/predicate_simplification_test.go @@ -0,0 +1,63 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core_test + +import ( + "context" + "fmt" + "testing" + + "github.com/pingcap/tidb/infoschema" + "github.com/pingcap/tidb/parser" + "github.com/pingcap/tidb/parser/model" + "github.com/pingcap/tidb/planner" + "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessiontxn" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/testkit/testdata" + "github.com/stretchr/testify/require" +) + +// Test redundant conditions in single table and join predicates. +func TestRemoveRedundantPredicates(t *testing.T) { + store := testkit.CreateMockStore(t) + + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("set tidb_cost_model_version=2") + tk.MustExec("set tidb_opt_limit_push_down_threshold=0") + var input []string + var output []struct { + SQL string + Best string + } + planSuiteData := core.GetPlanSuiteData() + planSuiteData.LoadTestCases(t, &input, &output) + p := parser.New() + is := infoschema.MockInfoSchema([]*model.TableInfo{core.MockSignedTable(), core.MockUnsignedTable()}) + for i, tt := range input { + comment := fmt.Sprintf("case: %v, sql: %s", i, tt) + stmt, err := p.ParseOneStmt(tt, "", "") + require.NoError(t, err, comment) + require.NoError(t, sessiontxn.NewTxn(context.Background(), tk.Session())) + p, _, err := planner.Optimize(context.TODO(), tk.Session(), stmt, is) + require.NoError(t, err) + testdata.OnRecord(func() { + output[i].SQL = tt + output[i].Best = core.ToString(p) + }) + require.Equal(t, output[i].Best, core.ToString(p), comment) + } +} diff --git a/planner/core/testdata/plan_suite_in.json b/planner/core/testdata/plan_suite_in.json index 6f6e74fac3cfa..fdabafeb9c211 100644 --- a/planner/core/testdata/plan_suite_in.json +++ b/planner/core/testdata/plan_suite_in.json @@ -1194,5 +1194,21 @@ "select /*+ agg_to_cop() hash_agg() */ count(1) from tbl_15 ;", "select /*+ agg_to_cop() stream_agg() */ avg( tbl_16.col_100 ) as r0 from tbl_16 where tbl_16.col_100 in ( 10672141 ) or tbl_16.col_104 in ( 'yfEG1t!*b' ,'C1*bqx_qyO' ,'vQ^yUpKHr&j#~' ) group by tbl_16.col_100 order by r0 limit 20 ;" ] + }, + { + "name": "TestRemoveRedundantPredicates", + "cases":[ + "select f from t use index() where f = 1 and f = 1 -- simple redundancy of exact condition", + "select f from t use index() where f = 1 and f = 2 -- unsatisfiable condition", + "select f from t use index() where f = 1 and f in (1,2,3) -- intersection of in and =", + "select f from t use index() where f = 1 and f <> 1 -- intersection of = and <>", + "select f from t use index() where f not in (1,2,3) and f = 3 -- intersection of not in list and =", + "select f from t use index() where f <> 3 and f <> 3 -- intersection of two not in values.", + "select t1.f /* merge_join(t1, t2) */ from t t1, t t2 where t1.a=t2.a and t1.a=t2.a -- exact redundancy in joins", + "select f from t use index() where f in (1,2,3) and f <> 2 -- intersection of in and <>. Not done yet see issue 39676", + "select f from t use index() where f in (1,2,3) and f in (3,4,5) -- intersection of two in. Not done yet", + "select f from t use index() where f not in (1,2,3) and f not in (3,4,5) -- intersection of two not in. Not done yet", + "select f from t use index() where f not in (1,2,3) and f in (1,2,3) -- intersection of in and not in. Not done yet" + ] } ] diff --git a/planner/core/testdata/plan_suite_out.json b/planner/core/testdata/plan_suite_out.json index 905ffab969200..2394a0e9e669c 100644 --- a/planner/core/testdata/plan_suite_out.json +++ b/planner/core/testdata/plan_suite_out.json @@ -7600,5 +7600,54 @@ "Warning": null } ] + }, + { + "Name": "TestRemoveRedundantPredicates", + "Cases": [ + { + "SQL": "select f from t use index() where f = 1 and f = 1 -- simple redundancy of exact condition", + "Best": "TableReader(Table(t)->Sel([eq(test.t.f, 1)]))" + }, + { + "SQL": "select f from t use index() where f = 1 and f = 2 -- unsatisfiable condition", + "Best": "Dual" + }, + { + "SQL": "select f from t use index() where f = 1 and f in (1,2,3) -- intersection of in and =", + "Best": "TableReader(Table(t)->Sel([eq(test.t.f, 1)]))" + }, + { + "SQL": "select f from t use index() where f = 1 and f <> 1 -- intersection of = and <>", + "Best": "Dual" + }, + { + "SQL": "select f from t use index() where f not in (1,2,3) and f = 3 -- intersection of not in list and =", + "Best": "Dual" + }, + { + "SQL": "select f from t use index() where f <> 3 and f <> 3 -- intersection of two not in values.", + "Best": "TableReader(Table(t)->Sel([ne(test.t.f, 3)]))" + }, + { + "SQL": "select t1.f /* merge_join(t1, t2) */ from t t1, t t2 where t1.a=t2.a and t1.a=t2.a -- exact redundancy in joins", + "Best": "MergeInnerJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t.a,test.t.a)" + }, + { + "SQL": "select f from t use index() where f in (1,2,3) and f <> 2 -- intersection of in and <>. Not done yet see issue 39676", + "Best": "TableReader(Table(t)->Sel([in(test.t.f, 1, 2, 3) ne(test.t.f, 2)]))" + }, + { + "SQL": "select f from t use index() where f in (1,2,3) and f in (3,4,5) -- intersection of two in. Not done yet", + "Best": "TableReader(Table(t)->Sel([in(test.t.f, 1, 2, 3) in(test.t.f, 3, 4, 5)]))" + }, + { + "SQL": "select f from t use index() where f not in (1,2,3) and f not in (3,4,5) -- intersection of two not in. Not done yet", + "Best": "TableReader(Table(t)->Sel([not(in(test.t.f, 1, 2, 3)) not(in(test.t.f, 3, 4, 5))]))" + }, + { + "SQL": "select f from t use index() where f not in (1,2,3) and f in (1,2,3) -- intersection of in and not in. Not done yet", + "Best": "TableReader(Table(t)->Sel([not(in(test.t.f, 1, 2, 3)) in(test.t.f, 1, 2, 3)]))" + } + ] } ] diff --git a/resourcemanager/schedule.go b/resourcemanager/schedule.go index 50a5f54697800..a33e0b75a764e 100644 --- a/resourcemanager/schedule.go +++ b/resourcemanager/schedule.go @@ -31,6 +31,9 @@ func (r *ResourceManager) schedule() { } func (r *ResourceManager) schedulePool(pool *util.PoolContainer) scheduler.Command { + if pool.Pool.Running() == 0 { + return scheduler.Hold + } for _, sch := range r.scheduler { cmd := sch.Tune(pool.Component, pool.Pool) switch cmd { diff --git a/server/BUILD.bazel b/server/BUILD.bazel index d1139f0a94138..c0477b9248e5c 100644 --- a/server/BUILD.bazel +++ b/server/BUILD.bazel @@ -91,11 +91,11 @@ go_library( "//util/topsql", "//util/topsql/state", "//util/topsql/stmtstats", + "//util/tracing", "//util/versioninfo", "@com_github_blacktear23_go_proxyprotocol//:go-proxyprotocol", "@com_github_burntsushi_toml//:toml", "@com_github_gorilla_mux//:mux", - "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_fn//:fn", diff --git a/server/column.go b/server/column.go index 2ef6323d83b5f..155a1b0e897a1 100644 --- a/server/column.go +++ b/server/column.go @@ -15,6 +15,8 @@ package server import ( + "fmt" + "github.com/pingcap/tidb/parser/charset" "github.com/pingcap/tidb/parser/mysql" ) @@ -23,22 +25,30 @@ const maxColumnNameSize = 256 // ColumnInfo contains information of a column type ColumnInfo struct { - Schema string - Table string - OrgTable string - Name string - OrgName string - ColumnLength uint32 - Charset uint16 - Flag uint16 - Decimal uint8 - Type uint8 - DefaultValueLength uint64 - DefaultValue []byte + Schema string + Table string + OrgTable string + Name string + OrgName string + ColumnLength uint32 + Charset uint16 + Flag uint16 + Decimal uint8 + Type uint8 + DefaultValue any } // Dump dumps ColumnInfo to bytes. func (column *ColumnInfo) Dump(buffer []byte, d *resultEncoder) []byte { + return column.dump(buffer, d, false) +} + +// DumpWithDefault dumps ColumnInfo to bytes, including column defaults. This is used for ComFieldList responses. +func (column *ColumnInfo) DumpWithDefault(buffer []byte, d *resultEncoder) []byte { + return column.dump(buffer, d, true) +} + +func (column *ColumnInfo) dump(buffer []byte, d *resultEncoder, withDefault bool) []byte { if d == nil { d = newResultEncoder(charset.CharsetUTF8MB4) } @@ -64,9 +74,14 @@ func (column *ColumnInfo) Dump(buffer []byte, d *resultEncoder) []byte { buffer = append(buffer, column.Decimal) buffer = append(buffer, 0, 0) - if column.DefaultValue != nil { - buffer = dumpUint64(buffer, uint64(len(column.DefaultValue))) - buffer = append(buffer, column.DefaultValue...) + if withDefault { + switch column.DefaultValue { + case "CURRENT_TIMESTAMP", "CURRENT_DATE", nil: + buffer = append(buffer, 251) // NULL + default: + defaultValStr := fmt.Sprintf("%v", column.DefaultValue) + buffer = dumpLengthEncodedString(buffer, []byte(defaultValStr)) + } } return buffer diff --git a/server/column_test.go b/server/column_test.go index 9d54a643f7f7b..e2251040e1f9a 100644 --- a/server/column_test.go +++ b/server/column_test.go @@ -23,21 +23,47 @@ import ( func TestDumpColumn(t *testing.T) { info := ColumnInfo{ - Schema: "testSchema", - Table: "testTable", - OrgTable: "testOrgTable", - Name: "testName", - OrgName: "testOrgName", - ColumnLength: 1, - Charset: 106, - Flag: 0, - Decimal: 1, - Type: 14, - DefaultValueLength: 2, - DefaultValue: []byte{5, 2}, + Schema: "testSchema", + Table: "testTable", + OrgTable: "testOrgTable", + Name: "testName", + OrgName: "testOrgName", + ColumnLength: 1, + Charset: 106, + Flag: 0, + Decimal: 1, + Type: 14, + DefaultValue: []byte{5, 2}, } r := info.Dump(nil, nil) - exp := []byte{0x3, 0x64, 0x65, 0x66, 0xa, 0x74, 0x65, 0x73, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x9, 0x74, 0x65, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0xc, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x8, 0x74, 0x65, 0x73, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0xb, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x4e, 0x61, 0x6d, 0x65, 0xc, 0x6a, 0x0, 0x1, 0x0, 0x0, 0x0, 0xe, 0x0, 0x0, 0x1, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5, 0x2} + exp := []byte{0x3, 0x64, 0x65, 0x66, 0xa, 0x74, 0x65, 0x73, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x9, 0x74, 0x65, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0xc, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x8, 0x74, 0x65, 0x73, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0xb, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x4e, 0x61, 0x6d, 0x65, 0xc, 0x6a, 0x0, 0x1, 0x0, 0x0, 0x0, 0xe, 0x0, 0x0, 0x1, 0x0, 0x0} + require.Equal(t, exp, r) + + require.Equal(t, uint16(mysql.SetFlag), dumpFlag(mysql.TypeSet, 0)) + require.Equal(t, uint16(mysql.EnumFlag), dumpFlag(mysql.TypeEnum, 0)) + require.Equal(t, uint16(0), dumpFlag(mysql.TypeString, 0)) + + require.Equal(t, mysql.TypeString, dumpType(mysql.TypeSet)) + require.Equal(t, mysql.TypeString, dumpType(mysql.TypeEnum)) + require.Equal(t, mysql.TypeBit, dumpType(mysql.TypeBit)) +} + +func TestDumpColumnWithDefault(t *testing.T) { + info := ColumnInfo{ + Schema: "testSchema", + Table: "testTable", + OrgTable: "testOrgTable", + Name: "testName", + OrgName: "testOrgName", + ColumnLength: 1, + Charset: 106, + Flag: 0, + Decimal: 1, + Type: 14, + DefaultValue: "test", + } + r := info.DumpWithDefault(nil, nil) + exp := []byte{0x3, 0x64, 0x65, 0x66, 0xa, 0x74, 0x65, 0x73, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x9, 0x74, 0x65, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0xc, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x54, 0x61, 0x62, 0x6c, 0x65, 0x8, 0x74, 0x65, 0x73, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0xb, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x4e, 0x61, 0x6d, 0x65, 0xc, 0x6a, 0x0, 0x1, 0x0, 0x0, 0x0, 0xe, 0x0, 0x0, 0x1, 0x0, 0x0, 0x4, 0x74, 0x65, 0x73, 0x74} require.Equal(t, exp, r) require.Equal(t, uint16(mysql.SetFlag), dumpFlag(mysql.TypeSet, 0)) @@ -55,20 +81,19 @@ func TestColumnNameLimit(t *testing.T) { aLongName = append(aLongName, 'a') } info := ColumnInfo{ - Schema: "testSchema", - Table: "testTable", - OrgTable: "testOrgTable", - Name: string(aLongName), - OrgName: "testOrgName", - ColumnLength: 1, - Charset: 106, - Flag: 0, - Decimal: 1, - Type: 14, - DefaultValueLength: 2, - DefaultValue: []byte{5, 2}, + Schema: "testSchema", + Table: "testTable", + OrgTable: "testOrgTable", + Name: string(aLongName), + OrgName: "testOrgName", + ColumnLength: 1, + Charset: 106, + Flag: 0, + Decimal: 1, + Type: 14, + DefaultValue: []byte{5, 2}, } r := info.Dump(nil, nil) - exp := []byte{0x3, 0x64, 0x65, 0x66, 0xa, 0x74, 0x65, 0x73, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x9, 0x74, 0x65, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0xc, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x54, 0x61, 0x62, 0x6c, 0x65, 0xfc, 0x0, 0x1, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0xb, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x4e, 0x61, 0x6d, 0x65, 0xc, 0x6a, 0x0, 0x1, 0x0, 0x0, 0x0, 0xe, 0x0, 0x0, 0x1, 0x0, 0x0, 0x2, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5, 0x2} + exp := []byte{0x3, 0x64, 0x65, 0x66, 0xa, 0x74, 0x65, 0x73, 0x74, 0x53, 0x63, 0x68, 0x65, 0x6d, 0x61, 0x9, 0x74, 0x65, 0x73, 0x74, 0x54, 0x61, 0x62, 0x6c, 0x65, 0xc, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x54, 0x61, 0x62, 0x6c, 0x65, 0xfc, 0x0, 0x1, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0x61, 0xb, 0x74, 0x65, 0x73, 0x74, 0x4f, 0x72, 0x67, 0x4e, 0x61, 0x6d, 0x65, 0xc, 0x6a, 0x0, 0x1, 0x0, 0x0, 0x0, 0xe, 0x0, 0x0, 0x1, 0x0, 0x0} require.Equal(t, exp, r) } diff --git a/server/conn.go b/server/conn.go index 4cd2aedcb42c1..10d81ffa196b1 100644 --- a/server/conn.go +++ b/server/conn.go @@ -54,7 +54,6 @@ import ( "time" "unsafe" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" @@ -89,6 +88,7 @@ import ( "github.com/pingcap/tidb/util/memory" tlsutil "github.com/pingcap/tidb/util/tls" topsqlstate "github.com/pingcap/tidb/util/topsql/state" + "github.com/pingcap/tidb/util/tracing" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/util" "go.uber.org/zap" @@ -1290,10 +1290,11 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { connIdleDurationHistogramNotInTxn.Observe(t.Sub(cc.lastActive).Seconds()) } - span := opentracing.StartSpan("server.dispatch") cfg := config.GetGlobalConfig() if cfg.OpenTracing.Enable { - ctx = opentracing.ContextWithSpan(ctx, span) + var r tracing.Region + r, ctx = tracing.StartRegionEx(ctx, "server.dispatch") + defer r.End() } var cancelFunc context.CancelFunc @@ -1340,7 +1341,6 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error { } cc.server.releaseToken(token) - span.Finish() cc.lastActive = time.Now() }() @@ -2104,14 +2104,8 @@ func (cc *clientConn) handleFieldList(ctx context.Context, sql string) (err erro cc.initResultEncoder(ctx) defer cc.rsEncoder.clean() for _, column := range columns { - // Current we doesn't output defaultValue but reserve defaultValue length byte to make mariadb client happy. - // https://dev.mysql.com/doc/internals/en/com-query-response.html#column-definition - // TODO: fill the right DefaultValues. - column.DefaultValueLength = 0 - column.DefaultValue = []byte{} - data = data[0:4] - data = column.Dump(data, cc.rsEncoder) + data = column.DumpWithDefault(data, cc.rsEncoder) if err := cc.writePacket(data); err != nil { return err } diff --git a/server/conn_test.go b/server/conn_test.go index fb1cd15102129..6571d7efd5352 100644 --- a/server/conn_test.go +++ b/server/conn_test.go @@ -429,9 +429,9 @@ func TestDispatch(t *testing.T) { in: []byte("t"), err: nil, out: []byte{ - 0x26, 0x0, 0x0, 0xc, 0x3, 0x64, 0x65, 0x66, 0x4, 0x74, 0x65, 0x73, 0x74, 0x1, 0x74, + 0x1f, 0x0, 0x0, 0xc, 0x3, 0x64, 0x65, 0x66, 0x4, 0x74, 0x65, 0x73, 0x74, 0x1, 0x74, 0x1, 0x74, 0x1, 0x61, 0x1, 0x61, 0xc, 0x3f, 0x0, 0xb, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x0, 0x0, 0xd, 0xfe, + 0x0, 0x0, 0x0, 0xfb, 0x1, 0x0, 0x0, 0xd, 0xfe, }, }, { @@ -549,10 +549,9 @@ func TestDispatchClientProtocol41(t *testing.T) { in: []byte("t"), err: nil, out: []byte{ - 0x26, 0x0, 0x0, 0xc, 0x3, 0x64, 0x65, 0x66, 0x4, 0x74, 0x65, 0x73, 0x74, 0x1, 0x74, + 0x1f, 0x0, 0x0, 0xc, 0x3, 0x64, 0x65, 0x66, 0x4, 0x74, 0x65, 0x73, 0x74, 0x1, 0x74, 0x1, 0x74, 0x1, 0x61, 0x1, 0x61, 0xc, 0x3f, 0x0, 0xb, 0x0, 0x0, 0x0, 0x3, 0x0, 0x0, - 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x5, 0x0, 0x0, 0xd, 0xfe, - 0x0, 0x0, 0x2, 0x0, + 0x0, 0x0, 0x0, 0xfb, 0x5, 0x0, 0x0, 0x0d, 0xfe, 0x0, 0x0, 0x2, 0x0, }, }, { diff --git a/server/driver_tidb.go b/server/driver_tidb.go index 7b25a998d618b..b37b76a75a889 100644 --- a/server/driver_tidb.go +++ b/server/driver_tidb.go @@ -522,13 +522,14 @@ func unwrapResultSet(rs ResultSet) ResultSet { func convertColumnInfo(fld *ast.ResultField) (ci *ColumnInfo) { ci = &ColumnInfo{ - Name: fld.ColumnAsName.O, - OrgName: fld.Column.Name.O, - Table: fld.TableAsName.O, - Schema: fld.DBName.O, - Flag: uint16(fld.Column.GetFlag()), - Charset: uint16(mysql.CharsetNameToID(fld.Column.GetCharset())), - Type: fld.Column.GetType(), + Name: fld.ColumnAsName.O, + OrgName: fld.Column.Name.O, + Table: fld.TableAsName.O, + Schema: fld.DBName.O, + Flag: uint16(fld.Column.GetFlag()), + Charset: uint16(mysql.CharsetNameToID(fld.Column.GetCharset())), + Type: fld.Column.GetType(), + DefaultValue: fld.Column.GetDefaultValue(), } if fld.Table != nil { diff --git a/server/driver_tidb_test.go b/server/driver_tidb_test.go index b56632937e078..35dd70f438982 100644 --- a/server/driver_tidb_test.go +++ b/server/driver_tidb_test.go @@ -28,18 +28,17 @@ import ( func createColumnByTypeAndLen(tp byte, cl uint32) *ColumnInfo { return &ColumnInfo{ - Schema: "test", - Table: "dual", - OrgTable: "", - Name: "a", - OrgName: "a", - ColumnLength: cl, - Charset: uint16(mysql.CharsetNameToID(charset.CharsetUTF8)), - Flag: uint16(mysql.UnsignedFlag), - Decimal: uint8(0), - Type: tp, - DefaultValueLength: uint64(0), - DefaultValue: nil, + Schema: "test", + Table: "dual", + OrgTable: "", + Name: "a", + OrgName: "a", + ColumnLength: cl, + Charset: uint16(mysql.CharsetNameToID(charset.CharsetUTF8)), + Flag: uint16(mysql.UnsignedFlag), + Decimal: uint8(0), + Type: tp, + DefaultValue: nil, } } func TestConvertColumnInfo(t *testing.T) { diff --git a/session/BUILD.bazel b/session/BUILD.bazel index d6404e04c4bbf..f5ef849b3a036 100644 --- a/session/BUILD.bazel +++ b/session/BUILD.bazel @@ -88,8 +88,8 @@ go_library( "//util/topsql", "//util/topsql/state", "//util/topsql/stmtstats", + "//util/tracing", "@com_github_ngaut_pools//:pools", - "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/kvrpcpb", diff --git a/session/session.go b/session/session.go index f6403c67b258c..2d84f1c0fa858 100644 --- a/session/session.go +++ b/session/session.go @@ -29,7 +29,6 @@ import ( "math" "math/rand" "runtime/pprof" - "runtime/trace" "strconv" "strings" "sync" @@ -37,7 +36,6 @@ import ( "time" "github.com/ngaut/pools" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -105,6 +103,7 @@ import ( "github.com/pingcap/tidb/util/topsql" topsqlstate "github.com/pingcap/tidb/util/topsql/state" "github.com/pingcap/tidb/util/topsql/stmtstats" + "github.com/pingcap/tidb/util/tracing" "github.com/pingcap/tipb/go-binlog" tikverr "github.com/tikv/client-go/v2/error" tikvstore "github.com/tikv/client-go/v2/kv" @@ -956,11 +955,9 @@ func (s *session) doCommitWithRetry(ctx context.Context) error { var err error txnSize := s.txn.Size() isPessimistic := s.txn.IsPessimistic() - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("session.doCommitWitRetry", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "session.doCommitWithRetry") + defer r.End() + err = s.doCommit(ctx) if err != nil { // polish the Write Conflict error message @@ -1083,11 +1080,8 @@ func (s *session) updateStatsDeltaToCollector() { } func (s *session) CommitTxn(ctx context.Context) error { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("session.CommitTxn", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "session.CommitTxn") + defer r.End() var commitDetail *tikvutil.CommitDetails ctx = context.WithValue(ctx, tikvutil.CommitDetailCtxKey, &commitDetail) @@ -1107,10 +1101,8 @@ func (s *session) CommitTxn(ctx context.Context) error { } func (s *session) RollbackTxn(ctx context.Context) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("session.RollbackTxn", opentracing.ChildOf(span.Context())) - defer span1.Finish() - } + r, ctx := tracing.StartRegionEx(ctx, "session.RollbackTxn") + defer r.End() if s.txn.Valid() { terror.Log(s.txn.Rollback()) @@ -1537,11 +1529,7 @@ func (s *session) GetTiDBTableValue(name string) (string, error) { var _ sqlexec.SQLParser = &session{} func (s *session) ParseSQL(ctx context.Context, sql string, params ...parser.ParseParam) ([]ast.StmtNode, []error, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("session.ParseSQL", opentracing.ChildOf(span.Context())) - defer span1.Finish() - } - defer trace.StartRegion(ctx, "ParseSQL").End() + defer tracing.StartRegion(ctx, "ParseSQL").End() p := parserPool.Get().(*parser.Parser) defer parserPool.Put(p) @@ -1655,12 +1643,9 @@ func (s *session) ExecuteInternal(ctx context.Context, sql string, args ...inter pprof.SetGoroutineLabels(ctx) }() - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("session.ExecuteInternal", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - logutil.Eventf(ctx, "execute: %s", sql) - } + r, ctx := tracing.StartRegionEx(ctx, "session.ExecuteInternal") + defer r.End() + logutil.Eventf(ctx, "execute: %s", sql) stmtNode, err := s.ParseWithParams(ctx, sql, args...) if err != nil { @@ -1680,12 +1665,9 @@ func (s *session) ExecuteInternal(ctx context.Context, sql string, args ...inter // Execute is deprecated, we can remove it as soon as plugins are migrated. func (s *session) Execute(ctx context.Context, sql string) (recordSets []sqlexec.RecordSet, err error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("session.Execute", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - logutil.Eventf(ctx, "execute: %s", sql) - } + r, ctx := tracing.StartRegionEx(ctx, "session.Execute") + defer r.End() + logutil.Eventf(ctx, "execute: %s", sql) stmtNodes, err := s.Parse(ctx, sql) if err != nil { @@ -2123,16 +2105,12 @@ func (s *session) ExecuteInternalStmt(ctx context.Context, stmtNode ast.StmtNode } func (s *session) ExecuteStmt(ctx context.Context, stmtNode ast.StmtNode) (sqlexec.RecordSet, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("session.ExecuteStmt", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "session.ExecuteStmt") + defer r.End() if err := s.PrepareTxnCtx(ctx); err != nil { return nil, err } - if err := s.loadCommonGlobalVariablesIfNeeded(); err != nil { return nil, err } @@ -2318,12 +2296,12 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec. } }) - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("session.runStmt", opentracing.ChildOf(span.Context())) - span1.LogKV("sql", s.OriginText()) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) + r, ctx := tracing.StartRegionEx(ctx, "session.runStmt") + defer r.End() + if r.Span != nil { + r.Span.LogKV("sql", s.OriginText()) } + se.SetValue(sessionctx.QueryString, s.OriginText()) if _, ok := s.(*executor.ExecStmt).StmtNode.(ast.DDLNode); ok { se.SetValue(sessionctx.LastExecuteDDL, true) @@ -4150,8 +4128,9 @@ func (s *session) EncodeSessionStates(ctx context.Context, sctx sessionctx.Conte sessionStates.SystemVars = make(map[string]string) for _, sv := range variable.GetSysVars() { switch { - case sv.Hidden, sv.HasNoneScope(), sv.HasInstanceScope(), !sv.HasSessionScope(): - // Hidden and none-scoped variables cannot be modified. + case sv.HasNoneScope(), sv.HasInstanceScope(), !sv.HasSessionScope(): + // Hidden attribute is deprecated. + // None-scoped variables cannot be modified. // Instance-scoped variables don't need to be encoded. // Noop variables should also be migrated even if they are noop. continue diff --git a/sessionctx/sessionstates/session_states_test.go b/sessionctx/sessionstates/session_states_test.go index 4d1541cc9443d..a4a92c55a4495 100644 --- a/sessionctx/sessionstates/session_states_test.go +++ b/sessionctx/sessionstates/session_states_test.go @@ -107,8 +107,9 @@ func TestSystemVars(t *testing.T) { }, { // hidden variable - inSessionStates: false, + inSessionStates: true, varName: variable.TiDBTxnReadTS, + expectedValue: "", }, { // none-scoped variable diff --git a/sessionctx/variable/BUILD.bazel b/sessionctx/variable/BUILD.bazel index c1d6e0b8b9651..60fec443c6a0a 100644 --- a/sessionctx/variable/BUILD.bazel +++ b/sessionctx/variable/BUILD.bazel @@ -96,10 +96,12 @@ go_test( "//parser/mysql", "//parser/terror", "//planner/core", + "//sessionctx/sessionstates", "//sessionctx/stmtctx", "//testkit", "//testkit/testsetup", "//types", + "//util", "//util/chunk", "//util/execdetails", "//util/gctuner", diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 036886457e323..64c79ea32646e 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -2320,16 +2320,16 @@ func (s *SessionVars) GetTemporaryTable(tblInfo *model.TableInfo) tableutil.Temp // EncodeSessionStates saves session states into SessionStates. func (s *SessionVars) EncodeSessionStates(ctx context.Context, sessionStates *sessionstates.SessionStates) (err error) { // Encode user-defined variables. + s.userVars.lock.RLock() sessionStates.UserVars = make(map[string]*types.Datum, len(s.userVars.values)) sessionStates.UserVarTypes = make(map[string]*ptypes.FieldType, len(s.userVars.types)) - s.userVars.lock.RLock() - defer s.userVars.lock.RUnlock() for name, userVar := range s.userVars.values { sessionStates.UserVars[name] = userVar.Clone() } for name, userVarType := range s.userVars.types { sessionStates.UserVarTypes[name] = userVarType.Clone() } + s.userVars.lock.RUnlock() // Encode other session contexts. sessionStates.PreparedStmtID = s.preparedStmtID @@ -2357,11 +2357,9 @@ func (s *SessionVars) EncodeSessionStates(ctx context.Context, sessionStates *se // DecodeSessionStates restores session states from SessionStates. func (s *SessionVars) DecodeSessionStates(ctx context.Context, sessionStates *sessionstates.SessionStates) (err error) { // Decode user-defined variables. - s.userVars.values = make(map[string]types.Datum, len(sessionStates.UserVars)) for name, userVar := range sessionStates.UserVars { s.SetUserVarVal(name, *userVar.Clone()) } - s.userVars.types = make(map[string]*ptypes.FieldType, len(sessionStates.UserVarTypes)) for name, userVarType := range sessionStates.UserVarTypes { s.SetUserVarType(name, userVarType.Clone()) } diff --git a/sessionctx/variable/session_test.go b/sessionctx/variable/session_test.go index 5df5e187088d0..62c94e91867e9 100644 --- a/sessionctx/variable/session_test.go +++ b/sessionctx/variable/session_test.go @@ -15,6 +15,8 @@ package variable_test import ( + "context" + "strconv" "sync" "testing" "time" @@ -25,10 +27,12 @@ import ( "github.com/pingcap/tidb/parser/auth" "github.com/pingcap/tidb/parser/mysql" plannercore "github.com/pingcap/tidb/planner/core" + "github.com/pingcap/tidb/sessionctx/sessionstates" "github.com/pingcap/tidb/sessionctx/stmtctx" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/testkit" "github.com/pingcap/tidb/types" + util2 "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/chunk" "github.com/pingcap/tidb/util/execdetails" "github.com/pingcap/tidb/util/mock" @@ -533,3 +537,35 @@ func TestPretectedTSList(t *testing.T) { require.Equal(t, uint64(0), lst.GetMinProtectedTS(1)) require.Equal(t, 0, lst.Size()) } + +func TestUserVarConcurrently(t *testing.T) { + sv := variable.NewSessionVars(nil) + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + var wg util2.WaitGroupWrapper + wg.Run(func() { + for i := 0; ; i++ { + select { + case <-time.After(time.Millisecond): + name := strconv.Itoa(i) + sv.SetUserVarVal(name, types.Datum{}) + sv.GetUserVarVal(name) + case <-ctx.Done(): + return + } + } + }) + wg.Run(func() { + for { + select { + case <-time.After(time.Millisecond): + var states sessionstates.SessionStates + require.NoError(t, sv.EncodeSessionStates(ctx, &states)) + require.NoError(t, sv.DecodeSessionStates(ctx, &states)) + case <-ctx.Done(): + return + } + } + }) + wg.Wait() + cancel() +} diff --git a/sessiontxn/isolation/BUILD.bazel b/sessiontxn/isolation/BUILD.bazel index 2f2cb5dce4e31..c89b245021655 100644 --- a/sessiontxn/isolation/BUILD.bazel +++ b/sessiontxn/isolation/BUILD.bazel @@ -27,7 +27,7 @@ go_library( "//sessiontxn/staleread", "//table/temptable", "//util/logutil", - "@com_github_opentracing_opentracing_go//:opentracing-go", + "//util/tracing", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_tikv_client_go_v2//error", diff --git a/sessiontxn/isolation/base.go b/sessiontxn/isolation/base.go index 41e0e40846aa3..dd1e2881b389e 100644 --- a/sessiontxn/isolation/base.go +++ b/sessiontxn/isolation/base.go @@ -18,7 +18,6 @@ import ( "context" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/config" @@ -31,6 +30,7 @@ import ( "github.com/pingcap/tidb/sessiontxn/internal" "github.com/pingcap/tidb/sessiontxn/staleread" "github.com/pingcap/tidb/table/temptable" + "github.com/pingcap/tidb/util/tracing" "github.com/tikv/client-go/v2/oracle" ) @@ -474,11 +474,8 @@ func canReuseTxnWhenExplicitBegin(sctx sessionctx.Context) bool { // newOracleFuture creates new future according to the scope and the session context func newOracleFuture(ctx context.Context, sctx sessionctx.Context, scope string) oracle.Future { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("isolation.newOracleFuture", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "isolation.newOracleFuture") + defer r.End() failpoint.Inject("requestTsoFromPD", func() { sessiontxn.TsoRequestCountInc(sctx) diff --git a/store/copr/BUILD.bazel b/store/copr/BUILD.bazel index 42da13e86746a..9f187190bb70c 100644 --- a/store/copr/BUILD.bazel +++ b/store/copr/BUILD.bazel @@ -32,10 +32,10 @@ go_library( "//util/memory", "//util/paging", "//util/tiflashcompute", + "//util/tracing", "//util/trxevents", "@com_github_dgraph_io_ristretto//:ristretto", "@com_github_gogo_protobuf//proto", - "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/coprocessor", diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index eca0b8037daa6..573f746a6b0d3 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -26,7 +26,6 @@ import ( "unsafe" "github.com/gogo/protobuf/proto" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" @@ -46,6 +45,7 @@ import ( "github.com/pingcap/tidb/util/mathutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/paging" + "github.com/pingcap/tidb/util/tracing" "github.com/pingcap/tidb/util/trxevents" "github.com/pingcap/tipb/go-tipb" "github.com/tikv/client-go/v2/metrics" @@ -402,7 +402,13 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c } i = nextI if req.Paging.Enable { - pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) + if req.LimitSize != 0 && req.LimitSize < pagingSize { + // disable paging for small limit. + task.paging = false + task.pagingSize = 0 + } else { + pagingSize = paging.GrowPagingSize(pagingSize, req.Paging.MaxPagingSize) + } } } } @@ -419,11 +425,7 @@ func buildCopTasks(bo *Backoffer, ranges *KeyRanges, opt *buildCopTaskOpt) ([]*c zap.Int("task len", len(tasks))) } if elapsed > time.Millisecond { - ctx := bo.GetCtx() - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("copr.buildCopTasks", opentracing.ChildOf(span.Context()), opentracing.StartTime(start)) - defer span1.Finish() - } + defer tracing.StartRegion(bo.GetCtx(), "copr.buildCopTasks").End() } metrics.TxnRegionsNumHistogramWithCoprocessor.Observe(float64(builder.regionNum())) return tasks, nil diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index f7b15ebfd682d..c94d441932d8c 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -514,8 +514,6 @@ func TestBuildPagingTasks(t *testing.T) { req := &kv.Request{} req.Paging.Enable = true req.Paging.MinPagingSize = paging.MinPagingSize - flashReq := &kv.Request{} - flashReq.StoreType = kv.TiFlash tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) require.NoError(t, err) require.Len(t, tasks, 1) @@ -525,6 +523,37 @@ func TestBuildPagingTasks(t *testing.T) { require.Equal(t, tasks[0].pagingSize, paging.MinPagingSize) } +func TestBuildPagingTasksDisablePagingForSmallLimit(t *testing.T) { + mockClient, cluster, pdClient, err := testutils.NewMockTiKV("", nil) + require.NoError(t, err) + defer func() { + pdClient.Close() + err = mockClient.Close() + require.NoError(t, err) + }() + _, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) + + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) + defer pdCli.Close() + + cache := NewRegionCache(tikv.NewRegionCache(pdCli)) + defer cache.Close() + + bo := backoff.NewBackofferWithVars(context.Background(), 3000, nil) + + req := &kv.Request{} + req.Paging.Enable = true + req.Paging.MinPagingSize = paging.MinPagingSize + req.LimitSize = 1 + tasks, err := buildTestCopTasks(bo, cache, buildCopRanges("a", "c"), req, nil) + require.NoError(t, err) + require.Len(t, tasks, 1) + require.Len(t, tasks, 1) + taskEqual(t, tasks[0], regionIDs[0], 0, "a", "c") + require.False(t, tasks[0].paging) + require.Equal(t, tasks[0].pagingSize, uint64(0)) +} + func toCopRange(r kv.KeyRange) *coprocessor.KeyRange { coprRange := coprocessor.KeyRange{} coprRange.Start = r.StartKey diff --git a/store/driver/txn/BUILD.bazel b/store/driver/txn/BUILD.bazel index f6e5e46014a97..437fd464b8c81 100644 --- a/store/driver/txn/BUILD.bazel +++ b/store/driver/txn/BUILD.bazel @@ -26,7 +26,7 @@ go_library( "//types", "//util", "//util/logutil", - "@com_github_opentracing_opentracing_go//:opentracing-go", + "//util/tracing", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_kvproto//pkg/kvrpcpb", diff --git a/store/driver/txn/txn_driver.go b/store/driver/txn/txn_driver.go index 1892b6674032c..6ac8700c3cbc4 100644 --- a/store/driver/txn/txn_driver.go +++ b/store/driver/txn/txn_driver.go @@ -19,7 +19,6 @@ import ( "context" "sync/atomic" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" @@ -30,6 +29,7 @@ import ( "github.com/pingcap/tidb/store/driver/options" "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/util/logutil" + "github.com/pingcap/tidb/util/tracing" tikverr "github.com/tikv/client-go/v2/error" tikvstore "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/tikv" @@ -165,11 +165,8 @@ func (txn *tikvTxn) IterReverse(k kv.Key) (iter kv.Iterator, err error) { // Do not use len(value) == 0 or value == nil to represent non-exist. // If a key doesn't exist, there shouldn't be any corresponding entry in the result map. func (txn *tikvTxn) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("tikvTxn.BatchGet", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + r, ctx := tracing.StartRegionEx(ctx, "tikvTxn.BatchGet") + defer r.End() return NewBufferBatchGetter(txn.GetMemBuffer(), nil, txn.GetSnapshot()).BatchGet(ctx, keys) } diff --git a/table/BUILD.bazel b/table/BUILD.bazel index a1f2feab60722..e1cf80e5e90fa 100644 --- a/table/BUILD.bazel +++ b/table/BUILD.bazel @@ -29,7 +29,7 @@ go_library( "//util/logutil", "//util/sqlexec", "//util/timeutil", - "@com_github_opentracing_opentracing_go//:opentracing-go", + "//util/tracing", "@com_github_pingcap_errors//:errors", "@org_uber_go_zap//:zap", ], diff --git a/table/table.go b/table/table.go index 813131df90896..8b316a048be55 100644 --- a/table/table.go +++ b/table/table.go @@ -22,7 +22,6 @@ import ( "context" "time" - "github.com/opentracing/opentracing-go" mysql "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta/autoid" @@ -31,6 +30,7 @@ import ( "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/sqlexec" + "github.com/pingcap/tidb/util/tracing" ) // Type is used to distinguish between different tables that store data in different ways. @@ -200,10 +200,8 @@ type Table interface { // AllocAutoIncrementValue allocates an auto_increment value for a new row. func AllocAutoIncrementValue(ctx context.Context, t Table, sctx sessionctx.Context) (int64, error) { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("table.AllocAutoIncrementValue", opentracing.ChildOf(span.Context())) - defer span1.Finish() - } + r, ctx := tracing.StartRegionEx(ctx, "table.AllocAutoIncrementValue") + defer r.End() increment := sctx.GetSessionVars().AutoIncrementIncrement offset := sctx.GetSessionVars().AutoIncrementOffset alloc := t.Allocators(sctx).Get(autoid.AutoIncrementType) diff --git a/table/tables/BUILD.bazel b/table/tables/BUILD.bazel index a6e9bec521355..3aa8362fbaf46 100644 --- a/table/tables/BUILD.bazel +++ b/table/tables/BUILD.bazel @@ -46,8 +46,8 @@ go_library( "//util/sqlexec", "//util/stringutil", "//util/tableutil", + "//util/tracing", "@com_github_google_btree//:btree", - "@com_github_opentracing_opentracing_go//:opentracing-go", "@com_github_pingcap_errors//:errors", "@com_github_pingcap_failpoint//:failpoint", "@com_github_pingcap_log//:log", diff --git a/table/tables/index.go b/table/tables/index.go index 2636a82bffee4..57c6498e698fd 100644 --- a/table/tables/index.go +++ b/table/tables/index.go @@ -19,7 +19,6 @@ import ( "errors" "sync" - "github.com/opentracing/opentracing-go" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" @@ -29,6 +28,7 @@ import ( "github.com/pingcap/tidb/tablecodec" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/rowcodec" + "github.com/pingcap/tidb/util/tracing" ) // index is the data structure for index data in the KV store. @@ -170,11 +170,9 @@ func (c *index) Create(sctx sessionctx.Context, txn kv.Transaction, indexedValue indexedValues := c.getIndexedValue(indexedValue) ctx := opt.Ctx if ctx != nil { - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("index.Create", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + var r tracing.Region + r, ctx = tracing.StartRegionEx(ctx, "index.Create") + defer r.End() } else { ctx = context.TODO() } diff --git a/table/tables/tables.go b/table/tables/tables.go index 709318a42332c..bbdbffdd893fc 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -27,7 +27,6 @@ import ( "sync" "time" - "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/kv" @@ -50,6 +49,7 @@ import ( "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/stringutil" "github.com/pingcap/tidb/util/tableutil" + "github.com/pingcap/tidb/util/tracing" "github.com/pingcap/tipb/go-binlog" "github.com/pingcap/tipb/go-tipb" "go.uber.org/zap" @@ -700,11 +700,9 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts . var ctx context.Context if opt.Ctx != nil { ctx = opt.Ctx - if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { - span1 := span.Tracer().StartSpan("table.AddRecord", opentracing.ChildOf(span.Context())) - defer span1.Finish() - ctx = opentracing.ContextWithSpan(ctx, span1) - } + var r tracing.Region + r, ctx = tracing.StartRegionEx(ctx, "table.AddRecord") + defer r.End() } else { ctx = context.Background() } diff --git a/telemetry/data_feature_usage.go b/telemetry/data_feature_usage.go index 8661ce13ecccb..1fe696870c291 100644 --- a/telemetry/data_feature_usage.go +++ b/telemetry/data_feature_usage.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/sqlexec" "github.com/tikv/client-go/v2/metrics" + "go.uber.org/zap" ) // emptyClusterIndexUsage is empty ClusterIndexUsage, deprecated. @@ -81,7 +82,7 @@ func getFeatureUsage(ctx context.Context, sctx sessionctx.Context) (*featureUsag var err error usage.NewClusterIndex, usage.ClusterIndex, err = getClusterIndexUsageInfo(ctx, sctx) if err != nil { - logutil.BgLogger().Info(err.Error()) + logutil.BgLogger().Info("Failed to get feature usage", zap.Error(err)) return nil, err } diff --git a/telemetry/data_slow_query.go b/telemetry/data_slow_query.go index 791b5f9b51bbe..b686db8d41105 100644 --- a/telemetry/data_slow_query.go +++ b/telemetry/data_slow_query.go @@ -64,7 +64,7 @@ var ( func getSlowQueryStats() (*slowQueryStats, error) { slowQueryBucket, err := getSlowQueryBucket() if err != nil { - logutil.BgLogger().Info(err.Error()) + logutil.BgLogger().Info("Failed to get Slow Query Stats", zap.Error(err)) return nil, err } diff --git a/telemetry/data_window.go b/telemetry/data_window.go index b0be7b841cbea..49a4037591b16 100644 --- a/telemetry/data_window.go +++ b/telemetry/data_window.go @@ -25,6 +25,7 @@ import ( promv1 "github.com/prometheus/client_golang/api/prometheus/v1" pmodel "github.com/prometheus/common/model" "go.uber.org/atomic" + "go.uber.org/zap" ) var ( @@ -253,7 +254,8 @@ func RotateSubWindow() { err := readSQLMetric(time.Now(), &thisSubWindow.SQLUsage) if err != nil { - logutil.BgLogger().Info("Error exists when getting the SQL Metric.") + logutil.BgLogger().Info("Error exists when getting the SQL Metric.", + zap.Error(err)) } thisSubWindow.SQLUsage.SQLTotal = getSQLSum(&thisSubWindow.SQLUsage.SQLType) diff --git a/tidb-server/main.go b/tidb-server/main.go index 3d73fa111ab8c..9c92c2d2a558c 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -476,7 +476,6 @@ func overrideConfig(cfg *config.Config) { cfg.Port = uint(p) } if actualFlags[nmCors] { - fmt.Println(cors) cfg.Cors = *cors } if actualFlags[nmStore] { diff --git a/ttl/ttlworker/job_manager.go b/ttl/ttlworker/job_manager.go index 5f8b7bd038fc4..919cc56e7c8da 100644 --- a/ttl/ttlworker/job_manager.go +++ b/ttl/ttlworker/job_manager.go @@ -153,7 +153,7 @@ func (m *JobManager) jobLoop() error { scheduleTaskTicker := time.Tick(getTaskManagerLoopTickerInterval()) updateTaskHeartBeatTicker := time.Tick(ttlTaskHeartBeatTickerInterval) - taskCheckTicker := time.Tick(getTaskManagerLoopTickerInterval()) + taskCheckTicker := time.Tick(time.Second * 5) checkScanTaskFinishedTicker := time.Tick(getTaskManagerLoopTickerInterval()) cmdWatcher := m.cmdCli.WatchCommand(m.ctx) @@ -535,6 +535,7 @@ func (m *JobManager) couldTrySchedule(tableStatus *cache.TableStatus, table *cac // It could be nil, nil, if the table query doesn't return error but the job has been locked by other instances. func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table *cache.PhysicalTable, now time.Time, ignoreScheduleInterval bool) (*ttlJob, error) { var expireTime time.Time + var jobID string err := se.RunInTxn(ctx, func() error { sql, args := cache.SelectFromTTLTableStatusWithID(table.ID) @@ -574,7 +575,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return err } - jobID := uuid.New().String() + jobID = uuid.New().String() jobExist := false if len(tableStatus.CurrentJobID) > 0 { // don't create new job if there is already one running @@ -629,7 +630,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return nil, err } - job := m.createNewJob(expireTime, now, table) + job := m.createNewJob(jobID, expireTime, now, table) // job is created, notify every scan managers to fetch new tasks err = m.notificationCli.Notify(m.ctx, scanTaskNotificationType, job.id) @@ -639,9 +640,7 @@ func (m *JobManager) lockNewJob(ctx context.Context, se session.Session, table * return job, nil } -func (m *JobManager) createNewJob(expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob { - id := m.tableStatusCache.Tables[table.ID].CurrentJobID - +func (m *JobManager) createNewJob(id string, expireTime time.Time, now time.Time, table *cache.PhysicalTable) *ttlJob { return &ttlJob{ id: id, ownerID: m.id, diff --git a/ttl/ttlworker/scan.go b/ttl/ttlworker/scan.go index ac1c1cd85ab50..4cf3d919d9545 100644 --- a/ttl/ttlworker/scan.go +++ b/ttl/ttlworker/scan.go @@ -166,7 +166,7 @@ func (t *ttlScanTask) doScan(ctx context.Context, delCh chan<- *ttlDeleteTask, s zap.String("SQL", sql), zap.Int("retryTimes", retryTimes), zap.Bool("needRetry", needRetry), - zap.Error(err), + zap.Error(sqlErr), ) if !needRetry { diff --git a/util/tracing/util.go b/util/tracing/util.go index 34953c72482a6..924e2eb039f44 100644 --- a/util/tracing/util.go +++ b/util/tracing/util.go @@ -16,6 +16,7 @@ package tracing import ( "context" + "runtime/trace" "github.com/opentracing/basictracer-go" "github.com/opentracing/opentracing-go" @@ -66,3 +67,46 @@ func ChildSpanFromContxt(ctx context.Context, opName string) (opentracing.Span, } return noopSpan(), ctx } + +// StartRegion provides better API, integrating both opentracing and runtime.trace facilities into one. +// Recommended usage is +// +// defer tracing.StartRegion(ctx, "myTracedRegion").End() +func StartRegion(ctx context.Context, regionType string) Region { + r := trace.StartRegion(ctx, regionType) + var span1 opentracing.Span + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 = span.Tracer().StartSpan(regionType, opentracing.ChildOf(span.Context())) + } + return Region{ + Region: r, + Span: span1, + } +} + +// StartRegionEx returns Region together with the context. +// Recommended usage is +// +// r, ctx := tracing.StartRegionEx(ctx, "myTracedRegion") +// defer r.End() +func StartRegionEx(ctx context.Context, regionType string) (Region, context.Context) { + r := StartRegion(ctx, regionType) + if r.Span != nil { + ctx = opentracing.ContextWithSpan(ctx, r.Span) + } + return r, ctx +} + +// Region is a region of code whose execution time interval is traced. +type Region struct { + *trace.Region + opentracing.Span +} + +// End marks the end of the traced code region. +func (r Region) End() { + if r.Span != nil { + r.Span.Finish() + } + r.Region.End() +}