Skip to content

Commit

Permalink
Perform indexing in background (#4819)
Browse files Browse the repository at this point in the history
  • Loading branch information
mangalaman93 authored Mar 11, 2020
1 parent 9ee3a8a commit c1726f4
Show file tree
Hide file tree
Showing 43 changed files with 2,131 additions and 353 deletions.
5 changes: 3 additions & 2 deletions chunker/json_parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func (exp *Experiment) verify() {
require.NoError(exp.t, dg.Alter(ctx, &api.Operation{DropAll: true}), "drop all failed")
require.NoError(exp.t, dg.Alter(ctx, &api.Operation{Schema: exp.schema}),
"schema change failed")
require.NoError(exp.t, testutil.WaitForAlter(ctx, dg, exp.schema))

_, err = dg.NewTxn().Mutate(ctx,
&api.Mutation{Set: exp.nqs, CommitNow: true})
Expand Down Expand Up @@ -134,14 +135,14 @@ func TestNquadsFromJson1(t *testing.T) {
name
age
married
address
address
}}`,
expected: `{"alice": [
{"name": "Alice",
"age": 26,
"married": true,
"address": {"coordinates": [2,1.1], "type": "Point"}}
]}
]}
`}
exp.verify()
}
Expand Down
14 changes: 9 additions & 5 deletions contrib/integration/testtxn/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ func TestIgnoreIndexConflict(t *testing.T) {
if err := s.dg.Alter(context.Background(), op); err != nil {
log.Fatal(err)
}
if err := testutil.WaitForAlter(context.Background(), s.dg, op.Schema); err != nil {
log.Fatal(err)
}

txn := s.dg.NewTxn()
mu := &api.Mutation{}
Expand Down Expand Up @@ -424,9 +427,11 @@ func TestReadIndexKeySameTxn(t *testing.T) {
if err := s.dg.Alter(context.Background(), op); err != nil {
log.Fatal(err)
}
if err := testutil.WaitForAlter(context.Background(), s.dg, op.Schema); err != nil {
log.Fatal(err)
}

txn := s.dg.NewTxn()

mu := &api.Mutation{
CommitNow: true,
SetJson: []byte(`{"name": "Manish"}`),
Expand Down Expand Up @@ -933,8 +938,7 @@ func TestTxnDiscardBeforeCommit(t *testing.T) {
}

func alterSchema(dg *dgo.Dgraph, schema string) {
op := api.Operation{}
op.Schema = schema
err := dg.Alter(ctxb, &op)
x.Check(err)
op := api.Operation{Schema: schema}
x.Check(dg.Alter(ctxb, &op))
x.Check(testutil.WaitForAlter(ctxb, dg, schema))
}
2 changes: 1 addition & 1 deletion dgraph/cmd/alpha/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func runGzipWithRetry(contentType, url string, buf io.Reader, gzReq, gzResp bool
*http.Response, error) {

client := &http.Client{}
numRetries := 2
numRetries := 3

var resp *http.Response
var err error
Expand Down
53 changes: 53 additions & 0 deletions dgraph/cmd/alpha/reindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
package alpha

import (
"strings"
"testing"
"time"

"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -185,3 +187,54 @@ func TestReindexReverseCount(t *testing.T) {
}
}`, res)
}

func checkSchema(t *testing.T, query, key string) {
for i := 0; i < 10; i++ {
res, _, err := queryWithTs(query, "application/graphql+-", "", 0)
require.NoError(t, err)
if strings.Contains(res, key) {
return
}
time.Sleep(100 * time.Millisecond)

if i == 9 {
t.Fatalf("expected %v, got schema: %v", key, res)
}
}
}

func TestBgIndexSchemaReverse(t *testing.T) {
require.NoError(t, dropAll())
q1 := `schema(pred: [value]) {}`
require.NoError(t, alterSchema(`value: [uid] .`))
checkSchema(t, q1, "list")
require.NoError(t, alterSchema(`value: [uid] @count @reverse .`))
checkSchema(t, q1, "reverse")
}

