Skip to content

Commit

Permalink
complete log reader (pingcap#35)
Browse files Browse the repository at this point in the history
* Support local_log_reader && remote_log_reader

* support predicate push down to log reader

* refine log names

* refine http log handler

* specific host to read log

* 1. support multi-node log-reader 2. support uniform tikv-tidb log reader

* finding log path from global config

* bug fix
  • Loading branch information
spongedu authored and qiuyesuifeng committed Oct 26, 2019
1 parent ceada31 commit 576417b
Show file tree
Hide file tree
Showing 23 changed files with 1,983 additions and 9 deletions.
47 changes: 46 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,27 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
case *plannercore.PhysicalStreamReader:
return b.buildStreamReader(v)
case *plannercore.PhysicalInspectionReader:
return b.buildInspectionReader(v)
TimeStampLayout := "2006-01-02 15:04:05"
local, _ := time.LoadLocation("Asia/Chongqing")
// Transfer format
if s, ok := v.InspectionTableAttrs["q_starttime"]; ok {
t, _ := time.ParseInLocation(TimeStampLayout, strings.Split(s, ".")[0], local)
v.InspectionTableAttrs["startTime"] = t.Format("2006-01-02T15:04:05")
}
if s, ok := v.InspectionTableAttrs["q_endtime"]; ok {
t, _ := time.ParseInLocation(TimeStampLayout, strings.Split(s, ".")[0], local)
v.InspectionTableAttrs["endTime"] = t.Format("2006-01-02T15:04:05")
}
tp := v.InspectionTableAttrs["type"]
switch tp {
case "log_local":
return b.buildLocalLogReader(v)
case "log_remote":
return b.buildRemoteLogReader(v)
default:
b.err = ErrUnknownPlan.GenWithStack("Unknown Inspection reader type: %+s", tp)
return nil
}
case *plannercore.PhysicalIndexReader:
return b.buildIndexReader(v)
case *plannercore.PhysicalIndexLookUpReader:
Expand Down Expand Up @@ -240,6 +260,31 @@ func (b *executorBuilder) buildInspectionReader(v *plannercore.PhysicalInspectio
}
}

func (b *executorBuilder) buildLocalLogReader(v *plannercore.PhysicalInspectionReader) *LocalLogReaderExecutor {
return &LocalLogReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
Table: v.Table,
Columns: v.Columns,
startTimeStr: v.InspectionTableAttrs["startTime"],
endTimeStr: v.InspectionTableAttrs["endTime"],
LimitStr: v.InspectionTableAttrs["limit"],
}
}

func (b *executorBuilder) buildRemoteLogReader(v *plannercore.PhysicalInspectionReader) *RemoteLogReaderExecutor {
return &RemoteLogReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
Table: v.Table,
Columns: v.Columns,
startTimeStr: v.InspectionTableAttrs["startTime"],
endTimeStr: v.InspectionTableAttrs["endTime"],
LimitStr: v.InspectionTableAttrs["limit"],
pattern: v.InspectionTableAttrs["pattern"],
level: v.InspectionTableAttrs["level"],
nodes: v.InspectionTableAttrs["nodes"],
filename: v.InspectionTableAttrs["filename"],
}
}

