Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

domain: fix updating the self schema version and update log (#10324) #10359

Merged
merged 1 commit into from
May 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 27 additions & 18 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,40 +82,47 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
return 0, nil, fullLoad, errors.Trace(err)
}
m := meta.NewSnapshotMeta(snapshot)
latestSchemaVersion, err := m.GetSchemaVersion()
neededSchemaVersion, err := m.GetSchemaVersion()
if err != nil {
return 0, nil, fullLoad, errors.Trace(err)
}
if usedSchemaVersion != 0 && usedSchemaVersion == latestSchemaVersion {
return latestSchemaVersion, nil, fullLoad, nil
if usedSchemaVersion != 0 && usedSchemaVersion == neededSchemaVersion {
return neededSchemaVersion, nil, fullLoad, nil
}

// Update self schema version to etcd.
defer func() {
if err != nil {
logutil.Logger(context.Background()).Info("cannot update self schema version to etcd")
// There are two possibilities for not updating the self schema version to etcd.
// 1. Failed to loading schema information.
// 2. When users use history read feature, the neededSchemaVersion isn't the latest schema version.
if err != nil || neededSchemaVersion < do.InfoSchema().SchemaMetaVersion() {
logutil.Logger(context.Background()).Info("do not update self schema version to etcd",
zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Error(err))
return
}
err = do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), latestSchemaVersion)

err = do.ddl.SchemaSyncer().UpdateSelfVersion(context.Background(), neededSchemaVersion)
if err != nil {
logutil.Logger(context.Background()).Info("update self version failed", zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("latestSchemaVersion", latestSchemaVersion), zap.Error(err))
logutil.Logger(context.Background()).Info("update self version failed",
zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion), zap.Error(err))
}
}()

startTime := time.Now()
ok, tblIDs, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, latestSchemaVersion)
ok, tblIDs, err := do.tryLoadSchemaDiffs(m, usedSchemaVersion, neededSchemaVersion)
if err != nil {
// We can fall back to full load, don't need to return the error.
logutil.Logger(context.Background()).Error("failed to load schema diff", zap.Error(err))
}
if ok {
logutil.Logger(context.Background()).Info("diff load InfoSchema success",
zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("latestSchemaVersion", latestSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)),
zap.Int64s("tblIDs", tblIDs))
return latestSchemaVersion, tblIDs, fullLoad, nil
return neededSchemaVersion, tblIDs, fullLoad, nil
}

fullLoad = true
Expand All @@ -124,14 +131,16 @@ func (do *Domain) loadInfoSchema(handle *infoschema.Handle, usedSchemaVersion in
return 0, nil, fullLoad, errors.Trace(err)
}

newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, latestSchemaVersion)
newISBuilder, err := infoschema.NewBuilder(handle).InitWithDBInfos(schemas, neededSchemaVersion)
if err != nil {
return 0, nil, fullLoad, errors.Trace(err)
}
logutil.Logger(context.Background()).Info("full load InfoSchema success", zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("latestSchemaVersion", latestSchemaVersion), zap.Duration("start time", time.Since(startTime)))
logutil.Logger(context.Background()).Info("full load InfoSchema success",
zap.Int64("usedSchemaVersion", usedSchemaVersion),
zap.Int64("neededSchemaVersion", neededSchemaVersion),
zap.Duration("start time", time.Since(startTime)))
newISBuilder.Build()
return latestSchemaVersion, nil, fullLoad, nil
return neededSchemaVersion, nil, fullLoad, nil
}

func (do *Domain) fetchAllSchemasWithTables(m *meta.Meta) ([]*model.DBInfo, error) {
Expand Down Expand Up @@ -303,7 +312,7 @@ func (do *Domain) Reload() error {
startTime := time.Now()

var err error
var latestSchemaVersion int64
var neededSchemaVersion int64

ver, err := do.store.CurrentVersion()
if err != nil {
Expand All @@ -320,7 +329,7 @@ func (do *Domain) Reload() error {
fullLoad bool
changedTableIDs []int64
)
latestSchemaVersion, changedTableIDs, fullLoad, err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver)
neededSchemaVersion, changedTableIDs, fullLoad, err = do.loadInfoSchema(do.infoHandle, schemaVersion, ver.Ver)
metrics.LoadSchemaDuration.Observe(time.Since(startTime).Seconds())
if err != nil {
metrics.LoadSchemaCounter.WithLabelValues("failed").Inc()
Expand All @@ -332,7 +341,7 @@ func (do *Domain) Reload() error {
logutil.Logger(context.Background()).Info("full load and reset schema validator")
do.SchemaValidator.Reset()
}
do.SchemaValidator.Update(ver.Ver, schemaVersion, latestSchemaVersion, changedTableIDs)
do.SchemaValidator.Update(ver.Ver, schemaVersion, neededSchemaVersion, changedTableIDs)

lease := do.DDL().GetLease()
sub := time.Since(startTime)
Expand Down
16 changes: 14 additions & 2 deletions domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package domain

import (
"context"
"testing"
"time"

Expand Down Expand Up @@ -78,10 +79,21 @@ func (*testSuite) TestT(c *C) {
// for setting lease
lease := 100 * time.Millisecond

// for GetSnapshotInfoSchema
snapIs, err := dom.GetSnapshotInfoSchema(snapTS)
// for updating the self schema version
goCtx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
err = dd.SchemaSyncer().OwnerCheckAllVersions(goCtx, is.SchemaMetaVersion())
cancel()
c.Assert(err, IsNil)
snapIs, err := dom.GetSnapshotInfoSchema(snapTS)
c.Assert(snapIs, NotNil)
c.Assert(err, IsNil)
// Make sure that the self schema version doesn't be changed.
goCtx, cancel = context.WithTimeout(context.Background(), 10*time.Millisecond)
err = dd.SchemaSyncer().OwnerCheckAllVersions(goCtx, is.SchemaMetaVersion())
cancel()
c.Assert(err, IsNil)

// for GetSnapshotInfoSchema
snapTS = oracle.EncodeTSO(oracle.GetPhysical(time.Now()))
snapIs, err = dom.GetSnapshotInfoSchema(snapTS)
c.Assert(err, IsNil)
Expand Down