func TestBgIndexSchemaTokenizers(t *testing.T) {
require.NoError(t, dropAll())
q1 := `schema(pred: [value]) {}`
require.NoError(t, alterSchema(`value: string @index(fulltext, hash) .`))
checkSchema(t, q1, "fulltext")
require.NoError(t, alterSchema(`value: string @index(term, hash) @upsert .`))
checkSchema(t, q1, "term")
}

func TestBgIndexSchemaCount(t *testing.T) {
require.NoError(t, dropAll())
q1 := `schema(pred: [value]) {}`
require.NoError(t, alterSchema(`value: [uid] @count .`))
checkSchema(t, q1, "count")
require.NoError(t, alterSchema(`value: [uid] @reverse .`))
checkSchema(t, q1, "reverse")
}

func TestBgIndexSchemaReverseAndCount(t *testing.T) {
require.NoError(t, dropAll())
q1 := `schema(pred: [value]) {}`
require.NoError(t, alterSchema(`value: [uid] @reverse .`))
checkSchema(t, q1, "reverse")
require.NoError(t, alterSchema(`value: [uid] @count .`))
checkSchema(t, q1, "count")
}
62 changes: 58 additions & 4 deletions dgraph/cmd/alpha/run_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,22 @@ func runJSONMutation(m string) error {
}

