Skip to content

Commit

Permalink
cherry pick pingcap#29299 to release-5.3
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <[email protected]>
  • Loading branch information
Yisaer authored and ti-srebot committed Nov 4, 2021
1 parent 64319c0 commit 61e5e19
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 20 deletions.
31 changes: 28 additions & 3 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"os"
"path"
"strconv"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -512,25 +513,28 @@ func (is *InfoSyncer) RemoveServerInfo() {
}
}

type topologyInfo struct {
// TopologyInfo is the topology info
type TopologyInfo struct {
ServerVersionInfo
IP string `json:"ip"`
StatusPort uint `json:"status_port"`
DeployPath string `json:"deploy_path"`
StartTimestamp int64 `json:"start_timestamp"`
Labels map[string]string `json:"labels"`
}

func (is *InfoSyncer) getTopologyInfo() topologyInfo {
func (is *InfoSyncer) getTopologyInfo() TopologyInfo {
s, err := os.Executable()
if err != nil {
s = ""
}
dir := path.Dir(s)
return topologyInfo{
return TopologyInfo{
ServerVersionInfo: ServerVersionInfo{
Version: mysql.TiDBReleaseVersion,
GitHash: is.info.ServerVersionInfo.GitHash,
},
IP: is.info.IP,
StatusPort: is.info.StatusPort,
DeployPath: dir,
StartTimestamp: is.info.StartTimestamp,
Expand Down Expand Up @@ -643,6 +647,27 @@ func (is *InfoSyncer) RestartTopology(ctx context.Context) error {
return is.newTopologySessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt)
}

// GetAllTiDBTopology gets all tidb topology
func (is *InfoSyncer) GetAllTiDBTopology(ctx context.Context) ([]*TopologyInfo, error) {
topos := make([]*TopologyInfo, 0)
response, err := is.etcdCli.Get(ctx, TopologyInformationPath, clientv3.WithPrefix())
if err != nil {
return nil, err
}
for _, kv := range response.Kvs {
if !strings.HasSuffix(string(kv.Key), "/info") {
continue
}
var topo *TopologyInfo
err = json.Unmarshal(kv.Value, &topo)
if err != nil {
return nil, err
}
topos = append(topos, topo)
}
return topos, nil
}

// newSessionAndStoreServerInfo creates a new etcd session and stores server info to etcd.
func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt int) error {
if is.etcdCli == nil {
Expand Down
4 changes: 2 additions & 2 deletions domain/infosync/info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func TestTopology(t *testing.T) {
require.True(t, ttlExists)
}

func (is *InfoSyncer) getTopologyFromEtcd(ctx context.Context) (*topologyInfo, error) {
func (is *InfoSyncer) getTopologyFromEtcd(ctx context.Context) (*TopologyInfo, error) {
key := fmt.Sprintf("%s/%s:%v/info", TopologyInformationPath, is.info.IP, is.info.Port)
resp, err := is.etcdCli.Get(ctx, key)
if err != nil {
Expand All @@ -126,7 +126,7 @@ func (is *InfoSyncer) getTopologyFromEtcd(ctx context.Context) (*topologyInfo, e
if len(resp.Kvs) != 1 {
return nil, errors.New("resp.Kvs error")
}
var ret topologyInfo
var ret TopologyInfo
err = json.Unmarshal(resp.Kvs[0].Value, &ret)
if err != nil {
return nil, err
Expand Down
109 changes: 94 additions & 15 deletions server/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,48 +15,127 @@
package server

import (
"fmt"
"io"
"net/http"
"os"
"path/filepath"

"github.com/gorilla/mux"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/domain/infosync"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/util/logutil"
"go.uber.org/zap"
)

// PlanReplayerHandler is the handler for dumping plan replayer file.
type PlanReplayerHandler struct {
infoGetter *infosync.InfoSyncer
address string
statusPort uint
}

func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler {
return &PlanReplayerHandler{}
cfg := config.GetGlobalConfig()
prh := &PlanReplayerHandler{
address: cfg.AdvertiseAddress,
statusPort: cfg.Status.StatusPort,
}
if s.dom != nil && s.dom.InfoSyncer() != nil {
prh.infoGetter = s.dom.InfoSyncer()
}
return prh
}

func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/zip")
w.Header().Set("Content-Disposition", "attachment; filename=\"plan_replayer.zip\"")

params := mux.Vars(req)

name := params[pFileName]
path := filepath.Join(domain.GetPlanReplayerDirName(), name)
file, err := os.Open(path)
if isExists(path) {
w.Header().Set("Content-Type", "application/zip")
w.Header().Set("Content-Disposition", "attachment; filename=\"plan_replayer.zip\"")
file, err := os.Open(path)
if err != nil {
writeError(w, err)
return
}
_, err = io.Copy(w, file)
if err != nil {
writeError(w, err)
return
}
err = file.Close()
if err != nil {
writeError(w, err)
return
}
err = os.Remove(path)
if err != nil {
writeError(w, err)
return
}
w.WriteHeader(http.StatusOK)
return
}
if prh.infoGetter == nil {
w.WriteHeader(http.StatusNotFound)
return
}
// we didn't find file for forward request, return 404
forwarded := req.URL.Query().Get("forward")
if len(forwarded) > 0 {
w.WriteHeader(http.StatusNotFound)
return
}
// If we didn't find file in origin request, try to broadcast the request to all remote tidb-servers
topos, err := prh.infoGetter.GetAllTiDBTopology(req.Context())
if err != nil {
writeError(w, err)
} else {
_, err := io.Copy(w, file)
return
}
// transfer each remote tidb-server and try to find dump file
for _, topo := range topos {
if topo.IP == prh.address && topo.StatusPort == prh.statusPort {
continue
}
url := fmt.Sprintf("http://%s:%v/plan_replayer/dump/%s?forward=true", topo.IP, topo.StatusPort, name)
resp, err := http.Get(url) // #nosec G107
if err != nil {
terror.Log(errors.Trace(err))
logutil.BgLogger().Error("forward request failed", zap.String("addr", topo.IP), zap.Uint("port", topo.StatusPort), zap.Error(err))
continue
}
if resp.StatusCode != http.StatusOK {
continue
}
// find dump file in one remote tidb-server, return file directly
w.Header().Set("Content-Type", "application/zip")
w.Header().Set("Content-Disposition", "attachment; filename=\"plan_replayer.zip\"")
_, err = io.Copy(w, resp.Body)
if err != nil {
writeError(w, err)
return
}
err = resp.Body.Close()
if err != nil {
writeError(w, err)
return
}
w.WriteHeader(http.StatusOK)
return
}
err = file.Close()
if err != nil {
terror.Log(errors.Trace(err))
}
err = os.Remove(path)
if err != nil {
terror.Log(errors.Trace(err))
// we can't find dump file in any tidb-server, return 404 directly
logutil.BgLogger().Info("can't find dump file in any remote server", zap.String("filename", name))
w.WriteHeader(http.StatusNotFound)
}

func isExists(path string) bool {
_, err := os.Stat(path)
if err != nil && !os.IsExist(err) {
return false
}
return true
}

0 comments on commit 61e5e19

Please sign in to comment.