Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: refactor slow log format and parse slow query log to SLOW_QUERY table. #9290

Merged
merged 39 commits into from
Mar 11, 2019
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
be5e5a4
slow log init
crazycs520 Jan 24, 2019
31314c4
use table name, index name instead of id in slow log
crazycs520 Jan 24, 2019
e94b4a8
fix bug, sql should end with ';', otherwise, pt-query-digest parse wi…
crazycs520 Jan 24, 2019
5d60865
add slow log table, init
crazycs520 Jan 28, 2019
2a6800b
parse slow log update
crazycs520 Jan 30, 2019
ee25e4c
add tidb_slow_query_file session variable
crazycs520 Jan 30, 2019
08a7fb4
update test
crazycs520 Feb 12, 2019
103a354
fix test
crazycs520 Feb 12, 2019
a2e68e6
refine code
crazycs520 Feb 12, 2019
00a679f
Merge branch 'master' of https://github.com/pingcap/tidb into slow-lo…
crazycs520 Feb 12, 2019
20342c4
add test for select slow_query table
crazycs520 Feb 12, 2019
b1deabd
fix ci errcheck
crazycs520 Feb 12, 2019
2bb3830
Merge branch 'master' of https://github.com/pingcap/tidb into slow-lo…
crazycs520 Feb 27, 2019
9ed2b3b
refine code
crazycs520 Feb 27, 2019
bfbb5a1
address comment and add TODO
crazycs520 Feb 27, 2019
c17171a
address comment and add test
crazycs520 Feb 27, 2019
2b19b61
add comment
crazycs520 Feb 27, 2019
ab6751e
fmt code
crazycs520 Feb 28, 2019
b873963
address comment
crazycs520 Mar 1, 2019
baadd28
remove blank line
crazycs520 Mar 1, 2019
911f9d1
add license
crazycs520 Mar 1, 2019
ea1226c
address comment
crazycs520 Mar 1, 2019
c6e6c55
address comment
crazycs520 Mar 4, 2019
c7a35d2
change default config for slow log file name
crazycs520 Mar 4, 2019
13a4398
fix test
crazycs520 Mar 5, 2019
05b8246
add index name filed to slow_query table
crazycs520 Mar 6, 2019
227c5b4
add comment
crazycs520 Mar 6, 2019
ef4f5cb
Merge branch 'master' into slow-log-format
crazycs520 Mar 6, 2019
42800e8
Merge branch 'master' into slow-log-format
crazycs520 Mar 6, 2019
982da25
Update util/execdetails/execdetails.go
zz-jason Mar 7, 2019
91a7e46
Update infoschema/slow_log.go
zz-jason Mar 7, 2019
c9067b9
address comment
crazycs520 Mar 7, 2019
5e696c7
address comment
crazycs520 Mar 8, 2019
c5530d6
Merge branch 'master' of https://github.com/pingcap/tidb into slow-lo…
crazycs520 Mar 8, 2019
d8aa948
add test for parse time
crazycs520 Mar 11, 2019
0b8d096
add test for slow log convert timezone
crazycs520 Mar 11, 2019
c0993c0
change slow log time type to timestamp
crazycs520 Mar 11, 2019
9161ac7
address comment
crazycs520 Mar 11, 2019
7591f7c
Merge branch 'master' into slow-log-format
crazycs520 Mar 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 13 additions & 30 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,54 +379,37 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
sessVars := a.Ctx.GetSessionVars()
sql = QueryReplacer.Replace(sql) + sessVars.GetExecuteArgumentsInfo()

