From 29e164ea9b226d599dd67fe800e3e39a5919c466 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 1 Nov 2021 13:18:21 +0800 Subject: [PATCH 1/4] support forward dumpfile request Signed-off-by: yisaer add log Signed-off-by: yisaer add log Signed-off-by: yisaer add log Signed-off-by: yisaer add log Signed-off-by: yisaer fix get topo Signed-off-by: yisaer revise log Signed-off-by: yisaer fix lint Signed-off-by: yisaer fix lint Signed-off-by: yisaer fix lint Signed-off-by: yisaer revise handle error Signed-off-by: yisaer revise handle error Signed-off-by: yisaer --- domain/infosync/info.go | 31 ++++++++++- domain/infosync/info_test.go | 4 +- server/plan_replayer.go | 104 ++++++++++++++++++++++++++++++----- 3 files changed, 120 insertions(+), 19 deletions(-) diff --git a/domain/infosync/info.go b/domain/infosync/info.go index 697d4d2b1e984..57f7e336a4ccf 100644 --- a/domain/infosync/info.go +++ b/domain/infosync/info.go @@ -25,6 +25,7 @@ import ( "os" "path" "strconv" + "strings" "sync/atomic" "time" @@ -512,25 +513,28 @@ func (is *InfoSyncer) RemoveServerInfo() { } } -type topologyInfo struct { +// TopologyInfo is the topology info +type TopologyInfo struct { ServerVersionInfo + IP string `json:"ip"` StatusPort uint `json:"status_port"` DeployPath string `json:"deploy_path"` StartTimestamp int64 `json:"start_timestamp"` Labels map[string]string `json:"labels"` } -func (is *InfoSyncer) getTopologyInfo() topologyInfo { +func (is *InfoSyncer) getTopologyInfo() TopologyInfo { s, err := os.Executable() if err != nil { s = "" } dir := path.Dir(s) - return topologyInfo{ + return TopologyInfo{ ServerVersionInfo: ServerVersionInfo{ Version: mysql.TiDBReleaseVersion, GitHash: is.info.ServerVersionInfo.GitHash, }, + IP: is.info.IP, StatusPort: is.info.StatusPort, DeployPath: dir, StartTimestamp: is.info.StartTimestamp, @@ -643,6 +647,27 @@ func (is *InfoSyncer) RestartTopology(ctx context.Context) error { return is.newTopologySessionAndStoreServerInfo(ctx, owner.NewSessionDefaultRetryCnt) } +// GetAllTiDBTopology gets all tidb topology +func (is *InfoSyncer) GetAllTiDBTopology(ctx context.Context) ([]*TopologyInfo, error) { + topos := make([]*TopologyInfo, 0) + response, err := is.etcdCli.Get(ctx, TopologyInformationPath, clientv3.WithPrefix()) + if err != nil { + return nil, err + } + for _, kv := range response.Kvs { + if !strings.HasSuffix(string(kv.Key), "/info") { + continue + } + var topo *TopologyInfo + err = json.Unmarshal(kv.Value, &topo) + if err != nil { + return nil, err + } + topos = append(topos, topo) + } + return topos, nil +} + // newSessionAndStoreServerInfo creates a new etcd session and stores server info to etcd. func (is *InfoSyncer) newSessionAndStoreServerInfo(ctx context.Context, retryCnt int) error { if is.etcdCli == nil { diff --git a/domain/infosync/info_test.go b/domain/infosync/info_test.go index 1c34ffd8b7b84..e839b5daa8368 100644 --- a/domain/infosync/info_test.go +++ b/domain/infosync/info_test.go @@ -114,7 +114,7 @@ func TestTopology(t *testing.T) { require.True(t, ttlExists) } -func (is *InfoSyncer) getTopologyFromEtcd(ctx context.Context) (*topologyInfo, error) { +func (is *InfoSyncer) getTopologyFromEtcd(ctx context.Context) (*TopologyInfo, error) { key := fmt.Sprintf("%s/%s:%v/info", TopologyInformationPath, is.info.IP, is.info.Port) resp, err := is.etcdCli.Get(ctx, key) if err != nil { @@ -126,7 +126,7 @@ func (is *InfoSyncer) getTopologyFromEtcd(ctx context.Context) (*topologyInfo, e if len(resp.Kvs) != 1 { return nil, errors.New("resp.Kvs error") } - var ret topologyInfo + var ret TopologyInfo err = json.Unmarshal(resp.Kvs[0].Value, &ret) if err != nil { return nil, err diff --git a/server/plan_replayer.go b/server/plan_replayer.go index 657fa6518ebcd..3f1fc4815c4b7 100644 --- a/server/plan_replayer.go +++ b/server/plan_replayer.go @@ -15,6 +15,7 @@ package server import ( + "fmt" "io" "net/http" "os" @@ -22,41 +23,116 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/errors" + "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/domain" + "github.com/pingcap/tidb/domain/infosync" "github.com/pingcap/tidb/parser/terror" + "github.com/pingcap/tidb/util/logutil" + "go.uber.org/zap" ) // PlanReplayerHandler is the handler for dumping plan replayer file. type PlanReplayerHandler struct { + infoSyncer *infosync.InfoSyncer + address string + statusPort uint } func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler { - return &PlanReplayerHandler{} + cfg := config.GetGlobalConfig() + return &PlanReplayerHandler{ + infoSyncer: s.dom.InfoSyncer(), + address: cfg.AdvertiseAddress, + statusPort: cfg.Status.StatusPort, + } } func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { - w.Header().Set("Content-Type", "application/zip") - w.Header().Set("Content-Disposition", "attachment; filename=\"plan_replayer.zip\"") - params := mux.Vars(req) - name := params[pFileName] path := filepath.Join(domain.GetPlanReplayerDirName(), name) - file, err := os.Open(path) + if isExists(path) { + w.Header().Set("Content-Type", "application/zip") + w.Header().Set("Content-Disposition", "attachment; filename=\"plan_replayer.zip\"") + file, err := os.Open(path) + if err != nil { + writeError(w, err) + return + } + _, err = io.Copy(w, file) + if err != nil { + writeError(w, err) + return + } + err = file.Close() + if err != nil { + writeError(w, err) + return + } + err = os.Remove(path) + if err != nil { + writeError(w, err) + return + } + w.WriteHeader(http.StatusOK) + return + } + // we didn't find file for forward request, return 404 + forwarded := req.URL.Query().Get("forward") + if len(forwarded) > 0 { + w.WriteHeader(http.StatusNotFound) + return + } + // If we didn't find file in origin request, try to broadcast the request to all remote tidb-servers + topos, err := prh.infoSyncer.GetAllTiDBTopology(req.Context()) if err != nil { writeError(w, err) - } else { - _, err := io.Copy(w, file) + return + } + // transfer each remote tidb-server and try to find dump file + for _, topo := range topos { + if topo.IP == prh.address && topo.StatusPort == prh.statusPort { + continue + } + url := fmt.Sprintf("http://%s:%v/plan_replayer/dump/%s?forward=true", topo.IP, topo.StatusPort, name) + resp, err := http.Get(url) if err != nil { terror.Log(errors.Trace(err)) + logutil.BgLogger().Error("forward request failed", zap.String("addr", topo.IP), zap.Uint("port", topo.StatusPort), zap.Error(err)) + continue } + if resp.StatusCode != http.StatusOK { + continue + } + // find dump file in one remote tidb-server, return file directly + w.Header().Set("Content-Type", "application/zip") + w.Header().Set("Content-Disposition", "attachment; filename=\"plan_replayer.zip\"") + _, err = io.Copy(w, resp.Body) + if err != nil { + writeError(w, err) + return + } + err = resp.Body.Close() + if err != nil { + writeError(w, err) + return + } + w.WriteHeader(http.StatusOK) + return } - err = file.Close() - if err != nil { - terror.Log(errors.Trace(err)) - } - err = os.Remove(path) + // we can't find dump file in any tidb-server, return 404 directly + logutil.BgLogger().Info("can't find dump file in any remote server", zap.String("filename", name)) + w.WriteHeader(http.StatusNotFound) + return +} + +func isExists(path string) bool { + _, err := os.Stat(path) if err != nil { - terror.Log(errors.Trace(err)) + if os.IsExist(err) { + return true + } + return false } + return true } From cd3892ba5c51412f51ba64663b74181daa22b237 Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 1 Nov 2021 16:02:01 +0800 Subject: [PATCH 2/4] fix lint Signed-off-by: yisaer --- server/plan_replayer.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/server/plan_replayer.go b/server/plan_replayer.go index 3f1fc4815c4b7..07cb8ffb71db9 100644 --- a/server/plan_replayer.go +++ b/server/plan_replayer.go @@ -123,15 +123,11 @@ func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques // we can't find dump file in any tidb-server, return 404 directly logutil.BgLogger().Info("can't find dump file in any remote server", zap.String("filename", name)) w.WriteHeader(http.StatusNotFound) - return } func isExists(path string) bool { _, err := os.Stat(path) - if err != nil { - if os.IsExist(err) { - return true - } + if err != nil && !os.IsExist(err) { return false } return true From f9f2aa1b4337a5044df658b7b8300471ef48f20d Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 1 Nov 2021 16:07:53 +0800 Subject: [PATCH 3/4] fix lint Signed-off-by: yisaer --- server/plan_replayer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/plan_replayer.go b/server/plan_replayer.go index 07cb8ffb71db9..368a5694467e8 100644 --- a/server/plan_replayer.go +++ b/server/plan_replayer.go @@ -95,7 +95,7 @@ func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques continue } url := fmt.Sprintf("http://%s:%v/plan_replayer/dump/%s?forward=true", topo.IP, topo.StatusPort, name) - resp, err := http.Get(url) + resp, err := http.Get(url) // #nosec G107 if err != nil { terror.Log(errors.Trace(err)) logutil.BgLogger().Error("forward request failed", zap.String("addr", topo.IP), zap.Uint("port", topo.StatusPort), zap.Error(err)) From 116501ca9fc65049faa665e480f5969bb48a5fdd Mon Sep 17 00:00:00 2001 From: yisaer Date: Mon, 1 Nov 2021 16:22:31 +0800 Subject: [PATCH 4/4] fix lint Signed-off-by: yisaer --- server/plan_replayer.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/server/plan_replayer.go b/server/plan_replayer.go index 368a5694467e8..78f0129c084f6 100644 --- a/server/plan_replayer.go +++ b/server/plan_replayer.go @@ -33,18 +33,21 @@ import ( // PlanReplayerHandler is the handler for dumping plan replayer file. type PlanReplayerHandler struct { - infoSyncer *infosync.InfoSyncer + infoGetter *infosync.InfoSyncer address string statusPort uint } func (s *Server) newPlanReplayerHandler() *PlanReplayerHandler { cfg := config.GetGlobalConfig() - return &PlanReplayerHandler{ - infoSyncer: s.dom.InfoSyncer(), + prh := &PlanReplayerHandler{ address: cfg.AdvertiseAddress, statusPort: cfg.Status.StatusPort, } + if s.dom != nil && s.dom.InfoSyncer() != nil { + prh.infoGetter = s.dom.InfoSyncer() + } + return prh } func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -77,6 +80,10 @@ func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques w.WriteHeader(http.StatusOK) return } + if prh.infoGetter == nil { + w.WriteHeader(http.StatusNotFound) + return + } // we didn't find file for forward request, return 404 forwarded := req.URL.Query().Get("forward") if len(forwarded) > 0 { @@ -84,7 +91,7 @@ func (prh PlanReplayerHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques return } // If we didn't find file in origin request, try to broadcast the request to all remote tidb-servers - topos, err := prh.infoSyncer.GetAllTiDBTopology(req.Context()) + topos, err := prh.infoGetter.GetAllTiDBTopology(req.Context()) if err != nil { writeError(w, err) return