Skip to content

Commit

Permalink
fix(backup): handle manifest version logic, update manifest version t…
Browse files Browse the repository at this point in the history
…o 2105 (#7825)

The backward compatibility of the backup's manifest was broken by #7810, although the tool was added (#7815) that enables smooth migration of manifest.
This PR makes backup backward compatible, by updating the manifest(in-memory) after reading.
  • Loading branch information
NamanJain8 authored May 17, 2021
1 parent 22db22f commit 83a0c53
Show file tree
Hide file tree
Showing 34 changed files with 172 additions and 47 deletions.
12 changes: 9 additions & 3 deletions ee/updatemanifest/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,12 @@ func run() error {
if err != nil {
return errors.Wrapf(err, "while creating uri handler")
}
masterManifest, err := worker.GetManifest(handler, uri)
masterManifest, err := worker.GetManifestNoUpgrade(handler, uri)
if err != nil {
return errors.Wrapf(err, "while getting manifest")
}

// Update the master manifest with the changes for drop operations and group predicates.
for _, manifest := range masterManifest.Manifests {
update := func(manifest *worker.Manifest) {
for gid, preds := range manifest.Groups {
parsedPreds := preds[:0]
for _, pred := range preds {
Expand All @@ -122,6 +121,13 @@ func run() error {
}
}

// Update the master manifest with the changes for drop operations and group predicates.
for _, manifest := range masterManifest.Manifests {
if manifest.Version == 2103 {
update(manifest)
}
}

// Rewrite the master manifest.
return errors.Wrap(worker.CreateManifest(handler, uri, masterManifest), "rewrite failed")
}
64 changes: 55 additions & 9 deletions systest/backup/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ var (
restoreDir = "./data/restore"
testDirs = []string{restoreDir}
alphaBackupDir = "/data/backups"
oldBackupDir = "/data/to_restore"
oldBackupDir1 = "/data/to_restore/1"
oldBackupDir2 = "/data/to_restore/2"
oldBackupDir3 = "/data/to_restore/3"
alphaContainers = []string{
"alpha1",
"alpha2",
Expand Down Expand Up @@ -85,14 +87,53 @@ func sendRestoreRequest(t *testing.T, location string) {
return
}

// This test restores the old backups.
// The backup dir contains:
// - Full backup with pred "p1", "p2", "p3". (insert k1, k2, k3).
// - Incremental backup after drop data was called and "p2", "p3", "p4" inserted. --> (insert k4,k5)
// - Incremental backup after "p3" was dropped.
func TestRestoreOfOldBackup(t *testing.T) {
test := func(dir string) {
common.DirSetup(t)
common.CopyOldBackupDir(t)

conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
require.NoError(t, err)

testutil.DropAll(t, dg)
time.Sleep(2 * time.Second)

sendRestoreRequest(t, dir)
testutil.WaitForRestore(t, dg)

queryAndCheck := func(pred string, cnt int) {
q := fmt.Sprintf(`{ me(func: has(%s)) { count(uid) } }`, pred)
r := fmt.Sprintf("{\"me\":[{\"count\":%d}]}", cnt)
resp, err := dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, r, string(resp.Json))
}
queryAndCheck("p1", 0)
queryAndCheck("p2", 2)
queryAndCheck("p3", 0)
queryAndCheck("p4", 2)
}
t.Run("backup of 20.11", func(t *testing.T) { test(oldBackupDir2) })
t.Run("backup of 21.03", func(t *testing.T) { test(oldBackupDir3) })
}

// This test takes a backup and then restores an old backup in a cluster incrementally.
// Next, cleans up the cluster and tries restoring the backups above.
// Regression test for DGRAPH-2775
func TestBackupOfOldRestore(t *testing.T) {
common.DirSetup(t)
common.CopyOldBackupDir(t)

conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
require.NoError(t, err)
Expand All @@ -102,10 +143,11 @@ func TestBackupOfOldRestore(t *testing.T) {

_ = runBackup(t, 3, 1)

sendRestoreRequest(t, oldBackupDir)
sendRestoreRequest(t, oldBackupDir1)
testutil.WaitForRestore(t, dg)

resp, err := dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
q := `{ authors(func: has(Author.name)) { count(uid) } }`
resp, err := dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))

Expand All @@ -117,13 +159,14 @@ func TestBackupOfOldRestore(t *testing.T) {
sendRestoreRequest(t, alphaBackupDir)
testutil.WaitForRestore(t, dg)

resp, err = dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
resp, err = dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))
}

func TestBackupFilesystem(t *testing.T) {
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

Expand Down Expand Up @@ -395,15 +438,17 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,

var data interface{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(&data))
require.Equal(t, "Success", testutil.JsonGet(data, "data", "backup", "response", "code").(string))
require.Equal(t, "Success",
testutil.JsonGet(data, "data", "backup", "response", "code").(string))
taskId := testutil.JsonGet(data, "data", "backup", "taskId").(string)
testutil.WaitForTask(t, taskId, true)

// Verify that the right amount of files and directories were created.
common.CopyToLocalFs(t)

files := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, ".backup") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
return !isdir && strings.HasSuffix(path, ".backup") &&
strings.HasPrefix(path, "data/backups_copy/dgraph.")
})
require.Equal(t, numExpectedFiles, len(files))

Expand All @@ -428,7 +473,8 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m
require.NoError(t, os.RemoveAll(restoreDir))

t.Logf("--- Restoring from: %q", backupLocation)
result := worker.RunOfflineRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0)
result := worker.RunOfflineRestore(
"./data/restore", backupLocation, lastDir, "", options.Snappy, 0)
require.NoError(t, result.Err)

