
Merge remote-tracking branch 'upstream/master' into master-msc-add-idx
tangenta committed Jul 7, 2022
2 parents 921005a + 7d37ac1 commit dd1f957
Showing 22 changed files with 9,882 additions and 9,624 deletions.
43 changes: 0 additions & 43 deletions br/pkg/backup/client_test.go
@@ -250,49 +250,6 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
	}
}

func TestSendCreds(t *testing.T) {
	s, clean := createBackupSuite(t)
	defer clean()

	accessKey := "ab"
	secretAccessKey := "cd"
	backendOpt := storage.BackendOptions{
		S3: storage.S3BackendOptions{
			AccessKey:       accessKey,
			SecretAccessKey: secretAccessKey,
		},
	}
	backend, err := storage.ParseBackend("s3://bucket/prefix/", &backendOpt)
	require.NoError(t, err)
	opts := &storage.ExternalStorageOptions{
		SendCredentials: true,
	}
	_, err = storage.New(s.ctx, backend, opts)
	require.NoError(t, err)
	access_key := backend.GetS3().AccessKey
	require.Equal(t, "ab", access_key)
	secret_access_key := backend.GetS3().SecretAccessKey
	require.Equal(t, "cd", secret_access_key)

	backendOpt = storage.BackendOptions{
		S3: storage.S3BackendOptions{
			AccessKey:       accessKey,
			SecretAccessKey: secretAccessKey,
		},
	}
	backend, err = storage.ParseBackend("s3://bucket/prefix/", &backendOpt)
	require.NoError(t, err)
	opts = &storage.ExternalStorageOptions{
		SendCredentials: false,
	}
	_, err = storage.New(s.ctx, backend, opts)
	require.NoError(t, err)
	access_key = backend.GetS3().AccessKey
	require.Equal(t, "", access_key)
	secret_access_key = backend.GetS3().SecretAccessKey
	require.Equal(t, "", secret_access_key)
}

func TestSkipUnsupportedDDLJob(t *testing.T) {
	s, clean := createBackupSuite(t)
	defer clean()
20 changes: 20 additions & 0 deletions br/pkg/restore/db.go
@@ -12,10 +12,12 @@ import (
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/br/pkg/metautil"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/variable"
tidbutil "github.com/pingcap/tidb/util"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)
@@ -450,6 +452,24 @@ func DDLJobBlockListRule(ddlJob *model.Job) bool {
	return checkIsInActions(ddlJob.Type, incrementalRestoreActionBlockList)
}

// GetExistedUserDBs gets the databases created or modified by users.
func GetExistedUserDBs(dom *domain.Domain) []*model.DBInfo {
	databases := dom.InfoSchema().AllSchemas()
	existedDatabases := make([]*model.DBInfo, 0, 16)
	for _, db := range databases {
		dbName := db.Name.L
		if tidbutil.IsMemOrSysDB(dbName) {
			continue
		} else if dbName == "test" && len(db.Tables) == 0 {
			continue
		} else {
			existedDatabases = append(existedDatabases, db)
		}
	}

	return existedDatabases
}

func getDatabases(tables []*metautil.Table) (dbs []*model.DBInfo) {
	dbIDs := make(map[int64]bool)
	for _, table := range tables {
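GetExistedUserDBs treats a schema as user data unless it is a memory or system schema, or it is the default test schema with no tables. Below is a minimal standalone sketch of that rule; the sysSchemas map only approximates tidbutil.IsMemOrSysDB and is an assumption for illustration:

package main

import "fmt"

// sysSchemas approximates tidbutil.IsMemOrSysDB; illustration only.
var sysSchemas = map[string]bool{
	"mysql":              true,
	"information_schema": true,
	"performance_schema": true,
	"metrics_schema":     true,
}

// isUserDB mirrors the filter in GetExistedUserDBs: skip system schemas
// and skip the default "test" schema when it has no tables.
func isUserDB(name string, tableCount int) bool {
	if sysSchemas[name] {
		return false
	}
	if name == "test" && tableCount == 0 {
		return false
	}
	return true
}

func main() {
	fmt.Println(isUserDB("mysql", 10)) // false: system schema
	fmt.Println(isUserDB("test", 0))   // false: untouched default schema
	fmt.Println(isUserDB("test", 1))   // true: user created a table in test
	fmt.Println(isUserDB("d1", 0))     // true: user-created database
}

This is exactly the behavior the new TestGetExistedUserDBs below pins down: mysql plus an empty test yields 0 user databases, adding d1 yields 1, and a test schema holding a table counts as well.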
50 changes: 50 additions & 0 deletions br/pkg/restore/db_test.go
@@ -18,6 +18,7 @@ import (
"github.com/pingcap/tidb/br/pkg/mock"
"github.com/pingcap/tidb/br/pkg/restore"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
@@ -368,3 +369,52 @@ func TestFilterDDLJobByRules(t *testing.T) {
		assert.Equal(t, expectedDDLTypes[i], ddlJob.Type)
	}
}

func TestGetExistedUserDBs(t *testing.T) {
	m, err := mock.NewCluster()
	require.Nil(t, err)
	defer m.Stop()
	dom := m.Domain

	dbs := restore.GetExistedUserDBs(dom)
	require.Equal(t, 0, len(dbs))

	builder, err := infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
		[]*model.DBInfo{
			{Name: model.NewCIStr("mysql")},
			{Name: model.NewCIStr("test")},
		},
		nil, 1)
	require.Nil(t, err)
	dom.MockInfoCacheAndLoadInfoSchema(builder.Build())
	dbs = restore.GetExistedUserDBs(dom)
	require.Equal(t, 0, len(dbs))

	builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
		[]*model.DBInfo{
			{Name: model.NewCIStr("mysql")},
			{Name: model.NewCIStr("test")},
			{Name: model.NewCIStr("d1")},
		},
		nil, 1)
	require.Nil(t, err)
	dom.MockInfoCacheAndLoadInfoSchema(builder.Build())
	dbs = restore.GetExistedUserDBs(dom)
	require.Equal(t, 1, len(dbs))

	builder, err = infoschema.NewBuilder(m.Store(), nil).InitWithDBInfos(
		[]*model.DBInfo{
			{Name: model.NewCIStr("mysql")},
			{Name: model.NewCIStr("d1")},
			{
				Name:   model.NewCIStr("test"),
				Tables: []*model.TableInfo{{Name: model.NewCIStr("t1"), State: model.StatePublic}},
				State:  model.StatePublic,
			},
		},
		nil, 1)
	require.Nil(t, err)
	dom.MockInfoCacheAndLoadInfoSchema(builder.Build())
	dbs = restore.GetExistedUserDBs(dom)
	require.Equal(t, 2, len(dbs))
}
40 changes: 35 additions & 5 deletions br/pkg/storage/s3.go
@@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
@@ -53,6 +54,7 @@ const (

	// TODO make this configurable, 5 MB is a good minimum size, but on a low-latency/high-bandwidth network you can go a lot bigger
	hardcodedS3ChunkSize = 5 * 1024 * 1024
	defaultRegion        = "us-east-1"
	// to check the cloud type by endpoint tag.
	domainAliyun = "aliyuncs.com"
)
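hardcodedS3ChunkSize is the multipart-upload part size that the TODO above wants to make configurable. A sketch of one way to do that with the aws-sdk-go uploader; the newUploader helper and its chunkSize parameter are hypothetical, not part of this patch:

package storage

import (
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// newUploader is a hypothetical helper showing how the hardcoded 5 MB part
// size could become configurable. s3manager enforces a 5 MB minimum part
// size, which is why the current constant is a sensible floor.
func newUploader(sess *session.Session, chunkSize int64) *s3manager.Uploader {
	if chunkSize < s3manager.MinUploadPartSize {
		chunkSize = s3manager.MinUploadPartSize
	}
	return s3manager.NewUploader(sess, func(u *s3manager.Uploader) {
		u.PartSize = chunkSize
	})
}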
@@ -131,9 +133,6 @@ type S3BackendOptions struct {

// Apply applies s3 options on backuppb.S3.
func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error {
	if options.Region == "" {
		options.Region = "us-east-1"
	}
	if options.Endpoint != "" {
		u, err := url.Parse(options.Endpoint)
		if err != nil {
@@ -274,8 +273,12 @@ func createOssRamCred() (*credentials.Credentials, error) {
func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3Storage, errRet error) {
	qs := *backend
	awsConfig := aws.NewConfig().
		WithS3ForcePathStyle(qs.ForcePathStyle).
		WithRegion(qs.Region)
		WithS3ForcePathStyle(qs.ForcePathStyle)
	if qs.Region == "" {
		awsConfig.WithRegion(defaultRegion)
	} else {
		awsConfig.WithRegion(qs.Region)
	}
	request.WithRetryer(awsConfig, defaultS3Retryer())
	if qs.Endpoint != "" {
		awsConfig.WithEndpoint(qs.Endpoint)
@@ -315,6 +318,33 @@ func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3St
	}

	c := s3.New(ses)
	// s3manager.GetBucketRegionWithClient will set the credentials to anonymous, which works with s3.
	// we need to reassign the credentials to be compatible with minio authentication.
	confCred := ses.Config.Credentials
	setCredOpt := func(req *request.Request) {
		if confCred != nil {
			req.Config.Credentials = confCred
		}
	}
	region, err := s3manager.GetBucketRegionWithClient(context.Background(), c, qs.Bucket, setCredOpt)
	if err != nil {
		return nil, errors.Annotatef(err, "failed to get region of bucket %s", qs.Bucket)
	}

	if qs.Region != region {
		if qs.Region != "" {
			return nil, errors.Trace(fmt.Errorf("s3 bucket and region are not matched, bucket=%s, input region=%s, real region=%s",
				qs.Bucket, qs.Region, region))
		}

		qs.Region = region
		if region != defaultRegion {
			awsConfig.WithRegion(region)
			c = s3.New(ses, awsConfig)
		}
	}
	log.Info("succeed to get bucket region from s3", zap.String("bucket region", region))

	if len(qs.Prefix) > 0 && !strings.HasSuffix(qs.Prefix, "/") {
		qs.Prefix += "/"
	}
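Taken together, the s3.go changes move region handling from parse time to client-construction time: Apply no longer silently defaults to us-east-1; instead newS3Storage asks S3 for the bucket's real region (reusing the session credentials so MinIO still authenticates) and rejects an explicit region that contradicts it. A sketch of the caller-visible effect, assuming reachable S3 credentials and an existing bucket (names are illustrative):

package main

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/br/pkg/storage"
)

func main() {
	// No region supplied: after this patch the region is detected from the
	// bucket via s3manager.GetBucketRegionWithClient, not defaulted in Apply.
	backend, err := storage.ParseBackend("s3://bucket/prefix/", &storage.BackendOptions{})
	if err != nil {
		panic(err)
	}
	s, err := storage.New(context.Background(), backend, &storage.ExternalStorageOptions{})
	if err != nil {
		// A mismatched explicit region now fails here with
		// "s3 bucket and region are not matched".
		panic(err)
	}
	fmt.Println(s.URI()) // the detected region is already set on the backend
}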