diff --git a/ee/backup/backup.go b/ee/backup/backup.go index 947a88c75ec..8f93799744e 100644 --- a/ee/backup/backup.go +++ b/ee/backup/backup.go @@ -123,6 +123,13 @@ func (pr *Processor) WriteBackup(ctx context.Context) (*pb.Status, error) { if err != nil { return false } + + // Backup type keys in every group. + if parsedKey.IsType() { + return true + } + + // Only backup schema and data keys for the requested predicates. _, ok := predMap[parsedKey.Attr] return ok } diff --git a/ee/backup/restore.go b/ee/backup/restore.go index e674cf2e613..53534e83bc8 100644 --- a/ee/backup/restore.go +++ b/ee/backup/restore.go @@ -70,7 +70,8 @@ func RunRestore(pdir, location, backupId string) (uint64, error) { } // loadFromBackup reads the backup, converts the keys and values to the required format, -// and loads them to the given badger DB. +// and loads them to the given badger DB. The set of predicates is used to avoid restoring +// values from predicates no longer assigned to this group. func loadFromBackup(db *badger.DB, r io.Reader, preds predicateSet) error { br := bufio.NewReaderSize(r, 16<<10) unmarshalBuf := make([]byte, 1<<10) diff --git a/ee/backup/tests/filesystem/backup_test.go b/ee/backup/tests/filesystem/backup_test.go index 70f3fefecd8..af0ca846a28 100644 --- a/ee/backup/tests/filesystem/backup_test.go +++ b/ee/backup/tests/filesystem/backup_test.go @@ -57,10 +57,16 @@ func TestBackupFilesystem(t *testing.T) { require.NoError(t, err) dg := dgo.NewDgraphClient(api.NewDgraphClient(conn)) - // Add initial data. ctx := context.Background() require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true})) - require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .`})) + + // Add schema and types. + require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string . + type Node { + movie + }`})) + + // Add initial data. original, err := dg.NewTxn().Mutate(ctx, &api.Mutation{ CommitNow: true, SetNquads: []byte(` @@ -256,10 +262,19 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m require.Equal(t, uint32(i+1), groupId) } - restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs) + pdir := "./data/restore/p1" + restored, err := testutil.GetPredicateValues(pdir, "movie", commitTs) require.NoError(t, err) t.Logf("--- Restored values: %+v\n", restored) + restoredPreds, err := testutil.GetPredicateNames(pdir, commitTs) + require.NoError(t, err) + require.ElementsMatch(t, []string{"dgraph.type", "movie"}, restoredPreds) + + restoredTypes, err := testutil.GetTypeNames(pdir, commitTs) + require.NoError(t, err) + require.ElementsMatch(t, []string{"Node"}, restoredTypes) + return restored } diff --git a/ee/backup/tests/minio-large/backup_test.go b/ee/backup/tests/minio-large/backup_test.go index 2063c2a6465..6492fb3a936 100644 --- a/ee/backup/tests/minio-large/backup_test.go +++ b/ee/backup/tests/minio-large/backup_test.go @@ -162,11 +162,11 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m _, err := backup.RunRestore("./data/restore", backupLocation, lastDir) require.NoError(t, err) - restored1, err := testutil.GetPValues("./data/restore/p1", "name1", commitTs) + restored1, err := testutil.GetPredicateValues("./data/restore/p1", "name1", commitTs) require.NoError(t, err) - restored2, err := testutil.GetPValues("./data/restore/p2", "name2", commitTs) + restored2, err := testutil.GetPredicateValues("./data/restore/p2", "name2", commitTs) require.NoError(t, err) - restored3, err := testutil.GetPValues("./data/restore/p3", "name3", commitTs) + restored3, err := testutil.GetPredicateValues("./data/restore/p3", "name3", commitTs) require.NoError(t, err) restored := make(map[string]string) diff --git a/ee/backup/tests/minio/backup_test.go b/ee/backup/tests/minio/backup_test.go index 351e3f963f1..0dd2ab7ec5d 100644 --- a/ee/backup/tests/minio/backup_test.go +++ b/ee/backup/tests/minio/backup_test.go @@ -54,15 +54,21 @@ func TestBackupMinio(t *testing.T) { conn, err := grpc.Dial(testutil.SockAddr, grpc.WithInsecure()) require.NoError(t, err) dg := dgo.NewDgraphClient(api.NewDgraphClient(conn)) - ctx := context.Background() - require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true})) mc, err = testutil.NewMinioClient() require.NoError(t, err) require.NoError(t, mc.MakeBucket(bucketName, "")) + ctx := context.Background() + require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true})) + + // Add schema and types. + require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string . + type Node { + movie + }`})) + // Add initial data. - require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: `movie: string .`})) original, err := dg.NewTxn().Mutate(ctx, &api.Mutation{ CommitNow: true, SetNquads: []byte(` @@ -264,8 +270,18 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string require.NoError(t, err) require.Equal(t, uint32(i+1), groupId) } + pdir := "./data/restore/p1" + restored, err := testutil.GetPredicateValues(pdir, "movie", commitTs) + require.NoError(t, err) + + restoredPreds, err := testutil.GetPredicateNames(pdir, commitTs) + require.NoError(t, err) + require.ElementsMatch(t, []string{"dgraph.type", "movie"}, restoredPreds) + + restoredTypes, err := testutil.GetTypeNames(pdir, commitTs) + require.NoError(t, err) + require.ElementsMatch(t, []string{"Node"}, restoredTypes) - restored, err := testutil.GetPValues("./data/restore/p1", "movie", commitTs) require.NoError(t, err) t.Logf("--- Restored values: %+v\n", restored) diff --git a/testutil/backup.go b/testutil/backup.go index fd65580d637..a54a51333a9 100644 --- a/testutil/backup.go +++ b/testutil/backup.go @@ -17,25 +17,26 @@ package testutil import ( - "context" "fmt" - "math" "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/badger/v2/options" - bpb "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" ) -// GetPValues reads the specified p directory and returns the values for the given -// attribute in a map. -func GetPValues(pdir, attr string, readTs uint64) (map[string]string, error) { +func openDgraph(pdir string) (*badger.DB, error) { opt := badger.DefaultOptions(pdir).WithTableLoadingMode(options.MemoryMap). WithReadOnly(true) - db, err := badger.OpenManaged(opt) + return badger.OpenManaged(opt) +} + +// GetPredicateValues reads the specified p directory and returns the values for the given +// attribute in a map. +func GetPredicateValues(pdir, attr string, readTs uint64) (map[string]string, error) { + db, err := openDgraph(pdir) if err != nil { return nil, err } @@ -43,52 +44,95 @@ func GetPValues(pdir, attr string, readTs uint64) (map[string]string, error) { values := make(map[string]string) - stream := db.NewStreamAt(math.MaxUint64) - stream.ChooseKey = func(item *badger.Item) bool { + txn := db.NewTransactionAt(readTs, false) + defer txn.Discard() + itr := txn.NewIterator(badger.DefaultIteratorOptions) + defer itr.Close() + + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() pk, err := x.Parse(item.Key()) x.Check(err) switch { case pk.Attr != attr: - return false - case pk.IsSchema(): - return false + continue + case !pk.IsData(): + continue } - return pk.IsData() - } - stream.KeyToList = func(key []byte, it *badger.Iterator) (*bpb.KVList, error) { - pk, err := x.Parse(key) - x.Check(err) - pl, err := posting.ReadPostingList(key, it) + + pl, err := posting.ReadPostingList(item.Key(), itr) if err != nil { return nil, err } - var list bpb.KVList + err = pl.Iterate(readTs, 0, func(p *pb.Posting) error { vID := types.TypeID(p.ValType) src := types.ValueForType(vID) src.Value = p.Value str, err := types.Convert(src, types.StringID) if err != nil { - fmt.Println(err) return err } value := str.Value.(string) - list.Kv = append(list.Kv, &bpb.KV{ - Key: []byte(fmt.Sprintf("%#x", pk.Uid)), - Value: []byte(value), - }) + values[fmt.Sprintf("%#x", pk.Uid)] = value + return nil }) - return &list, err - } - stream.Send = func(list *bpb.KVList) error { - for _, kv := range list.Kv { - values[string(kv.Key)] = string(kv.Value) + + if err != nil { + return nil, err } - return nil } - if err := stream.Orchestrate(context.Background()); err != nil { + + return values, err +} + +type dataType int + +const ( + schemaPredicate dataType = iota + schemaType +) + +func readSchema(pdir string, dType dataType) ([]string, error) { + db, err := openDgraph(pdir) + if err != nil { return nil, err } - return values, err + defer db.Close() + values := make([]string, 0) + + // Predicates and types in the schema are written with timestamp 1. + txn := db.NewTransactionAt(1, false) + defer txn.Discard() + itr := txn.NewIterator(badger.DefaultIteratorOptions) + defer itr.Close() + + for itr.Rewind(); itr.Valid(); itr.Next() { + item := itr.Item() + pk, err := x.Parse(item.Key()) + x.Check(err) + + switch { + case item.UserMeta() != posting.BitSchemaPosting: + continue + case pk.IsSchema() && dType != schemaPredicate: + continue + case pk.IsType() && dType != schemaType: + continue + } + + values = append(values, pk.Attr) + } + return values, nil +} + +// GetPredicateNames returns the list of all the predicates stored in the restored pdir. +func GetPredicateNames(pdir string, readTs uint64) ([]string, error) { + return readSchema(pdir, schemaPredicate) +} + +// GetTypeNames returns the list of all the types stored in the restored pdir. +func GetTypeNames(pdir string, readTs uint64) ([]string, error) { + return readSchema(pdir, schemaType) }