Skip to content

Commit

Permalink
br: get bucket region from s3 to enable users not to input s3 region …
Browse files Browse the repository at this point in the history
…param (#34419)

close #34275
  • Loading branch information
WangLe1321 authored Jul 7, 2022
1 parent e39a987 commit 7d37ac1
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 83 deletions.
43 changes: 0 additions & 43 deletions br/pkg/backup/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,49 +250,6 @@ func TestOnBackupRegionErrorResponse(t *testing.T) {
}
}

func TestSendCreds(t *testing.T) {
s, clean := createBackupSuite(t)
defer clean()

accessKey := "ab"
secretAccessKey := "cd"
backendOpt := storage.BackendOptions{
S3: storage.S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
},
}
backend, err := storage.ParseBackend("s3://bucket/prefix/", &backendOpt)
require.NoError(t, err)
opts := &storage.ExternalStorageOptions{
SendCredentials: true,
}
_, err = storage.New(s.ctx, backend, opts)
require.NoError(t, err)
access_key := backend.GetS3().AccessKey
require.Equal(t, "ab", access_key)
secret_access_key := backend.GetS3().SecretAccessKey
require.Equal(t, "cd", secret_access_key)

backendOpt = storage.BackendOptions{
S3: storage.S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
},
}
backend, err = storage.ParseBackend("s3://bucket/prefix/", &backendOpt)
require.NoError(t, err)
opts = &storage.ExternalStorageOptions{
SendCredentials: false,
}
_, err = storage.New(s.ctx, backend, opts)
require.NoError(t, err)
access_key = backend.GetS3().AccessKey
require.Equal(t, "", access_key)
secret_access_key = backend.GetS3().SecretAccessKey
require.Equal(t, "", secret_access_key)
}

