From c1726f4d79bc7aee9d10463fc9be0c5dbf0e80d5 Mon Sep 17 00:00:00 2001 From: Aman Mangal Date: Wed, 11 Mar 2020 17:09:56 +0000 Subject: [PATCH] Perform indexing in background (#4819) --- chunker/json_parser_test.go | 5 +- contrib/integration/testtxn/main_test.go | 14 +- dgraph/cmd/alpha/http_test.go | 2 +- dgraph/cmd/alpha/reindex_test.go | 53 +++++ dgraph/cmd/alpha/run_test.go | 62 ++++- dgraph/cmd/alpha/upsert_test.go | 2 - dgraph/cmd/live/load-json/load_test.go | 2 +- dgraph/cmd/live/run.go | 7 +- edgraph/server.go | 34 +-- go.mod | 2 +- go.sum | 2 + posting/index.go | 190 +++++++++------ posting/index_test.go | 39 ++-- posting/list.go | 4 +- query/common_test.go | 4 + query/query0_test.go | 1 - schema/parse.go | 1 + schema/parse_test.go | 15 +- schema/schema.go | 135 ++++++++--- systest/1million/1million_test.go | 14 +- systest/1million/test-reindex.sh | 1 + systest/bgindex/common_test.go | 55 +++++ systest/bgindex/count_test.go | 284 +++++++++++++++++++++++ systest/bgindex/docker-compose.yml | 193 +++++++++++++++ systest/bgindex/parallel_test.go | 203 ++++++++++++++++ systest/bgindex/reverse_test.go | 283 ++++++++++++++++++++++ systest/bgindex/string_test.go | 274 ++++++++++++++++++++++ systest/bgindex/test-bgindex.sh | 43 ++++ systest/mutations_test.go | 111 +++++---- systest/plugin_test.go | 7 + systest/queries_test.go | 72 +++--- test.sh | 3 + testutil/client.go | 105 +++++++-- tlstest/acl/acl_over_tls_test.go | 13 +- wiki/content/clients/index.md | 5 + worker/draft.go | 12 +- worker/groups.go | 3 +- worker/mutation.go | 152 ++++++++---- worker/proposal.go | 3 +- worker/schema.go | 11 +- worker/sort.go | 4 +- worker/task.go | 30 +-- worker/tokens.go | 29 +-- 43 files changed, 2131 insertions(+), 353 deletions(-) create mode 100644 systest/bgindex/common_test.go create mode 100644 systest/bgindex/count_test.go create mode 100644 systest/bgindex/docker-compose.yml create mode 100644 systest/bgindex/parallel_test.go create mode 100644 
systest/bgindex/reverse_test.go create mode 100644 systest/bgindex/string_test.go create mode 100755 systest/bgindex/test-bgindex.sh diff --git a/chunker/json_parser_test.go b/chunker/json_parser_test.go index 0ab10f88ee1..3d653b163e2 100644 --- a/chunker/json_parser_test.go +++ b/chunker/json_parser_test.go @@ -87,6 +87,7 @@ func (exp *Experiment) verify() { require.NoError(exp.t, dg.Alter(ctx, &api.Operation{DropAll: true}), "drop all failed") require.NoError(exp.t, dg.Alter(ctx, &api.Operation{Schema: exp.schema}), "schema change failed") + require.NoError(exp.t, testutil.WaitForAlter(ctx, dg, exp.schema)) _, err = dg.NewTxn().Mutate(ctx, &api.Mutation{Set: exp.nqs, CommitNow: true}) @@ -134,14 +135,14 @@ func TestNquadsFromJson1(t *testing.T) { name age married -address +address }}`, expected: `{"alice": [ {"name": "Alice", "age": 26, "married": true, "address": {"coordinates": [2,1.1], "type": "Point"}} -]} +]} `} exp.verify() } diff --git a/contrib/integration/testtxn/main_test.go b/contrib/integration/testtxn/main_test.go index 18e8349dc60..d6040d29c73 100644 --- a/contrib/integration/testtxn/main_test.go +++ b/contrib/integration/testtxn/main_test.go @@ -371,6 +371,9 @@ func TestIgnoreIndexConflict(t *testing.T) { if err := s.dg.Alter(context.Background(), op); err != nil { log.Fatal(err) } + if err := testutil.WaitForAlter(context.Background(), s.dg, op.Schema); err != nil { + log.Fatal(err) + } txn := s.dg.NewTxn() mu := &api.Mutation{} @@ -424,9 +427,11 @@ func TestReadIndexKeySameTxn(t *testing.T) { if err := s.dg.Alter(context.Background(), op); err != nil { log.Fatal(err) } + if err := testutil.WaitForAlter(context.Background(), s.dg, op.Schema); err != nil { + log.Fatal(err) + } txn := s.dg.NewTxn() - mu := &api.Mutation{ CommitNow: true, SetJson: []byte(`{"name": "Manish"}`), @@ -933,8 +938,7 @@ func TestTxnDiscardBeforeCommit(t *testing.T) { } func alterSchema(dg *dgo.Dgraph, schema string) { - op := api.Operation{} - op.Schema = schema - err := 
dg.Alter(ctxb, &op) - x.Check(err) + op := api.Operation{Schema: schema} + x.Check(dg.Alter(ctxb, &op)) + x.Check(testutil.WaitForAlter(ctxb, dg, schema)) } diff --git a/dgraph/cmd/alpha/http_test.go b/dgraph/cmd/alpha/http_test.go index f3b5e9ce852..48c768361e1 100644 --- a/dgraph/cmd/alpha/http_test.go +++ b/dgraph/cmd/alpha/http_test.go @@ -57,7 +57,7 @@ func runGzipWithRetry(contentType, url string, buf io.Reader, gzReq, gzResp bool *http.Response, error) { client := &http.Client{} - numRetries := 2 + numRetries := 3 var resp *http.Response var err error diff --git a/dgraph/cmd/alpha/reindex_test.go b/dgraph/cmd/alpha/reindex_test.go index 1043e2ff3fa..62dcc8c8c19 100644 --- a/dgraph/cmd/alpha/reindex_test.go +++ b/dgraph/cmd/alpha/reindex_test.go @@ -17,7 +17,9 @@ package alpha import ( + "strings" "testing" + "time" "github.com/stretchr/testify/require" ) @@ -185,3 +187,54 @@ func TestReindexReverseCount(t *testing.T) { } }`, res) } + +func checkSchema(t *testing.T, query, key string) { + for i := 0; i < 10; i++ { + res, _, err := queryWithTs(query, "application/graphql+-", "", 0) + require.NoError(t, err) + if strings.Contains(res, key) { + return + } + time.Sleep(100 * time.Millisecond) + + if i == 9 { + t.Fatalf("expected %v, got schema: %v", key, res) + } + } +} + +func TestBgIndexSchemaReverse(t *testing.T) { + require.NoError(t, dropAll()) + q1 := `schema(pred: [value]) {}` + require.NoError(t, alterSchema(`value: [uid] .`)) + checkSchema(t, q1, "list") + require.NoError(t, alterSchema(`value: [uid] @count @reverse .`)) + checkSchema(t, q1, "reverse") +} + +func TestBgIndexSchemaTokenizers(t *testing.T) { + require.NoError(t, dropAll()) + q1 := `schema(pred: [value]) {}` + require.NoError(t, alterSchema(`value: string @index(fulltext, hash) .`)) + checkSchema(t, q1, "fulltext") + require.NoError(t, alterSchema(`value: string @index(term, hash) @upsert .`)) + checkSchema(t, q1, "term") +} + +func TestBgIndexSchemaCount(t *testing.T) { + 
require.NoError(t, dropAll()) + q1 := `schema(pred: [value]) {}` + require.NoError(t, alterSchema(`value: [uid] @count .`)) + checkSchema(t, q1, "count") + require.NoError(t, alterSchema(`value: [uid] @reverse .`)) + checkSchema(t, q1, "reverse") +} + +func TestBgIndexSchemaReverseAndCount(t *testing.T) { + require.NoError(t, dropAll()) + q1 := `schema(pred: [value]) {}` + require.NoError(t, alterSchema(`value: [uid] @reverse .`)) + checkSchema(t, q1, "reverse") + require.NoError(t, alterSchema(`value: [uid] @count .`)) + checkSchema(t, q1, "count") +} diff --git a/dgraph/cmd/alpha/run_test.go b/dgraph/cmd/alpha/run_test.go index 7000ce22f40..3a724dc6fed 100644 --- a/dgraph/cmd/alpha/run_test.go +++ b/dgraph/cmd/alpha/run_test.go @@ -107,10 +107,22 @@ func runJSONMutation(m string) error { } func alterSchema(s string) error { - _, _, err := runWithRetries("PUT", "", addr+"/alter", s) - if err != nil { - return errors.Wrapf(err, "while running request with retries") + for { + _, _, err := runWithRetries("PUT", "", addr+"/alter", s) + if err != nil && strings.Contains(err.Error(), "is already being modified") { + time.Sleep(time.Second) + continue + } else if err != nil { + return errors.Wrapf(err, "while running request with retries") + } else { + break + } } + + if err := waitForAlter(s); err != nil { + return errors.Wrapf(err, "while waiting for alter to complete") + } + return nil } @@ -124,6 +136,48 @@ func alterSchemaWithRetry(s string) error { return err } +// waitForAlter waits for the alter operation to complete. 
+func waitForAlter(s string) error { + ps, err := schema.Parse(s) + if err != nil { + return err + } + + for { + resp, _, err := queryWithTs("schema{}", "application/graphql+-", "false", 0) + if err != nil { + return err + } + + var result struct { + Data struct { + Schema []*pb.SchemaNode + } + } + if err := json.Unmarshal([]byte(resp), &result); err != nil { + return err + } + + actual := make(map[string]*pb.SchemaNode) + for _, rs := range result.Data.Schema { + actual[rs.Predicate] = rs + } + + done := true + for _, su := range ps.Preds { + if n, ok := actual[su.Predicate]; !ok || !testutil.SameIndexes(su, n) { + done = false + break + } + } + if done { + return nil + } + + time.Sleep(time.Second) + } +} + func dropAll() error { op := `{"drop_all": true}` _, _, err := runWithRetries("PUT", "", addr+"/alter", op) @@ -405,7 +459,7 @@ func TestSchemaMutationUidError1(t *testing.T) { var s2 = ` friend: uid . ` - require.Error(t, alterSchemaWithRetry(s2)) + require.Error(t, alterSchema(s2)) } // add index diff --git a/dgraph/cmd/alpha/upsert_test.go b/dgraph/cmd/alpha/upsert_test.go index b76c44e286b..7151aa95d4b 100644 --- a/dgraph/cmd/alpha/upsert_test.go +++ b/dgraph/cmd/alpha/upsert_test.go @@ -1613,8 +1613,6 @@ func TestUpsertWithValueVar(t *testing.T) { require.NoError(t, alterSchema(`amount: int .`)) res, err := mutationWithTs(`{ set { _:p "0" . 
} }`, "application/rdf", false, true, 0) require.NoError(t, err) - b, _ := json.MarshalIndent(res, "", " ") - fmt.Printf("%s\n", b) const ( // this upsert block increments the value of the counter by one diff --git a/dgraph/cmd/live/load-json/load_test.go b/dgraph/cmd/live/load-json/load_test.go index f0bfe849e2c..43b04062a3a 100644 --- a/dgraph/cmd/live/load-json/load_test.go +++ b/dgraph/cmd/live/load-json/load_test.go @@ -26,9 +26,9 @@ import ( "strings" "testing" - "github.com/dgraph-io/dgo/v2" "github.com/stretchr/testify/require" + "github.com/dgraph-io/dgo/v2" "github.com/dgraph-io/dgraph/testutil" "github.com/dgraph-io/dgraph/x" ) diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go index 4f5f998f84b..b147d71192d 100644 --- a/dgraph/cmd/live/run.go +++ b/dgraph/cmd/live/run.go @@ -43,6 +43,7 @@ import ( "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/chunker" + "github.com/dgraph-io/dgraph/testutil" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/dgraph/xidmap" @@ -194,7 +195,11 @@ func processSchemaFile(ctx context.Context, file string, dgraphClient *dgo.Dgrap op := &api.Operation{} op.Schema = string(b) - return dgraphClient.Alter(ctx, op) + if err := dgraphClient.Alter(ctx, op); err != nil { + return err + } + // TODO(Aman): avoid using functions from testutil. 
+ return testutil.WaitForAlter(ctx, dgraphClient, op.Schema) } func (l *loader) uid(val string) string { diff --git a/edgraph/server.go b/edgraph/server.go index 1c1c40e652d..9fe91ca89e5 100644 --- a/edgraph/server.go +++ b/edgraph/server.go @@ -28,9 +28,20 @@ import ( "time" "unicode" + "github.com/gogo/protobuf/jsonpb" + "github.com/golang/glog" + "github.com/pkg/errors" + ostats "go.opencensus.io/stats" + "go.opencensus.io/tag" + "go.opencensus.io/trace" + otrace "go.opencensus.io/trace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/status" + "github.com/dgraph-io/dgo/v2" "github.com/dgraph-io/dgo/v2/protos/api" - "github.com/dgraph-io/dgraph/chunker" "github.com/dgraph-io/dgraph/conn" "github.com/dgraph-io/dgraph/dgraph/cmd/zero" @@ -44,18 +55,6 @@ import ( "github.com/dgraph-io/dgraph/types/facets" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" - "github.com/gogo/protobuf/jsonpb" - "github.com/golang/glog" - "github.com/pkg/errors" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/peer" - "google.golang.org/grpc/status" - - ostats "go.opencensus.io/stats" - "go.opencensus.io/tag" - "go.opencensus.io/trace" - otrace "go.opencensus.io/trace" ) const ( @@ -88,6 +87,10 @@ var ( numGraphQL uint64 ) +var ( + errIndexingInProgress = errors.New("schema is already being modified. Please retry") +) + // Server implements protos.DgraphServer type Server struct{} @@ -241,6 +244,11 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er return empty, err } + // If a background task is already running, we should reject all the new alter requests. + if schema.State().IndexingInProgress() { + return nil, errIndexingInProgress + } + for _, update := range result.Preds { // Reserved predicates cannot be altered but let the update go through // if the update is equal to the existing one. 
diff --git a/go.mod b/go.mod index 3bda990bbda..0cc513b158a 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd github.com/dgraph-io/badger/v2 v2.0.1-0.20191220102048-ab4352b00a17 - github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 + github.com/dgraph-io/dgo/v2 v2.2.0 github.com/dgraph-io/ristretto v0.0.1 // indirect github.com/dgrijalva/jwt-go v3.2.0+incompatible github.com/dgryski/go-farm v0.0.0-20191112170834-c2139c5d712b diff --git a/go.sum b/go.sum index 5ea709738b1..c036b683556 100644 --- a/go.sum +++ b/go.sum @@ -69,6 +69,8 @@ github.com/dgraph-io/badger/v2 v2.0.1-0.20191220102048-ab4352b00a17 h1:BxXd8isFV github.com/dgraph-io/badger/v2 v2.0.1-0.20191220102048-ab4352b00a17/go.mod h1:YoRSIp1LmAJ7zH7tZwRvjNMUYLxB4wl3ebYkaIruZ04= github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 h1:5leDFqGys055YO3TbghBhk/QdRPEwyLPdgsSJfiR20I= github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI= +github.com/dgraph-io/dgo/v2 v2.2.0 h1:qYbm6mEF3wuKiRpgNOldk6PmPbBJFwj6vL7I7dTSdyc= +github.com/dgraph-io/dgo/v2 v2.2.0/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI= github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs= github.com/dgraph-io/ristretto v0.0.1 h1:cJwdnj42uV8Jg4+KLrYovLiCgIfz9wtWm6E6KA+1tLs= github.com/dgraph-io/ristretto v0.0.1/go.mod h1:T40EBc7CJke8TkpiYfGGKAeFjSaxuFXhuXRyumBd6RE= diff --git a/posting/index.go b/posting/index.go index cb8b85e41f4..47a8e63e949 100644 --- a/posting/index.go +++ b/posting/index.go @@ -24,11 +24,11 @@ import ( "io/ioutil" "math" "os" - "path/filepath" "sync/atomic" "time" "github.com/golang/glog" + "github.com/pkg/errors" ostats "go.opencensus.io/stats" otrace "go.opencensus.io/trace" @@ -40,7 +40,6 @@ import ( 
"github.com/dgraph-io/dgraph/tok" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" - "github.com/pkg/errors" ) var emptyCountParams countParams @@ -52,9 +51,9 @@ type indexMutationInfo struct { op pb.DirectedEdge_Op } -// indexTokensforTokenizers return tokens, without the predicate prefix and +// indexTokens return tokens, without the predicate prefix and // index rune, for specific tokenizers. -func indexTokens(info *indexMutationInfo) ([]string, error) { +func indexTokens(ctx context.Context, info *indexMutationInfo) ([]string, error) { attr := info.edge.Attr lang := info.edge.GetLang() @@ -63,7 +62,7 @@ func indexTokens(info *indexMutationInfo) ([]string, error) { return nil, errors.Errorf("Cannot index attribute %s of type object.", attr) } - if !schema.State().IsIndexed(attr) { + if !schema.State().IsIndexed(ctx, attr) { return nil, errors.Errorf("Attribute %s is not indexed.", attr) } sv, err := types.Convert(info.val, schemaType) @@ -87,14 +86,13 @@ func indexTokens(info *indexMutationInfo) ([]string, error) { // TODO - See if we need to pass op as argument as t should already have Op. 
func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo) error { if info.tokenizers == nil { - info.tokenizers = schema.State().Tokenizer(info.edge.Attr) + info.tokenizers = schema.State().Tokenizer(ctx, info.edge.Attr) } attr := info.edge.Attr uid := info.edge.Entity x.AssertTrue(uid != 0) - tokens, err := indexTokens(info) - + tokens, err := indexTokens(ctx, info) if err != nil { // This data is not indexable return err @@ -115,10 +113,8 @@ func (txn *Txn) addIndexMutations(ctx context.Context, info *indexMutationInfo) return nil } -func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge, - token string) error { +func (txn *Txn) addIndexMutation(ctx context.Context, edge *pb.DirectedEdge, token string) error { key := x.IndexKey(edge.Attr, token) - plist, err := txn.cache.GetFromDelta(key) if err != nil { return err @@ -198,7 +194,7 @@ func (txn *Txn) addReverseMutation(ctx context.Context, t *pb.DirectedEdge) erro func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEdge) error { key := x.ReverseKey(t.Attr, t.ValueId) - hasCountIndex := schema.State().HasCount(t.Attr) + hasCountIndex := schema.State().HasCount(ctx, t.Attr) var getFn func(key []byte) (*List, error) if hasCountIndex { @@ -238,11 +234,10 @@ func (txn *Txn) addReverseAndCountMutation(ctx context.Context, t *pb.DirectedEd return nil } -func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, - txn *Txn) error { - isReversed := schema.State().IsReversed(edge.Attr) - isIndexed := schema.State().IsIndexed(edge.Attr) - hasCount := schema.State().HasCount(edge.Attr) +func (l *List) handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { + isReversed := schema.State().IsReversed(ctx, edge.Attr) + isIndexed := schema.State().IsIndexed(ctx, edge.Attr) + hasCount := schema.State().HasCount(ctx, edge.Attr) delEdge := &pb.DirectedEdge{ Attr: edge.Attr, Op: edge.Op, @@ -264,7 +259,7 @@ func (l *List) 
handleDeleteAll(ctx context.Context, edge *pb.DirectedEdge, Value: p.Value, } return txn.addIndexMutations(ctx, &indexMutationInfo{ - tokenizers: schema.State().Tokenizer(edge.Attr), + tokenizers: schema.State().Tokenizer(ctx, edge.Attr), edge: edge, val: val, op: pb.DirectedEdge_DEL, @@ -404,8 +399,7 @@ func (txn *Txn) addMutationHelper(ctx context.Context, l *List, doUpdateIndex bo // AddMutationWithIndex is addMutation with support for indexing. It also // supports reverse edges. -func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, - txn *Txn) error { +func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, txn *Txn) error { if len(edge.Attr) == 0 { return errors.Errorf("Predicate cannot be empty for edge with subject: [%v], object: [%v]"+ " and value: [%v]", edge.Entity, edge.ValueId, edge.Value) @@ -415,8 +409,8 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, return l.handleDeleteAll(ctx, edge, txn) } - doUpdateIndex := pstore != nil && schema.State().IsIndexed(edge.Attr) - hasCountIndex := schema.State().HasCount(edge.Attr) + doUpdateIndex := pstore != nil && schema.State().IsIndexed(ctx, edge.Attr) + hasCountIndex := schema.State().HasCount(ctx, edge.Attr) val, found, cp, err := txn.addMutationHelper(ctx, l, doUpdateIndex, hasCountIndex, edge) if err != nil { return err @@ -431,7 +425,7 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, // Exact matches. 
if found && val.Value != nil { if err := txn.addIndexMutations(ctx, &indexMutationInfo{ - tokenizers: schema.State().Tokenizer(edge.Attr), + tokenizers: schema.State().Tokenizer(ctx, edge.Attr), edge: edge, val: val, op: pb.DirectedEdge_DEL, @@ -445,7 +439,7 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, Value: edge.Value, } if err := txn.addIndexMutations(ctx, &indexMutationInfo{ - tokenizers: schema.State().Tokenizer(edge.Attr), + tokenizers: schema.State().Tokenizer(ctx, edge.Attr), edge: edge, val: val, op: pb.DirectedEdge_SET, @@ -456,7 +450,7 @@ func (l *List) AddMutationWithIndex(ctx context.Context, edge *pb.DirectedEdge, } // Add reverse mutation irrespective of hasMutated, server crash can happen after // mutation is synced and before reverse edge is synced - if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(edge.Attr) { + if (pstore != nil) && (edge.ValueId != 0) && schema.State().IsReversed(ctx, edge.Attr) { if err := txn.addReverseAndCountMutation(ctx, edge); err != nil { return err } @@ -539,22 +533,14 @@ type rebuilder struct { } func (r *rebuilder) Run(ctx context.Context) error { - // All the temp indexes go into the following directory. We delete the whole - // directory after the indexing step is complete. This deletes any other temp - // indexes that may have been left around in case defer wasn't executed. - // TODO(Aman): If users are not happy, we could add a flag to choose this dir. - tmpParentDir := filepath.Join(os.TempDir(), "dgraph_index") - // We write the index in a temporary badger first and then, // merge entries before writing them to p directory. - if err := os.MkdirAll(tmpParentDir, os.ModePerm); err != nil { - return errors.Wrap(err, "error creating in temp dir for reindexing") - } - tmpIndexDir, err := ioutil.TempDir(tmpParentDir, "") + // TODO(Aman): If users are not happy, we could add a flag to choose this dir. 
+ tmpIndexDir, err := ioutil.TempDir("", "dgraph_index_") if err != nil { return errors.Wrap(err, "error creating temp dir for reindexing") } - defer os.RemoveAll(tmpParentDir) + defer os.RemoveAll(tmpIndexDir) glog.V(1).Infof("Rebuilding indexes using the temp folder %s\n", tmpIndexDir) dbOpts := badger.DefaultOptions(tmpIndexDir). @@ -576,11 +562,11 @@ func (r *rebuilder) Run(ctx context.Context) error { "Rebuilding index for predicate %s: Starting process. StartTs=%d. Prefix=\n%s\n", r.attr, r.startTs, hex.Dump(r.prefix)) - // Counter is used here to ensure that all keys are commited at different timestamp. + // Counter is used here to ensure that all keys are committed at different timestamp. // We set it to 1 in case there are no keys found and NewStreamAt is called with ts=0. var counter uint64 = 1 - // Todo(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that + // TODO(Aman): Replace TxnWriter with WriteBatch. While we do that we should ensure that // WriteBatch has a mechanism for throttling. Also, find other places where TxnWriter // could be replaced with WriteBatch in the code tmpWriter := NewTxnWriter(tmpDB) @@ -714,12 +700,65 @@ const ( indexRebuild = iota // Index should be deleted and rebuilt. ) -// Run rebuilds all indices that need it. -func (rb *IndexRebuild) Run(ctx context.Context) error { - if err := rebuildListType(ctx, rb); err != nil { +// GetQuerySchema returns the schema that can be served while indexes are getting built. +// Query schema is defined as current schema minus tokens to delete from current schema. +func (rb *IndexRebuild) GetQuerySchema() *pb.SchemaUpdate { + // Copy the current schema. + querySchema := *rb.CurrentSchema + info := rb.needsTokIndexRebuild() + + // Compute old.Tokenizer minus info.tokenizersToDelete. 
+ interimTokenizers := make([]string, 0) + for _, t1 := range rb.OldSchema.Tokenizer { + found := false + for _, t2 := range info.tokenizersToDelete { + if t1 == t2 { + found = true + break + } + } + if !found { + interimTokenizers = append(interimTokenizers, t1) + } + } + querySchema.Tokenizer = interimTokenizers + + if rb.needsCountIndexRebuild() == indexRebuild { + querySchema.Count = false + } + if rb.needsReverseEdgesRebuild() == indexRebuild { + querySchema.Directive = pb.SchemaUpdate_NONE + } + return &querySchema +} + +// DropIndexes drops the indexes that need to be rebuilt. +func (rb *IndexRebuild) DropIndexes(ctx context.Context) error { + if err := dropTokIndexes(ctx, rb); err != nil { return err } - if err := rebuildIndex(ctx, rb); err != nil { + if err := dropReverseEdges(ctx, rb); err != nil { + return err + } + return dropCountIndex(ctx, rb) +} + +// BuildData updates data. +func (rb *IndexRebuild) BuildData(ctx context.Context) error { + return rebuildListType(ctx, rb) +} + +// NeedIndexRebuild returns true if any of the tokenizer, reverse +// or count indexes need to be rebuilt. +func (rb *IndexRebuild) NeedIndexRebuild() bool { + return rb.needsTokIndexRebuild().op == indexRebuild || + rb.needsReverseEdgesRebuild() == indexRebuild || + rb.needsCountIndexRebuild() == indexRebuild +} + +// BuildIndexes builds indexes. +func (rb *IndexRebuild) BuildIndexes(ctx context.Context) error { + if err := rebuildTokIndex(ctx, rb); err != nil { return err } if err := rebuildReverseEdges(ctx, rb); err != nil { @@ -734,7 +773,7 @@ type indexRebuildInfo struct { tokenizersToRebuild []string } -func (rb *IndexRebuild) needsIndexRebuild() indexRebuildInfo { +func (rb *IndexRebuild) needsTokIndexRebuild() indexRebuildInfo { x.AssertTruef(rb.CurrentSchema != nil, "Current schema cannot be nil.") // If the old schema is nil, we can treat it as an empty schema. 
Copy it @@ -802,12 +841,8 @@ func (rb *IndexRebuild) needsIndexRebuild() indexRebuildInfo { } } -// rebuildIndex rebuilds index for a given attribute. -// We commit mutations with startTs and ignore the errors. -func rebuildIndex(ctx context.Context, rb *IndexRebuild) error { - // Exit early if indices do not need to be rebuilt. - rebuildInfo := rb.needsIndexRebuild() - +func dropTokIndexes(ctx context.Context, rb *IndexRebuild) error { + rebuildInfo := rb.needsTokIndexRebuild() if rebuildInfo.op == indexNoop { return nil } @@ -826,17 +861,7 @@ func rebuildIndex(ctx context.Context, rb *IndexRebuild) error { } } - // Exit early if the index only need to be deleted and not rebuilt. - if rebuildInfo.op == indexDelete { - return nil - } - - // Exit early if there are no tokenizers to rebuild. - if len(rebuildInfo.tokenizersToRebuild) == 0 { - return nil - } - - glog.Infof("Rebuilding index for attr %s and tokenizers %s", rb.Attr, + glog.Infof("Deleting index for attr %s and tokenizers %s", rb.Attr, rebuildInfo.tokenizersToRebuild) // Before rebuilding, the existing index needs to be deleted. for _, tokenizer := range rebuildInfo.tokenizersToRebuild { @@ -851,6 +876,24 @@ func rebuildIndex(ctx context.Context, rb *IndexRebuild) error { } } + return nil +} + +// rebuildTokIndex rebuilds index for a given attribute. +// We commit mutations with startTs and ignore the errors. +func rebuildTokIndex(ctx context.Context, rb *IndexRebuild) error { + rebuildInfo := rb.needsTokIndexRebuild() + if rebuildInfo.op != indexRebuild { + return nil + } + + // Exit early if there are no tokenizers to rebuild. 
+ if len(rebuildInfo.tokenizersToRebuild) == 0 { + return nil + } + + glog.Infof("Rebuilding index for attr %s and tokenizers %s", rb.Attr, + rebuildInfo.tokenizersToRebuild) tokenizers, err := tok.GetTokenizers(rebuildInfo.tokenizersToRebuild) if err != nil { return err @@ -912,8 +955,8 @@ func (rb *IndexRebuild) needsCountIndexRebuild() indexOp { return indexRebuild } -// rebuildCountIndex rebuilds the count index for a given attribute. -func rebuildCountIndex(ctx context.Context, rb *IndexRebuild) error { +func dropCountIndex(ctx context.Context, rb *IndexRebuild) error { + // Exit early if indices do not need to be rebuilt. op := rb.needsCountIndexRebuild() if op == indexNoop { return nil @@ -924,8 +967,13 @@ func rebuildCountIndex(ctx context.Context, rb *IndexRebuild) error { return err } - // Exit early if attribute is index only needed to be deleted. - if op == indexDelete { + return nil +} + +// rebuildCountIndex rebuilds the count index for a given attribute. +func rebuildCountIndex(ctx context.Context, rb *IndexRebuild) error { + op := rb.needsCountIndexRebuild() + if op != indexRebuild { return nil } @@ -996,20 +1044,20 @@ func (rb *IndexRebuild) needsReverseEdgesRebuild() indexOp { return indexDelete } -// rebuildReverseEdges rebuilds the reverse edges for a given attribute. -func rebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error { +func dropReverseEdges(ctx context.Context, rb *IndexRebuild) error { op := rb.needsReverseEdgesRebuild() if op == indexNoop { return nil } glog.Infof("Deleting reverse index for %s", rb.Attr) - if err := deleteReverseEdges(rb.Attr); err != nil { - return err - } + return deleteReverseEdges(rb.Attr) +} - // Exit early if index only needed to be deleted. - if op == indexDelete { +// rebuildReverseEdges rebuilds the reverse edges for a given attribute. 
+func rebuildReverseEdges(ctx context.Context, rb *IndexRebuild) error { + op := rb.needsReverseEdgesRebuild() + if op != indexRebuild { return nil } diff --git a/posting/index_test.go b/posting/index_test.go index dad66494484..3ee12949a78 100644 --- a/posting/index_test.go +++ b/posting/index_test.go @@ -40,8 +40,8 @@ func uids(l *List, readTs uint64) []uint64 { // indexTokensForTest is just a wrapper around indexTokens used for convenience. func indexTokensForTest(attr, lang string, val types.Val) ([]string, error) { - return indexTokens(&indexMutationInfo{ - tokenizers: schema.State().Tokenizer(attr), + return indexTokens(context.Background(), &indexMutationInfo{ + tokenizers: schema.State().Tokenizer(context.Background(), attr), edge: &pb.DirectedEdge{ Attr: attr, Lang: lang, @@ -258,19 +258,20 @@ func addEdgeToUID(t *testing.T, attr string, src uint64, addMutation(t, l, edge, Set, startTs, commitTs, false) } -func TestRebuildIndex(t *testing.T) { +func TestRebuildTokIndex(t *testing.T) { addEdgeToValue(t, "name2", 91, "Michonne", uint64(1), uint64(2)) addEdgeToValue(t, "name2", 92, "David", uint64(3), uint64(4)) require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1)) - currentSchema, _ := schema.State().Get("name2") + currentSchema, _ := schema.State().Get(context.Background(), "name2") rb := IndexRebuild{ Attr: "name2", StartTs: 5, OldSchema: nil, CurrentSchema: ¤tSchema, } - require.NoError(t, rebuildIndex(context.Background(), &rb)) + require.NoError(t, dropTokIndexes(context.Background(), &rb)) + require.NoError(t, rebuildTokIndex(context.Background(), &rb)) // Check index entries in data store. 
txn := ps.NewTransactionAt(6, false) @@ -308,30 +309,32 @@ func TestRebuildIndex(t *testing.T) { require.EqualValues(t, 91, uids2[0]) } -func TestRebuildIndexWithDeletion(t *testing.T) { +func TestRebuildTokIndexWithDeletion(t *testing.T) { addEdgeToValue(t, "name2", 91, "Michonne", uint64(1), uint64(2)) addEdgeToValue(t, "name2", 92, "David", uint64(3), uint64(4)) require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1)) - currentSchema, _ := schema.State().Get("name2") + currentSchema, _ := schema.State().Get(context.Background(), "name2") rb := IndexRebuild{ Attr: "name2", StartTs: 5, OldSchema: nil, CurrentSchema: ¤tSchema, } - require.NoError(t, rebuildIndex(context.Background(), &rb)) + require.NoError(t, dropTokIndexes(context.Background(), &rb)) + require.NoError(t, rebuildTokIndex(context.Background(), &rb)) // Mutate the schema (the index in name2 is deleted) and rebuild the index. require.NoError(t, schema.ParseBytes([]byte(mutatedSchemaVal), 1)) - newSchema, _ := schema.State().Get("name2") + newSchema, _ := schema.State().Get(context.Background(), "name2") rb = IndexRebuild{ Attr: "name2", StartTs: 6, OldSchema: ¤tSchema, CurrentSchema: &newSchema, } - require.NoError(t, rebuildIndex(context.Background(), &rb)) + require.NoError(t, dropTokIndexes(context.Background(), &rb)) + require.NoError(t, rebuildTokIndex(context.Background(), &rb)) // Check index entries in data store. 
txn := ps.NewTransactionAt(7, false) @@ -368,7 +371,7 @@ func TestRebuildReverseEdges(t *testing.T) { addEdgeToUID(t, "friend", 2, 23, uint64(14), uint64(15)) require.NoError(t, schema.ParseBytes([]byte(schemaVal), 1)) - currentSchema, _ := schema.State().Get("friend") + currentSchema, _ := schema.State().Get(context.Background(), "friend") rb := IndexRebuild{ Attr: "friend", StartTs: 16, @@ -416,17 +419,17 @@ func TestRebuildReverseEdges(t *testing.T) { require.EqualValues(t, 1, uids1[0]) } -func TestNeedsIndexRebuild(t *testing.T) { +func TestNeedsTokIndexRebuild(t *testing.T) { rb := IndexRebuild{} rb.OldSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID} rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_UID} - rebuildInfo := rb.needsIndexRebuild() + rebuildInfo := rb.needsTokIndexRebuild() require.Equal(t, indexOp(indexNoop), rebuildInfo.op) require.Equal(t, []string(nil), rebuildInfo.tokenizersToDelete) require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) rb.OldSchema = nil - rebuildInfo = rb.needsIndexRebuild() + rebuildInfo = rb.needsTokIndexRebuild() require.Equal(t, indexOp(indexNoop), rebuildInfo.op) require.Equal(t, []string(nil), rebuildInfo.tokenizersToDelete) require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) @@ -436,7 +439,7 @@ func TestNeedsIndexRebuild(t *testing.T) { rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - rebuildInfo = rb.needsIndexRebuild() + rebuildInfo = rb.needsTokIndexRebuild() require.Equal(t, indexOp(indexNoop), rebuildInfo.op) require.Equal(t, []string(nil), rebuildInfo.tokenizersToDelete) require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) @@ -445,7 +448,7 @@ func TestNeedsIndexRebuild(t *testing.T) { Tokenizer: []string{"term"}} rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_STRING, Directive: pb.SchemaUpdate_INDEX} - rebuildInfo = rb.needsIndexRebuild() + rebuildInfo = 
rb.needsTokIndexRebuild() require.Equal(t, indexOp(indexRebuild), rebuildInfo.op) require.Equal(t, []string{"term"}, rebuildInfo.tokenizersToDelete) require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) @@ -455,7 +458,7 @@ func TestNeedsIndexRebuild(t *testing.T) { rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, Directive: pb.SchemaUpdate_INDEX, Tokenizer: []string{"exact"}} - rebuildInfo = rb.needsIndexRebuild() + rebuildInfo = rb.needsTokIndexRebuild() require.Equal(t, indexOp(indexRebuild), rebuildInfo.op) require.Equal(t, []string{"exact"}, rebuildInfo.tokenizersToDelete) require.Equal(t, []string{"exact"}, rebuildInfo.tokenizersToRebuild) @@ -464,7 +467,7 @@ func TestNeedsIndexRebuild(t *testing.T) { Tokenizer: []string{"exact"}} rb.CurrentSchema = &pb.SchemaUpdate{ValueType: pb.Posting_FLOAT, Directive: pb.SchemaUpdate_NONE} - rebuildInfo = rb.needsIndexRebuild() + rebuildInfo = rb.needsTokIndexRebuild() require.Equal(t, indexOp(indexDelete), rebuildInfo.op) require.Equal(t, []string{"exact"}, rebuildInfo.tokenizersToDelete) require.Equal(t, []string(nil), rebuildInfo.tokenizersToRebuild) diff --git a/posting/list.go b/posting/list.go index 303879063ec..aba084f6112 100644 --- a/posting/list.go +++ b/posting/list.go @@ -25,6 +25,7 @@ import ( "sort" "github.com/dgryski/go-farm" + "github.com/pkg/errors" bpb "github.com/dgraph-io/badger/v2/pb" "github.com/dgraph-io/dgraph/algo" @@ -35,7 +36,6 @@ import ( "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/types/facets" "github.com/dgraph-io/dgraph/x" - "github.com/pkg/errors" ) var ( @@ -354,7 +354,7 @@ func TypeID(edge *pb.DirectedEdge) types.TypeID { func fingerprintEdge(t *pb.DirectedEdge) uint64 { // There could be a collision if the user gives us a value with Lang = "en" and later gives - // us a value = "en" for the same predicate. We would end up overwritting his older lang + // us a value = "en" for the same predicate. 
We would end up overwriting his older lang // value. // All edges with a value without LANGTAG, have the same UID. In other words, diff --git a/query/common_test.go b/query/common_test.go index a3892a5c9ed..e7b30f30c5a 100644 --- a/query/common_test.go +++ b/query/common_test.go @@ -35,6 +35,10 @@ func setSchema(schema string) { if err != nil { panic(fmt.Sprintf("Could not alter schema. Got error %v", err.Error())) } + + if err := testutil.WaitForAlter(context.Background(), client, schema); err != nil { + panic(err) + } } func dropPredicate(pred string) { diff --git a/query/query0_test.go b/query/query0_test.go index 2b3144f316b..7dcfb7389b7 100644 --- a/query/query0_test.go +++ b/query/query0_test.go @@ -2246,7 +2246,6 @@ func TestFilterUsingLenFunction(t *testing.T) { } for _, tc := range tests { - t.Log("Running: ", tc.name) js := processQueryNoErr(t, tc.in) require.JSONEq(t, tc.out, js) } diff --git a/schema/parse.go b/schema/parse.go index d41c3cbda73..4e2d39865d9 100644 --- a/schema/parse.go +++ b/schema/parse.go @@ -24,6 +24,7 @@ import ( "github.com/dgraph-io/dgraph/tok" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" + "github.com/golang/glog" "github.com/pkg/errors" ) diff --git a/schema/parse_test.go b/schema/parse_test.go index fdb651c80de..33847ab9b4f 100644 --- a/schema/parse_test.go +++ b/schema/parse_test.go @@ -17,13 +17,14 @@ package schema import ( + "context" "io/ioutil" "os" "testing" - "github.com/dgraph-io/badger/v2" "github.com/stretchr/testify/require" + "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" @@ -115,11 +116,6 @@ age:int @index(int) . name: string . address: string @index(term) .` -func TestSchemaIndex(t *testing.T) { - require.NoError(t, ParseBytes([]byte(schemaIndexVal1), 1)) - require.Equal(t, 2, len(State().IndexedFields())) -} - var schemaIndexVal2 = ` name: string @index(exact, exact) . 
address: string @index(term) . @@ -196,10 +192,9 @@ func TestSchemaIndexCustom(t *testing.T) { List: true, }}, }) - require.True(t, State().IsIndexed("name")) - require.False(t, State().IsReversed("name")) - require.Equal(t, "int", State().Tokenizer("age")[0].Name()) - require.Equal(t, 3, len(State().IndexedFields())) + require.True(t, State().IsIndexed(context.Background(), "name")) + require.False(t, State().IsReversed(context.Background(), "name")) + require.Equal(t, "int", State().Tokenizer(context.Background(), "age")[0].Name()) } func TestParse(t *testing.T) { diff --git a/schema/schema.go b/schema/schema.go index 5d8241bed38..05b306361ea 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -18,20 +18,21 @@ package schema import ( "bytes" + "context" "encoding/hex" "fmt" "sync" - "github.com/dgraph-io/badger/v2" "github.com/golang/glog" "github.com/golang/protobuf/proto" + "github.com/pkg/errors" "golang.org/x/net/trace" + "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/tok" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" - "github.com/pkg/errors" ) var ( @@ -39,10 +40,27 @@ var ( pstore *badger.DB ) +// We maintain two schemas for a predicate if a background task is building indexes +// for that predicate. Now, we need to use the new schema for mutations whereas +// a query schema for queries. While calling functions in this package, we need +// to set the context correctly as to which schema should be returned. +// Query schema is defined as (old schema - tokenizers to drop based on new schema). +type contextKey int + +const ( + isWrite contextKey = iota +) + +// GetWriteContext returns a context that sets the schema context for writing. 
+func GetWriteContext(ctx context.Context) context.Context { + return context.WithValue(ctx, isWrite, true) +} + func (s *state) init() { s.predicate = make(map[string]*pb.SchemaUpdate) s.types = make(map[string]*pb.TypeUpdate) s.elog = trace.NewEventLog("Dgraph", "Schema") + s.mutSchema = make(map[string]*pb.SchemaUpdate) } type state struct { @@ -51,6 +69,8 @@ type state struct { predicate map[string]*pb.SchemaUpdate types map[string]*pb.TypeUpdate elog trace.EventLog + // mutSchema holds the schema update that is being applied in the background. + mutSchema map[string]*pb.SchemaUpdate } // State returns the struct holding the current schema. @@ -69,6 +89,10 @@ func (s *state) DeleteAll() { for typ := range s.types { delete(s.types, typ) } + + for pred := range s.mutSchema { + delete(s.mutSchema, pred) + } } // Delete updates the schema in memory and disk @@ -87,6 +111,7 @@ func (s *state) Delete(attr string) error { } delete(s.predicate, attr) + delete(s.mutSchema, attr) return nil } @@ -139,6 +164,20 @@ func (s *state) Set(pred string, schema *pb.SchemaUpdate) { s.elog.Printf(logUpdate(schema, pred)) } +// SetMutSchema sets the mutation schema for the given predicate. +func (s *state) SetMutSchema(pred string, schema *pb.SchemaUpdate) { + s.Lock() + defer s.Unlock() + s.mutSchema[pred] = schema +} + +// DeleteMutSchema deletes the schema for given predicate from mutSchema. +func (s *state) DeleteMutSchema(pred string) { + s.Lock() + defer s.Unlock() + delete(s.mutSchema, pred) +} + // SetType sets the type for the given predicate in memory. // schema mutations must flow through the update function, which are synced to the db. func (s *state) SetType(typeName string, typ pb.TypeUpdate) { @@ -149,11 +188,20 @@ func (s *state) SetType(typeName string, typ pb.TypeUpdate) { } // Get gets the schema for the given predicate. 
-func (s *state) Get(pred string) (pb.SchemaUpdate, bool) { +func (s *state) Get(ctx context.Context, pred string) (pb.SchemaUpdate, bool) { + isWrite, _ := ctx.Value(isWrite).(bool) s.RLock() defer s.RUnlock() - schema, has := s.predicate[pred] - if !has { + // If this is write context, mutSchema will have the updated schema. + // If mutSchema doesn't have the predicate key, we use the schema from s.predicate. + if isWrite { + if schema, ok := s.mutSchema[pred]; ok { + return *schema, true + } + } + + schema, ok := s.predicate[pred] + if !ok { return pb.SchemaUpdate{}, false } return *schema, true @@ -181,26 +229,22 @@ func (s *state) TypeOf(pred string) (types.TypeID, error) { } // IsIndexed returns whether the predicate is indexed or not -func (s *state) IsIndexed(pred string) bool { +func (s *state) IsIndexed(ctx context.Context, pred string) bool { + isWrite, _ := ctx.Value(isWrite).(bool) s.RLock() defer s.RUnlock() + if isWrite { + // TODO(Aman): we could return the query schema if it is a delete. 
+ if schema, ok := s.mutSchema[pred]; ok && len(schema.Tokenizer) > 0 { + return true + } + } + if schema, ok := s.predicate[pred]; ok { return len(schema.Tokenizer) > 0 } - return false -} -// IndexedFields returns the list of indexed fields -func (s *state) IndexedFields() []string { - s.RLock() - defer s.RUnlock() - var out []string - for k, v := range s.predicate { - if len(v.Tokenizer) > 0 { - out = append(out, k) - } - } - return out + return false } // Predicates returns the list of predicates for given group @@ -226,13 +270,24 @@ func (s *state) Types() []string { } // Tokenizer returns the tokenizer for given predicate -func (s *state) Tokenizer(pred string) []tok.Tokenizer { +func (s *state) Tokenizer(ctx context.Context, pred string) []tok.Tokenizer { + isWrite, _ := ctx.Value(isWrite).(bool) s.RLock() defer s.RUnlock() - schema, ok := s.predicate[pred] - x.AssertTruef(ok, "schema state not found for %s", pred) - var tokenizers []tok.Tokenizer - for _, it := range schema.Tokenizer { + var su *pb.SchemaUpdate + if isWrite { + if schema, ok := s.mutSchema[pred]; ok { + su = schema + } + } + if su == nil { + if schema, ok := s.predicate[pred]; ok { + su = schema + } + } + x.AssertTruef(su != nil, "schema state not found for %s", pred) + tokenizers := make([]tok.Tokenizer, 0, len(su.Tokenizer)) + for _, it := range su.Tokenizer { t, found := tok.GetTokenizer(it) x.AssertTruef(found, "Invalid tokenizer %s", it) tokenizers = append(tokenizers, t) @@ -241,9 +296,9 @@ func (s *state) Tokenizer(pred string) []tok.Tokenizer { } // TokenizerNames returns the tokenizer names for given predicate -func (s *state) TokenizerNames(pred string) []string { +func (s *state) TokenizerNames(ctx context.Context, pred string) []string { var names []string - tokenizers := s.Tokenizer(pred) + tokenizers := s.Tokenizer(ctx, pred) for _, t := range tokenizers { names = append(names, t.Name()) } @@ -252,8 +307,8 @@ func (s *state) TokenizerNames(pred string) []string { // 
HasTokenizer is a convenience func that checks if a given tokenizer is found in pred. // Returns true if found, else false. -func (s *state) HasTokenizer(id byte, pred string) bool { - for _, t := range s.Tokenizer(pred) { +func (s *state) HasTokenizer(ctx context.Context, id byte, pred string) bool { + for _, t := range s.Tokenizer(ctx, pred) { if t.Identifier() == id { return true } @@ -262,9 +317,15 @@ func (s *state) HasTokenizer(id byte, pred string) bool { } // IsReversed returns whether the predicate has reverse edge or not -func (s *state) IsReversed(pred string) bool { +func (s *state) IsReversed(ctx context.Context, pred string) bool { + isWrite, _ := ctx.Value(isWrite).(bool) s.RLock() defer s.RUnlock() + if isWrite { + if schema, ok := s.mutSchema[pred]; ok && schema.Directive == pb.SchemaUpdate_REVERSE { + return true + } + } if schema, ok := s.predicate[pred]; ok { return schema.Directive == pb.SchemaUpdate_REVERSE } @@ -272,9 +333,15 @@ func (s *state) IsReversed(pred string) bool { } // HasCount returns whether we want to mantain a count index for the given predicate or not. -func (s *state) HasCount(pred string) bool { +func (s *state) HasCount(ctx context.Context, pred string) bool { + isWrite, _ := ctx.Value(isWrite).(bool) s.RLock() defer s.RUnlock() + if isWrite { + if schema, ok := s.mutSchema[pred]; ok && schema.Count { + return true + } + } if schema, ok := s.predicate[pred]; ok { return schema.Count } @@ -315,6 +382,13 @@ func (s *state) HasNoConflict(pred string) bool { return s.predicate[pred].GetNoConflict() } +// IndexingInProgress checks whether indexing is going on for a given predicate. +func (s *state) IndexingInProgress() bool { + s.RLock() + defer s.RUnlock() + return len(s.mutSchema) > 0 +} + // Init resets the schema state, setting the underlying DB to the given pointer. 
func Init(ps *badger.DB) { pstore = ps @@ -346,6 +420,7 @@ func Load(predicate string) error { } State().Set(predicate, &s) State().elog.Printf(logUpdate(&s, predicate)) + delete(State().mutSchema, predicate) glog.Infoln(logUpdate(&s, predicate)) return nil } diff --git a/systest/1million/1million_test.go b/systest/1million/1million_test.go index b1942107268..bab0f003915 100644 --- a/systest/1million/1million_test.go +++ b/systest/1million/1million_test.go @@ -20,7 +20,8 @@ package main import ( "context" - "log" + "io/ioutil" + "os" "testing" "time" @@ -9265,7 +9266,16 @@ var tc = []struct { func Test1Million(t *testing.T) { dg, err := testutil.DgraphClient(testutil.SockAddr) if err != nil { - log.Fatalf("Error while getting a dgraph client: %v", err) + t.Fatalf("Error while getting a dgraph client: %v", err) + } + + schemaFile := os.Getenv("SCHEMA_FILE") + data, err := ioutil.ReadFile(schemaFile) + if err != nil { + t.Fatalf("Error in reading the schema: %v", err) + } + if err := testutil.WaitForAlter(context.Background(), dg, string(data)); err != nil { + t.Fatalf("Error in waiting for alter to complete: %v", err) } for _, tt := range tc { diff --git a/systest/1million/test-reindex.sh b/systest/1million/test-reindex.sh index 775ea75cfef..6bcda81dfd9 100755 --- a/systest/1million/test-reindex.sh +++ b/systest/1million/test-reindex.sh @@ -68,6 +68,7 @@ if [[ ! -z "$TEAMCITY_VERSION" ]]; then fi Info "running regression queries" +export SCHEMA_FILE go test -v -tags systest || FOUND_DIFFS=1 Info "bringing down zero and alpha and data volumes" diff --git a/systest/bgindex/common_test.go b/systest/bgindex/common_test.go new file mode 100644 index 00000000000..eb678e8179f --- /dev/null +++ b/systest/bgindex/common_test.go @@ -0,0 +1,55 @@ +// +build systest + +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/dgraph-io/dgo/v2" +) + +func printStats(counter *uint64, quit <-chan struct{}, wg *sync.WaitGroup) { + defer wg.Done() + for { + select { + case <-quit: + return + case <-time.After(2 * time.Second): + } + + fmt.Println("mutations:", atomic.LoadUint64(counter)) + } +} + +// blocks until query returns no error. +func checkSchemaUpdate(query string, dg *dgo.Dgraph) { + for { + time.Sleep(2 * time.Second) + _, err := dg.NewReadOnlyTxn().Query(context.Background(), query) + if err != nil { + continue + } + + return + } +} diff --git a/systest/bgindex/count_test.go b/systest/bgindex/count_test.go new file mode 100644 index 00000000000..dc7a318b26d --- /dev/null +++ b/systest/bgindex/count_test.go @@ -0,0 +1,284 @@ +// +build systest + +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "sort" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/dgo/v2" + "github.com/dgraph-io/dgo/v2/protos/api" + "github.com/dgraph-io/dgraph/testutil" +) + +func TestCountIndex(t *testing.T) { + total := 10000 + numUIDs := uint64(total) + edgeCount := make([]int, total+100000) + uidLocks := make([]sync.Mutex, total+100000) + + dg, err := testutil.DgraphClientWithGroot(testutil.SockAddr) + if err != nil { + t.Fatalf("Error while getting a dgraph client: %v", err) + } + + testutil.DropAll(t, dg) + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: "value: [string] .", + }); err != nil { + t.Fatalf("error in setting up schema :: %v\n", err) + } + + if err := testutil.AssignUids(uint64(total * 10)); err != nil { + t.Fatalf("error in assigning UIDs :: %v", err) + } + + fmt.Println("inserting values") + th := y.NewThrottle(10000) + for i := 1; i <= int(numUIDs); i++ { + th.Do() + go func(uid int) { + defer th.Done(nil) + bb := &bytes.Buffer{} + edgeCount[uid] = rand.Intn(1000) + for j := 0; j < edgeCount[uid]; j++ { + _, err := bb.WriteString(fmt.Sprintf("<%v> \"%v\" .\n", uid, j)) + if err != nil { + panic(err) + } + } + if err := testutil.RetryMutation(dg, &api.Mutation{ + CommitNow: true, + SetNquads: bb.Bytes(), + }); err != nil { + t.Fatalf("error in mutation :: %v", err) + } + }(i) + } + th.Finish() + + fmt.Println("building indexes in background") + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: "value: [string] @count .", + }); err != nil { + t.Fatalf("error in adding indexes :: %v\n", err) + } + + // perform mutations until ctrl+c + mutateUID := func(uid int) { + uidLocks[uid].Lock() + defer uidLocks[uid].Unlock() + ec := edgeCount[uid] + switch rand.Intn(1000) % 3 { + case 0: + // add new edge + if _, err := dg.NewTxn().Mutate(context.Background(), 
&api.Mutation{ + CommitNow: true, + SetNquads: []byte(fmt.Sprintf(`<%v> "%v" .`, uid, ec)), + }); err != nil && !errors.Is(err, dgo.ErrAborted) { + t.Fatalf("error in mutation :: %v\n", err) + } else if errors.Is(err, dgo.ErrAborted) { + return + } + ec++ + case 1: + if ec <= 0 { + return + } + // delete an edge + if _, err := dg.NewTxn().Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + DelNquads: []byte(fmt.Sprintf(`<%v> "%v" .`, uid, ec-1)), + }); err != nil && !errors.Is(err, dgo.ErrAborted) { + t.Fatalf("error in deletion :: %v\n", err) + } else if errors.Is(err, dgo.ErrAborted) { + return + } + ec-- + case 2: + // new uid with one edge + uid = int(atomic.AddUint64(&numUIDs, 1)) + if _, err := dg.NewTxn().Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + SetNquads: []byte(fmt.Sprintf(`<%v> "0" .`, uid)), + }); err != nil && !errors.Is(err, dgo.ErrAborted) { + t.Fatalf("error in insertion :: %v\n", err) + } else if errors.Is(err, dgo.ErrAborted) { + return + } + ec = 1 + } + + edgeCount[uid] = ec + } + + // perform mutations until ctrl+c + var swg sync.WaitGroup + var counter uint64 + quit := make(chan struct{}) + runLoop := func() { + defer swg.Done() + for { + select { + case <-quit: + return + default: + n := int(atomic.LoadUint64(&numUIDs)) + mutateUID(rand.Intn(n) + 1) + atomic.AddUint64(&counter, 1) + } + } + } + + swg.Add(101) + for i := 0; i < 100; i++ { + go runLoop() + } + go printStats(&counter, quit, &swg) + checkSchemaUpdate(`{ q(func: eq(count(value), "3")) {uid}}`, dg) + close(quit) + swg.Wait() + fmt.Println("mutations done") + + // compute count index + countIndex := make(map[int][]int) + for uid := 1; uid <= int(numUIDs); uid++ { + val := edgeCount[uid] + countIndex[val] = append(countIndex[val], uid) + } + for _, aa := range countIndex { + sort.Ints(aa) + } + + checkDelete := func(uid int) error { + q := fmt.Sprintf(`{ q(func: uid(%v)) {value:count(value)}}`, uid) + resp, err := 
dg.NewReadOnlyTxn().Query(context.Background(), q) + if err != nil { + return fmt.Errorf("error in query: %v :: %w", q, err) + } + var data struct { + Q []struct { + Count int + } + } + if err := json.Unmarshal(resp.Json, &data); err != nil { + return fmt.Errorf("error in json.Unmarshal :: %w", err) + } + + if len(data.Q) != 1 && data.Q[0].Count != 0 { + return fmt.Errorf("found a deleted UID, %v", uid) + } + return nil + } + + checkValue := func(b int, uids []int) error { + q := fmt.Sprintf(`{ q(func: eq(count(value), "%v")) {uid}}`, b) + resp, err := dg.NewReadOnlyTxn().Query(context.Background(), q) + if err != nil { + return fmt.Errorf("error in query: %v :: %w", q, err) + } + var data struct { + Q []struct { + UID string + } + } + if err := json.Unmarshal(resp.Json, &data); err != nil { + return fmt.Errorf("error in json.Unmarshal :: %w", err) + } + + actual := make([]int, len(data.Q)) + for i, ui := range data.Q { + v, err := strconv.ParseInt(ui.UID, 0, 64) + if err != nil { + return err + } + actual[i] = int(v) + } + sort.Ints(actual) + + if len(actual) != len(uids) { + return fmt.Errorf("length not equal :: exp: %v, actual %v", uids, actual) + } + for i := range uids { + if uids[i] != actual[i] { + return fmt.Errorf("value not equal :: exp: %v, actual %v", uids, actual) + } + } + + return nil + } + + type pair struct { + key int + err string + } + ch := make(chan pair, numUIDs) + + fmt.Println("starting to query") + var count uint64 + th = y.NewThrottle(50000) + th.Do() + go func() { + defer th.Done(nil) + for { + time.Sleep(2 * time.Second) + cur := atomic.LoadUint64(&count) + fmt.Printf("%v/%v done\n", cur, len(countIndex)) + if int(cur) == len(countIndex) { + break + } + } + }() + + for value, uids := range countIndex { + th.Do() + go func(val int, uidList []int) { + defer th.Done(nil) + if val <= 0 { + for _, uid := range uidList { + if err := checkDelete(uid); err != nil { + ch <- pair{uid, err.Error()} + } + } + } else { + if err := checkValue(val, 
uidList); err != nil { + ch <- pair{val, err.Error()} + } + } + atomic.AddUint64(&count, 1) + }(value, uids) + } + th.Finish() + + close(ch) + for p := range ch { + t.Errorf("failed for %v, :: %v\n", p.key, p.err) + } +} diff --git a/systest/bgindex/docker-compose.yml b/systest/bgindex/docker-compose.yml new file mode 100644 index 00000000000..dad68cdba79 --- /dev/null +++ b/systest/bgindex/docker-compose.yml @@ -0,0 +1,193 @@ +# Auto-generated with: [/tmp/go-build352939306/b001/exe/compose -l -a 6 -r 3 -z 3 -o 100 --acl_secret ../../ee/acl/hmac-secret] +# +version: "3.5" +services: + alpha1: + image: dgraph/dgraph:latest + container_name: alpha1 + working_dir: /data/alpha1 + labels: + cluster: test + ports: + - 8180:8180 + - 9180:9180 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - type: bind + source: ../../ee/acl/hmac-secret + target: /secret/hmac + read_only: true + command: /gobin/dgraph alpha -o 100 --my=alpha1:7180 --lru_mb=1024 --zero=zero1:5180 + --logtostderr -v=2 --idx=1 --acl_secret_file=/secret/hmac --acl_access_ttl 300s + --acl_cache_ttl 500s + alpha2: + image: dgraph/dgraph:latest + container_name: alpha2 + working_dir: /data/alpha2 + depends_on: + - alpha1 + labels: + cluster: test + ports: + - 8182:8182 + - 9182:9182 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - type: bind + source: ../../ee/acl/hmac-secret + target: /secret/hmac + read_only: true + command: /gobin/dgraph alpha -o 102 --my=alpha2:7182 --lru_mb=1024 --zero=zero1:5180 + --logtostderr -v=2 --idx=2 --acl_secret_file=/secret/hmac --acl_access_ttl 300s + --acl_cache_ttl 500s + alpha3: + image: dgraph/dgraph:latest + container_name: alpha3 + working_dir: /data/alpha3 + depends_on: + - alpha2 + labels: + cluster: test + ports: + - 8183:8183 + - 9183:9183 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - type: bind + source: ../../ee/acl/hmac-secret + target: /secret/hmac + 
read_only: true + command: /gobin/dgraph alpha -o 103 --my=alpha3:7183 --lru_mb=1024 --zero=zero1:5180 + --logtostderr -v=2 --idx=3 --acl_secret_file=/secret/hmac --acl_access_ttl 300s + --acl_cache_ttl 500s + alpha4: + image: dgraph/dgraph:latest + container_name: alpha4 + working_dir: /data/alpha4 + depends_on: + - alpha3 + labels: + cluster: test + ports: + - 8184:8184 + - 9184:9184 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - type: bind + source: ../../ee/acl/hmac-secret + target: /secret/hmac + read_only: true + command: /gobin/dgraph alpha -o 104 --my=alpha4:7184 --lru_mb=1024 --zero=zero1:5180 + --logtostderr -v=2 --idx=4 --acl_secret_file=/secret/hmac --acl_access_ttl 300s + --acl_cache_ttl 500s + alpha5: + image: dgraph/dgraph:latest + container_name: alpha5 + working_dir: /data/alpha5 + depends_on: + - alpha4 + labels: + cluster: test + ports: + - 8185:8185 + - 9185:9185 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - type: bind + source: ../../ee/acl/hmac-secret + target: /secret/hmac + read_only: true + command: /gobin/dgraph alpha -o 105 --my=alpha5:7185 --lru_mb=1024 --zero=zero1:5180 + --logtostderr -v=2 --idx=5 --acl_secret_file=/secret/hmac --acl_access_ttl 300s + --acl_cache_ttl 500s + alpha6: + image: dgraph/dgraph:latest + container_name: alpha6 + working_dir: /data/alpha6 + depends_on: + - alpha5 + labels: + cluster: test + ports: + - 8186:8186 + - 9186:9186 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + - type: bind + source: ../../ee/acl/hmac-secret + target: /secret/hmac + read_only: true + command: /gobin/dgraph alpha -o 106 --my=alpha6:7186 --lru_mb=1024 --zero=zero1:5180 + --logtostderr -v=2 --idx=6 --acl_secret_file=/secret/hmac --acl_access_ttl 300s + --acl_cache_ttl 500s + zero1: + image: dgraph/dgraph:latest + container_name: zero1 + working_dir: /data/zero1 + labels: + cluster: test + ports: + - 5180:5180 + - 
6180:6180 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + command: /gobin/dgraph zero -o 100 --idx=1 --my=zero1:5180 --replicas=3 --logtostderr + -v=2 --bindall + zero2: + image: dgraph/dgraph:latest + container_name: zero2 + working_dir: /data/zero2 + depends_on: + - zero1 + labels: + cluster: test + ports: + - 5182:5182 + - 6182:6182 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + command: /gobin/dgraph zero -o 102 --idx=2 --my=zero2:5182 --replicas=3 --logtostderr + -v=2 --peer=zero1:5180 + zero3: + image: dgraph/dgraph:latest + container_name: zero3 + working_dir: /data/zero3 + depends_on: + - zero2 + labels: + cluster: test + ports: + - 5183:5183 + - 6183:6183 + volumes: + - type: bind + source: $GOPATH/bin + target: /gobin + read_only: true + command: /gobin/dgraph zero -o 103 --idx=3 --my=zero3:5183 --replicas=3 --logtostderr + -v=2 --peer=zero1:5180 +volumes: {} diff --git a/systest/bgindex/parallel_test.go b/systest/bgindex/parallel_test.go new file mode 100644 index 00000000000..ec8e09b56a6 --- /dev/null +++ b/systest/bgindex/parallel_test.go @@ -0,0 +1,203 @@ +// +build systest + +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/dgo/v2" + "github.com/dgraph-io/dgo/v2/protos/api" + "github.com/dgraph-io/dgraph/testutil" +) + +var ( + total = 100000 +) + +func addBankData(dg *dgo.Dgraph, pred string) error { + for i := 1; i <= total; { + bb := &bytes.Buffer{} + for j := 0; j < 10000; j++ { + _, err := bb.WriteString(fmt.Sprintf("<%v> <%v> \"%v\" .\n", i, pred, i)) + if err != nil { + return fmt.Errorf("error in mutation :: %w", err) + } + i++ + } + if err := testutil.RetryMutation(dg, &api.Mutation{ + CommitNow: true, + SetNquads: bb.Bytes(), + }); err != nil { + return fmt.Errorf("error in mutation :: %w", err) + } + } + + return nil +} + +func TestParallelIndexing(t *testing.T) { + if err := testutil.AssignUids(uint64(total * 10)); err != nil { + t.Fatalf("error in assignig UIDs :: %v", err) + } + + dg, err := testutil.DgraphClientWithGroot(testutil.SockAddr) + if err != nil { + t.Fatalf("Error while getting a dgraph client: %v", err) + } + + testutil.DropAll(t, dg) + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: ` + balance_int: int . + balance_str: string . + balance_float: float . 
+ `, + }); err != nil { + t.Fatalf("error in setting up schema :: %v\n", err) + } + + fmt.Println("adding integer dataset") + if err := addBankData(dg, "balance_int"); err != nil { + t.Fatalf("error in adding integer predicate :: %v\n", err) + } + + fmt.Println("adding string dataset") + if err := addBankData(dg, "balance_str"); err != nil { + t.Fatalf("error in adding string predicate :: %v\n", err) + } + + fmt.Println("adding float dataset") + if err := addBankData(dg, "balance_float"); err != nil { + t.Fatalf("error in adding float predicate :: %v\n", err) + } + + fmt.Println("building indexes in background for int and string data") + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: ` + balance_int: int @index(int) . + balance_str: string @index(fulltext, term, exact) . + `, + }); err != nil { + t.Fatalf("error in adding indexes :: %v\n", err) + } + + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: ` + balance_int: int @index(int) . + balance_str: string @index(fulltext, term, exact) . + `, + }); err != nil && !strings.Contains(err.Error(), "is already being modified") { + t.Fatalf("error in adding indexes :: %v\n", err) + } + + // Wait until previous indexing is complete. + for { + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: `balance_float: float @index(float) .`, + }); err != nil && !strings.Contains(err.Error(), "is already being modified") { + t.Fatalf("error in adding indexes :: %v\n", err) + } else if err == nil { + break + } + time.Sleep(time.Second) + } + + fmt.Println("waiting for float indexing to complete") + s := `balance_float: float @index(float) .` + testutil.WaitForAlter(context.Background(), dg, s) + + // balance should be same as uid. 
+ checkBalance := func(b int, pred string) error { + q := fmt.Sprintf(`{ q(func: eq(%v, "%v")) {uid}}`, pred, b) + resp, err := dg.NewReadOnlyTxn().Query(context.Background(), q) + if err != nil { + return fmt.Errorf("error in query: %v ::%w", q, err) + } + var data struct { + Q []struct { + UID string + } + } + if err := json.Unmarshal(resp.Json, &data); err != nil { + return fmt.Errorf("error in json.Unmarshal :: %w", err) + } + + if len(data.Q) != 1 { + return fmt.Errorf("length not equal :: exp: %v, actual %v", b, data.Q[0]) + } + v, err := strconv.ParseInt(data.Q[0].UID, 0, 64) + if err != nil { + return err + } + if b != int(v) { + return fmt.Errorf("value not equal :: exp: %v, actual %v", b, data.Q[0]) + } + + return nil + } + + fmt.Println("starting to query") + var count uint64 + th := y.NewThrottle(50000) + th.Do() + go func() { + defer th.Done(nil) + for { + time.Sleep(2 * time.Second) + cur := atomic.LoadUint64(&count) + fmt.Printf("%v/%v done\n", cur, total*3) + if int(cur) == total*3 { + break + } + } + }() + + type pair struct { + key int + err string + } + ch := make(chan pair, total*3) + for _, predicate := range []string{"balance_str", "balance_int", "balance_float"} { + for i := 1; i <= total; i++ { + th.Do() + go func(bal int, pred string) { + defer th.Done(nil) + if err := checkBalance(bal, pred); err != nil { + ch <- pair{bal, err.Error()} + } + atomic.AddUint64(&count, 1) + }(i, predicate) + } + } + th.Finish() + + close(ch) + for p := range ch { + t.Errorf("failed for %v, :: %v\n", p.key, p.err) + } +} diff --git a/systest/bgindex/reverse_test.go b/systest/bgindex/reverse_test.go new file mode 100644 index 00000000000..6772db2bf63 --- /dev/null +++ b/systest/bgindex/reverse_test.go @@ -0,0 +1,283 @@ +// +build systest + +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "sort" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/dgraph-io/dgo/v2" + "github.com/dgraph-io/dgo/v2/protos/api" + "github.com/dgraph-io/dgraph/testutil" +) + +func TestReverseIndex(t *testing.T) { + total := 100000 + dg, err := testutil.DgraphClientWithGroot(testutil.SockAddr) + if err != nil { + t.Fatalf("Error while getting a dgraph client: %v", err) + } + + testutil.DropAll(t, dg) + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: "balance: [uid] .", + }); err != nil { + t.Fatalf("error in setting up schema :: %v\n", err) + } + + if err := testutil.AssignUids(uint64(total * 10)); err != nil { + t.Fatalf("error in assigning UIDs :: %v", err) + } + + // Insert edges from uid to (uid+1) + fmt.Println("inserting edges") + for i := 1; i < total; { + bb := &bytes.Buffer{} + for j := 0; j < 10000; j++ { + _, err := bb.WriteString(fmt.Sprintf("<%v> <%v> .\n", i, i+1)) + if err != nil { + t.Fatalf("error in mutation %v\n", err) + } + i++ + } + if err := testutil.RetryMutation(dg, &api.Mutation{ + CommitNow: true, + SetNquads: bb.Bytes(), + }); err != nil { + t.Fatalf("error in mutation :: %v", err) + } + } + + fmt.Println("building indexes in background") + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: "balance: [uid] @reverse .", + }); err != nil { + t.Fatalf("error in adding indexes :: %v\n", err) + } + + numEdges := int64(total) + updated := sync.Map{} + mutateUID := func(uid 
int) { + switch uid % 4 { + case 0: + // insert an edge from (uid-2) to uid + if _, err := dg.NewTxn().Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + SetNquads: []byte(fmt.Sprintf(`<%v> <%v> .`, uid-2, uid)), + }); err != nil && !errors.Is(err, dgo.ErrAborted) { + t.Fatalf("error in mutation :: %v\n", err) + } else if errors.Is(err, dgo.ErrAborted) { + return + } + updated.Store(uid, nil) + case 1: + // add new uid and edge from (uid-1) to uid + v := atomic.AddInt64(&numEdges, 1) + if _, err := dg.NewTxn().Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + SetNquads: []byte(fmt.Sprintf(`<%v> <%v> .`, v-1, v)), + }); err != nil { + t.Fatalf("error in insertion :: %v\n", err) + } + case 2: + // delete an existing edge from uid-1 to uid + if _, err := dg.NewTxn().Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + DelNquads: []byte(fmt.Sprintf(`<%v> <%v> .`, uid-1, uid)), + }); err != nil && !errors.Is(err, dgo.ErrAborted) { + t.Fatalf("error in mutation :: %v\n", err) + } else if errors.Is(err, dgo.ErrAborted) { + return + } + updated.Store(uid, nil) + case 3: + // add two new edges, uid+1 to uid AND uid-2 to uid, already have uid to uid-1 + if _, err := dg.NewTxn().Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + SetNquads: []byte(fmt.Sprintf("<%v> <%v> .\n<%v> <%v> .", + uid+1, uid, uid-2, uid)), + }); err != nil && !errors.Is(err, dgo.ErrAborted) { + t.Fatalf("error in mutation :: %v\n", err) + } else if errors.Is(err, dgo.ErrAborted) { + return + } + updated.Store(uid, nil) + } + } + + // perform mutations until ctrl+c + var swg sync.WaitGroup + var counter uint64 + quit := make(chan struct{}) + runLoop := func() { + defer swg.Done() + for { + select { + case <-quit: + return + default: + mutateUID(rand.Intn(total) + 1) + atomic.AddUint64(&counter, 1) + } + } + } + + swg.Add(101) + for i := 0; i < 100; i++ { + go runLoop() + } + go printStats(&counter, quit, &swg) + checkSchemaUpdate(`{ q(func: 
uid(0x01)) { ~balance { uid }}}`, dg) + close(quit) + swg.Wait() + fmt.Println("mutations done") + + // check values now + checkUID := func(i int) error { + q := fmt.Sprintf(`{ q(func: uid(%v)) { ~balance { uid }}}`, i) + resp, err := dg.NewReadOnlyTxn().Query(context.Background(), q) + if err != nil { + return fmt.Errorf("error in query :: %w", err) + } + var data struct { + Q []struct { + Balance []struct { + UID string + } `json:"~balance"` + } + } + if err := json.Unmarshal(resp.Json, &data); err != nil { + return fmt.Errorf("error in json.Unmarshal :: %w", err) + } + + _, ok := updated.Load(i) + switch { + case !ok || i > total || i%4 == 1: + // Expect exactly one edge, uid-1 to uid + if len(data.Q) != 1 || len(data.Q[0].Balance) != 1 { + return fmt.Errorf("length not equal, no mod, got: %+v", data) + } + v1, err := strconv.ParseInt(data.Q[0].Balance[0].UID, 0, 64) + if err != nil { + return err + } + if int(v1) != i-1 { + return fmt.Errorf("value not equal, got: %+v", data) + } + case i%4 == 0: + // Expect two edges, uid-2 to uid AND uid-1 to uid + if len(data.Q) != 1 || len(data.Q[0].Balance) != 2 { + return fmt.Errorf("length not equal, got: %+v", data) + } + v1, err := strconv.ParseInt(data.Q[0].Balance[0].UID, 0, 64) + if err != nil { + return err + } + v2, err := strconv.ParseInt(data.Q[0].Balance[1].UID, 0, 64) + if err != nil { + return err + } + l := []int{int(v1), int(v2)} + sort.Ints(l) + if l[0] != i-2 || l[1] != i-1 { + return fmt.Errorf("value not equal, got: %+v", data) + } + case i%4 == 2: + // This was deleted, so no edges expected + if len(data.Q) != 0 { + return fmt.Errorf("length not equal, del, got: %+v", data) + } + case i%4 == 3: + // Expect 3 edges from uid-2, uid-1 and uid+1 + if len(data.Q) != 1 || len(data.Q[0].Balance) != 3 { + return fmt.Errorf("length not equal, got: %+v", data) + } + v1, err := strconv.ParseInt(data.Q[0].Balance[0].UID, 0, 64) + if err != nil { + return err + } + v2, err := 
strconv.ParseInt(data.Q[0].Balance[1].UID, 0, 64) + if err != nil { + return err + } + v3, err := strconv.ParseInt(data.Q[0].Balance[2].UID, 0, 64) + if err != nil { + return err + } + l := []int{int(v1), int(v2), int(v3)} + sort.Ints(l) + if l[0] != i-2 || l[1] != i-1 || l[2] != i+1 { + return fmt.Errorf("value not equal, got: %+v", data) + } + } + + return nil + } + + type pair struct { + uid int + err string + } + ch := make(chan pair, numEdges) + + fmt.Println("starting to query") + var wg sync.WaitGroup + var count uint64 + for i := 2; i <= int(numEdges); i += 100 { + wg.Add(1) + go func(j int) { + defer wg.Done() + for k := j; k < j+100 && k <= int(numEdges); k++ { + if err := checkUID(k); err != nil { + ch <- pair{k, err.Error()} + } + atomic.AddUint64(&count, 1) + } + }(i) + } + + wg.Add(1) + go func() { + defer wg.Done() + for { + time.Sleep(2 * time.Second) + cur := atomic.LoadUint64(&count) + fmt.Printf("%v/%v done\n", cur, numEdges-1) + if cur+1 == uint64(numEdges) { + break + } + } + }() + wg.Wait() + + close(ch) + for p := range ch { + t.Errorf("failed for %v, :: %v\n", p.uid, p.err) + } +} diff --git a/systest/bgindex/string_test.go b/systest/bgindex/string_test.go new file mode 100644 index 00000000000..128aa0353cf --- /dev/null +++ b/systest/bgindex/string_test.go @@ -0,0 +1,274 @@ +// +build systest + +/* + * Copyright 2020 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package main + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "math/rand" + "sort" + "strconv" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/dgraph-io/badger/v2/y" + "github.com/dgraph-io/dgo/v2" + "github.com/dgraph-io/dgo/v2/protos/api" + "github.com/dgraph-io/dgraph/testutil" +) + +func TestStringIndex(t *testing.T) { + total := 100000 + numAccts := uint64(total) + acctsBal := make(map[int]int, numAccts) + var lock sync.Mutex + + dg, err := testutil.DgraphClientWithGroot(testutil.SockAddr) + if err != nil { + t.Fatalf("Error while getting a dgraph client: %v", err) + } + + testutil.DropAll(t, dg) + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: "balance: string .", + }); err != nil { + t.Fatalf("error in setting up schema :: %v\n", err) + } + + if err := testutil.AssignUids(uint64(total * 10)); err != nil { + t.Fatalf("error in assignig UIDs :: %v", err) + } + + // first insert bank accounts + fmt.Println("inserting accounts") + for i := 1; i <= int(numAccts); { + bb := &bytes.Buffer{} + for j := 0; j < 10000; j++ { + acctsBal[i] = rand.Intn(total * 100) + _, err := bb.WriteString(fmt.Sprintf("<%v> \"%v\" .\n", i, acctsBal[i])) + if err != nil { + t.Fatalf("error in mutation %v\n", err) + } + i++ + } + if err := testutil.RetryMutation(dg, &api.Mutation{ + CommitNow: true, + SetNquads: bb.Bytes(), + }); err != nil { + t.Fatalf("error in mutation :: %v", err) + } + } + + fmt.Println("building indexes in background") + if err := dg.Alter(context.Background(), &api.Operation{ + Schema: "balance: string @index(fulltext, term, exact) .", + }); err != nil { + t.Fatalf("error in adding indexes :: %v\n", err) + } + + // perform mutations until ctrl+c + mutateUID := func(uid int) { + nb := rand.Intn(total * 100) + switch uid % 3 { + case 0: + // change the balance to new random value. 
+ if _, err := dg.NewTxn().Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + SetNquads: []byte(fmt.Sprintf(`<%v> "%v" .`, uid, nb)), + }); err != nil && !errors.Is(err, dgo.ErrAborted) { + t.Fatalf("error in mutation :: %v\n", err) + } else if errors.Is(err, dgo.ErrAborted) { + return + } + case 1: + // delete this uid. + if _, err := dg.NewTxn().Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + DelNquads: []byte(fmt.Sprintf(`<%v> * .`, uid)), + }); err != nil && !errors.Is(err, dgo.ErrAborted) { + t.Fatalf("error in deletion :: %v\n", err) + } else if errors.Is(err, dgo.ErrAborted) { + return + } + nb = -1 + case 2: + // add new uid. + uid = int(atomic.AddUint64(&numAccts, 1)) + if _, err := dg.NewTxn().Mutate(context.Background(), &api.Mutation{ + CommitNow: true, + SetNquads: []byte(fmt.Sprintf(`<%v> "%v" .`, uid, nb)), + }); err != nil && !errors.Is(err, dgo.ErrAborted) { + t.Fatalf("error in insertion :: %v\n", err) + } else if errors.Is(err, dgo.ErrAborted) { + return + } + } + + lock.Lock() + acctsBal[uid] = nb + lock.Unlock() + } + + // perform mutations until ctrl+c + var swg sync.WaitGroup + var counter uint64 + quit := make(chan struct{}) + runLoop := func() { + defer swg.Done() + for { + select { + case <-quit: + return + default: + n := int(atomic.LoadUint64(&numAccts)) + mutateUID(rand.Intn(n) + 1) + atomic.AddUint64(&counter, 1) + } + } + } + + swg.Add(101) + for i := 0; i < 100; i++ { + go runLoop() + } + go printStats(&counter, quit, &swg) + checkSchemaUpdate(`{ q(func: anyoftext(balance, "example")) {uid}}`, dg) + close(quit) + swg.Wait() + fmt.Println("mutations done") + + // compute index + balIndex := make(map[int][]int) + for uid, bal := range acctsBal { + balIndex[bal] = append(balIndex[bal], uid) + } + for _, aa := range balIndex { + sort.Ints(aa) + } + + checkDelete := func(uid int) error { + q := fmt.Sprintf(`{ q(func: uid(%v)) {balance}}`, uid) + resp, err := dg.NewReadOnlyTxn().Query(context.Background(), q) 
+ if err != nil { + return fmt.Errorf("error in query: %v :: %w", q, err) + } + var data struct { + Q []struct { + Balance string + } + } + if err := json.Unmarshal(resp.Json, &data); err != nil { + return fmt.Errorf("error in json.Unmarshal :: %w", err) + } + + if len(data.Q) != 0 { + return fmt.Errorf("found a deleted UID, %v", uid) + } + return nil + } + + checkBalance := func(b int, uids []int) error { + q := fmt.Sprintf(`{ q(func: anyoftext(balance, "%v")) {uid}}`, b) + resp, err := dg.NewReadOnlyTxn().Query(context.Background(), q) + if err != nil { + return fmt.Errorf("error in query: %v :: %w", q, err) + } + var data struct { + Q []struct { + UID string + } + } + if err := json.Unmarshal(resp.Json, &data); err != nil { + return fmt.Errorf("error in json.Unmarshal :: %w", err) + } + + actual := make([]int, len(data.Q)) + for i, ui := range data.Q { + v, err := strconv.ParseInt(ui.UID, 0, 64) + if err != nil { + return err + } + actual[i] = int(v) + } + sort.Ints(actual) + + if len(actual) != len(uids) { + return fmt.Errorf("length not equal :: exp: %v, actual %v", uids, actual) + } + for i := range uids { + if uids[i] != actual[i] { + return fmt.Errorf("value not equal :: exp: %v, actual %v", uids, actual) + } + } + + return nil + } + + type pair struct { + key int + err string + } + ch := make(chan pair, numAccts) + + fmt.Println("starting to query") + var count uint64 + th := y.NewThrottle(50000) + th.Do() + go func() { + defer th.Done(nil) + for { + time.Sleep(2 * time.Second) + cur := atomic.LoadUint64(&count) + fmt.Printf("%v/%v done\n", cur, len(balIndex)) + if int(cur) == len(balIndex) { + break + } + } + }() + + for balance, uids := range balIndex { + th.Do() + go func(bal int, uidList []int) { + defer th.Done(nil) + if bal == -1 { + for _, uid := range uidList { + if err := checkDelete(uid); err != nil { + ch <- pair{uid, err.Error()} + } + } + } else { + if err := checkBalance(bal, uidList); err != nil { + ch <- pair{bal, err.Error()} + } + } + 
atomic.AddUint64(&count, 1) + }(balance, uids) + } + th.Finish() + + close(ch) + for p := range ch { + t.Fatalf("failed for %v, :: %v\n", p.key, p.err) + } +} diff --git a/systest/bgindex/test-bgindex.sh b/systest/bgindex/test-bgindex.sh new file mode 100755 index 00000000000..85647e2a1af --- /dev/null +++ b/systest/bgindex/test-bgindex.sh @@ -0,0 +1,43 @@ +#!/bin/bash + +set -e +readonly SRCDIR=$(dirname $0) + +function Info { + echo -e "INFO: $*" +} + +function DockerCompose { + docker-compose -p dgraph "$@" +} + +Info "entering directory $SRCDIR" +cd $SRCDIR + +Info "bringing down dgraph cluster and data volumes" +DockerCompose down -v + +Info "bringing up dgraph cluster" +DockerCompose up -d + +Info "waiting for zero to become leader" +DockerCompose logs -f alpha1 | grep -q -m1 "Successfully upserted groot account" + +if [[ ! -z "$TEAMCITY_VERSION" ]]; then + # Make TeamCity aware of Go tests + export GOFLAGS="-json" +fi + +Info "running background indexing test" +go test -v -tags systest || FOUND_DIFFS=1 + +Info "bringing down dgraph cluster and data volumes" +DockerCompose down -v + +if [[ $FOUND_DIFFS -eq 0 ]]; then + Info "test passed" +else + Info "test failed" +fi + +exit $FOUND_DIFFS diff --git a/systest/mutations_test.go b/systest/mutations_test.go index d9b17b81f14..02491348ea4 100644 --- a/systest/mutations_test.go +++ b/systest/mutations_test.go @@ -172,9 +172,9 @@ func ListWithLanguagesTest(t *testing.T, c *dgo.Dgraph) { func NQuadMutationTest(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ - Schema: `xid: string @index(exact) .`, - })) + op := &api.Operation{Schema: `xid: string @index(exact) .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() assigned, err := txn.Mutate(ctx, &api.Mutation{ @@ -249,7 +249,9 @@ func NQuadMutationTest(t *testing.T, c *dgo.Dgraph) { func DeleteAllReverseIndex(t *testing.T, c *dgo.Dgraph) 
{ ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{Schema: "link: [uid] @reverse ."})) + op := &api.Operation{Schema: "link: [uid] @reverse ."} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) assignedIds, err := c.NewTxn().Mutate(ctx, &api.Mutation{ CommitNow: true, SetNquads: []byte("_:a _:b ."), @@ -296,7 +298,9 @@ func DeleteAllReverseIndex(t *testing.T, c *dgo.Dgraph) { func NormalizeEdgeCasesTest(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{Schema: "xid: string @index(exact) ."})) + op := &api.Operation{Schema: "xid: string @index(exact) ."} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) _, err := c.NewTxn().Mutate(ctx, &api.Mutation{ CommitNow: true, @@ -373,9 +377,9 @@ func NormalizeEdgeCasesTest(t *testing.T, c *dgo.Dgraph) { func FacetOrderTest(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ - Schema: `name: string @index(exact) .`, - })) + op := &api.Operation{Schema: `name: string @index(exact) .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -447,7 +451,9 @@ func FacetOrderTest(t *testing.T, c *dgo.Dgraph) { // Shows fix for issue #1918. 
func LangAndSortBugTest(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{Schema: "name: string @index(exact) @lang ."})) + op := &api.Operation{Schema: "name: string @index(exact) @lang ."} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -541,8 +547,9 @@ func SortFacetsReturnNil(t *testing.T, c *dgo.Dgraph) { func SchemaAfterDeleteNode(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - - require.NoError(t, c.Alter(ctx, &api.Operation{Schema: "married: bool ."})) + op := &api.Operation{Schema: "married: bool ."} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() assigned, err := txn.Mutate(ctx, &api.Mutation{ @@ -595,8 +602,9 @@ func asJson(schema string) string { func FullTextEqual(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - - require.NoError(t, c.Alter(ctx, &api.Operation{Schema: "text: string @index(fulltext) ."})) + op := &api.Operation{Schema: "text: string @index(fulltext) ."} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) texts := []string{"bat man", "aqua man", "bat cave", "bat", "man", "aqua", "cave"} var rdfs bytes.Buffer @@ -664,7 +672,9 @@ func JSONBlankNode(t *testing.T, c *dgo.Dgraph) { func ScalarToList(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{Schema: `pred: string @index(exact) .`})) + op := &api.Operation{Schema: `pred: string @index(exact) .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) uids, err := c.NewTxn().Mutate(ctx, &api.Mutation{ SetNquads: []byte(`_:blank "first" .`), @@ -684,7 +694,9 @@ func ScalarToList(t *testing.T, c *dgo.Dgraph) { require.NoError(t, err) require.Equal(t, 
`{"me":[{"pred":"first"}]}`, string(resp.Json)) - require.NoError(t, c.Alter(ctx, &api.Operation{Schema: `pred: [string] @index(exact) .`})) + op = &api.Operation{Schema: `pred: [string] @index(exact) .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) resp, err = c.NewTxn().Query(ctx, q) require.NoError(t, err) require.Equal(t, `{"me":[{"pred":["first"]}]}`, string(resp.Json)) @@ -750,14 +762,18 @@ func ScalarToList(t *testing.T, c *dgo.Dgraph) { func ListToScalar(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{Schema: `pred: [string] @index(exact) .`})) + op := &api.Operation{Schema: `pred: [string] @index(exact) .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) err := c.Alter(ctx, &api.Operation{Schema: `pred: string @index(exact) .`}) require.Error(t, err) require.Contains(t, err.Error(), `Type can't be changed from list to scalar for attr: [pred] without dropping it first.`) require.NoError(t, c.Alter(ctx, &api.Operation{DropAttr: `pred`})) - err = c.Alter(ctx, &api.Operation{Schema: `pred: string @index(exact) .`}) + op = &api.Operation{Schema: `pred: string @index(exact) .`} + err = c.Alter(ctx, op) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) require.NoError(t, err) } @@ -812,7 +828,9 @@ func SetAfterDeletionListType(t *testing.T, c *dgo.Dgraph) { func EmptyNamesWithExact(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - err := c.Alter(ctx, &api.Operation{Schema: `name: string @index(exact) @lang .`}) + op := &api.Operation{Schema: `name: string @index(exact) @lang .`} + err := c.Alter(ctx, op) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) require.NoError(t, err) _, err = c.NewTxn().Mutate(ctx, &api.Mutation{ @@ -847,6 +865,7 @@ func EmptyRoomsWithTermIndex(t *testing.T, c *dgo.Dgraph) { ` ctx := context.Background() err := c.Alter(ctx, op) + 
require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) require.NoError(t, err) _, err = c.NewTxn().Mutate(ctx, &api.Mutation{ @@ -1028,9 +1047,11 @@ func SkipEmptyPLForHas(t *testing.T, c *dgo.Dgraph) { func HasWithDash(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - check(t, (c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: `name: string @index(hash) .`, - }))) + } + check(t, (c.Alter(ctx, op))) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -1063,12 +1084,14 @@ func HasWithDash(t *testing.T, c *dgo.Dgraph) { func ListGeoFilterTest(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - check(t, c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: ` name: string @index(term) . loc: [geo] @index(geo) . `, - })) + } + check(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() defer txn.Discard(ctx) @@ -1110,12 +1133,14 @@ func ListGeoFilterTest(t *testing.T, c *dgo.Dgraph) { func ListRegexFilterTest(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - check(t, c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: ` name: string @index(term) . per: [string] @index(trigram) . `, - })) + } + check(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() defer txn.Discard(ctx) @@ -1157,12 +1182,14 @@ func ListRegexFilterTest(t *testing.T, c *dgo.Dgraph) { func RegexQueryWithVars(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - check(t, c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: ` name: string @index(term) . per: [string] @index(trigram) . 
`, - })) + } + check(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() defer txn.Discard(ctx) @@ -1205,11 +1232,9 @@ func RegexQueryWithVars(t *testing.T, c *dgo.Dgraph) { func GraphQLVarChild(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - check(t, c.Alter(ctx, &api.Operation{ - Schema: ` - name: string @index(exact) . - `, - })) + op := &api.Operation{Schema: `name: string @index(exact) .`} + check(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() defer txn.Discard(ctx) @@ -1309,11 +1334,9 @@ func GraphQLVarChild(t *testing.T, c *dgo.Dgraph) { func MathGe(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - check(t, c.Alter(ctx, &api.Operation{ - Schema: ` - name: string @index(exact) . - `, - })) + op := &api.Operation{Schema: `name: string @index(exact) .`} + check(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() defer txn.Discard(ctx) @@ -1451,11 +1474,9 @@ func HasDeletedEdge(t *testing.T, c *dgo.Dgraph) { func HasReverseEdge(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - check(t, c.Alter(ctx, &api.Operation{ - Schema: ` - follow: [uid] @reverse . - `, - })) + op := &api.Operation{Schema: `follow: [uid] @reverse .`} + check(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() defer txn.Discard(ctx) @@ -1554,12 +1575,14 @@ func RestoreReservedPreds(t *testing.T, c *dgo.Dgraph) { func DropData(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: ` name: string @index(term) . follow: [uid] @reverse . 
`, - })) + } + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -1655,6 +1678,7 @@ func ReverseCountIndex(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() err := c.Alter(ctx, op) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) require.NoError(t, err) mu := &api.Mutation{ @@ -1754,6 +1778,7 @@ func TypePredicateCheck(t *testing.T, c *dgo.Dgraph) { }` ctx = context.Background() err = c.Alter(ctx, op) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) require.NoError(t, err) } diff --git a/systest/plugin_test.go b/systest/plugin_test.go index 62821388dbd..5acffbaea71 100644 --- a/systest/plugin_test.go +++ b/systest/plugin_test.go @@ -31,6 +31,7 @@ import ( "github.com/dgraph-io/dgo/v2/protos/api" "github.com/dgraph-io/dgraph/testutil" + "github.com/dgraph-io/dgraph/tok" ) func TestPlugins(t *testing.T) { @@ -80,6 +81,7 @@ func TestPlugins(t *testing.T) { check(t, cluster.client.Alter(ctx, &api.Operation{ Schema: initialSchema, })) + check(t, testutil.WaitForAlter(ctx, cluster.client, initialSchema)) txn := cluster.client.NewTxn() _, err = txn.Mutate(ctx, &api.Mutation{SetJson: []byte(setJSON)}) @@ -94,6 +96,11 @@ func TestPlugins(t *testing.T) { } } + // Need to do this so that schema.Parse in testutil.WaitForAlter doesn't complain. + for _, soFile := range soFiles { + tok.LoadCustomTokenizer(soFile) + } + suite( "word: string @index(anagram) .", `[ diff --git a/systest/queries_test.go b/systest/queries_test.go index df7cef19eac..950e86b4411 100644 --- a/systest/queries_test.go +++ b/systest/queries_test.go @@ -65,12 +65,14 @@ func SchemaQueryCleanup(t *testing.T, c *dgo.Dgraph) { func MultipleBlockEval(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: ` entity: string @index(exact) . stock: [uid] @reverse . 
`, - })) + } + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -226,14 +228,16 @@ func MultipleBlockEval(t *testing.T, c *dgo.Dgraph) { func UnmatchedVarEval(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: ` item: string @index(hash) . style.type: string . style.name: string . style.cool: bool . `, - })) + } + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -318,9 +322,9 @@ func UnmatchedVarEval(t *testing.T, c *dgo.Dgraph) { func SchemaQueryTest(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ - Schema: `name: string @index(exact) .`, - })) + op := &api.Operation{Schema: `name: string @index(exact) .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -360,12 +364,14 @@ func SchemaQueryTest(t *testing.T, c *dgo.Dgraph) { func SchemaQueryTestPredicate1(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: ` name: string @index(exact) . age: int . 
`, - })) + } + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -427,9 +433,9 @@ func SchemaQueryTestPredicate1(t *testing.T, c *dgo.Dgraph) { func SchemaQueryTestPredicate2(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ - Schema: `name: string @index(exact) .`, - })) + op := &api.Operation{Schema: `name: string @index(exact) .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -460,12 +466,14 @@ func SchemaQueryTestPredicate2(t *testing.T, c *dgo.Dgraph) { func SchemaQueryTestPredicate3(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: ` name: string @index(exact) . age: int . `, - })) + } + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -503,9 +511,9 @@ func SchemaQueryTestPredicate3(t *testing.T, c *dgo.Dgraph) { func SchemaQueryTestHTTP(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ - Schema: `name: string @index(exact) .`, - })) + op := &api.Operation{Schema: `name: string @index(exact) .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -570,12 +578,14 @@ func SchemaQueryTestHTTP(t *testing.T, c *dgo.Dgraph) { func FuzzyMatch(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ + op := &api.Operation{ Schema: ` term: string @index(trigram) . name: string . 
`, - })) + } + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -714,11 +724,9 @@ func FuzzyMatch(t *testing.T, c *dgo.Dgraph) { func QueryHashIndex(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ - Schema: ` - name: string @index(hash) @lang . - `, - })) + op := &api.Operation{Schema: `name: string @index(hash) @lang .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -827,11 +835,9 @@ func QueryHashIndex(t *testing.T, c *dgo.Dgraph) { func RegexpToggleTrigramIndex(t *testing.T, c *dgo.Dgraph) { ctx := context.Background() - require.NoError(t, c.Alter(ctx, &api.Operation{ - Schema: ` - name: string @index(term) @lang . - `, - })) + op := &api.Operation{Schema: `name: string @index(term) @lang .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) txn := c.NewTxn() _, err := txn.Mutate(ctx, &api.Mutation{ @@ -872,11 +878,9 @@ func RegexpToggleTrigramIndex(t *testing.T, c *dgo.Dgraph) { testutil.CompareJSON(t, tc.out, string(resp.Json)) } - require.NoError(t, c.Alter(ctx, &api.Operation{ - Schema: ` - name: string @index(trigram) @lang . 
- `, - })) + op = &api.Operation{Schema: `name: string @index(trigram) @lang .`} + require.NoError(t, c.Alter(ctx, op)) + require.NoError(t, testutil.WaitForAlter(ctx, c, op.Schema)) t.Log("testing with trigram index") for _, tc := range tests { diff --git a/test.sh b/test.sh index 29732bcc883..70a299f0a42 100755 --- a/test.sh +++ b/test.sh @@ -256,6 +256,9 @@ if [[ :${TEST_SET}: == *:systest:* ]]; then Info "Running rebuilding index test" RunCmd ./systest/1million/test-reindex.sh || TestFailed + + Info "Running background index test" + RunCmd ./systest/bgindex/test-bgindex.sh || TestFailed fi Info "Stopping cluster" diff --git a/testutil/client.go b/testutil/client.go index 1570ce536ed..a9ea035ff68 100644 --- a/testutil/client.go +++ b/testutil/client.go @@ -25,6 +25,7 @@ import ( "net/http" "os" "os/exec" + "reflect" "strconv" "strings" "testing" @@ -32,6 +33,8 @@ import ( "github.com/dgraph-io/dgo/v2" "github.com/dgraph-io/dgo/v2/protos/api" + "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/x" "github.com/pkg/errors" "github.com/spf13/viper" @@ -106,7 +109,9 @@ func DgraphClientWithGroot(serviceAddr string) (*dgo.Dgraph, error) { for { // keep retrying until we succeed or receive a non-retriable error err = dg.Login(ctx, x.GrootId, "password") - if err == nil || !strings.Contains(err.Error(), "Please retry") { + if err == nil || !(strings.Contains(err.Error(), "Please retry") || + strings.Contains(err.Error(), "user not found")) { + break } time.Sleep(time.Second) @@ -158,6 +163,60 @@ func DropAll(t *testing.T, dg *dgo.Dgraph) { require.NoError(t, err) } +// SameIndexes checks whether SchemaUpdate and SchemaNode have same indexes. 
+func SameIndexes(su *pb.SchemaUpdate, n *pb.SchemaNode) bool { + if (su.Directive == pb.SchemaUpdate_REVERSE) != n.Reverse { + return false + } + if !reflect.DeepEqual(su.Tokenizer, n.Tokenizer) { + return false + } + if su.Count != n.Count { + return false + } + return true +} + +// WaitForAlter waits for schema to have the same indexes as the given schema. +func WaitForAlter(ctx context.Context, dg *dgo.Dgraph, s string) error { + ps, err := schema.Parse(s) + if err != nil { + return err + } + + for { + resp, err := dg.NewReadOnlyTxn().Query(ctx, "schema{}") + if err != nil { + return err + } + + var result struct { + Schema []*pb.SchemaNode + } + if err := json.Unmarshal(resp.Json, &result); err != nil { + return err + } + + actual := make(map[string]*pb.SchemaNode) + for _, rs := range result.Schema { + actual[rs.Predicate] = rs + } + + done := true + for _, su := range ps.Preds { + if n, ok := actual[su.Predicate]; !ok || !SameIndexes(su, n) { + done = false + break + } + } + if done { + return nil + } + + time.Sleep(time.Second) + } +} + // RetryQuery will retry a query until it succeeds or a non-retryable error is received. 
func RetryQuery(dg *dgo.Dgraph, q string) (*api.Response, error) { for { @@ -166,6 +225,7 @@ func RetryQuery(dg *dgo.Dgraph, q string) (*api.Response, error) { time.Sleep(10 * time.Millisecond) continue } + return resp, err } } @@ -306,11 +366,7 @@ type curlOutput struct { Errors []curlErrorEntry `json:"errors"` } -func verifyOutput(t *testing.T, bytes []byte, failureConfig *CurlFailureConfig) { - output := curlOutput{} - require.NoError(t, json.Unmarshal(bytes, &output), - "unable to unmarshal the curl output") - +func verifyOutput(t *testing.T, output curlOutput, failureConfig *CurlFailureConfig) { if failureConfig.ShouldFail { require.True(t, len(output.Errors) > 0, "no error entry found") if len(failureConfig.DgraphErrMsg) > 0 { @@ -327,21 +383,32 @@ func verifyOutput(t *testing.T, bytes []byte, failureConfig *CurlFailureConfig) // VerifyCurlCmd executes the curl command with the given arguments and verifies // the result against the expected output. -func VerifyCurlCmd(t *testing.T, args []string, - failureConfig *CurlFailureConfig) { - queryCmd := exec.Command("curl", args...) - - output, err := queryCmd.Output() - if len(failureConfig.CurlErrMsg) > 0 { - // the curl command should have returned an non-zero code - require.Error(t, err, "the curl command should have failed") - if ee, ok := err.(*exec.ExitError); ok { - require.True(t, strings.Contains(string(ee.Stderr), failureConfig.CurlErrMsg), - "the curl output does not contain the expected output") +func VerifyCurlCmd(t *testing.T, args []string, failureConfig *CurlFailureConfig) { + for { + queryCmd := exec.Command("curl", args...) 
+ output, err := queryCmd.Output() + if len(failureConfig.CurlErrMsg) > 0 { + // the curl command should have returned a non-zero code + require.Error(t, err, "the curl command should have failed") + if ee, ok := err.(*exec.ExitError); ok { + require.True(t, strings.Contains(string(ee.Stderr), failureConfig.CurlErrMsg), + "the curl output does not contain the expected output") + } + return } - } else { + require.NoError(t, err, "the curl command should have succeeded") - verifyOutput(t, output, failureConfig) + co := curlOutput{} + require.NoError(t, json.Unmarshal(output, &co), + "unable to unmarshal the curl output") + if len(co.Errors) > 0 { + if strings.Contains(co.Errors[0].Message, "schema is already being modified") { + time.Sleep(time.Second) + continue + } + } + verifyOutput(t, co, failureConfig) + return } } diff --git a/tlstest/acl/acl_over_tls_test.go b/tlstest/acl/acl_over_tls_test.go index 13295913cf5..788757dfe0f 100644 --- a/tlstest/acl/acl_over_tls_test.go +++ b/tlstest/acl/acl_over_tls_test.go @@ -5,7 +5,9 @@ import ( "crypto/tls" "crypto/x509" "io/ioutil" + "strings" "testing" + "time" "github.com/dgraph-io/dgo/v2" "github.com/dgraph-io/dgo/v2/protos/api" @@ -103,8 +105,15 @@ func TestLoginOverTLS(t *testing.T) { if err != nil { t.Fatalf("Unable to get dgraph client: %s", err.Error()) } - if err := dg.Login(context.Background(), "groot", "password"); err != nil { - t.Fatalf("Unable to login using the groot account: %v", err.Error()) + for { + err := dg.Login(context.Background(), "groot", "password") + if err == nil { + break + } else if err != nil && !strings.Contains(err.Error(), "user not found") { + t.Fatalf("Unable to login using the groot account: %v", err.Error()) + } + + time.Sleep(time.Second) + } // Output: diff --git a/wiki/content/clients/index.md b/wiki/content/clients/index.md index a683f0d1db5..76a5811cc96 100644 --- a/wiki/content/clients/index.md +++ b/wiki/content/clients/index.md @@ -572,6 +572,11 @@ type Person { If all goes
well, the response should be `{"code":"Success","message":"Done"}`. +We build indexes in the background so that mutations and queries are not blocked. +In such a case, the new schema may not be reflected right away. You could poll the +schema to check whether indexing has been completed. New alter requests will be +rejected until the background indexing task is finished. + Other operations can be performed via the `/alter` endpoint as well. A specific predicate or the entire database can be dropped. diff --git a/worker/draft.go b/worker/draft.go index a42605f883a..ef90cfa2ecc 100644 --- a/worker/draft.go +++ b/worker/draft.go @@ -199,18 +199,22 @@ func (n *node) applyMutations(ctx context.Context, proposal *pb.Proposal) (rerr if proposal.Mutations.StartTs == 0 { return errors.New("StartTs must be provided") } - startTs := proposal.Mutations.StartTs if len(proposal.Mutations.Schema) > 0 || len(proposal.Mutations.Types) > 0 { + // MaxAssigned would ensure that everything that's committed up until this point + // would be picked up in building indexes. Any uncommitted txns would be cancelled + // by detectPendingTxns below. + startTs := posting.Oracle().MaxAssigned() + span.Annotatef(nil, "Applying schema and types") for _, supdate := range proposal.Mutations.Schema { // We should not need to check for predicate move here. 
if err := detectPendingTxns(supdate.Predicate); err != nil { return err } - if err := runSchemaMutation(ctx, supdate, startTs); err != nil { - return err - } + } + if err := runSchemaMutation(ctx, proposal.Mutations.Schema, startTs); err != nil { + return err } for _, tupdate := range proposal.Mutations.Types { diff --git a/worker/groups.go b/worker/groups.go index fe56644f8db..33f156586f2 100644 --- a/worker/groups.go +++ b/worker/groups.go @@ -198,6 +198,7 @@ func (g *groupi) proposeInitialTypes() { func (g *groupi) proposeInitialSchema() { initialSchema := schema.InitialSchema() + ctx := context.Background() for _, s := range initialSchema { if gid, err := g.BelongsToReadOnly(s.Predicate, 0); err != nil { glog.Errorf("Error getting tablet for predicate %s. Will force schema proposal.", @@ -205,7 +206,7 @@ func (g *groupi) proposeInitialSchema() { g.upsertSchema(s, nil) } else if gid == 0 { g.upsertSchema(s, nil) - } else if curr, _ := schema.State().Get(s.Predicate); gid == g.groupId() && + } else if curr, _ := schema.State().Get(ctx, s.Predicate); gid == g.groupId() && !proto.Equal(s, &curr) { // If this tablet is served to the group, do not upsert the schema unless the // stored schema and the proposed one are different. diff --git a/worker/mutation.go b/worker/mutation.go index 7ce50912bc0..af931218c00 100644 --- a/worker/mutation.go +++ b/worker/mutation.go @@ -20,6 +20,7 @@ import ( "bytes" "context" "math" + "sync" "time" "github.com/dgraph-io/badger/v2" @@ -55,10 +56,11 @@ func isDeletePredicateEdge(edge *pb.DirectedEdge) bool { // runMutation goes through all the edges and applies them. func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) error { + ctx = schema.GetWriteContext(ctx) + // We shouldn't check whether this Alpha serves this predicate or not. Membership information // isn't consistent across the entire cluster. We should just apply whatever is given to us. 
- - su, ok := schema.State().Get(edge.Attr) + su, ok := schema.State().Get(ctx, edge.Attr) if edge.Op == pb.DirectedEdge_SET { if !ok { return errors.Errorf("runMutation: Unable to find schema for %s", edge.Attr) @@ -115,65 +117,115 @@ func runMutation(ctx context.Context, edge *pb.DirectedEdge, txn *posting.Txn) e return plist.AddMutationWithIndex(ctx, edge, txn) } -// This is serialized with mutations, called after applied watermarks catch up -// and further mutations are blocked until this is done. -func runSchemaMutation(ctx context.Context, update *pb.SchemaUpdate, startTs uint64) error { - if err := runSchemaMutationHelper(ctx, update, startTs); err != nil { - // on error, we restore the memory state to be the same as the disk - maxRetries := 10 - loadErr := x.RetryUntilSuccess(maxRetries, 10*time.Millisecond, func() error { - return schema.Load(update.Predicate) - }) - - if loadErr != nil { - glog.Fatalf("failed to load schema after %d retries: %v", maxRetries, loadErr) +func runSchemaMutation(ctx context.Context, updates []*pb.SchemaUpdate, startTs uint64) error { + // Wait until schema modification for all predicates is complete. There cannot be two + // background tasks running as this is a race condition. We typically won't propose an + // index update if one is already going on. If that's not the case, then the receiver + // of the update had probably finished the previous index update but some follower + // (or perhaps leader) had not finished it. + // In other words, the proposer checks whether there is another indexing in progress. + // If that's the case, the alter request is rejected. Otherwise, the request is accepted. + // Before reaching here, the proposer P would have checked that no indexing is in progress + // (could also be because proposer was done earlier than others). If P was still indexing + // when the req was received, it would have rejected the Alter request. Only if P is + // not indexing, it would accept and propose the request. 
+ It is possible that a receiver R of the proposal is still indexing. In that case, R would + block here and wait for indexing to be finished. + for { + if !schema.State().IndexingInProgress() { + break } - return err + glog.Infoln("waiting for indexing to complete") + time.Sleep(time.Second * 2) } - return updateSchema(update) -} + buildIndexesHelper := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) error { + wrtCtx := schema.GetWriteContext(context.Background()) + if err := rebuild.BuildIndexes(wrtCtx); err != nil { + return err + } + if err := updateSchema(update); err != nil { + return err + } -func runSchemaMutationHelper(ctx context.Context, update *pb.SchemaUpdate, startTs uint64) error { - if tablet, err := groups().Tablet(update.Predicate); err != nil { - return err - } else if tablet.GetGroupId() != groups().groupId() { - return errors.Errorf("Tablet isn't being served by this group. Tablet: %+v", tablet) + glog.Infof("Done schema update %+v\n", update) + return nil } - if err := checkSchema(update); err != nil { - return err + // This wg allows waiting until setup for all the predicates is complete + before running buildIndexes for any of those predicates. + var wg sync.WaitGroup + wg.Add(1) + defer wg.Done() + buildIndexes := func(update *pb.SchemaUpdate, rebuild posting.IndexRebuild) { + // We should only start building indexes once this function has returned. + // This is in order to ensure that we do not call DropPrefix for one predicate + // and write indexes for another predicate simultaneously, because that could + // cause writes to badger to fail leading to undesired indexing failures. + wg.Wait() + + // undo schema changes in case re-indexing fails.
+ if err := buildIndexesHelper(update, rebuild); err != nil { + glog.Errorf("error in building indexes, aborting :: %v\n", err) + + maxRetries := 10 + loadErr := x.RetryUntilSuccess(maxRetries, 10*time.Millisecond, func() error { + return schema.Load(update.Predicate) + }) + + if loadErr != nil { + glog.Fatalf("failed to load schema after %d retries: %v", maxRetries, loadErr) + } + } } - old, _ := schema.State().Get(update.Predicate) - // Sets only in memory, we will update it on disk only after schema mutations - // are successful and written to disk. - schema.State().Set(update.Predicate, update) - - // Once we remove index or reverse edges from schema, even though the values - // are present in db, they won't be used due to validation in work/task.go - - // We don't want to use sync watermarks for background removal, because it would block - // linearizable read requests. Only downside would be on system crash, stale edges - // might remain, which is ok. - - // Indexing can't be done in background as it can cause race conditons with new - // index mutations (old set and new del) - // We need watermark for index/reverse edge addition for linearizable reads. - // (both applied and synced watermarks). - defer glog.Infof("Done schema update %+v\n", update) - rebuild := posting.IndexRebuild{ - Attr: update.Predicate, - StartTs: startTs, - OldSchema: &old, - CurrentSchema: update, - } - return rebuild.Run(ctx) + + for _, su := range updates { + if tablet, err := groups().Tablet(su.Predicate); err != nil { + return err + } else if tablet.GetGroupId() != groups().groupId() { + return errors.Errorf("Tablet isn't being served by this group. 
Tablet: %+v", tablet) + } + + if err := checkSchema(su); err != nil { + return err + } + + old, _ := schema.State().Get(ctx, su.Predicate) + rebuild := posting.IndexRebuild{ + Attr: su.Predicate, + StartTs: startTs, + OldSchema: &old, + CurrentSchema: su, + } + querySchema := rebuild.GetQuerySchema() + // Sets the schema only in memory. The schema is written to + // disk only after schema mutations are successful. + schema.State().Set(su.Predicate, querySchema) + schema.State().SetMutSchema(su.Predicate, su) + + // TODO(Aman): If we return an error, we may not have right schema reflected. + if err := rebuild.DropIndexes(ctx); err != nil { + return err + } + if err := rebuild.BuildData(ctx); err != nil { + return err + } + + if rebuild.NeedIndexRebuild() { + go buildIndexes(su, rebuild) + } else if err := updateSchema(su); err != nil { + return err + } + } + + return nil } // updateSchema commits the schema to disk in blocking way, should be ok because this happens // only during schema mutations or we see a new predicate. 
func updateSchema(s *pb.SchemaUpdate) error { schema.State().Set(s.Predicate, s) + schema.State().DeleteMutSchema(s.Predicate) txn := pstore.NewTransactionAt(1, true) defer txn.Discard() data, err := s.Marshal() @@ -190,9 +242,11 @@ func updateSchema(s *pb.SchemaUpdate) error { } func createSchema(attr string, typ types.TypeID, hint pb.Metadata_HintType) error { + ctx := schema.GetWriteContext(context.Background()) + // Don't overwrite schema blindly, acl's might have been set even though // type is not present - s, ok := schema.State().Get(attr) + s, ok := schema.State().Get(ctx, attr) if ok { s.ValueType = typ.Enum() } else { diff --git a/worker/proposal.go b/worker/proposal.go index befbd582564..b4d43fb3f56 100644 --- a/worker/proposal.go +++ b/worker/proposal.go @@ -158,12 +158,13 @@ func (n *node) proposeAndWait(ctx context.Context, proposal *pb.Proposal) (perr // Do a type check here if schema is present // In very rare cases invalid entries might pass through raft, which would // be persisted, we do best effort schema check while writing + ctx = schema.GetWriteContext(ctx) if proposal.Mutations != nil { for _, edge := range proposal.Mutations.Edges { if err := checkTablet(edge.Attr); err != nil { return err } - su, ok := schema.State().Get(edge.Attr) + su, ok := schema.State().Get(ctx, edge.Attr) if !ok { continue } else if err := ValidateAndConvert(edge, &su); err != nil { diff --git a/worker/schema.go b/worker/schema.go index 22279390f8e..f2599a8b50c 100644 --- a/worker/schema.go +++ b/worker/schema.go @@ -89,20 +89,21 @@ func populateSchema(attr string, fields []string) *pb.SchemaNode { return nil } schemaNode.Predicate = attr + ctx := context.Background() for _, field := range fields { switch field { case "type": schemaNode.Type = typ.Name() case "index": - schemaNode.Index = schema.State().IsIndexed(attr) + schemaNode.Index = schema.State().IsIndexed(ctx, attr) case "tokenizer": - if schema.State().IsIndexed(attr) { - schemaNode.Tokenizer = 
schema.State().TokenizerNames(attr) + if schema.State().IsIndexed(ctx, attr) { + schemaNode.Tokenizer = schema.State().TokenizerNames(ctx, attr) } case "reverse": - schemaNode.Reverse = schema.State().IsReversed(attr) + schemaNode.Reverse = schema.State().IsReversed(ctx, attr) case "count": - schemaNode.Count = schema.State().HasCount(attr) + schemaNode.Count = schema.State().HasCount(ctx, attr) case "list": schemaNode.List = schema.State().IsList(attr) case "upsert": diff --git a/worker/sort.go b/worker/sort.go index eb26c656867..7b8d91fee72 100644 --- a/worker/sort.go +++ b/worker/sort.go @@ -197,11 +197,11 @@ func sortWithIndex(ctx context.Context, ts *pb.SortMessage) *sortresult { } // Get the tokenizers and choose the corresponding one. - if !schema.State().IsIndexed(order.Attr) { + if !schema.State().IsIndexed(ctx, order.Attr) { return resultWithError(errors.Errorf("Attribute %s is not indexed.", order.Attr)) } - tokenizers := schema.State().Tokenizer(order.Attr) + tokenizers := schema.State().Tokenizer(ctx, order.Attr) var tokenizer tok.Tokenizer for _, t := range tokenizers { // Get the first sortable index. 
diff --git a/worker/task.go b/worker/task.go index b8d0fae7da7..b629c42483d 100644 --- a/worker/task.go +++ b/worker/task.go @@ -898,16 +898,16 @@ func (qs *queryState) helpProcessTask(ctx context.Context, q *pb.Query, gid uint out := new(pb.Result) attr := q.Attr - srcFn, err := parseSrcFn(q) + srcFn, err := parseSrcFn(ctx, q) if err != nil { return nil, err } - if q.Reverse && !schema.State().IsReversed(attr) { + if q.Reverse && !schema.State().IsReversed(ctx, attr) { return nil, errors.Errorf("Predicate %s doesn't have reverse edge", attr) } - if needsIndex(srcFn.fnType, q.UidList) && !schema.State().IsIndexed(q.Attr) { + if needsIndex(srcFn.fnType, q.UidList) && !schema.State().IsIndexed(ctx, q.Attr) { return nil, errors.Errorf("Predicate %s is not indexed", q.Attr) } @@ -968,7 +968,7 @@ func (qs *queryState) helpProcessTask(ctx context.Context, q *pb.Query, gid uint if srcFn.fnType == compareScalarFn && srcFn.isFuncAtRoot { span.Annotate(nil, "handleCompareScalarFunction") - if err := qs.handleCompareScalarFunction(args); err != nil { + if err := qs.handleCompareScalarFunction(ctx, args); err != nil { return nil, err } } @@ -1034,9 +1034,9 @@ func needsStringFiltering(srcFn *functionContext, langs []string, attr string) b srcFn.fnType == customIndexFn) } -func (qs *queryState) handleCompareScalarFunction(arg funcArgs) error { +func (qs *queryState) handleCompareScalarFunction(ctx context.Context, arg funcArgs) error { attr := arg.q.Attr - if ok := schema.State().HasCount(attr); !ok { + if ok := schema.State().HasCount(ctx, attr); !ok { return errors.Errorf("Need @count directive in schema for attr: %s for fn: %s at root", attr, arg.srcFn.fname) } @@ -1069,7 +1069,7 @@ func (qs *queryState) handleRegexFunction(ctx context.Context, arg funcArgs) err if typ != types.StringID { return errors.Errorf("Got non-string type. 
Regex match is allowed only on string type.") } - useIndex := schema.State().HasTokenizer(tok.IdentTrigram, attr) + useIndex := schema.State().HasTokenizer(ctx, tok.IdentTrigram, attr) span.Annotatef(nil, "Trigram index found: %t, func at root: %t", useIndex, arg.srcFn.isFuncAtRoot) @@ -1172,7 +1172,7 @@ func (qs *queryState) handleCompareFunction(ctx context.Context, arg funcArgs) e attr := arg.q.Attr span.Annotatef(nil, "Attr: %s. Fname: %s", attr, arg.srcFn.fname) - tokenizer, err := pickTokenizer(attr, arg.srcFn.fname) + tokenizer, err := pickTokenizer(ctx, attr, arg.srcFn.fname) if err != nil { return err } @@ -1311,7 +1311,7 @@ func (qs *queryState) handleMatchFunction(ctx context.Context, arg funcArgs) err case arg.q.UidList != nil && len(arg.q.UidList.Uids) != 0: uids = arg.q.UidList - case schema.State().HasTokenizer(tok.IdentTrigram, attr): + case schema.State().HasTokenizer(ctx, tok.IdentTrigram, attr): var err error uids, err = uidsForMatch(attr, arg) if err != nil { @@ -1613,11 +1613,11 @@ func langForFunc(langs []string) string { return langs[0] } -func parseSrcFn(q *pb.Query) (*functionContext, error) { +func parseSrcFn(ctx context.Context, q *pb.Query) (*functionContext, error) { fnType, f := parseFuncType(q.SrcFunc) attr := q.Attr fc := &functionContext{fnType: fnType, fname: f} - isIndexedAttr := schema.State().IsIndexed(attr) + isIndexedAttr := schema.State().IsIndexed(ctx, attr) var err error t, err := schema.State().TypeOf(attr) @@ -1673,7 +1673,7 @@ func parseSrcFn(q *pb.Query) (*functionContext, error) { } // Get tokens ge / le ineqValueToken. 
- if tokens, fc.ineqValueToken, err = getInequalityTokens(q.ReadTs, attr, f, lang, + if tokens, fc.ineqValueToken, err = getInequalityTokens(ctx, q.ReadTs, attr, f, lang, fc.ineqValue); err != nil { return nil, err } @@ -1727,7 +1727,7 @@ func parseSrcFn(q *pb.Query) (*functionContext, error) { if err = ensureArgsCount(q.SrcFunc, 1); err != nil { return nil, err } - required, found := verifyStringIndex(attr, fnType) + required, found := verifyStringIndex(ctx, attr, fnType) if !found { return nil, errors.Errorf("Attribute %s is not indexed with type %s", attr, required) } @@ -1740,7 +1740,7 @@ func parseSrcFn(q *pb.Query) (*functionContext, error) { if err = ensureArgsCount(q.SrcFunc, 2); err != nil { return nil, err } - required, found := verifyStringIndex(attr, fnType) + required, found := verifyStringIndex(ctx, attr, fnType) if !found { return nil, errors.Errorf("Attribute %s is not indexed with type %s", attr, required) } @@ -1763,7 +1763,7 @@ func parseSrcFn(q *pb.Query) (*functionContext, error) { return nil, err } tokerName := q.SrcFunc.Args[0] - if !verifyCustomIndex(q.Attr, tokerName) { + if !verifyCustomIndex(ctx, q.Attr, tokerName) { return nil, errors.Errorf("Attribute %s is not indexed with custom tokenizer %s", q.Attr, tokerName) } diff --git a/worker/tokens.go b/worker/tokens.go index e7339a519a9..d3946a29ab9 100644 --- a/worker/tokens.go +++ b/worker/tokens.go @@ -17,18 +17,19 @@ package worker import ( - "github.com/dgraph-io/badger/v2" - "bytes" + "context" + + "github.com/pkg/errors" + "github.com/dgraph-io/badger/v2" "github.com/dgraph-io/dgraph/schema" "github.com/dgraph-io/dgraph/tok" "github.com/dgraph-io/dgraph/types" "github.com/dgraph-io/dgraph/x" - "github.com/pkg/errors" ) -func verifyStringIndex(attr string, funcType FuncType) (string, bool) { +func verifyStringIndex(ctx context.Context, attr string, funcType FuncType) (string, bool) { var requiredTokenizer tok.Tokenizer switch funcType { case fullTextSearchFn: @@ -39,12 +40,12 @@ func 
verifyStringIndex(attr string, funcType FuncType) (string, bool) { requiredTokenizer = tok.TermTokenizer{} } - if !schema.State().IsIndexed(attr) { + if !schema.State().IsIndexed(ctx, attr) { return requiredTokenizer.Name(), false } id := requiredTokenizer.Identifier() - for _, t := range schema.State().Tokenizer(attr) { + for _, t := range schema.State().Tokenizer(ctx, attr) { if t.Identifier() == id { return requiredTokenizer.Name(), true } @@ -52,11 +53,11 @@ func verifyStringIndex(attr string, funcType FuncType) (string, bool) { return requiredTokenizer.Name(), false } -func verifyCustomIndex(attr string, tokenizerName string) bool { - if !schema.State().IsIndexed(attr) { +func verifyCustomIndex(ctx context.Context, attr string, tokenizerName string) bool { + if !schema.State().IsIndexed(ctx, attr) { return false } - for _, t := range schema.State().Tokenizer(attr) { + for _, t := range schema.State().Tokenizer(ctx, attr) { if t.Identifier() >= tok.IdentCustom && t.Name() == tokenizerName { return true } @@ -76,13 +77,13 @@ func getStringTokens(funcArgs []string, lang string, funcType FuncType) ([]strin return tok.GetTermTokens(funcArgs) } -func pickTokenizer(attr string, f string) (tok.Tokenizer, error) { +func pickTokenizer(ctx context.Context, attr string, f string) (tok.Tokenizer, error) { // Get the tokenizers and choose the corresponding one. - if !schema.State().IsIndexed(attr) { + if !schema.State().IsIndexed(ctx, attr) { return nil, errors.Errorf("Attribute %s is not indexed.", attr) } - tokenizers := schema.State().Tokenizer(attr) + tokenizers := schema.State().Tokenizer(ctx, attr) for _, t := range tokenizers { // If function is eq and we found a tokenizer thats !Lossy(), lets return it switch f { @@ -110,9 +111,9 @@ func pickTokenizer(attr string, f string) (tok.Tokenizer, error) { // getInequalityTokens gets tokens ge / le compared to given token using the first sortable // index that is found for the predicate. 
-func getInequalityTokens(readTs uint64, attr, f, lang string, +func getInequalityTokens(ctx context.Context, readTs uint64, attr, f, lang string, ineqValue types.Val) ([]string, string, error) { - tokenizer, err := pickTokenizer(attr, f) + tokenizer, err := pickTokenizer(ctx, attr, f) if err != nil { return nil, "", err }