Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
Merge branch 'master' into raw_backup
Browse files Browse the repository at this point in the history
  • Loading branch information
3pointer authored Feb 24, 2020
2 parents b6b4d73 + 3c9d42f commit 35f3698
Show file tree
Hide file tree
Showing 38 changed files with 1,074 additions and 516 deletions.
2 changes: 1 addition & 1 deletion cmd/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func runBackupCommand(command *cobra.Command, cmdName string) error {
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
return err
}
return task.RunBackup(GetDefaultContext(), cmdName, &cfg)
return task.RunBackup(GetDefaultContext(), tidbGlue, cmdName, &cfg)
}

func runBackupRawCommand(command *cobra.Command, cmdName string) error {
Expand Down
2 changes: 2 additions & 0 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/spf13/cobra"
"go.uber.org/zap"

"github.com/pingcap/br/pkg/gluetidb"
"github.com/pingcap/br/pkg/task"
"github.com/pingcap/br/pkg/utils"
)
Expand All @@ -21,6 +22,7 @@ var (
initOnce = sync.Once{}
defaultContext context.Context
hasLogFile uint64
tidbGlue = gluetidb.Glue{}
)

const (
Expand Down
2 changes: 1 addition & 1 deletion cmd/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func runRestoreCommand(command *cobra.Command, cmdName string) error {
if err := cfg.ParseFromFlags(command.Flags()); err != nil {
return err
}
return task.RunRestore(GetDefaultContext(), cmdName, &cfg)
return task.RunRestore(GetDefaultContext(), tidbGlue, cmdName, &cfg)
}

// NewRestoreCommand returns a restore subcommand
Expand Down
10 changes: 5 additions & 5 deletions cmd/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,19 +196,19 @@ func newBackupMetaCommand() *cobra.Command {
newTable := new(model.TableInfo)
tableID, _ := tableIDAllocator.Alloc()
newTable.ID = int64(tableID)
newTable.Name = table.Schema.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Schema.Indices))
for i, indexInfo := range table.Schema.Indices {
newTable.Name = table.Info.Name
newTable.Indices = make([]*model.IndexInfo, len(table.Info.Indices))
for i, indexInfo := range table.Info.Indices {
indexID, _ := indexIDAllocator.Alloc()
newTable.Indices[i] = &model.IndexInfo{
ID: int64(indexID),
Name: indexInfo.Name,
}
}
rules := restore.GetRewriteRules(newTable, table.Schema, 0)
rules := restore.GetRewriteRules(newTable, table.Info, 0)
rewriteRules.Table = append(rewriteRules.Table, rules.Table...)
rewriteRules.Data = append(rewriteRules.Data, rules.Data...)
tableIDMap[table.Schema.ID] = int64(tableID)
tableIDMap[table.Info.ID] = int64(tableID)
}
// Validate rewrite rules
for _, file := range files {
Expand Down
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,25 @@ require (
github.com/cheggaaa/pb/v3 v3.0.1
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f // indirect
github.com/fatih/color v1.9.0 // indirect
github.com/fsouza/fake-gcs-server v1.15.0
github.com/go-sql-driver/mysql v1.4.1
github.com/gogo/protobuf v1.3.1
github.com/golang/snappy v0.0.1 // indirect
github.com/google/btree v1.0.0
github.com/google/uuid v1.1.1
github.com/mattn/go-runewidth v0.0.7 // indirect
github.com/onsi/ginkgo v1.10.3 // indirect
github.com/onsi/gomega v1.7.1 // indirect
github.com/pingcap/check v0.0.0-20191216031241-8a5a85928f12
github.com/pingcap/check v0.0.0-20200212061837-5e12011dc712
github.com/pingcap/errors v0.11.5-0.20190809092503-95897b64e011
github.com/pingcap/kvproto v0.0.0-20200108025604-a4dc183d2af5
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9
github.com/pingcap/parser v0.0.0-20200109073933-a9496438d77d
github.com/pingcap/pd v1.1.0-beta.0.20191219054547-4d65bbefbc6d
github.com/pingcap/tidb v1.1.0-beta.0.20200110130413-8c3ee37c1938
github.com/pingcap/kvproto v0.0.0-20200217103621-528e82bf7248
github.com/pingcap/log v0.0.0-20200117041106-d28c14d3b1cd
github.com/pingcap/parser v0.0.0-20200218113622-517beb2e39c2
github.com/pingcap/pd v1.1.0-beta.0.20200106144140-f5a7aa985497
github.com/pingcap/tidb v1.1.0-beta.0.20200223044457-aedea3ec5e1e
github.com/pingcap/tidb-tools v4.0.0-beta+incompatible
github.com/pingcap/tipb v0.0.0-20191227083941-3996eff010dc
github.com/pingcap/tipb v0.0.0-20200212061130-c4d518eb1d60
github.com/prometheus/client_golang v1.0.0
github.com/prometheus/common v0.4.1
github.com/sirupsen/logrus v1.4.2
Expand Down
89 changes: 46 additions & 43 deletions go.sum

Large diffs are not rendered by default.

55 changes: 53 additions & 2 deletions pkg/backup/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/store/tikv/oracle"
Expand Down Expand Up @@ -119,15 +120,20 @@ func (bc *Client) SetStorage(ctx context.Context, backend *kvproto.StorageBacken
}

// SaveBackupMeta saves the current backup meta at the given path.
func (bc *Client) SaveBackupMeta(ctx context.Context) error {
func (bc *Client) SaveBackupMeta(ctx context.Context, ddlJobs []*model.Job) error {
ddlJobsData, err := json.Marshal(ddlJobs)
if err != nil {
return errors.Trace(err)
}
bc.backupMeta.Ddls = ddlJobsData
backupMetaData, err := proto.Marshal(&bc.backupMeta)
if err != nil {
return errors.Trace(err)
}
log.Debug("backup meta",
zap.Reflect("meta", bc.backupMeta))
backendURL := storage.FormatBackendURL(bc.backend)
log.Info("save backup meta", zap.Stringer("path", &backendURL))
log.Info("save backup meta", zap.Stringer("path", &backendURL), zap.Int("jobs", len(ddlJobs)))
return bc.storage.Write(ctx, utils.MetaFile, backupMetaData)
}

Expand Down Expand Up @@ -241,6 +247,51 @@ func BuildBackupRangeAndSchema(
return ranges, backupSchemas, nil
}

// GetBackupDDLJobs returns the ddl jobs are done in (lastBackupTS, backupTS]
func GetBackupDDLJobs(dom *domain.Domain, lastBackupTS, backupTS uint64) ([]*model.Job, error) {
snapMeta, err := dom.GetSnapshotMeta(backupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSnapMeta, err := dom.GetSnapshotMeta(lastBackupTS)
if err != nil {
return nil, errors.Trace(err)
}
lastSchemaVersion, err := lastSnapMeta.GetSchemaVersion()
if err != nil {
return nil, errors.Trace(err)
}
allJobs := make([]*model.Job, 0)
defaultJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.DefaultJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get default jobs", zap.Int("jobs", len(defaultJobs)))
allJobs = append(allJobs, defaultJobs...)
addIndexJobs, err := snapMeta.GetAllDDLJobsInQueue(meta.AddIndexJobListKey)
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get add index jobs", zap.Int("jobs", len(addIndexJobs)))
allJobs = append(allJobs, addIndexJobs...)
historyJobs, err := snapMeta.GetAllHistoryDDLJobs()
if err != nil {
return nil, errors.Trace(err)
}
log.Debug("get history jobs", zap.Int("jobs", len(historyJobs)))
allJobs = append(allJobs, historyJobs...)

completedJobs := make([]*model.Job, 0)
for _, job := range allJobs {
if (job.State == model.JobStateDone || job.State == model.JobStateSynced) &&
(job.BinlogInfo != nil && job.BinlogInfo.SchemaVersion > lastSchemaVersion) {
completedJobs = append(completedJobs, job)
}
}
log.Debug("get completed jobs", zap.Int("jobs", len(completedJobs)))
return completedJobs, nil
}

// BackupRanges make a backup of the given key ranges.
func (bc *Client) BackupRanges(
ctx context.Context,
Expand Down
6 changes: 3 additions & 3 deletions pkg/backup/safe_point_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import (
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/util/testleak"

"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/br/pkg/mock"
)

var _ = Suite(&testSaftPointSuite{})

type testSaftPointSuite struct {
mock *utils.MockCluster
mock *mock.Cluster
}

func (s *testSaftPointSuite) SetUpSuite(c *C) {
var err error
s.mock, err = utils.NewMockCluster()
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/backup/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,18 @@ import (
"github.com/pingcap/tidb/util/testkit"
"github.com/pingcap/tidb/util/testleak"

"github.com/pingcap/br/pkg/utils"
"github.com/pingcap/br/pkg/mock"
)

var _ = Suite(&testBackupSchemaSuite{})

type testBackupSchemaSuite struct {
mock *utils.MockCluster
mock *mock.Cluster
}

func (s *testBackupSchemaSuite) SetUpSuite(c *C) {
var err error
s.mock, err = utils.NewMockCluster()
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -77,7 +77,7 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
<-updateCh
c.Assert(err, IsNil)
c.Assert(len(schemas), Equals, 1)
// MockCluster returns a dummy checksum (all fields are 1).
// Cluster returns a dummy checksum (all fields are 1).
c.Assert(schemas[0].Crc64Xor, Not(Equals), 0, Commentf("%v", schemas[0]))
c.Assert(schemas[0].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[0]))
c.Assert(schemas[0].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[0]))
Expand All @@ -97,7 +97,7 @@ func (s *testBackupSchemaSuite) TestBuildBackupRangeAndSchema(c *C) {
<-updateCh
c.Assert(err, IsNil)
c.Assert(len(schemas), Equals, 2)
// MockCluster returns a dummy checksum (all fields are 1).
// Cluster returns a dummy checksum (all fields are 1).
c.Assert(schemas[0].Crc64Xor, Not(Equals), 0, Commentf("%v", schemas[0]))
c.Assert(schemas[0].TotalKvs, Not(Equals), 0, Commentf("%v", schemas[0]))
c.Assert(schemas[0].TotalBytes, Not(Equals), 0, Commentf("%v", schemas[0]))
Expand Down
8 changes: 4 additions & 4 deletions pkg/checksum/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func buildChecksumRequest(
reqs := make([]*kv.Request, 0, (len(newTable.Indices)+1)*(len(partDefs)+1))
var oldTableID int64
if oldTable != nil {
oldTableID = oldTable.Schema.ID
oldTableID = oldTable.Info.ID
}
rs, err := buildRequest(newTable, newTable.ID, oldTable, oldTableID, startTS)
if err != nil {
Expand All @@ -72,7 +72,7 @@ func buildChecksumRequest(
for _, partDef := range partDefs {
var oldPartID int64
if oldTable != nil {
for _, oldPartDef := range oldTable.Schema.Partition.Definitions {
for _, oldPartDef := range oldTable.Info.Partition.Definitions {
if oldPartDef.Name == partDef.Name {
oldPartID = oldPartDef.ID
}
Expand Down Expand Up @@ -108,7 +108,7 @@ func buildRequest(
}
var oldIndexInfo *model.IndexInfo
if oldTable != nil {
for _, oldIndex := range oldTable.Schema.Indices {
for _, oldIndex := range oldTable.Info.Indices {
if oldIndex.Name == indexInfo.Name {
oldIndexInfo = oldIndex
break
Expand All @@ -117,7 +117,7 @@ func buildRequest(
if oldIndexInfo == nil {
log.Panic("index not found",
zap.Reflect("table", tableInfo),
zap.Reflect("oldTable", oldTable.Schema),
zap.Reflect("oldTable", oldTable.Info),
zap.Stringer("index", indexInfo.Name))
}
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/checksum/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/pingcap/tidb/util/testleak"
"github.com/pingcap/tipb/go-tipb"

"github.com/pingcap/br/pkg/mock"
"github.com/pingcap/br/pkg/utils"
)

Expand All @@ -22,12 +23,12 @@ func TestT(t *testing.T) {
var _ = Suite(&testChecksumSuite{})

type testChecksumSuite struct {
mock *utils.MockCluster
mock *mock.Cluster
}

func (s *testChecksumSuite) SetUpSuite(c *C) {
var err error
s.mock, err = utils.NewMockCluster()
s.mock, err = mock.NewCluster()
c.Assert(err, IsNil)
}

Expand Down Expand Up @@ -61,7 +62,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) {
c.Assert(len(exe1.reqs), Equals, 1)
resp, err := exe1.Execute(context.TODO(), s.mock.Storage.GetClient(), func() {})
c.Assert(err, IsNil)
// MockCluster returns a dummy checksum (all fields are 1).
// Cluster returns a dummy checksum (all fields are 1).
c.Assert(resp.Checksum, Equals, uint64(1), Commentf("%v", resp))
c.Assert(resp.TotalKvs, Equals, uint64(1), Commentf("%v", resp))
c.Assert(resp.TotalBytes, Equals, uint64(1), Commentf("%v", resp))
Expand All @@ -83,7 +84,7 @@ func (s *testChecksumSuite) TestChecksum(c *C) {
// Test rewrite rules
tk.MustExec("alter table t1 add index i2(a);")
tableInfo1 = s.getTableInfo(c, "test", "t1")
oldTable := utils.Table{Schema: tableInfo1}
oldTable := utils.Table{Info: tableInfo1}
exe2, err = NewExecutorBuilder(tableInfo2, math.MaxUint64).
SetOldTable(&oldTable).Build()
c.Assert(err, IsNil)
Expand Down
7 changes: 4 additions & 3 deletions pkg/conn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"github.com/pingcap/log"
pd "github.com/pingcap/pd/client"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/tikv"
"github.com/pingcap/tidb/util/codec"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/keepalive"

"github.com/pingcap/br/pkg/glue"
)

const (
Expand Down Expand Up @@ -87,7 +88,7 @@ func pdRequest(
}

// NewMgr creates a new Mgr.
func NewMgr(ctx context.Context, pdAddrs string, storage tikv.Storage) (*Mgr, error) {
func NewMgr(ctx context.Context, g glue.Glue, pdAddrs string, storage tikv.Storage) (*Mgr, error) {
addrs := strings.Split(pdAddrs, ",")

failure := errors.Errorf("pd address (%s) has wrong format", pdAddrs)
Expand Down Expand Up @@ -130,7 +131,7 @@ func NewMgr(ctx context.Context, pdAddrs string, storage tikv.Storage) (*Mgr, er
return nil, errors.Errorf("tikv cluster not health %+v", stores)
}

dom, err := session.BootstrapSession(storage)
dom, err := g.BootstrapSession(storage)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/glue/glue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package glue

import (
"context"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
)

// Glue is an abstraction of TiDB function calls used in BR.
type Glue interface {
BootstrapSession(store kv.Storage) (*domain.Domain, error)
CreateSession(store kv.Storage) (Session, error)
}

// Session is an abstraction of the session.Session interface.
type Session interface {
Execute(ctx context.Context, sql string) error
ShowCreateDatabase(schema *model.DBInfo) (string, error)
ShowCreateTable(table *model.TableInfo, allocator autoid.Allocator) (string, error)
Close()
}
Loading

0 comments on commit 35f3698

Please sign in to comment.