Skip to content

Commit

Permalink
fix(live): replace panic in live loader with errors (#7798) (#8944)
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 authored Aug 16, 2023
1 parent 0a445ca commit 0c2a073
Showing 1 changed file with 19 additions and 7 deletions.
26 changes: 19 additions & 7 deletions dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func generateQuery(node, predicate, xid string) string {
return sb.String()
}

func (l *loader) upsertUids(nqs []*api.NQuad) {
func (l *loader) upsertUids(nqs []*api.NQuad) error {
// We form upsertPredicate query for each of the ids we saw in the request, along with
// adding the corresponding xid to that uid. The mutation we added is only useful if the
// uid doesn't exists.
Expand Down Expand Up @@ -379,7 +379,7 @@ func (l *loader) upsertUids(nqs []*api.NQuad) {
}

if len(mutations) == 0 {
return
return nil
}

query.WriteRune('}')
Expand All @@ -390,20 +390,26 @@ func (l *loader) upsertUids(nqs []*api.NQuad) {
Query: query.String(),
Mutations: []*api.Mutation{{Set: mutations}},
})
x.Panic(err)
if err != nil {
return err
}

type dResult struct {
Uid string
}

var result map[string][]dResult
x.Panic(json.Unmarshal(resp.GetJson(), &result))
if err := json.Unmarshal(resp.GetJson(), &result); err != nil {
return err
}

for xid, idx := range ids {
// xid already exist in dgraph
if val, ok := result[idx]; ok && len(val) > 0 {
uid, err := strconv.ParseUint(val[0].Uid, 0, 64)
x.Panic(err)
if err != nil {
return err
}

l.alloc.SetUid(xid, uid)
continue
Expand All @@ -412,12 +418,16 @@ func (l *loader) upsertUids(nqs []*api.NQuad) {
// new uid created in dgraph
if val, ok := resp.GetUids()[generateUidFunc(idx)]; ok {
uid, err := strconv.ParseUint(val, 0, 64)
x.Panic(err)
if err != nil {
return err
}

l.alloc.SetUid(xid, uid)
continue
}
}

return nil
}

// allocateUids looks for the maximum uid value in the given NQuads and bumps the
Expand Down Expand Up @@ -540,7 +550,9 @@ func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunk
// figure out its processing.
// Currently, this option works with data loading in the logged-in namespace.
// TODO(Naman): Add a test for a case when it works and when it doesn't.
l.upsertUids(nqs)
if err := l.upsertUids(nqs); err != nil {
return
}
}

for _, nq := range nqs {
Expand Down

0 comments on commit 0c2a073

Please sign in to comment.