for i, pdir := range []string{"p1", "p2", "p3"} {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"full","since":38,"groups":{"1":["dgraph.graphql.schema_history","dgraph.drop.op","dgraph.graphql.p_sha256hash","dgraph.graphql.schema","dgraph.cors","dgraph.type","dgraph.graphql.xid","dgraph.graphql.schema_created_at","dgraph.graphql.p_query"],"2":["p2","p1","p3"],"3":[]},"backup_id":"jolly_yalow0","backup_num":1,"encrypted":false,"drop_operations":null}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"incremental","since":54,"groups":{"1":["dgraph.graphql.p_sha256hash","dgraph.graphql.schema","dgraph.graphql.schema_history","dgraph.drop.op","dgraph.type","dgraph.cors","dgraph.graphql.schema_created_at","dgraph.graphql.p_query","dgraph.graphql.xid"],"2":["p4","p3","p2","p1"],"3":[]},"backup_id":"jolly_yalow0","backup_num":2,"encrypted":false,"drop_operations":[{"drop_op":1}]}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"type":"incremental","since":58,"groups":{"1":["dgraph.graphql.p_sha256hash","dgraph.graphql.p_query","dgraph.drop.op","dgraph.graphql.schema_history","dgraph.graphql.xid","dgraph.graphql.schema","dgraph.cors","dgraph.graphql.schema_created_at","dgraph.type"],"2":["p1","p4","p3","p2"],"3":[]},"backup_id":"jolly_yalow0","backup_num":3,"encrypted":false,"drop_operations":[{"drop_op":2,"drop_value":"p3"}]}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions systest/backup/filesystem/data/to_restore/3/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"Manifests":[{"type":"full","since":0,"read_ts":9,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":1,"version":2103,"path":"dgraph.20210517.095641.969","encrypted":false,"drop_operations":null,"compression":"snappy"},{"type":"incremental","since":0,"read_ts":21,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":2,"version":2103,"path":"dgraph.20210517.095716.130","encrypted":false,"drop_operations":[{"drop_op":1}],"compression":"snappy"},{"type":"incremental","since":0,"read_ts":26,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":3,"version":2103,"path":"dgraph.20210517.095726.320","encrypted":false,"drop_operations":[{"drop_op":2,"drop_value":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3"}],"compression":"snappy"}]}
4 changes: 0 additions & 4 deletions worker/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,6 @@ func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {

predSet := make(predicateSet)
for _, pred := range preds {
if m.Version == 0 {
// For older versions, preds set will contain attribute without namespace.
pred = x.NamespaceAttr(x.GalaxyNamespace, pred)
}
predSet[pred] = struct{}{}
}
return predSet
Expand Down
4 changes: 2 additions & 2 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
m := Manifest{
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
Version: x.ManifestVersion,
DropOperations: dropOperations,
Path: dir,
Compression: "snappy",
Expand Down Expand Up @@ -548,7 +548,7 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro
return err
}

manifest, err := GetManifest(handler, uri)
manifest, err := GetManifestNoUpgrade(handler, uri)
if err != nil {
return err
}
Expand Down
103 changes: 93 additions & 10 deletions worker/backup_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
package worker

import (
"encoding/binary"
"encoding/json"
"fmt"
"net/url"
"path/filepath"
"sort"
Expand Down Expand Up @@ -55,14 +57,9 @@ func verifyManifests(manifests []*Manifest) error {

func getManifestsToRestore(
h UriHandler, uri *url.URL, req *pb.RestoreRequest) ([]*Manifest, error) {

if !h.DirExists("") {
return nil, errors.Errorf("getManifestsToRestore: The uri path: %q doesn't exists",
uri.Path)
}
manifest, err := getConsolidatedManifest(h, uri)
manifest, err := GetManifest(h, uri)
if err != nil {
return manifest.Manifests, errors.Wrap(err, "Failed to get consolidated manifest: ")
return manifest.Manifests, err
}
return getFilteredManifests(h, manifest.Manifests, req)
}
Expand Down Expand Up @@ -163,6 +160,74 @@ func getConsolidatedManifest(h UriHandler, uri *url.URL) (*MasterManifest, error
return &MasterManifest{Manifests: mlist}, nil
}

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

func parseNsAttr(attr string) (uint64, string, error) {
if strings.ContainsRune(attr, replacementRune) {
return 0, "", errors.Errorf("replacement rune found while parsing attr: %s (%+v)",
attr, []byte(attr))
}
return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil
}

// upgradeManifest updates the in-memory manifest from various versions to the latest version.
// If the manifest version is 0 (dgraph version < v21.03), attach namespace to the predicates and
// the drop data/attr operation.
// If the manifest version is 2103, convert the format of predicate from <ns bytes>|<attr> to
// <ns string>-<attr>. This is because of a bug for namespace greater than 127.
// See https://github.com/dgraph-io/dgraph/pull/7810
// NOTE: Do not use the upgraded manifest to overwrite the non-upgraded manifest.
func upgradeManifest(m *Manifest) error {
switch m.Version {
case 0:
for gid, preds := range m.Groups {
parsedPreds := preds[:0]
for _, pred := range preds {
parsedPreds = append(parsedPreds, x.GalaxyAttr(pred))
}
m.Groups[gid] = parsedPreds
}
for _, op := range m.DropOperations {
switch op.DropOp {
case pb.DropOperation_DATA:
op.DropValue = fmt.Sprintf("%#x", x.GalaxyNamespace)
case pb.DropOperation_ATTR:
op.DropValue = x.GalaxyAttr(op.DropValue)
default:
// do nothing for drop all and drop namespace.
}
}
case 2103:
for gid, preds := range m.Groups {
parsedPreds := preds[:0]
for _, pred := range preds {
ns, attr, err := parseNsAttr(pred)
if err != nil {
return errors.Errorf("while parsing predicate got: %q", err)
}
parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr))
}
m.Groups[gid] = parsedPreds
}
for _, op := range m.DropOperations {
// We have a cluster wide drop data in v21.03.
if op.DropOp == pb.DropOperation_ATTR {
ns, attr, err := parseNsAttr(op.DropValue)
if err != nil {
return errors.Errorf("while parsing the drop operation %+v got: %q",
op, err)
}
op.DropValue = x.NamespaceAttr(ns, attr)
}
}
case 2105:
// pass
}
return nil
}

func readManifest(h UriHandler, path string) (*Manifest, error) {
var m Manifest
b, err := h.Read(path)
Expand Down Expand Up @@ -198,10 +263,11 @@ func readMasterManifest(h UriHandler, path string) (*MasterManifest, error) {
return &m, nil
}

func GetManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) {
// GetManifestNoUpgrade returns the master manifest using the given handler and uri.
func GetManifestNoUpgrade(h UriHandler, uri *url.URL) (*MasterManifest, error) {
if !h.DirExists("") {
return &MasterManifest{}, errors.Errorf("getManifest: The uri path: %q doesn't exists",
uri.Path)
return &MasterManifest{},
errors.Errorf("getManifestWithoutUpgrade: The uri path: %q doesn't exists", uri.Path)
}
manifest, err := getConsolidatedManifest(h, uri)
if err != nil {
Expand All @@ -210,6 +276,23 @@ func GetManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) {
return manifest, nil
}

// GetManifest returns the master manifest using the given handler and uri. Additionally, it also
// upgrades the manifest for the in-memory processing.
// Note: This function must not be used when using the returned manifest for the purpose of
// overwriting the old manifest.
func GetManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) {
manifest, err := GetManifestNoUpgrade(h, uri)
if err != nil {
return manifest, err
}
for _, m := range manifest.Manifests {
if err := upgradeManifest(m); err != nil {
return manifest, errors.Wrapf(err, "getManifest: failed to upgrade")
}
}
return manifest, nil
}

func CreateManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error {
var err error
if !h.DirExists("./") {
Expand Down
9 changes: 0 additions & 9 deletions worker/online_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,15 +288,6 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error {
lastManifest := manifests[0]
preds, ok := lastManifest.Groups[req.GroupId]

// Version is 0 if the backup was taken on an old version (v20.11).
if lastManifest.Version == 0 {
tmp := make([]string, 0, len(preds))
for _, pred := range preds {
tmp = append(tmp, x.GalaxyAttr(pred))
}
preds = tmp
}

if !ok {
return errors.Errorf("backup manifest does not contain information for group ID %d",
req.GroupId)
Expand Down
17 changes: 8 additions & 9 deletions worker/restore_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,15 +676,14 @@ func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error) {
case pb.DropOperation_ALL:
dropAll = true
case pb.DropOperation_DATA:
var ns uint64
if manifest.Version == 0 {
ns = x.GalaxyNamespace
} else {
var err error
ns, err = strconv.ParseUint(op.DropValue, 0, 64)
if err != nil {
return nil, errors.Wrap(err, "Map phase failed to parse namespace")
}
if op.DropValue == "" {
// In 2103, we do not support namespace level drop data.
dropAll = true
continue
}
ns, err := strconv.ParseUint(op.DropValue, 0, 64)
if err != nil {
return nil, errors.Wrap(err, "Map phase failed to parse namespace")
}
dropNs[ns] = struct{}{}
case pb.DropOperation_ATTR:
Expand Down
2 changes: 1 addition & 1 deletion x/x.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ const (
"X-CSRF-Token, X-Auth-Token, X-Requested-With"
DgraphCostHeader = "Dgraph-TouchedUids"

DgraphVersion = 2103
ManifestVersion = 2105
)

var (
Expand Down

0 comments on commit 83a0c53

Please sign in to comment.