Skip to content

Commit

Permalink
clear up docs using channel index for tests
Browse files Browse the repository at this point in the history
  • Loading branch information
nirav24 committed Jan 19, 2023
1 parent 625edbe commit 1aac440
Showing 1 changed file with 66 additions and 0 deletions.
66 changes: 66 additions & 0 deletions db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,67 @@ WHERE META(ks).xattrs._sync.sequence >= 0
return purgedDocCount, nil
}

// emptyBucketUsingChannelIndex ensures all docs are cleared (using Channel index) for the given bucket. Workes similarly to emptyAllDocsIndex but is used when AllDocs index isn't present
func emptyBucketUsingChannelIndex(ctx context.Context, dataStore sgbucket.DataStore, tbp *base.TestBucketPool) (numCompacted int, err error) {
purgedDocCount := 0
purgeBody := Body{"_purged": true}

n1qlStore, ok := base.AsN1QLStore(dataStore)
if !ok {
return 0, fmt.Errorf("bucket was not a n1ql store")
}

// A stripped down version of db.Compact() that works on AllDocs instead of tombstones
statement := `SELECT [op.name, META(ks).xattrs._sync.sequence][1] AS seq,
META(ks).xattrs._sync.rev AS rev,
META(ks).xattrs._sync.flags AS flags,
META(ks).id AS id
FROM ` + base.KeyspaceQueryToken + ` AS ks USE INDEX (sg_channels_x1)
UNNEST OBJECT_PAIRS(META(ks).xattrs._sync.channels) AS op
WHERE ([op.name, LEAST(META(ks).xattrs._sync.sequence, op.val.seq),
IFMISSING(op.val.rev,NULL),IFMISSING(op.val.del,NULL)] BETWEEN ["", 0] AND ["*",9223372036854775807])`

results, err := n1qlStore.Query(statement, nil, base.RequestPlus, true)
base.InfofCtx(ctx, base.KeyAll, "emptyBucketUsingChannelIndex failed to remove allDocsIndex %+v", err)
if err != nil {
return 0, err
}

var tombstonesRow QueryIdRow
for results.Next(&tombstonesRow) {
// First, attempt to purge.
var purgeErr error
if base.TestUseXattrs() {
purgeErr = dataStore.DeleteWithXattr(tombstonesRow.Id, base.SyncXattrName)
} else {
purgeErr = dataStore.Delete(tombstonesRow.Id)
}

if base.IsKeyNotFoundError(dataStore, purgeErr) {
// If key no longer exists, need to add and remove to trigger removal from view
_, addErr := dataStore.Add(tombstonesRow.Id, 0, purgeBody)
if addErr != nil {
tbp.Logf(ctx, "Error compacting key %s (add) - will not be compacted. %v", tombstonesRow.Id, addErr)
continue
}

if delErr := dataStore.Delete(tombstonesRow.Id); delErr != nil {
tbp.Logf(ctx, "Error compacting key %s (delete) - will not be compacted. %v", tombstonesRow.Id, delErr)
}
purgedDocCount++
} else if purgeErr != nil {
tbp.Logf(ctx, "Error compacting key %s (purge) - will not be compacted. %v", tombstonesRow.Id, purgeErr)
}
}
err = results.Close()
if err != nil {
return 0, err
}

base.InfofCtx(ctx, base.KeyAll, "emptyBucketUsingChannelIndex Finished compaction ... Total docs purged: %d", purgedDocCount)
return purgedDocCount, nil
}

// viewsAndGSIBucketReadier empties the bucket, initializes Views, and waits until GSI indexes are empty. It is run asynchronously as soon as a test is finished with a bucket.
var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Context, b base.Bucket, tbp *base.TestBucketPool) error {
if base.TestsDisableGSI() {
Expand Down Expand Up @@ -267,7 +328,12 @@ var viewsAndGSIBucketReadier base.TBPBucketReadierFunc = func(ctx context.Contex
}
if _, err := emptyAllDocsIndex(ctx, dataStore, tbp); err != nil {
base.InfofCtx(ctx, base.KeyAll, "emptyAllDocsIndex error %+v", err)
return err
}

//
if _, err := emptyBucketUsingChannelIndex(ctx, dataStore, tbp); err != nil {
base.InfofCtx(ctx, base.KeyAll, "emptyBucketUsingChannelIndex error %+v", err)
return err
}

Expand Down

0 comments on commit 1aac440

Please sign in to comment.