connID := sessVars.ConnectionID
currentDB := sessVars.CurrentDB
var tableIDs, indexIDs string
if len(sessVars.StmtCtx.TableIDs) > 0 {
tableIDs = strings.Replace(fmt.Sprintf("table_ids:%v ", a.Ctx.GetSessionVars().StmtCtx.TableIDs), " ", ",", -1)
var tableNames, indexNames string
if len(sessVars.StmtCtx.TableNames) > 0 {
tableNames = strings.Join(sessVars.StmtCtx.TableNames, ",")
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
if len(sessVars.StmtCtx.IndexIDs) > 0 {
indexIDs = strings.Replace(fmt.Sprintf("index_ids:%v ", a.Ctx.GetSessionVars().StmtCtx.IndexIDs), " ", ",", -1)
}
user := sessVars.User
var internal string
if sessVars.InRestrictedSQL {
internal = "[INTERNAL] "
if len(sessVars.StmtCtx.IndexNames) > 0 {
indexNames = strings.Join(sessVars.StmtCtx.IndexNames, ",")
}
execDetail := sessVars.StmtCtx.GetExecDetails()
if costTime < threshold {
logutil.SlowQueryLogger.Debugf(
"[QUERY] %vcost_time:%vs %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime.Seconds(), execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
logutil.SlowQueryLogger.Debugf(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexNames, sql))
} else {
logutil.SlowQueryLogger.Warnf(
"[SLOW_QUERY] %vcost_time:%vs %s succ:%v con:%v user:%s txn_start_ts:%v database:%v %v%vsql:%v",
internal, costTime.Seconds(), execDetail, succ, connID, user, txnTS, currentDB, tableIDs, indexIDs, sql)
logutil.SlowQueryLogger.Warnf(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexNames, sql))
metrics.TotalQueryProcHistogram.Observe(costTime.Seconds())
metrics.TotalCopProcHistogram.Observe(execDetail.ProcessTime.Seconds())
metrics.TotalCopWaitHistogram.Observe(execDetail.WaitTime.Seconds())
var userString string
if user != nil {
userString = user.String()
}
if len(tableIDs) > 10 {
tableIDs = tableIDs[10 : len(tableIDs)-1] // Remove "table_ids:" and the last ","
zimulala marked this conversation as resolved.
Show resolved Hide resolved
}
if len(indexIDs) > 10 {
indexIDs = indexIDs[10 : len(indexIDs)-1] // Remove "index_ids:" and the last ","
if sessVars.User != nil {
userString = sessVars.User.String()
}
domain.GetDomain(a.Ctx).LogSlowQuery(&domain.SlowQueryInfo{
SQL: sql,
Start: a.StartTime,
Duration: costTime,
Detail: sessVars.StmtCtx.GetExecDetails(),
Succ: succ,
ConnID: connID,
ConnID: sessVars.ConnectionID,
TxnTS: txnTS,
User: userString,
DB: currentDB,
TableIDs: tableIDs,
IndexIDs: indexIDs,
DB: sessVars.CurrentDB,
TableIDs: tableNames,
IndexIDs: indexNames,
Internal: sessVars.InRestrictedSQL,
})
}
Expand Down
8 changes: 4 additions & 4 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1623,7 +1623,7 @@ func (b *executorBuilder) buildTableReader(v *plannercore.PhysicalTableReader) *
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
ret.ranges = ts.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
sctx.TableNames = append(sctx.TableNames, ts.Table.Name.O)
return ret
}

Expand Down Expand Up @@ -1681,7 +1681,7 @@ func (b *executorBuilder) buildIndexReader(v *plannercore.PhysicalIndexReader) *
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
ret.ranges = is.Ranges
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexIDs = append(sctx.IndexIDs, is.Index.ID)
sctx.IndexNames = append(sctx.IndexNames, is.Index.Name.O)
return ret
}

Expand Down Expand Up @@ -1761,8 +1761,8 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plannercore.PhysicalIndexLoo
ret.ranges = is.Ranges
metrics.ExecutorCounter.WithLabelValues("IndexLookUpExecutor").Inc()
sctx := b.ctx.GetSessionVars().StmtCtx
sctx.IndexIDs = append(sctx.IndexIDs, is.Index.ID)
sctx.TableIDs = append(sctx.TableIDs, ts.Table.ID)
sctx.IndexNames = append(sctx.IndexNames, is.Index.Name.O)
sctx.TableNames = append(sctx.TableNames, ts.Table.Name.O)
return ret
}

Expand Down
182 changes: 182 additions & 0 deletions infoschema/slow_log.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
package infoschema
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved

import (
"bufio"
"os"
"strconv"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/execdetails"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
log "github.com/sirupsen/logrus"
)