func TestSkipUnsupportedDDLJob(t *testing.T) {
s, clean := createBackupSuite(t)
defer clean()
Expand Down
40 changes: 35 additions & 5 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/pingcap/errors"
backuppb "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -53,6 +54,7 @@ const (

// TODO make this configurable, 5 mb is a good minimum size but on low latency/high bandwidth network you can go a lot bigger
hardcodedS3ChunkSize = 5 * 1024 * 1024
defaultRegion = "us-east-1"
// to check the cloud type by endpoint tag.
domainAliyun = "aliyuncs.com"
)
Expand Down Expand Up @@ -131,9 +133,6 @@ type S3BackendOptions struct {

// Apply apply s3 options on backuppb.S3.
func (options *S3BackendOptions) Apply(s3 *backuppb.S3) error {
if options.Region == "" {
options.Region = "us-east-1"
}
if options.Endpoint != "" {
u, err := url.Parse(options.Endpoint)
if err != nil {
Expand Down Expand Up @@ -274,8 +273,12 @@ func createOssRamCred() (*credentials.Credentials, error) {
func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3Storage, errRet error) {
qs := *backend
awsConfig := aws.NewConfig().
WithS3ForcePathStyle(qs.ForcePathStyle).
WithRegion(qs.Region)
WithS3ForcePathStyle(qs.ForcePathStyle)
if qs.Region == "" {
awsConfig.WithRegion(defaultRegion)
} else {
awsConfig.WithRegion(qs.Region)
}
request.WithRetryer(awsConfig, defaultS3Retryer())
if qs.Endpoint != "" {
awsConfig.WithEndpoint(qs.Endpoint)
Expand Down Expand Up @@ -315,6 +318,33 @@ func newS3Storage(backend *backuppb.S3, opts *ExternalStorageOptions) (obj *S3St
}

c := s3.New(ses)
// s3manager.GetBucketRegionWithClient will set credential anonymous, which works with s3.
// we need reassign credential to be compatible with minio authentication.
confCred := ses.Config.Credentials
setCredOpt := func(req *request.Request) {
if confCred != nil {
req.Config.Credentials = confCred
}
}
region, err := s3manager.GetBucketRegionWithClient(context.Background(), c, qs.Bucket, setCredOpt)
if err != nil {
return nil, errors.Annotatef(err, "failed to get region of bucket %s", qs.Bucket)
}

if qs.Region != region {
if qs.Region != "" {
return nil, errors.Trace(fmt.Errorf("s3 bucket and region are not matched, bucket=%s, input region=%s, real region=%s",
qs.Bucket, qs.Region, region))
}

qs.Region = region
if region != defaultRegion {
awsConfig.WithRegion(region)
c = s3.New(ses, awsConfig)
}
}
log.Info("succeed to get bucket region from s3", zap.String("bucket region", region))

if len(qs.Prefix) > 0 && !strings.HasSuffix(qs.Prefix, "/") {
qs.Prefix += "/"
}
Expand Down
106 changes: 71 additions & 35 deletions br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"fmt"
"io"
"math/rand"
"net/http"
"net/http/httptest"
"os"
"testing"

Expand All @@ -24,6 +26,8 @@ import (
"github.com/stretchr/testify/require"
)

const bucketRegionHeader = "X-Amz-Bucket-Region"

type s3Suite struct {
controller *gomock.Controller
s3 *mock.MockS3API
Expand Down Expand Up @@ -53,6 +57,15 @@ func createS3Suite(c gomock.TestReporter) (s *s3Suite, clean func()) {
return
}

func createGetBucketRegionServer(region string, statusCode int, incHeader bool) *httptest.Server {
return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if incHeader {
w.Header().Set(bucketRegionHeader, region)
}
w.WriteHeader(statusCode)
}))
}

func TestApply(t *testing.T) {
type testcase struct {
name string
Expand Down Expand Up @@ -145,7 +158,7 @@ func TestApplyUpdate(t *testing.T) {
Endpoint: "",
},
s3: &backuppb.S3{
Region: "us-east-1",
Region: "",
Bucket: "bucket",
Prefix: "prefix",
},
Expand All @@ -167,7 +180,7 @@ func TestApplyUpdate(t *testing.T) {
Endpoint: "https://s3.us-west-2",
},
s3: &backuppb.S3{
Region: "us-east-1",
Region: "",
Endpoint: "https://s3.us-west-2",
Bucket: "bucket",
Prefix: "prefix",
Expand All @@ -179,7 +192,7 @@ func TestApplyUpdate(t *testing.T) {
Endpoint: "http://s3.us-west-2",
},
s3: &backuppb.S3{
Region: "us-east-1",
Region: "",
Endpoint: "http://s3.us-west-2",
Bucket: "bucket",
Prefix: "prefix",
Expand Down Expand Up @@ -271,6 +284,10 @@ func TestS3Storage(t *testing.T) {
hackPermission []Permission
sendCredential bool
}

s := createGetBucketRegionServer("us-west-2", 200, true)
defer s.Close()

testFn := func(test *testcase, t *testing.T) {
t.Log(test.name)
ctx := aws.BackgroundContext()
Expand All @@ -294,58 +311,33 @@ func TestS3Storage(t *testing.T) {
}
}
tests := []testcase{
{
name: "no region and endpoint",
s3: &backuppb.S3{
Region: "",
Endpoint: "",
Bucket: "bucket",
Prefix: "prefix",
},
errReturn: true,
hackPermission: []Permission{AccessBuckets},
sendCredential: true,
},
{
name: "no region",
s3: &backuppb.S3{
Region: "",
Endpoint: "http://10.1.2.3",
Endpoint: s.URL,
Bucket: "bucket",
Prefix: "prefix",
},
errReturn: true,
hackPermission: []Permission{AccessBuckets},
errReturn: false,
sendCredential: true,
},
{
name: "no endpoint",
name: "wrong region",
s3: &backuppb.S3{
Region: "us-west-2",
Endpoint: "",
Region: "us-east-2",
Endpoint: s.URL,
Bucket: "bucket",
Prefix: "prefix",
},
errReturn: true,
hackPermission: []Permission{AccessBuckets},
sendCredential: true,
},
{
name: "no region",
s3: &backuppb.S3{
Region: "",
Endpoint: "http://10.1.2.3",
Bucket: "bucket",
Prefix: "prefix",
},
errReturn: false,
sendCredential: true,
},
{
name: "normal region",
name: "right region",
s3: &backuppb.S3{
Region: "us-west-2",
Endpoint: "",
Endpoint: s.URL,
Bucket: "bucket",
Prefix: "prefix",
},
Expand All @@ -356,6 +348,7 @@ func TestS3Storage(t *testing.T) {
name: "keys configured explicitly",
s3: &backuppb.S3{
Region: "us-west-2",
Endpoint: s.URL,
AccessKey: "ab",
SecretAccessKey: "cd",
Bucket: "bucket",
Expand All @@ -368,6 +361,7 @@ func TestS3Storage(t *testing.T) {
name: "no access key",
s3: &backuppb.S3{
Region: "us-west-2",
Endpoint: s.URL,
SecretAccessKey: "cd",
Bucket: "bucket",
Prefix: "prefix",
Expand All @@ -379,6 +373,7 @@ func TestS3Storage(t *testing.T) {
name: "no secret access key",
s3: &backuppb.S3{
Region: "us-west-2",
Endpoint: s.URL,
AccessKey: "ab",
Bucket: "bucket",
Prefix: "prefix",
Expand All @@ -390,6 +385,7 @@ func TestS3Storage(t *testing.T) {
name: "no secret access key",
s3: &backuppb.S3{
Region: "us-west-2",
Endpoint: s.URL,
AccessKey: "ab",
Bucket: "bucket",
Prefix: "prefix",
Expand Down Expand Up @@ -1121,3 +1117,43 @@ func TestWalkDirWithEmptyPrefix(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 1, i)
}

func TestSendCreds(t *testing.T) {
accessKey := "ab"
secretAccessKey := "cd"
backendOpt := BackendOptions{
S3: S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
},
}
backend, err := ParseBackend("s3://bucket/prefix/", &backendOpt)
require.NoError(t, err)
opts := &ExternalStorageOptions{
SendCredentials: true,
}
_, err = New(context.TODO(), backend, opts)
require.NoError(t, err)
sentAccessKey := backend.GetS3().AccessKey
require.Equal(t, accessKey, sentAccessKey)
sentSecretAccessKey := backend.GetS3().SecretAccessKey
require.Equal(t, sentSecretAccessKey, sentSecretAccessKey)

backendOpt = BackendOptions{
S3: S3BackendOptions{
AccessKey: accessKey,
SecretAccessKey: secretAccessKey,
},
}
backend, err = ParseBackend("s3://bucket/prefix/", &backendOpt)
require.NoError(t, err)
opts = &ExternalStorageOptions{
SendCredentials: false,
}
_, err = New(context.TODO(), backend, opts)
require.NoError(t, err)
sentAccessKey = backend.GetS3().AccessKey
require.Equal(t, "", sentAccessKey)
sentSecretAccessKey = backend.GetS3().SecretAccessKey
require.Equal(t, "", sentSecretAccessKey)
}

0 comments on commit 7d37ac1

Please sign in to comment.