From 17a9fc4adf73924dc406393725eb11418358564f Mon Sep 17 00:00:00 2001 From: Hangjie Mo Date: Fri, 1 Apr 2022 11:02:30 +0800 Subject: [PATCH 1/7] executor: improve UnionScanRead performance (#32668) ref pingcap/tidb#32433 --- executor/mem_reader.go | 31 +++++++++++++++++++------------ tablecodec/tablecodec.go | 16 +++++++++++++++- util/rowcodec/decoder.go | 5 +++-- 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 6912a4fb1adfb..1c4b29290b325 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -180,6 +180,7 @@ type memTableReader struct { buffer allocBuf pkColIDs []int64 cacheTable kv.MemBuffer + offsets []int } type allocBuf struct { @@ -240,18 +241,25 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error opentracing.ContextWithSpan(ctx, span1) } mutableRow := chunk.MutRowFromTypes(m.retFieldTypes) + resultRows := make([]types.Datum, len(m.columns)) + m.offsets = make([]int, len(m.columns)) + for i, col := range m.columns { + m.offsets[i] = m.colIDs[col.ID] + } err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error { - row, err := m.decodeRecordKeyValue(key, value) + var err error + resultRows, err = m.decodeRecordKeyValue(key, value, &resultRows) if err != nil { return err } - mutableRow.SetDatums(row...) + mutableRow.SetDatums(resultRows...) matched, _, err := expression.EvalBool(m.ctx, m.conditions, mutableRow.ToRow()) if err != nil || !matched { return err } - m.addedRows = append(m.addedRows, row) + m.addedRows = append(m.addedRows, resultRows) + resultRows = make([]types.Datum, len(m.columns)) return nil }) if err != nil { @@ -265,30 +273,29 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error return m.addedRows, nil } -func (m *memTableReader) decodeRecordKeyValue(key, value []byte) ([]types.Datum, error) { +func (m *memTableReader) decodeRecordKeyValue(key, value []byte, resultRows *[]types.Datum) ([]types.Datum, error) { handle, err := tablecodec.DecodeRowKey(key) if err != nil { return nil, errors.Trace(err) } - return m.decodeRowData(handle, value) + return m.decodeRowData(handle, value, resultRows) } // decodeRowData uses to decode row data value. -func (m *memTableReader) decodeRowData(handle kv.Handle, value []byte) ([]types.Datum, error) { +func (m *memTableReader) decodeRowData(handle kv.Handle, value []byte, resultRows *[]types.Datum) ([]types.Datum, error) { values, err := m.getRowData(handle, value) if err != nil { return nil, err } - ds := make([]types.Datum, 0, len(m.columns)) - for _, col := range m.columns { - offset := m.colIDs[col.ID] - d, err := tablecodec.DecodeColumnValue(values[offset], &col.FieldType, m.ctx.GetSessionVars().Location()) + for i, col := range m.columns { + var datum types.Datum + err := tablecodec.DecodeColumnValueWithDatum(values[m.offsets[i]], &col.FieldType, m.ctx.GetSessionVars().Location(), &datum) if err != nil { return nil, err } - ds = append(ds, d) + (*resultRows)[i] = datum } - return ds, nil + return *resultRows, nil } // getRowData decodes raw byte slice to row data. diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go index 54445d3e95e19..25bdb2c1c00db 100644 --- a/tablecodec/tablecodec.go +++ b/tablecodec/tablecodec.go @@ -375,6 +375,20 @@ func DecodeColumnValue(data []byte, ft *types.FieldType, loc *time.Location) (ty return colDatum, nil } +// DecodeColumnValueWithDatum decodes data to an existing Datum according to the column info. 
+func DecodeColumnValueWithDatum(data []byte, ft *types.FieldType, loc *time.Location, result *types.Datum) error { + var err error + _, *result, err = codec.DecodeOne(data) + if err != nil { + return errors.Trace(err) + } + *result, err = Unflatten(*result, ft, loc) + if err != nil { + return errors.Trace(err) + } + return nil +} + // DecodeRowWithMapNew decode a row to datum map. func DecodeRowWithMapNew(b []byte, cols map[int64]*types.FieldType, loc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) { @@ -401,7 +415,7 @@ func DecodeRowWithMapNew(b []byte, cols map[int64]*types.FieldType, return rd.DecodeToDatumMap(b, row) } -// DecodeRowWithMap decodes a byte slice into datums with a existing row map. +// DecodeRowWithMap decodes a byte slice into datums with an existing row map. // Row layout: colID1, value1, colID2, value2, ..... func DecodeRowWithMap(b []byte, cols map[int64]*types.FieldType, loc *time.Location, row map[int64]types.Datum) (map[int64]types.Datum, error) { if row == nil { diff --git a/util/rowcodec/decoder.go b/util/rowcodec/decoder.go index 6fee2c56bf16d..6bccac22214be 100644 --- a/util/rowcodec/decoder.go +++ b/util/rowcodec/decoder.go @@ -15,6 +15,7 @@ package rowcodec import ( + "encoding/binary" "fmt" "time" @@ -452,7 +453,7 @@ func (decoder *BytesDecoder) tryDecodeHandle(values [][]byte, offset int, col *C return false } -// DecodeToBytesNoHandle decodes raw byte slice to row dat without handle. +// DecodeToBytesNoHandle decodes raw byte slice to row data without handle. func (decoder *BytesDecoder) DecodeToBytesNoHandle(outputOffset map[int64]int, value []byte) ([][]byte, error) { return decoder.decodeToBytesInternal(outputOffset, nil, value, nil) } @@ -463,7 +464,7 @@ func (decoder *BytesDecoder) DecodeToBytes(outputOffset map[int64]int, handle kv } func (decoder *BytesDecoder) encodeOldDatum(tp byte, val []byte) []byte { - var buf []byte + buf := make([]byte, 0, 1+binary.MaxVarintLen64+len(val)) switch tp { case BytesFlag: buf = append(buf, CompactBytesFlag) From f5965a1609541a0e60a794160dd47d1266552153 Mon Sep 17 00:00:00 2001 From: Yujie Xia Date: Fri, 1 Apr 2022 11:32:30 +0800 Subject: [PATCH 2/7] lightning: fix test lightning_error_summary (#33616) close pingcap/tidb#33614 --- br/pkg/lightning/restore/restore.go | 2 +- br/pkg/lightning/restore/table_restore_test.go | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 12aa146d78e80..d181b0882f209 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -972,7 +972,7 @@ func (rc *Controller) saveStatusCheckpoint(ctx context.Context, tableName string switch { case err == nil: break - case utils.IsRetryableError(err), utils.MessageIsRetryableStorageError(err.Error()), common.IsContextCanceledError(err): + case utils.MessageIsRetryableStorageError(err.Error()), common.IsContextCanceledError(err): // recoverable error, should not be recorded in checkpoint // which will prevent lightning from automatically recovering return nil diff --git a/br/pkg/lightning/restore/table_restore_test.go b/br/pkg/lightning/restore/table_restore_test.go index 7a7695be69dfd..77ae288ab3e30 100644 --- a/br/pkg/lightning/restore/table_restore_test.go +++ b/br/pkg/lightning/restore/table_restore_test.go @@ -18,7 +18,6 @@ import ( "context" "encoding/json" "fmt" - "net" "net/http" "net/http/httptest" "os" @@ -996,9 +995,14 @@ func (s *tableRestoreSuite) 
TestSaveStatusCheckpoint() { require.NoError(s.T(), err) require.Equal(s.T(), 0, len(rc.errorSummaries.summary)) - err = rc.saveStatusCheckpoint(context.Background(), common.UniqueTable("test", "tbl"), indexEngineID, &net.DNSError{IsTimeout: true}, checkpoints.CheckpointStatusImported) + err = rc.saveStatusCheckpoint( + context.Background(), + common.UniqueTable("test", "tbl"), indexEngineID, + common.ErrChecksumMismatch.GenWithStackByArgs(0, 0, 0, 0, 0, 0), + checkpoints.CheckpointStatusImported, + ) require.NoError(s.T(), err) - require.Equal(s.T(), 0, len(rc.errorSummaries.summary)) + require.Equal(s.T(), 1, len(rc.errorSummaries.summary)) start := time.Now() err = rc.saveStatusCheckpoint(context.Background(), common.UniqueTable("test", "tbl"), indexEngineID, nil, checkpoints.CheckpointStatusImported) From 6fe68d6764c8b701306e3835d9a2afacc13393e6 Mon Sep 17 00:00:00 2001 From: Mattias Jonsson Date: Fri, 1 Apr 2022 04:56:30 +0100 Subject: [PATCH 3/7] ddl: increasing value of partition by range columns fix (#32748) (#33643) close pingcap/tidb#32748 --- ddl/db_partition_test.go | 15 +++++++++++++++ ddl/ddl_api.go | 5 ----- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/ddl/db_partition_test.go b/ddl/db_partition_test.go index 0244f0bd54048..74c2236c92f50 100644 --- a/ddl/db_partition_test.go +++ b/ddl/db_partition_test.go @@ -545,6 +545,13 @@ create table log_message_1 ( "partition p1 values less than ('G'));", dbterror.ErrRangeNotIncreasing, }, + { + "create table t(a char(10) collate utf8mb4_bin) " + + "partition by range columns (a) (" + + "partition p0 values less than ('g'), " + + "partition p1 values less than ('A'));", + dbterror.ErrRangeNotIncreasing, + }, { "CREATE TABLE t1(c0 INT) PARTITION BY HASH((NOT c0)) PARTITIONS 2;", dbterror.ErrPartitionFunctionIsNotAllowed, @@ -617,6 +624,14 @@ create table log_message_1 ( partition p0 values less than ('a'), partition p1 values less than ('G'));`) + tk.MustExec("drop table if exists t;") + tk.MustExec(`create table t (a varchar(255) charset utf8mb4 collate utf8mb4_bin) ` + + `partition by range columns (a) ` + + `(partition pnull values less than (""),` + + `partition puppera values less than ("AAA"),` + + `partition plowera values less than ("aaa"),` + + `partition pmax values less than (MAXVALUE))`) + tk.MustExec("drop table if exists t;") tk.MustExec(`create table t(a int) partition by range columns (a) ( partition p0 values less than (10), diff --git a/ddl/ddl_api.go b/ddl/ddl_api.go index 169f7a6b309ef..9f2ca1f31cc8c 100644 --- a/ddl/ddl_api.go +++ b/ddl/ddl_api.go @@ -2681,11 +2681,6 @@ func checkTwoRangeColumns(ctx sessionctx.Context, curr, prev *model.PartitionDef return false, nil } - // Current and previous is the same. 
- if strings.EqualFold(curr.LessThan[i], prev.LessThan[i]) { - continue - } - // The tuples of column values used to define the partitions are strictly increasing: // PARTITION p0 VALUES LESS THAN (5,10,'ggg') // PARTITION p1 VALUES LESS THAN (10,20,'mmm') From 84e9698cc5f5bae63a27722d731374c0788786d9 Mon Sep 17 00:00:00 2001 From: crazycs Date: Fri, 1 Apr 2022 12:10:29 +0800 Subject: [PATCH 4/7] topsql: fix data race in test (#33617) close pingcap/tidb#33613 --- ddl/ddl_worker.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index e440d33591d93..ba9b8cc137492 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -466,6 +466,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) { updateRawArgs = false } w.writeDDLSeqNum(job) + w.jobContext.resetWhenJobFinish() err = t.AddHistoryDDLJob(job, updateRawArgs) return errors.Trace(err) } @@ -520,17 +521,16 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta { func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) { if !topsqlstate.TopSQLEnabled() || job == nil { - w.cacheDigest = nil - w.ddlJobCtx = context.Background() return } if job.Query != w.cacheSQL || w.cacheDigest == nil { w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query) w.cacheSQL = job.Query + w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, "", nil, false) + } else { + topsql.AttachSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, "", nil, false) } - - w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, "", nil, false) } func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger { @@ -546,6 +546,13 @@ func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagg return tagger } +func (w *jobContext) resetWhenJobFinish() { + w.ddlJobCtx = context.Background() + w.cacheSQL = "" + w.cacheDigest = nil + w.cacheNormalizedSQL = "" +} + // handleDDLJobQueue handles DDL jobs in DDL Job queue. 
func (w *worker) handleDDLJobQueue(d *ddlCtx) error { once := true From 94c57b3049fe55256678d6d7e36f253efdfc7978 Mon Sep 17 00:00:00 2001 From: tison Date: Fri, 1 Apr 2022 12:48:29 +0800 Subject: [PATCH 5/7] executor: migrate test-infra to testify for executor_test.go testSlowQuerySuite (#33561) close pingcap/tidb#33439 --- executor/executor_legacy_test.go | 147 -------------------------- executor/slow_query_sql_test.go | 173 +++++++++++++++++++++++++++++++ executor/slow_query_test.go | 28 ++--- 3 files changed, 187 insertions(+), 161 deletions(-) create mode 100644 executor/slow_query_sql_test.go diff --git a/executor/executor_legacy_test.go b/executor/executor_legacy_test.go index 311d133173112..1c7333ecfacf5 100644 --- a/executor/executor_legacy_test.go +++ b/executor/executor_legacy_test.go @@ -73,7 +73,6 @@ import ( "github.com/pingcap/tidb/util/deadlockhistory" "github.com/pingcap/tidb/util/gcutil" "github.com/pingcap/tidb/util/israce" - "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" "github.com/pingcap/tidb/util/mock" "github.com/pingcap/tidb/util/rowcodec" @@ -111,7 +110,6 @@ var _ = SerialSuites(&testRecoverTable{}) var _ = SerialSuites(&testClusterTableSuite{}) var _ = SerialSuites(&testSplitTable{&baseTestSuite{}}) var _ = SerialSuites(&testSerialSuite1{&baseTestSuite{}}) -var _ = SerialSuites(&testSlowQuery{&baseTestSuite{}}) var _ = SerialSuites(&globalIndexSuite{&baseTestSuite{}}) var _ = SerialSuites(&testSerialSuite{&baseTestSuite{}}) var _ = SerialSuites(&testCoprCache{}) @@ -121,7 +119,6 @@ type testSuite struct{ *baseTestSuite } type testSuiteP1 struct{ *baseTestSuite } type testSuiteP2 struct{ *baseTestSuite } type testSplitTable struct{ *baseTestSuite } -type testSlowQuery struct{ *baseTestSuite } type globalIndexSuite struct{ *baseTestSuite } type testSerialSuite struct{ *baseTestSuite } type testCoprCache struct { @@ -4794,150 +4791,6 @@ func (s *testSuite) TestGenerateColumnReplace(c *C) { tk.MustQuery("select * from t1").Check(testkit.Rows("3 4")) } -func (s *testSlowQuery) TestSlowQueryWithoutSlowLog(c *C) { - tk := testkit.NewTestKit(c, s.store) - originCfg := config.GetGlobalConfig() - newCfg := *originCfg - newCfg.Log.SlowQueryFile = "tidb-slow-not-exist.log" - newCfg.Log.SlowThreshold = math.MaxUint64 - config.StoreGlobalConfig(&newCfg) - defer func() { - config.StoreGlobalConfig(originCfg) - }() - tk.MustQuery("select query from information_schema.slow_query").Check(testkit.Rows()) - tk.MustQuery("select query from information_schema.slow_query where time > '2020-09-15 12:16:39' and time < now()").Check(testkit.Rows()) -} - -func (s *testSlowQuery) TestSlowQuerySensitiveQuery(c *C) { - tk := testkit.NewTestKit(c, s.store) - originCfg := config.GetGlobalConfig() - newCfg := *originCfg - - f, err := os.CreateTemp("", "tidb-slow-*.log") - c.Assert(err, IsNil) - f.Close() - newCfg.Log.SlowQueryFile = f.Name() - config.StoreGlobalConfig(&newCfg) - defer func() { - tk.MustExec("set tidb_slow_log_threshold=300;") - config.StoreGlobalConfig(originCfg) - err = os.Remove(newCfg.Log.SlowQueryFile) - c.Assert(err, IsNil) - }() - err = logutil.InitLogger(newCfg.Log.ToLogConfig()) - c.Assert(err, IsNil) - - tk.MustExec("set tidb_slow_log_threshold=0;") - tk.MustExec("drop user if exists user_sensitive;") - tk.MustExec("create user user_sensitive identified by '123456789';") - tk.MustExec("alter user 'user_sensitive'@'%' identified by 'abcdefg';") - tk.MustExec("set password for 'user_sensitive'@'%' = 'xyzuvw';") - tk.MustQuery("select query 
from `information_schema`.`slow_query` " + - "where (query like 'set password%' or query like 'create user%' or query like 'alter user%') " + - "and query like '%user_sensitive%' order by query;"). - Check(testkit.Rows( - "alter user {user_sensitive@% password = ***};", - "create user {user_sensitive@% password = ***};", - "set password for user user_sensitive@%;", - )) -} - -func (s *testSlowQuery) TestSlowQueryPrepared(c *C) { - tk := testkit.NewTestKit(c, s.store) - originCfg := config.GetGlobalConfig() - newCfg := *originCfg - - f, err := os.CreateTemp("", "tidb-slow-*.log") - c.Assert(err, IsNil) - f.Close() - newCfg.Log.SlowQueryFile = f.Name() - config.StoreGlobalConfig(&newCfg) - defer func() { - tk.MustExec("set tidb_slow_log_threshold=300;") - tk.MustExec("set tidb_redact_log=0;") - config.StoreGlobalConfig(originCfg) - os.Remove(newCfg.Log.SlowQueryFile) - }() - err = logutil.InitLogger(newCfg.Log.ToLogConfig()) - c.Assert(err, IsNil) - - tk.MustExec("set tidb_slow_log_threshold=0;") - tk.MustExec(`prepare mystmt1 from 'select sleep(?), 1';`) - tk.MustExec("SET @num = 0.01;") - tk.MustExec("execute mystmt1 using @num;") - tk.MustQuery("SELECT Query FROM `information_schema`.`slow_query` " + - "where query like 'select%sleep%' order by time desc limit 1"). - Check(testkit.Rows( - "select sleep(?), 1 [arguments: 0.01];", - )) - - tk.MustExec("set tidb_redact_log=1;") - tk.MustExec(`prepare mystmt2 from 'select sleep(?), 2';`) - tk.MustExec("execute mystmt2 using @num;") - tk.MustQuery("SELECT Query FROM `information_schema`.`slow_query` " + - "where query like 'select%sleep%' order by time desc limit 1"). - Check(testkit.Rows( - "select `sleep` ( ? ) , ?;", - )) -} - -func (s *testSlowQuery) TestLogSlowLogIndex(c *C) { - tk := testkit.NewTestKit(c, s.store) - f, err := os.CreateTemp("", "tidb-slow-*.log") - c.Assert(err, IsNil) - f.Close() - - defer config.RestoreFunc()() - config.UpdateGlobal(func(conf *config.Config) { - conf.Log.SlowQueryFile = f.Name() - }) - err = logutil.InitLogger(config.GetGlobalConfig().Log.ToLogConfig()) - c.Assert(err, IsNil) - - tk.MustExec("use test") - tk.MustExec("create table t (a int, b int,index idx(a));") - tk.MustExec("set tidb_slow_log_threshold=0;") - tk.MustQuery("select * from t use index (idx) where a in (1) union select * from t use index (idx) where a in (2,3);") - tk.MustExec("set tidb_slow_log_threshold=300;") - tk.MustQuery("select index_names from `information_schema`.`slow_query` " + - "where query like 'select%union%' limit 1"). 
- Check(testkit.Rows( - "[t:idx]", - )) -} - -func (s *testSlowQuery) TestSlowQuery(c *C) { - tk := testkit.NewTestKit(c, s.store) - - f, err := os.CreateTemp("", "tidb-slow-*.log") - c.Assert(err, IsNil) - _, err = f.WriteString(` -# Time: 2020-10-13T20:08:13.970563+08:00 -select * from t; -# Time: 2020-10-16T20:08:13.970563+08:00 -select * from t; -`) - c.Assert(err, IsNil) - err = f.Close() - c.Assert(err, IsNil) - executor.ParseSlowLogBatchSize = 1 - originCfg := config.GetGlobalConfig() - newCfg := *originCfg - newCfg.Log.SlowQueryFile = f.Name() - config.StoreGlobalConfig(&newCfg) - defer func() { - executor.ParseSlowLogBatchSize = 64 - config.StoreGlobalConfig(originCfg) - err = os.Remove(newCfg.Log.SlowQueryFile) - c.Assert(err, IsNil) - }() - err = logutil.InitLogger(newCfg.Log.ToLogConfig()) - c.Assert(err, IsNil) - - tk.MustQuery("select count(*) from `information_schema`.`slow_query` where time > '2020-10-16 20:08:13' and time < '2020-10-16 21:08:13'").Check(testkit.Rows("1")) - tk.MustQuery("select count(*) from `information_schema`.`slow_query` where time > '2019-10-13 20:08:13' and time < '2020-10-16 21:08:13'").Check(testkit.Rows("2")) -} - func (s *testSerialSuite) TestKillTableReader(c *C) { var retry = "github.com/tikv/client-go/v2/locate/mockRetrySendReqToRegion" defer func() { diff --git a/executor/slow_query_sql_test.go b/executor/slow_query_sql_test.go new file mode 100644 index 0000000000000..fc4593bbb20aa --- /dev/null +++ b/executor/slow_query_sql_test.go @@ -0,0 +1,173 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package executor_test + +import ( + "fmt" + "math" + "os" + "testing" + + "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/executor" + "github.com/pingcap/tidb/testkit" + "github.com/pingcap/tidb/util/logutil" + "github.com/stretchr/testify/require" +) + +func TestSlowQueryWithoutSlowLog(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + originCfg := config.GetGlobalConfig() + newCfg := *originCfg + newCfg.Log.SlowQueryFile = "tidb-slow-not-exist.log" + newCfg.Log.SlowThreshold = math.MaxUint64 + config.StoreGlobalConfig(&newCfg) + defer func() { + config.StoreGlobalConfig(originCfg) + }() + tk.MustQuery("select query from information_schema.slow_query").Check(testkit.Rows()) + tk.MustQuery("select query from information_schema.slow_query where time > '2020-09-15 12:16:39' and time < now()").Check(testkit.Rows()) +} + +func TestSlowQuerySensitiveQuery(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + originCfg := config.GetGlobalConfig() + newCfg := *originCfg + + f, err := os.CreateTemp("", "tidb-slow-*.log") + require.NoError(t, err) + require.NoError(t, f.Close()) + newCfg.Log.SlowQueryFile = f.Name() + config.StoreGlobalConfig(&newCfg) + defer func() { + tk.MustExec("set tidb_slow_log_threshold=300;") + config.StoreGlobalConfig(originCfg) + require.NoError(t, os.Remove(newCfg.Log.SlowQueryFile)) + }() + require.NoError(t, logutil.InitLogger(newCfg.Log.ToLogConfig())) + + tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", f.Name())) + tk.MustExec("set tidb_slow_log_threshold=0;") + tk.MustExec("drop user if exists user_sensitive;") + tk.MustExec("create user user_sensitive identified by '123456789';") + tk.MustExec("alter user 'user_sensitive'@'%' identified by 'abcdefg';") + tk.MustExec("set password for 'user_sensitive'@'%' = 'xyzuvw';") + tk.MustQuery("select query from `information_schema`.`slow_query` " + + "where (query like 'set password%' or query like 'create user%' or query like 'alter user%') " + + "and query like '%user_sensitive%' order by query;"). + Check(testkit.Rows( + "alter user {user_sensitive@% password = ***};", + "create user {user_sensitive@% password = ***};", + "set password for user user_sensitive@%;", + )) +} + +func TestSlowQueryPrepared(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + originCfg := config.GetGlobalConfig() + newCfg := *originCfg + + f, err := os.CreateTemp("", "tidb-slow-*.log") + require.NoError(t, err) + require.NoError(t, f.Close()) + newCfg.Log.SlowQueryFile = f.Name() + config.StoreGlobalConfig(&newCfg) + defer func() { + tk.MustExec("set tidb_slow_log_threshold=300;") + tk.MustExec("set tidb_redact_log=0;") + config.StoreGlobalConfig(originCfg) + require.NoError(t, os.Remove(newCfg.Log.SlowQueryFile)) + }() + require.NoError(t, logutil.InitLogger(newCfg.Log.ToLogConfig())) + + tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", f.Name())) + tk.MustExec("set tidb_slow_log_threshold=0;") + tk.MustExec(`prepare mystmt1 from 'select sleep(?), 1';`) + tk.MustExec("SET @num = 0.01;") + tk.MustExec("execute mystmt1 using @num;") + tk.MustQuery("SELECT Query FROM `information_schema`.`slow_query` " + + "where query like 'select%sleep%' order by time desc limit 1"). 
+ Check(testkit.Rows("select sleep(?), 1 [arguments: 0.01];")) + + tk.MustExec("set tidb_redact_log=1;") + tk.MustExec(`prepare mystmt2 from 'select sleep(?), 2';`) + tk.MustExec("execute mystmt2 using @num;") + tk.MustQuery("SELECT Query FROM `information_schema`.`slow_query` " + + "where query like 'select%sleep%' order by time desc limit 1"). + Check(testkit.Rows("select `sleep` ( ? ) , ?;")) +} + +func TestLogSlowLogIndex(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + f, err := os.CreateTemp("", "tidb-slow-*.log") + require.NoError(t, err) + require.NoError(t, f.Close()) + + defer config.RestoreFunc()() + config.UpdateGlobal(func(conf *config.Config) { + conf.Log.SlowQueryFile = f.Name() + }) + require.NoError(t, logutil.InitLogger(config.GetGlobalConfig().Log.ToLogConfig())) + + tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", f.Name())) + tk.MustExec("use test") + tk.MustExec("create table t (a int, b int,index idx(a));") + tk.MustExec("set tidb_slow_log_threshold=0;") + tk.MustQuery("select * from t use index (idx) where a in (1) union select * from t use index (idx) where a in (2,3);") + tk.MustExec("set tidb_slow_log_threshold=300;") + tk.MustQuery("select index_names from `information_schema`.`slow_query` " + + "where query like 'select%union%' limit 1"). + Check(testkit.Rows("[t:idx]")) +} + +func TestSlowQuery(t *testing.T) { + store, clean := testkit.CreateMockStore(t) + defer clean() + tk := testkit.NewTestKit(t, store) + + f, err := os.CreateTemp("", "tidb-slow-*.log") + require.NoError(t, err) + _, err = f.WriteString(` +# Time: 2020-10-13T20:08:13.970563+08:00 +select * from t; +# Time: 2020-10-16T20:08:13.970563+08:00 +select * from t; +`) + require.NoError(t, err) + require.NoError(t, f.Close()) + executor.ParseSlowLogBatchSize = 1 + originCfg := config.GetGlobalConfig() + newCfg := *originCfg + newCfg.Log.SlowQueryFile = f.Name() + config.StoreGlobalConfig(&newCfg) + defer func() { + executor.ParseSlowLogBatchSize = 64 + config.StoreGlobalConfig(originCfg) + require.NoError(t, os.Remove(newCfg.Log.SlowQueryFile)) + }() + require.NoError(t, logutil.InitLogger(newCfg.Log.ToLogConfig())) + + tk.MustExec(fmt.Sprintf("set @@tidb_slow_query_file='%v'", f.Name())) + tk.MustQuery("select count(*) from `information_schema`.`slow_query` where time > '2020-10-16 20:08:13' and time < '2020-10-16 21:08:13'").Check(testkit.Rows("1")) + tk.MustQuery("select count(*) from `information_schema`.`slow_query` where time > '2019-10-13 20:08:13' and time < '2020-10-16 21:08:13'").Check(testkit.Rows("2")) +} diff --git a/executor/slow_query_test.go b/executor/slow_query_test.go index 997f0509f3cf3..83ab12ebc620e 100644 --- a/executor/slow_query_test.go +++ b/executor/slow_query_test.go @@ -39,7 +39,7 @@ import ( "github.com/stretchr/testify/require" ) -func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bufio.Reader, logNum int) ([][]types.Datum, error) { +func parseLog(retriever *slowQueryRetriever, sctx sessionctx.Context, reader *bufio.Reader) ([][]types.Datum, error) { retriever.taskList = make(chan slowLogTask, 100) ctx := context.Background() retriever.parseSlowLog(ctx, sctx, reader, 64) @@ -67,14 +67,14 @@ func newSlowQueryRetriever() (*slowQueryRetriever, error) { return &slowQueryRetriever{outputCols: tbl.Meta().Columns}, nil } -func parseSlowLog(sctx sessionctx.Context, reader *bufio.Reader, logNum int) ([][]types.Datum, error) { +func parseSlowLog(sctx sessionctx.Context, reader 
*bufio.Reader) ([][]types.Datum, error) { retriever, err := newSlowQueryRetriever() if err != nil { return nil, err } // Ignore the error is ok for test. terror.Log(retriever.initialize(context.Background(), sctx)) - rows, err := parseLog(retriever, sctx, reader, logNum) + rows, err := parseLog(retriever, sctx, reader) return rows, err } @@ -108,7 +108,7 @@ select * from t;` require.NoError(t, err) sctx := mock.NewContext() sctx.GetSessionVars().TimeZone = loc - _, err = parseSlowLog(sctx, reader, 64) + _, err = parseSlowLog(sctx, reader) require.Error(t, err) require.Equal(t, err.Error(), "panic test") } @@ -145,7 +145,7 @@ select * from t;` require.NoError(t, err) ctx := mock.NewContext() ctx.GetSessionVars().TimeZone = loc - rows, err := parseSlowLog(ctx, reader, 64) + rows, err := parseSlowLog(ctx, reader) require.NoError(t, err) require.Len(t, rows, 1) recordString := "" @@ -168,7 +168,7 @@ select * from t;` // Issue 20928 reader = bufio.NewReader(bytes.NewBufferString(slowLogStr)) - rows, err = parseSlowLog(ctx, reader, 1) + rows, err = parseSlowLog(ctx, reader) require.NoError(t, err) require.Len(t, rows, 1) recordString = "" @@ -204,7 +204,7 @@ select a# from t; select * from t; `) reader = bufio.NewReader(slowLog) - _, err = parseSlowLog(ctx, reader, 64) + _, err = parseSlowLog(ctx, reader) require.NoError(t, err) // test for time format compatibility. @@ -215,7 +215,7 @@ select * from t; select * from t; `) reader = bufio.NewReader(slowLog) - rows, err = parseSlowLog(ctx, reader, 64) + rows, err = parseSlowLog(ctx, reader) require.NoError(t, err) require.Len(t, rows, 2) t0Str, err := rows[0][0].ToString() @@ -232,7 +232,7 @@ select * from t; select * from t; `) reader = bufio.NewReader(slowLog) - _, err = parseSlowLog(ctx, reader, 64) + _, err = parseSlowLog(ctx, reader) require.NoError(t, err) warnings := ctx.GetSessionVars().StmtCtx.GetWarnings() require.Len(t, warnings, 1) @@ -256,13 +256,13 @@ select * from t; sql := strings.Repeat("x", int(variable.MaxOfMaxAllowedPacket+1)) slowLog.WriteString(sql) reader := bufio.NewReader(slowLog) - _, err = parseSlowLog(ctx, reader, 64) + _, err = parseSlowLog(ctx, reader) require.Error(t, err) require.EqualError(t, err, "single line length exceeds limit: 65536") variable.MaxOfMaxAllowedPacket = originValue reader = bufio.NewReader(slowLog) - _, err = parseSlowLog(ctx, reader, 64) + _, err = parseSlowLog(ctx, reader) require.NoError(t, err) } @@ -313,7 +313,7 @@ select * from t;`) require.NoError(t, err) ctx := mock.NewContext() ctx.GetSessionVars().TimeZone = loc - _, err = parseSlowLog(ctx, scanner, 64) + _, err = parseSlowLog(ctx, scanner) require.NoError(t, err) // Test parser error. 
@@ -323,7 +323,7 @@ select * from t;`) select * from t; `) scanner = bufio.NewReader(slowLog) - _, err = parseSlowLog(ctx, scanner, 64) + _, err = parseSlowLog(ctx, scanner) require.NoError(t, err) warnings := ctx.GetSessionVars().StmtCtx.GetWarnings() require.Len(t, warnings, 1) @@ -469,7 +469,7 @@ select 7;` require.Equal(t, len(retriever.files), len(cas.files), comment) if len(retriever.files) > 0 { reader := bufio.NewReader(retriever.files[0].file) - rows, err := parseLog(retriever, sctx, reader, 64) + rows, err := parseLog(retriever, sctx, reader) require.NoError(t, err) require.Equal(t, len(rows), len(cas.querys), comment) for i, row := range rows { From d3245357d08da71fe897ab18d05dd18f15ca6978 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B1=B1=E5=B2=9A?= <36239017+YuJuncen@users.noreply.github.com> Date: Fri, 1 Apr 2022 13:14:29 +0800 Subject: [PATCH 6/7] br: fix unstable s3 test (#33610) close pingcap/tidb#33644 --- br/tests/br_s3/run.sh | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/br/tests/br_s3/run.sh b/br/tests/br_s3/run.sh index 9cd383de4f026..7aa2349f4fe23 100755 --- a/br/tests/br_s3/run.sh +++ b/br/tests/br_s3/run.sh @@ -37,13 +37,15 @@ start_s3() { bin/minio server --address $S3_ENDPOINT "$TEST_DIR/$DB" & s3_pid=$! i=0 - while ! curl -o /dev/null -v -s "http://$S3_ENDPOINT/"; do + status="$(curl -o /dev/null -v -s "http://$S3_ENDPOINT/" -w '%{http_code}' || true)" + while ! [ "$status" -gt 0 ] && [ "$status" -lt 500 ]; do i=$(($i+1)) if [ $i -gt 30 ]; then echo 'Failed to start minio' exit 1 fi sleep 2 + status="$(curl -o /dev/null -v -s "http://$S3_ENDPOINT/" -w '%{http_code}' || true)" done } From fa834a08063f857c77a8bba2362815702c9fd3a8 Mon Sep 17 00:00:00 2001 From: Takahashi <1225233+yuanhsh@users.noreply.github.com> Date: Thu, 31 Mar 2022 23:08:29 -0700 Subject: [PATCH 7/7] planner: Add control flag to keep or remove ORDER BY in subquery (#33173) close pingcap/tidb#32900 --- executor/set_test.go | 8 +++++++ planner/core/logical_plan_builder.go | 20 ++++++++++------ planner/core/logical_plan_test.go | 36 ++++++++++++++++++++++++++++ sessionctx/variable/session.go | 3 +++ sessionctx/variable/sysvar.go | 4 ++++ sessionctx/variable/tidb_vars.go | 4 ++++ 6 files changed, 68 insertions(+), 7 deletions(-) diff --git a/executor/set_test.go b/executor/set_test.go index d26e872fd22a3..9f2d12b9316bf 100644 --- a/executor/set_test.go +++ b/executor/set_test.go @@ -593,6 +593,14 @@ func TestSetVar(t *testing.T) { tk.MustExec("set global tidb_ignore_prepared_cache_close_stmt=0") tk.MustQuery("select @@global.tidb_ignore_prepared_cache_close_stmt").Check(testkit.Rows("0")) tk.MustQuery("show global variables like 'tidb_ignore_prepared_cache_close_stmt'").Check(testkit.Rows("tidb_ignore_prepared_cache_close_stmt OFF")) + + // test for tidb_remove_orderby_in_subquery + tk.MustQuery("select @@session.tidb_remove_orderby_in_subquery").Check(testkit.Rows("0")) // default value is 0 + tk.MustExec("set session tidb_remove_orderby_in_subquery=1") + tk.MustQuery("select @@session.tidb_remove_orderby_in_subquery").Check(testkit.Rows("1")) + tk.MustQuery("select @@global.tidb_remove_orderby_in_subquery").Check(testkit.Rows("0")) // default value is 0 + tk.MustExec("set global tidb_remove_orderby_in_subquery=1") + tk.MustQuery("select @@global.tidb_remove_orderby_in_subquery").Check(testkit.Rows("1")) } func TestTruncateIncorrectIntSessionVar(t *testing.T) { diff --git a/planner/core/logical_plan_builder.go b/planner/core/logical_plan_builder.go index 
ee8231be41c30..6141232a74a4f 100644 --- a/planner/core/logical_plan_builder.go +++ b/planner/core/logical_plan_builder.go @@ -3759,13 +3759,19 @@ func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p L } if sel.OrderBy != nil { - if b.ctx.GetSessionVars().SQLMode.HasOnlyFullGroupBy() { - p, err = b.buildSortWithCheck(ctx, p, sel.OrderBy.Items, orderMap, windowMapper, projExprs, oldLen, sel.Distinct) - } else { - p, err = b.buildSort(ctx, p, sel.OrderBy.Items, orderMap, windowMapper) - } - if err != nil { - return nil, err + // We need to keep the ORDER BY clause for the following cases: + // 1. The select is top level query, order should be honored + // 2. The query has LIMIT clause + // 3. The control flag requires keeping ORDER BY explicitly + if len(b.selectOffset) == 1 || sel.Limit != nil || !b.ctx.GetSessionVars().RemoveOrderbyInSubquery { + if b.ctx.GetSessionVars().SQLMode.HasOnlyFullGroupBy() { + p, err = b.buildSortWithCheck(ctx, p, sel.OrderBy.Items, orderMap, windowMapper, projExprs, oldLen, sel.Distinct) + } else { + p, err = b.buildSort(ctx, p, sel.OrderBy.Items, orderMap, windowMapper) + } + if err != nil { + return nil, err + } } } diff --git a/planner/core/logical_plan_test.go b/planner/core/logical_plan_test.go index 789a1cda0b90e..f799c64a059f4 100644 --- a/planner/core/logical_plan_test.go +++ b/planner/core/logical_plan_test.go @@ -2123,3 +2123,39 @@ func TestWindowLogicalPlanAmbiguous(t *testing.T) { } } } + +func TestRemoveOrderbyInSubquery(t *testing.T) { + tests := []struct { + sql string + best string + }{ + { + sql: "select * from t order by a", + best: "DataScan(t)->Projection->Sort", + }, + { + sql: "select (select 1) from t order by a", + best: "DataScan(t)->Projection->Sort->Projection", + }, + { + sql: "select count(*) from (select b from t order by a) n", + best: "DataScan(t)->Projection->Projection->Aggr(count(1),firstrow(test.t.b))->Projection", + }, + { + sql: "select count(1) from (select b from t order by a limit 1) n", + best: "DataScan(t)->Projection->Sort->Limit->Projection->Aggr(count(1),firstrow(test.t.b))->Projection", + }, + } + + s := createPlannerSuite() + s.ctx.GetSessionVars().RemoveOrderbyInSubquery = true + ctx := context.TODO() + for i, tt := range tests { + comment := fmt.Sprintf("case:%v sql:%s", i, tt.sql) + stmt, err := s.p.ParseOneStmt(tt.sql, "", "") + require.NoError(t, err, comment) + p, _, err := BuildLogicalPlanForTest(ctx, s.ctx, stmt, s.is) + require.NoError(t, err, comment) + require.Equal(t, tt.best, ToString(p), comment) + } +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index 2f1eb99d15918..98c6e8dede550 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -1024,6 +1024,8 @@ type SessionVars struct { BatchPendingTiFlashCount int // RcReadCheckTS indicates if ts check optimization is enabled for current session. RcReadCheckTS bool + // RemoveOrderbyInSubquery indicates whether to remove ORDER BY in subquery. + RemoveOrderbyInSubquery bool } // InitStatementContext initializes a StatementContext, the object is reused to reduce allocation. 
@@ -1259,6 +1261,7 @@ func NewSessionVars() *SessionVars { Rng: utilMath.NewWithTime(), StatsLoadSyncWait: StatsLoadSyncWait.Load(), EnableLegacyInstanceScope: DefEnableLegacyInstanceScope, + RemoveOrderbyInSubquery: DefTiDBRemoveOrderbyInSubquery, } vars.KVVars = tikvstore.NewVariables(&vars.Killed) vars.Concurrency = Concurrency{ diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 25ac52238960f..2abc8f8beb0e3 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -1387,6 +1387,10 @@ var defaultSysVars = []*SysVar{ s.RcReadCheckTS = TiDBOptOn(val) return nil }}, + {Scope: ScopeGlobal | ScopeSession, Name: TiDBRemoveOrderbyInSubquery, Value: BoolToOnOff(DefTiDBRemoveOrderbyInSubquery), Type: TypeBool, SetSession: func(s *SessionVars, val string) error { + s.RemoveOrderbyInSubquery = TiDBOptOn(val) + return nil + }}, } // FeedbackProbability points to the FeedbackProbability in statistics package. diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index e70322be246ca..b8ec8a3120e93 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -594,6 +594,9 @@ const ( // TiDBEnableOrderedResultMode indicates if stabilize query results. TiDBEnableOrderedResultMode = "tidb_enable_ordered_result_mode" + // TiDBRemoveOrderbyInSubquery indicates whether to remove ORDER BY in subquery. + TiDBRemoveOrderbyInSubquery = "tidb_remove_orderby_in_subquery" + // TiDBEnablePseudoForOutdatedStats indicates whether use pseudo for outdated stats TiDBEnablePseudoForOutdatedStats = "tidb_enable_pseudo_for_outdated_stats" @@ -823,6 +826,7 @@ const ( DefTiDBIgnorePreparedCacheCloseStmt = false DefTiDBBatchPendingTiFlashCount = 4000 DefRCReadCheckTS = false + DefTiDBRemoveOrderbyInSubquery = false ) // Process global variables.
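
The last patch threads a single boolean from the session layer into logical plan building: buildSelect now skips buildSort/buildSortWithCheck for a subquery's ORDER BY unless the query is top level, carries a LIMIT, or the flag is off, and the default (DefTiDBRemoveOrderbyInSubquery = false) leaves existing plans unchanged unless a user opts in. A minimal testkit-style sketch of the observable effect — a hypothetical test, not part of this series, assuming the same testkit helpers the patches above already use:

package executor_test

import (
	"testing"

	"github.com/pingcap/tidb/testkit"
)

// TestRemoveOrderbyInSubqueryEffect sketches how the new switch changes plans:
// with the flag on, a Sort produced by ORDER BY inside a derived table should
// survive only when a LIMIT makes the row order semantically meaningful.
func TestRemoveOrderbyInSubqueryEffect(t *testing.T) {
	store, clean := testkit.CreateMockStore(t)
	defer clean()
	tk := testkit.NewTestKit(t, store)

	tk.MustExec("use test")
	tk.MustExec("create table t (a int, b int, index idx(a))")
	tk.MustExec("set @@tidb_remove_orderby_in_subquery = 1")

	// No LIMIT: count(*) is order-insensitive, so the planner drops the Sort
	// (compare "DataScan(t)->Projection->Projection->Aggr(...)" in the
	// TestRemoveOrderbyInSubquery cases of this patch).
	tk.MustQuery("explain format = 'brief' select count(*) from (select b from t order by a) n")

	// LIMIT 1: which row survives depends on the order, so the Sort is kept.
	tk.MustQuery("explain format = 'brief' select count(1) from (select b from t order by a limit 1) n")
}
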