Skip to content

Commit

Permalink
server: refine plan replayer dump file (#33497)
Browse files Browse the repository at this point in the history
ref #26335
  • Loading branch information
Yisaer authored Mar 28, 2022
1 parent da95768 commit 060413b
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 47 deletions.
18 changes: 9 additions & 9 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ type Domain struct {
statsUpdating atomicutil.Int32
cancel context.CancelFunc
indexUsageSyncLease time.Duration
planReplayer *planReplayer
dumpFileGcChecker *dumpFileGcChecker
expiredTimeStamp4PC types.Time

serverID uint64
Expand Down Expand Up @@ -718,7 +718,7 @@ func (do *Domain) Close() {
const resourceIdleTimeout = 3 * time.Minute // resources in the ResourcePool will be recycled after idleTimeout

// NewDomain creates a new domain. Should not create multiple domains for the same store.
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, planReplayerGCLease time.Duration, factory pools.Factory, onClose func()) *Domain {
func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duration, idxUsageSyncLease time.Duration, dumpFileGcLease time.Duration, factory pools.Factory, onClose func()) *Domain {
capacity := 200 // capacity of the sysSessionPool size
do := &Domain{
store: store,
Expand All @@ -728,7 +728,7 @@ func NewDomain(store kv.Storage, ddlLease time.Duration, statsLease time.Duratio
infoCache: infoschema.NewCache(16),
slowQuery: newTopNSlowQueries(30, time.Hour*24*7, 500),
indexUsageSyncLease: idxUsageSyncLease,
planReplayer: &planReplayer{planReplayerGCLease: planReplayerGCLease},
dumpFileGcChecker: &dumpFileGcChecker{gcLease: dumpFileGcLease, paths: []string{GetPlanReplayerDirName(), GetOptimizerTraceDirName()}},
onClose: onClose,
expiredTimeStamp4PC: types.NewTime(types.ZeroCoreTime, mysql.TypeTimestamp, types.DefaultFsp),
}
Expand Down Expand Up @@ -1221,23 +1221,23 @@ func (do *Domain) TelemetryRotateSubWindowLoop(ctx sessionctx.Context) {
}()
}

// PlanReplayerLoop creates a goroutine that handles `exit` and `gc`.
func (do *Domain) PlanReplayerLoop() {
// DumpFileGcCheckerLoop creates a goroutine that handles `exit` and `gc`.
func (do *Domain) DumpFileGcCheckerLoop() {
do.wg.Add(1)
go func() {
gcTicker := time.NewTicker(do.planReplayer.planReplayerGCLease)
gcTicker := time.NewTicker(do.dumpFileGcChecker.gcLease)
defer func() {
gcTicker.Stop()
do.wg.Done()
logutil.BgLogger().Info("PlanReplayerLoop exited.")
util.Recover(metrics.LabelDomain, "PlanReplayerLoop", nil, false)
logutil.BgLogger().Info("dumpFileGcChecker exited.")
util.Recover(metrics.LabelDomain, "dumpFileGcCheckerLoop", nil, false)
}()
for {
select {
case <-do.exit:
return
case <-gcTicker.C:
do.planReplayer.planReplayerGC(time.Hour)
do.dumpFileGcChecker.gcDumpFiles(time.Hour)
}
}
}()
Expand Down
29 changes: 18 additions & 11 deletions domain/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package domain

import (
"errors"
"fmt"
"io/ioutil"
"os"
"path/filepath"
Expand All @@ -29,9 +28,12 @@ import (
"go.uber.org/zap"
)

type planReplayer struct {
// dumpFileGcChecker is used to gc dump file in circle
// For now it is used by `plan replayer` and `trace plan` statement
type dumpFileGcChecker struct {
sync.Mutex
planReplayerGCLease time.Duration
gcLease time.Duration
paths []string
}

// GetPlanReplayerDirName returns plan replayer directory path.
Expand All @@ -56,32 +58,37 @@ func parseTime(s string) (time.Time, error) {
return time.Unix(0, i), nil
}

func (p *planReplayer) planReplayerGC(t time.Duration) {
func (p *dumpFileGcChecker) gcDumpFiles(t time.Duration) {
p.Lock()
defer p.Unlock()
path := GetPlanReplayerDirName()
for _, path := range p.paths {
p.gcDumpFilesByPath(path, t)
}
}

func (p *dumpFileGcChecker) gcDumpFilesByPath(path string, t time.Duration) {
files, err := ioutil.ReadDir(path)
if err != nil {
if !os.IsNotExist(err) {
logutil.BgLogger().Warn("[PlanReplayer] open plan replayer directory failed", zap.Error(err))
logutil.BgLogger().Warn("[dumpFileGcChecker] open plan replayer directory failed", zap.Error(err))
}
return
}

gcTime := time.Now().Add(-t)
for _, f := range files {
createTime, err := parseTime(f.Name())
fileName := f.Name()
createTime, err := parseTime(fileName)
if err != nil {
logutil.BgLogger().Warn("[PlanReplayer] parseTime failed", zap.Error(err))
logutil.BgLogger().Error("[dumpFileGcChecker] parseTime failed", zap.Error(err), zap.String("filename", fileName))
continue
}
if !createTime.After(gcTime) {
err := os.Remove(filepath.Join(path, f.Name()))
if err != nil {
logutil.BgLogger().Warn("[PlanReplayer] remove file failed", zap.Error(err))
logutil.BgLogger().Warn("[dumpFileGcChecker] remove file failed", zap.Error(err), zap.String("filename", fileName))
continue
}
logutil.BgLogger().Info(fmt.Sprintf("[PlanReplayer] GC %s", f.Name()))
logutil.BgLogger().Info("dumpFileGcChecker successful", zap.String("filename", fileName))
}
}
}
6 changes: 4 additions & 2 deletions domain/plan_replayer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ func TestPlanReplayerGC(t *testing.T) {
require.NoError(t, err)
zf.Close()

handler := &planReplayer{}
handler.planReplayerGC(0)
handler := &dumpFileGcChecker{
paths: []string{GetPlanReplayerDirName()},
}
handler.gcDumpFiles(0)

_, err = os.Stat(path)
require.NotNil(t, err)
Expand Down
12 changes: 6 additions & 6 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ func (e *PlanReplayerSingleExec) Next(ctx context.Context, req *chunk.Chunk) err
// |_explain
// |-explain.txt
//
func (e *PlanReplayerSingleExec) dumpSingle(path string) (string, error) {
func (e *PlanReplayerSingleExec) dumpSingle(path string) (fileName string, err error) {
// Create path
err := os.MkdirAll(path, os.ModePerm)
err = os.MkdirAll(path, os.ModePerm)
if err != nil {
return "", errors.AddStack(err)
}
Expand All @@ -134,7 +134,7 @@ func (e *PlanReplayerSingleExec) dumpSingle(path string) (string, error) {
return "", err
}
key := base64.URLEncoding.EncodeToString(b)
fileName := fmt.Sprintf("replayer_single_%v_%v.zip", key, time)
fileName = fmt.Sprintf("replayer_single_%v_%v.zip", key, time)
zf, err := os.Create(filepath.Join(path, fileName))
if err != nil {
return "", errors.AddStack(err)
Expand All @@ -143,13 +143,13 @@ func (e *PlanReplayerSingleExec) dumpSingle(path string) (string, error) {
// Create zip writer
zw := zip.NewWriter(zf)
defer func() {
err := zw.Close()
err = zw.Close()
if err != nil {
logutil.BgLogger().Warn("Closing zip writer failed", zap.Error(err))
logutil.BgLogger().Error("Closing zip writer failed", zap.Error(err), zap.String("filename", fileName))
}
err = zf.Close()
if err != nil {
logutil.BgLogger().Warn("Closing zip file failed", zap.Error(err))
logutil.BgLogger().Error("Closing zip file failed", zap.Error(err), zap.String("filename", fileName))
}
}()

Expand Down
6 changes: 6 additions & 0 deletions server/optimize_trace.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,22 @@ type OptimizeTraceHandler struct {
infoGetter *infosync.InfoSyncer
address string
statusPort uint
scheme string
}

func (s *Server) newOptimizeTraceHandler() *OptimizeTraceHandler {
cfg := config.GetGlobalConfig()
oth := &OptimizeTraceHandler{
address: cfg.AdvertiseAddress,
statusPort: cfg.Status.StatusPort,
scheme: "http",
}
if s.dom != nil && s.dom.InfoSyncer() != nil {
oth.infoGetter = s.dom.InfoSyncer()
}
if len(cfg.Security.ClusterSSLCA) > 0 {
oth.scheme = "https"
}
return oth
}

Expand All @@ -55,6 +60,7 @@ func (oth OptimizeTraceHandler) ServeHTTP(w http.ResponseWriter, req *http.Reque
statusPort: oth.statusPort,
urlPath: fmt.Sprintf("optimize_trace/dump/%s", name),
downloadedFilename: "optimize_trace",
scheme: oth.scheme,
}
handleDownloadFile(handler, w, req)
}
45 changes: 27 additions & 18 deletions server/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (
"path/filepath"

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)
Expand All @@ -36,17 +34,22 @@ type PlanReplayerHandler struct {
infoGetter *infosync.InfoSyncer
address string
statusPort uint
scheme string
}

func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler {
cfg := config.GetGlobalConfig()
prh := &PlanReplayerHandler{
address: cfg.AdvertiseAddress,
statusPort: cfg.Status.StatusPort,
scheme: "http",
}
if s.dom != nil && s.dom.InfoSyncer() != nil {
prh.infoGetter = s.dom.InfoSyncer()
}
if len(cfg.Security.ClusterSSLCA) > 0 {
prh.scheme = "https"
}
return prh
}

Expand All @@ -61,6 +64,7 @@ func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
statusPort: prh.statusPort,
urlPath: fmt.Sprintf("plan_replyaer/dump/%s", name),
downloadedFilename: "plan_replayer",
scheme: prh.scheme,
}
handleDownloadFile(handler, w, req)
}
Expand All @@ -69,6 +73,8 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req
params := mux.Vars(req)
name := params[pFileName]
path := handler.filePath
isForwarded := len(req.URL.Query().Get("forward")) > 0
localAddr := fmt.Sprintf("%s:%v", handler.address, handler.statusPort)
exist, err := isExists(path)
if err != nil {
writeError(w, err)
Expand All @@ -90,27 +96,22 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req
writeError(w, err)
return
}
err = os.Remove(path)
if err != nil {
writeError(w, err)
return
}
_, err = w.Write(content)
if err != nil {
writeError(w, err)
return
}
w.Header().Set("Content-Type", "application/zip")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s.zip\"", handler.downloadedFilename))
logutil.BgLogger().Info("return dump file successfully", zap.String("filename", name),
zap.String("address", localAddr), zap.Bool("forwarded", isForwarded))
return
}
if handler.infoGetter == nil {
w.WriteHeader(http.StatusNotFound)
return
}
// we didn't find file for forward request, return 404
forwarded := req.URL.Query().Get("forward")
if len(forwarded) > 0 {
// handler.infoGetter will be nil only in unit test
// or we couldn't find file for forward request, return 404
if handler.infoGetter == nil || isForwarded {
logutil.BgLogger().Info("failed to find dump file", zap.String("filename", name),
zap.String("address", localAddr), zap.Bool("forwarded", isForwarded))
w.WriteHeader(http.StatusNotFound)
return
}
Expand All @@ -125,14 +126,17 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req
if topo.IP == handler.address && topo.StatusPort == handler.statusPort {
continue
}
url := fmt.Sprintf("http://%s:%v/%s?forward=true", topo.IP, topo.StatusPort, handler.urlPath)
remoteAddr := fmt.Sprintf("%s/%v", topo.IP, topo.StatusPort)
url := fmt.Sprintf("%s://%s/%s?forward=true", handler.scheme, remoteAddr, handler.urlPath)
resp, err := http.Get(url) // #nosec G107
if err != nil {
terror.Log(errors.Trace(err))
logutil.BgLogger().Error("forward request failed", zap.String("addr", topo.IP), zap.Uint("port", topo.StatusPort), zap.Error(err))
logutil.BgLogger().Error("forward request failed",
zap.String("remote-addr", remoteAddr), zap.Error(err))
continue
}
if resp.StatusCode != http.StatusOK {
logutil.BgLogger().Info("can't find file in remote server", zap.String("filename", name),
zap.String("remote-addr", remoteAddr), zap.Int("status-code", resp.StatusCode))
continue
}
content, err := ioutil.ReadAll(resp.Body)
Expand All @@ -153,14 +157,19 @@ func handleDownloadFile(handler downloadFileHandler, w http.ResponseWriter, req
// find dump file in one remote tidb-server, return file directly
w.Header().Set("Content-Type", "application/zip")
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s.zip\"", handler.downloadedFilename))
logutil.BgLogger().Info("return dump file successfully in remote server",
zap.String("filename", name), zap.String("remote-addr", remoteAddr))
return
}
// we can't find dump file in any tidb-server, return 404 directly
logutil.BgLogger().Error("can't find dump file in any remote server", zap.String("filename", name))
logutil.BgLogger().Info("can't find dump file in any remote server", zap.String("filename", name))
w.WriteHeader(http.StatusNotFound)
_, err = w.Write([]byte(fmt.Sprintf("can't find dump file %s in any remote server", name)))
writeError(w, err)
}

type downloadFileHandler struct {
scheme string
filePath string
fileName string
infoGetter *infosync.InfoSyncer
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2883,7 +2883,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {
return nil, err
}

dom.PlanReplayerLoop()
dom.DumpFileGcCheckerLoop()

if raw, ok := store.(kv.EtcdBackend); ok {
err = raw.StartGCWorker()
Expand Down

0 comments on commit 060413b

Please sign in to comment.