From a1d32302b50303b7a7f55af80b889805587e074b Mon Sep 17 00:00:00 2001
From: ti-srebot <66930949+ti-srebot@users.noreply.github.com>
Date: Mon, 7 Sep 2020 19:25:20 +0800
Subject: [PATCH] server: Refine log error format (#2873) (#2911)

Signed-off-by: ti-srebot
Signed-off-by: ZenoTan
---
 server/core/region_storage.go |  9 ++++--
 server/core/storage.go        | 12 ++++----
 server/grpc_service.go        |  3 +-
 server/heartbeat_streams.go   |  6 ++--
 server/id/id.go               |  6 ++--
 server/kv/etcd_kv.go          |  2 +-
 server/member/lease.go        |  3 +-
 server/server.go              | 54 +++++++++++++++++------------------
 server/systime_mon.go         |  3 +-
 9 files changed, 51 insertions(+), 47 deletions(-)

diff --git a/server/core/region_storage.go b/server/core/region_storage.go
index 3bc6b8e4ac2..be6de0d037f 100644
--- a/server/core/region_storage.go
+++ b/server/core/region_storage.go
@@ -19,7 +19,6 @@ import (
 	"sync"
 	"time"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/errs"
@@ -140,7 +139,7 @@ func loadRegions(kv kv.Base, f func(region *RegionInfo) []*RegionInfo) error {
 	for _, s := range res {
 		region := &metapb.Region{}
 		if err := region.Unmarshal([]byte(s)); err != nil {
-			return errors.WithStack(err)
+			return errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs()
 		}
 
 		nextID = region.GetId() + 1
@@ -181,5 +180,9 @@ func (s *RegionStorage) Close() error {
 		log.Error("meet error before close the region storage", errs.ZapError(err))
 	}
 	s.regionStorageCancel()
-	return errors.WithStack(s.LeveldbKV.Close())
+	err = s.LeveldbKV.Close()
+	if err != nil {
+		return errs.ErrLevelDBClose.Wrap(err).GenWithStackByArgs()
+	}
+	return nil
 }
diff --git a/server/core/storage.go b/server/core/storage.go
index 8449344fdd3..45ad270aaf4 100644
--- a/server/core/storage.go
+++ b/server/core/storage.go
@@ -217,7 +217,7 @@ func (s *Storage) LoadConfig(cfg interface{}) (bool, error) {
 	}
 	err = json.Unmarshal([]byte(value), cfg)
 	if err != nil {
-		return false, errors.WithStack(err)
+		return false, errs.ErrJSONUnmarshal.Wrap(err).GenWithStackByCause()
 	}
 	return true, nil
 }
@@ -264,7 +264,7 @@ func (s *Storage) LoadRules(f func(k, v string)) (bool, error) {
 func (s *Storage) SaveReplicationStatus(mode string, status interface{}) error {
 	value, err := json.Marshal(status)
 	if err != nil {
-		return errors.WithStack(err)
+		return errs.ErrJSONMarshal.Wrap(err).GenWithStackByArgs()
 	}
 	return s.Save(path.Join(replicationPath, mode), string(value))
 }
@@ -280,7 +280,7 @@ func (s *Storage) LoadReplicationStatus(mode string, status interface{}) (bool,
 	}
 	err = json.Unmarshal([]byte(v), status)
 	if err != nil {
-		return false, errors.WithStack(err)
+		return false, errs.ErrJSONUnmarshal.Wrap(err).GenWithStackByArgs()
 	}
 	return true, nil
 }
@@ -305,7 +305,7 @@ func (s *Storage) LoadComponent(component interface{}) (bool, error) {
 	}
 	err = json.Unmarshal([]byte(v), component)
 	if err != nil {
-		return false, errors.WithStack(err)
+		return false, errs.ErrJSONUnmarshal.Wrap(err).GenWithStackByArgs()
 	}
 	return true, nil
 }
@@ -323,7 +323,7 @@ func (s *Storage) LoadStores(f func(store *StoreInfo)) error {
 	for _, str := range res {
 		store := &metapb.Store{}
 		if err := store.Unmarshal([]byte(str)); err != nil {
-			return errors.WithStack(err)
+			return errs.ErrProtoUnmarshal.Wrap(err).GenWithStackByArgs()
 		}
 		leaderWeight, err := s.loadFloatWithDefaultValue(s.storeLeaderWeightPath(store.GetId()), 1.0)
 		if err != nil {
@@ -364,7 +364,7 @@ func (s *Storage) loadFloatWithDefaultValue(path string, def float64) (float64,
 	}
 	val, err := strconv.ParseFloat(res, 64)
 	if err != nil {
-		return 0, errors.WithStack(err)
+		return 0, errs.ErrStrconvParseFloat.Wrap(err).GenWithStackByArgs()
 	}
 	return val, nil
 }
diff --git a/server/grpc_service.go b/server/grpc_service.go
index a24138558a2..5f509409bd3 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -26,6 +26,7 @@ import (
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/server/cluster"
 	"github.com/tikv/pd/server/core"
 	"go.uber.org/zap"
@@ -404,7 +405,7 @@ func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
 
 		region := core.RegionFromHeartbeat(request)
 		if region.GetLeader() == nil {
-			log.Error("invalid request, the leader is nil", zap.Reflect("reqeust", request))
+			log.Error("invalid request, the leader is nil", zap.Reflect("request", request), errs.ZapError(errs.ErrLeaderNil))
 			continue
 		}
 		if region.GetID() == 0 {
diff --git a/server/heartbeat_streams.go b/server/heartbeat_streams.go
index a0fdb37a410..57f7c5cea31 100644
--- a/server/heartbeat_streams.go
+++ b/server/heartbeat_streams.go
@@ -88,7 +88,7 @@ func (s *heartbeatStreams) run() {
 			if store == nil {
 				log.Error("failed to get store",
 					zap.Uint64("region-id", msg.RegionId),
-					zap.Uint64("store-id", storeID))
+					zap.Uint64("store-id", storeID), errs.ZapError(errs.ErrGetSourceStore))
 				delete(s.streams, storeID)
 				continue
 			}
@@ -96,7 +96,7 @@ func (s *heartbeatStreams) run() {
 			if stream, ok := s.streams[storeID]; ok {
 				if err := stream.Send(msg); err != nil {
 					log.Error("send heartbeat message fail",
-						zap.Uint64("region-id", msg.RegionId), zap.Error(err))
+						zap.Uint64("region-id", msg.RegionId), errs.ZapError(errs.ErrGRPCSend.Wrap(err).GenWithStackByArgs()))
 					delete(s.streams, storeID)
 					regionHeartbeatCounter.WithLabelValues(storeAddress, storeLabel, "push", "err").Inc()
 				} else {
@@ -112,7 +112,7 @@ func (s *heartbeatStreams) run() {
 			for storeID, stream := range s.streams {
 				store := s.cluster.GetStore(storeID)
 				if store == nil {
-					log.Error("failed to get store", zap.Uint64("store-id", storeID))
+					log.Error("failed to get store", zap.Uint64("store-id", storeID), errs.ZapError(errs.ErrGetSourceStore))
 					delete(s.streams, storeID)
 					continue
 				}
diff --git a/server/id/id.go b/server/id/id.go
index 35b9aa19455..3e533fa5c82 100644
--- a/server/id/id.go
+++ b/server/id/id.go
@@ -17,8 +17,8 @@ import (
 	"path"
 	"sync"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/etcdutil"
 	"github.com/tikv/pd/pkg/typeutil"
 	"github.com/tikv/pd/server/kv"
@@ -101,10 +101,10 @@ func (alloc *AllocatorImpl) generate() (uint64, error) {
 	t := txn.If(append([]clientv3.Cmp{cmp}, clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member))...)
 	resp, err := t.Then(clientv3.OpPut(key, string(value))).Commit()
 	if err != nil {
-		return 0, err
+		return 0, errs.ErrEtcdTxn.Wrap(err).GenWithStackByArgs()
 	}
 	if !resp.Succeeded {
-		return 0, errors.New("generate id failed, we may not leader")
+		return 0, errs.ErrEtcdTxn.FastGenByArgs()
 	}
 
 	log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end))
diff --git a/server/kv/etcd_kv.go b/server/kv/etcd_kv.go
index 763b7bf6fde..f4fa5a5914d 100644
--- a/server/kv/etcd_kv.go
+++ b/server/kv/etcd_kv.go
@@ -55,7 +55,7 @@ func (kv *etcdKVBase) Load(key string) (string, error) {
 	if n := len(resp.Kvs); n == 0 {
 		return "", nil
 	} else if n > 1 {
-		return "", errors.Errorf("load more than one kvs: key %v kvs %v", key, n)
+		return "", errs.ErrEtcdKVGetResponse.GenWithStackByArgs(resp.Kvs)
 	}
 	return string(resp.Kvs[0].Value), nil
 }
diff --git a/server/member/lease.go b/server/member/lease.go
index ad920455d0a..a6efb49dc81 100644
--- a/server/member/lease.go
+++ b/server/member/lease.go
@@ -18,7 +18,6 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/errs"
 	"go.etcd.io/etcd/clientv3"
@@ -50,7 +49,7 @@ func (l *LeaderLease) Grant(leaseTimeout int64) error {
 	leaseResp, err := l.lease.Grant(ctx, leaseTimeout)
 	cancel()
 	if err != nil {
-		return errors.WithStack(err)
+		return errs.ErrEtcdGrantLease.Wrap(err).GenWithStackByCause()
 	}
 	if cost := time.Since(start); cost > slowRequestTime {
 		log.Warn("lease grants too slow", zap.Duration("cost", cost))
diff --git a/server/server.go b/server/server.go
index a2bdc56f0c3..73cd9256b5c 100644
--- a/server/server.go
+++ b/server/server.go
@@ -172,7 +172,7 @@ func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBu
 			return nil, err
 		}
 		if !info.IsCore && len(info.PathPrefix) == 0 && (len(info.Name) == 0 || len(info.Version) == 0) {
-			return nil, errors.Errorf("invalid API information, group %s version %s", info.Name, info.Version)
+			return nil, errs.ErrAPIInformationInvalid.FastGenByArgs(info.Name, info.Version)
 		}
 		var pathPrefix string
 		if len(info.PathPrefix) != 0 {
@@ -183,7 +183,7 @@ func combineBuilderServerHTTPService(ctx context.Context, svr *Server, serviceBu
 			pathPrefix = path.Join(ExtensionsPath, info.Name, info.Version)
 		}
 		if _, ok := registerMap[pathPrefix]; ok {
-			return nil, errors.Errorf("service with path [%s] already registered", pathPrefix)
+			return nil, errs.ErrServiceRegistered.FastGenByArgs(pathPrefix)
 		}
 
 		log.Info("register REST path", zap.String("path", pathPrefix))
@@ -263,13 +263,13 @@ func (s *Server) startEtcd(ctx context.Context) error {
 
 	etcd, err := embed.StartEtcd(s.etcdCfg)
 	if err != nil {
-		return errors.WithStack(err)
+		return errs.ErrStartEtcd.Wrap(err).GenWithStackByCause()
 	}
 
 	// Check cluster ID
 	urlmap, err := types.NewURLsMap(s.cfg.InitialCluster)
 	if err != nil {
-		return errors.WithStack(err)
+		return errs.ErrEtcdURLMap.Wrap(err).GenWithStackByCause()
 	}
 	tlsConfig, err := s.cfg.Security.ToTLSConfig()
 	if err != nil {
@@ -284,7 +284,7 @@ func (s *Server) startEtcd(ctx context.Context) error {
 	// Wait etcd until it is ready to use
 	case <-etcd.Server.ReadyNotify():
 	case <-newCtx.Done():
-		return errors.Errorf("canceled when waiting embed etcd to be ready")
+		return errs.ErrCancelStartEtcd.FastGenByArgs()
 	}
 
 	endpoints := []string{s.etcdCfg.ACUrls[0].String()}
@@ -296,7 +296,7 @@ func (s *Server) startEtcd(ctx context.Context) error {
 		TLS: tlsConfig,
 	})
 	if err != nil {
-		return errors.WithStack(err)
+		return errs.ErrNewEtcdClient.Wrap(err).GenWithStackByCause()
 	}
 
 	etcdServerID := uint64(etcd.Server.ID())
@@ -429,7 +429,7 @@ func (s *Server) Close() {
 		s.hbStreams.Close()
 	}
 	if err := s.storage.Close(); err != nil {
-		log.Error("close storage meet error", zap.Error(err))
+		log.Error("close storage meet error", errs.ZapError(err))
 	}

 	// Run callbacks
@@ -448,7 +448,7 @@ func (s *Server) IsClosed() bool {
 // Run runs the pd server.
 func (s *Server) Run() error {
 	go StartMonitor(s.ctx, time.Now, func() {
-		log.Error("system time jumps backward")
+		log.Error("system time jumps backward", errs.ZapError(errs.ErrIncorrectSystemTime))
 		timeJumpBackCounter.Inc()
 	})
 	if err := s.startEtcd(s.ctx); err != nil {
@@ -565,11 +565,11 @@ func (s *Server) bootstrapCluster(req *pdpb.BootstrapRequest) (*pdpb.BootstrapRe
 	bootstrapCmp := clientv3.Compare(clientv3.CreateRevision(clusterRootPath), "=", 0)
 	resp, err := kv.NewSlowLogTxn(s.client).If(bootstrapCmp).Then(ops...).Commit()
 	if err != nil {
-		return nil, errors.WithStack(err)
+		return nil, errs.ErrEtcdTxn.Wrap(err).GenWithStackByCause()
 	}
 	if !resp.Succeeded {
 		log.Warn("cluster already bootstrapped", zap.Uint64("cluster-id", clusterID))
-		return nil, errors.Errorf("cluster %d already bootstrapped", clusterID)
+		return nil, errs.ErrEtcdTxn.FastGenByArgs()
 	}
 
 	log.Info("bootstrap cluster ok", zap.Uint64("cluster-id", clusterID))
@@ -736,7 +736,7 @@ func (s *Server) GetConfig() *config.Config {
 			log.Error("failed to decode scheduler config",
 				zap.String("config", configs[i]),
 				zap.String("scheduler", sche),
-				zap.Error(err))
+				errs.ZapError(err))
 			continue
 		}
 		payload[sche] = config
@@ -768,7 +768,7 @@ func (s *Server) SetScheduleConfig(cfg config.ScheduleConfig) error {
 		log.Error("failed to update schedule config",
 			zap.Reflect("new", cfg),
 			zap.Reflect("old", old),
-			zap.Error(err))
+			errs.ZapError(err))
 		return err
 	}
 	log.Info("schedule config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
@@ -814,7 +814,7 @@ func (s *Server) SetReplicationConfig(cfg config.ReplicationConfig) error {
 		log.Error("failed to update replication config",
 			zap.Reflect("new", cfg),
 			zap.Reflect("old", old),
-			zap.Error(err))
+			errs.ZapError(err))
 		return err
 	}
 	log.Info("replication config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
@@ -852,7 +852,7 @@ func (s *Server) SetPDServerConfig(cfg config.PDServerConfig) error {
 		log.Error("failed to update PDServer config",
 			zap.Reflect("new", cfg),
 			zap.Reflect("old", old),
-			zap.Error(err))
+			errs.ZapError(err))
 		return err
 	}
 	log.Info("PD server config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
@@ -868,7 +868,7 @@ func (s *Server) SetLabelPropertyConfig(cfg config.LabelPropertyConfig) error {
 		log.Error("failed to update label property config",
 			zap.Reflect("new", cfg),
 			zap.Reflect("old", &old),
-			zap.Error(err))
+			errs.ZapError(err))
 		return err
 	}
 	log.Info("label property config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
@@ -886,7 +886,7 @@ func (s *Server) SetLabelProperty(typ, labelKey, labelValue string) error {
 			zap.String("labelKey", labelKey),
 			zap.String("labelValue", labelValue),
 			zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()),
-			zap.Error(err))
+			errs.ZapError(err))
 		return err
 	}
 
@@ -905,7 +905,7 @@ func (s *Server) DeleteLabelProperty(typ, labelKey, labelValue string) error {
 			zap.String("labelKey", labelKey),
 			zap.String("labelValue", labelValue),
 			zap.Reflect("config", s.persistOptions.GetLabelPropertyConfig()),
-			zap.Error(err))
+			errs.ZapError(err))
 		return err
 	}
 
@@ -932,7 +932,7 @@ func (s *Server) SetClusterVersion(v string) error {
 		log.Error("failed to update cluster version",
 			zap.String("old-version", old.String()),
 			zap.String("new-version", v),
-			zap.Error(err))
+			errs.ZapError(err))
 		return err
 	}
 	log.Info("cluster version is updated", zap.String("new-version", v))
@@ -1039,7 +1039,7 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro
 		log.Error("failed to update replication mode config",
 			zap.Reflect("new", cfg),
 			zap.Reflect("old", &old),
-			zap.Error(err))
+			errs.ZapError(err))
 		return err
 	}
 	log.Info("replication mode config is updated", zap.Reflect("new", cfg), zap.Reflect("old", old))
@@ -1057,7 +1057,7 @@ func (s *Server) SetReplicationModeConfig(cfg config.ReplicationModeConfig) erro
 		s.persistOptions.SetReplicationModeConfig(old)
 		revertErr := s.persistOptions.Persist(s.storage)
 		if revertErr != nil {
-			log.Error("failed to revert replication mode persistent config", zap.Error(err))
+			log.Error("failed to revert replication mode persistent config", errs.ZapError(revertErr))
 		}
 	}
 	return err
@@ -1083,7 +1083,7 @@ func (s *Server) leaderLoop() {
 		if leader != nil {
 			err := s.reloadConfigFromKV()
 			if err != nil {
-				log.Error("reload config failed", zap.Error(err))
+				log.Error("reload config failed", errs.ZapError(err))
 				continue
 			}
 			syncer := s.cluster.GetRegionSyncer()
@@ -1114,7 +1114,7 @@ func (s *Server) campaignLeader() {
 	lease := member.NewLeaderLease(s.client)
 	defer lease.Close()
 	if err := s.member.CampaignLeader(lease, s.cfg.LeaderLease); err != nil {
-		log.Error("campaign leader meet error", zap.Error(err))
+		log.Error("campaign leader meet error", errs.ZapError(err))
 		return
 	}
 
@@ -1132,14 +1132,14 @@ func (s *Server) campaignLeader() {
 
 	log.Debug("sync timestamp for tso")
 	if err := s.tso.SyncTimestamp(lease); err != nil {
-		log.Error("failed to sync timestamp", zap.Error(err))
+		log.Error("failed to sync timestamp", errs.ZapError(err))
 		return
 	}
 	defer s.tso.ResetTimestamp()
 
 	err := s.reloadConfigFromKV()
 	if err != nil {
-		log.Error("failed to reload configuration", zap.Error(err))
+		log.Error("failed to reload configuration", errs.ZapError(err))
 		return
 	}
 	// Try to create raft cluster.
@@ -1229,7 +1229,7 @@ func (s *Server) ReplicateFileToAllMembers(ctx context.Context, name string, dat
 		clientUrls := member.GetClientUrls()
 		if len(clientUrls) == 0 {
 			log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err))
-			return errors.Errorf("failed to replicate to member %s: clientUrls is empty", member.GetName())
+			return errs.ErrClientURLEmpty.FastGenByArgs()
 		}
 		url := clientUrls[0] + filepath.Join("/pd/api/v1/admin/persist-file", name)
 		req, _ := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(data))
@@ -1237,11 +1237,11 @@ func (s *Server) ReplicateFileToAllMembers(ctx context.Context, name string, dat
 		res, err := s.httpClient.Do(req)
 		if err != nil {
 			log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), errs.ZapError(err))
-			return errors.Errorf("failed to replicate to member %s", member.GetName())
+			return errs.ErrSendRequest.Wrap(err).GenWithStackByCause()
 		}
 		if res.StatusCode != http.StatusOK {
 			log.Warn("failed to replicate file", zap.String("name", name), zap.String("member", member.GetName()), zap.Int("status-code", res.StatusCode))
-			return errors.Errorf("failed to replicate to member %s", member.GetName())
+			return errs.ErrSendRequest.FastGenByArgs()
 		}
 	}
 	return nil
diff --git a/server/systime_mon.go b/server/systime_mon.go
index ebfa7a0603b..291ec1197ab 100644
--- a/server/systime_mon.go
+++ b/server/systime_mon.go
@@ -18,6 +18,7 @@ import (
 	"time"
 
 	"github.com/pingcap/log"
+	"github.com/tikv/pd/pkg/errs"
 	"go.uber.org/zap"
 )
 
@@ -31,7 +32,7 @@ func StartMonitor(ctx context.Context, now func() time.Time, systimeErrHandler f
 		select {
 		case <-tick.C:
 			if now().UnixNano() < last {
-				log.Error("system time jump backward", zap.Int64("last", last))
+				log.Error("system time jump backward", zap.Int64("last", last), errs.ZapError(errs.ErrIncorrectSystemTime))
 				systimeErrHandler()
 			}
 		case <-ctx.Done():
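-- 
Note for readers (not part of the applied patch): the change above replaces
bare errors.WithStack/errors.Errorf calls with PD's normalized errors from
pkg/errs. A minimal, self-contained sketch of the same convention, using only
the public github.com/pingcap/errors API; the error code "demo:json:..." and
the names errJSONUnmarshal and readValue are hypothetical stand-ins, not PD
symbols:

	package main

	import (
		"encoding/json"
		"fmt"

		"github.com/pingcap/errors"
	)

	// Define a normalized error once, with an RFC-style error code, the way
	// pkg/errs defines ErrJSONUnmarshal and friends.
	var errJSONUnmarshal = errors.Normalize(
		"json unmarshal failed", errors.RFCCodeText("demo:json:ErrJSONUnmarshal"))

	func readValue(raw []byte, out interface{}) error {
		if err := json.Unmarshal(raw, out); err != nil {
			// Keep the root cause and attach a stack, mirroring the
			// errs.ErrJSONUnmarshal.Wrap(err).GenWithStackByArgs() calls above.
			return errJSONUnmarshal.Wrap(err).GenWithStackByArgs()
		}
		return nil
	}

	func main() {
		var v map[string]int
		if err := readValue([]byte("{not json"), &v); err != nil {
			fmt.Printf("%+v\n", err) // error code, message, cause, and stack
		}
	}

Roughly, GenWithStackByArgs captures a fresh stack at the wrap site,
GenWithStackByCause reuses the stack carried by the cause, and FastGenByArgs
skips stack capture for cheap, expected failures; see pingcap/errors for the
exact semantics.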