Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add update_manifest tool #7815

Merged
merged 11 commits into from
May 13, 2021
4 changes: 3 additions & 1 deletion dgraph/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/dgraph-io/dgraph/dgraph/cmd/increment"
"github.com/dgraph-io/dgraph/dgraph/cmd/live"
"github.com/dgraph-io/dgraph/dgraph/cmd/migrate"
"github.com/dgraph-io/dgraph/dgraph/cmd/update_manifest"
"github.com/dgraph-io/dgraph/dgraph/cmd/version"
"github.com/dgraph-io/dgraph/dgraph/cmd/zero"
"github.com/dgraph-io/dgraph/upgrade"
Expand Down Expand Up @@ -83,7 +84,8 @@ var rootConf = viper.New()
// subcommands initially contains all default sub-commands.
var subcommands = []*x.SubCommand{
&bulk.Bulk, &cert.Cert, &conv.Conv, &live.Live, &alpha.Alpha, &zero.Zero, &version.Version,
&debug.Debug, &migrate.Migrate, &debuginfo.DebugInfo, &upgrade.Upgrade, &decrypt.Decrypt, &increment.Increment,
&debug.Debug, &migrate.Migrate, &debuginfo.DebugInfo, &upgrade.Upgrade, &decrypt.Decrypt,
&increment.Increment, &update_manifest.UpdateManifest,
}

func initCmds() {
Expand Down
129 changes: 129 additions & 0 deletions dgraph/cmd/update_manifest/run.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Copyright 2021 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package update_manifest

import (
"encoding/binary"
"log"
"net/url"
"os"
"strings"

"github.com/dgraph-io/dgraph/ee"
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/worker"
"github.com/dgraph-io/dgraph/x"
"github.com/pkg/errors"
"github.com/spf13/cobra"
)

var (
logger = log.New(os.Stderr, "", 0)
// UpdateManifest is the sub-command invoked when running "dgraph update_manifest".
UpdateManifest x.SubCommand
quiet bool // enabling quiet mode would suppress the warning logs
)

var opt struct {
location string
key []byte
}

func init() {
UpdateManifest.Cmd = &cobra.Command{
Use: "update_manifest",
Short: "Run the Dgraph update tool to update the manifest from v21.03 to latest.",
Run: func(cmd *cobra.Command, args []string) {
if err := run(); err != nil {
logger.Fatalf("%v\n", err)
}
},
Annotations: map[string]string{"group": "tool"},
}
UpdateManifest.EnvPrefix = "DGRAPH_UPDATE_MANIFEST"
UpdateManifest.Cmd.SetHelpTemplate(x.NonRootTemplate)

flag := UpdateManifest.Cmd.Flags()
flag.StringVarP(&opt.location, "location", "l", "",
`Sets the location of the backup. Both file URIs and s3 are supported.
This command will take care of all the full + incremental backups present in the location.`)
ee.RegisterEncFlag(flag)
}

// Invalid bytes are replaced with the Unicode replacement rune.
// See https://golang.org/pkg/encoding/json/#Marshal
const replacementRune = rune('\ufffd')

func parseNsAttr(attr string) (uint64, string, error) {
if strings.ContainsRune(attr, replacementRune) {
return 0, "", errors.New("replacement char found")
}
return binary.BigEndian.Uint64([]byte(attr[:8])), attr[8:], nil
}

func run() error {
keys, err := ee.GetKeys(UpdateManifest.Conf)
if err != nil {
return err
}
opt.key = keys.EncKey
uri, err := url.Parse(opt.location)
if err != nil {
return errors.Wrapf(err, "while parsing location")
}
handler, err := worker.NewUriHandler(uri, nil)
if err != nil {
return errors.Wrapf(err, "while creating uri handler")
}
masterManifest, err := worker.GetManifest(handler, uri)
if err != nil {
return errors.Wrapf(err, "while getting manifest")
}

// Update the master manifest with the changes for drop operations and group predicates.
for _, manifest := range masterManifest.Manifests {
for gid, preds := range manifest.Groups {
parsedPreds := preds[:0]
for _, pred := range preds {
ns, attr, err := parseNsAttr(pred)
if err != nil {
logger.Printf("Unable to parse the pred: %v", pred)
continue
}
parsedPreds = append(parsedPreds, x.NamespaceAttr(ns, attr))
}
manifest.Groups[gid] = parsedPreds
}
for _, op := range manifest.DropOperations {
if op.DropOp == pb.DropOperation_ATTR {
ns, attr, err := parseNsAttr(op.DropValue)
if err != nil {
logger.Printf("Unable to parse the drop operation %+v pred: %v",
op, []byte(op.DropValue))
continue
}
op.DropValue = x.NamespaceAttr(ns, attr)
}
}
}

// Rewrite the master manifest.
if err := worker.CreateManifest(handler, uri, masterManifest); err != nil {
return errors.Wrap(err, "Complete backup failed")
}
return nil
}
2 changes: 1 addition & 1 deletion ee/backup/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func runExportBackup() error {
glog.Infof("Created temporary map directory: %s\n", mapDir)

// TODO: Can probably make this procesing concurrent.
for gid, _ := range latestManifest.Groups {
for gid := range latestManifest.Groups {
glog.Infof("Exporting group: %d", gid)
req := &pb.RestoreRequest{
GroupId: gid,
Expand Down
4 changes: 2 additions & 2 deletions worker/backup_ee.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro
}
manifest.Manifests = append(manifest.Manifests, m)

if err := createManifest(handler, uri, manifest); err != nil {
if err := CreateManifest(handler, uri, manifest); err != nil {
return errors.Wrap(err, "Complete backup failed")
}
glog.Infof("Backup completed OK.")
Expand Down Expand Up @@ -668,7 +668,7 @@ func checkAndGetDropOp(key []byte, l *posting.List, readTs uint64) (*pb.DropOper
// * DROP_NS;ns
// So, accordingly construct the *pb.DropOperation.
dropOp := &pb.DropOperation{}
dropInfo := strings.Split(string(val), ";")
dropInfo := strings.SplitN(string(val), ";", 2)
if len(dropInfo) != 2 {
return nil, errors.Errorf("Unexpected value: %s for dgraph.drop.op", val)
}
Expand Down
6 changes: 3 additions & 3 deletions worker/backup_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func getFilteredManifests(h UriHandler, manifests []*Manifest,
var validManifests []*Manifest
for _, m := range manifests {
missingFiles := false
for g, _ := range m.Groups {
for g := range m.Groups {
path := filepath.Join(m.Path, backupName(m.ValidReadTs(), g))
if !h.FileExists(path) {
missingFiles = true
Expand Down Expand Up @@ -178,7 +178,7 @@ func readManifest(h UriHandler, path string) (*Manifest, error) {
func GetLatestManifest(h UriHandler, uri *url.URL) (*Manifest, error) {
manifest, err := GetManifest(h, uri)
if err != nil {
return &Manifest{}, errors.Wrap(err, "Fialed to get the manifest")
return &Manifest{}, errors.Wrap(err, "Failed to get the manifest")
}
if len(manifest.Manifests) == 0 {
return &Manifest{}, nil
Expand Down Expand Up @@ -210,7 +210,7 @@ func GetManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) {
return manifest, nil
}

func createManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error {
func CreateManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error {
var err error
if !h.DirExists("./") {
if err := h.CreateDir("./"); err != nil {
Expand Down