Skip to content

Commit

Permalink
Update to azure-sdk-for-go to fork with fixed ListBlobsHierarchy
Browse files Browse the repository at this point in the history
Specific commit used because I'm not sure when a new version will be made,
but there's a particularly important fix:
Azure/azure-sdk-for-go#16354 fixes ListBlobsHierarchy when paging is necessary
which turns out to be a broken fix, so a fork is used in the meantime which includes a proper fix:
Azure/azure-sdk-for-go#16992

On some containers paging happens much sooner than MaxResults would imply,
causing restoring LATEST to either fail because wal-g can't find backups, or wal-g selects an old backup

There are two other notable PRs:
Azure/azure-sdk-for-go#16067 fixes a chunk pooling bug
Azure/azure-sdk-for-go#15958 removes the need for FakeIoReadCloser
  • Loading branch information
Philip Dubé committed Feb 8, 2022
1 parent def83ac commit 88e235a
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 42 deletions.
10 changes: 6 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
cloud.google.com/go/storage v1.8.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.21.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.1.0
github.com/Azure/go-autorest/autorest v0.11.21
github.com/DATA-DOG/godog v0.7.14-0.20190529133509-96731eaefa46
Expand All @@ -16,7 +16,7 @@ require (
github.com/cyberdelia/lzo v0.0.0-20171006181345-d85071271a6f
github.com/denisenkom/go-mssqldb v0.10.0
github.com/docker/docker v1.13.1
github.com/go-mysql-org/go-mysql v1.3.0
github.com/go-mysql-org/go-mysql v1.4.1-0.20220126055159-3566d1e608ea
github.com/go-redis/redis v6.15.6+incompatible
github.com/go-sql-driver/mysql v1.5.0
github.com/gofrs/flock v0.8.0
Expand Down Expand Up @@ -53,11 +53,14 @@ require (
golang.org/x/time v0.0.0-20191024005414-555d28b269f0
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
google.golang.org/api v0.28.0
gopkg.in/ini.v1 v1.51.0
)

replace github.com/Azure/azure-sdk-for-go/sdk/storage/azblob => github.com/citusdata/azure-sdk-for-go/sdk/storage/azblob v0.2.1-0.20220207213930-cb4310183c35

require (
cloud.google.com/go v0.57.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.3 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.14 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
Expand Down Expand Up @@ -134,7 +137,6 @@ require (
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380 // indirect
google.golang.org/grpc v1.29.1 // indirect
google.golang.org/protobuf v1.22.0 // indirect
gopkg.in/ini.v1 v1.51.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
honnef.co/go/tools v0.0.1-2020.1.3 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0 h1:86K1Gel7BQ9/WmNWn7dTKMvTLFzwtBe5FNqYbi9X35g=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0 h1:lhSJz9RMbJcTgxifR1hUNJnn6CNYtbgEDtQV22/9RBA=
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.19.0/go.mod h1:h6H6c8enJmmocHUbLiiGY6sx7f9i+X3m1CHdd5c6Rdw=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0 h1:v9p9TfTbf7AwNb5NYQt7hI41IfPoLFiFkLtb+bmGjT0=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.7.0/go.mod h1:yqy467j36fJxcRV2TzfVZ1pCb5vxm4BtZPUdYWe/Xo8=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.1.0 h1:Vo/IczUGNicgE6oo/h7/fK67i0tTdkybedBDhg2SYS8=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.1.0/go.mod h1:7Gua5ZlDIbLGZj/MPeaoKP8sb42GDGaFHeuTLHJ5yzA=
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0 h1:KQgdWmEOmaJKxaUUZwHAYh12t+b+ZJf8q3friycK1kA=
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0/go.mod h1:ZPW/Z0kLCTdDZaDbYTetxc9Cxl/2lNqxYHYNOF2bti0=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 h1:BUYIbDf/mMZ8945v3QkG3OuqGVyS4Iek0AOLwdRAYoc=
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1/go.mod h1:KLF4gFr6DcKFZwSuH8w8yEK6DpFl3LP5rhdvAb7Yz5I=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.1-0.20211216081027-ece5efb4f7c7 h1:we2QAQbtjtQkATLIG1pMkIMIHll3GVA0zOThbiRByug=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.2.1-0.20211216081027-ece5efb4f7c7/go.mod h1:eHWhQKXc1Gv1DvWH//UzgWjWFEo0Pp4pH2vBzjBw8Fc=
github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
github.com/Azure/go-autorest/autorest v0.11.21 h1:w77zY/9RnUAWcIQyDC0Fc89mCvwftR8F+zsR/OH6enk=
Expand Down
51 changes: 19 additions & 32 deletions pkg/storages/azure/folder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/wal-g/tracelog"
"github.com/wal-g/wal-g/pkg/storages/storage"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/go-autorest/autorest/azure"
Expand Down Expand Up @@ -59,7 +58,7 @@ func NewCredentialError(settingName string) storage.Error {
func NewFolder(
uploadStreamToBlockBlobOptions azblob.UploadStreamToBlockBlobOptions,
containerClient azblob.ContainerClient,
credential azcore.Credential,
credential *azblob.SharedKeyCredential,
timeout time.Duration,
path string) *Folder {
return &Folder{
Expand Down Expand Up @@ -91,10 +90,10 @@ func ConfigureFolder(prefix string, settings map[string]string) (storage.Folder,
environmentName = defaultEnvName
}

var credential azcore.Credential
var credential *azblob.SharedKeyCredential
var err error
if usingToken {
credential = azcore.NewAnonymousCredential()
credential = nil
} else {
credential, err = azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
Expand All @@ -111,6 +110,7 @@ func ConfigureFolder(prefix string, settings map[string]string) (storage.Folder,
} else {
tryTimeout = defaultTryTimeout
}
timeout := time.Duration(tryTimeout) * time.Minute

containerName, path, err := storage.GetPathFromPrefix(prefix)
if err != nil {
Expand All @@ -120,23 +120,28 @@ func ConfigureFolder(prefix string, settings map[string]string) (storage.Folder,
storageEndpointSuffix := getStorageEndpointSuffix(environmentName)

var containerUrlString string
var containerClient azblob.ContainerClient
if usingToken {
containerUrlString = fmt.Sprintf("https://%s.blob.%s/%s%s", accountName, storageEndpointSuffix, containerName, accountToken)
_, err = url.Parse(containerUrlString)
if err != nil {
return nil, NewFolderError(err, "Unable to parse service URL with SAS token")
}

containerClient, err = azblob.NewContainerClientWithNoCredential(containerUrlString, &azblob.ClientOptions{
Retry: policy.RetryOptions{TryTimeout: timeout},
})
} else {
containerUrlString = fmt.Sprintf("https://%s.blob.%s/%s", accountName, storageEndpointSuffix, containerName)
_, err = url.Parse(containerUrlString)
if err != nil {
return nil, NewFolderError(err, "Unable to parse service URL")
}

containerClient, err = azblob.NewContainerClientWithSharedKey(containerUrlString, credential, &azblob.ClientOptions{
Retry: policy.RetryOptions{TryTimeout: timeout},
})
}
timeout := time.Duration(tryTimeout) * time.Minute
containerClient, err := azblob.NewContainerClient(containerUrlString, credential, &azblob.ClientOptions{
Retry: policy.RetryOptions{TryTimeout: timeout},
})
if err != nil {
return nil, NewFolderError(err, "Unable to create service client")
}
Expand All @@ -147,7 +152,7 @@ func ConfigureFolder(prefix string, settings map[string]string) (storage.Folder,
type Folder struct {
uploadStreamToBlockBlobOptions azblob.UploadStreamToBlockBlobOptions
containerClient azblob.ContainerClient
credential azcore.Credential
credential *azblob.SharedKeyCredential
timeout time.Duration
path string
}
Expand Down Expand Up @@ -326,20 +331,20 @@ func (folder *Folder) ReadObject(objectRelativePath string) (io.ReadCloser, erro
return nil, NewFolderError(err, "Unable to download blob %s.", path)
}

if cred, ok := folder.credential.(*azblob.SharedKeyCredential); ok {
if folder.credential != nil {
// Shared Key auth involves signing each request
if d := req.Header.Get("x-ms-data"); d == "" {
req.Header.Set("x-ms-date", time.Now().UTC().Format(http.TimeFormat))
}
stringToSign, stringErr := buildStringToSign(cred, req)
stringToSign, stringErr := buildStringToSign(folder.credential, req)
if stringErr != nil {
return nil, NewFolderError(stringErr, "Unable to sign request to sign blob %s.", path)
}
signature, sigErr := cred.ComputeHMACSHA256(stringToSign)
signature, sigErr := folder.credential.ComputeHMACSHA256(stringToSign)
if sigErr != nil {
return nil, NewFolderError(sigErr, "Unable to sign request to sign blob %s.", path)
}
req.Header.Set("Authorization", strings.Join([]string{"SharedKey ", cred.AccountName(), ":", signature}, ""))
req.Header.Set("Authorization", strings.Join([]string{"SharedKey ", folder.credential.AccountName(), ":", signature}, ""))
}

resp, err := httpClient.Do(req)
Expand All @@ -360,30 +365,12 @@ func (folder *Folder) ReadObject(objectRelativePath string) (io.ReadCloser, erro
return resp.Body, nil
}

// blobClient.UploadStreamToBlockBlob only requires an io.Reader
// See https://github.com/Azure/azure-sdk-for-go/pull/15958
type FakeIoReadSeekCloser struct {
reader io.Reader
}

func (ioReader FakeIoReadSeekCloser) Read(p []byte) (n int, err error) {
return ioReader.reader.Read(p)
}

func (ioReader FakeIoReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
return 0, errors.New("FakeIoReadSeekCloser received Seek, only handles Read")
}

func (ioReader FakeIoReadSeekCloser) Close() error {
return errors.New("FakeIoReadSeekCloser received Close, only handles Read")
}

func (folder *Folder) PutObject(name string, content io.Reader) error {
tracelog.DebugLogger.Printf("Put %v into %v\n", name, folder.path)
//Upload content to a block blob using full path
path := storage.JoinPath(folder.path, name)
blobClient := folder.containerClient.NewBlockBlobClient(path)
_, err := blobClient.UploadStreamToBlockBlob(context.Background(), FakeIoReadSeekCloser{reader:content}, folder.uploadStreamToBlockBlobOptions)
_, err := blobClient.UploadStreamToBlockBlob(context.Background(), content, folder.uploadStreamToBlockBlobOptions)
if err != nil {
return NewFolderError(err, "Unable to upload blob %v", name)
}
Expand Down

0 comments on commit 88e235a

Please sign in to comment.