Skip to content

Commit

Permalink
fix(bulk): upsert guardian/groot for all existing namespaces (#7759)
Browse files Browse the repository at this point in the history
Issue:
If data was loaded into a namespace using the bulk loader with --force-namespace, that data was not actually accessible.
This is because the users/groups for that namespace were not created unless they were already present in the RDF.

Solution:
This PR fixes that by upserting the guardian group and the groot account (with the default password) for every existing namespace when the alpha starts (to be precise, whenever ACL is reset).
  • Loading branch information
NamanJain8 authored Apr 29, 2021
1 parent 018517b commit c4f4964
Show file tree
Hide file tree
Showing 12 changed files with 277 additions and 114 deletions.
2 changes: 2 additions & 0 deletions dgraph/cmd/bulk/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,7 @@ func run() {

loader.prog.mapEdgeCount = bulkMeta.EdgeCount
loader.schema.schemaMap = bulkMeta.SchemaMap
loader.schema.types = bulkMeta.Types
} else {
loader.mapStage()
mergeMapShardsIntoReduceShards(&opt)
Expand All @@ -336,6 +337,7 @@ func run() {
bulkMeta := pb.BulkMeta{
EdgeCount: loader.prog.mapEdgeCount,
SchemaMap: loader.schema.schemaMap,
Types: loader.schema.types,
}
bulkMetaData, err := bulkMeta.Marshal()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions dgraph/cmd/bulk/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ func newSchemaStore(initial *schema.ParsedSchema, opt *options, state *state) *s
// whenever we see data for a new namespace.
s.checkAndSetInitialSchema(x.GalaxyNamespace)

s.types = initial.Types
// This is from the schema read from the schema file.
for _, sch := range initial.Preds {
p := sch.Predicate
Expand All @@ -63,8 +64,6 @@ func newSchemaStore(initial *schema.ParsedSchema, opt *options, state *state) *s
s.schemaMap[p] = sch
}

s.types = initial.Types

return s
}

Expand Down Expand Up @@ -102,6 +101,7 @@ func (s *schemaStore) checkAndSetInitialSchema(namespace uint64) {
for _, update := range schema.CompleteInitialSchema(namespace) {
s.schemaMap[update.Predicate] = update
}
s.types = append(s.types, schema.CompleteInitialTypes(namespace)...)

if s.opt.StoreXids {
s.schemaMap[x.NamespaceAttr(namespace, "xid")] = &pb.SchemaUpdate{
Expand Down
45 changes: 26 additions & 19 deletions edgraph/access_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,28 +413,35 @@ func ResetAcl(closer *z.Closer) {
// The acl feature is not turned on.
return
}
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, x.GalaxyNamespace)
if err := upsertGuardian(ctx); err != nil {
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue

upsertGuardianAndGroot := func(ns uint64) {
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, ns)
if err := upsertGuardian(ctx); err != nil {
glog.Infof("Unable to upsert the guardian group. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
break
}
break
}

for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, x.GalaxyNamespace)
if err := upsertGroot(ctx, "password"); err != nil {
glog.Infof("Unable to upsert the groot account. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
for closer.Ctx().Err() == nil {
ctx, cancel := context.WithTimeout(closer.Ctx(), time.Minute)
defer cancel()
ctx = x.AttachNamespace(ctx, ns)
if err := upsertGroot(ctx, "password"); err != nil {
glog.Infof("Unable to upsert the groot account. Error: %v", err)
time.Sleep(100 * time.Millisecond)
continue
}
break
}
break
}

for ns := range schema.State().Namespaces() {
upsertGuardianAndGroot(ns)
}
}

Expand Down
1 change: 1 addition & 0 deletions protos/pb.proto
Original file line number Diff line number Diff line change
Expand Up @@ -725,6 +725,7 @@ message UpdateGraphQLSchemaResponse {
// BulkMeta is the metadata the bulk loader marshals once its map stage has run
// and reads back into the loader when that data is reused.
message BulkMeta {
// Edge count recorded by the map stage.
int64 edge_count = 1;
// Schema updates known to the loader, keyed by string
// (presumably the namespaced predicate name; verify against the loader).
map<string, SchemaUpdate> schema_map = 2;
// Type definitions known to the loader's schema store.
repeated TypeUpdate types = 3;
}

message DeleteNsRequest {
Expand Down
226 changes: 144 additions & 82 deletions protos/pb/pb.pb.go

Large diffs are not rendered by default.

16 changes: 16 additions & 0 deletions schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,22 @@ func (s *state) DeleteType(typeName string) error {
return nil
}

// Namespaces reports the set of namespaces that own at least one type in the
// current schema state. Each namespace is decoded from the stored type name via
// x.ParseNamespace. A nil receiver yields a nil map.
func (s *state) Namespaces() map[uint64]struct{} {
	if s == nil {
		return nil
	}

	// Hold the read lock for the whole scan of the types map.
	s.RLock()
	defer s.RUnlock()

	seen := map[uint64]struct{}{}
	for typeName := range s.types {
		namespace := x.ParseNamespace(typeName)
		seen[namespace] = struct{}{}
	}
	return seen
}

// DeletePredsForNs deletes the predicate information for the namespace from the schema.
func (s *state) DeletePredsForNs(delNs uint64) {
if s == nil {
Expand Down
25 changes: 25 additions & 0 deletions systest/bulk_live/bulk/alpha_acl.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Single-alpha cluster definition with ACL turned on; referenced by the
# bulk-loader ACL test suite as "../bulk/alpha_acl.yml".
version: "3.5"
services:
alpha1:
image: dgraph/dgraph:latest
working_dir: /data/alpha1
labels:
cluster: test
ports:
- "8080"
- "9080"
volumes:
# dgraph binaries from the host's $GOPATH/bin, mounted read-only at /gobin.
- type: bind
source: $GOPATH/bin
target: /gobin
read_only: true
# Directory served as the alpha's posting dir (-p=/posting); presumably the
# bulk loader's output for group 0 - verify against the loader's out dir.
- type: bind
source: ./data/out/0/p
target: /posting
read_only: false
# HMAC secret file consumed by the --acl "secret-file=..." option below.
- type: bind
source: ../../../ee/acl/hmac-secret
target: /dgraph-acl/hmac-secret
read_only: true
# Starts the alpha against /posting with ACL enabled through the mounted secret.
command: /gobin/dgraph alpha --my=alpha1:7080 --zero=zero1:5080 --logtostderr -v=2 -p=/posting
--security "whitelist=0.0.0.0/0;" --acl "secret-file=/dgraph-acl/hmac-secret; "
4 changes: 4 additions & 0 deletions systest/bulk_live/bulk/bulk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,7 @@ import (
// TestBulkCases runs the shared bulk-loader cases from the common package as a
// single named subtest.
func TestBulkCases(t *testing.T) {
	const subtest = "bulk test cases"
	t.Run(subtest, common.RunBulkCases)
}

// TestBulkCasesAcl runs the shared ACL variant of the bulk-loader cases from the
// common package as a single named subtest.
func TestBulkCasesAcl(t *testing.T) {
	const subtest = "bulk test cases with acl"
	t.Run(subtest, common.RunBulkCasesAcl)
}
25 changes: 25 additions & 0 deletions systest/bulk_live/common/bulk_live_cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -77,6 +78,29 @@ func RunBulkCases(t *testing.T) {
suite.cleanup(t)
}

func RunBulkCasesAcl(t *testing.T) {
opts := suiteOpts{
schema: helloWorldSchema,
gqlSchema: "",
rdfs: helloWorldData,
bulkSuite: true,
bulkOpts: bulkOpts{alpha: "../bulk/alpha_acl.yml", forceNs: 0x10},
}
suite := newSuiteInternal(t, opts)

t.Run("Pan and Jackson", testCaseWithAcl(`
{q(func: anyofterms(name, "Peter")) {
name
}}
`, `
{"q": [
{ "name": "Peter Pan" },
{ "name": "Peter Jackson" }
]}
`, "groot", "password", 0x10))
suite.cleanup(t)
}

// run this in sequential order. cleanup is necessary for live loader to work
func RunLiveCases(t *testing.T) {
suite := helloWorldSetup(t, false)
Expand Down Expand Up @@ -130,6 +154,7 @@ func remoteHelloWorldSetup(t *testing.T, isBulkLoader bool) *suite {
rdfs: helloWorldData,
bulkSuite: isBulkLoader,
remote: true,
bulkOpts: bulkOpts{alpha: "../bulk/alpha.yml", forceNs: math.MaxUint64},
})
}

Expand Down
30 changes: 27 additions & 3 deletions systest/bulk_live/common/bulk_live_fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package common
import (
"context"
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
Expand All @@ -42,9 +43,15 @@ type suiteOpts struct {
gqlSchema string
rdfs string
bulkSuite bool
bulkOpts bulkOpts
remote bool
}

// bulkOpts carries the bulk-loader specific settings of a suite.
type bulkOpts struct {
// alpha is the path of the alpha YAML file handed to testutil.StartAlphas and
// testutil.StopAlphasAndDetectRace for this suite.
alpha string
// forceNs is forwarded as testutil.BulkOpts.Namespace, which becomes the bulk
// loader's --force-namespace value. Suites that should keep the namespaces
// found in the data pass math.MaxUint64.
forceNs uint64
}

func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
if testing.Short() {
t.Skip("Skipping system test with long runtime.")
Expand All @@ -54,7 +61,6 @@ func newSuiteInternal(t *testing.T, opts suiteOpts) *suite {
t: t,
opts: opts,
}

require.NoError(s.t, makeDirEmpty(rootDir))
rdfFile := filepath.Join(rootDir, "rdfs.rdf")
require.NoError(s.t, ioutil.WriteFile(rdfFile, []byte(opts.rdfs), 0644))
Expand Down Expand Up @@ -95,6 +101,7 @@ func newBulkOnlySuite(t *testing.T, schema, rdfs, gqlSchema string) *suite {
gqlSchema: gqlSchema,
rdfs: rdfs,
bulkSuite: true,
bulkOpts: bulkOpts{alpha: "../bulk/alpha.yml", forceNs: math.MaxUint64}, // preserve ns
}
return newSuiteInternal(t, opts)
}
Expand Down Expand Up @@ -125,10 +132,11 @@ func (s *suite) setup(t *testing.T, schemaFile, rdfFile, gqlSchemaFile string) {
GQLSchemaFile: gqlSchemaFile,
Dir: rootDir,
Env: env,
Namespace: s.opts.bulkOpts.forceNs,
})

require.NoError(t, err)
err = testutil.StartAlphas("../bulk/alpha.yml")
err = testutil.StartAlphas(s.opts.bulkOpts.alpha)
require.NoError(t, err)
return
}
Expand Down Expand Up @@ -157,7 +165,7 @@ func (s *suite) cleanup(t *testing.T) {
// NOTE: Shouldn't raise any errors here or fail a test, since this is
// called when we detect an error (don't want to mask the original problem).
if s.opts.bulkSuite {
isRace := testutil.StopAlphasAndDetectRace("../bulk/alpha.yml")
isRace := testutil.StopAlphasAndDetectRace(s.opts.bulkOpts.alpha)
_ = os.RemoveAll(rootDir)
if isRace {
t.Fatalf("Failing because race condition is detected. " +
Expand Down Expand Up @@ -188,3 +196,19 @@ func testCase(query, wantResult string) func(*testing.T) {
testutil.CompareJSON(t, wantResult, string(resp.GetJson()))
}
}

// testCaseWithAcl returns a subtest that connects to alpha1, logs into namespace
// ns with the given user and password, runs query, and compares the JSON
// response with wantResult. A single one-minute deadline covers both the login
// and the query.
func testCaseWithAcl(query, wantResult, user, password string, ns uint64) func(*testing.T) {
	return func(t *testing.T) {
		// Check results of the bulk loader.
		dg, err := testutil.DgraphClient(testutil.ContainerAddr("alpha1", 9080))
		require.NoError(t, err)

		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
		defer cancel()
		require.NoError(t, dg.LoginIntoNamespace(ctx, user, password, ns))

		txn := dg.NewTxn()
		// Always release the transaction, even when an assertion below aborts the
		// subtest. Deferred after cancel, so it runs first while ctx is still live.
		// The error is ignored on purpose: this is best-effort cleanup.
		defer func() { _ = txn.Discard(ctx) }()

		resp, err := txn.Query(ctx, query)
		require.NoError(t, err)
		testutil.CompareJSON(t, wantResult, string(resp.GetJson()))
	}
}
8 changes: 5 additions & 3 deletions testutil/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,10 @@ func LiveLoad(opts LiveOpts) error {
"--alpha", opts.Alpha,
"--zero", opts.Zero,
}
if opts.Creds.Namespace == x.GalaxyNamespace || opts.ForceNs != 0 {
args = append(args, "--force-namespace", strconv.FormatInt(opts.ForceNs, 10))
}
if opts.Creds != nil {
if opts.Creds.Namespace == x.GalaxyNamespace || opts.ForceNs != 0 {
args = append(args, "--force-namespace", strconv.FormatInt(opts.ForceNs, 10))
}
args = append(args, "--creds")
args = append(args, fmt.Sprintf("user=%s;password=%s;namespace=%d",
opts.Creds.UserID, opts.Creds.Passwd, opts.Creds.Namespace))
Expand Down Expand Up @@ -85,6 +85,7 @@ type BulkOpts struct {
GQLSchemaFile string
Dir string
Env []string
Namespace uint64
}

func BulkLoad(opts BulkOpts) error {
Expand All @@ -97,6 +98,7 @@ func BulkLoad(opts BulkOpts) error {
"--map_shards="+strconv.Itoa(opts.Shards),
"--store_xids=true",
"--zero", opts.Zero,
"--force-namespace", strconv.FormatUint(opts.Namespace, 10),
)

if opts.Dir != "" {
Expand Down
5 changes: 0 additions & 5 deletions x/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,20 @@ func GalaxyAttr(attr string) string {

// ParseNamespaceAttr returns the namespace and attr from the given value.
// The first 8 bytes of attr are decoded as a big-endian uint64 namespace and the
// remaining bytes are returned as the attribute name. attr must be at least
// 8 bytes long; shorter input does not return normally.
func ParseNamespaceAttr(attr string) (uint64, string) {
AssertTrue(len(attr) >= 8)
return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:]
}

// ParseNamespaceBytes splits attr into its first 8 bytes, returned undecoded as
// a byte slice, and the remainder of the string. attr must be at least 8 bytes
// long; shorter input does not return normally.
func ParseNamespaceBytes(attr string) ([]byte, string) {
AssertTrue(len(attr) >= 8)
return []byte(attr[:8]), attr[8:]
}

// ParseAttr returns the attr from the given value.
// It drops the leading 8 bytes of attr and returns the rest. attr must be at
// least 8 bytes long; shorter input does not return normally.
func ParseAttr(attr string) string {
AssertTrue(len(attr) >= 8)
return attr[8:]
}

// ParseNamespace returns the namespace from the given value.
// The namespace is the first 8 bytes of attr decoded as a big-endian uint64.
// attr must be at least 8 bytes long; shorter input does not return normally.
func ParseNamespace(attr string) uint64 {
AssertTrue(len(attr) >= 8)
return binary.BigEndian.Uint64([]byte(attr[:8]))
}

Expand All @@ -114,7 +110,6 @@ func ParseAttrList(attrs []string) []string {
}

// IsReverseAttr reports whether the byte right after the 8-byte prefix of attr
// is '~'.
// NOTE(review): the assertion admits len(attr) == 8, but reading attr[8] needs
// at least 9 bytes, so a value with nothing after the prefix still panics on
// the index below - confirm whether the guard should be "> 8".
func IsReverseAttr(attr string) bool {
AssertTrue(len(attr) >= 8)
return attr[8] == '~'
}

Expand Down

0 comments on commit c4f4964

Please sign in to comment.