From 3d46ee34ea0df993b7ca94e0c6dec075ebf9556f Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 21 Nov 2018 21:08:23 -0800 Subject: [PATCH 1/3] Add extensive OC tracing to the query execution path. Run handleUidPostings concurrently using up to 64 goroutines. --- query/query.go | 9 +- worker/task.go | 283 ++++++++++++++++++++++++++++++++----------------- 2 files changed, 195 insertions(+), 97 deletions(-) diff --git a/query/query.go b/query/query.go index 761b7f855fd..17cb5fdc054 100644 --- a/query/query.go +++ b/query/query.go @@ -1835,7 +1835,14 @@ func getReversePredicates(ctx context.Context) ([]string, error) { // ProcessGraph processes the SubGraph instance accumulating result for the query // from different instances. Note: taskQuery is nil for root node. func ProcessGraph(ctx context.Context, sg, parent *SubGraph, rch chan error) { - ctx, span := otrace.StartSpan(ctx, "query.ProcessGraph") + var suffix string + if len(sg.Params.Alias) > 0 { + suffix += "." + sg.Params.Alias + } + if len(sg.Attr) > 0 { + suffix += "." 
+ sg.Attr + } + ctx, span := otrace.StartSpan(ctx, "query.ProcessGraph"+suffix) defer span.End() if sg.Attr == "uid" { diff --git a/worker/task.go b/worker/task.go index aa11a010a6a..35a41f023e8 100644 --- a/worker/task.go +++ b/worker/task.go @@ -473,119 +473,194 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti return err } - for i := 0; i < srcFn.n; i++ { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - var key []byte - switch srcFn.fnType { - case NotAFunction, CompareScalarFn, HasFn, UidInFn: - if q.Reverse { - key = x.ReverseKey(attr, q.UidList.Uids[i]) - } else { - key = x.DataKey(attr, q.UidList.Uids[i]) - } - case GeoFn, RegexFn, FullTextSearchFn, StandardFn, CustomIndexFn: - key = x.IndexKey(attr, srcFn.tokens[i]) - case CompareAttrFn: - key = x.IndexKey(attr, srcFn.tokens[i]) - default: - return x.Errorf("Unhandled function in handleUidPostings: %s", srcFn.fname) - } + span := otrace.FromContext(ctx) + if span != nil { + span.Annotatef(nil, "Number of uids: %d. args.srcFn: %+v", srcFn.n, args.srcFn) + defer func() { + span.Annotate(nil, "Done handleUidPostings.") + }() + } + if srcFn.n == 0 { + return nil + } - // Get or create the posting list for an entity, attribute combination. - pl, err := posting.Get(key) - if err != nil { - return err + // Divide the task into many Goroutines. + numGo, width := 64, 0 + for ; numGo >= 1; numGo /= 2 { + width = srcFn.n / numGo + if numGo == 1 || width >= 256 { + break } + } + glog.Infof("Width: %d. NumGo: %d", width, numGo) + x.AssertTrue(width > 0) + span.Annotatef(nil, "width: %d. numGo: %d", width, numGo) - // get filtered uids and facets. 
- var filteredRes []*result + errCh := make(chan error, 1) + outputs := make([]*pb.Result, numGo) - var perr error - filteredRes = make([]*result, 0) - err = pl.Postings(opts, func(p *pb.Posting) error { - res := true - res, perr = applyFacetsTree(p.Facets, facetsTree) - if perr != nil { - return posting.ErrStopIteration - } - if res { - filteredRes = append(filteredRes, &result{ - uid: p.Uid, - facets: facets.CopyFacets(p.Facets, q.FacetParam)}) - } - return nil // continue iteration. - }) - if err != nil { - return err - } else if perr != nil { - return perr - } - - // add facets to result. - if q.FacetParam != nil { - var fcsList []*pb.Facets - for _, fres := range filteredRes { - fcsList = append(fcsList, &pb.Facets{Facets: fres.facets}) - } - out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{FacetsList: fcsList}) - } + calculate := func(start, end int) error { + glog.Infof("start, end: %d, %d\n", start, end) + x.AssertTrue(start%width == 0) + out = &pb.Result{} + outputs[start/width] = out - switch { - case q.DoCount: - len := pl.Length(args.q.ReadTs, 0) - if len == -1 { - return posting.ErrTsTooOld - } - out.Counts = append(out.Counts, uint32(len)) - // Add an empty UID list to make later processing consistent - out.UidMatrix = append(out.UidMatrix, &emptyUIDList) - case srcFn.fnType == CompareScalarFn: - len := pl.Length(args.q.ReadTs, 0) - if len == -1 { - return posting.ErrTsTooOld + for i := start; i < end; i++ { + if i%100 == 0 { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } } - count := int64(len) - if EvalCompare(srcFn.fname, count, srcFn.threshold) { - tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}} - out.UidMatrix = append(out.UidMatrix, tlist) + var key []byte + switch srcFn.fnType { + case NotAFunction, CompareScalarFn, HasFn, UidInFn: + if q.Reverse { + key = x.ReverseKey(attr, q.UidList.Uids[i]) + } else { + key = x.DataKey(attr, q.UidList.Uids[i]) + } + case GeoFn, RegexFn, FullTextSearchFn, StandardFn, 
CustomIndexFn: + key = x.IndexKey(attr, srcFn.tokens[i]) + case CompareAttrFn: + key = x.IndexKey(attr, srcFn.tokens[i]) + default: + return x.Errorf("Unhandled function in handleUidPostings: %s", srcFn.fname) } - case srcFn.fnType == HasFn: - empty, err := pl.IsEmpty(args.q.ReadTs, 0) + + // Get or create the posting list for an entity, attribute combination. + pl, err := posting.Get(key) if err != nil { return err } - if !empty { - tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}} - out.UidMatrix = append(out.UidMatrix, tlist) - } - case srcFn.fnType == UidInFn: - reqList := &pb.List{Uids: []uint64{srcFn.uidPresent}} - topts := posting.ListOptions{ - ReadTs: args.q.ReadTs, - AfterUID: 0, - Intersect: reqList, - } - plist, err := pl.Uids(topts) + + // get filtered uids and facets. + var filteredRes []*result + + var perr error + filteredRes = make([]*result, 0) + err = pl.Postings(opts, func(p *pb.Posting) error { + res := true + res, perr = applyFacetsTree(p.Facets, facetsTree) + if perr != nil { + return posting.ErrStopIteration + } + if res { + filteredRes = append(filteredRes, &result{ + uid: p.Uid, + facets: facets.CopyFacets(p.Facets, q.FacetParam)}) + } + return nil // continue iteration. + }) if err != nil { return err + } else if perr != nil { + return perr } - if len(plist.Uids) > 0 { - tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}} - out.UidMatrix = append(out.UidMatrix, tlist) + + // add facets to result. + if q.FacetParam != nil { + var fcsList []*pb.Facets + for _, fres := range filteredRes { + fcsList = append(fcsList, &pb.Facets{Facets: fres.facets}) + } + out.FacetMatrix = append(out.FacetMatrix, &pb.FacetsList{FacetsList: fcsList}) } - default: - // The more usual case: Getting the UIDs. 
- uidList := new(pb.List) - for _, fres := range filteredRes { - uidList.Uids = append(uidList.Uids, fres.uid) + + switch { + case q.DoCount: + if i == 0 { + span.Annotate(nil, "DoCount") + } + len := pl.Length(args.q.ReadTs, 0) + if len == -1 { + return posting.ErrTsTooOld + } + out.Counts = append(out.Counts, uint32(len)) + // Add an empty UID list to make later processing consistent + out.UidMatrix = append(out.UidMatrix, &emptyUIDList) + case srcFn.fnType == CompareScalarFn: + if i == 0 { + span.Annotate(nil, "CompareScalarFn") + } + len := pl.Length(args.q.ReadTs, 0) + if len == -1 { + return posting.ErrTsTooOld + } + count := int64(len) + if EvalCompare(srcFn.fname, count, srcFn.threshold) { + tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}} + out.UidMatrix = append(out.UidMatrix, tlist) + } + case srcFn.fnType == HasFn: + if i == 0 { + span.Annotate(nil, "HasFn") + } + empty, err := pl.IsEmpty(args.q.ReadTs, 0) + if err != nil { + return err + } + if !empty { + tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}} + out.UidMatrix = append(out.UidMatrix, tlist) + } + case srcFn.fnType == UidInFn: + if i == 0 { + span.Annotate(nil, "UidInFn") + } + reqList := &pb.List{Uids: []uint64{srcFn.uidPresent}} + topts := posting.ListOptions{ + ReadTs: args.q.ReadTs, + AfterUID: 0, + Intersect: reqList, + } + plist, err := pl.Uids(topts) + if err != nil { + return err + } + if len(plist.Uids) > 0 { + tlist := &pb.List{Uids: []uint64{q.UidList.Uids[i]}} + out.UidMatrix = append(out.UidMatrix, tlist) + } + default: + if i == 0 { + span.Annotate(nil, "default") + } + // The more usual case: Getting the UIDs. + uidList := new(pb.List) + for _, fres := range filteredRes { + uidList.Uids = append(uidList.Uids, fres.uid) + } + out.UidMatrix = append(out.UidMatrix, uidList) } - out.UidMatrix = append(out.UidMatrix, uidList) } + return nil + } // End of calculate function. 
+ + for i := 0; i < numGo; i++ { + start := i * width + end := start + width + if end > srcFn.n { + end = srcFn.n + } + go func(start, end int) { + errCh <- calculate(start, end) + }(start, end) + } + for i := 0; i < numGo; i++ { + if err := <-errCh; err != nil { + return err + } + } + // All goroutines are done. Now attach their results. + for _, chunk := range outputs { + out.FacetMatrix = append(out.FacetMatrix, chunk.FacetMatrix...) + out.Counts = append(out.Counts, chunk.Counts...) + out.UidMatrix = append(out.UidMatrix, chunk.UidMatrix...) } + + glog.Infof("Size of uidmatrix: %d", len(out.UidMatrix)) return nil } @@ -601,6 +676,9 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro span.Annotatef(nil, "Done waiting for maxAssigned. Attr: %q ReadTs: %d Max: %d", q.Attr, q.ReadTs, maxAssigned) } + defer func() { + span.Annotatef(nil, "Done processTask for %q", q.Attr) + }() // If a group stops serving tablet and it gets partitioned away from group zero, then it // wouldn't know that this group is no longer serving this predicate. 
@@ -617,6 +695,7 @@ func processTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, erro } func helpProcessTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, error) { + span := otrace.FromContext(ctx) out := new(pb.Result) attr := q.Attr @@ -664,22 +743,26 @@ func helpProcessTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, return nil, err } if needsValPostings { + span.Annotate(nil, "handleValuePostings") if err = handleValuePostings(ctx, args); err != nil { return nil, err } } else { + span.Annotate(nil, "handleUidPostings") if err = handleUidPostings(ctx, args, opts); err != nil { return nil, err } } if srcFn.fnType == HasFn && srcFn.isFuncAtRoot { + span.Annotate(nil, "handleHasFunction") if err := handleHasFunction(ctx, q, out); err != nil { return nil, err } } if srcFn.fnType == CompareScalarFn && srcFn.isFuncAtRoot { + span.Annotate(nil, "handleCompareScalarFunction") if err := handleCompareScalarFunction(funcArgs{q, gid, srcFn, out}); err != nil { return nil, err } @@ -688,6 +771,7 @@ func helpProcessTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, if srcFn.fnType == RegexFn { // Go through the indexkeys for the predicate and match them with // the regex matcher. + span.Annotate(nil, "handleRegexFunction") if err := handleRegexFunction(ctx, funcArgs{q, gid, srcFn, out}); err != nil { return nil, err } @@ -696,6 +780,7 @@ func helpProcessTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, // We fetch the actual value for the uids, compare them to the value in the // request and filter the uids only if the tokenizer IsLossy. if srcFn.fnType == CompareAttrFn && len(srcFn.tokens) > 0 { + span.Annotate(nil, "handleCompareFunction") if err := handleCompareFunction(ctx, funcArgs{q, gid, srcFn, out}); err != nil { return nil, err } @@ -703,11 +788,13 @@ func helpProcessTask(ctx context.Context, q *pb.Query, gid uint32) (*pb.Result, // If geo filter, do value check for correctness. 
if srcFn.geoQuery != nil { + span.Annotate(nil, "handleGeoFunction") filterGeoFunction(funcArgs{q, gid, srcFn, out}) } // For string matching functions, check the language. if needsStringFiltering(srcFn, q.Langs, attr) { + span.Annotate(nil, "filterStringFunction") filterStringFunction(funcArgs{q, gid, srcFn, out}) } @@ -1625,6 +1712,7 @@ func (cp *countParams) evaluate(out *pb.Result) error { } func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error { + span := otrace.FromContext(ctx) if glog.V(3) { glog.Infof("handleHasFunction query: %+v\n", q) } @@ -1698,6 +1786,9 @@ func handleHasFunction(ctx context.Context, q *pb.Query, out *pb.Result) error { } } } + if span != nil { + span.Annotatef(nil, "handleHasFunction found %d uids", len(result.Uids)) + } out.UidMatrix = append(out.UidMatrix, result) return nil } From 68a6daa47fda5d17fbf22eecae25f393bffabe99 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 21 Nov 2018 23:51:20 -0800 Subject: [PATCH 2/3] Fix bug: give each goroutine in handleUidPostings its own pb.Result instead of overwriting the shared out variable. This works. --- worker/task.go | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/worker/task.go b/worker/task.go index 35a41f023e8..ea785c3cb2e 100644 --- a/worker/task.go +++ b/worker/task.go @@ -465,8 +465,6 @@ func handleValuePostings(ctx context.Context, args funcArgs) error { func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOptions) error { srcFn := args.srcFn q := args.q - attr := q.Attr - out := args.out facetsTree, err := preprocessFilter(q.FacetsFilter) if err != nil { @@ -492,17 +490,15 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti break } } - glog.Infof("Width: %d. NumGo: %d", width, numGo) x.AssertTrue(width > 0) - span.Annotatef(nil, "width: %d. numGo: %d", width, numGo) + span.Annotatef(nil, "Width: %d. 
NumGo: %d", width, numGo) errCh := make(chan error, 1) outputs := make([]*pb.Result, numGo) calculate := func(start, end int) error { - glog.Infof("start, end: %d, %d\n", start, end) x.AssertTrue(start%width == 0) - out = &pb.Result{} + out := &pb.Result{} outputs[start/width] = out for i := start; i < end; i++ { @@ -517,14 +513,14 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti switch srcFn.fnType { case NotAFunction, CompareScalarFn, HasFn, UidInFn: if q.Reverse { - key = x.ReverseKey(attr, q.UidList.Uids[i]) + key = x.ReverseKey(q.Attr, q.UidList.Uids[i]) } else { - key = x.DataKey(attr, q.UidList.Uids[i]) + key = x.DataKey(q.Attr, q.UidList.Uids[i]) } case GeoFn, RegexFn, FullTextSearchFn, StandardFn, CustomIndexFn: - key = x.IndexKey(attr, srcFn.tokens[i]) + key = x.IndexKey(q.Attr, srcFn.tokens[i]) case CompareAttrFn: - key = x.IndexKey(attr, srcFn.tokens[i]) + key = x.IndexKey(q.Attr, srcFn.tokens[i]) default: return x.Errorf("Unhandled function in handleUidPostings: %s", srcFn.fname) } @@ -654,13 +650,12 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti } } // All goroutines are done. Now attach their results. + out := args.out for _, chunk := range outputs { out.FacetMatrix = append(out.FacetMatrix, chunk.FacetMatrix...) out.Counts = append(out.Counts, chunk.Counts...) out.UidMatrix = append(out.UidMatrix, chunk.UidMatrix...) 
} - - glog.Infof("Size of uidmatrix: %d", len(out.UidMatrix)) return nil } From 125b90b0c881b9ce506dbc30387c45fbf19d9016 Mon Sep 17 00:00:00 2001 From: Manish R Jain Date: Wed, 21 Nov 2018 23:56:10 -0800 Subject: [PATCH 3/3] Self review: use lowercase goroutines in a comment --- worker/task.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/worker/task.go b/worker/task.go index ea785c3cb2e..3a464bf47ff 100644 --- a/worker/task.go +++ b/worker/task.go @@ -482,7 +482,7 @@ func handleUidPostings(ctx context.Context, args funcArgs, opts posting.ListOpti return nil } - // Divide the task into many Goroutines. + // Divide the task into many goroutines. numGo, width := 64, 0 for ; numGo >= 1; numGo /= 2 { width = srcFn.n / numGo