Skip to content

Commit

Permalink
BREAKING: fix json marshal unmarshal for namespace > 127 (#7810)
Browse files Browse the repository at this point in the history
We used to store a predicate as <namespace>|<attribute> (the pipe | signifies concatenation), and we stored this as a string. <namespace> is an 8-byte uint64, which messes up the predicate when it is marshaled to JSON bytes. This is because, for a namespace greater than 127, the UTF-8 encoding might take up several bytes (also, if the mapping does not exist, it is replaced with some other rune). This affects three identified places in Dgraph:

Live loader
Backup and List Backup
Http clients and Ratel
Fix:
The fix is to use a UTF-8 string when dealing with JSON. A better idea is to use a UTF-8 string even for internal operations, and to convert it into the byte format only when we read from or write to Badger.
New Format: <namespace>-<attribute> (- is the hyphen literal)

fix(restore): update the schema and type from 2103 (#7838)

With #7810 change, we changed the format of the predicate. We missed updating the schema and predicate. This PR fixes it.

fix(state): fix hex to uint64 response of list of namespaces (#8091)

There is an issue in ExtractNamespaceFromPredicate: the parsing assumed that ns in <ns>-<attr> is decimal, when it is actually hexadecimal. This leads to the following issues.

A predicate a-name was skipped.
A predicate 11-name was parsed as namespace 11, when it is actually namespace 17 (0x11).

fix(backup): handle manifest version logic, update manifest version to 2105 (#7825)

The backward compatibility of the backup's manifest was broken by #7810, although a tool was added (#7815) that enables smooth migration of the manifest.
This PR makes the backup backward compatible by updating the manifest (in memory) after reading it.

fix(updatemanifest): update the version of manifest after update (#7828)

We were not updating the manifest version after the update. This PR fixes that.
  • Loading branch information
NamanJain8 authored and Harshil Goel committed Feb 3, 2023
1 parent 9739415 commit 014b6fe
Show file tree
Hide file tree
Showing 27 changed files with 312 additions and 207 deletions.
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestTransactionBasic(t *testing.T) {
require.Equal(t, 2, len(mr.preds))
var parsedPreds []string
for _, pred := range mr.preds {
p := strings.Split(pred, "-")[1]
p := strings.SplitN(pred, "-", 2)[1]
parsedPreds = append(parsedPreds, x.ParseAttr(p))
}
sort.Strings(parsedPreds)
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type QueryResult struct {

func splitPreds(ps []string) []string {
for i, p := range ps {
ps[i] = x.ParseAttr(strings.Split(p, "-")[1])
ps[i] = x.ParseAttr(strings.SplitN(p, "-", 2)[1])
}

return ps
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,15 +365,15 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
checkPreds := func() error {
// Check if any of these tablets is being moved. If so, abort the transaction.
for _, pkey := range src.Preds {
splits := strings.Split(pkey, "-")
splits := strings.SplitN(pkey, "-", 2)
if len(splits) < 2 {
return errors.Errorf("Unable to find group id in %s", pkey)
}
gid, err := strconv.Atoi(splits[0])
if err != nil {
return errors.Wrapf(err, "unable to parse group id from %s", pkey)
}
pred := strings.Join(splits[1:], "-")
pred := splits[1]
tablet := s.ServingTablet(pred)
if tablet == nil {
return errors.Errorf("Tablet for %s is nil", pred)
Expand Down
11 changes: 1 addition & 10 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1110,16 +1110,7 @@ func filterTablets(ctx context.Context, ms *pb.MembershipState) error {
return errors.Errorf("Namespace not found in JWT.")
}
if namespace == x.GalaxyNamespace {
// For galaxy namespace, we don't want to filter out the predicates. We only format the
// namespace to human readable form.
for _, group := range ms.Groups {
tablets := make(map[string]*pb.Tablet)
for tabletName, tablet := range group.Tablets {
tablet.Predicate = x.FormatNsAttr(tablet.Predicate)
tablets[x.FormatNsAttr(tabletName)] = tablet
}
group.Tablets = tablets
}
// For galaxy namespace, we don't want to filter out the predicates.
return nil
}
for _, group := range ms.GetGroups() {
Expand Down
13 changes: 6 additions & 7 deletions graphql/admin/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ func resolveState(ctx context.Context, q schema.Query) *resolve.Resolved {
u := jsonpb.Unmarshaler{}
var ms pb.MembershipState
err = u.Unmarshal(bytes.NewReader(resp.GetJson()), &ms)

if err != nil {
return resolve.EmptyResult(q, err)
}

// map to graphql response structure
state := convertToGraphQLResp(ms)
ns, _ := x.ExtractNamespace(ctx)
// map to graphql response structure. Only guardian of galaxy can list the namespaces.
state := convertToGraphQLResp(ms, ns == x.GalaxyNamespace)
b, err := json.Marshal(state)
if err != nil {
return resolve.EmptyResult(q, err)
Expand All @@ -77,7 +77,7 @@ func resolveState(ctx context.Context, q schema.Query) *resolve.Resolved {
// values and not the keys. For pb.MembershipState.Group, the keys are the group IDs
// and pb.Group didn't contain this ID, so we are creating a custom clusterGroup type,
// which is same as pb.Group and also contains the ID for the group.
func convertToGraphQLResp(ms pb.MembershipState) membershipState {
func convertToGraphQLResp(ms pb.MembershipState, listNs bool) membershipState {
var state membershipState

// namespaces stores set of namespaces
Expand All @@ -92,9 +92,8 @@ func convertToGraphQLResp(ms pb.MembershipState) membershipState {
var tablets = make([]*pb.Tablet, 0, len(v.Tablets))
for name, v1 := range v.Tablets {
tablets = append(tablets, v1)
val, err := x.ExtractNamespaceFromPredicate(name)
if err == nil {
namespaces[val] = struct{}{}
if listNs {
namespaces[x.ParseNamespace(name)] = struct{}{}
}
}
state.Groups = append(state.Groups, clusterGroup{
Expand Down
18 changes: 7 additions & 11 deletions posting/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,16 +592,15 @@ func (r *rebuilder) Run(ctx context.Context) error {

glog.V(1).Infof(
"Rebuilding index for predicate %s: Starting process. StartTs=%d. Prefix=\n%s\n",
x.FormatNsAttr(r.attr), r.startTs, hex.Dump(r.prefix))
r.attr, r.startTs, hex.Dump(r.prefix))

// Counter is used here to ensure that all keys are committed at different timestamp.
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

tmpWriter := tmpDB.NewManagedWriteBatch()
stream := pstore.NewStreamAt(r.startTs)
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):",
x.FormatNsAttr(r.attr))
stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
stream.Prefix = r.prefix
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
// We should return quickly if the context is no longer valid.
Expand Down Expand Up @@ -663,21 +662,19 @@ func (r *rebuilder) Run(ctx context.Context) error {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: building temp index took: %v\n",
x.FormatNsAttr(r.attr), time.Since(start))
r.attr, time.Since(start))

// Now we write all the created posting lists to disk.
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger", r.attr)
start = time.Now()
defer func() {
glog.V(1).Infof("Rebuilding index for predicate %s: writing index took: %v\n",
x.FormatNsAttr(r.attr), time.Since(start))
r.attr, time.Since(start))
}()

writer := pstore.NewManagedWriteBatch()
tmpStream := tmpDB.NewStreamAt(counter)
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):",
x.FormatNsAttr(r.attr))
tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr)
tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
l, err := ReadPostingList(key, itr)
if err != nil {
Expand Down Expand Up @@ -720,8 +717,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
if err := tmpStream.Orchestrate(ctx); err != nil {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n", r.attr)
return writer.Flush()
}

Expand Down
16 changes: 8 additions & 8 deletions posting/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ func TestAddMutation_mrjn2(t *testing.T) {
}

func TestAddMutation_gru(t *testing.T) {
key := x.DataKey("question.tag", 0x01)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x01)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

Expand Down Expand Up @@ -592,7 +592,7 @@ func TestAddMutation_gru(t *testing.T) {
}

func TestAddMutation_gru2(t *testing.T) {
key := x.DataKey("question.tag", 0x100)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x100)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

Expand Down Expand Up @@ -639,7 +639,7 @@ func TestAddMutation_gru2(t *testing.T) {
func TestAddAndDelMutation(t *testing.T) {
// Ensure each test uses unique key since we don't clear the postings
// after each test
key := x.DataKey("dummy_key", 0x927)
key := x.DataKey(x.GalaxyAttr("dummy_key"), 0x927)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

Expand Down Expand Up @@ -878,7 +878,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 5000

key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
Expand Down Expand Up @@ -926,7 +926,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 10000

key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func TestBinSplit(t *testing.T) {
defer func() {
maxListSize = originalListSize
}()
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
Expand Down Expand Up @@ -1268,7 +1268,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
maxListSize = 5000

// Add entries to the maps.
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
Expand Down Expand Up @@ -1407,7 +1407,7 @@ func TestRecursiveSplits(t *testing.T) {

// Create a list that should be split recursively.
size := int(1e5)
key := x.DataKey(uuid.New().String(), 1331)
key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
Expand Down
20 changes: 14 additions & 6 deletions systest/backup/filesystem/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ var (
alphaBackupDir = "/data/backups"
oldBackupDir1 = "/data/to_restore/1"
oldBackupDir2 = "/data/to_restore/2"
oldBackupDir3 = "/data/to_restore/3"
alphaContainers = []string{
"alpha1",
"alpha2",
Expand Down Expand Up @@ -92,7 +93,8 @@ func TestBackupOfOldRestore(t *testing.T) {
common.DirSetup(t)
common.CopyOldBackupDir(t)

conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))
require.NoError(t, err)
Expand All @@ -105,7 +107,8 @@ func TestBackupOfOldRestore(t *testing.T) {
sendRestoreRequest(t, oldBackupDir1)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

resp, err := dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
q := `{ authors(func: has(Author.name)) { count(uid) } }`
resp, err := dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))

Expand All @@ -117,7 +120,7 @@ func TestBackupOfOldRestore(t *testing.T) {
sendRestoreRequest(t, alphaBackupDir)
testutil.WaitForRestore(t, dg, testutil.SockAddrHttp)

resp, err = dg.NewTxn().Query(context.Background(), `{ authors(func: has(Author.name)) { count(uid) } }`)
resp, err = dg.NewTxn().Query(context.Background(), q)
require.NoError(t, err)
require.JSONEq(t, "{\"authors\":[{\"count\":1}]}", string(resp.Json))
}
Expand Down Expand Up @@ -151,16 +154,19 @@ func TestRestoreOfOldBackup(t *testing.T) {
require.NoError(t, err)
require.JSONEq(t, r, string(resp.Json))
}

queryAndCheck("p1", 0)
queryAndCheck("p2", 2)
queryAndCheck("p3", 0)
queryAndCheck("p4", 2)
}
t.Run("backup of 20.11", func(t *testing.T) { test(oldBackupDir2) })
t.Run("backup of 21.03", func(t *testing.T) { test(oldBackupDir3) })
}

func TestBackupFilesystem(t *testing.T) {
conn, err := grpc.Dial(testutil.SockAddr, grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
conn, err := grpc.Dial(testutil.SockAddr,
grpc.WithTransportCredentials(credentials.NewTLS(testutil.GetAlphaClientConfig(t))))
require.NoError(t, err)
dg := dgo.NewDgraphClient(api.NewDgraphClient(conn))

Expand Down Expand Up @@ -432,15 +438,17 @@ func runBackupInternal(t *testing.T, forceFull bool, numExpectedFiles,

var data interface{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(&data))
require.Equal(t, "Success", testutil.JsonGet(data, "data", "backup", "response", "code").(string))
require.Equal(t, "Success",
testutil.JsonGet(data, "data", "backup", "response", "code").(string))
taskId := testutil.JsonGet(data, "data", "backup", "taskId").(string)
testutil.WaitForTask(t, taskId, true, testutil.SockAddrHttp)

// Verify that the right amount of files and directories were created.
common.CopyToLocalFs(t)

files := x.WalkPathFunc(copyBackupDir, func(path string, isdir bool) bool {
return !isdir && strings.HasSuffix(path, ".backup") && strings.HasPrefix(path, "data/backups_copy/dgraph.")
return !isdir && strings.HasSuffix(path, ".backup") &&
strings.HasPrefix(path, "data/backups_copy/dgraph.")
})
require.Equal(t, numExpectedFiles, len(files))

Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
1 change: 1 addition & 0 deletions systest/backup/filesystem/data/to_restore/3/manifest.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"Manifests":[{"type":"full","since":0,"read_ts":9,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":1,"version":2103,"path":"dgraph.20210517.095641.969","encrypted":false,"drop_operations":null,"compression":"snappy"},{"type":"incremental","since":0,"read_ts":21,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":2,"version":2103,"path":"dgraph.20210517.095716.130","encrypted":false,"drop_operations":[{"drop_op":1}],"compression":"snappy"},{"type":"incremental","since":0,"read_ts":26,"groups":{"1":["\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p4","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p2","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.type","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p1","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.schema","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.p_query","\u0000\u0000\u0000\u0000\u000
0\u0000\u0000\u0000p3","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.drop.op","\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000dgraph.graphql.xid"],"2":[],"3":[]},"backup_id":"quirky_kapitsa4","backup_num":3,"version":2103,"path":"dgraph.20210517.095726.320","encrypted":false,"drop_operations":[{"drop_op":2,"drop_value":"\u0000\u0000\u0000\u0000\u0000\u0000\u0000\u0000p3"}],"compression":"snappy"}]}
4 changes: 0 additions & 4 deletions worker/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ func (m *Manifest) getPredsInGroup(gid uint32) predicateSet {

predSet := make(predicateSet)
for _, pred := range preds {
if m.Version == 0 {
// For older versions, preds set will contain attribute without namespace.
pred = x.NamespaceAttr(x.GalaxyNamespace, pred)
}
predSet[pred] = struct{}{}
}
return predSet
Expand Down
8 changes: 4 additions & 4 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error {
m := Manifest{
ReadTs: req.ReadTs,
Groups: predMap,
Version: x.DgraphVersion,
Version: x.ManifestVersion,
DropOperations: dropOperations,
Path: dir,
Compression: "snappy",
Expand Down Expand Up @@ -555,14 +555,14 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro
return err
}

manifest, err := GetManifest(handler, uri)
manifest, err := GetManifestNoUpgrade(handler, uri)
if err != nil {
return err
}
manifest.Manifests = append(manifest.Manifests, m)

if err := createManifest(handler, uri, manifest); err != nil {
return errors.Wrap(err, "Complete backup failed")
if err := CreateManifest(handler, uri, manifest); err != nil {
return errors.Wrap(err, "complete backup failed")
}
glog.Infof("Backup completed OK.")
return nil
Expand Down
Loading

0 comments on commit 014b6fe

Please sign in to comment.