Skip to content

Commit

Permalink
server, statistics: support dump history stats (pingcap#10291)
Browse files Browse the repository at this point in the history
  • Loading branch information
XuHuaiyu authored and alivxxx committed Jul 21, 2019
1 parent 26d6269 commit 398b0d0
Show file tree
Hide file tree
Showing 17 changed files with 346 additions and 46 deletions.
19 changes: 19 additions & 0 deletions docs/tidb_http_api.md
Original file line number Diff line number Diff line change
Expand Up @@ -169,3 +169,22 @@ timezone.*
- Go mutex pprof
- Full goroutine
- TiDB config and version
Param:
- seconds: profile time(s), default is 10s.
1. Get statistics data of specified table.
```shell
curl http://{TiDBIP}:10080/stats/dump/{db}/{table}
```
1. Get statistics data of specific table and timestamp.
```shell
curl http://{TiDBIP}:10080/stats/dump/{db}/{table}/{yyyyMMddHHmmss}
```
```shell
curl http://{TiDBIP}:10080/stats/dump/{db}/{table}/{yyyy-MM-dd HH:mm:ss}
```
2 changes: 1 addition & 1 deletion executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ func (a *ExecStmt) LogSlowQuery(txnTS uint64, succ bool) {
copTaskInfo := sessVars.StmtCtx.CopTasksDetails()
statsInfos := a.getStatsInfo()
memMax := sessVars.StmtCtx.MemTracker.MaxConsumed()
joinVars := fmt.Sprintf("idxBatch %d hjCon %d", sessVars.IndexJoinBatchSize,sessVars.HashJoinConcurrency)
joinVars := fmt.Sprintf("idxBatch %d hjCon %d", sessVars.IndexJoinBatchSize, sessVars.HashJoinConcurrency)
if costTime < threshold {
_, digest := sessVars.StmtCtx.SQLDigest()
logutil.SlowQueryLogger.Debug(sessVars.SlowLogFormat(txnTS, costTime, execDetail, indexIDs, digest, statsInfos, copTaskInfo, memMax, sql, planString, joinVars))
Expand Down
2 changes: 2 additions & 0 deletions executor/load_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package executor

import (
"encoding/json"
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/domain"
Expand Down Expand Up @@ -77,6 +78,7 @@ func (e *LoadStatsExec) Open(ctx context.Context) error {
// Update updates the stats of the corresponding table according to the data.
func (e *LoadStatsInfo) Update(data []byte) error {
jsonTbl := &statistics.JSONTable{}
fmt.Printf("%s\n", data[:10])
if err := json.Unmarshal(data, jsonTbl); err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion planner/core/stringer.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ func toStringWithCount(in Plan, strs []string, idxs []int) ([]string, []int) {
str = "MaxOneRow"
case *PhysicalLimit:
str = "Limit" + fmt.Sprintf("%v\n", x.StatsCount())
case *PhysicalSort:
case *PhysicalSort:
str = "Sort" + fmt.Sprintf("%.2f", x.StatsCount())
case *PhysicalUnionAll:
last := len(idxs) - 1
Expand Down
1 change: 1 addition & 0 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ const (
pColumnFlag = "colFlag"
pColumnLen = "colLen"
pRowBin = "rowBin"
pSnapshot = "snapshot"
)

// For query string
Expand Down
3 changes: 2 additions & 1 deletion server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ func (s *Server) startHTTPServer() {
router.Handle("/metrics", prometheus.Handler())

// HTTP path for dump statistics.
router.Handle("/stats/dump/{db}/{table}", s.newStatsHandler())
router.Handle("/stats/dump/{db}/{table}", s.newStatsHandler()).Name("StatsDump")
router.Handle("/stats/dump/{db}/{table}/{snapshot}", s.newStatsHistoryHandler()).Name("StatsHistoryDump")

router.Handle("/settings", settingsHandler{})
router.Handle("/binlog/recover", binlogRecover{})
Expand Down
115 changes: 114 additions & 1 deletion server/statistics_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@
package server

import (
"github.com/pingcap/errors"
"github.com/pingcap/tidb/sessionctx"
"net/http"
"strings"
"time"

"github.com/gorilla/mux"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/sqlexec"
)

// StatsHandler is the handler for dumping statistics.
Expand Down Expand Up @@ -51,11 +59,116 @@ func (sh StatsHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if err != nil {
writeError(w, err)
} else {
js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta())
js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta(), nil)
if err != nil {
writeError(w, err)
} else {
writeData(w, js)
}
}
}

// StatsHistoryHandler is the handler for dumping statistics.
type StatsHistoryHandler struct {
do *domain.Domain
}

func (s *Server) newStatsHistoryHandler() *StatsHistoryHandler {
store, ok := s.driver.(*TiDBDriver)
if !ok {
panic("Illegal driver")
}

do, err := session.GetDomain(store.store)
if err != nil {
panic("Failed to get domain")
}
return &StatsHistoryHandler{do}
}

func compatibleParseGCTime(format, value string) (time.Time, error) {
t, err := time.Parse(format, value)

if err != nil {
// Remove the last field that separated by space
parts := strings.Split(value, " ")
prefix := strings.Join(parts[:len(parts)-1], " ")
t, err = time.Parse(format, prefix)
}

if err != nil {
err = errors.Errorf("string \"%v\" doesn't has a prefix that matches format \"%v\"", value, format)
}
return t, err
}

// validateSnapshot checks that the newly set snapshot time is after GC safe point time.
func validateSnapshot(ctx sessionctx.Context, snapshotTS uint64) error {
sql := "SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_safe_point'"
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
if err != nil {
return errors.Trace(err)
}
if len(rows) != 1 {
return errors.New("can not get 'tikv_gc_safe_point'")
}
safePointString := rows[0].GetString(0)
const gcTimeFormat = "20060102-15:04:05 -0700"
safePointTime, err := compatibleParseGCTime(gcTimeFormat, safePointString)
if err != nil {
return errors.Trace(err)
}
safePointTS := variable.GoTimeToTS(safePointTime)
if safePointTS > snapshotTS {
return variable.ErrSnapshotTooOld.GenWithStackByArgs(safePointString)
}
return nil
}

func (sh StatsHistoryHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/json")

params := mux.Vars(req)
se, err := session.CreateSession(sh.do.Store())
if err != nil {
writeError(w, err)
return
}
se.GetSessionVars().StmtCtx.TimeZone = time.Local
t, err := types.ParseTime(se.GetSessionVars().StmtCtx, params[pSnapshot], mysql.TypeTimestamp, 6)
if err != nil {
writeError(w, err)
return
}
t1, err := t.Time.GoTime(time.Local)
if err != nil {
writeError(w, err)
return
}
snapshot := variable.GoTimeToTS(t1)
err = validateSnapshot(se, snapshot)
if err != nil {
writeError(w, err)
return
}

is, err := sh.do.GetSnapshotInfoSchema(snapshot)
if err != nil {
writeError(w, err)
return
}
h := sh.do.StatsHandle()
tbl, err := is.TableByName(model.NewCIStr(params[pDBName]), model.NewCIStr(params[pTableName]))
if err != nil {
writeError(w, err)
return
}
se.GetSessionVars().SnapshotInfoschema, se.GetSessionVars().SnapshotTS = is, snapshot
historyStatsExec := se.(sqlexec.RestrictedSQLExecutor)
js, err := h.DumpStatsToJSON(params[pDBName], tbl.Meta(), historyStatsExec)
if err != nil {
writeError(w, err)
} else {
writeData(w, js)
}
}
72 changes: 65 additions & 7 deletions server/statistics_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"io/ioutil"
"net/http"
"os"
"time"

"github.com/go-sql-driver/mysql"
"github.com/gorilla/mux"
Expand Down Expand Up @@ -105,6 +106,37 @@ func (ds *testDumpStatsSuite) TestDumpStatsAPI(c *C) {
c.Assert(err, IsNil)
fp.Write(js)
ds.checkData(c, path)

// sleep for 1 seconds to ensure the existence of tidb.test
time.Sleep(time.Second)
timeBeforeDropStats := time.Now()
snapshot := timeBeforeDropStats.Format("20060102150405")
ds.prepare4DumpHistoryStats(c)

// test dump history stats
resp1, err := http.Get("http://127.0.0.1:10090/stats/dump/tidb/test")
c.Assert(err, IsNil)
defer resp1.Body.Close()
js, err = ioutil.ReadAll(resp1.Body)
c.Assert(err, IsNil)
c.Assert(string(js), Equals, "null")

path1 := "/tmp/stats_history.json"
fp1, err := os.Create(path1)
c.Assert(err, IsNil)
c.Assert(fp1, NotNil)
defer func() {
c.Assert(fp1.Close(), IsNil)
//c.Assert(os.Remove(path1), IsNil)
}()

resp1, err = http.Get("http://127.0.0.1:10090/stats/dump/tidb/test/" + snapshot)
c.Assert(err, IsNil)

js, err = ioutil.ReadAll(resp1.Body)
c.Assert(err, IsNil)
fp1.Write(js)
ds.checkData(c, path1)
}

func (ds *testDumpStatsSuite) prepareData(c *C) {
Expand All @@ -128,23 +160,37 @@ func (ds *testDumpStatsSuite) prepareData(c *C) {
c.Assert(h.Update(is), IsNil)
}

func (ds *testDumpStatsSuite) prepare4DumpHistoryStats(c *C) {
db, err := sql.Open("mysql", getDSN())
c.Assert(err, IsNil, Commentf("Error connecting"))
defer db.Close()

dbt := &DBTest{c, db}

safePointName := "tikv_gc_safe_point"
safePointValue := "20060102-15:04:05 -0700"
safePointComment := "All versions after safe point can be accessed. (DO NOT EDIT)"
updateSafePoint := fmt.Sprintf(`INSERT INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
ON DUPLICATE KEY
UPDATE variable_value = '%[2]s', comment = '%[3]s'`, safePointName, safePointValue, safePointComment)
dbt.mustExec(updateSafePoint)

dbt.mustExec("drop table tidb.test")
dbt.mustExec("create table tidb.test (a int, b varchar(20))")
}

func (ds *testDumpStatsSuite) checkData(c *C, path string) {
db, err := sql.Open("mysql", getDSN(func(config *mysql.Config) {
config.AllowAllFiles = true
config.Strict = false
}))
c.Assert(err, IsNil, Commentf("Error connecting"))
dbt := &DBTest{c, db}
defer func() {
dbt.mustExec("drop database tidb")
dbt.mustExec("truncate table mysql.stats_meta")
dbt.mustExec("truncate table mysql.stats_histograms")
dbt.mustExec("truncate table mysql.stats_buckets")
db.Close()
}()
defer db.Close()

dbt.mustExec("use tidb")
dbt.mustExec("drop stats test")
fmt.Printf("%v\n", fmt.Sprintf("load stats '%s'", path))
_, err = dbt.db.Exec(fmt.Sprintf("load stats '%s'", path))
c.Assert(err, IsNil)

Expand All @@ -160,3 +206,15 @@ func (ds *testDumpStatsSuite) checkData(c *C, path string) {
dbt.Check(modifyCount, Equals, int64(3))
dbt.Check(count, Equals, int64(4))
}

func (ds *testDumpStatsSuite) clearData(c *C, path string) {
db, err := sql.Open("mysql", getDSN())
c.Assert(err, IsNil, Commentf("Error connecting"))
defer db.Close()

dbt := &DBTest{c, db}
dbt.mustExec("drop database tidb")
dbt.mustExec("truncate table mysql.stats_meta")
dbt.mustExec("truncate table mysql.stats_histograms")
dbt.mustExec("truncate table mysql.stats_buckets")
}
67 changes: 67 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"net"
"strconv"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1581,3 +1582,69 @@ func logQuery(query string, vars *variable.SessionVars) {
zap.String("sql", query+vars.GetExecuteArgumentsInfo()))
}
}

// ExecRestrictedSQLWithSnapshot implements RestrictedSQLExecutor interface.
// This is used for executing some restricted sql statements with snapshot.
// If current session sets the snapshot timestamp, then execute with this snapshot timestamp.
// Otherwise, execute with the current transaction start timestamp if the transaction is valid.
func (s *session) ExecRestrictedSQLWithSnapshot(sctx sessionctx.Context, sql string) ([]chunk.Row, []*ast.ResultField, error) {
ctx := context.TODO()

// Use special session to execute the sql.
tmp, err := s.sysSessionPool().Get()
if err != nil {
return nil, nil, err
}
se := tmp.(*session)
defer s.sysSessionPool().Put(tmp)
metrics.SessionRestrictedSQLCounter.Inc()
var snapshot uint64
txn, err := s.Txn(false)
if err != nil {
return nil, nil, err
}
if txn.Valid() {
snapshot = s.txn.StartTS()
}
if s.sessionVars.SnapshotTS != 0 {
snapshot = s.sessionVars.SnapshotTS
}
// Set snapshot.
if snapshot != 0 {
if err := se.sessionVars.SetSystemVar(variable.TiDBSnapshot, strconv.FormatUint(snapshot, 10)); err != nil {
return nil, nil, err
}
defer func() {
if err := se.sessionVars.SetSystemVar(variable.TiDBSnapshot, ""); err != nil {
logutil.Logger(context.Background()).Error("set tidbSnapshot error", zap.Error(err))
}
}()
}
startTime := time.Now()
recordSets, err := se.Execute(ctx, sql)
if err != nil {
return nil, nil, errors.Trace(err)
}

var (
rows []chunk.Row
fields []*ast.ResultField
)
// Execute all recordset, take out the first one as result.
for i, rs := range recordSets {
tmp, err := drainRecordSet(ctx, se, rs)
if err != nil {
return nil, nil, errors.Trace(err)
}
if err = rs.Close(); err != nil {
return nil, nil, errors.Trace(err)
}

if i == 0 {
rows = tmp
fields = rs.Fields()
}
}
metrics.QueryDurationHistogram.WithLabelValues(metrics.LblInternal).Observe(time.Since(startTime).Seconds())
return rows, fields, nil
}
Loading

0 comments on commit 398b0d0

Please sign in to comment.