Skip to content

Commit

Permalink
fix: The implementation handles null versionId objects for versioning…
Browse files Browse the repository at this point in the history
… enabled buckets within the versioning directory
  • Loading branch information
jonaustin09 committed Oct 11, 2024
1 parent de4c3c8 commit f03d600
Show file tree
Hide file tree
Showing 5 changed files with 527 additions and 110 deletions.
204 changes: 187 additions & 17 deletions backend/posix/posix.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,20 +532,30 @@ func (p *Posix) GetBucketVersioning(_ context.Context, bucket string) (s3respons
}

// Returns the specified bucket versioning status
func (p *Posix) isBucketVersioningEnabled(ctx context.Context, bucket string) (bool, error) {
func (p *Posix) getBucketVersioningStatus(ctx context.Context, bucket string) (types.BucketVersioningStatus, error) {
res, err := p.GetBucketVersioning(ctx, bucket)
if errors.Is(err, s3err.GetAPIError(s3err.ErrVersioningNotConfigured)) {
return false, nil
return "", nil
}
if err != nil && !errors.Is(err, s3err.GetAPIError(s3err.ErrVersioningNotConfigured)) {
return false, err
return "", err
}

if res.Status != nil {
return *res.Status == types.BucketVersioningStatusEnabled, nil
if res.Status == nil {
return "", nil
}

return false, nil
return *res.Status, nil
}

// Checks if the given bucket versioning status is 'Enabled'
func (p *Posix) isBucketVersioningEnabled(s types.BucketVersioningStatus) bool {
return s == types.BucketVersioningStatusEnabled
}

// Checks if the given bucket versioning status is 'Suspended'
func (p *Posix) isBucketVersioningSuspended(s types.BucketVersioningStatus) bool {
return s == types.BucketVersioningStatusSuspended
}

// Generates the object version path in the versioning directory
Expand All @@ -560,6 +570,18 @@ func genObjVersionKey(key string) string {
return filepath.Join(sum[:2], sum[2:4], sum[4:6], sum)
}

// Removes the null versionId object from versioning directory
func (p *Posix) deleteNullVersionIdObject(bucket, key string) error {
versionPath := filepath.Join(p.genObjVersionPath(bucket, key), nullVersionId)

err := os.Remove(versionPath)
if errors.Is(err, fs.ErrNotExist) {
return nil
}

return err
}

// Creates a new copy(version) of an object in the versioning directory
func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Account) (versionPath string, err error) {
sf, err := os.Open(filepath.Join(bucket, key))
Expand All @@ -570,10 +592,13 @@ func (p *Posix) createObjVersion(bucket, key string, size int64, acc auth.Accoun

var versionId string
data, err := p.meta.RetrieveAttribute(sf, bucket, key, versionIdKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return versionPath, fmt.Errorf("get object versionId: %w", err)
}
if err == nil {
versionId = string(data)
} else {
versionId = ulid.Make().String()
versionId = nullVersionId
}

attrs, err := p.meta.ListAttributes(bucket, key)
Expand Down Expand Up @@ -844,8 +869,81 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
}, nil
}

// First find the null versionId object(if exists)
// before starting the object versions listing
var nullVersionIdObj *types.ObjectVersion
var nullObjDelMarker *types.DeleteMarkerEntry
nf, err := os.Stat(filepath.Join(versionPath, nullVersionId))
if err != nil && !errors.Is(err, fs.ErrNotExist) {
return nil, err
}
if err == nil {
isDel, err := p.isObjDeleteMarker(versionPath, nullVersionId)
if err != nil {
return nil, err
}

// Check to see if the null versionId object is delete marker or not
if isDel {
nullObjDelMarker = &types.DeleteMarkerEntry{
VersionId: backend.GetStringPtr("null"),
LastModified: backend.GetTimePtr(nf.ModTime()),
Key: &path,
IsLatest: getBoolPtr(false),
}
} else {
etagBytes, err := p.meta.RetrieveAttribute(nil, versionPath, nullVersionId, etagkey)
if errors.Is(err, fs.ErrNotExist) {
return nil, backend.ErrSkipObj
}
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return nil, fmt.Errorf("get etag: %w", err)
}
// note: meta.ErrNoSuchKey will return etagBytes = []byte{}
// so this will just set etag to "" if its not already set
etag := string(etagBytes)
size := nf.Size()
nullVersionIdObj = &types.ObjectVersion{
ETag: &etag,
Key: &path,
LastModified: backend.GetTimePtr(nf.ModTime()),
Size: &size,
VersionId: backend.GetStringPtr("null"),
IsLatest: getBoolPtr(false),
StorageClass: types.ObjectVersionStorageClassStandard,
}
}
}

