domain, owner: unify and normalize the format of the log (pingcap#9646)
xiekeyi98 authored and zz-jason committed Apr 4, 2019
1 parent 4cb1ba7 commit 1dba727
Showing 4 changed files with 97 additions and 79 deletions.
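
The whole commit applies one mechanical pattern: each formatted logrus call with a bracketed module tag such as [ddl] becomes a structured zap call obtained through logutil.Logger(context.Background()), with a constant message string and typed fields. Below is a minimal standalone sketch of the before/after shape; zap.NewProduction stands in for TiDB's logutil wrapper (which binds a *zap.Logger to a context), and the version numbers are made up for illustration:

package main

import (
	"time"

	"go.uber.org/zap"
)

func main() {
	// Stand-in for logutil.Logger(context.Background()); in TiDB that call
	// returns a context-bound *zap.Logger. The config here is illustrative.
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	usedSchemaVersion, latestSchemaVersion := int64(1), int64(2) // made-up versions
	startTime := time.Now()

	// Before (logrus):
	//   log.Infof("[ddl] full load InfoSchema from version %d to %d, in %v",
	//       usedSchemaVersion, latestSchemaVersion, time.Since(startTime))
	// After (zap): the message is a constant, and every variable becomes a
	// typed, machine-parseable field, so the module tag and fmt verbs go away.
	logger.Info("full load InfoSchema success",
		zap.Int64("usedSchemaVersion", usedSchemaVersion),
		zap.Int64("latestSchemaVersion", latestSchemaVersion),
		zap.Duration("start time", time.Since(startTime)))
}
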
94 changes: 50 additions & 44 deletions domain/domain.go
@@ -41,8 +41,9 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
@@ -92,24 +93,28 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
// Update self schema version to etcd.
defer func() {
if err != nil {
log.Info("[ddl] not update self schema version to etcd")
logutil.Logger(context.Background()).Info("cannot update self schema version to etcd")
return
}
err = do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), latestSchemaVersion)
if err != nil {
log.Infof("[ddl] update self version from %v to %v failed %v", usedSchemaVersion, latestSchemaVersion, err)
logutil.Logger(context.Background()).Info("update self version failed", zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("latestSchemaVersion", latestSchemaVersion), zap.Error(err))
}
}()

startTime := time.Now()
ok, tblIDs, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, latestSchemaVersion)
if err != nil {
// We can fall back to a full load, so we don't need to return the error.
log.Errorf("[ddl] failed to load schema diff err %v", err)
logutil.Logger(context.Background()).Error("failed to load schema diff", zap.Error(err))
}
if ok {
log.Infof("[ddl] diff load InfoSchema from version %d to %d in %v, tableIDs %v",
usedSchemaVersion, latestSchemaVersion, time.Since(startTime), tblIDs)
logutil.Logger(context.Background()).Info("diff load InfoSchema success",
zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("latestSchemaVersion", latestSchemaVersion),
zap.Duration("start time", time.Since(startTime)),
zap.Int64s("tblIDs", tblIDs))
return latestSchemaVersion, tblIDs, fullLoad, nil
}

@@ -123,8 +128,8 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
if err != nil {
return 0, nil, fullLoad, errors.Trace(err)
}
log.Infof("[ddl] full load InfoSchema from version %d to %d, in %v",
usedSchemaVersion, latestSchemaVersion, time.Since(startTime))
logutil.Logger(context.Background()).Info("full load InfoSchema success", zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("latestSchemaVersion", latestSchemaVersion), zap.Duration("start time", time.Since(startTime)))
newISBuilder.Build()
return latestSchemaVersion, nil, fullLoad, nil
}
@@ -326,7 +331,7 @@ func (do *Domain) Reload() error {
metrics.LoadSchemaCounter.WithLabelValues("succ").Inc()

if fullLoad {
log.Info("[ddl] full load and reset schema validator.")
logutil.Logger(context.Background()).Info("full load and reset schema validator")
do.SchemaValidator.Reset()
}
do.SchemaValidator.Update(ver.Ver, schemaVersion, latestSchemaVersion, changedTableIDs)
@@ -336,7 +341,7 @@ func (do *Domain) Reload() error {
// Reload interval is lease / 2; if loading the schema takes longer than this interval,
// some queries may be answered with the ErrInfoSchemaExpired error.
if sub > (lease/2) && lease > 0 {
log.Warnf("[ddl] loading schema takes a long time %v", sub)
logutil.Logger(context.Background()).Warn("loading schema takes a long time", zap.Duration("take time", sub))
}

return nil
@@ -399,11 +404,11 @@ func (do *Domain) infoSyncerKeeper() {
for {
select {
case <-do.info.Done():
log.Info("[ddl] server info syncer need to restart")
logutil.Logger(context.Background()).Info("server info syncer need to restart")
if err := do.info.Restart(context.Background()); err != nil {
log.Error(err)
logutil.Logger(context.Background()).Error("server restart failed", zap.Error(err))
}
log.Info("[ddl] server info syncer restarted.")
logutil.Logger(context.Background()).Info("server info syncer restarted")
case <-do.exit:
return
}
@@ -424,22 +429,22 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
case <-ticker.C:
err := do.Reload()
if err != nil {
log.Errorf("[ddl] reload schema in loop err %v", errors.ErrorStack(err))
logutil.Logger(context.Background()).Error("reload schema in loop failed", zap.Error(err))
}
case _, ok := <-syncer.GlobalVersionCh():
err := do.Reload()
if err != nil {
log.Errorf("[ddl] reload schema in loop err %v", errors.ErrorStack(err))
logutil.Logger(context.Background()).Error("reload schema in loop failed", zap.Error(err))
}
if !ok {
log.Warn("[ddl] reload schema in loop, schema syncer need rewatch")
logutil.Logger(context.Background()).Warn("reload schema in loop, schema syncer need rewatch")
// Make sure the rewatch doesn't affect load schema, so we watch the global schema version asynchronously.
syncer.WatchGlobalSchemaVer(context.Background())
}
case <-syncer.Done():
// The schema syncer stopped, so we need to stop the schema validator to synchronize the schema version.
log.Info("[ddl] reload schema in loop, schema syncer need restart")
// The etcd is responsible for schema synchronization, we should ensure there is at most two diffrent schema version
logutil.Logger(context.Background()).Info("reload schema in loop, schema syncer need restart")
// The etcd is responsible for schema synchronization; we should ensure there are at most two different schema versions
// in the TiDB cluster, to keep the data/schema consistent. If we lose the connection/session to etcd, the cluster
// will treat this TiDB as a down instance, and etcd will remove the key of `/tidb/ddl/all_schema_versions/tidb-id`.
// Say the schema version now is 1, the owner is changing the schema version to 2, it will not wait for this down TiDB syncing the schema,
Expand All @@ -448,18 +453,18 @@ func (do *Domain) loadSchemaInLoop(lease time.Duration) {
do.SchemaValidator.Stop()
err := do.mustRestartSyncer()
if err != nil {
log.Errorf("[ddl] reload schema in loop, schema syncer restart err %v", errors.ErrorStack(err))
logutil.Logger(context.Background()).Error("reload schema in loop, schema syncer restart failed", zap.Error(err))
break
}
// The schema may have changed, so we must reload the schema before the schema validator can restart.
exitLoop := do.mustReload()
if exitLoop {
// domain is closed.
log.Errorf("[ddl] domain is closed. exit loadSchemaInLoop")
logutil.Logger(context.Background()).Error("domain is closed, exit loadSchemaInLoop")
return
}
do.SchemaValidator.Restart()
log.Info("[ddl] schema syncer restarted.")
logutil.Logger(context.Background()).Info("schema syncer restarted")
case <-do.exit:
return
}
@@ -484,7 +489,7 @@ func (do *Domain) mustRestartSyncer() error {
default:
}
time.Sleep(time.Second)
log.Infof("[ddl] restart the schema syncer failed %v", err)
logutil.Logger(context.Background()).Info("restart the schema syncer failed", zap.Error(err))
}
}

@@ -494,15 +499,15 @@ func (do *Domain) mustReload() (exitLoop bool) {
for {
err := do.Reload()
if err == nil {
log.Infof("[ddl] mustReload succeed.")
logutil.Logger(context.Background()).Info("mustReload succeed")
return false
}

log.Infof("[ddl] reload the schema failed: %v", err)
logutil.Logger(context.Background()).Info("reload the schema failed", zap.Error(err))
// If the domain is closed, we return immediately.
select {
case <-do.exit:
log.Infof("[ddl] domain is closed.")
logutil.Logger(context.Background()).Info("domain is closed")
return true
default:
}
@@ -526,7 +531,7 @@ func (do *Domain) Close() {
do.sysSessionPool.Close()
do.slowQuery.Close()
do.wg.Wait()
log.Info("[domain] close")
logutil.Logger(context.Background()).Info("domain closed")
}

type ddlCallback struct {
@@ -538,11 +543,11 @@ func (c *ddlCallback) OnChanged(err error) error {
if err != nil {
return err
}
log.Infof("[ddl] on DDL change, must reload")
logutil.Logger(context.Background()).Info("performing DDL change, must reload")

err = c.do.Reload()
if err != nil {
log.Errorf("[ddl] on DDL change reload err %v", err)
logutil.Logger(context.Background()).Error("performing DDL change failed", zap.Error(err))
}

return nil
@@ -759,7 +764,7 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error {
case <-time.After(duration):
}
if !ok {
log.Error("[domain] load privilege loop watch channel closed.")
logutil.Logger(context.Background()).Error("load privilege loop watch channel closed")
watchCh = do.etcdClient.Watch(context.Background(), privilegeKey)
count++
if count > 10 {
@@ -772,9 +777,9 @@ func (do *Domain) LoadPrivilegeLoop(ctx sessionctx.Context) error {
err := do.privHandle.Update(ctx)
metrics.LoadPrivilegeCounter.WithLabelValues(metrics.RetLabel(err)).Inc()
if err != nil {
log.Error("[domain] load privilege fail:", errors.ErrorStack(err))
logutil.Logger(context.Background()).Error("load privilege failed", zap.Error(err))
} else {
log.Debug("[domain] reload privilege success.")
logutil.Logger(context.Background()).Debug("reload privilege success")
}
}
}()
@@ -847,7 +852,7 @@ func (do *Domain) newStatsOwner() owner.Manager {
// TODO: Need to do something when err is not nil.
err := statsOwner.CampaignOwner(cancelCtx)
if err != nil {
log.Warnf("[stats] campaign owner fail: %s", errors.ErrorStack(err))
logutil.Logger(context.Background()).Warn("campaign owner failed", zap.Error(err))
}
return statsOwner
}
@@ -872,9 +877,9 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
t := time.Now()
err := statsHandle.InitStats(do.InfoSchema())
if err != nil {
log.Debug("[stats] init stats info failed: ", errors.ErrorStack(err))
logutil.Logger(context.Background()).Debug("init stats info failed", zap.Error(err))
} else {
log.Info("[stats] init stats info takes ", time.Now().Sub(t))
logutil.Logger(context.Background()).Info("init stats info time", zap.Duration("take time", time.Since(t)))
}
defer func() {
do.SetStatsUpdating(false)
@@ -885,7 +890,7 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
case <-loadTicker.C:
err = statsHandle.Update(do.InfoSchema())
if err != nil {
log.Debug("[stats] update stats info fail: ", errors.ErrorStack(err))
logutil.Logger(context.Background()).Debug("update stats info failed", zap.Error(err))
}
case <-do.exit:
statsHandle.FlushStats()
@@ -894,18 +899,18 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
case t := <-statsHandle.DDLEventCh():
err = statsHandle.HandleDDLEvent(t)
if err != nil {
log.Debug("[stats] handle ddl event fail: ", errors.ErrorStack(err))
logutil.Logger(context.Background()).Debug("handle ddl event failed", zap.Error(err))
}
case <-deltaUpdateTicker.C:
err = statsHandle.DumpStatsDeltaToKV(statistics.DumpDelta)
if err != nil {
log.Debug("[stats] dump stats delta fail: ", errors.ErrorStack(err))
logutil.Logger(context.Background()).Debug("dump stats delta failed", zap.Error(err))
}
statsHandle.UpdateErrorRate(do.InfoSchema())
case <-loadHistogramTicker.C:
err = statsHandle.LoadNeededHistograms()
if err != nil {
log.Debug("[stats] load histograms fail: ", errors.ErrorStack(err))
logutil.Logger(context.Background()).Debug("load histograms failed", zap.Error(err))
}
case <-loadFeedbackTicker.C:
statsHandle.UpdateStatsByLocalFeedback(do.InfoSchema())
@@ -914,20 +919,20 @@ func (do *Domain) updateStatsWorker(ctx sessionctx.Context, owner owner.Manager)
}
err = statsHandle.HandleUpdateStats(do.InfoSchema())
if err != nil {
log.Debug("[stats] update stats using feedback fail: ", errors.ErrorStack(err))
logutil.Logger(context.Background()).Debug("update stats using feedback failed", zap.Error(err))
}
case <-dumpFeedbackTicker.C:
err = statsHandle.DumpStatsFeedbackToKV()
if err != nil {
log.Debug("[stats] dump stats feedback fail: ", errors.ErrorStack(err))
logutil.Logger(context.Background()).Debug("dump stats feedback failed", zap.Error(err))
}
case <-gcStatsTicker.C:
if !owner.IsOwner() {
continue
}
err = statsHandle.GCStats(do.InfoSchema(), do.DDL().GetLease())
if err != nil {
log.Debug("[stats] gc stats fail: ", errors.ErrorStack(err))
logutil.Logger(context.Background()).Debug("GC stats failed", zap.Error(err))
}
}
}
@@ -962,13 +967,13 @@ func (do *Domain) NotifyUpdatePrivilege(ctx sessionctx.Context) {
row := do.etcdClient.KV
_, err := row.Put(context.Background(), privilegeKey, "")
if err != nil {
log.Warn("notify update privilege failed:", err)
logutil.Logger(context.Background()).Warn("notify update privilege failed", zap.Error(err))
}
}
// update locally
_, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, `FLUSH PRIVILEGES`)
if err != nil {
log.Errorf("Unable to update privileges: %s", err)
logutil.Logger(context.Background()).Error("unable to update privileges", zap.Error(err))
}
}

@@ -978,7 +983,8 @@ func recoverInDomain(funcName string, quit bool) {
return
}
buf := util.GetStack()
log.Errorf("%s, %v, %s", funcName, r, buf)
logutil.Logger(context.Background()).Error("recover in domain failed", zap.String("funcName", funcName),
zap.Any("error", r), zap.String("buffer", string(buf)))
metrics.PanicCounter.WithLabelValues(metrics.LabelDomain).Inc()
if quit {
// Wait for metrics to be pushed.
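
The recoverInDomain hunk above shows how a non-error value returned by recover() is logged under the new scheme: zap.Any accepts an arbitrary panic value, and the captured stack is attached as a plain string field. Here is a self-contained sketch of that pattern, with runtime/debug.Stack standing in for TiDB's util.GetStack (an assumption for illustration):

package main

import (
	"runtime/debug"

	"go.uber.org/zap"
)

// doWork panics; the deferred recover logs the panic with structured fields,
// mirroring the shape of recoverInDomain in the diff above.
func doWork(logger *zap.Logger) {
	defer func() {
		r := recover()
		if r == nil {
			return
		}
		// zap.Any handles whatever type the panic value happens to be;
		// the stack trace rides along as an ordinary string field.
		logger.Error("recover in domain failed",
			zap.String("funcName", "doWork"),
			zap.Any("error", r),
			zap.String("buffer", string(debug.Stack())))
	}()
	panic("boom")
}

func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()
	doWork(logger)
}
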
10 changes: 6 additions & 4 deletions domain/info.go
@@ -26,8 +26,9 @@ import (
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/util/hack"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/printer"
log "github.com/sirupsen/logrus"
"go.uber.org/zap"
"golang.org/x/net/context"
)

@@ -138,7 +139,7 @@ func (is *InfoSyncer) RemoveServerInfo() {
}
err := ddl.DeleteKeyFromEtcd(is.serverInfoPath, is.etcdCli, keyOpDefaultRetryCnt, keyOpDefaultTimeout)
if err != nil {
log.Errorf("[info-syncer] remove server info failed %v", err)
logutil.Logger(context.Background()).Error("remove server info failed", zap.Error(err))
}
}

@@ -187,15 +188,16 @@ func getInfo(ctx context.Context, etcdCli *clientv3.Client, key string, retryCnt
resp, err = etcdCli.Get(childCtx, key, opts...)
cancel()
if err != nil {
log.Infof("[info-syncer] get %s failed %v, continue checking.", key, err)
logutil.Logger(context.Background()).Info("get key failed", zap.String("key", key), zap.Error(err))
time.Sleep(200 * time.Millisecond)
continue
}
for _, kv := range resp.Kvs {
info := &ServerInfo{}
err = json.Unmarshal(kv.Value, info)
if err != nil {
log.Infof("[info-syncer] get %s, json.Unmarshal %v failed %v.", kv.Key, kv.Value, err)
logutil.Logger(context.Background()).Info("get key failed", zap.String("key", string(kv.Key)), zap.ByteString("value", kv.Value),
zap.Error(err))
return nil, errors.Trace(err)
}
allInfo[info.ID] = info
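
In the getInfo hunk above, the raw etcd value is attached with zap.ByteString instead of being interpolated into the message, so a malformed payload is visible right next to the unmarshal error. A short sketch of that field type follows; the key and the deliberately truncated JSON value are made up:

package main

import (
	"encoding/json"

	"go.uber.org/zap"
)

func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	defer logger.Sync()

	// Made-up etcd key/value pair; the value is deliberately truncated JSON.
	key := []byte("/tidb/server/info/made-up-id")
	value := []byte(`{"id": "made-up-id"`)

	info := make(map[string]interface{})
	if uerr := json.Unmarshal(value, &info); uerr != nil {
		// ByteString renders the raw bytes as UTF-8 text in the log output,
		// so the bad payload appears alongside the parse error.
		logger.Info("get key failed",
			zap.String("key", string(key)),
			zap.ByteString("value", value),
			zap.Error(uerr))
	}
}
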