Skip to content

Commit

Permalink
op: New package containing the high-level upload code
Browse files Browse the repository at this point in the history
So mantle can easily reuse the code without mucking with making them
parallel and such.
  • Loading branch information
krnowak committed Jun 12, 2024
1 parent ebb35dd commit 3893a51
Show file tree
Hide file tree
Showing 5 changed files with 288 additions and 192 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.20
require (
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2
github.com/coreos/pkg v0.0.0-20240122114842-bbd7aa9bf6fb
gopkg.in/urfave/cli.v1 v1.20.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 h1:YUUxeiOWgdAQE3pXt
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2/go.mod h1:dmXQgZuiSubAecswZE+Sm8jkvEa7kQgTPVRvwL/nd0E=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/coreos/pkg v0.0.0-20240122114842-bbd7aa9bf6fb h1:GIzvVQ9UkUlOhSDlqmrQAAAUd6R3E+caIisNEyWXvNE=
github.com/coreos/pkg v0.0.0-20240122114842-bbd7aa9bf6fb/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk=
github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk=
Expand Down
264 changes: 264 additions & 0 deletions op/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
package op

import (
"context"
"encoding/base64"
"errors"
"fmt"
"runtime"
"strings"

"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/pageblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/coreos/pkg/multierror"

"github.com/flatcar/azure-vhd-utils/upload"
"github.com/flatcar/azure-vhd-utils/upload/metadata"
"github.com/flatcar/azure-vhd-utils/vhdcore/common"
"github.com/flatcar/azure-vhd-utils/vhdcore/diskstream"
"github.com/flatcar/azure-vhd-utils/vhdcore/validator"
)

// Error is the set of high-level upload failures reported by this
// package. It implements the error interface so values can be
// matched with errors.Is/errors.As.
type Error int

const (
	// MissingVHDSuffix means the blob name does not end in ".vhd".
	MissingVHDSuffix Error = iota
	// BlobAlreadyExists means the target blob is already fully uploaded.
	BlobAlreadyExists
	// MissingUploadMetadata means the existing blob carries no upload
	// metadata, so a resume is impossible.
	MissingUploadMetadata
)

// Error returns a human-readable description of the error code.
func (e Error) Error() string {
	var msg string
	switch e {
	case MissingVHDSuffix:
		msg = "missing .vhd suffix in blob name"
	case BlobAlreadyExists:
		msg = "blob already exists"
	case MissingUploadMetadata:
		msg = "blob has no upload metadata"
	default:
		msg = "unknown upload error"
	}
	return msg
}

// ErrorIsAnyOf reports whether err wraps a package Error equal to
// any of the passed errs. It returns false when err does not wrap an
// Error at all.
func ErrorIsAnyOf(err error, errs ...Error) bool {
	var got Error
	if !errors.As(err, &got) {
		return false
	}

	for _, candidate := range errs {
		if candidate == got {
			return true
		}
	}

	return false
}

// UploadOptions controls the behavior of Upload. A nil *UploadOptions
// is accepted by Upload and treated as all-defaults.
type UploadOptions struct {
	// Overwrite skips the already-uploaded and upload-metadata checks
	// when the target blob already exists.
	Overwrite bool
	// Parallelism is the number of concurrent upload workers; values
	// <= 0 make Upload pick a default based on runtime.NumCPU.
	Parallelism int
	// Logger receives human-readable progress messages.
	// NOTE(review): Upload only substitutes a no-op logger when the
	// whole options struct is nil; a non-nil UploadOptions with a nil
	// Logger will panic on the first log call — confirm callers
	// always set it.
	Logger func(string)
}

// noopLogger is the default logger used when none is supplied; it
// discards every message.
func noopLogger(_ string) {}

// Upload uploads the local VHD file vhd to a page blob named blob in
// the given container, creating the container if it does not exist.
// The blob name must end in ".vhd" (case-insensitive). If the blob
// already exists, the upload is resumed from the not-yet-uploaded
// ranges; a blob that was fully uploaded (its content MD5 is set) is
// rejected with BlobAlreadyExists unless opts.Overwrite is set. A nil
// opts is treated as the zero UploadOptions.
func Upload(ctx context.Context, blobServiceClient *service.Client, container, blob, vhd string, opts *UploadOptions) error {
	// Azure page blobs operate on 512-byte pages; data is located and
	// uploaded in 4 MiB page sets.
	const PageBlobPageSize int64 = 512
	const PageBlobPageSetSize int64 = 4 * 1024 * 1024

	if !strings.HasSuffix(strings.ToLower(blob), ".vhd") {
		return MissingVHDSuffix
	}

	if opts == nil {
		opts = &UploadOptions{}
	}

	parallelism := 8 * runtime.NumCPU()
	if opts.Parallelism > 0 {
		parallelism = opts.Parallelism
	}
	overwrite := opts.Overwrite
	logger := opts.Logger
	if logger == nil {
		// Fix: previously only a fully-nil opts got the no-op logger,
		// so a caller passing &UploadOptions{} would panic on the
		// first logger call. Default the logger unconditionally.
		logger = noopLogger
	}

	if err := ensureVHDSanity(vhd); err != nil {
		return err
	}

	diskStream, err := diskstream.CreateNewDiskStream(vhd)
	if err != nil {
		return err
	}
	defer diskStream.Close()

	containerClient := blobServiceClient.NewContainerClient(container)
	pageblobClient := containerClient.NewPageBlobClient(blob)
	blobClient := pageblobClient.BlobClient()

	// Create the container; an already-existing container is fine.
	_, err = containerClient.Create(ctx, nil)
	if err != nil && !bloberror.HasCode(err, bloberror.ContainerAlreadyExists, bloberror.ResourceAlreadyExists) {
		return err
	}

	blobExists := true
	blobProperties, err := blobClient.GetProperties(ctx, nil)
	if err != nil {
		if !bloberror.HasCode(err, bloberror.BlobNotFound, bloberror.ResourceNotFound) {
			return err
		}
		blobExists = false
	}

	resume := false
	var blobMetadata *metadata.Metadata
	if blobExists {
		if !overwrite {
			// The content MD5 is only set after a completed upload
			// (see setBlobMD5Hash), so its presence means the blob is
			// already fully uploaded.
			if len(blobProperties.ContentMD5) > 0 {
				return BlobAlreadyExists
			}
			blobMetadata, err = metadata.NewMetadataFromBlobMetadata(blobProperties.Metadata)
			if err != nil {
				return err
			}
			if blobMetadata == nil {
				return MissingUploadMetadata
			}
		}
		// NOTE(review): resume is set even when overwrite is true, in
		// which case blobMetadata stays nil and CompareMetadata below
		// runs against nil instead of the blob being recreated —
		// confirm this is the intended overwrite behavior.
		resume = true
		logger(fmt.Sprintf("Blob with name '%s' already exists, checking upload can be resumed", blob))
	}

	localMetadata, err := metadata.NewMetadataFromLocalVHD(vhd)
	if err != nil {
		return err
	}

	var rangesToSkip []*common.IndexRange
	if resume {
		// Resuming: the local VHD must match the one the partial
		// upload was started from; already-uploaded ranges are skipped.
		if errs := metadata.CompareMetadata(blobMetadata, localMetadata); len(errs) > 0 {
			return multierror.Error(errs)
		}
		ranges, err := getAlreadyUploadedBlobRanges(ctx, pageblobClient)
		if err != nil {
			return err
		}
		rangesToSkip = ranges
	} else if err := createBlob(ctx, pageblobClient, diskStream.GetSize(), localMetadata); err != nil {
		return err
	}

	uploadableRanges, err := upload.LocateUploadableRanges(diskStream, rangesToSkip, PageBlobPageSize, PageBlobPageSetSize)
	if err != nil {
		return err
	}

	// Drop all-zero ranges — they carry no data worth uploading.
	uploadableRanges, err = upload.DetectEmptyRanges(diskStream, uploadableRanges)
	if err != nil {
		return err
	}

	uploadContext := &upload.DiskUploadContext{
		VhdStream:             diskStream,
		AlreadyProcessedBytes: diskStream.GetSize() - common.TotalRangeLength(uploadableRanges),
		UploadableRanges:      uploadableRanges,
		PageblobClient:        pageblobClient,
		Parallelism:           parallelism,
		Resume:                resume,
	}

	if err := upload.Upload(ctx, uploadContext); err != nil {
		return err
	}

	// Record the whole-file MD5 so future invocations can detect a
	// completed upload (see the BlobAlreadyExists check above).
	if err := setBlobMD5Hash(ctx, blobClient, localMetadata); err != nil {
		return err
	}
	logger("Upload completed")
	return nil
}

// ensureVHDSanity checks that the VHD file is valid and that its size
// is acceptable for Azure.
func ensureVHDSanity(vhd string) error {
	if err := validator.ValidateVhd(vhd); err != nil {
		return err
	}

	return validator.ValidateVhdSize(vhd)
}

// createBlob creates a page blob of the given size (in bytes) and
// attaches the custom VHD metadata to it. The client parameter is the
// Azure pageblob client representing a blob in a container.
func createBlob(ctx context.Context, client *pageblob.Client, size int64, vhdMetadata *metadata.Metadata) error {
	blobMeta, err := vhdMetadata.ToMap()
	if err != nil {
		return err
	}
	_, err = client.Create(ctx, size, &pageblob.CreateOptions{
		Metadata: blobMeta,
	})
	return err
}

// setBlobMD5Hash stores the VHD's MD5 hash (base64-encoded) in the
// blob's content-MD5 property. It is a no-op when the local metadata
// carries no MD5 hash.
func setBlobMD5Hash(ctx context.Context, client *blob.Client, vhdMetadata *metadata.Metadata) error {
	fileMeta := vhdMetadata.FileMetadata
	if fileMeta == nil || len(fileMeta.MD5Hash) == 0 {
		return nil
	}
	encoded := base64.StdEncoding.EncodeToString(fileMeta.MD5Hash)
	headers := blob.HTTPHeaders{
		BlobContentMD5: []byte(encoded),
	}
	_, err := client.SetHTTPHeaders(ctx, headers, nil)
	return err
}

// getAlreadyUploadedBlobRanges returns the ranges of the page blob
// that have already been uploaded (its valid page ranges), so a
// resumed upload can skip them. The parameter client is the Azure
// pageblob client representing a blob in a container.
func getAlreadyUploadedBlobRanges(ctx context.Context, client *pageblob.Client) ([]*common.IndexRange, error) {
	var (
		marker *string
		rangesToSkip []*common.IndexRange
	)
	for {
		opts := pageblob.GetPageRangesOptions{
			Marker: marker,
		}
		pager := client.NewGetPageRangesPager(&opts)
		// Drain every page the pager yields, converting each SDK page
		// range into an IndexRange.
		for pager.More() {
			response, err := pager.NextPage(ctx)
			if err != nil {
				return nil, err
			}
			tmpRanges := make([]*common.IndexRange, len(response.PageRange))
			for i, page := range response.PageRange {
				tmpRanges[i] = common.NewIndexRange(*page.Start, *page.End)
			}
			rangesToSkip = append(rangesToSkip, tmpRanges...)
			// Remember the last continuation marker seen so the outer
			// loop can restart the pager from it.
			marker = response.NextMarker
		}
		// NOTE(review): the SDK pager appears to follow continuation
		// markers itself via pager.More(), which would make this outer
		// loop redundant — confirm it cannot re-fetch (and duplicate)
		// ranges that were already collected.
		if marker == nil || *marker == "" {
			break
		}
	}
	return rangesToSkip, nil
}
21 changes: 11 additions & 10 deletions upload/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,34 +46,34 @@ func (byteReadSeekCloser) Close() error {

var _ io.ReadSeekCloser = byteReadSeekCloser{}

// Upload uploads the disk ranges described by the parameter cxt, this parameter describes the disk stream to
// Upload uploads the disk ranges described by the parameter uctx, this parameter describes the disk stream to
read from, the ranges of the stream to read, the destination blob and its container, the client to communicate
// with Azure storage and the number of parallel go-routines to use for upload.
func Upload(cxt *DiskUploadContext) error {
func Upload(ctx context.Context, uctx *DiskUploadContext) error {
// Get the channel that contains stream of disk data to upload
dataWithRangeChan, streamReadErrChan := GetDataWithRanges(cxt.VhdStream, cxt.UploadableRanges)
dataWithRangeChan, streamReadErrChan := GetDataWithRanges(uctx.VhdStream, uctx.UploadableRanges)

// The channel to send upload request to load-balancer
requtestChan := make(chan *concurrent.Request, 0)

// Prepare and start the load-balancer that load request across 'cxt.Parallelism' workers
loadBalancer := concurrent.NewBalancer(cxt.Parallelism)
// Prepare and start the load-balancer that balances requests across 'uctx.Parallelism' workers
loadBalancer := concurrent.NewBalancer(uctx.Parallelism)
loadBalancer.Init()
workerErrorChan, allWorkersFinishedChan := loadBalancer.Run(requtestChan)

// Calculate the actual size of the data to upload
uploadSizeInBytes := int64(0)
for _, r := range cxt.UploadableRanges {
for _, r := range uctx.UploadableRanges {
uploadSizeInBytes += r.Length()
}
fmt.Printf("\nEffective upload size: %.2f MB (from %.2f MB originally)", float64(uploadSizeInBytes)/oneMB, float64(cxt.VhdStream.GetSize())/oneMB)
fmt.Printf("\nEffective upload size: %.2f MB (from %.2f MB originally)", float64(uploadSizeInBytes)/oneMB, float64(uctx.VhdStream.GetSize())/oneMB)

// Prepare and start the upload progress tracker
uploadProgress := progress.NewStatus(cxt.Parallelism, cxt.AlreadyProcessedBytes, uploadSizeInBytes, progress.NewComputestateDefaultSize())
uploadProgress := progress.NewStatus(uctx.Parallelism, uctx.AlreadyProcessedBytes, uploadSizeInBytes, progress.NewComputestateDefaultSize())
progressChan := uploadProgress.Run()

// read progress status from progress tracker and print it
go readAndPrintProgress(progressChan, cxt.Resume)
go readAndPrintProgress(progressChan, uctx.Resume)

// listen for errors reported by workers and print it
var allWorkSucceeded = true
Expand All @@ -98,7 +98,8 @@ L:
//
req := &concurrent.Request{
Work: func() error {
_, err := cxt.PageblobClient.UploadPages(context.TODO(),
_, err := uctx.PageblobClient.UploadPages(
ctx,
newByteReadSeekCloser(dataWithRange.Data),
blob.HTTPRange{
Offset: dataWithRange.Range.Start,
Expand Down
Loading

0 comments on commit 3893a51

Please sign in to comment.