BREAKING: fix json marshal unmarshal for namespace > 127 (#7810)
We used to store a predicate as <namespace>|<attribute> (the pipe | signifies concatenation), and this value is kept as a string. The <namespace> is an 8-byte uint64, so its raw bytes are not necessarily valid UTF-8: for namespaces greater than 127, marshaling the string to JSON may expand a byte into several bytes or, where no valid mapping exists, replace it with a different rune, corrupting the predicate (a short sketch after the list below illustrates this). This affects three identified places in Dgraph:

Live loader
Backup and List Backup
HTTP clients and Ratel
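
For illustration, a minimal Go sketch (not part of this change) of why the old byte-prefixed format breaks under JSON. The big-endian layout is an assumption for the example; the point is that a namespace above 127 yields a byte that is not valid UTF-8 on its own, and encoding/json coerces such bytes to the Unicode replacement rune:

package main

import (
	"encoding/binary"
	"encoding/json"
	"fmt"
)

func main() {
	// Old internal format (assumed layout): 8 raw namespace bytes, then the attribute.
	ns := uint64(128) // any namespace > 127 produces a byte that is invalid UTF-8 on its own
	buf := make([]byte, 8)
	binary.BigEndian.PutUint64(buf, ns)
	pred := string(buf) + "name"

	// encoding/json coerces strings to valid UTF-8, replacing invalid bytes with
	// the Unicode replacement rune, so the namespace does not survive a round trip.
	out, _ := json.Marshal(pred)
	var back string
	_ = json.Unmarshal(out, &back)
	fmt.Println(pred == back) // false: the 0x80 byte came back as U+FFFD
}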
Fix:
The fix is to use a valid UTF-8 string whenever we deal with JSON. Better still, we use a UTF-8 string even for internal operations, converting to the byte format only when we read from or write to Badger.
New format: <namespace>-<attribute> (- is a literal hyphen; a sketch of this format follows below)
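
A minimal sketch of the new string format and how it parses back. The helper names and the decimal rendering of the namespace are assumptions for illustration; the real helpers live in Dgraph's x package:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// formatNsAttr is an illustrative stand-in: decimal namespace, a literal hyphen, then the attribute.
func formatNsAttr(ns uint64, attr string) string {
	return strconv.FormatUint(ns, 10) + "-" + attr
}

// parseNsAttr reverses it. SplitN with a limit of 2 keeps any hyphens inside the
// attribute intact, which is why the diffs below move from strings.Split to strings.SplitN.
func parseNsAttr(s string) (uint64, string, error) {
	parts := strings.SplitN(s, "-", 2)
	if len(parts) < 2 {
		return 0, "", fmt.Errorf("invalid namespace-attribute string %q", s)
	}
	ns, err := strconv.ParseUint(parts[0], 10, 64)
	if err != nil {
		return 0, "", err
	}
	return ns, parts[1], nil
}

func main() {
	s := formatNsAttr(130, "my-pred")
	fmt.Println(s) // 130-my-pred: plain UTF-8, safe to marshal to JSON
	ns, attr, _ := parseNsAttr(s)
	fmt.Println(ns, attr) // 130 my-pred
}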

(cherry picked from commit 90d77f3)
NamanJain8 committed May 24, 2021
1 parent 5595f9c commit ea7fe9f
Showing 10 changed files with 1,045 additions and 133 deletions.
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/http_test.go
@@ -432,7 +432,7 @@ func TestTransactionBasic(t *testing.T) {
require.Equal(t, 2, len(mr.preds))
var parsedPreds []string
for _, pred := range mr.preds {
- p := strings.Split(pred, "-")[1]
+ p := strings.SplitN(pred, "-", 2)[1]
parsedPreds = append(parsedPreds, x.ParseAttr(p))
}
sort.Strings(parsedPreds)
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/upsert_test.go
@@ -39,7 +39,7 @@ type QueryResult struct {

func splitPreds(ps []string) []string {
for i, p := range ps {
- ps[i] = x.ParseAttr(strings.Split(p, "-")[1])
+ ps[i] = x.ParseAttr(strings.SplitN(p, "-", 2)[1])
}

return ps
4 changes: 2 additions & 2 deletions dgraph/cmd/zero/oracle.go
@@ -369,15 +369,15 @@ func (s *Server) commit(ctx context.Context, src *api.TxnContext) error {
checkPreds := func() error {
// Check if any of these tablets is being moved. If so, abort the transaction.
for _, pkey := range src.Preds {
- splits := strings.Split(pkey, "-")
+ splits := strings.SplitN(pkey, "-", 2)
if len(splits) < 2 {
return errors.Errorf("Unable to find group id in %s", pkey)
}
gid, err := strconv.Atoi(splits[0])
if err != nil {
return errors.Wrapf(err, "unable to parse group id from %s", pkey)
}
- pred := strings.Join(splits[1:], "-")
+ pred := splits[1]
tablet := s.ServingTablet(pred)
if tablet == nil {
return errors.Errorf("Tablet for %s is nil", pred)
11 changes: 1 addition & 10 deletions edgraph/server.go
@@ -1198,16 +1198,7 @@ func filterTablets(ctx context.Context, ms *pb.MembershipState) error {
return errors.Errorf("Namespace not found in JWT.")
}
if namespace == x.GalaxyNamespace {
- // For galaxy namespace, we don't want to filter out the predicates. We only format the
- // namespace to human readable form.
- for _, group := range ms.Groups {
- tablets := make(map[string]*pb.Tablet)
- for tabletName, tablet := range group.Tablets {
- tablet.Predicate = x.FormatNsAttr(tablet.Predicate)
- tablets[x.FormatNsAttr(tabletName)] = tablet
- }
- group.Tablets = tablets
- }
+ // For galaxy namespace, we don't want to filter out the predicates.
return nil
}
for _, group := range ms.GetGroups() {
18 changes: 7 additions & 11 deletions posting/index.go
@@ -579,16 +579,15 @@ func (r *rebuilder) Run(ctx context.Context) error {

glog.V(1).Infof(
"Rebuilding index for predicate %s: Starting process. StartTs=%d. Prefix=\n%s\n",
- x.FormatNsAttr(r.attr), r.startTs, hex.Dump(r.prefix))
+ r.attr, r.startTs, hex.Dump(r.prefix))

// Counter is used here to ensure that all keys are committed at different timestamp.
// We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0.
var counter uint64 = 1

tmpWriter := tmpDB.NewManagedWriteBatch()
stream := pstore.NewStreamAt(r.startTs)
- stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):",
- x.FormatNsAttr(r.attr))
+ stream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (1/2):", r.attr)
stream.Prefix = r.prefix
stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
// We should return quickly if the context is no longer valid.
@@ -650,21 +649,19 @@ func (r *rebuilder) Run(ctx context.Context) error {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: building temp index took: %v\n",
- x.FormatNsAttr(r.attr), time.Since(start))
+ r.attr, time.Since(start))

// Now we write all the created posting lists to disk.
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: writing index to badger", r.attr)
start = time.Now()
defer func() {
glog.V(1).Infof("Rebuilding index for predicate %s: writing index took: %v\n",
- x.FormatNsAttr(r.attr), time.Since(start))
+ r.attr, time.Since(start))
}()

writer := pstore.NewManagedWriteBatch()
tmpStream := tmpDB.NewStreamAt(counter)
- tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):",
- x.FormatNsAttr(r.attr))
+ tmpStream.LogPrefix = fmt.Sprintf("Rebuilding index for predicate %s (2/2):", r.attr)
tmpStream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) {
l, err := ReadPostingList(key, itr)
if err != nil {
@@ -707,8 +704,7 @@ func (r *rebuilder) Run(ctx context.Context) error {
if err := tmpStream.Orchestrate(ctx); err != nil {
return err
}
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n",
x.FormatNsAttr(r.attr))
glog.V(1).Infof("Rebuilding index for predicate %s: Flushing all writes.\n", r.attr)
return writer.Flush()
}

16 changes: 8 additions & 8 deletions posting/list_test.go
@@ -563,7 +563,7 @@ func TestAddMutation_mrjn2(t *testing.T) {
}

func TestAddMutation_gru(t *testing.T) {
key := x.DataKey("question.tag", 0x01)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x01)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

@@ -596,7 +596,7 @@ func TestAddMutation_gru(t *testing.T) {
}

func TestAddMutation_gru2(t *testing.T) {
key := x.DataKey("question.tag", 0x100)
key := x.DataKey(x.GalaxyAttr("question.tag"), 0x100)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

@@ -643,7 +643,7 @@ func TestAddMutation_gru2(t *testing.T) {
func TestAddAndDelMutation(t *testing.T) {
// Ensure each test uses unique key since we don't clear the postings
// after each test
key := x.DataKey("dummy_key", 0x927)
key := x.DataKey(x.GalaxyAttr("dummy_key"), 0x927)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)

@@ -882,7 +882,7 @@ func createMultiPartList(t *testing.T, size int, addFacet bool) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 5000

- key := x.DataKey(uuid.New().String(), 1331)
+ key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
@@ -931,7 +931,7 @@ func createAndDeleteMultiPartList(t *testing.T, size int) (*List, int) {
defer setMaxListSize(maxListSize)
maxListSize = 1000

- key := x.DataKey(uuid.New().String(), 1331)
+ key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0
@@ -1095,7 +1095,7 @@ func TestBinSplit(t *testing.T) {
defer func() {
maxListSize = originalListSize
}()
- key := x.DataKey(uuid.New().String(), 1331)
+ key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
@@ -1331,7 +1331,7 @@ func TestMultiPartListDeleteAndAdd(t *testing.T) {
maxListSize = 5000

// Add entries to the maps.
- key := x.DataKey(uuid.New().String(), 1331)
+ key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
for i := 1; i <= size; i++ {
@@ -1472,7 +1472,7 @@ func TestRecursiveSplits(t *testing.T) {

// Create a list that should be split recursively.
size := int(1e5)
- key := x.DataKey(uuid.New().String(), 1331)
+ key := x.DataKey(x.GalaxyAttr(uuid.New().String()), 1331)
ol, err := getNew(key, ps, math.MaxUint64)
require.NoError(t, err)
commits := 0