Commit efd16f8: Merge branch 'master' into use_wgw

hawkingrei committed Apr 1, 2022
2 parents 4032d9e + fa834a0

Showing 18 changed files with 329 additions and 197 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/restore.go
@@ -972,7 +972,7 @@ func (rc *Controller) saveStatusCheckpoint(ctx context.Context, tableName string
     switch {
     case err == nil:
         break
-    case utils.IsRetryableError(err), utils.MessageIsRetryableStorageError(err.Error()), common.IsContextCanceledError(err):
+    case utils.MessageIsRetryableStorageError(err.Error()), common.IsContextCanceledError(err):
         // recoverable error, should not be recorded in checkpoint
         // which will prevent lightning from automatically recovering
         return nil
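The substantive change: errors matched by `utils.IsRetryableError` are no longer swallowed by this case, so they now fall through and get recorded; only retryable storage errors and context cancellation still skip checkpoint recording. A minimal sketch of the resulting classification, assuming a `default` branch below the hunk that records the failure:

    switch {
    case err == nil:
        // success: nothing to record
    case utils.MessageIsRetryableStorageError(err.Error()), common.IsContextCanceledError(err):
        // still recoverable: not recorded, so lightning can retry on its own
        return nil
    default:
        // errors that utils.IsRetryableError(err) used to match now land here
        // and are persisted, e.g. in rc.errorSummaries (assumed, not shown)
    }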
10 changes: 7 additions & 3 deletions br/pkg/lightning/restore/table_restore_test.go
@@ -18,7 +18,6 @@ import (
     "context"
     "encoding/json"
     "fmt"
-    "net"
     "net/http"
     "net/http/httptest"
     "os"
@@ -996,9 +995,14 @@ func (s *tableRestoreSuite) TestSaveStatusCheckpoint() {
     require.NoError(s.T(), err)
     require.Equal(s.T(), 0, len(rc.errorSummaries.summary))

-    err = rc.saveStatusCheckpoint(context.Background(), common.UniqueTable("test", "tbl"), indexEngineID, &net.DNSError{IsTimeout: true}, checkpoints.CheckpointStatusImported)
+    err = rc.saveStatusCheckpoint(
+        context.Background(),
+        common.UniqueTable("test", "tbl"), indexEngineID,
+        common.ErrChecksumMismatch.GenWithStackByArgs(0, 0, 0, 0, 0, 0),
+        checkpoints.CheckpointStatusImported,
+    )
     require.NoError(s.T(), err)
-    require.Equal(s.T(), 0, len(rc.errorSummaries.summary))
+    require.Equal(s.T(), 1, len(rc.errorSummaries.summary))

     start := time.Now()
     err = rc.saveStatusCheckpoint(context.Background(), common.UniqueTable("test", "tbl"), indexEngineID, nil, checkpoints.CheckpointStatusImported)
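The old test passed `&net.DNSError{IsTimeout: true}`, which counted as retryable and left the summary empty; with `utils.IsRetryableError` gone from the case above, the test now feeds a checksum mismatch, a genuine failure, and expects exactly one summary entry. A sketch of the intended contrast, with the test scaffolding (`ctx`, `tbl`, `t`, `rc`) assumed:

    // Recoverable: a canceled context is not recorded.
    err := rc.saveStatusCheckpoint(ctx, tbl, indexEngineID,
        context.Canceled, checkpoints.CheckpointStatusImported)
    require.NoError(t, err)
    require.Equal(t, 0, len(rc.errorSummaries.summary))

    // Genuine failure: a checksum mismatch is recorded once.
    err = rc.saveStatusCheckpoint(ctx, tbl, indexEngineID,
        common.ErrChecksumMismatch.GenWithStackByArgs(0, 0, 0, 0, 0, 0),
        checkpoints.CheckpointStatusImported)
    require.NoError(t, err)
    require.Equal(t, 1, len(rc.errorSummaries.summary))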
4 changes: 3 additions & 1 deletion br/tests/br_s3/run.sh
@@ -37,13 +37,15 @@ start_s3() {
     bin/minio server --address $S3_ENDPOINT "$TEST_DIR/$DB" &
     s3_pid=$!
     i=0
-    while ! curl -o /dev/null -v -s "http://$S3_ENDPOINT/"; do
+    status="$(curl -o /dev/null -v -s "http://$S3_ENDPOINT/" -w '%{http_code}' || true)"
+    while ! [ "$status" -gt 0 ] && [ "$status" -lt 500 ]; do
         i=$(($i+1))
         if [ $i -gt 30 ]; then
             echo 'Failed to start minio'
             exit 1
         fi
         sleep 2
+        status="$(curl -o /dev/null -v -s "http://$S3_ENDPOINT/" -w '%{http_code}' || true)"
     done
 }

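The probe now captures curl's `%{http_code}` instead of curl's exit status. The apparent intent is to keep waiting both when the connection fails outright (status 000) and when the server answers 5xx while starting up. Note, though, that `!` binds only to the first `[ ... ]`, so the condition parses as `(status not > 0) && (status < 500)`: only a 000 status keeps the loop going, and the `-lt 500` test is redundant but harmless. A rough Go rendering of the (assumed) intended readiness check, endpoint and port hypothetical:

    package main

    import (
        "fmt"
        "net/http"
        "time"
    )

    // waitForS3 polls until the endpoint returns any status below 500.
    // Transport errors and 5xx responses both count as "not ready yet".
    func waitForS3(endpoint string) error {
        for i := 0; i < 30; i++ {
            resp, err := http.Get("http://" + endpoint + "/")
            if err == nil {
                code := resp.StatusCode
                resp.Body.Close()
                if code < 500 { // 403 from an anonymous request still proves liveness
                    return nil
                }
            }
            time.Sleep(2 * time.Second)
        }
        return fmt.Errorf("failed to start minio at %s", endpoint)
    }

    func main() {
        fmt.Println(waitForS3("127.0.0.1:24927")) // endpoint/port assumed
    }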
15 changes: 15 additions & 0 deletions ddl/db_partition_test.go
@@ -545,6 +545,13 @@ create table log_message_1 (
         "partition p1 values less than ('G'));",
         dbterror.ErrRangeNotIncreasing,
     },
+    {
+        "create table t(a char(10) collate utf8mb4_bin) " +
+            "partition by range columns (a) (" +
+            "partition p0 values less than ('g'), " +
+            "partition p1 values less than ('A'));",
+        dbterror.ErrRangeNotIncreasing,
+    },
     {
         "CREATE TABLE t1(c0 INT) PARTITION BY HASH((NOT c0)) PARTITIONS 2;",
         dbterror.ErrPartitionFunctionIsNotAllowed,
@@ -617,6 +624,14 @@ create table log_message_1 (
     partition p0 values less than ('a'),
     partition p1 values less than ('G'));`)

+    tk.MustExec("drop table if exists t;")
+    tk.MustExec(`create table t (a varchar(255) charset utf8mb4 collate utf8mb4_bin) ` +
+        `partition by range columns (a) ` +
+        `(partition pnull values less than (""),` +
+        `partition puppera values less than ("AAA"),` +
+        `partition plowera values less than ("aaa"),` +
+        `partition pmax values less than (MAXVALUE))`)
+
     tk.MustExec("drop table if exists t;")
     tk.MustExec(`create table t(a int) partition by range columns (a) (
     partition p0 values less than (10),
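Both additions lean on utf8mb4_bin being a binary collation: comparisons go by code point, so every uppercase ASCII letter sorts before every lowercase one, and the empty string sorts before everything. A quick self-contained check of the orderings the tests rely on (Go string comparison is also byte-wise, which makes it a fair stand-in here):

    package main

    import "fmt"

    func main() {
        // ('g'), ('A') is not increasing under utf8mb4_bin, hence ErrRangeNotIncreasing:
        fmt.Println("g" < "A") // false
        // "" < "AAA" < "aaa" < MAXVALUE is a valid, strictly increasing range list:
        fmt.Println("" < "AAA", "AAA" < "aaa") // true true
    }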
5 changes: 0 additions & 5 deletions ddl/ddl_api.go
@@ -2681,11 +2681,6 @@ func checkTwoRangeColumns(ctx sessionctx.Context, curr, prev *model.PartitionDef
         return false, nil
     }

-    // Current and previous is the same.
-    if strings.EqualFold(curr.LessThan[i], prev.LessThan[i]) {
-        continue
-    }
-
     // The tuples of column values used to define the partitions are strictly increasing:
     // PARTITION p0 VALUES LESS THAN (5,10,'ggg')
     // PARTITION p1 VALUES LESS THAN (10,20,'mmm')
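This deletion is what the new partition tests above exercise. `strings.EqualFold` is case-insensitive regardless of the column's collation, so the shortcut treated boundaries like 'a' and 'A' as identical even under utf8mb4_bin, where they are distinct and ordered; skipping the pair bypassed the strictly-increasing check that follows. With the shortcut gone, equality and ordering are both decided by the collation-aware comparison below the hunk. A minimal illustration of the mismatch:

    package main

    import (
        "fmt"
        "strings"
    )

    func main() {
        fmt.Println(strings.EqualFold("a", "A")) // true: folded as "equal"
        fmt.Println("A" < "a")                   // true: but utf8mb4_bin orders them
    }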
15 changes: 11 additions & 4 deletions ddl/ddl_worker.go
@@ -466,6 +466,7 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
         updateRawArgs = false
     }
     w.writeDDLSeqNum(job)
+    w.jobContext.resetWhenJobFinish()
     err = t.AddHistoryDDLJob(job, updateRawArgs)
     return errors.Trace(err)
 }
@@ -520,17 +521,16 @@ func newMetaWithQueueTp(txn kv.Transaction, tp workerType) *meta.Meta {

 func (w *jobContext) setDDLLabelForTopSQL(job *model.Job) {
     if !topsqlstate.TopSQLEnabled() || job == nil {
-        w.cacheDigest = nil
-        w.ddlJobCtx = context.Background()
         return
     }

     if job.Query != w.cacheSQL || w.cacheDigest == nil {
         w.cacheNormalizedSQL, w.cacheDigest = parser.NormalizeDigest(job.Query)
         w.cacheSQL = job.Query
+        w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, "", nil, false)
+    } else {
+        topsql.AttachSQLInfo(w.ddlJobCtx, w.cacheNormalizedSQL, w.cacheDigest, "", nil, false)
     }
-
-    w.ddlJobCtx = topsql.AttachSQLInfo(context.Background(), w.cacheNormalizedSQL, w.cacheDigest, "", nil, false)
 }

 func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
@@ -546,6 +546,13 @@ func (w *jobContext) getResourceGroupTaggerForTopSQL() tikvrpc.ResourceGroupTagger {
     return tagger
 }

+func (w *jobContext) resetWhenJobFinish() {
+    w.ddlJobCtx = context.Background()
+    w.cacheSQL = ""
+    w.cacheDigest = nil
+    w.cacheNormalizedSQL = ""
+}
+
 // handleDDLJobQueue handles DDL jobs in DDL Job queue.
 func (w *worker) handleDDLJobQueue(d *ddlCtx) error {
     once := true
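Read together with the `finishDDLJob` hunk, this gives the per-worker TopSQL state a clear lifecycle: `setDDLLabelForTopSQL` normalizes and attaches a digest only when `job.Query` changes, reusing the cached context otherwise, and `resetWhenJobFinish` clears everything just before the job is archived, so a stale digest cannot leak into the next job. A runnable toy model of that lifecycle; names mirror the diff, but digests are simplified to strings and TopSQL is stubbed out:

    package main

    import (
        "context"
        "fmt"
    )

    // Toy stand-in for the diff's jobContext cache.
    type jobContext struct {
        ddlJobCtx          context.Context
        cacheSQL           string
        cacheNormalizedSQL string
        cacheDigest        string
    }

    func (w *jobContext) setDDLLabelForTopSQL(query string) {
        if query != w.cacheSQL || w.cacheDigest == "" {
            // new statement: normalize once, attach to a fresh context
            w.cacheNormalizedSQL, w.cacheDigest = query, "digest("+query+")"
            w.cacheSQL = query
            w.ddlJobCtx = context.Background() // stands in for topsql.AttachSQLInfo
        }
        // else: reuse the cached context/digest without re-normalizing
    }

    func (w *jobContext) resetWhenJobFinish() {
        w.ddlJobCtx = context.Background()
        w.cacheSQL, w.cacheNormalizedSQL, w.cacheDigest = "", "", ""
    }

    func main() {
        w := &jobContext{}
        w.setDDLLabelForTopSQL("create table t (a int)") // normalizes
        w.setDDLLabelForTopSQL("create table t (a int)") // cache hit, no renormalize
        w.resetWhenJobFinish()                           // job archived: cache cleared
        fmt.Println(w.cacheSQL == "" && w.cacheDigest == "") // true
    }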
147 changes: 0 additions & 147 deletions executor/executor_legacy_test.go
@@ -73,7 +73,6 @@ import (
     "github.com/pingcap/tidb/util/deadlockhistory"
     "github.com/pingcap/tidb/util/gcutil"
     "github.com/pingcap/tidb/util/israce"
-    "github.com/pingcap/tidb/util/logutil"
     "github.com/pingcap/tidb/util/memory"
     "github.com/pingcap/tidb/util/mock"
     "github.com/pingcap/tidb/util/rowcodec"
@@ -111,7 +110,6 @@ var _ = SerialSuites(&testRecoverTable{})
 var _ = SerialSuites(&testClusterTableSuite{})
 var _ = SerialSuites(&testSplitTable{&baseTestSuite{}})
 var _ = SerialSuites(&testSerialSuite1{&baseTestSuite{}})
-var _ = SerialSuites(&testSlowQuery{&baseTestSuite{}})
 var _ = SerialSuites(&globalIndexSuite{&baseTestSuite{}})
 var _ = SerialSuites(&testSerialSuite{&baseTestSuite{}})
 var _ = SerialSuites(&testCoprCache{})
@@ -121,7 +119,6 @@ type testSuite struct{ *baseTestSuite }
 type testSuiteP1 struct{ *baseTestSuite }
 type testSuiteP2 struct{ *baseTestSuite }
 type testSplitTable struct{ *baseTestSuite }
-type testSlowQuery struct{ *baseTestSuite }
 type globalIndexSuite struct{ *baseTestSuite }
 type testSerialSuite struct{ *baseTestSuite }
 type testCoprCache struct {
@@ -4794,150 +4791,6 @@ func (s *testSuite) TestGenerateColumnReplace(c *C) {
     tk.MustQuery("select * from t1").Check(testkit.Rows("3 4"))
 }

-func (s *testSlowQuery) TestSlowQueryWithoutSlowLog(c *C) {
-    tk := testkit.NewTestKit(c, s.store)
-    originCfg := config.GetGlobalConfig()
-    newCfg := *originCfg
-    newCfg.Log.SlowQueryFile = "tidb-slow-not-exist.log"
-    newCfg.Log.SlowThreshold = math.MaxUint64
-    config.StoreGlobalConfig(&newCfg)
-    defer func() {
-        config.StoreGlobalConfig(originCfg)
-    }()
-    tk.MustQuery("select query from information_schema.slow_query").Check(testkit.Rows())
-    tk.MustQuery("select query from information_schema.slow_query where time > '2020-09-15 12:16:39' and time < now()").Check(testkit.Rows())
-}
-
-func (s *testSlowQuery) TestSlowQuerySensitiveQuery(c *C) {
-    tk := testkit.NewTestKit(c, s.store)
-    originCfg := config.GetGlobalConfig()
-    newCfg := *originCfg
-
-    f, err := os.CreateTemp("", "tidb-slow-*.log")
-    c.Assert(err, IsNil)
-    f.Close()
-    newCfg.Log.SlowQueryFile = f.Name()
-    config.StoreGlobalConfig(&newCfg)
-    defer func() {
-        tk.MustExec("set tidb_slow_log_threshold=300;")
-        config.StoreGlobalConfig(originCfg)
-        err = os.Remove(newCfg.Log.SlowQueryFile)
-        c.Assert(err, IsNil)
-    }()
-    err = logutil.InitLogger(newCfg.Log.ToLogConfig())
-    c.Assert(err, IsNil)
-
-    tk.MustExec("set tidb_slow_log_threshold=0;")
-    tk.MustExec("drop user if exists user_sensitive;")
-    tk.MustExec("create user user_sensitive identified by '123456789';")
-    tk.MustExec("alter user 'user_sensitive'@'%' identified by 'abcdefg';")
-    tk.MustExec("set password for 'user_sensitive'@'%' = 'xyzuvw';")
-    tk.MustQuery("select query from `information_schema`.`slow_query` " +
-        "where (query like 'set password%' or query like 'create user%' or query like 'alter user%') " +
-        "and query like '%user_sensitive%' order by query;").
-        Check(testkit.Rows(
-            "alter user {user_sensitive@% password = ***};",
-            "create user {user_sensitive@% password = ***};",
-            "set password for user user_sensitive@%;",
-        ))
-}
-
-func (s *testSlowQuery) TestSlowQueryPrepared(c *C) {
-    tk := testkit.NewTestKit(c, s.store)
-    originCfg := config.GetGlobalConfig()
-    newCfg := *originCfg
-
-    f, err := os.CreateTemp("", "tidb-slow-*.log")
-    c.Assert(err, IsNil)
-    f.Close()
-    newCfg.Log.SlowQueryFile = f.Name()
-    config.StoreGlobalConfig(&newCfg)
-    defer func() {
-        tk.MustExec("set tidb_slow_log_threshold=300;")
-        tk.MustExec("set tidb_redact_log=0;")
-        config.StoreGlobalConfig(originCfg)
-        os.Remove(newCfg.Log.SlowQueryFile)
-    }()
-    err = logutil.InitLogger(newCfg.Log.ToLogConfig())
-    c.Assert(err, IsNil)
-
-    tk.MustExec("set tidb_slow_log_threshold=0;")
-    tk.MustExec(`prepare mystmt1 from 'select sleep(?), 1';`)
-    tk.MustExec("SET @num = 0.01;")
-    tk.MustExec("execute mystmt1 using @num;")
-    tk.MustQuery("SELECT Query FROM `information_schema`.`slow_query` " +
-        "where query like 'select%sleep%' order by time desc limit 1").
-        Check(testkit.Rows(
-            "select sleep(?), 1 [arguments: 0.01];",
-        ))
-
-    tk.MustExec("set tidb_redact_log=1;")
-    tk.MustExec(`prepare mystmt2 from 'select sleep(?), 2';`)
-    tk.MustExec("execute mystmt2 using @num;")
-    tk.MustQuery("SELECT Query FROM `information_schema`.`slow_query` " +
-        "where query like 'select%sleep%' order by time desc limit 1").
-        Check(testkit.Rows(
-            "select `sleep` ( ? ) , ?;",
-        ))
-}
-
-func (s *testSlowQuery) TestLogSlowLogIndex(c *C) {
-    tk := testkit.NewTestKit(c, s.store)
-    f, err := os.CreateTemp("", "tidb-slow-*.log")
-    c.Assert(err, IsNil)
-    f.Close()
-
-    defer config.RestoreFunc()()
-    config.UpdateGlobal(func(conf *config.Config) {
-        conf.Log.SlowQueryFile = f.Name()
-    })
-    err = logutil.InitLogger(config.GetGlobalConfig().Log.ToLogConfig())
-    c.Assert(err, IsNil)
-
-    tk.MustExec("use test")
-    tk.MustExec("create table t (a int, b int,index idx(a));")
-    tk.MustExec("set tidb_slow_log_threshold=0;")
-    tk.MustQuery("select * from t use index (idx) where a in (1) union select * from t use index (idx) where a in (2,3);")
-    tk.MustExec("set tidb_slow_log_threshold=300;")
-    tk.MustQuery("select index_names from `information_schema`.`slow_query` " +
-        "where query like 'select%union%' limit 1").
-        Check(testkit.Rows(
-            "[t:idx]",
-        ))
-}
-
-func (s *testSlowQuery) TestSlowQuery(c *C) {
-    tk := testkit.NewTestKit(c, s.store)
-
-    f, err := os.CreateTemp("", "tidb-slow-*.log")
-    c.Assert(err, IsNil)
-    _, err = f.WriteString(`
-# Time: 2020-10-13T20:08:13.970563+08:00
-select * from t;
-# Time: 2020-10-16T20:08:13.970563+08:00
-select * from t;
-`)
-    c.Assert(err, IsNil)
-    err = f.Close()
-    c.Assert(err, IsNil)
-    executor.ParseSlowLogBatchSize = 1
-    originCfg := config.GetGlobalConfig()
-    newCfg := *originCfg
-    newCfg.Log.SlowQueryFile = f.Name()
-    config.StoreGlobalConfig(&newCfg)
-    defer func() {
-        executor.ParseSlowLogBatchSize = 64
-        config.StoreGlobalConfig(originCfg)
-        err = os.Remove(newCfg.Log.SlowQueryFile)
-        c.Assert(err, IsNil)
-    }()
-    err = logutil.InitLogger(newCfg.Log.ToLogConfig())
-    c.Assert(err, IsNil)
-
-    tk.MustQuery("select count(*) from `information_schema`.`slow_query` where time > '2020-10-16 20:08:13' and time < '2020-10-16 21:08:13'").Check(testkit.Rows("1"))
-    tk.MustQuery("select count(*) from `information_schema`.`slow_query` where time > '2019-10-13 20:08:13' and time < '2020-10-16 21:08:13'").Check(testkit.Rows("2"))
-}
-
 func (s *testSerialSuite) TestKillTableReader(c *C) {
     var retry = "github.com/tikv/client-go/v2/locate/mockRetrySendReqToRegion"
     defer func() {
31 changes: 19 additions & 12 deletions executor/mem_reader.go
@@ -180,6 +180,7 @@ type memTableReader struct {
     buffer     allocBuf
     pkColIDs   []int64
     cacheTable kv.MemBuffer
+    offsets    []int
 }

 type allocBuf struct {
@@ -240,18 +241,25 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
         opentracing.ContextWithSpan(ctx, span1)
     }
     mutableRow := chunk.MutRowFromTypes(m.retFieldTypes)
+    resultRows := make([]types.Datum, len(m.columns))
+    m.offsets = make([]int, len(m.columns))
+    for i, col := range m.columns {
+        m.offsets[i] = m.colIDs[col.ID]
+    }
     err := iterTxnMemBuffer(m.ctx, m.cacheTable, m.kvRanges, func(key, value []byte) error {
-        row, err := m.decodeRecordKeyValue(key, value)
+        var err error
+        resultRows, err = m.decodeRecordKeyValue(key, value, &resultRows)
         if err != nil {
            return err
        }

-        mutableRow.SetDatums(row...)
+        mutableRow.SetDatums(resultRows...)
         matched, _, err := expression.EvalBool(m.ctx, m.conditions, mutableRow.ToRow())
         if err != nil || !matched {
             return err
         }
-        m.addedRows = append(m.addedRows, row)
+        m.addedRows = append(m.addedRows, resultRows)
+        resultRows = make([]types.Datum, len(m.columns))
         return nil
     })
     if err != nil {
@@ -265,30 +273,29 @@ func (m *memTableReader) getMemRows(ctx context.Context) ([][]types.Datum, error) {
     return m.addedRows, nil
 }

-func (m *memTableReader) decodeRecordKeyValue(key, value []byte) ([]types.Datum, error) {
+func (m *memTableReader) decodeRecordKeyValue(key, value []byte, resultRows *[]types.Datum) ([]types.Datum, error) {
     handle, err := tablecodec.DecodeRowKey(key)
     if err != nil {
         return nil, errors.Trace(err)
     }
-    return m.decodeRowData(handle, value)
+    return m.decodeRowData(handle, value, resultRows)
 }

 // decodeRowData uses to decode row data value.
-func (m *memTableReader) decodeRowData(handle kv.Handle, value []byte) ([]types.Datum, error) {
+func (m *memTableReader) decodeRowData(handle kv.Handle, value []byte, resultRows *[]types.Datum) ([]types.Datum, error) {
     values, err := m.getRowData(handle, value)
     if err != nil {
         return nil, err
     }
-    ds := make([]types.Datum, 0, len(m.columns))
-    for _, col := range m.columns {
-        offset := m.colIDs[col.ID]
-        d, err := tablecodec.DecodeColumnValue(values[offset], &col.FieldType, m.ctx.GetSessionVars().Location())
+    for i, col := range m.columns {
+        var datum types.Datum
+        err := tablecodec.DecodeColumnValueWithDatum(values[m.offsets[i]], &col.FieldType, m.ctx.GetSessionVars().Location(), &datum)
         if err != nil {
             return nil, err
         }
-        ds = append(ds, d)
+        (*resultRows)[i] = datum
     }
-    return ds, nil
+    return *resultRows, nil
 }

 // getRowData decodes raw byte slice to row data.
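The theme of this file's changes is allocation reduction on the in-memory read path: the columnID-to-offset map lookups are hoisted out of the per-row callback into a precomputed `offsets` slice, rows are decoded into a reusable `resultRows` buffer via `DecodeColumnValueWithDatum`, and a fresh slice is allocated only when a row actually passes the filter and must be retained. The pattern in miniature, with all helpers hypothetical:

    package main

    import "fmt"

    type datum struct{ v int }

    func main() {
        colIDs := map[int]int{10: 0, 11: 1} // column ID -> value offset
        columns := []int{10, 11}

        // Hoist the map lookups out of the scan loop: do them once.
        offsets := make([]int, len(columns))
        for i, id := range columns {
            offsets[i] = colIDs[id]
        }

        values := [][]int{{1, 2}, {3, 4}, {5, 6}}
        keep := func(row []datum) bool { return row[0].v > 1 } // stand-in filter

        var kept [][]datum
        row := make([]datum, len(columns)) // reused across rows
        for _, vals := range values {
            for i := range columns {
                row[i] = datum{vals[offsets[i]]} // decode in place, no per-row slice
            }
            if keep(row) {
                kept = append(kept, row)
                row = make([]datum, len(columns)) // allocate only when a row is retained
            }
        }
        fmt.Println(len(kept)) // 2
    }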