func (b *executorBuilder) buildCancelDDLJobs(v *plannercore.CancelDDLJobs) Executor {
e := &CancelDDLJobsExec{
Expand Down
10 changes: 10 additions & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1713,6 +1713,16 @@ func (e *TiDBInspectionExec) Next(ctx context.Context, req *chunk.Chunk) error {
req.AppendString(2, "OK")
}

// generate TIDB_LOG table
idx++
req.AppendInt64(0, idx)
req.AppendString(1, "generate [TIDB_LOG] table")
if err := e.i.CreateLogTable(); err != nil {
return errors.Trace(err)
} else {
req.AppendString(2, "OK")
}

// generate RESULT table
idx++
req.AppendInt64(0, idx)
Expand Down
171 changes: 171 additions & 0 deletions executor/local_log_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
"fmt"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/config"
log2 "github.com/pingcap/tidb/infoschema/inspection/log"
"github.com/pingcap/tidb/infoschema/inspection/log/item"
"github.com/pingcap/tidb/infoschema/inspection/log/search"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"golang.org/x/net/context"
)

// make sure `LocalLogReaderExecutor` implements `Executor`.
var _ Executor = &LocalLogReaderExecutor{}

type LocalLogReaderExecutor struct {
baseExecutor
Table *model.TableInfo
Columns []*model.ColumnInfo

se *search.Sequence
result *chunk.Chunk
cnt int
limit int
startTimeStr string
endTimeStr string
LimitStr string
}

func (e *LocalLogReaderExecutor) Open(ctx context.Context) error {
TimeStampLayout := "2006-01-02T15:04:05"
local, err := time.LoadLocation("Asia/Chongqing")
if err != nil {
return err
}
// startTime, _ := time.ParseInLocation(TimeStampLayout ,"1970-01-01T00:00:00", local)
// endTime, _ := time.ParseInLocation(TimeStampLayout , "2030-01-01T00:00:00", local)
startTime, err := time.ParseInLocation(TimeStampLayout ,e.startTimeStr, local)
if err != nil {
return err
}
endTime, err := time.ParseInLocation(TimeStampLayout , e.endTimeStr, local)
if err != nil {
return err
}
l, err := strconv.ParseInt(e.LimitStr, 10, 64)
if err != nil {
return err
}
e.limit = int(l)
if e.se, err = search.NewSequence(log2.GetTiDBLogPath(), startTime, endTime); err != nil {
return err
}
e.cnt = 0
return nil
}

// Next fills data into the chunk passed by its caller.
func (e *LocalLogReaderExecutor) Next(ctx context.Context, chk *chunk.Chunk) error {
var err error

chk.GrowAndReset(e.maxChunkSize)
if e.result == nil {
e.result = newFirstChunk(e)
}
e.result.Reset()
err = e.fetchAll()
if err != nil {
return errors.Trace(err)
}
iter := chunk.NewIterator4Chunk(e.result)
for colIdx := 0; colIdx < e.Schema().Len(); colIdx++ {
retType := e.Schema().Columns[colIdx].RetType
if !types.IsTypeVarchar(retType.Tp) {
continue
}
for row := iter.Begin(); row != iter.End(); row = iter.Next() {
if valLen := len(row.GetString(colIdx)); retType.Flen < valLen {
retType.Flen = valLen
}
}
}
chk.Append(e.result, 0, e.result.NumRows())
return nil
}

// Close implements the Executor Close interface.
func (e *LocalLogReaderExecutor) Close() error {
return nil
}

func (e *LocalLogReaderExecutor) fetchAll() error {
err := e.fetchData()
if err != nil {
return errors.Trace(err)
}
return nil
}

func (e *LocalLogReaderExecutor) fetchData() error {
for i := 0; i < 10; i++ {
if e.cnt >= e.limit {
break
}
item, err := e.se.Next()
if err != nil {
if err.Error() == "EOF" {
break
} else {
return errors.Trace(err)
}
}
data, err := e.parseData(item)
if err != nil {
return errors.Trace(err)
}
row := chunk.MutRowFromDatums(data).ToRow()
e.result.AppendRow(row)
e.cnt++
}
return nil
}

func (e *LocalLogReaderExecutor) parseData(data item.Item) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(e.Columns))
for _, col := range e.Columns {
switch col.Name.L {
case "address":
row = append(row, types.NewStringDatum(fmt.Sprintf("%s:%d", config.GetGlobalConfig().Status.StatusHost, config.GetGlobalConfig().Status.StatusPort)))
case "component":
row = append(row, types.NewStringDatum("tidb"))
case "filename":
row = append(row, types.NewStringDatum(data.GetFileName()))
case "time":
tm := types.Time{
Time: types.FromGoTime(data.GetTime()),
Type: mysql.TypeDatetime,
Fsp: 0,
}
row = append(row, types.NewTimeDatum(tm))
case "level":
row = append(row, types.NewStringDatum(log2.ParseLevelToStr(data.GetLevel())))
case "content":
row = append(row, types.NewStringDatum(string(data.GetContent())))
default:
data := types.NewDatum(nil)
row = append(row, data)
}
}
return row, nil
}
Loading

0 comments on commit 576417b

Please sign in to comment.