func alterSchema(s string) error {
_, _, err := runWithRetries("PUT", "", addr+"/alter", s)
if err != nil {
return errors.Wrapf(err, "while running request with retries")
for {
_, _, err := runWithRetries("PUT", "", addr+"/alter", s)
if err != nil && strings.Contains(err.Error(), "is already being modified") {
time.Sleep(time.Second)
continue
} else if err != nil {
return errors.Wrapf(err, "while running request with retries")
} else {
break
}
}

if err := waitForAlter(s); err != nil {
return errors.Wrapf(err, "while waiting for alter to complete")
}

return nil
}

Expand All @@ -124,6 +136,48 @@ func alterSchemaWithRetry(s string) error {
return err
}

// waitForAlter waits for the alter operation to complete.
func waitForAlter(s string) error {
ps, err := schema.Parse(s)
if err != nil {
return err
}

for {
resp, _, err := queryWithTs("schema{}", "application/graphql+-", "false", 0)
if err != nil {
return err
}

var result struct {
Data struct {
Schema []*pb.SchemaNode
}
}
if err := json.Unmarshal([]byte(resp), &result); err != nil {
return err
}

actual := make(map[string]*pb.SchemaNode)
for _, rs := range result.Data.Schema {
actual[rs.Predicate] = rs
}

done := true
for _, su := range ps.Preds {
if n, ok := actual[su.Predicate]; !ok || !testutil.SameIndexes(su, n) {
done = false
break
}
}
if done {
return nil
}

time.Sleep(time.Second)
}
}

func dropAll() error {
op := `{"drop_all": true}`
_, _, err := runWithRetries("PUT", "", addr+"/alter", op)
Expand Down Expand Up @@ -405,7 +459,7 @@ func TestSchemaMutationUidError1(t *testing.T) {
var s2 = `
friend: uid .
`
require.Error(t, alterSchemaWithRetry(s2))
require.Error(t, alterSchema(s2))
}

// add index
Expand Down
2 changes: 0 additions & 2 deletions dgraph/cmd/alpha/upsert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1613,8 +1613,6 @@ func TestUpsertWithValueVar(t *testing.T) {
require.NoError(t, alterSchema(`amount: int .`))
res, err := mutationWithTs(`{ set { _:p <amount> "0" . } }`, "application/rdf", false, true, 0)
require.NoError(t, err)
b, _ := json.MarshalIndent(res, "", " ")
fmt.Printf("%s\n", b)

const (
// this upsert block increments the value of the counter by one
Expand Down
2 changes: 1 addition & 1 deletion dgraph/cmd/live/load-json/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (
"strings"
"testing"

"github.com/dgraph-io/dgo/v2"
"github.com/stretchr/testify/require"

"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgraph/testutil"
"github.com/dgraph-io/dgraph/x"
)
Expand Down
7 changes: 6 additions & 1 deletion dgraph/cmd/live/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/testutil"
"github.com/dgraph-io/dgraph/types"
"github.com/dgraph-io/dgraph/x"
"github.com/dgraph-io/dgraph/xidmap"
Expand Down Expand Up @@ -194,7 +195,11 @@ func processSchemaFile(ctx context.Context, file string, dgraphClient *dgo.Dgrap

op := &api.Operation{}
op.Schema = string(b)
return dgraphClient.Alter(ctx, op)
if err := dgraphClient.Alter(ctx, op); err != nil {
return err
}
// TODO(Aman): avoid using functions from testutil.
return testutil.WaitForAlter(ctx, dgraphClient, op.Schema)
}

func (l *loader) uid(val string) string {
Expand Down
34 changes: 21 additions & 13 deletions edgraph/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,20 @@ import (
"time"
"unicode"

"github.com/gogo/protobuf/jsonpb"
"github.com/golang/glog"
"github.com/pkg/errors"
ostats "go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
otrace "go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"github.com/dgraph-io/dgo/v2"
"github.com/dgraph-io/dgo/v2/protos/api"

"github.com/dgraph-io/dgraph/chunker"
"github.com/dgraph-io/dgraph/conn"
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
Expand All @@ -44,18 +55,6 @@ import (
"github.com/dgraph-io/dgraph/types/facets"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/gogo/protobuf/jsonpb"
"github.com/golang/glog"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

ostats "go.opencensus.io/stats"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
otrace "go.opencensus.io/trace"
)

const (
Expand Down Expand Up @@ -88,6 +87,10 @@ var (
numGraphQL uint64
)

var (
errIndexingInProgress = errors.New("schema is already being modified. Please retry")
)

// Server implements protos.DgraphServer
type Server struct{}

Expand Down Expand Up @@ -241,6 +244,11 @@ func (s *Server) Alter(ctx context.Context, op *api.Operation) (*api.Payload, er
return empty, err
}

// If a background task is already running, we should reject all the new alter requests.
if schema.State().IndexingInProgress() {
return nil, errIndexingInProgress
}

for _, update := range result.Preds {
// Reserved predicates cannot be altered but let the update go through
// if the update is equal to the existing one.
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/blevesearch/snowballstem v0.0.0-20180110192139-26b06a2c243d // indirect
github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd
github.com/dgraph-io/badger/v2 v2.0.1-0.20191220102048-ab4352b00a17
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6
github.com/dgraph-io/dgo/v2 v2.2.0
github.com/dgraph-io/ristretto v0.0.1 // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgryski/go-farm v0.0.0-20191112170834-c2139c5d712b
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ github.com/dgraph-io/badger/v2 v2.0.1-0.20191220102048-ab4352b00a17 h1:BxXd8isFV
github.com/dgraph-io/badger/v2 v2.0.1-0.20191220102048-ab4352b00a17/go.mod h1:YoRSIp1LmAJ7zH7tZwRvjNMUYLxB4wl3ebYkaIruZ04=
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6 h1:5leDFqGys055YO3TbghBhk/QdRPEwyLPdgsSJfiR20I=
github.com/dgraph-io/dgo/v2 v2.1.1-0.20191127085444-c7a02678e8a6/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI=
github.com/dgraph-io/dgo/v2 v2.2.0 h1:qYbm6mEF3wuKiRpgNOldk6PmPbBJFwj6vL7I7dTSdyc=
github.com/dgraph-io/dgo/v2 v2.2.0/go.mod h1:LJCkLxm5fUMcU+yb8gHFjHt7ChgNuz3YnQQ6MQkmscI=
github.com/dgraph-io/ristretto v0.0.0-20191025175511-c1f00be0418e/go.mod h1:edzKIzGvqUCMzhTVWbiTSe75zD9Xxq0GtSBtFmaUTZs=
github.com/dgraph-io/ristretto v0.0.1 h1:cJwdnj42uV8Jg4+KLrYovLiCgIfz9wtWm6E6KA+1tLs=
github.com/dgraph-io/ristretto v0.0.1/go.mod h1:T40EBc7CJke8TkpiYfGGKAeFjSaxuFXhuXRyumBd6RE=
Expand Down
Loading

0 comments on commit c1726f4

Please sign in to comment.