Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

2103: feat(schema): do schema versioning and make backup non-blocking… #7873

Merged
merged 1 commit into from
Jun 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,6 @@ func (r *rebuilder) Run(ctx context.Context) error {
WithNumVersionsToKeep(math.MaxInt32).
WithLogger(&x.ToGlog{}).
WithCompression(options.None).
WithEncryptionKey(x.WorkerConfig.EncryptionKey).
WithLoggingLevel(badger.WARNING).
WithMetricsEnabled(false)

Expand Down Expand Up @@ -1242,14 +1241,14 @@ func DeleteData() error {
}

// DeletePredicate deletes all entries and indices for a given predicate.
func DeletePredicate(ctx context.Context, attr string) error {
func DeletePredicate(ctx context.Context, attr string, ts uint64) error {
glog.Infof("Dropping predicate: [%s]", attr)
prefix := x.PredicatePrefix(attr)
if err := pstore.DropPrefix(prefix); err != nil {
return err
}

return schema.State().Delete(attr)
return schema.State().Delete(attr, ts)
}

// DeleteNamespace bans the namespace and deletes its predicates/types from the schema.
Expand Down
2 changes: 2 additions & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ message Proposal {
RestoreRequest restore = 12;
CDCState cdc_state = 13;
DeleteNsRequest delete_ns = 14; // Used to delete namespace.
// Skipping 15 as it is used for uint64 key in master and might be needed later here.
uint64 start_ts = 16;
}

message CDCState {
Expand Down
710 changes: 374 additions & 336 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

21 changes: 11 additions & 10 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"encoding/hex"
"fmt"
"math"
"sync"

"github.com/golang/glog"
Expand Down Expand Up @@ -96,17 +97,17 @@ func (s *state) DeleteAll() {
}

// Delete updates the schema in memory and disk
func (s *state) Delete(attr string) error {
func (s *state) Delete(attr string, ts uint64) error {
s.Lock()
defer s.Unlock()

glog.Infof("Deleting schema for predicate: [%s]", attr)
txn := pstore.NewTransactionAt(1, true)
txn := pstore.NewTransactionAt(ts, true)
if err := txn.Delete(x.SchemaKey(attr)); err != nil {
return err
}
// Delete is called rarely so sync write should be fine.
if err := txn.CommitAt(1, nil); err != nil {
if err := txn.CommitAt(ts, nil); err != nil {
return err
}

Expand All @@ -116,7 +117,7 @@ func (s *state) Delete(attr string) error {
}

// DeleteType updates the schema in memory and disk
func (s *state) DeleteType(typeName string) error {
func (s *state) DeleteType(typeName string, ts uint64) error {
if s == nil {
return nil
}
Expand All @@ -125,12 +126,12 @@ func (s *state) DeleteType(typeName string) error {
defer s.Unlock()

glog.Infof("Deleting type definition for type: [%s]", typeName)
txn := pstore.NewTransactionAt(1, true)
txn := pstore.NewTransactionAt(ts, true)
if err := txn.Delete(x.TypeKey(typeName)); err != nil {
return err
}
// Delete is called rarely so sync write should be fine.
if err := txn.CommitAt(1, nil); err != nil {
if err := txn.CommitAt(ts, nil); err != nil {
return err
}

Expand Down Expand Up @@ -466,14 +467,14 @@ func Init(ps *badger.DB) {
reset()
}

// Load reads the schema for the given predicate from the DB.
// Load reads the latest schema for the given predicate from the DB.
func Load(predicate string) error {
if len(predicate) == 0 {
return errors.Errorf("Empty predicate")
}
delete(State().mutSchema, predicate)
key := x.SchemaKey(predicate)
txn := pstore.NewTransactionAt(1, false)
txn := pstore.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
item, err := txn.Get(key)
if err == badger.ErrKeyNotFound || err == badger.ErrBannedKey {
Expand Down Expand Up @@ -507,7 +508,7 @@ func LoadFromDb() error {
// LoadSchemaFromDb iterates through the DB and loads all the stored schema updates.
func LoadSchemaFromDb() error {
prefix := x.SchemaPrefix()
txn := pstore.NewTransactionAt(1, false)
txn := pstore.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions) // Need values, reversed=false.
defer itr.Close()
Expand Down Expand Up @@ -543,7 +544,7 @@ func LoadSchemaFromDb() error {
// LoadTypesFromDb iterates through the DB and loads all the stored type updates.
func LoadTypesFromDb() error {
prefix := x.TypePrefix()
txn := pstore.NewTransactionAt(1, false)
txn := pstore.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions) // Need values, reversed=false.
defer itr.Close()
Expand Down
2 changes: 1 addition & 1 deletion systest/backup/encryption/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,13 @@ func TestBackupMinioE(t *testing.T) {
t.Log("Pausing to let zero move tablet...")
moveOk := false
for retry := 5; retry > 0; retry-- {
time.Sleep(3 * time.Second)
state, err := testutil.GetStateHttps(testutil.GetAlphaClientConfig(t))
require.NoError(t, err)
if _, ok := state.Groups["1"].Tablets[tabletName]; ok {
moveOk = true
break
}
time.Sleep(1 * time.Second)
}
require.True(t, moveOk)

Expand Down
2 changes: 1 addition & 1 deletion systest/backup/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ func TestBackupFilesystem(t *testing.T) {
t.Log("Pausing to let zero move tablet...")
moveOk := false
for retry := 5; retry > 0; retry-- {
time.Sleep(3 * time.Second)
state, err := testutil.GetStateHttps(testutil.GetAlphaClientConfig(t))
require.NoError(t, err)
if _, ok := state.Groups["1"].Tablets[x.NamespaceAttr(x.GalaxyNamespace, "movie")]; ok {
moveOk = true
break
}
time.Sleep(1 * time.Second)
}

require.True(t, moveOk)
Expand Down
2 changes: 1 addition & 1 deletion systest/backup/minio-large/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ func setupTablets(t *testing.T, dg *dgo.Dgraph) {
t.Log("Pausing to let zero move tablets...")
moveOk := false
for retry := 5; retry > 0; retry-- {
time.Sleep(3 * time.Second)
state, err := testutil.GetStateHttps(testutil.GetAlphaClientConfig(t))
require.NoError(t, err)
_, ok1 := state.Groups["1"].Tablets[x.GalaxyAttr("name1")]
Expand All @@ -120,6 +119,7 @@ func setupTablets(t *testing.T, dg *dgo.Dgraph) {
moveOk = true
break
}
time.Sleep(1 * time.Second)
}
require.True(t, moveOk)
}
Expand Down
2 changes: 1 addition & 1 deletion systest/backup/minio/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ func TestBackupMinio(t *testing.T) {
t.Log("Pausing to let zero move tablet...")
moveOk := false
for retry := 5; retry > 0; retry-- {
time.Sleep(3 * time.Second)
state, err := testutil.GetStateHttps(testutil.GetAlphaClientConfig(t))
require.NoError(t, err)
if _, ok := state.Groups["1"].Tablets[x.NamespaceAttr(x.GalaxyNamespace, "movie")]; ok {
moveOk = true
break
}
time.Sleep(1 * time.Second)
}
require.True(t, moveOk)

Expand Down
4 changes: 2 additions & 2 deletions testutil/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"net/http"
"strings"
"testing"
Expand Down Expand Up @@ -178,8 +179,7 @@ func readSchema(pdir string, dType dataType) ([]string, error) {
defer db.Close()
values := make([]string, 0)

// Predicates and types in the schema are written with timestamp 1.
txn := db.NewTransactionAt(1, false)
txn := db.NewTransactionAt(math.MaxUint64, false)
defer txn.Discard()
itr := txn.NewIterator(badger.DefaultIteratorOptions)
defer itr.Close()
Expand Down
2 changes: 1 addition & 1 deletion worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func backupCurrentGroup(ctx context.Context, req *pb.BackupRequest) (*pb.BackupR
return nil, err
}

closer, err := g.Node.startTask(opBackup)
closer, err := g.Node.startTaskAtTs(opBackup, req.ReadTs)
if err != nil {
return nil, errors.Wrapf(err, "cannot start backup operation")
}
Expand Down
3 changes: 1 addition & 2 deletions worker/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,7 @@ func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse,
}
defer tl.alloc.Release()

// Schema and types are written at Ts=1.
txn := pr.DB.NewTransactionAt(1, false)
txn := pr.DB.NewTransactionAt(pr.Request.ReadTs, false)
defer txn.Discard()
// We don't need to iterate over all versions.
iopts := badger.DefaultIteratorOptions
Expand Down
Loading