isNullVersionIdObjFound := nullVersionIdObj != nil || nullObjDelMarker != nil

if len(dirEnts) == 1 && (isNullVersionIdObjFound) {
if nullObjDelMarker != nil {
delMarkers = append(delMarkers, *nullObjDelMarker)
}
if nullVersionIdObj != nil {
objects = append(objects, *nullVersionIdObj)
}

if availableObjCount == 1 {
return &backend.ObjVersionFuncResult{
ObjectVersions: objects,
DelMarkers: delMarkers,
Truncated: true,
NextVersionIdMarker: nullVersionId,
}, nil
}
}

isNullVersionIdObjAdded := false

for i := len(dirEnts) - 1; i >= 0; i-- {
dEntry := dirEnts[i]
// Skip the null versionId object to not
// break the object versions list
if dEntry.Name() == nullVersionId {
continue
}

f, err := dEntry.Info()
if errors.Is(err, fs.ErrNotExist) {
Expand All @@ -855,6 +953,29 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
return nil, fmt.Errorf("get fileinfo: %w", err)
}

// If the null versionId object is found, first push it
// by checking its creation date, then continue the adding
if isNullVersionIdObjFound && !isNullVersionIdObjAdded {
if nf.ModTime().After(f.ModTime()) {
if nullVersionIdObj != nil {
objects = append(objects, *nullVersionIdObj)
}
if nullObjDelMarker != nil {
delMarkers = append(delMarkers, *nullObjDelMarker)
}

isNullVersionIdObjAdded = true

if availableObjCount--; availableObjCount == 0 {
return &backend.ObjVersionFuncResult{
ObjectVersions: objects,
DelMarkers: delMarkers,
Truncated: true,
NextVersionIdMarker: nullVersionId,
}, nil
}
}
}
versionId := f.Name()
size := f.Size()

Expand Down Expand Up @@ -912,6 +1033,26 @@ func (p *Posix) fileToObjVersions(bucket string) backend.GetVersionsFunc {
}
}

// If null versionId object is found but not yet pushed,
// push it after the listing, as it's the oldest object version
if isNullVersionIdObjFound && !isNullVersionIdObjAdded {
if nullVersionIdObj != nil {
objects = append(objects, *nullVersionIdObj)
}
if nullObjDelMarker != nil {
delMarkers = append(delMarkers, *nullObjDelMarker)
}

if availableObjCount--; availableObjCount == 0 {
return &backend.ObjVersionFuncResult{
ObjectVersions: objects,
DelMarkers: delMarkers,
Truncated: true,
NextVersionIdMarker: nullVersionId,
}, nil
}
}

return &backend.ObjVersionFuncResult{
ObjectVersions: objects,
DelMarkers: delMarkers,
Expand Down Expand Up @@ -1215,10 +1356,11 @@ func (p *Posix) CompleteMultipartUpload(ctx context.Context, input *s3.CompleteM
}
}

vEnabled, err := p.isBucketVersioningEnabled(ctx, bucket)
vStatus, err := p.getBucketVersioningStatus(ctx, bucket)
if err != nil {
return nil, err
}
vEnabled := p.isBucketVersioningEnabled(vStatus)

d, err := os.Stat(objname)

Expand Down Expand Up @@ -1830,10 +1972,11 @@ func (p *Posix) UploadPartCopy(ctx context.Context, upi *s3.UploadPartCopyInput)
return s3response.CopyObjectResult{}, fmt.Errorf("stat bucket: %w", err)
}

vEnabled, err := p.isBucketVersioningEnabled(ctx, srcBucket)
vStatus, err := p.getBucketVersioningStatus(ctx, srcBucket)
if err != nil {
return s3response.CopyObjectResult{}, err
}
vEnabled := p.isBucketVersioningEnabled(vStatus)

if srcVersionId != "" {
if !p.versioningEnabled() || !vEnabled {
Expand Down Expand Up @@ -2019,10 +2162,11 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}, nil
}

vEnabled, err := p.isBucketVersioningEnabled(ctx, *po.Bucket)
vStatus, err := p.getBucketVersioningStatus(ctx, *po.Bucket)
if err != nil {
return s3response.PutObjectOutput{}, err
}
vEnabled := p.isBucketVersioningEnabled(vStatus)