var slowQueryCols = []columnInfo{
{variable.SlowLogTimeStr, mysql.TypeDatetime, -1, 0, nil, nil},
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
{variable.SlowLogTxnStartTSStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
{variable.SlowLogUserStr, mysql.TypeVarchar, 64, 0, nil, nil},
{variable.SlowLogConnIDStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
{variable.SlowLogQueryTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.ProcessTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.WaitTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.BackoffTimeStr, mysql.TypeDouble, 22, 0, nil, nil},
{execdetails.RequestCountStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{execdetails.TotalKeysStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{execdetails.ProcessedKeysStr, mysql.TypeLonglong, 20, mysql.UnsignedFlag, nil, nil},
{variable.SlowLogDBStr, mysql.TypeVarchar, 64, 0, nil, nil},
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
{variable.SlowLogIsInternalStr, mysql.TypeTiny, 1, 0, nil, nil},
{variable.SlowLogQuerySQLStr, mysql.TypeVarchar, 4096, 0, nil, nil},
}

func dataForSlowLog(ctx sessionctx.Context) ([][]types.Datum, error) {
rowsMap, err := parseSlowLogFile(ctx.GetSessionVars().SlowQueryFile)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, errors.Trace(err)
}
var rows [][]types.Datum
for _, row := range rowsMap {
record := make([]types.Datum, 0, len(slowQueryCols))
for _, col := range slowQueryCols {
if v, ok := row[col.name]; ok {
record = append(record, v)
} else {
record = append(record, types.NewDatum(nil))
}
}
rows = append(rows, record)
}
return rows, nil
}

// TODO: Support parse multiple log-files.
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
func parseSlowLogFile(filePath string) ([]map[string]types.Datum, error) {
file, err := os.Open(filePath)
if err != nil {
return nil, errors.Trace(err)
}
defer func() {
if err = file.Close(); err != nil {
log.Error(err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add more information to this log.

}
}()

return parseSlowLog(bufio.NewScanner(file))
}

// TODO: optimize for parse huge log-file.
func parseSlowLog(scanner *bufio.Scanner) ([]map[string]types.Datum, error) {
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
rows := make([]map[string]types.Datum, 0)
rowMap := make(map[string]types.Datum, len(slowQueryCols))
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
startFlag := false
startPrefix := variable.SlowLogPrefixStr + variable.SlowLogTimeStr + variable.SlowLogSpaceMarkStr

for scanner.Scan() {
line := scanner.Text()
// Check slow log entry start flag.
if !startFlag && strings.Contains(line, startPrefix) {
value, err := parseSlowLogField(variable.SlowLogTimeStr, line[len(startPrefix):])
if err != nil {
log.Errorf("parse slow log error: %v", err)
continue
}
rowMap[variable.SlowLogTimeStr] = *value
startFlag = true
continue
}

if startFlag {
// Parse slow log field.
if strings.Contains(line, variable.SlowLogPrefixStr) {
line = line[len(variable.SlowLogPrefixStr):]
fieldValues := strings.Split(line, " ")
for i := 0; i < len(fieldValues)-1; i += 2 {
field := fieldValues[i]
if strings.HasSuffix(field, ":") {
field = field[:len(field)-1]
}
value, err := parseSlowLogField(field, fieldValues[i+1])
if err != nil {
log.Errorf("parse slow log error: %v", err)
continue
}
rowMap[field] = *value

}
} else if strings.HasSuffix(line, variable.SlowLogSQLSuffixStr) {
// Get the sql string, and mark the start flag to false.
rowMap[variable.SlowLogQuerySQLStr] = types.NewStringDatum(string(hack.Slice(line)))
rows = append(rows, rowMap)
rowMap = make(map[string]types.Datum, len(slowQueryCols))
startFlag = false
}
}
}

if err := scanner.Err(); err != nil {
return nil, errors.Trace(err)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
return rows, nil
}

func parseSlowLogField(field, value string) (*types.Datum, error) {
col := findColumnByName(slowQueryCols, field)
if col == nil {
return nil, errors.Errorf("can't found column %v", field)
}
var val types.Datum
switch col.tp {
case mysql.TypeLonglong:
num, err := strconv.ParseUint(value, 10, 64)
if err != nil {
return nil, errors.Trace(err)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
val = types.NewUintDatum(num)
case mysql.TypeVarchar:
val = types.NewStringDatum(value)
case mysql.TypeDouble:
num, err := strconv.ParseFloat(value, 64)
if err != nil {
return nil, errors.Trace(err)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
val = types.NewDatum(num)
case mysql.TypeTiny:
// parse bool
val = types.NewDatum(value == "true")
case mysql.TypeDatetime:
t, err := parseTime(value)
if err != nil {
return nil, errors.Trace(err)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
}
val = types.NewTimeDatum(types.Time{
Time: types.FromGoTime(t),
Type: mysql.TypeDatetime,
Fsp: types.MaxFsp,
})
}
return &val, nil
}

func parseTime(s string) (time.Time, error) {
t, err := time.Parse(logutil.SlowLogTimeFormat, s)

if err != nil {
err = errors.Errorf("string \"%v\" doesn't has a prefix that matches format \"%v\", err: %v", s, logutil.SlowLogTimeFormat, err)
}
return t, err
}

func findColumnByName(cols []columnInfo, colName string) *columnInfo {
for _, col := range cols {
if col.name == colName {
return &col
}
}
return nil
}
61 changes: 61 additions & 0 deletions infoschema/slow_log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package infoschema

import (
"bufio"
"bytes"
"testing"

"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/types"
)

func TestParseSlowLogFile(t *testing.T) {
slowLog := bytes.NewBufferString(
`# Time: 2019-01-24-22:32:29.313255 +0800
# Txn_start_ts: 405888132465033227
# Query_time: 0.216905
# Process_time: 0.021 Request_count: 1 Total_keys: 637 Processed_keys: 436
# Is_internal: true
select * from t;`)
crazycs520 marked this conversation as resolved.
Show resolved Hide resolved
scanner := bufio.NewScanner(slowLog)
rows, err := parseSlowLog(scanner)
if err != nil {
t.Fatalf("parse slow log failed")
}
if len(rows) != 1 {
t.Fatalf("parse slow log failed")
}
row := rows[0]
t1 := types.Time{
Time: types.FromDate(2019, 01, 24, 22, 32, 29, 313255),
Type: mysql.TypeDatetime,
Fsp: types.MaxFsp,
}
if logTime, ok := row["Time"]; !ok || logTime.GetMysqlTime() != t1 {
t.Fatalf("parse slow log failed")
}
if ts, ok := row["Txn_start_ts"]; !ok || ts.GetUint64() != 405888132465033227 {
t.Fatalf("parse slow log failed")
}
if queryTime, ok := row["Query_time"]; !ok || queryTime.GetFloat64() != 0.216905 {
t.Fatalf("parse slow log failed")
}
if ProcessTime, ok := row["Process_time"]; !ok || ProcessTime.GetFloat64() != 0.021 {
t.Fatalf("parse slow log failed")
}
if requestCount, ok := row["Request_count"]; !ok || requestCount.GetUint64() != 1 {
t.Fatalf("parse slow log failed")
}
if totalKeys, ok := row["Total_keys"]; !ok || totalKeys.GetUint64() != 637 {
t.Fatalf("parse slow log failed")
}
if processedKeys, ok := row["Processed_keys"]; !ok || processedKeys.GetUint64() != 436 {
t.Fatalf("parse slow log failed")
}
if isInternal, ok := row["Is_internal"]; !ok || isInternal.GetInt64() != 1 {
t.Fatalf("parse slow log failed")
}
if sql, ok := row["Query"]; !ok || sql.GetString() != "select * from t;" {
t.Fatalf("parse slow log failed")
}
}
4 changes: 4 additions & 0 deletions infoschema/tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const (
tableCollationCharacterSetApplicability = "COLLATION_CHARACTER_SET_APPLICABILITY"
tableProcesslist = "PROCESSLIST"
tableTiDBIndexes = "TIDB_INDEXES"
tableSlowLog = "SLOW_QUERY"
)

type columnInfo struct {
Expand Down Expand Up @@ -1469,6 +1470,7 @@ var tableNameToColumns = map[string][]columnInfo{
tableCollationCharacterSetApplicability: tableCollationCharacterSetApplicabilityCols,
tableProcesslist: tableProcesslistCols,
tableTiDBIndexes: tableTiDBIndexesCols,
tableSlowLog: slowQueryCols,
}

func createInfoSchemaTable(handle *Handle, meta *model.TableInfo) *infoschemaTable {
Expand Down Expand Up @@ -1560,6 +1562,8 @@ func (it *infoschemaTable) getRows(ctx sessionctx.Context, cols []*table.Column)
fullRows = dataForCollationCharacterSetApplicability()
case tableProcesslist:
fullRows = dataForProcesslist(ctx)
case tableSlowLog:
fullRows, err = dataForSlowLog(ctx)
}
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading