Skip to content

Commit

Permalink
domain: fix updating the self schema version and update log (#10324) (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
zimulala authored and winkyao committed May 6, 2019
1 parent 7eaea30 commit c1cc84c
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 20 deletions.
45 changes: 27 additions & 18 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,40 +82,47 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
return 0, nil, fullLoad, errors.Trace(err)
}
m := meta.NewSnapshotMeta(snapshot)
latestSchemaVersion, err := m.GetSchemaVersion()
neededSchemaVersion, err := m.GetSchemaVersion()
if err != nil {
return 0, nil, fullLoad, errors.Trace(err)
}
if usedSchemaVersion != 0 && usedSchemaVersion == latestSchemaVersion {
return latestSchemaVersion, nil, fullLoad, nil
if usedSchemaVersion != 0 && usedSchemaVersion == neededSchemaVersion {
return neededSchemaVersion, nil, fullLoad, nil
}

// Update self schema version to etcd.
defer func() {
if err != nil {
logutil.Logger(context.Background()).Info("cannot update self schema version to etcd")
// There are two possibilities for not updating the self schema version to etcd.
// 1. Failed to loading schema information.
// 2. When users use history read feature, the neededSchemaVersion isn't the latest schema version.
if err != nil || neededSchemaVersion < do.InfoSchema().SchemaMetaVersion() {
logutil.Logger(context.Background()).Info("do not update self schema version to etcd",
zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Error(err))
return
}
err = do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), latestSchemaVersion)

err = do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), neededSchemaVersion)
if err != nil {
logutil.Logger(context.Background()).Info("update self version failed", zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("latestSchemaVersion", latestSchemaVersion), zap.Error(err))
logutil.Logger(context.Background()).Info("update self version failed",
zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Error(err))
}
}()

startTime := time.Now()
ok, tblIDs, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, latestSchemaVersion)
ok, tblIDs, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, neededSchemaVersion)
if err != nil {
// We can fall back to full load, don't need to return the error.
logutil.Logger(context.Background()).Error("failed to load schema diff", zap.Error(err))
}
if ok {
logutil.Logger(context.Background()).Info("diff load InfoSchema success",
zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("latestSchemaVersion", latestSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)),
zap.Int64s("tblIDs", tblIDs))
return latestSchemaVersion, tblIDs, fullLoad, nil
return neededSchemaVersion, tblIDs, fullLoad, nil
}

fullLoad = true
Expand All @@ -124,14 +131,16 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
return 0, nil, fullLoad, errors.Trace(err)
}

newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, latestSchemaVersion)
newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, neededSchemaVersion)
if err != nil {
return 0, nil, fullLoad, errors.Trace(err)
}
logutil.Logger(context.Background()).Info("full load InfoSchema success", zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("latestSchemaVersion", latestSchemaVersion), zap.Duration("start time", time.Since(startTime)))
logutil.Logger(context.Background()).Info("full load InfoSchema success",
zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)))
newISBuilder.Build()
return latestSchemaVersion, nil, fullLoad, nil
return neededSchemaVersion, nil, fullLoad, nil
}

func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, error) {
Expand Down Expand Up @@ -303,7 +312,7 @@ func (do *Domain) Reload() error {
startTime := time.Now()

var err error
var latestSchemaVersion int64
var neededSchemaVersion int64

ver, err := do.store.CurrentVersion()
if err != nil {
Expand All @@ -320,7 +329,7 @@ func (do *Domain) Reload() error {
fullLoad bool
changedTableIDs []int64
)
latestSchemaVersion, changedTableIDs, fullLoad, err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver)
neededSchemaVersion, changedTableIDs, fullLoad, err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver)
metrics.LoadSchemaDuration.Observe(time.Since(startTime).Seconds())
if err != nil {
metrics.LoadSchemaCounter.WithLabelValues("failed").Inc()
Expand All @@ -332,7 +341,7 @@ func (do *Domain) Reload() error {
logutil.Logger(context.Background()).Info("full load and reset schema validator")
do.SchemaValidator.Reset()
}
do.SchemaValidator.Update(ver.Ver, schemaVersion, latestSchemaVersion, changedTableIDs)
do.SchemaValidator.Update(ver.Ver, schemaVersion, neededSchemaVersion, changedTableIDs)

lease := do.DDL().GetLease()
sub := time.Since(startTime)
Expand Down
16 changes: 14 additions & 2 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package domain

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -78,10 +79,21 @@ func (*testSuite) TestT(c *C) {
// for setting lease
lease := 100 * time.Millisecond

// for GetSnapshotInfoSchema
snapIs, err := dom.GetSnapshotInfoSchema(snapTS)
// for updating the self schema version
goCtx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
err = dd.SchemaSyncer().OwnerCheckAllVersions(goCtx, is.SchemaMetaVersion())
cancel()
c.Assert(err, IsNil)
snapIs, err := dom.GetSnapshotInfoSchema(snapTS)
c.Assert(snapIs, NotNil)
c.Assert(err, IsNil)
// Make sure that the self schema version doesn't be changed.
goCtx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
err = dd.SchemaSyncer().OwnerCheckAllVersions(goCtx, is.SchemaMetaVersion())
cancel()
c.Assert(err, IsNil)

// for GetSnapshotInfoSchema
snapTS = oracle.EncodeTSO(oracle.GetPhysical(time.Now()))
snapIs, err = dom.GetSnapshotInfoSchema(snapTS)
c.Assert(err, IsNil)
Expand Down

0 comments on commit c1cc84c

Please sign in to comment.