// object is file
d, err := os.Stat(name)
Expand All @@ -2031,10 +2175,20 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}

// if the versioninng is enabled first create the file object version
if p.versioningEnabled() && vEnabled && err == nil {
_, err := p.createObjVersion(*po.Bucket, *po.Key, d.Size(), acct)
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("create object version: %w", err)
if p.versioningEnabled() && vStatus != "" && err == nil {
var isVersionIdMissing bool
if p.isBucketVersioningSuspended(vStatus) {
vIdBytes, err := p.meta.RetrieveAttribute(nil, *po.Bucket, *po.Key, versionIdKey)
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) {
return s3response.PutObjectOutput{}, fmt.Errorf("get object versionId: %w", err)
}
isVersionIdMissing = len(vIdBytes) == 0
}
if !isVersionIdMissing {
_, err := p.createObjVersion(*po.Bucket, *po.Key, d.Size(), acct)
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("create object version: %w", err)
}
}
}
if errors.Is(err, syscall.ENAMETOOLONG) {
Expand Down Expand Up @@ -2084,6 +2238,17 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
versionID = ulid.Make().String()
}

// Before finaliazing the object creation remove
// null versionId object from versioning directory
// if it exists and the versioning status is Suspended
if p.isBucketVersioningSuspended(vStatus) {
err = p.deleteNullVersionIdObject(*po.Bucket, *po.Key)
if err != nil {
return s3response.PutObjectOutput{}, err
}
versionID = nullVersionId
}

for k, v := range po.Metadata {
err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key,
fmt.Sprintf("%v.%v", metaHdr, k), []byte(v))
Expand Down Expand Up @@ -2115,7 +2280,7 @@ func (p *Posix) PutObject(ctx context.Context, po *s3.PutObjectInput) (s3respons
}
}

if versionID != "" {
if versionID != "" && versionID != nullVersionId {
err := p.meta.StoreAttribute(f.File(), *po.Bucket, *po.Key, versionIdKey, []byte(versionID))
if err != nil {
return s3response.PutObjectOutput{}, fmt.Errorf("set versionId attr: %w", err)
Expand Down Expand Up @@ -2199,10 +2364,11 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (

objpath := filepath.Join(bucket, object)

vEnabled, err := p.isBucketVersioningEnabled(ctx, bucket)
vStatus, err := p.getBucketVersioningStatus(ctx, bucket)
if err != nil {
return nil, err
}
vEnabled := p.isBucketVersioningEnabled(vStatus)

// Directory objects can't have versions
if !isDir && p.versioningEnabled() && vEnabled {
Expand Down Expand Up @@ -2254,6 +2420,9 @@ func (p *Posix) DeleteObject(ctx context.Context, input *s3.DeleteObjectInput) (
if err != nil && !errors.Is(err, meta.ErrNoSuchKey) && !errors.Is(err, fs.ErrNotExist) {
return nil, fmt.Errorf("get obj versionId: %w", err)
}
if errors.Is(err, meta.ErrNoSuchKey) {
vId = []byte(nullVersionId)
}

if string(vId) == *input.VersionId {
// if the specified VersionId is the same as in the latest version,
Expand Down Expand Up @@ -2922,10 +3091,11 @@ func (p *Posix) CopyObject(ctx context.Context, input *s3.CopyObjectInput) (*s3.
return nil, fmt.Errorf("stat bucket: %w", err)
}

vEnabled, err := p.isBucketVersioningEnabled(ctx, srcBucket)
vStatus, err := p.getBucketVersioningStatus(ctx, srcBucket)
if err != nil {
return nil, err
}
vEnabled := p.isBucketVersioningEnabled(vStatus)

if srcVersionId != "" {
if !p.versioningEnabled() || !vEnabled {
Expand Down
11 changes: 11 additions & 0 deletions cmd/versitygw/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,11 +337,22 @@ func extractIntTests() (commands []*cli.Command) {
if debug {
opts = append(opts, integration.WithDebug())
}
if versioningEnabled {
opts = append(opts, integration.WithVersioningEnabled())
}

s := integration.NewS3Conf(opts...)
err := testFunc(s)
return err
},
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "versioning-enabled",
Usage: "Test the bucket object versioning, if the versioning is enabled",
Destination: &versioningEnabled,
Aliases: []string{"vs"},
},
},
})
}
return
Expand Down
Loading

0 comments on commit f03d600

Please sign in to comment.