diff --git a/chunker/chunk.go b/chunker/chunk.go index d57e5d8a003..33557b230c5 100644 --- a/chunker/chunk.go +++ b/chunker/chunk.go @@ -352,7 +352,7 @@ func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error { // and decompressed automatically even without the gz extension. The key, if non-nil, // is used to decrypt the file. The caller is responsible for calling the returned cleanup // function when done with the reader. -func FileReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) { +func FileReader(file string, key x.Sensitive) (*bufio.Reader, func()) { var f *os.File var err error if file == "-" { @@ -367,7 +367,7 @@ func FileReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) { } // StreamReader returns a bufio given a ReadCloser. The file is passed just to check for .gz files -func StreamReader(file string, key x.SensitiveByteSlice, f io.ReadCloser) ( +func StreamReader(file string, key x.Sensitive, f io.ReadCloser) ( rd *bufio.Reader, cleanup func()) { cleanup = func() { _ = f.Close() } diff --git a/dgraph/cmd/bulk/loader.go b/dgraph/cmd/bulk/loader.go index f428fa3ef9e..b54a07a6d09 100644 --- a/dgraph/cmd/bulk/loader.go +++ b/dgraph/cmd/bulk/loader.go @@ -84,7 +84,7 @@ type options struct { // ........... Badger options .......... // EncryptionKey is the key used for encryption. Enterprise only feature. - EncryptionKey x.SensitiveByteSlice + EncryptionKey x.Sensitive // BadgerCompression is the compression algorithm to use while writing to badger. BadgerCompression bo.CompressionType // BadgerCompressionlevel is the compression level to use while writing to badger. diff --git a/dgraph/cmd/bulk/reduce.go b/dgraph/cmd/bulk/reduce.go index 4b526d7a5d1..98b4e1f2a0e 100644 --- a/dgraph/cmd/bulk/reduce.go +++ b/dgraph/cmd/bulk/reduce.go @@ -485,8 +485,8 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou partitionKeys = append(partitionKeys, nil) for i := 0; i < len(partitionKeys); i++ { + pkey := partitionKeys[i] for _, itr := range mapItrs { - pkey := partitionKeys[i] itr.Next(cbuf, pkey) } if cbuf.LenNoPadding() < 256<<20 { diff --git a/dgraph/cmd/debug/run.go b/dgraph/cmd/debug/run.go index 83506504069..504e1c53ab9 100644 --- a/dgraph/cmd/debug/run.go +++ b/dgraph/cmd/debug/run.go @@ -68,7 +68,7 @@ type flagOptions struct { readTs uint64 sizeHistogram bool noKeys bool - key x.SensitiveByteSlice + key x.Sensitive // Options related to the WAL. wdir string diff --git a/dgraph/cmd/decrypt/decrypt.go b/dgraph/cmd/decrypt/decrypt.go index 4c0e94191d8..09e8502587c 100644 --- a/dgraph/cmd/decrypt/decrypt.go +++ b/dgraph/cmd/decrypt/decrypt.go @@ -32,7 +32,7 @@ import ( type options struct { // keyfile comes from the encryption_key_file or Vault flags - keyfile x.SensitiveByteSlice + keyfile x.Sensitive file string output string } diff --git a/dgraph/cmd/live/run.go b/dgraph/cmd/live/run.go index 6832d4d1cb7..5a0f89d71a9 100644 --- a/dgraph/cmd/live/run.go +++ b/dgraph/cmd/live/run.go @@ -77,7 +77,7 @@ type options struct { ludicrousMode bool upsertPredicate string tmpDir string - key x.SensitiveByteSlice + key x.Sensitive namespaceToLoad uint64 preserveNs bool } @@ -232,7 +232,7 @@ func validateSchema(sch string, namespaces map[uint64]struct{}) error { } // processSchemaFile process schema for a given gz file. 
-func (l *loader) processSchemaFile(ctx context.Context, file string, key x.SensitiveByteSlice, +func (l *loader) processSchemaFile(ctx context.Context, file string, key x.Sensitive, dgraphClient *dgo.Dgraph) error { fmt.Printf("\nProcessing schema file %q\n", file) if len(opt.authToken) > 0 { @@ -459,7 +459,7 @@ func (l *loader) allocateUids(nqs []*api.NQuad) { // processFile forwards a file to the RDF or JSON processor as appropriate func (l *loader) processFile(ctx context.Context, fs filestore.FileStore, filename string, - key x.SensitiveByteSlice) error { + key x.Sensitive) error { fmt.Printf("Processing data file %q\n", filename) diff --git a/dgraph/cmd/root_ee.go b/dgraph/cmd/root_ee.go index 5660d8c3450..70c2a7d8b5f 100644 --- a/dgraph/cmd/root_ee.go +++ b/dgraph/cmd/root_ee.go @@ -21,7 +21,6 @@ import ( func init() { // subcommands already has the default subcommands, we append to EE ones to that. subcommands = append(subcommands, - &backup.Restore, &backup.LsBackup, &backup.ExportBackup, &acl.CmdAcl, diff --git a/ee/backup/run.go b/ee/backup/run.go index 12ae12b10d0..3b25c6e40e9 100644 --- a/ee/backup/run.go +++ b/ee/backup/run.go @@ -13,33 +13,25 @@ package backup import ( - "context" "encoding/json" "fmt" - "io/ioutil" - "math" + "net/url" "os" "path/filepath" - "strconv" - "strings" "time" - "github.com/dgraph-io/badger/v3" - "github.com/dgraph-io/badger/v3/options" - "golang.org/x/sync/errgroup" - - "google.golang.org/grpc/credentials" + bpb "github.com/dgraph-io/badger/v3/pb" + "github.com/golang/glog" "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/ee/enc" + "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" - "github.com/dgraph-io/dgraph/upgrade" "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" "github.com/dgraph-io/ristretto/z" "github.com/pkg/errors" "github.com/spf13/cobra" - "google.golang.org/grpc" ) // Restore is the sub-command used to restore a backup. @@ -56,7 +48,7 @@ var opt struct { location string pdir string zero string - key x.SensitiveByteSlice + key x.Sensitive forceZero bool destination string format string @@ -65,98 +57,10 @@ var opt struct { } func init() { - initRestore() initBackupLs() initExportBackup() } -func initRestore() { - Restore.Cmd = &cobra.Command{ - Use: "restore", - Short: "Restore backup from Dgraph Enterprise Edition", - Long: ` -Restore loads objects created with the backup feature in Dgraph Enterprise Edition (EE). - -Backups are originated from HTTP at /admin/backup, then can be restored using CLI restore -command. Restore is intended to be used with new Dgraph clusters in offline state. - -The --location flag indicates a source URI with Dgraph backup objects. This URI supports all -the schemes used for backup. - -Source URI formats: - [scheme]://[host]/[path]?[args] - [scheme]:///[path]?[args] - /[path]?[args] (only for local or NFS) - -Source URI parts: - scheme - service handler, one of: "s3", "minio", "file" - host - remote address. ex: "dgraph.s3.amazonaws.com" - path - directory, bucket or container at target. ex: "/dgraph/backups/" - args - specific arguments that are ok to appear in logs. - -The --posting flag sets the posting list parent dir to store the loaded backup files. - -Using the --zero flag will use a Dgraph Zero address to update the start timestamp using -the restored version. Otherwise, the timestamp must be manually updated through Zero's HTTP -'assign' command. 
- -Dgraph backup creates a unique backup object for each node group, and restore will create -a posting directory 'p' matching the backup group ID. Such that a backup file -named '.../r32-g2.backup' will be loaded to posting dir 'p2'. - -Usage examples: - -# Restore from local dir or NFS mount: -$ dgraph restore -p . -l /var/backups/dgraph - -# Restore from S3: -$ dgraph restore -p /var/db/dgraph -l s3://s3.us-west-2.amazonaws.com/srfrog/dgraph - -# Restore from dir and update Ts: -$ dgraph restore -p . -l /var/backups/dgraph -z localhost:5080 - - `, - Args: cobra.NoArgs, - Run: func(cmd *cobra.Command, args []string) { - defer x.StartProfile(Restore.Conf).Stop() - if err := runRestoreCmd(); err != nil { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) - } - }, - Annotations: map[string]string{"group": "data-load"}, - } - Restore.Cmd.SetHelpTemplate(x.NonRootTemplate) - flag := Restore.Cmd.Flags() - - flag.StringVarP(&opt.badger, "badger", "b", worker.BadgerDefaults, - z.NewSuperFlagHelp(worker.BadgerDefaults). - Head("Badger options"). - Flag("compression", - "Specifies the compression algorithm and compression level (if applicable) for the "+ - `postings directory. "none" would disable compression, while "zstd:1" would set `+ - "zstd compression at level 1."). - Flag("goroutines", - "The number of goroutines to use in badger.Stream."). - String()) - - flag.StringVarP(&opt.location, "location", "l", "", - "Sets the source location URI (required).") - flag.StringVarP(&opt.pdir, "postings", "p", "", - "Directory where posting lists are stored (required).") - flag.StringVarP(&opt.zero, "zero", "z", "", "gRPC address for Dgraph zero. ex: localhost:5080") - flag.StringVarP(&opt.backupId, "backup_id", "", "", "The ID of the backup series to "+ - "restore. If empty, it will restore the latest series.") - flag.BoolVarP(&opt.forceZero, "force_zero", "", true, "If false, no connection to "+ - "a zero in the cluster will be required. Keep in mind this requires you to manually "+ - "update the timestamp and max uid when you start the cluster. The correct values are "+ - "printed near the end of this command's output.") - x.RegisterClientTLSFlags(flag) - enc.RegisterFlags(flag) - _ = Restore.Cmd.MarkFlagRequired("postings") - _ = Restore.Cmd.MarkFlagRequired("location") -} - func initBackupLs() { LsBackup.Cmd = &cobra.Command{ Use: "lsbackup", @@ -180,92 +84,6 @@ func initBackupLs() { _ = LsBackup.Cmd.MarkFlagRequired("location") } -func runRestoreCmd() error { - var ( - start time.Time - zc pb.ZeroClient - err error - ) - _, opt.key = ee.GetKeys(Restore.Conf) - fmt.Println("Restoring backups from:", opt.location) - fmt.Println("Writing postings to:", opt.pdir) - - if opt.zero == "" && opt.forceZero { - return errors.Errorf("No Dgraph Zero address passed. Use the --force_zero option if you " + - "meant to do this") - } - - if opt.zero != "" { - fmt.Println("Updating Zero timestamp at:", opt.zero) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - tlsConfig, err := x.LoadClientTLSConfigForInternalPort(Restore.Conf) - x.Checkf(err, "Unable to generate helper TLS config") - callOpts := []grpc.DialOption{grpc.WithBlock()} - if tlsConfig != nil { - callOpts = append(callOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) - } else { - callOpts = append(callOpts, grpc.WithInsecure()) - } - - zero, err := grpc.DialContext(ctx, opt.zero, callOpts...) 
- if err != nil { - return errors.Wrapf(err, "Unable to connect to %s", opt.zero) - } - zc = pb.NewZeroClient(zero) - } - - badger := z.NewSuperFlag(opt.badger).MergeAndCheckDefault(worker.BadgerDefaults) - ctype, clevel := x.ParseCompression(badger.GetString("compression")) - - start = time.Now() - result := worker.RunRestore(opt.pdir, opt.location, opt.backupId, opt.key, ctype, clevel) - if result.Err != nil { - return result.Err - } - if result.Version == 0 { - return errors.Errorf("Failed to obtain a restore version") - } - fmt.Printf("Restore version: %d\n", result.Version) - fmt.Printf("Restore max uid: %d\n", result.MaxLeaseUid) - - if zc != nil { - ctx, cancelTs := context.WithTimeout(context.Background(), time.Minute) - defer cancelTs() - - if _, err := zc.Timestamps(ctx, &pb.Num{Val: result.Version}); err != nil { - fmt.Printf("Failed to assign timestamp %d in Zero: %v", result.Version, err) - return err - } - - leaseID := func(val uint64, typ pb.NumLeaseType) error { - // MaxLeaseUid can be zero if the backup was taken on an empty DB. - if val == 0 { - return nil - } - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - if _, err = zc.AssignIds(ctx, &pb.Num{Val: val, Type: typ}); err != nil { - fmt.Printf("Failed to assign %s %d in Zero: %v\n", - pb.NumLeaseType_name[int32(typ)], val, err) - return err - } - return nil - } - - if err := leaseID(result.MaxLeaseUid, pb.Num_UID); err != nil { - return errors.Wrapf(err, "cannot update max uid lease after restore.") - } - if err := leaseID(result.MaxLeaseNsId, pb.Num_NS_ID); err != nil { - return errors.Wrapf(err, "cannot update max namespace lease after restore.") - } - } - - fmt.Printf("Restore: Time elapsed: %s\n", time.Since(start).Round(time.Second)) - return nil -} - func runLsbackupCmd() error { manifests, err := worker.ListBackupManifests(opt.location, nil) if err != nil { @@ -356,70 +174,126 @@ func runExportBackup() error { if err := os.MkdirAll(exportDir, 0755); err != nil { return errors.Wrapf(err, "cannot create dir %s", exportDir) } - tmpDir, err := ioutil.TempDir("", "export_backup") + + uri, err := url.Parse(opt.location) if err != nil { - return errors.Wrapf(err, "cannot create temp dir") + return errors.Wrapf(err, "runExportBackup") } - - restore := worker.RunRestore(tmpDir, opt.location, "", opt.key, options.None, 0) - if restore.Err != nil { - return restore.Err + handler, err := worker.NewUriHandler(uri, nil) + if err != nil { + return errors.Wrapf(err, "runExportBackup") } - - files, err := ioutil.ReadDir(tmpDir) + latestManifest, err := worker.GetLatestManifest(handler, uri) if err != nil { - return err + return errors.Wrapf(err, "runExportBackup") } - // Export the data from the p directories produced by the last step. - eg, _ := errgroup.WithContext(context.Background()) - for _, f := range files { - if !f.IsDir() { - continue - } - dir := filepath.Join(filepath.Join(tmpDir, f.Name())) - gid, err := strconv.ParseUint(strings.TrimPrefix(f.Name(), "p"), 32, 10) - if err != nil { - fmt.Printf("WARNING WARNING WARNING: unable to get group id from directory "+ - "inside DB at %s: %v", dir, err) - continue - } - if opt.upgrade && gid == 1 { - // Query the cors in badger db and append it at the end of GraphQL schema. - // This change was introduced in v21.03. Backups with 20.07 <= version < 21.03 - // should apply this. - db, err := badger.OpenManaged(badger.DefaultOptions(dir). - WithNumVersionsToKeep(math.MaxInt32). 
- WithEncryptionKey(opt.key))
+ exportSchema := func(writers *worker.Writers, val []byte, pk x.ParsedKey) error {
+ kv := &bpb.KV{}
+ var err error
+ if pk.IsSchema() {
+ kv, err = worker.SchemaExportKv(pk.Attr, val, true)
 if err != nil {
 return err
 }
- if err := upgrade.OfflineUpgradeFrom2011To2103(db); err != nil {
- return errors.Wrapf(err, "while fixing cors")
- }
- if err := db.Close(); err != nil {
+ } else {
+ kv, err = worker.TypeExportKv(pk.Attr, val)
+ if err != nil {
 return err
 }
 }
- eg.Go(func() error {
- return worker.StoreExport(&pb.ExportRequest{
- GroupId: uint32(gid),
- ReadTs: restore.Version,
- UnixTs: time.Now().Unix(),
- Format: opt.format,
- Destination: exportDir,
- }, dir, opt.key)
- })
+ return worker.WriteExport(writers, kv, "rdf")
 }
- if err := eg.Wait(); err != nil {
- return errors.Wrapf(err, "error while exporting data")
+ processKvBuf := func(ch chan *z.Buffer, req *pb.ExportRequest, writers *worker.Writers) error {
+ for buf := range ch {
+ kv := &bpb.KV{}
+ err := buf.SliceIterate(func(s []byte) error {
+ kv.Reset()
+ if err := kv.Unmarshal(s); err != nil {
+ return errors.Wrap(err, "processKvBuf failed to unmarshal kv")
+ }
+ pk, err := x.Parse(kv.Key)
+ if err != nil {
+ return errors.Wrap(err, "processKvBuf failed to parse key")
+ }
+ if pk.Attr == "_predicate_" {
+ return nil
+ }
+ if pk.IsSchema() || pk.IsType() {
+ return exportSchema(writers, kv.Value, pk)
+ }
+ if pk.IsData() {
+ pl := &pb.PostingList{}
+ if err := pl.Unmarshal(kv.Value); err != nil {
+ return errors.Wrap(err, "processKvBuf failed to unmarshal pl")
+ }
+ l := posting.NewList(kv.Key, pl, kv.Version)
+ kvList, err := worker.ToExportKvList(pk, l, req)
+ if err != nil {
+ return errors.Wrap(err, "processKvBuf failed to Export")
+ }
+ if len(kvList.Kv) == 0 {
+ return nil
+ }
+ exportKv := kvList.Kv[0]
+ return worker.WriteExport(writers, exportKv, req.Format)
+ }
+ return nil
+ })
+ if err != nil {
+ return err
+ }
+ buf.Release()
+ }
+ return nil
 }
- // Clean up temporary directory.
- if err := os.RemoveAll(tmpDir); err != nil {
- return errors.Wrapf(err, "cannot remove temp directory at %s", tmpDir)
- }
+ // TODO: Can probably make this processing concurrent.
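+ // For each group in the latest manifest: map its backup files with worker.MapBackup,
+ // then run a BackupReducer whose reduced buffers are drained by processKvBuf (in a
+ // separate goroutine) into that group's export writers before finishing the write.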
+ for gid, _ := range latestManifest.Groups { + glog.Infof("Exporting group: %d", gid) + req := &pb.RestoreRequest{ + GroupId: gid, + Location: opt.location, + EncryptionKeyFile: ExportBackup.Conf.GetString("encryption_key_file"), + } + if err := worker.MapBackup(req); err != nil { + return errors.Wrap(err, "Failed to map the backups") + } + in := &pb.ExportRequest{ + GroupId: uint32(gid), + ReadTs: latestManifest.Since, + UnixTs: time.Now().Unix(), + Format: opt.format, + Destination: exportDir, + } + uts := time.Unix(in.UnixTs, 0) + destPath := fmt.Sprintf("dgraph.r%d.u%s", in.ReadTs, uts.UTC().Format("0102.1504")) + exportStorage, err := worker.NewExportStorage(in, destPath) + if err != nil { + return err + } + + writers, err := worker.InitWriters(exportStorage, in) + if err != nil { + return err + } + r := worker.NewBackupReducer(nil, 0) + errCh := make(chan error, 1) + go func() { + errCh <- processKvBuf(r.WriteCh(), in, writers) + }() + + if err := r.Reduce(); err != nil { + return errors.Wrap(err, "Failed to reduce the map") + } + if err := <-errCh; err != nil { + return errors.Wrap(err, "Failed to process reduced buffers") + } + if _, err := exportStorage.FinishWriting(writers); err != nil { + return errors.Wrap(err, "Failed to finish write") + } + } return nil } diff --git a/ee/enc/util_ee.go b/ee/enc/util_ee.go index 6f8f31ec00e..a70b67a3947 100644 --- a/ee/enc/util_ee.go +++ b/ee/enc/util_ee.go @@ -26,7 +26,7 @@ import ( var EeBuild = true // GetWriter wraps a crypto StreamWriter using the input key on the input Writer. -func GetWriter(key x.SensitiveByteSlice, w io.Writer) (io.Writer, error) { +func GetWriter(key x.Sensitive, w io.Writer) (io.Writer, error) { // No encryption, return the input writer as is. if key == nil { return w, nil @@ -49,7 +49,7 @@ func GetWriter(key x.SensitiveByteSlice, w io.Writer) (io.Writer, error) { } // GetReader wraps a crypto StreamReader using the input key on the input Reader. -func GetReader(key x.SensitiveByteSlice, r io.Reader) (io.Reader, error) { +func GetReader(key x.Sensitive, r io.Reader) (io.Reader, error) { // No encryption, return input reader as is. if key == nil { return r, nil diff --git a/ee/utils.go b/ee/utils.go index 181fff752ab..13f06737e53 100644 --- a/ee/utils.go +++ b/ee/utils.go @@ -27,7 +27,7 @@ import ( // GetKeys returns the ACL and encryption keys as configured by the user // through the --acl, --encryption_key_file, and --vault flags. On OSS builds, // this function exits with an error. -func GetKeys(config *viper.Viper) (x.SensitiveByteSlice, x.SensitiveByteSlice) { +func GetKeys(config *viper.Viper) (x.Sensitive, x.Sensitive) { glog.Exit("flags: acl / encryption is an enterprise-only feature") return nil, nil } diff --git a/ee/utils_ee.go b/ee/utils_ee.go index a75a33843c8..4067e26b47b 100644 --- a/ee/utils_ee.go +++ b/ee/utils_ee.go @@ -25,7 +25,7 @@ import ( // GetKeys returns the ACL and encryption keys as configured by the user // through the --acl, --encryption_key_file, and --vault flags. On OSS builds, // this function exits with an error. 
-func GetKeys(config *viper.Viper) (x.SensitiveByteSlice, x.SensitiveByteSlice) { +func GetKeys(config *viper.Viper) (x.Sensitive, x.Sensitive) { aclSuperFlag := z.NewSuperFlag(config.GetString("acl")) aclKey, encKey := vault.GetKeys(config) var err error diff --git a/ee/vault/vault.go b/ee/vault/vault.go index 6e441dea4ba..13a782216de 100644 --- a/ee/vault/vault.go +++ b/ee/vault/vault.go @@ -24,7 +24,7 @@ import ( "github.com/spf13/viper" ) -func GetKeys(config *viper.Viper) (aclKey, encKey x.SensitiveByteSlice) { +func GetKeys(config *viper.Viper) (aclKey, encKey x.Sensitive) { glog.Exit("flags: vault is an enterprise-only feature") return } diff --git a/ee/vault/vault_ee.go b/ee/vault/vault_ee.go index 41a4d7c9261..7bd65279527 100644 --- a/ee/vault/vault_ee.go +++ b/ee/vault/vault_ee.go @@ -25,7 +25,7 @@ import ( "github.com/spf13/viper" ) -func GetKeys(config *viper.Viper) (aclKey, encKey x.SensitiveByteSlice) { +func GetKeys(config *viper.Viper) (aclKey, encKey x.Sensitive) { // Avoid querying Vault unless the flag has been explicitly set. if !config.IsSet(flagVault) { return @@ -91,7 +91,7 @@ func getKvStore(client *api.Client, path string) (kvStore, error) { } // getSensitiveBytes retrieves a value from a kvStore, decoding it if necessary. -func (kv kvStore) getSensitiveBytes(field, format string) (x.SensitiveByteSlice, error) { +func (kv kvStore) getSensitiveBytes(field, format string) (x.Sensitive, error) { value, ok := kv[field] if !ok { return nil, fmt.Errorf("vault: key '%s' not found", field) @@ -103,7 +103,7 @@ func (kv kvStore) getSensitiveBytes(field, format string) (x.SensitiveByteSlice, } // Decode value if necessary. - var valueBytes x.SensitiveByteSlice + var valueBytes x.Sensitive var err error if format == "base64" { valueBytes, err = base64.StdEncoding.DecodeString(valueString) @@ -112,7 +112,7 @@ func (kv kvStore) getSensitiveBytes(field, format string) (x.SensitiveByteSlice, "vault: key '%s' could not be decoded as a base64 string: %s", field, err) } } else { - valueBytes = x.SensitiveByteSlice(valueString) + valueBytes = x.Sensitive(valueString) } return valueBytes, nil diff --git a/filestore/filestore.go b/filestore/filestore.go index 0d31ee0b59f..0e10f04f499 100644 --- a/filestore/filestore.go +++ b/filestore/filestore.go @@ -31,7 +31,7 @@ type FileStore interface { Open(path string) (io.ReadCloser, error) Exists(path string) bool FindDataFiles(str string, ext []string) []string - ChunkReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) + ChunkReader(file string, key x.Sensitive) (*bufio.Reader, func()) } // NewFileStore returns a new file storage. 
If remote, it's backed by an x.MinioClient diff --git a/filestore/local_files.go b/filestore/local_files.go index ee3a3e43cb0..ba9f0288c39 100644 --- a/filestore/local_files.go +++ b/filestore/local_files.go @@ -43,7 +43,7 @@ func (*localFiles) FindDataFiles(str string, ext []string) []string { return x.FindDataFiles(str, ext) } -func (*localFiles) ChunkReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) { +func (*localFiles) ChunkReader(file string, key x.Sensitive) (*bufio.Reader, func()) { return chunker.FileReader(file, key) } diff --git a/filestore/remote_files.go b/filestore/remote_files.go index 16d26a93b1f..347882f0b72 100644 --- a/filestore/remote_files.go +++ b/filestore/remote_files.go @@ -77,7 +77,7 @@ func (rf *remoteFiles) FindDataFiles(str string, ext []string) (paths []string) return } -func (rf *remoteFiles) ChunkReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) { +func (rf *remoteFiles) ChunkReader(file string, key x.Sensitive) (*bufio.Reader, func()) { url, err := url.Parse(file) x.Check(err) diff --git a/graphql/schema/gqlschema.go b/graphql/schema/gqlschema.go index bc8c4cc2744..f6d3e359231 100644 --- a/graphql/schema/gqlschema.go +++ b/graphql/schema/gqlschema.go @@ -425,7 +425,7 @@ type directiveValidator func( typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List + secrets map[string]x.Sensitive) gqlerror.List type searchTypeIndex struct { gqlType string @@ -553,7 +553,7 @@ func ValidatorNoOp( typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { return nil } @@ -852,7 +852,7 @@ func preGQLValidation(schema *ast.SchemaDocument) gqlerror.List { // has fleshed out the schema structure; we just need to check if it also satisfies // the extra rules. 
func postGQLValidation(schema *ast.Schema, definitions []string, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { var errs []*gqlerror.Error for _, defn := range definitions { diff --git a/graphql/schema/rules.go b/graphql/schema/rules.go index a9453a30125..af5a2cd7b05 100644 --- a/graphql/schema/rules.go +++ b/graphql/schema/rules.go @@ -818,7 +818,7 @@ func listValidityCheck(typ *ast.Definition, field *ast.FieldDefinition) gqlerror func hasInverseValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { var errs []*gqlerror.Error invTypeName := field.Type.Name() @@ -1002,7 +1002,7 @@ func searchValidation( typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { var errs []*gqlerror.Error arg := dir.Arguments.ForName(searchArgs) @@ -1081,7 +1081,7 @@ func searchValidation( } func dgraphDirectiveValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, - dir *ast.Directive, secrets map[string]x.SensitiveByteSlice) gqlerror.List { + dir *ast.Directive, secrets map[string]x.Sensitive) gqlerror.List { var errs []*gqlerror.Error if isID(field) { @@ -1220,7 +1220,7 @@ func passwordValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { return passwordDirectiveValidation(sch, typ) } @@ -1229,7 +1229,7 @@ func lambdaDirectiveValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { // if the lambda url wasn't specified during alpha startup, // just return that error. Don't confuse the user with errors from @custom yet. if x.LambdaUrl(x.GalaxyNamespace) == "" { @@ -1402,7 +1402,7 @@ func customDirectiveValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { var errs []*gqlerror.Error // 1. Validating custom directive itself @@ -2045,7 +2045,7 @@ func idValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { if field.Type.String() == "String!" || field.Type.String() == "Int!" || field.Type.String() == "Int64!" 
|| @@ -2123,7 +2123,7 @@ func apolloRequiresValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { extendsDirective := typ.Directives.ForName(apolloExtendsDirective) if extendsDirective == nil { @@ -2160,7 +2160,7 @@ func apolloProvidesValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { fldTypeDefn := sch.Types[field.Type.Name()] keyDirective := fldTypeDefn.Directives.ForName(apolloKeyDirective) @@ -2193,7 +2193,7 @@ func apolloExternalValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { extendsDirective := typ.Directives.ForName(apolloExtendsDirective) if extendsDirective == nil { @@ -2226,7 +2226,7 @@ func remoteResponseValidation(sch *ast.Schema, typ *ast.Definition, field *ast.FieldDefinition, dir *ast.Directive, - secrets map[string]x.SensitiveByteSlice) gqlerror.List { + secrets map[string]x.Sensitive) gqlerror.List { remoteDirectiveDefn := typ.Directives.ForName(remoteDirective) if remoteDirectiveDefn == nil { diff --git a/graphql/schema/schemagen.go b/graphql/schema/schemagen.go index 60e1b0b0017..86cdbf3f9a6 100644 --- a/graphql/schema/schemagen.go +++ b/graphql/schema/schemagen.go @@ -178,7 +178,7 @@ func (s *handler) GQLSchemaWithoutApolloExtras() string { type metaInfo struct { // secrets are key value pairs stored in the GraphQL schema which can be added as headers // to requests which resolve custom queries/mutations. These are extracted from # Dgraph.Secret. - secrets map[string]x.SensitiveByteSlice + secrets map[string]x.Sensitive // extraCorsHeaders are the allowed CORS Headers in addition to x.AccessControlAllowedHeaders. // These are parsed from the forwardHeaders specified in the @custom directive. // The header for Dgraph.Authorization is also part of this. @@ -208,7 +208,7 @@ func parseMetaInfo(sch string) (*metaInfo, error) { scanner := bufio.NewScanner(strings.NewReader(sch)) authSecret := "" schMetaInfo := &metaInfo{ - secrets: make(map[string]x.SensitiveByteSlice), + secrets: make(map[string]x.Sensitive), allowedCorsOrigins: make(map[string]bool), } var err error @@ -262,7 +262,7 @@ func parseMetaInfo(sch string) (*metaInfo, error) { val = strings.Trim(val, `"`) key := strings.Trim(parts[2], `"`) // lets obfuscate the value of the secrets from here on. - schMetaInfo.secrets[key] = x.SensitiveByteSlice(val) + schMetaInfo.secrets[key] = x.Sensitive(val) } } diff --git a/raftwal/log.go b/raftwal/log.go index 7a4034dd16e..9b01b2d6093 100644 --- a/raftwal/log.go +++ b/raftwal/log.go @@ -62,7 +62,7 @@ const ( var ( emptyEntry = entry(make([]byte, entrySize)) - encryptionKey x.SensitiveByteSlice + encryptionKey x.Sensitive ) type entry []byte diff --git a/raftwal/storage.go b/raftwal/storage.go index 58f2845c468..49ef11ade25 100644 --- a/raftwal/storage.go +++ b/raftwal/storage.go @@ -87,7 +87,7 @@ func Init(dir string) *DiskStorage { // InitEncrypted initializes returns a properly initialized instance of DiskStorage. // To gracefully shutdown DiskStorage, store.Closer.SignalAndWait() should be called. 
-func InitEncrypted(dir string, encKey x.SensitiveByteSlice) (*DiskStorage, error) { +func InitEncrypted(dir string, encKey x.Sensitive) (*DiskStorage, error) { w := &DiskStorage{ dir: dir, } diff --git a/systest/backup/common/utils.go b/systest/backup/common/utils.go index c5236877e9e..bbda6798258 100644 --- a/systest/backup/common/utils.go +++ b/systest/backup/common/utils.go @@ -25,7 +25,6 @@ import ( "github.com/dgraph-io/dgraph/testutil" "github.com/dgraph-io/dgraph/worker" - "github.com/dgraph-io/dgraph/x" ) var ( @@ -48,7 +47,7 @@ func RunFailingRestore(t *testing.T, backupLocation, lastDir string, commitTs ui require.NoError(t, os.RemoveAll(restoreDir)) result := worker.RunRestore("./data/restore", backupLocation, lastDir, - x.SensitiveByteSlice(nil), options.Snappy, 0) + "", options.Snappy, 0) require.Error(t, result.Err) require.Contains(t, result.Err.Error(), "expected a BackupNum value of 1") } diff --git a/systest/backup/encryption/backup_test.go b/systest/backup/encryption/backup_test.go index 195e0f84ccd..27410aebad2 100644 --- a/systest/backup/encryption/backup_test.go +++ b/systest/backup/encryption/backup_test.go @@ -30,7 +30,6 @@ import ( "github.com/dgraph-io/badger/v3/options" "github.com/dgraph-io/dgo/v200/protos/api" - "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/testutil" "github.com/dgraph-io/dgraph/worker" @@ -317,10 +316,8 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string t.Logf("--- Restoring from: %q", localBackupDst) testutil.KeyFile = "../../../ee/enc/test-fixtures/enc-key" - key, err := ioutil.ReadFile("../../../ee/enc/test-fixtures/enc-key") - require.NoError(t, err) result := worker.RunRestore("./data/restore", localBackupDst, lastDir, - x.SensitiveByteSlice(key), options.Snappy, 0) + testutil.KeyFile, options.Snappy, 0) require.NoError(t, result.Err) for i, pdir := range []string{"p1", "p2", "p3"} { @@ -355,14 +352,10 @@ func runFailingRestore(t *testing.T, backupLocation, lastDir string, commitTs ui // Recreate the restore directory to make sure there's no previous data when // calling restore. require.NoError(t, os.RemoveAll(restoreDir)) + keyFile := "../../../ee/enc/test-fixtures/enc-key" - // Get key. 
- config := getEncConfig() - config.Set("encryption_key_file", "../../../ee/enc/test-fixtures/enc-key") - _, encKey := ee.GetKeys(config) - require.NotNil(t, encKey) - - result := worker.RunRestore("./data/restore", backupLocation, lastDir, encKey, options.Snappy, 0) + result := worker.RunRestore("./data/restore", backupLocation, lastDir, keyFile, + options.Snappy, 0) require.Error(t, result.Err) require.Contains(t, result.Err.Error(), "expected a BackupNum value of 1") } diff --git a/systest/backup/filesystem/backup_test.go b/systest/backup/filesystem/backup_test.go index da0aab1adf8..7f97dad4611 100644 --- a/systest/backup/filesystem/backup_test.go +++ b/systest/backup/filesystem/backup_test.go @@ -425,7 +425,7 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m require.NoError(t, os.RemoveAll(restoreDir)) t.Logf("--- Restoring from: %q", backupLocation) - result := worker.RunRestore("./data/restore", backupLocation, lastDir, x.SensitiveByteSlice(nil), options.Snappy, 0) + result := worker.RunRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0) require.NoError(t, result.Err) for i, pdir := range []string{"p1", "p2", "p3"} { diff --git a/systest/backup/minio-large/backup_test.go b/systest/backup/minio-large/backup_test.go index deae1403aa9..e45d36e96f4 100644 --- a/systest/backup/minio-large/backup_test.go +++ b/systest/backup/minio-large/backup_test.go @@ -167,7 +167,7 @@ func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64) m require.NoError(t, os.MkdirAll(restoreDir, os.ModePerm)) t.Logf("--- Restoring from: %q", backupLocation) - result := worker.RunRestore("./data/restore", backupLocation, lastDir, x.SensitiveByteSlice(nil), options.Snappy, 0) + result := worker.RunRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0) require.NoError(t, result.Err) restored1, err := testutil.GetPredicateValues("./data/restore/p1", x.GalaxyAttr("name1"), commitTs) diff --git a/systest/backup/minio/backup_test.go b/systest/backup/minio/backup_test.go index 6c86c3e772f..7870a71eee0 100644 --- a/systest/backup/minio/backup_test.go +++ b/systest/backup/minio/backup_test.go @@ -337,8 +337,7 @@ func runRestore(t *testing.T, lastDir string, commitTs uint64) map[string]string require.NoError(t, os.RemoveAll(restoreDir)) t.Logf("--- Restoring from: %q", localBackupDst) - result := worker.RunRestore("./data/restore", localBackupDst, lastDir, - x.SensitiveByteSlice(nil), options.Snappy, 0) + result := worker.RunRestore("./data/restore", localBackupDst, lastDir, "", options.Snappy, 0) require.NoError(t, result.Err) for i, pdir := range []string{"p1", "p2", "p3"} { @@ -360,7 +359,7 @@ func runFailingRestore(t *testing.T, backupLocation, lastDir string, commitTs ui // calling restore. 
require.NoError(t, os.RemoveAll(restoreDir)) - result := worker.RunRestore("./data/restore", backupLocation, lastDir, x.SensitiveByteSlice(nil), options.Snappy, 0) + result := worker.RunRestore("./data/restore", backupLocation, lastDir, "", options.Snappy, 0) require.Error(t, result.Err) require.Contains(t, result.Err.Error(), "expected a BackupNum value of 1") } diff --git a/systest/backup/multi-tenancy/backup_test.go b/systest/backup/multi-tenancy/backup_test.go index b2015808790..9909537f17a 100644 --- a/systest/backup/multi-tenancy/backup_test.go +++ b/systest/backup/multi-tenancy/backup_test.go @@ -19,21 +19,15 @@ import ( "bytes" "context" "encoding/json" - "fmt" - "math" - "os" - "path/filepath" "strings" "testing" - "github.com/dgraph-io/badger/v3/options" "github.com/dgraph-io/dgo/v200" "github.com/dgraph-io/dgo/v200/protos/api" "github.com/stretchr/testify/require" "github.com/dgraph-io/dgraph/systest/backup/common" "github.com/dgraph-io/dgraph/testutil" - "github.com/dgraph-io/dgraph/worker" "github.com/dgraph-io/dgraph/x" ) @@ -54,7 +48,7 @@ func TestBackupMultiTenancy(t *testing.T) { ctx := context.Background() dg := testutil.DgClientWithLogin(t, "groot", "password", x.GalaxyNamespace) - require.NoError(t, dg.Alter(ctx, &api.Operation{DropAll: true})) + testutil.DropAll(t, dg) galaxyCreds := &testutil.LoginParams{UserID: "groot", Passwd: "password", Namespace: x.GalaxyNamespace} galaxyToken := testutil.Login(t, galaxyCreds) @@ -78,10 +72,6 @@ func TestBackupMultiTenancy(t *testing.T) { addData := func(dg *dgo.Dgraph, name string) *api.Response { var buf bytes.Buffer - for i := 0; i < 10000; i++ { - buf.Write([]byte(fmt.Sprintf(`<_:x%d> "%s" . - `, i, name))) - } // Add initial data. _, err = dg.NewTxn().Mutate(ctx, &api.Mutation{ CommitNow: true, @@ -92,11 +82,11 @@ func TestBackupMultiTenancy(t *testing.T) { original, err := dg.NewTxn().Mutate(ctx, &api.Mutation{ CommitNow: true, SetNquads: []byte(` - <_:x1> "BIRDS MAN OR (THE UNEXPECTED VIRTUE OF IGNORANCE)" . - <_:x2> "Spotlight" . - <_:x3> "Moonlight" . - <_:x4> "THE SHAPE OF WATERLOO" . - <_:x5> "BLACK PUNTER" . + <_:x1> "a" . + <_:x2> "b" . + <_:x3> "c" . + <_:x4> "d" . + <_:x5> "e" . `), }) require.NoError(t, err) @@ -113,200 +103,28 @@ func TestBackupMultiTenancy(t *testing.T) { // Send backup request. _ = runBackup(t, galaxyToken, 3, 1) - restored := runRestore(t, copyBackupDir, "", math.MaxUint64, []uint64{x.GalaxyNamespace, ns}) - - preds := []string{"dgraph.graphql.schema", "name", "dgraph.graphql.xid", "dgraph.type", "movie", - "dgraph.graphql.p_query", "dgraph.drop.op", "dgraph.xid", "dgraph.acl.rule", - "dgraph.password", "dgraph.user.group", "dgraph.rule.predicate", "dgraph.rule.permission"} - preds = append(preds, preds...) - types := []string{"Node", "dgraph.graphql", "dgraph.graphql.persisted_query", - "dgraph.type.Rule", "dgraph.type.User", "dgraph.type.Group"} // ACL - types = append(types, types...) 
- testutil.CheckSchema(t, preds, types) - - verifyUids := func(dg *dgo.Dgraph, name string, count int) { - query := fmt.Sprintf(` - { - me(func: eq(name, "%s")) { - count(uid) - } - }`, name) - res, err := dg.NewTxn().Query(context.Background(), query) - require.NoError(t, err) - require.JSONEq(t, string(res.GetJson()), fmt.Sprintf(`{"me":[{"count":%d}]}`, count)) - } - verifyUids(dg, "galaxy", 10000) - verifyUids(dg1, "ns", 10000) - - checks := []struct { - blank, expected string - }{ - {blank: "x1", expected: "BIRDS MAN OR (THE UNEXPECTED VIRTUE OF IGNORANCE)"}, - {blank: "x2", expected: "Spotlight"}, - {blank: "x3", expected: "Moonlight"}, - {blank: "x4", expected: "THE SHAPE OF WATERLOO"}, - {blank: "x5", expected: "BLACK PUNTER"}, - } - for ns, orig := range original { - for _, check := range checks { - require.EqualValues(t, check.expected, restored[ns][orig.Uids[check.blank]]) - } - } - - addMoreData := func(dg *dgo.Dgraph, ns uint64) *api.Response { - // Add more data for the incremental backup. - incr1, err := dg.NewTxn().Mutate(ctx, &api.Mutation{ - CommitNow: true, - SetNquads: []byte(fmt.Sprintf(` - <%s> "Birdman or (The Unexpected Virtue of Ignorance)" . - <%s> "The Shape of Waterloo" . - `, original[ns].Uids["x1"], original[ns].Uids["x4"])), - }) - t.Logf("%+v", incr1) - require.NoError(t, err) - - // Update schema and types to make sure updates to the schema are backed up. - require.NoError(t, dg.Alter(ctx, &api.Operation{Schema: ` - movie: string . - actor: string . - name: string @index(hash) . - type Node { - movie - } - type NewNode { - actor - }`})) - return incr1 - } - - incr1 := make(map[uint64]*api.Response) - incr1[x.GalaxyNamespace] = addMoreData(dg, x.GalaxyNamespace) - incr1[ns] = addMoreData(dg1, ns) - - // Perform first incremental backup. - _ = runBackup(t, galaxyToken, 6, 2) - restored = runRestore(t, copyBackupDir, "", - x.Max(incr1[x.GalaxyNamespace].Txn.CommitTs, incr1[ns].Txn.CommitTs), []uint64{x.GalaxyNamespace, ns}) - - // Check the predicates and types in the schema are as expected. - preds = append(preds, "actor", "actor") - types = append(types, "NewNode", "NewNode") - testutil.CheckSchema(t, preds, types) - - // Perform some checks on the restored values. - checks = []struct { - blank, expected string - }{ - {blank: "x1", expected: "Birdman or (The Unexpected Virtue of Ignorance)"}, - {blank: "x4", expected: "The Shape of Waterloo"}, - } - for ns, orig := range original { - for _, check := range checks { - require.EqualValues(t, check.expected, restored[ns][orig.Uids[check.blank]]) - } - } - - addMoreData2 := func(dg *dgo.Dgraph, ns uint64) *api.Response { - // Add more data for the incremental backup. - incr2, err := dg.NewTxn().Mutate(ctx, &api.Mutation{ - CommitNow: true, - SetNquads: []byte(fmt.Sprintf(` - <%s> "The Shape of Water" . - <%s> "The Black Panther" . - `, original[ns].Uids["x4"], original[ns].Uids["x5"])), - }) - require.NoError(t, err) - return incr2 - } - - incr2 := make(map[uint64]*api.Response) - incr2[x.GalaxyNamespace] = addMoreData2(dg, x.GalaxyNamespace) - incr2[ns] = addMoreData2(dg1, ns) + testutil.DropAll(t, dg) + sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt) + testutil.WaitForRestore(t, dg) - // Perform second incremental backup. 
- _ = runBackup(t, galaxyToken, 9, 3) - restored = runRestore(t, copyBackupDir, "", - x.Max(incr2[x.GalaxyNamespace].Txn.CommitTs, incr2[ns].Txn.CommitTs), []uint64{x.GalaxyNamespace, ns}) - testutil.CheckSchema(t, preds, types) - - checks = []struct { - blank, expected string - }{ - {blank: "x4", expected: "The Shape of Water"}, - {blank: "x5", expected: "The Black Panther"}, - } - for ns, orig := range original { - for _, check := range checks { - require.EqualValues(t, check.expected, restored[ns][orig.Uids[check.blank]]) - } - } - - addMoreData3 := func(dg *dgo.Dgraph, ns uint64) *api.Response { - // Add more data for the incremental backup. - incr2, err := dg.NewTxn().Mutate(ctx, &api.Mutation{ - CommitNow: true, - SetNquads: []byte(fmt.Sprintf(` - <%s> "El laberinto del fauno" . - <%s> "Black Panther 2" . - `, original[ns].Uids["x4"], original[ns].Uids["x5"])), - }) - require.NoError(t, err) - return incr2 - } - incr3 := make(map[uint64]*api.Response) - incr3[x.GalaxyNamespace] = addMoreData3(dg, x.GalaxyNamespace) - incr3[ns] = addMoreData3(dg1, ns) - - // Perform second full backup. - _ = runBackupInternal(t, galaxyToken, true, 12, 4) - restored = runRestore(t, copyBackupDir, "", - x.Max(incr3[x.GalaxyNamespace].Txn.CommitTs, incr3[ns].Txn.CommitTs), []uint64{x.GalaxyNamespace, ns}) - testutil.CheckSchema(t, preds, types) - - // Check all the values were restored to their most recent value. - checks = []struct { - blank, expected string - }{ - {blank: "x1", expected: "Birdman or (The Unexpected Virtue of Ignorance)"}, - {blank: "x2", expected: "Spotlight"}, - {blank: "x3", expected: "Moonlight"}, - {blank: "x4", expected: "El laberinto del fauno"}, - {blank: "x5", expected: "Black Panther 2"}, - } - for ns, orig := range original { - for _, check := range checks { - require.EqualValues(t, check.expected, restored[ns][orig.Uids[check.blank]]) - } - } - - verifyUids(dg, "galaxy", 10000) - verifyUids(dg1, "ns", 10000) - - // Do a DROP_DATA. This will return an error. - err = dg1.Alter(ctx, &api.Operation{DropOp: api.Operation_DATA}) - require.Error(t, err) - require.Contains(t, err.Error(), "Drop data can only be called by the guardian of the galaxy") - verifyUids(dg, "galaxy", 10000) - verifyUids(dg1, "ns", 10000) + query := `{ q(func: has(movie)) { count(uid) } }` + expectedResponse := `{ "q": [{ "count": 5 }]}` + testutil.VerifyQueryResponse(t, dg, query, expectedResponse) + testutil.VerifyQueryResponse(t, dg1, query, expectedResponse) // After deleting a namespace in incremental backup, we should not be able to get the data from // banned namespace. require.NoError(t, testutil.DeleteNamespace(t, galaxyToken, ns)) - dirs := runBackup(t, galaxyToken, 15, 5) - restored = runRestore(t, copyBackupDir, "", math.MaxUint64, []uint64{x.GalaxyNamespace, ns}) - - // Check that we do not restore the data from ns namespace. - require.Len(t, restored[x.GalaxyNamespace], 5) - require.Len(t, restored[ns], 0) - verifyUids(dg, "galaxy", 10000) - - // Remove the full backup testDirs and verify restore catches the error. 
- require.NoError(t, os.RemoveAll(dirs[0])) - require.NoError(t, os.RemoveAll(dirs[3])) - common.RunFailingRestore(t, copyBackupDir, "", - x.Max(incr3[x.GalaxyNamespace].Txn.CommitTs, incr3[ns].Txn.CommitTs)) + _ = runBackup(t, galaxyToken, 6, 2) + testutil.DropAll(t, dg) + sendRestoreRequest(t, alphaBackupDir, galaxyToken.AccessJwt) + testutil.WaitForRestore(t, dg) + query = `{ q(func: has(movie)) { count(uid) } }` + expectedResponse = `{ "q": [{ "count": 5 }]}` + testutil.VerifyQueryResponse(t, dg, query, expectedResponse) + expectedResponse = `{ "q": [{ "count": 0 }]}` + testutil.VerifyQueryResponse(t, dg1, query, expectedResponse) - // Clean up test directories. common.DirCleanup(t) } @@ -359,30 +177,32 @@ func runBackupInternal(t *testing.T, token *testutil.HttpToken, forceFull bool, return dirs } -func runRestore(t *testing.T, backupLocation, lastDir string, commitTs uint64, - ns []uint64) map[uint64]map[string]string { - // Recreate the restore directory to make sure there's no previous data when - // calling restore. - require.NoError(t, os.RemoveAll(restoreDir)) - - t.Logf("--- Restoring from: %q", backupLocation) - result := worker.RunRestore("./data/restore", backupLocation, lastDir, x.SensitiveByteSlice(nil), options.Snappy, 0) - require.NoError(t, result.Err) - - for i, pdir := range []string{"p1", "p2", "p3"} { - pdir = filepath.Join("./data/restore", pdir) - groupId, err := x.ReadGroupIdFile(pdir) - require.NoError(t, err) - require.Equal(t, uint32(i+1), groupId) +func sendRestoreRequest(t *testing.T, location string, token string) { + if location == "" { + location = "/data/backup" } + params := testutil.GraphQLParams{ + Query: `mutation restore($location: String!) { + restore(input: {location: $location}) { + code + message + } + }`, + Variables: map[string]interface{}{ + "location": location, + }, + } + resp := testutil.MakeGQLRequestWithAccessJwt(t, ¶ms, token) + resp.RequireNoGraphQLErrors(t) - restored := make(map[uint64]map[string]string) - var err error - pdir := "./data/restore/p1" - for _, n := range ns { - restored[n], err = testutil.GetPredicateValues(pdir, x.NamespaceAttr(n, "movie"), commitTs) + var restoreResp struct { + Restore struct { + Code string + Message string + } } - require.NoError(t, err) - t.Logf("--- Restored values: %+v\n", restored) - return restored + + require.NoError(t, json.Unmarshal(resp.Data, &restoreResp)) + require.Equal(t, restoreResp.Restore.Code, "Success") + return } diff --git a/worker/backup.go b/worker/backup.go index c7c359a840f..406d02ec552 100644 --- a/worker/backup.go +++ b/worker/backup.go @@ -104,7 +104,7 @@ func GetCredentialsFromRequest(req *pb.BackupRequest) *x.MinioCredentials { } } -func StoreExport(request *pb.ExportRequest, dir string, key x.SensitiveByteSlice) error { +func StoreExport(request *pb.ExportRequest, dir string, key x.Sensitive) error { db, err := badger.OpenManaged(badger.DefaultOptions(dir). WithSyncWrites(false). WithValueThreshold(1 << 10). 
diff --git a/worker/backup_ee.go b/worker/backup_ee.go index d81833fd4ca..1dab3994567 100644 --- a/worker/backup_ee.go +++ b/worker/backup_ee.go @@ -154,7 +154,7 @@ func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest, forceFull if err != nil { return err } - latestManifest, err := getLatestManifest(handler, uri) + latestManifest, err := GetLatestManifest(handler, uri) if err != nil { return err } @@ -531,7 +531,7 @@ func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) erro return err } - manifest, err := getManifest(handler, uri) + manifest, err := GetManifest(handler, uri) if err != nil { return err } diff --git a/worker/backup_handler.go b/worker/backup_handler.go index 0f4b59cd8e0..460d87455ae 100644 --- a/worker/backup_handler.go +++ b/worker/backup_handler.go @@ -14,15 +14,12 @@ package worker import ( "bytes" - "encoding/json" "fmt" "io" "io/ioutil" "net/url" "os" "path/filepath" - "sort" - "strings" "time" "github.com/dgraph-io/dgraph/protos/pb" @@ -63,122 +60,6 @@ const ( tmpManifest = `manifest_tmp.json` ) -// getConsolidatedManifest walks over all the backup directories and generates a master manifest. -func getConsolidatedManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) { - // If there is a master manifest already, we just return it. - if h.FileExists(backupManifest) { - manifest, err := readMasterManifest(h, backupManifest) - if err != nil { - return &MasterManifest{}, errors.Wrap(err, "Failed to read master manifest") - } - return manifest, nil - } - - // Otherwise, we create a master manifest by going through all the backup directories. - paths := h.ListPaths("") - - var manifestPaths []string - suffix := filepath.Join(string(filepath.Separator), backupManifest) - for _, p := range paths { - if strings.HasSuffix(p, suffix) { - manifestPaths = append(manifestPaths, p) - } - } - - sort.Strings(manifestPaths) - var mlist []*Manifest - - for _, path := range manifestPaths { - path = filepath.Dir(path) - _, path = filepath.Split(path) - m, err := readManifest(h, filepath.Join(path, backupManifest)) - if err != nil { - return nil, errors.Wrap(err, "While Getting latest manifest") - } - m.Path = path - mlist = append(mlist, m) - } - return &MasterManifest{Manifests: mlist}, nil -} - -func readManifest(h UriHandler, path string) (*Manifest, error) { - var m Manifest - b, err := h.Read(path) - if err != nil { - return &m, errors.Wrap(err, "readManifest failed to read the file: ") - } - if err := json.Unmarshal(b, &m); err != nil { - return &m, errors.Wrap(err, "readManifest failed to unmarshal: ") - } - return &m, nil -} - -func readMasterManifest(h UriHandler, path string) (*MasterManifest, error) { - var m MasterManifest - b, err := h.Read(path) - if err != nil { - return &m, errors.Wrap(err, "readMasterManifest failed to read the file: ") - } - if err := json.Unmarshal(b, &m); err != nil { - return &m, errors.Wrap(err, "readMasterManifest failed to unmarshal: ") - } - return &m, nil -} - -func getLatestManifest(h UriHandler, uri *url.URL) (*Manifest, error) { - if !h.DirExists("./") { - return &Manifest{}, errors.Errorf("getLatestManifest: The uri path: %q doesn't exists", - uri.Path) - } - manifest, err := getConsolidatedManifest(h, uri) - if err != nil { - return nil, errors.Wrap(err, "Get latest manifest failed while consolidation: ") - } - if len(manifest.Manifests) == 0 { - return &Manifest{}, nil - } - return manifest.Manifests[len(manifest.Manifests)-1], nil -} - -func getManifest(h UriHandler, uri *url.URL) 
(*MasterManifest, error) { - if !h.DirExists("") { - return &MasterManifest{}, errors.Errorf("getManifest: The uri path: %q doesn't exists", - uri.Path) - } - manifest, err := getConsolidatedManifest(h, uri) - if err != nil { - return manifest, errors.Wrap(err, "Failed to get consolidated manifest: ") - } - return manifest, nil -} - -func createManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error { - var err error - if !h.DirExists("./") { - if err := h.CreateDir("./"); err != nil { - return errors.Wrap(err, "createManifest failed to create path") - } - } - - w, err := h.CreateFile(tmpManifest) - if err != nil { - return errors.Wrap(err, "createManifest failed to create tmp path") - } - if err = json.NewEncoder(w).Encode(manifest); err != nil { - return err - } - if err := w.Close(); err != nil { - return err - } - // Move the tmpManifest to backupManifest, this operation is not atomic for s3. - // We try our best to move the file but if it fails then the user must move it manually. - err = h.Rename(tmpManifest, backupManifest) - return errors.Wrapf(err, "MOVING TEMPORARY MANIFEST TO MAIN MANIFEST FAILED!\n"+ - "It is possible that the manifest would have been corrupted. You must move "+ - "the file: %s to: %s in order to "+ - "fix the backup manifest.", tmpManifest, backupManifest) -} - func createBackupFile(h UriHandler, uri *url.URL, req *pb.BackupRequest) (io.WriteCloser, error) { if !h.DirExists("./") { if err := h.CreateDir("./"); err != nil { @@ -195,254 +76,10 @@ func createBackupFile(h UriHandler, uri *url.URL, req *pb.BackupRequest) (io.Wri return w, errors.Wrap(err, "while creating backup file") } -func Load(h UriHandler, uri *url.URL, backupId string, backupNum uint64, fn loadFn) LoadResult { - manifests, err := getManifestsToRestore(h, uri, backupId, backupNum) - if err != nil { - return LoadResult{Err: errors.Wrapf(err, "cannot retrieve manifests")} - } - - // Process each manifest, first check that they are valid and then confirm the - // backup files for each group exist. Each group in manifest must have a backup file, - // otherwise this is a failure and the user must remedy. - var since uint64 - var maxUid, maxNsId uint64 - for i, manifest := range manifests { - if manifest.Since == 0 || len(manifest.Groups) == 0 { - continue - } - - path := manifests[i].Path - for gid := range manifest.Groups { - file := filepath.Join(path, backupName(manifest.Since, gid)) - reader, err := h.Stream(file) - if err != nil { - return LoadResult{Err: errors.Wrapf(err, "Failed to open %q", file)} - } - defer reader.Close() - - // Only restore the predicates that were assigned to this group at the time - // of the last backup. - predSet := manifests[len(manifests)-1].getPredsInGroup(gid) - - groupMaxUid, groupMaxNsId, err := fn(gid, - &loadBackupInput{r: reader, preds: predSet, dropOperations: manifest.DropOperations, - isOld: manifest.Version == 0}) - if err != nil { - return LoadResult{Err: err} - } - maxUid = x.Max(maxUid, groupMaxUid) - maxNsId = x.Max(maxNsId, groupMaxNsId) - } - since = manifest.Since - } - - return LoadResult{Version: since, MaxLeaseUid: maxUid, MaxLeaseNsId: maxNsId} -} - -// verifyRequest verifies that the manifest satisfies the requirements to process the given -// restore request. 
-func verifyRequest(h UriHandler, uri *url.URL, req *pb.RestoreRequest, - currentGroups []uint32) error { - - manifests, err := getManifestsToRestore(h, uri, req.GetBackupId(), req.GetBackupNum()) - if err != nil { - return errors.Wrapf(err, "while retrieving manifests") - } - if len(manifests) == 0 { - return errors.Errorf("No backups with the specified backup ID %s", req.GetBackupId()) - } - - // TODO(Ahsan): Do we need to verify the manifests again here? - if err := verifyManifests(manifests); err != nil { - return err - } - - lastManifest := manifests[len(manifests)-1] - if len(currentGroups) != len(lastManifest.Groups) { - return errors.Errorf("groups in cluster and latest backup manifest differ") - } - - for _, group := range currentGroups { - if _, ok := lastManifest.Groups[group]; !ok { - return errors.Errorf("groups in cluster and latest backup manifest differ") - } - } - return nil -} - -func getManifestsToRestore(h UriHandler, uri *url.URL, backupId string, - backupNum uint64) ([]*Manifest, error) { - if !h.DirExists("") { - return nil, errors.Errorf("getManifestsToRestore: The uri path: %q doesn't exists", - uri.Path) - } - - manifest, err := getConsolidatedManifest(h, uri) - if err != nil { - return manifest.Manifests, errors.Wrap(err, "Failed to get consolidated manifest: ") - } - return getFilteredManifests(h, manifest.Manifests, backupId, backupNum) -} - -// loadFn is a function that will receive the current file being read. -// A reader, the backup groupId, and a map whose keys are the predicates to restore -// are passed as arguments. -type loadFn func(groupId uint32, in *loadBackupInput) (uint64, uint64, error) - -// LoadBackup will scan location l for backup files in the given backup series and load them -// sequentially. Returns the maximum Since value on success, otherwise an error. -func LoadBackup(location, backupId string, backupNum uint64, creds *x.MinioCredentials, - fn loadFn) LoadResult { - uri, err := url.Parse(location) - if err != nil { - return LoadResult{Err: err} - } - h, err := NewUriHandler(uri, creds) - if err != nil { - return LoadResult{Err: errors.Errorf("Unsupported URI: %v", uri)} - } - - return Load(h, uri, backupId, backupNum, fn) -} - -// VerifyBackup will access the backup location and verify that the specified backup can -// be restored to the cluster. -func VerifyBackup(req *pb.RestoreRequest, creds *x.MinioCredentials, currentGroups []uint32) error { - uri, err := url.Parse(req.GetLocation()) - if err != nil { - return err - } - - h, err := NewUriHandler(uri, creds) - if err != nil { - return errors.Wrap(err, "VerifyBackup") - } - - return verifyRequest(h, uri, req, currentGroups) -} - -// ListBackupManifests scans location l for backup files and returns the list of manifests. -func ListBackupManifests(l string, creds *x.MinioCredentials) ([]*Manifest, error) { - uri, err := url.Parse(l) - if err != nil { - return nil, err - } - - h, err := NewUriHandler(uri, creds) - if err != nil { - return nil, errors.Wrap(err, "ListBackupManifests") - } - - m, err := getManifest(h, uri) - if err != nil { - return nil, err - } - return m.Manifests, nil -} - -// filterManifests takes a list of manifests and returns the list of manifests -// that should be considered during a restore. -func filterManifests(manifests []*Manifest, backupId string) ([]*Manifest, error) { - // Go through the files in reverse order and stop when the latest full backup is found. 
- var filteredManifests []*Manifest - for i := len(manifests) - 1; i >= 0; i-- { - // If backupId is not empty, skip all the manifests that do not match the given - // backupId. If it's empty, do not skip any manifests as the default behavior is - // to restore the latest series of backups. - if len(backupId) > 0 && manifests[i].BackupId != backupId { - continue - } - - filteredManifests = append(filteredManifests, manifests[i]) - if manifests[i].Type == "full" { - break - } - } - - // Reverse the filtered lists since the original iteration happened in reverse. - for i := len(filteredManifests)/2 - 1; i >= 0; i-- { - opp := len(filteredManifests) - 1 - i - filteredManifests[i], filteredManifests[opp] = filteredManifests[opp], filteredManifests[i] - } - - if err := verifyManifests(filteredManifests); err != nil { - return nil, err - } - - return filteredManifests, nil -} - -func verifyManifests(manifests []*Manifest) error { - if len(manifests) == 0 { - return nil - } - - if manifests[0].BackupNum != 1 { - return errors.Errorf("expected a BackupNum value of 1 for first manifest but got %d", - manifests[0].BackupNum) - } - - backupId := manifests[0].BackupId - var backupNum uint64 - for _, manifest := range manifests { - if manifest.BackupId != backupId { - return errors.Errorf("found a manifest with backup ID %s but expected %s", - manifest.BackupId, backupId) - } - - backupNum++ - if manifest.BackupNum != backupNum { - return errors.Errorf("found a manifest with backup number %d but expected %d", - manifest.BackupNum, backupNum) - } - } - - return nil -} - func backupName(since uint64, groupId uint32) string { return fmt.Sprintf(backupNameFmt, since, groupId) } -func getFilteredManifests(h UriHandler, manifests []*Manifest, backupId string, - backupNum uint64) ([]*Manifest, error) { - - // validManifests are the ones for which the corresponding backup files exists. - var validManifests []*Manifest - for _, m := range manifests { - missingFiles := false - for g, _ := range m.Groups { - path := filepath.Join(m.Path, backupName(m.Since, g)) - if !h.FileExists(path) { - missingFiles = true - break - } - } - if !missingFiles { - validManifests = append(validManifests, m) - } - } - manifests, err := filterManifests(validManifests, backupId) - if err != nil { - return nil, err - } - - // Sort manifests in the ascending order of their BackupNum so that the first - // manifest corresponds to the first full backup and so on. - sort.Slice(manifests, func(i, j int) bool { - return manifests[i].BackupNum < manifests[j].BackupNum - }) - - if backupNum > 0 { - if len(manifests) < int(backupNum) { - return nil, errors.Errorf("not enough backups to restore manifest with backupNum %d", - backupNum) - } - manifests = manifests[:backupNum] - } - return manifests, nil -} - // UriHandler interface is implemented by URI scheme handlers. // When adding new scheme handles, for example 'azure://', an object will implement // this interface to supply Dgraph with a way to create or load backup files into DB. 
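The selection logic being removed here (and reintroduced in worker/backup_manifest.go further down) is: walk the manifest list from newest to oldest, keep only entries from the requested series (or the latest series when no backup ID is given), stop at the first full backup, then check that the kept manifests share a single BackupId and carry contiguous BackupNum values starting at 1. The new helper keeps the filtered list newest first and verifies the numbers counting down, but the invariant is the same. A minimal, self-contained sketch of that selection and check; manifestInfo and selectSeries are illustrative stand-ins, not Dgraph's Manifest type or API.

package main

import (
	"fmt"
	"sort"
)

// manifestInfo is a stand-in for the fields the filter actually looks at.
type manifestInfo struct {
	Type      string // "full" or "incremental"
	BackupId  string
	BackupNum uint64
}

// selectSeries walks manifests from newest to oldest, keeps the requested
// series (or the latest one if backupId is empty), and stops at the first
// full backup, mirroring the filterManifests logic above.
func selectSeries(manifests []manifestInfo, backupId string) ([]manifestInfo, error) {
	var out []manifestInfo
	for i := len(manifests) - 1; i >= 0; i-- {
		if backupId != "" && manifests[i].BackupId != backupId {
			continue
		}
		out = append(out, manifests[i])
		if manifests[i].Type == "full" {
			break
		}
	}
	// Restore ascending order: the full backup first, then its increments.
	sort.Slice(out, func(i, j int) bool { return out[i].BackupNum < out[j].BackupNum })

	// Verify the series: one BackupId, BackupNum contiguous from 1.
	for i, m := range out {
		if m.BackupId != out[0].BackupId {
			return nil, fmt.Errorf("manifest has backup ID %s, expected %s", m.BackupId, out[0].BackupId)
		}
		if m.BackupNum != uint64(i+1) {
			return nil, fmt.Errorf("manifest has backup number %d, expected %d", m.BackupNum, i+1)
		}
	}
	return out, nil
}

func main() {
	series, err := selectSeries([]manifestInfo{
		{"full", "aa", 1}, {"incremental", "aa", 2},
		{"full", "ab", 1}, {"incremental", "ab", 2},
	}, "")
	fmt.Println(series, err)
}

With the input above, only the latest series ("ab") survives and the contiguity check passes; dropping its full backup would make the check fail, which is exactly what the manifest unit tests below exercised.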
diff --git a/worker/backup_handler_test.go b/worker/backup_handler_test.go index 1bc54ab3b35..048ac1806c0 100644 --- a/worker/backup_handler_test.go +++ b/worker/backup_handler_test.go @@ -11,113 +11,3 @@ */ package worker - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestFilterManifestDefault(t *testing.T) { - manifests := []*Manifest{ - { - Type: "full", - BackupId: "aa", - BackupNum: 1, - }, - { - Type: "full", - BackupId: "ab", - BackupNum: 1, - }, - } - expected := []*Manifest{ - { - Type: "full", - BackupId: "ab", - BackupNum: 1, - }, - } - manifests, err := filterManifests(manifests, "") - require.NoError(t, err) - require.Equal(t, manifests, expected) -} - -func TestFilterManifestSelectSeries(t *testing.T) { - manifests := []*Manifest{ - { - Type: "full", - BackupId: "aa", - BackupNum: 1, - }, - { - Type: "full", - BackupId: "ab", - BackupNum: 1, - }, - } - expected := []*Manifest{ - { - Type: "full", - BackupId: "aa", - BackupNum: 1, - }, - } - manifests, err := filterManifests(manifests, "aa") - require.NoError(t, err) - require.Equal(t, manifests, expected) -} - -func TestFilterManifestMissingBackup(t *testing.T) { - manifests := []*Manifest{ - { - Type: "full", - BackupId: "aa", - BackupNum: 1, - }, - { - Type: "incremental", - BackupId: "aa", - BackupNum: 3, - }, - } - _, err := filterManifests(manifests, "aa") - require.Error(t, err) - require.Contains(t, err.Error(), "found a manifest with backup number") -} - -func TestFilterManifestMissingFirstBackup(t *testing.T) { - manifests := []*Manifest{ - { - Type: "incremental", - BackupId: "aa", - BackupNum: 2, - }, - { - Type: "incremental", - BackupId: "aa", - BackupNum: 3, - }, - } - _, err := filterManifests(manifests, "aa") - require.Error(t, err) - require.Contains(t, err.Error(), "expected a BackupNum value of 1 for first manifest") -} - -func TestFilterManifestDifferentSeries(t *testing.T) { - manifests := []*Manifest{ - { - Type: "full", - BackupId: "aa", - BackupNum: 1, - }, - { - Type: "incremental", - BackupId: "ab", - BackupNum: 2, - }, - } - _, err := filterManifests(manifests, "") - require.Error(t, err) - require.Contains(t, err.Error(), "found a manifest with backup ID") -} diff --git a/worker/backup_manifest.go b/worker/backup_manifest.go new file mode 100644 index 00000000000..ba9c78e57d6 --- /dev/null +++ b/worker/backup_manifest.go @@ -0,0 +1,257 @@ +// +build !oss + +/* + * Copyright 2021 Dgraph Labs, Inc. and Contributors + * + * Licensed under the Dgraph Community License (the "License"); you + * may not use this file except in compliance with the License. 
You + * may obtain a copy of the License at + * + * https://github.com/dgraph-io/dgraph/blob/master/licenses/DCL.txt + */ + +package worker + +import ( + "encoding/json" + "net/url" + "path/filepath" + "sort" + "strings" + + "github.com/dgraph-io/dgraph/protos/pb" + "github.com/dgraph-io/dgraph/x" + "github.com/pkg/errors" +) + +func verifyManifests(manifests []*Manifest) error { + if len(manifests) == 0 { + return nil + } + + lastIndex := len(manifests) - 1 + if manifests[lastIndex].BackupNum != 1 { + return errors.Errorf("expected a BackupNum value of 1 for first manifest but got %d", + manifests[lastIndex].BackupNum) + } + + backupId := manifests[lastIndex].BackupId + backupNum := uint64(len(manifests)) + for _, manifest := range manifests { + if manifest.BackupId != backupId { + return errors.Errorf("found a manifest with backup ID %s but expected %s", + manifest.BackupId, backupId) + } + + if manifest.BackupNum != backupNum { + return errors.Errorf("found a manifest with backup number %d but expected %d", + manifest.BackupNum, backupNum) + } + backupNum-- + } + + return nil +} + +func getManifestsToRestore( + h UriHandler, uri *url.URL, req *pb.RestoreRequest) ([]*Manifest, error) { + + if !h.DirExists("") { + return nil, errors.Errorf("getManifestsToRestore: The uri path: %q doesn't exists", + uri.Path) + } + manifest, err := getConsolidatedManifest(h, uri) + if err != nil { + return manifest.Manifests, errors.Wrap(err, "Failed to get consolidated manifest: ") + } + return getFilteredManifests(h, manifest.Manifests, req) +} + +func getFilteredManifests(h UriHandler, manifests []*Manifest, + req *pb.RestoreRequest) ([]*Manifest, error) { + + // filter takes a list of manifests and returns the list of manifests + // that should be considered during a restore. + filter := func(manifests []*Manifest, backupId string) ([]*Manifest, error) { + // Go through the files in reverse order and stop when the latest full backup is found. + var out []*Manifest + for i := len(manifests) - 1; i >= 0; i-- { + // If backupId is not empty, skip all the manifests that do not match the given + // backupId. If it's empty, do not skip any manifests as the default behavior is + // to restore the latest series of backups. + if len(backupId) > 0 && manifests[i].BackupId != backupId { + continue + } + + out = append(out, manifests[i]) + if manifests[i].Type == "full" { + break + } + } + + if err := verifyManifests(out); err != nil { + return nil, err + } + return out, nil + } + + // validManifests are the ones for which the corresponding backup files exists. + var validManifests []*Manifest + for _, m := range manifests { + missingFiles := false + for g, _ := range m.Groups { + path := filepath.Join(m.Path, backupName(m.Since, g)) + if !h.FileExists(path) { + missingFiles = true + break + } + } + if !missingFiles { + validManifests = append(validManifests, m) + } + } + manifests, err := filter(validManifests, req.BackupId) + if err != nil { + return nil, err + } + + if req.BackupNum > 0 { + if len(manifests) < int(req.BackupNum) { + return nil, errors.Errorf("not enough backups to restore manifest with backupNum %d", + req.BackupNum) + } + manifests = manifests[len(manifests)-int(req.BackupNum):] + } + return manifests, nil +} + +// getConsolidatedManifest walks over all the backup directories and generates a master manifest. +func getConsolidatedManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) { + // If there is a master manifest already, we just return it. 
+	if h.FileExists(backupManifest) {
+		manifest, err := readMasterManifest(h, backupManifest)
+		if err != nil {
+			return &MasterManifest{}, errors.Wrap(err, "Failed to read master manifest")
+		}
+		return manifest, nil
+	}
+
+	// Otherwise, we create a master manifest by going through all the backup directories.
+	paths := h.ListPaths("")
+
+	var manifestPaths []string
+	suffix := filepath.Join(string(filepath.Separator), backupManifest)
+	for _, p := range paths {
+		if strings.HasSuffix(p, suffix) {
+			manifestPaths = append(manifestPaths, p)
+		}
+	}
+
+	sort.Strings(manifestPaths)
+	var mlist []*Manifest
+
+	for _, path := range manifestPaths {
+		path = filepath.Dir(path)
+		_, path = filepath.Split(path)
+		m, err := readManifest(h, filepath.Join(path, backupManifest))
+		if err != nil {
+			return nil, errors.Wrap(err, "while getting latest manifest")
+		}
+		m.Path = path
+		mlist = append(mlist, m)
+	}
+	return &MasterManifest{Manifests: mlist}, nil
+}
+
+func readManifest(h UriHandler, path string) (*Manifest, error) {
+	var m Manifest
+	b, err := h.Read(path)
+	if err != nil {
+		return &m, errors.Wrap(err, "readManifest failed to read the file: ")
+	}
+	if err := json.Unmarshal(b, &m); err != nil {
+		return &m, errors.Wrap(err, "readManifest failed to unmarshal: ")
+	}
+	return &m, nil
+}
+
+func GetLatestManifest(h UriHandler, uri *url.URL) (*Manifest, error) {
+	manifest, err := GetManifest(h, uri)
+	if err != nil {
+		return &Manifest{}, errors.Wrap(err, "Failed to get the manifest")
+	}
+	if len(manifest.Manifests) == 0 {
+		return &Manifest{}, nil
+	}
+	return manifest.Manifests[len(manifest.Manifests)-1], nil
+}
+
+func readMasterManifest(h UriHandler, path string) (*MasterManifest, error) {
+	var m MasterManifest
+	b, err := h.Read(path)
+	if err != nil {
+		return &m, errors.Wrap(err, "readMasterManifest failed to read the file: ")
+	}
+	if err := json.Unmarshal(b, &m); err != nil {
+		return &m, errors.Wrap(err, "readMasterManifest failed to unmarshal: ")
+	}
+	return &m, nil
+}
+
+func GetManifest(h UriHandler, uri *url.URL) (*MasterManifest, error) {
+	if !h.DirExists("") {
+		return &MasterManifest{}, errors.Errorf("GetManifest: The uri path: %q doesn't exist",
+			uri.Path)
+	}
+	manifest, err := getConsolidatedManifest(h, uri)
+	if err != nil {
+		return manifest, errors.Wrap(err, "Failed to get consolidated manifest: ")
+	}
+	return manifest, nil
+}
+
+func createManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error {
+	var err error
+	if !h.DirExists("./") {
+		if err := h.CreateDir("./"); err != nil {
+			return errors.Wrap(err, "createManifest failed to create path")
+		}
+	}
+
+	w, err := h.CreateFile(tmpManifest)
+	if err != nil {
+		return errors.Wrap(err, "createManifest failed to create tmp path")
+	}
+	if err = json.NewEncoder(w).Encode(manifest); err != nil {
+		return err
+	}
+	if err := w.Close(); err != nil {
+		return err
+	}
+	// Move the tmpManifest to backupManifest, this operation is not atomic for s3.
+	// We try our best to move the file but if it fails then the user must move it manually.
+	err = h.Rename(tmpManifest, backupManifest)
+	return errors.Wrapf(err, "MOVING TEMPORARY MANIFEST TO MAIN MANIFEST FAILED!\n"+
+		"It is possible that the manifest would have been corrupted. You must move "+
+		"the file: %s to: %s in order to "+
+		"fix the backup manifest.", tmpManifest, backupManifest)
+}
+
+// ListBackupManifests scans location l for backup files and returns the list of manifests.
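createManifest above never rewrites the consolidated manifest in place: it encodes the JSON into a temporary object and then renames it over the real one, which is atomic on a local filesystem but only best effort on S3 (hence the loud recovery message). A reduced sketch of the same write-temp-then-rename pattern for the local case; writeManifestAtomically and masterManifest are illustrative names over the standard library, not the UriHandler interface.

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

type masterManifest struct {
	Manifests []string `json:"manifests"`
}

// writeManifestAtomically encodes m to <dir>/manifest.tmp and then renames it
// over <dir>/manifest.json, so readers never observe a half-written file.
// On a local filesystem the rename is atomic; on object stores it is not,
// which is why the code above wraps the rename error with recovery steps.
func writeManifestAtomically(dir string, m *masterManifest) error {
	tmp := filepath.Join(dir, "manifest.tmp")
	final := filepath.Join(dir, "manifest.json")

	f, err := os.Create(tmp)
	if err != nil {
		return err
	}
	if err := json.NewEncoder(f).Encode(m); err != nil {
		f.Close()
		return err
	}
	if err := f.Close(); err != nil {
		return err
	}
	return os.Rename(tmp, final)
}

func main() {
	dir, _ := os.MkdirTemp("", "manifest-demo")
	defer os.RemoveAll(dir)
	err := writeManifestAtomically(dir, &masterManifest{Manifests: []string{"full-0001"}})
	fmt.Println("written:", err == nil)
}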
+func ListBackupManifests(l string, creds *x.MinioCredentials) ([]*Manifest, error) { + uri, err := url.Parse(l) + if err != nil { + return nil, err + } + + h, err := NewUriHandler(uri, creds) + if err != nil { + return nil, errors.Wrap(err, "ListBackupManifests") + } + + m, err := GetManifest(h, uri) + if err != nil { + return nil, err + } + return m.Manifests, nil +} diff --git a/worker/config.go b/worker/config.go index 7d520a6bd08..89b399a9a86 100644 --- a/worker/config.go +++ b/worker/config.go @@ -58,7 +58,7 @@ type Options struct { WalCache int64 // HmacSecret stores the secret used to sign JSON Web Tokens (JWT). - HmacSecret x.SensitiveByteSlice + HmacSecret x.Sensitive // AccessJwtTtl is the TTL for the access JWT. AccessJwtTtl time.Duration // RefreshJwtTtl is the TTL of the refresh JWT. diff --git a/worker/export.go b/worker/export.go index 807e1740854..da3f288ad83 100644 --- a/worker/export.go +++ b/worker/export.go @@ -145,7 +145,7 @@ func (e *exporter) toJSON() (*bpb.KVList, error) { continuing := false mapStart := fmt.Sprintf(" {\"uid\":"+uidFmtStrJson+`,"namespace":"0x%x"`, e.uid, e.namespace) - err := e.pl.Iterate(e.readTs, 0, func(p *pb.Posting) error { + err := e.pl.IterateAll(e.readTs, 0, func(p *pb.Posting) error { if continuing { fmt.Fprint(bp, ",\n") } else { @@ -218,7 +218,7 @@ func (e *exporter) toRDF() (*bpb.KVList, error) { bp := new(bytes.Buffer) prefix := fmt.Sprintf(uidFmtStrRdf+" <%s> ", e.uid, e.attr) - err := e.pl.Iterate(e.readTs, 0, func(p *pb.Posting) error { + err := e.pl.IterateAll(e.readTs, 0, func(p *pb.Posting) error { fmt.Fprint(bp, prefix) if p.PostingType == pb.Posting_REF { fmt.Fprint(bp, fmt.Sprintf(uidFmtStrRdf, p.Uid)) @@ -359,7 +359,7 @@ func fieldToString(update *pb.SchemaUpdate) string { return builder.String() } -type fileWriter struct { +type ExportWriter struct { fd *os.File bw *bufio.Writer gw *gzip.Writer @@ -367,7 +367,7 @@ type fileWriter struct { hasDataBefore bool } -func (writer *fileWriter) open(fpath string) error { +func (writer *ExportWriter) open(fpath string) error { var err error writer.fd, err = os.Create(fpath) if err != nil { @@ -382,7 +382,7 @@ func (writer *fileWriter) open(fpath string) error { return err } -func (writer *fileWriter) Close() error { +func (writer *ExportWriter) Close() error { if err := writer.gw.Flush(); err != nil { return err } @@ -401,9 +401,9 @@ func (writer *fileWriter) Close() error { // ExportedFiles has the relative path of files that were written during export type ExportedFiles []string -type exportStorage interface { - openFile(relativePath string) (*fileWriter, error) - finishWriting(fs ...*fileWriter) (ExportedFiles, error) +type ExportStorage interface { + OpenFile(relativePath string) (*ExportWriter, error) + FinishWriting(w *Writers) (ExportedFiles, error) } type localExportStorage struct { @@ -432,8 +432,8 @@ func newLocalExportStorage(destination, backupName string) (*localExportStorage, return &localExportStorage{destination, backupName}, nil } -func (l *localExportStorage) openFile(fileName string) (*fileWriter, error) { - fw := &fileWriter{relativePath: filepath.Join(l.relativePath, fileName)} +func (l *localExportStorage) OpenFile(fileName string) (*ExportWriter, error) { + fw := &ExportWriter{relativePath: filepath.Join(l.relativePath, fileName)} filePath, err := filepath.Abs(filepath.Join(l.destination, fw.relativePath)) if err != nil { @@ -449,17 +449,21 @@ func (l *localExportStorage) openFile(fileName string) (*fileWriter, error) { return fw, nil } -func (l 
*localExportStorage) finishWriting(fs ...*fileWriter) (ExportedFiles, error) { - var files ExportedFiles - - for _, file := range fs { - err := file.Close() - if err != nil { - return nil, err - } - files = append(files, file.relativePath) +func (l *localExportStorage) FinishWriting(w *Writers) (ExportedFiles, error) { + if err := w.DataWriter.Close(); err != nil { + return nil, err + } + if err := w.SchemaWriter.Close(); err != nil { + return nil, err + } + if err := w.GqlSchemaWriter.Close(); err != nil { + return nil, err + } + files := ExportedFiles{ + w.DataWriter.relativePath, + w.SchemaWriter.relativePath, + w.GqlSchemaWriter.relativePath, } - return files, nil } @@ -496,12 +500,12 @@ func newRemoteExportStorage(in *pb.ExportRequest, backupName string) (*remoteExp return &remoteExportStorage{mc, bucket, prefix, localStorage}, nil } -func (r *remoteExportStorage) openFile(fileName string) (*fileWriter, error) { - return r.les.openFile(fileName) +func (r *remoteExportStorage) OpenFile(fileName string) (*ExportWriter, error) { + return r.les.OpenFile(fileName) } -func (r *remoteExportStorage) finishWriting(fs ...*fileWriter) (ExportedFiles, error) { - files, err := r.les.finishWriting(fs...) +func (r *remoteExportStorage) FinishWriting(w *Writers) (ExportedFiles, error) { + files, err := r.les.FinishWriting(w) if err != nil { return nil, err } @@ -527,7 +531,7 @@ func (r *remoteExportStorage) finishWriting(fs ...*fileWriter) (ExportedFiles, e return files, nil } -func newExportStorage(in *pb.ExportRequest, backupName string) (exportStorage, error) { +func NewExportStorage(in *pb.ExportRequest, backupName string) (ExportStorage, error) { switch { case strings.HasPrefix(in.Destination, "/"): return newLocalExportStorage(in.Destination, backupName) @@ -556,6 +560,170 @@ func export(ctx context.Context, in *pb.ExportRequest) (ExportedFiles, error) { return exportInternal(ctx, in, pstore, false) } +func ToExportKvList(pk x.ParsedKey, pl *posting.List, in *pb.ExportRequest) (*bpb.KVList, error) { + e := &exporter{ + readTs: in.ReadTs, + uid: pk.Uid, + namespace: x.ParseNamespace(pk.Attr), + attr: x.ParseAttr(pk.Attr), + pl: pl, + } + + emptyList := &bpb.KVList{} + switch { + // These predicates are not required in the export data. + case e.attr == "dgraph.graphql.xid": + case e.attr == "dgraph.drop.op": + case e.attr == "dgraph.graphql.p_query": + + case pk.IsData() && e.attr == "dgraph.graphql.schema": + // Export the graphql schema. + vals, err := pl.AllValues(in.ReadTs) + if err != nil { + return emptyList, errors.Wrapf(err, "cannot read value of GraphQL schema") + } + // if the GraphQL schema node was deleted with S * * delete mutation, + // then the data key will be overwritten with nil value. + // So, just skip exporting it as there will be no value for this data key. + if len(vals) == 0 { + return emptyList, nil + } + // Give an error only if we find more than one value for the schema. 
+ if len(vals) > 1 { + return emptyList, errors.Errorf("found multiple values for the GraphQL schema") + } + val, ok := vals[0].Value.([]byte) + if !ok { + return emptyList, errors.Errorf("cannot convert value of GraphQL schema to byte array") + } + + exported := x.ExportedGQLSchema{ + Namespace: e.namespace, + Schema: string(val), + } + if val, err = json.Marshal(exported); err != nil { + return emptyList, errors.Wrapf(err, "Error marshalling GraphQL schema to json") + } + kv := &bpb.KV{ + Value: val, + Version: 2, // GraphQL schema value + } + return listWrap(kv), nil + + // below predicates no longer exist internally starting v21.03 but leaving them here + // so that users with a binary with version >= 21.03 can export data from a version < 21.03 + // without this internal data showing up. + case e.attr == "dgraph.cors": + case e.attr == "dgraph.graphql.schema_created_at": + case e.attr == "dgraph.graphql.schema_history": + case e.attr == "dgraph.graphql.p_sha256hash": + + case pk.IsData(): + // The GraphQL layer will create a node of type "dgraph.graphql". That entry + // should not be exported. + if e.attr == "dgraph.type" { + vals, err := e.pl.AllValues(in.ReadTs) + if err != nil { + return emptyList, errors.Wrapf(err, "cannot read value of dgraph.type entry") + } + if len(vals) == 1 { + val, ok := vals[0].Value.([]byte) + if !ok { + return emptyList, errors.Errorf("cannot read value of dgraph.type entry") + } + if string(val) == "dgraph.graphql" { + return emptyList, nil + } + } + } + + switch in.Format { + case "json": + return e.toJSON() + case "rdf": + return e.toRDF() + default: + glog.Fatalf("Invalid export format found: %s", in.Format) + } + + default: + glog.Fatalf("Invalid key found: %+v %v\n", pk, hex.Dump([]byte(pk.Attr))) + } + return emptyList, nil +} + +func WriteExport(writers *Writers, kv *bpb.KV, format string) error { + // Skip nodes that have no data. Otherwise, the exported data could have + // formatting and/or syntax errors. + if len(kv.Value) == 0 { + return nil + } + + var dataSeparator []byte + switch format { + case "json": + dataSeparator = []byte(",\n") + case "rdf": + // The separator for RDF should be empty since the toRDF function already + // adds newline to each RDF entry. + default: + glog.Fatalf("Invalid export format found: %s", format) + } + + var writer *ExportWriter + var sep []byte + switch kv.Version { + case 1: // data + writer = writers.DataWriter + sep = dataSeparator + case 2: // graphQL schema + writer = writers.GqlSchemaWriter + sep = []byte(",\n") // use json separator. 
+ case 3: // graphQL schema + writer = writers.SchemaWriter + default: + glog.Fatalf("Invalid data type found: %x", kv.Key) + } + + if writer.hasDataBefore { + if _, err := writer.gw.Write(sep); err != nil { + return err + } + } + // change the hasDataBefore flag so that the next data entry will have a separator + // prepended + writer.hasDataBefore = true + + _, err := writer.gw.Write(kv.Value) + return err +} + +type Writers struct { + DataWriter *ExportWriter + SchemaWriter *ExportWriter + GqlSchemaWriter *ExportWriter +} + +func InitWriters(s ExportStorage, in *pb.ExportRequest) (*Writers, error) { + xfmt := exportFormats[in.Format] + w := &Writers{} + fileName := func(ext string) string { + return fmt.Sprintf("g%02d%s", in.GroupId, ext) + } + + var err error + if w.DataWriter, err = s.OpenFile(fileName(xfmt.ext + ".gz")); err != nil { + return w, err + } + if w.SchemaWriter, err = s.OpenFile(fileName(".schema.gz")); err != nil { + return w, err + } + if w.GqlSchemaWriter, err = s.OpenFile(fileName(".gql_schema.gz")); err != nil { + return w, err + } + return w, nil +} + // exportInternal contains the core logic to export a Dgraph database. If skipZero is set to // false, the parts of this method that require to talk to zero will be skipped. This is useful // when exporting a p directory directly from disk without a running cluster. @@ -563,31 +731,17 @@ func export(ctx context.Context, in *pb.ExportRequest) (ExportedFiles, error) { // and types. func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB, skipZero bool) (ExportedFiles, error) { + uts := time.Unix(in.UnixTs, 0) - exportStorage, err := newExportStorage(in, + exportStorage, err := NewExportStorage(in, fmt.Sprintf("dgraph.r%d.u%s", in.ReadTs, uts.UTC().Format("0102.1504"))) if err != nil { return nil, err } - - xfmt := exportFormats[in.Format] - - dataWriter, err := exportStorage.openFile(fmt.Sprintf("g%02d%s", in.GroupId, xfmt.ext+".gz")) - if err != nil { - return nil, err - } - - schemaWriter, err := exportStorage.openFile(fmt.Sprintf("g%02d%s", in.GroupId, ".schema.gz")) - if err != nil { - return nil, err - } - - gqlSchemaWriter, err := exportStorage.openFile( - fmt.Sprintf("g%02d%s", in.GroupId, ".gql_schema.gz")) + writers, err := InitWriters(exportStorage, in) if err != nil { - return nil, err + return nil, errors.Wrap(err, "exportInternal failed") } - // This stream exports only the data and the graphQL schema. stream := db.NewStreamAt(in.ReadTs) stream.Prefix = []byte{x.DefaultPrefix} @@ -627,6 +781,7 @@ func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB, } return pk.IsData() } + stream.KeyToList = func(key []byte, itr *badger.Iterator) (*bpb.KVList, error) { item := itr.Item() pk, err := x.Parse(item.Key()) @@ -635,115 +790,11 @@ func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB, hex.EncodeToString(item.Key())) return nil, err } - e := &exporter{ - readTs: in.ReadTs, - } - e.uid = pk.Uid - e.namespace, e.attr = x.ParseNamespaceAttr(pk.Attr) - - switch { - case e.attr == "dgraph.graphql.xid": - // Ignore this predicate. - case e.attr == "dgraph.drop.op": - // Ignore this predicate. - case e.attr == "dgraph.graphql.p_query": - // Ignore this predicate. - case pk.IsData() && e.attr == "dgraph.graphql.schema": - // Export the graphql schema. 
- pl, err := posting.ReadPostingList(key, itr) - if err != nil { - return nil, errors.Wrapf(err, "cannot read posting list for GraphQL schema") - } - vals, err := pl.AllValues(in.ReadTs) - if err != nil { - return nil, errors.Wrapf(err, "cannot read value of GraphQL schema") - } - // if the GraphQL schema node was deleted with S * * delete mutation, - // then the data key will be overwritten with nil value. - // So, just skip exporting it as there will be no value for this data key. - if len(vals) == 0 { - return nil, nil - } - // Give an error only if we find more than one value for the schema. - if len(vals) > 1 { - return nil, errors.Errorf("found multiple values for the GraphQL schema") - } - val, ok := vals[0].Value.([]byte) - if !ok { - return nil, errors.Errorf("cannot convert value of GraphQL schema to byte array") - } - - exported := x.ExportedGQLSchema{ - Namespace: e.namespace, - Schema: string(val), - } - if val, err = json.Marshal(exported); err != nil { - return nil, errors.Wrapf(err, "Error marshalling GraphQL schema to json") - } - - kv := &bpb.KV{ - Value: val, - Version: 2, // GraphQL schema value - } - return listWrap(kv), nil - - // below predicates no longer exist internally starting v21.03 but leaving them here - // so that users with a binary with version >= 21.03 can export data from a version < 21.03 - // without this internal data showing up. - case e.attr == "dgraph.cors": - case e.attr == "dgraph.graphql.schema_created_at": - case e.attr == "dgraph.graphql.schema_history": - case e.attr == "dgraph.graphql.p_sha256hash": - // Ignore these predicates. - - case pk.IsData(): - e.pl, err = posting.ReadPostingList(key, itr) - if err != nil { - return nil, err - } - - // The GraphQL layer will create a node of type "dgraph.graphql". That entry - // should not be exported. - if e.attr == "dgraph.type" { - vals, err := e.pl.AllValues(in.ReadTs) - if err != nil { - return nil, errors.Wrapf(err, "cannot read value of dgraph.type entry") - } - if len(vals) == 1 { - val, ok := vals[0].Value.([]byte) - if !ok { - return nil, errors.Errorf("cannot read value of dgraph.type entry") - } - if string(val) == "dgraph.graphql" { - return nil, nil - } - } - } - - switch in.Format { - case "json": - return e.toJSON() - case "rdf": - return e.toRDF() - default: - glog.Fatalf("Invalid export format found: %s", in.Format) - } - - default: - glog.Fatalf("Invalid key found: %+v\n", pk) + pl, err := posting.ReadPostingList(key, itr) + if err != nil { + return nil, errors.Wrapf(err, "cannot read posting list") } - return nil, nil - } - - var dataSeparator []byte - switch in.Format { - case "json": - dataSeparator = []byte(",\n") - case "rdf": - // The separator for RDF should be empty since the toRDF function already - // adds newline to each RDF entry. - default: - glog.Fatalf("Invalid export format found: %s", in.Format) + return ToExportKvList(pk, pl, in) } stream.Send = func(buf *z.Buffer) error { @@ -753,36 +804,7 @@ func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB, if err := kv.Unmarshal(s); err != nil { return err } - // Skip nodes that have no data. Otherwise, the exported data could have - // formatting and/or syntax errors. - if len(kv.Value) == 0 { - return nil - } - - var writer *fileWriter - var separator []byte - switch kv.Version { - case 1: // data - writer = dataWriter - separator = dataSeparator - case 2: // graphQL schema - writer = gqlSchemaWriter - separator = []byte(",\n") // use json separator. 
- default: - glog.Fatalf("Invalid data type found: %x", kv.Key) - } - - if writer.hasDataBefore { - if _, err := writer.gw.Write(separator); err != nil { - return err - } - } - // change the hasDataBefore flag so that the next data entry will have a separator - // prepended - writer.hasDataBefore = true - - _, err = writer.gw.Write(kv.Value) - return err + return WriteExport(writers, kv, in.Format) }) } @@ -812,65 +834,53 @@ func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB, return err } + val, err := item.ValueCopy(nil) + if err != nil { + return errors.Wrap(err, "writePrefix failed to get value") + } var kv *bpb.KV switch prefix { case x.ByteSchema: - if !skipZero { - servesTablet, err := groups().ServesTablet(pk.Attr) - if err != nil || !servesTablet { - continue - } - } - - var update pb.SchemaUpdate - err = item.Value(func(val []byte) error { - return update.Unmarshal(val) - }) + kv, err = SchemaExportKv(pk.Attr, val, skipZero) if err != nil { // Let's not propagate this error. We just log this and continue onwards. - glog.Errorf("Unable to unmarshal schema: %+v. Err=%v\n", pk, err) + glog.Errorf("Unable to export schema: %+v. Err=%v\n", pk, err) continue } - kv = toSchema(pk.Attr, &update) - case x.ByteType: - var update pb.TypeUpdate - err := item.Value(func(val []byte) error { - return update.Unmarshal(val) - }) + kv, err = TypeExportKv(pk.Attr, val) if err != nil { // Let's not propagate this error. We just log this and continue onwards. - glog.Errorf("Unable to unmarshal type: %+v. Err=%v\n", pk, err) - return nil + glog.Errorf("Unable to export type: %+v. Err=%v\n", pk, err) + continue } - kv = toType(pk.Attr, update) - default: glog.Fatalf("Unhandled byte prefix: %v", prefix) } // Write to the appropriate writer. - if _, err := schemaWriter.gw.Write(kv.Value); err != nil { + if _, err := writers.SchemaWriter.gw.Write(kv.Value); err != nil { return err } } return nil } + xfmt := exportFormats[in.Format] // All prepwork done. Time to roll. 
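The refactor above funnels all export output through the Writers triple: WriteExport picks a writer from the KV's Version tag (1 for data, 2 for the GraphQL schema, 3 for the schema writer) and only prepends a separator once that writer has already received an entry, while exportInternal brackets the data and GraphQL streams with the format's pre/post markers. A stripped-down sketch of that dispatch over plain gzip writers; taggedKV, sink and writers are stand-ins for bpb.KV and the real ExportWriter, not Dgraph types.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
)

type taggedKV struct {
	Version int // 1 = data, 2 = GraphQL schema, 3 = schema
	Value   []byte
}

type sink struct {
	gw      *gzip.Writer
	hasData bool
}

type writers struct {
	data, gqlSchema, schema *sink
}

// write mirrors WriteExport: skip empty values, pick the sink from the
// version tag, and emit a separator before every entry except the first.
func (w *writers) write(kv taggedKV, sep []byte) error {
	if len(kv.Value) == 0 {
		return nil
	}
	var s *sink
	switch kv.Version {
	case 1:
		s = w.data
	case 2:
		s = w.gqlSchema
		sep = []byte(",\n") // the GraphQL schema is always written as JSON
	case 3:
		s = w.schema
		sep = nil
	default:
		return fmt.Errorf("unknown version tag %d", kv.Version)
	}
	if s.hasData {
		if _, err := s.gw.Write(sep); err != nil {
			return err
		}
	}
	s.hasData = true
	_, err := s.gw.Write(kv.Value)
	return err
}

func main() {
	var buf bytes.Buffer
	w := &writers{
		data:      &sink{gw: gzip.NewWriter(&buf)},
		gqlSchema: &sink{gw: gzip.NewWriter(new(bytes.Buffer))},
		schema:    &sink{gw: gzip.NewWriter(new(bytes.Buffer))},
	}
	_ = w.write(taggedKV{Version: 1, Value: []byte(`{"uid":"0x1"}`)}, []byte(",\n"))
	_ = w.write(taggedKV{Version: 1, Value: []byte(`{"uid":"0x2"}`)}, []byte(",\n"))
	w.data.gw.Close()
	fmt.Println("compressed data bytes:", buf.Len())
}

In the real WriteExport the data separator is chosen from the export format: ",\n" for JSON and nothing for RDF, since toRDF already terminates each triple with a newline.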
- if _, err = gqlSchemaWriter.gw.Write([]byte(exportFormats["json"].pre)); err != nil { + if _, err = writers.GqlSchemaWriter.gw.Write([]byte(exportFormats["json"].pre)); err != nil { return nil, err } - if _, err = dataWriter.gw.Write([]byte(xfmt.pre)); err != nil { + if _, err = writers.DataWriter.gw.Write([]byte(xfmt.pre)); err != nil { return nil, err } if err := stream.Orchestrate(ctx); err != nil { return nil, err } - if _, err = dataWriter.gw.Write([]byte(xfmt.post)); err != nil { + if _, err = writers.DataWriter.gw.Write([]byte(xfmt.post)); err != nil { return nil, err } - if _, err = gqlSchemaWriter.gw.Write([]byte(exportFormats["json"].post)); err != nil { + if _, err = writers.GqlSchemaWriter.gw.Write([]byte(exportFormats["json"].post)); err != nil { return nil, err } @@ -883,7 +893,30 @@ func exportInternal(ctx context.Context, in *pb.ExportRequest, db *badger.DB, } glog.Infof("Export DONE for group %d at timestamp %d.", in.GroupId, in.ReadTs) - return exportStorage.finishWriting(dataWriter, schemaWriter, gqlSchemaWriter) + return exportStorage.FinishWriting(writers) +} + +func SchemaExportKv(attr string, val []byte, skipZero bool) (*bpb.KV, error) { + if !skipZero { + servesTablet, err := groups().ServesTablet(attr) + if err != nil || !servesTablet { + return nil, errors.Errorf("Tablet not found for attribute: %v", err) + } + } + + var update pb.SchemaUpdate + if err := update.Unmarshal(val); err != nil { + return nil, err + } + return toSchema(attr, &update), nil +} + +func TypeExportKv(attr string, val []byte) (*bpb.KV, error) { + var update pb.TypeUpdate + if err := update.Unmarshal(val); err != nil { + return nil, err + } + return toType(attr, update), nil } // Export request is used to trigger exports for the request list of groups. diff --git a/worker/online_restore_ee.go b/worker/online_restore_ee.go index dff169ca4e3..038bad21035 100644 --- a/worker/online_restore_ee.go +++ b/worker/online_restore_ee.go @@ -13,7 +13,6 @@ package worker import ( - "compress/gzip" "context" "fmt" "net/url" @@ -24,7 +23,6 @@ import ( "github.com/golang/glog" "github.com/dgraph-io/dgraph/conn" - "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" @@ -226,7 +224,7 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error { return errors.Wrapf(err, "cannot create backup handler") } - manifests, err := getManifestsToRestore(handler, uri, req.BackupId, req.BackupNum) + manifests, err := getManifestsToRestore(handler, uri, req) if err != nil { return errors.Wrapf(err, "cannot get backup manifests") } @@ -234,7 +232,7 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error { return errors.Errorf("no backup manifests found at location %s", req.Location) } - lastManifest := manifests[len(manifests)-1] + lastManifest := manifests[0] preds, ok := lastManifest.Groups[req.GroupId] // Version is 0 if the backup was taken on an old version (v20.11). @@ -261,10 +259,14 @@ func handleRestoreProposal(ctx context.Context, req *pb.RestoreRequest) error { } // Write restored values to disk and update the UID lease. - if err := writeBackup(ctx, req); err != nil { + if err := MapBackup(req); err != nil { return errors.Wrapf(err, "cannot write backup") } + if err := reduceToDB(pstore, req.RestoreTs); err != nil { + return errors.Wrap(err, "failed to reduce restore map") + } + // Load schema back. 
if err := schema.LoadFromDb(); err != nil { return errors.Wrapf(err, "cannot load schema after restore") @@ -329,76 +331,3 @@ func getCredentialsFromRestoreRequest(req *pb.RestoreRequest) *x.MinioCredential Anonymous: req.Anonymous, } } - -func writeBackup(ctx context.Context, req *pb.RestoreRequest) error { - res := LoadBackup(req.Location, req.BackupId, req.BackupNum, - getCredentialsFromRestoreRequest(req), - func(groupId uint32, in *loadBackupInput) (uint64, uint64, error) { - if groupId != req.GroupId { - // LoadBackup will try to call the backup function for every group. - // Exit here if the group is not the one indicated by the request. - return 0, 0, nil - } - - cfg, err := getEncConfig(req) - if err != nil { - return 0, 0, errors.Wrapf(err, "unable to get encryption config") - } - _, encKey := ee.GetKeys(cfg) - in.r, err = enc.GetReader(encKey, in.r) - if err != nil { - return 0, 0, errors.Wrapf(err, "cannot get encrypted reader") - } - gzReader, err := gzip.NewReader(in.r) - if err != nil { - return 0, 0, errors.Wrapf(err, "couldn't create gzip reader") - } - - maxUid, maxNsId, err := loadFromBackup(pstore, &loadBackupInput{ - r: gzReader, - restoreTs: req.RestoreTs, - preds: in.preds, - dropOperations: in.dropOperations, - isOld: in.isOld, - }) - if err != nil { - return 0, 0, errors.Wrapf(err, "cannot write backup") - } - - if maxUid == 0 { - // No need to update the lease, return here. - return 0, 0, nil - } - - // Use the value of maxUid to update the uid lease. - pl := groups().connToZeroLeader() - if pl == nil { - return 0, 0, errors.Errorf( - "cannot update uid lease due to no connection to zero leader") - } - - zc := pb.NewZeroClient(pl.Get()) - leaseID := func(val uint64, typ pb.NumLeaseType) error { - if val == 0 { - return nil - } - _, err := zc.AssignIds(ctx, &pb.Num{Val: val, Type: typ}) - return err - } - - if err := leaseID(maxUid, pb.Num_UID); err != nil { - return 0, 0, errors.Wrapf(err, "cannot update max uid lease after restore.") - } - if err := leaseID(maxNsId, pb.Num_NS_ID); err != nil { - return 0, 0, errors.Wrapf(err, "cannot update max namespace lease after restore.") - } - - // We return the maxUid/maxNsId to enforce the signature of the method but it will - // be ignored as the uid lease was updated above. - return maxUid, maxNsId, nil - }) - if res.Err != nil { - return errors.Wrapf(res.Err, "cannot write backup") - } - return nil -} diff --git a/worker/restore.go b/worker/restore.go index a9b43349a5e..1f2c8321714 100644 --- a/worker/restore.go +++ b/worker/restore.go @@ -14,88 +14,40 @@ package worker import ( "bufio" + "bytes" "compress/gzip" "encoding/binary" "encoding/hex" "fmt" "io" + "io/ioutil" + "log" "math" + "net/url" "os" "path/filepath" + "sort" "strconv" + "strings" + "sync" + "sync/atomic" "github.com/dgraph-io/badger/v3" "github.com/dgraph-io/badger/v3/options" bpb "github.com/dgraph-io/badger/v3/pb" + "github.com/dgraph-io/badger/v3/y" + "github.com/dgraph-io/ristretto/z" "github.com/golang/glog" + "github.com/golang/snappy" "github.com/pkg/errors" + "github.com/dgraph-io/dgraph/ee" "github.com/dgraph-io/dgraph/ee/enc" "github.com/dgraph-io/dgraph/posting" "github.com/dgraph-io/dgraph/protos/pb" "github.com/dgraph-io/dgraph/x" ) -// RunRestore calls badger.Load and tries to load data into a new DB. -func RunRestore(pdir, location, backupId string, key x.SensitiveByteSlice, - ctype options.CompressionType, clevel int) LoadResult { - // Create the pdir if it doesn't exist. 
- if err := os.MkdirAll(pdir, 0700); err != nil { - return LoadResult{Err: err} - } - - // Scan location for backup files and load them. Each file represents a node group, - // and we create a new p dir for each. - return LoadBackup(location, backupId, 0, nil, - func(groupId uint32, in *loadBackupInput) (uint64, uint64, error) { - - dir := filepath.Join(pdir, fmt.Sprintf("p%d", groupId)) - r, err := enc.GetReader(key, in.r) - if err != nil { - return 0, 0, err - } - - gzReader, err := gzip.NewReader(r) - if err != nil { - if len(key) != 0 { - err = errors.Wrap(err, - "Unable to read the backup. Ensure the encryption key is correct.") - } - return 0, 0, err - } - - if !pathExist(dir) { - fmt.Println("Creating new db:", dir) - } - // The badger DB should be opened only after creating the backup - // file reader and verifying the encryption in the backup file. - db, err := badger.OpenManaged(badger.DefaultOptions(dir). - WithCompression(ctype). - WithZSTDCompressionLevel(clevel). - WithSyncWrites(false). - WithBlockCacheSize(100 * (1 << 20)). - WithIndexCacheSize(100 * (1 << 20)). - WithNumVersionsToKeep(math.MaxInt32). - WithEncryptionKey(key). - WithNamespaceOffset(x.NamespaceOffset)) - if err != nil { - return 0, 0, err - } - defer db.Close() - maxUid, maxNsId, err := loadFromBackup(db, &loadBackupInput{ - r: gzReader, - restoreTs: 0, - preds: in.preds, - dropOperations: in.dropOperations, - isOld: in.isOld, - }) - if err != nil { - return 0, 0, err - } - return maxUid, maxNsId, x.WriteGroupIdFile(dir, uint32(groupId)) - }) -} - type loadBackupInput struct { r io.Reader restoreTs uint64 @@ -104,39 +56,186 @@ type loadBackupInput struct { isOld bool } -// loadFromBackup reads the backup, converts the keys and values to the required format, +type mapper struct { + once sync.Once + buf *z.Buffer + nextId uint32 + thr *y.Throttle +} + +const ( + mapFileSz int = 2 << 30 + partitionBufSz int = 4 << 20 + restoreTmpDir = "restore-tmp" + restoreMapDir = "restore-map" +) + +func newBuffer() *z.Buffer { + path := filepath.Join(x.WorkerConfig.TmpDir, restoreTmpDir) + x.Check(os.MkdirAll(path, 0750)) + buf, err := z.NewBufferWithDir(mapFileSz, 2*mapFileSz, z.UseMmap, + path, "Restore.Buffer") + x.Check(err) + return buf +} + +// mapEntry stores uint16 (2 bytes), which store the length of the key, followed by the key itself. +// The rest of the mapEntry stores the marshalled KV. +// We store the key alongside the protobuf, to make it easier to parse for comparison. 
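The layout described above is a small length-prefixed framing: two big-endian bytes holding the key length, the key itself (a badger key with the KV version appended via y.KeyWithTs), then the marshalled KV. A minimal sketch of writing and reading that framing over a plain byte slice; the payload here is raw bytes rather than the badger protobuf, and keys are assumed to fit in a uint16, as in the code that follows.

package main

import (
	"encoding/binary"
	"fmt"
)

type mapEntry []byte

// encodeMapEntry appends a 2-byte big-endian key length, the key, and the
// payload, matching the layout used by mapper.Set below.
func encodeMapEntry(key, payload []byte) mapEntry {
	buf := make([]byte, 2+len(key)+len(payload))
	binary.BigEndian.PutUint16(buf[0:2], uint16(len(key)))
	copy(buf[2:], key)
	copy(buf[2+len(key):], payload)
	return buf
}

// Key returns the key portion, skipping the 2-byte length prefix.
func (me mapEntry) Key() []byte {
	sz := binary.BigEndian.Uint16(me[0:2])
	return me[2 : 2+sz]
}

// Data returns the marshalled payload that follows the key.
func (me mapEntry) Data() []byte {
	sz := binary.BigEndian.Uint16(me[0:2])
	return me[2+sz:]
}

func main() {
	me := encodeMapEntry([]byte("0x0a-dgraph.type"), []byte("marshalled-kv"))
	fmt.Printf("key=%s data=%s\n", me.Key(), me.Data())
}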
+type mapEntry []byte + +func (me mapEntry) Key() []byte { + sz := binary.BigEndian.Uint16(me[0:2]) + return me[2 : 2+sz] +} +func (me mapEntry) Data() []byte { + sz := binary.BigEndian.Uint16(me[0:2]) + return me[2+sz:] +} + +func (mw *mapper) Set(kv *bpb.KV) error { + + key := y.KeyWithTs(kv.Key, kv.Version) + sz := kv.Size() + buf := mw.buf.SliceAllocate(2 + len(key) + sz) + + binary.BigEndian.PutUint16(buf[0:2], uint16(len(key))) + x.AssertTrue(copy(buf[2:], key) == len(key)) + if _, err := kv.MarshalToSizedBuffer(buf[2+len(key):]); err != nil { + return err + } + if mw.buf.LenNoPadding() <= mapFileSz { + return nil + } + return mw.sendForWriting() +} + +func (mw *mapper) newMapFile() (*os.File, error) { + fileNum := atomic.AddUint32(&mw.nextId, 1) + filename := filepath.Join( + x.WorkerConfig.TmpDir, + restoreMapDir, + fmt.Sprintf("%06d.map", fileNum), + ) + glog.Infof("Creating new backup map file at: %q", filename) + x.Check(os.MkdirAll(filepath.Dir(filename), 0750)) + return os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) +} + +func (m *mapper) writeToDisk(buf *z.Buffer) error { + defer buf.Release() + if buf.IsEmpty() { + return nil + } + buf.SortSlice(func(ls, rs []byte) bool { + lme := mapEntry(ls) + rme := mapEntry(rs) + return y.CompareKeys(lme.Key(), rme.Key()) < 0 + }) + + f, err := m.newMapFile() + if err != nil { + return errors.Wrap(err, "openOutputFile") + } + defer f.Close() + + // Create partition keys for the map file. + header := &pb.MapHeader{PartitionKeys: [][]byte{}} + var bufSize int + buf.SliceIterate(func(slice []byte) error { + bufSize += 4 + len(slice) + if bufSize < partitionBufSz { + return nil + } + sz := len(header.PartitionKeys) + me := mapEntry(slice) + if sz > 0 && bytes.Equal(me.Key(), header.PartitionKeys[sz-1]) { + // We already have this key. + return nil + } + header.PartitionKeys = append(header.PartitionKeys, me.Key()) + bufSize = 0 + return nil + }) + + // Write the header to the map file. + headerBuf, err := header.Marshal() + x.Check(err) + var lenBuf [4]byte + binary.BigEndian.PutUint32(lenBuf[:], uint32(len(headerBuf))) + + w := snappy.NewBufferedWriter(f) + x.Check2(w.Write(lenBuf[:])) + x.Check2(w.Write(headerBuf)) + x.Check(err) + + sizeBuf := make([]byte, binary.MaxVarintLen64) + err = buf.SliceIterate(func(slice []byte) error { + n := binary.PutUvarint(sizeBuf, uint64(len(slice))) + _, err := w.Write(sizeBuf[:n]) + x.Check(err) + + _, err = w.Write(slice) + return err + }) + if err != nil { + return errors.Wrap(err, "sliceIterate") + } + if err := w.Close(); err != nil { + return errors.Wrap(err, "writer.Close") + } + if err := f.Sync(); err != nil { + return errors.Wrap(err, "file.Sync") + } + return f.Close() +} + +func (mw *mapper) sendForWriting() error { + if err := mw.thr.Do(); err != nil { + return err + } + go func(buf *z.Buffer) { + err := mw.writeToDisk(buf) + mw.thr.Done(err) + }(mw.buf) + mw.buf = newBuffer() + return nil +} + +func (mw *mapper) Close() error { + cl := func() error { + if err := mw.sendForWriting(); err != nil { + return err + } + if err := mw.thr.Finish(); err != nil { + return err + } + return mw.buf.Release() + } + + var rerr error + mw.once.Do(func() { + rerr = cl() + }) + return rerr +} + +// mapToDisk reads the backup, converts the keys and values to the required format, // and loads them to the given badger DB. The set of predicates is used to avoid restoring // values from predicates no longer assigned to this group. 
// If restoreTs is greater than zero, the key-value pairs will be written with that timestamp. // Otherwise, the original value is used. // TODO(DGRAPH-1234): Check whether restoreTs can be removed. -func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) { +func (m *mapper) Map(in *loadBackupInput, keepSchema bool) error { br := bufio.NewReaderSize(in.r, 16<<10) unmarshalBuf := make([]byte, 1<<10) - - // if there were any DROP operations that need to be applied before loading the backup into - // the db, then apply them here - if err := applyDropOperationsBeforeRestore(db, in.dropOperations); err != nil { - return 0, 0, errors.Wrapf(err, "cannot apply DROP operations while loading backup") - } - - // Delete schemas and types. Each backup file should have a complete copy of the schema. - if err := db.DropPrefix([]byte{x.ByteSchema}); err != nil { - return 0, 0, err - } - if err := db.DropPrefix([]byte{x.ByteType}); err != nil { - return 0, 0, err - } - - loader := db.NewKVLoader(16) - var maxUid, maxNsId uint64 for { var sz uint64 err := binary.Read(br, binary.LittleEndian, &sz) if err == io.EOF { break } else if err != nil { - return 0, 0, err + return err } if cap(unmarshalBuf) < int(sz) { @@ -144,23 +243,23 @@ func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) } if _, err = io.ReadFull(br, unmarshalBuf[:sz]); err != nil { - return 0, 0, err + return err } list := &bpb.KVList{} if err := list.Unmarshal(unmarshalBuf[:sz]); err != nil { - return 0, 0, err + return err } for _, kv := range list.Kv { if len(kv.GetUserMeta()) != 1 { - return 0, 0, errors.Errorf( + return errors.Errorf( "Unexpected meta: %v for key: %s", kv.UserMeta, hex.Dump(kv.Key)) } - restoreKey, namespace, err := fromBackupKey(kv.Key) + restoreKey, _, err := fromBackupKey(kv.Key) if err != nil { - return 0, 0, err + return errors.Wrap(err, "fromBackupKey") } // Filter keys using the preds set. Do not do this filtering for type keys @@ -168,33 +267,26 @@ func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) // match a predicate name. parsedKey, err := x.Parse(restoreKey) if err != nil { - return 0, 0, errors.Wrapf(err, "could not parse key %s", hex.Dump(restoreKey)) + return errors.Wrapf(err, "could not parse key %s", hex.Dump(restoreKey)) } - if _, ok := in.preds[parsedKey.Attr]; !parsedKey.IsType() && !ok { + if !keepSchema && (parsedKey.IsSchema() || parsedKey.IsType()) { continue } - - // Update the max uid and namespace id that has been seen while restoring this backup. - maxUid = x.Max(maxUid, parsedKey.Uid) - maxNsId = x.Max(maxNsId, namespace) - - // Override the version if requested. Should not be done for type and schema predicates, - // which always have their version set to 1. 
- if in.restoreTs > 0 && !parsedKey.IsSchema() && !parsedKey.IsType() { - kv.Version = in.restoreTs + if _, ok := in.preds[parsedKey.Attr]; !parsedKey.IsType() && !ok { + continue } switch kv.GetUserMeta()[0] { case posting.BitEmptyPosting, posting.BitCompletePosting, posting.BitDeltaPosting: backupPl := &pb.BackupPostingList{} if err := backupPl.Unmarshal(kv.Value); err != nil { - return 0, 0, errors.Wrapf(err, "while reading backup posting list") + return errors.Wrapf(err, "while reading backup posting list") } pl := posting.FromBackupPostingList(backupPl) shouldSplit, err := posting.ShouldSplit(pl) if err != nil { - return 0, 0, errors.Wrap(err, "Failed to get shouldSplit") + return errors.Wrap(err, "Failed to get shouldSplit") } if !shouldSplit || parsedKey.HasStartUid || len(pl.GetSplits()) > 0 { @@ -210,8 +302,8 @@ func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) // posting list. The MarshalPostingList function returns KV // with a zero version. newKv.Version = kv.Version - if err := loader.Set(newKv); err != nil { - return 0, 0, err + if err := m.Set(newKv); err != nil { + return err } } else { // This is a complete list. It should be rolled up to avoid writing @@ -221,11 +313,11 @@ func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) kvs, err := l.Rollup(nil) if err != nil { // TODO: wrap errors in this file for easier debugging. - return 0, 0, err + return err } for _, kv := range kvs { - if err := loader.Set(kv); err != nil { - return 0, 0, err + if err := m.Set(kv); err != nil { + return err } } } @@ -253,46 +345,560 @@ func loadFromBackup(db *badger.DB, in *loadBackupInput) (uint64, uint64, error) // Schema and type keys are not stored in an intermediate format so their // value can be written as is. 
kv.Key = restoreKey - if err := loader.Set(kv); err != nil { - return 0, 0, err + if err := m.Set(kv); err != nil { + return err } default: - return 0, 0, errors.Errorf( + return errors.Errorf( "Unexpected meta %d for key %s", kv.UserMeta[0], hex.Dump(kv.Key)) } } } + return nil +} + +func fromBackupKey(key []byte) ([]byte, uint64, error) { + backupKey := &pb.BackupKey{} + if err := backupKey.Unmarshal(key); err != nil { + return nil, 0, errors.Wrapf(err, "while reading backup key %s", hex.Dump(key)) + } + return x.FromBackupKey(backupKey), backupKey.Namespace, nil +} + +type backupReader struct { + toClose []io.Closer + r io.Reader +} + +func (br *backupReader) Read(p []byte) (n int, err error) { + return br.r.Read(p) +} +func (br *backupReader) Close() (rerr error) { + for i := len(br.toClose) - 1; i >= 0; i-- { + if err := br.toClose[i].Close(); err != nil { + rerr = err + } + } + return rerr +} +func newBackupReader(h UriHandler, file string, encKey x.Sensitive) (*backupReader, error) { + br := &backupReader{} + reader, err := h.Stream(file) + if err != nil { + return nil, errors.Wrapf(err, "Failed to open %q", file) + } + br.toClose = append(br.toClose, reader) + + encReader, err := enc.GetReader(encKey, reader) + if err != nil { + return nil, errors.Wrapf(err, "cannot get encrypted reader") + } + gzReader, err := gzip.NewReader(encReader) + if err != nil { + return nil, errors.Wrapf(err, "couldn't create gzip reader") + } + br.toClose = append(br.toClose, gzReader) + + br.r = bufio.NewReaderSize(gzReader, 16<<10) + return br, nil +} + +func MapBackup(req *pb.RestoreRequest) error { + uri, err := url.Parse(req.Location) + if err != nil { + return err + } + + creds := getCredentialsFromRestoreRequest(req) + h, err := NewUriHandler(uri, creds) + if err != nil { + return err + } + + manifests, err := getManifestsToRestore(h, uri, req) + if err != nil { + return errors.Wrapf(err, "cannot retrieve manifests") + } + + fmt.Printf("Got %d backups to restore ", len(manifests)) + + cfg, err := getEncConfig(req) + if err != nil { + return errors.Wrapf(err, "unable to get encryption config") + } + _, encKey := ee.GetKeys(cfg) + + mapper := &mapper{ + buf: newBuffer(), + thr: y.NewThrottle(3), + } + defer mapper.Close() + + dropAll := false + dropAttr := make(map[string]struct{}) + + // manifests are ordered as: latest..full + for i, manifest := range manifests { + // A dropAll or DropData operation is encountered. No need to restore previous backups. + if dropAll { + break + } + if manifest.Since == 0 || len(manifest.Groups) == 0 { + continue + } + path := manifest.Path + for gid := range manifest.Groups { + if gid != req.GroupId { + // LoadBackup will try to call the backup function for every group. + // Exit here if the group is not the one indicated by the request. + continue + } + file := filepath.Join(path, backupName(manifest.Since, gid)) + + // Only restore the predicates that were assigned to this group at the time + // of the last backup. + predSet := manifests[0].getPredsInGroup(gid) + br, err := newBackupReader(h, file, encKey) + if err != nil { + return errors.Wrap(err, "newBackupReader") + } + + // Only map the predicates which haven't been dropped yet. + for p, _ := range predSet { + if _, ok := dropAttr[p]; ok { + delete(predSet, p) + } + } + in := &loadBackupInput{ + r: br, + preds: predSet, + dropOperations: manifest.DropOperations, + isOld: manifest.Version == 0, + restoreTs: req.RestoreTs, + } + + // Only map the schema keys corresponding to the latest backup. 
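newBackupReader above layers a buffered gzip reader (and, when an encryption key is present, a decrypting reader) over the raw backup stream, remembering every layer so Close can unwind them in reverse order. A reduced sketch of that pattern without the encryption layer; layeredReader is an illustrative name and the source is an in-memory gzip blob standing in for a UriHandler stream.

package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

type layeredReader struct {
	r       io.Reader
	toClose []io.Closer
}

func (lr *layeredReader) Read(p []byte) (int, error) { return lr.r.Read(p) }

// Close unwinds the layers in reverse order, so the gzip reader is closed
// before the underlying source, mirroring backupReader.Close above.
func (lr *layeredReader) Close() (err error) {
	for i := len(lr.toClose) - 1; i >= 0; i-- {
		if cerr := lr.toClose[i].Close(); cerr != nil {
			err = cerr
		}
	}
	return err
}

func newLayeredReader(src io.ReadCloser) (*layeredReader, error) {
	lr := &layeredReader{toClose: []io.Closer{src}}
	gz, err := gzip.NewReader(src)
	if err != nil {
		src.Close()
		return nil, err
	}
	lr.toClose = append(lr.toClose, gz)
	lr.r = bufio.NewReaderSize(gz, 16<<10)
	return lr, nil
}

func main() {
	// Build a small gzip blob to stand in for a backup object.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	zw.Write([]byte("backup payload"))
	zw.Close()

	lr, err := newLayeredReader(io.NopCloser(&buf))
	if err != nil {
		panic(err)
	}
	defer lr.Close()
	out, _ := io.ReadAll(lr)
	fmt.Println(string(out))
}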
+ keepSchema := i == 0 + + // This would stream the backups from the source, and map them in + // Dgraph compatible format on disk. + if err := mapper.Map(in, keepSchema); err != nil { + return errors.Wrap(err, "mapper.Map") + } + if err := br.Close(); err != nil { + return errors.Wrap(err, "br.Close") + } + } + for _, op := range manifest.DropOperations { + switch op.DropOp { + case pb.DropOperation_ALL: + dropAll = true + case pb.DropOperation_DATA: + dropAll = true + case pb.DropOperation_ATTR: + dropAttr[op.DropValue] = struct{}{} + case pb.DropOperation_NS: + // If there is a drop namespace, we just ban the namespace in the pstore. + // TODO: We probably need to propose ban request. + ns, err := strconv.ParseUint(op.DropValue, 0, 64) + if err != nil { + return errors.Wrapf(err, "Map phase failed to parse namespace") + } + if err := pstore.BanNamespace(ns); err != nil { + return errors.Wrapf(err, "Map phase failed to ban namespace: %d", ns) + } + } + } + } + return nil +} + +// VerifyBackup will access the backup location and verify that the specified backup can +// be restored to the cluster. +func VerifyBackup(req *pb.RestoreRequest, creds *x.MinioCredentials, currentGroups []uint32) error { + uri, err := url.Parse(req.GetLocation()) + if err != nil { + return err + } - if err := loader.Finish(); err != nil { - return 0, 0, err + h, err := NewUriHandler(uri, creds) + if err != nil { + return errors.Wrap(err, "VerifyBackup") } - return maxUid, maxNsId, nil + return verifyRequest(h, uri, req, currentGroups) } -func applyDropOperationsBeforeRestore(db *badger.DB, dropOperations []*pb.DropOperation) error { - for _, operation := range dropOperations { - switch operation.DropOp { - case pb.DropOperation_ALL: - return db.DropAll() - case pb.DropOperation_DATA: - return db.DropPrefix([]byte{x.DefaultPrefix}) - case pb.DropOperation_ATTR: - return db.DropPrefix(x.PredicatePrefix(operation.DropValue)) - case pb.DropOperation_NS: - ns, err := strconv.ParseUint(operation.DropValue, 0, 64) - x.Check(err) - return db.BanNamespace(ns) +// verifyRequest verifies that the manifest satisfies the requirements to process the given +// restore request. +func verifyRequest(h UriHandler, uri *url.URL, req *pb.RestoreRequest, + currentGroups []uint32) error { + + manifests, err := getManifestsToRestore(h, uri, req) + if err != nil { + return errors.Wrapf(err, "while retrieving manifests") + } + if len(manifests) == 0 { + return errors.Errorf("No backups with the specified backup ID %s", req.GetBackupId()) + } + + // TODO(Ahsan): Do we need to verify the manifests again here? 
+ if err := verifyManifests(manifests); err != nil { + return err + } + + lastManifest := manifests[0] + if len(currentGroups) != len(lastManifest.Groups) { + return errors.Errorf("groups in cluster and latest backup manifest differ") + } + + for _, group := range currentGroups { + if _, ok := lastManifest.Groups[group]; !ok { + return errors.Errorf("groups in cluster and latest backup manifest differ") } } return nil } -func fromBackupKey(key []byte) ([]byte, uint64, error) { - backupKey := &pb.BackupKey{} - if err := backupKey.Unmarshal(key); err != nil { - return nil, 0, errors.Wrapf(err, "while reading backup key %s", hex.Dump(key)) +type mapIterator struct { + fd *os.File + reader *bufio.Reader + meBuf []byte +} + +func (mi *mapIterator) Next(cbuf *z.Buffer, partitionKey []byte) error { + readMapEntry := func() error { + if len(mi.meBuf) > 0 { + return nil + } + r := mi.reader + sizeBuf, err := r.Peek(binary.MaxVarintLen64) + if err != nil { + return err + } + sz, n := binary.Uvarint(sizeBuf) + if n <= 0 { + log.Fatalf("Could not read uvarint: %d", n) + } + x.Check2(r.Discard(n)) + if cap(mi.meBuf) < int(sz) { + mi.meBuf = make([]byte, int(sz)) + } + mi.meBuf = mi.meBuf[:int(sz)] + x.Check2(io.ReadFull(r, mi.meBuf)) + return nil } - return x.FromBackupKey(backupKey), backupKey.Namespace, nil + for { + if err := readMapEntry(); err == io.EOF { + break + } else if err != nil { + return err + } + key := mapEntry(mi.meBuf).Key() + + if len(partitionKey) == 0 || y.CompareKeys(key, partitionKey) < 0 { + b := cbuf.SliceAllocate(len(mi.meBuf)) + copy(b, mi.meBuf) + mi.meBuf = mi.meBuf[:0] + // map entry is already part of cBuf. + continue + } + // Current key is not part of this batch so track that we have already read the key. + return nil + } + return nil +} + +func (mi *mapIterator) Close() error { + return mi.fd.Close() +} + +func newMapIterator(filename string) (*pb.MapHeader, *mapIterator) { + fd, err := os.Open(filename) + x.Check(err) + r := snappy.NewReader(fd) + + // Read the header size. + reader := bufio.NewReaderSize(r, 16<<10) + headerLenBuf := make([]byte, 4) + x.Check2(io.ReadFull(reader, headerLenBuf)) + headerLen := binary.BigEndian.Uint32(headerLenBuf) + // Reader the map header. 
+ headerBuf := make([]byte, headerLen) + + x.Check2(io.ReadFull(reader, headerBuf)) + header := &pb.MapHeader{} + err = header.Unmarshal(headerBuf) + x.Check(err) + + itr := &mapIterator{ + fd: fd, + reader: reader, + } + return header, itr +} + +func getBuf() *z.Buffer { + path := filepath.Join(x.WorkerConfig.TmpDir, "buffer") + x.Check(os.MkdirAll(path, 0750)) + cbuf, err := z.NewBufferWithDir(64<<20, 64<<30, z.UseCalloc, path, "Restore.GetBuf") + x.Check(err) + cbuf.AutoMmapAfter(1 << 30) + return cbuf +} + +type reducer struct { + mapItrs []*mapIterator + partitionKeys [][]byte + bufferCh chan *z.Buffer + db *badger.DB + writeCh chan *z.Buffer + restoreTs uint64 +} + +func reduceToDB(db *badger.DB, restoreTs uint64) error { + r := NewBackupReducer(db, restoreTs) + return r.Reduce() +} + +func NewBackupReducer(db *badger.DB, restoreTs uint64) *reducer { + return &reducer{ + db: db, + restoreTs: restoreTs, + bufferCh: make(chan *z.Buffer, 10), + writeCh: make(chan *z.Buffer, 10), + } +} + +func (r *reducer) WriteCh() chan *z.Buffer { + return r.writeCh +} + +func (r *reducer) Reduce() error { + var files []string + f := func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if strings.HasSuffix(info.Name(), ".map") { + files = append(files, path) + } + return nil + } + + mapDir := filepath.Join(x.WorkerConfig.TmpDir, restoreMapDir) + defer os.RemoveAll(mapDir) + if err := filepath.Walk(mapDir, f); err != nil { + return err + } + glog.Infof("Got files: %+v\n", files) + + // Pick up map iterators and partition keys. + partitions := make(map[string]struct{}) + for _, fname := range files { + header, itr := newMapIterator(fname) + for _, k := range header.PartitionKeys { + if len(k) == 0 { + continue + } + partitions[string(k)] = struct{}{} + } + r.mapItrs = append(r.mapItrs, itr) + } + + keys := make([][]byte, 0, len(partitions)) + for k := range partitions { + keys = append(keys, []byte(k)) + } + sort.Slice(keys, func(i, j int) bool { + return y.CompareKeys(keys[i], keys[j]) < 0 + }) + // Append nil for the last entries. + keys = append(keys, nil) + r.partitionKeys = keys + + errCh := make(chan error, 2) + go func() { + errCh <- r.blockingRead() + }() + go func() { + errCh <- r.writeToDB() + }() + + for i := 0; i < 2; i++ { + if err := <-errCh; err != nil { + return err + } + } + return nil +} + +func (r *reducer) blockingRead() error { + cbuf := getBuf() + for _, pkey := range r.partitionKeys { + for _, itr := range r.mapItrs { + if err := itr.Next(cbuf, pkey); err != nil { + cbuf.Release() + return err + } + } + if cbuf.LenNoPadding() < 256<<20 { + // Pick up more data. 
+			continue
+		}
+		r.bufferCh <- cbuf
+		cbuf = getBuf()
+	}
+
+	if !cbuf.IsEmpty() {
+		r.bufferCh <- cbuf
+	} else {
+		cbuf.Release()
+	}
+	close(r.bufferCh)
+	return nil
+}
+
+func (r *reducer) writeToDB() error {
+	toStreamWriter := func() error {
+		if r.db == nil {
+			return nil
+		}
+		writer := r.db.NewStreamWriter()
+		x.Check(writer.Prepare())
+
+		for buf := range r.writeCh {
+			if err := writer.Write(buf); err != nil {
+				return err
+			}
+			buf.Release()
+		}
+		return writer.Flush()
+	}
+
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- toStreamWriter()
+	}()
+
+	kvBuf := getBuf()
+	var lastKey []byte
+	for cbuf := range r.bufferCh {
+		cbuf.SortSlice(func(ls, rs []byte) bool {
+			lme := mapEntry(ls)
+			rme := mapEntry(rs)
+			return y.CompareKeys(lme.Key(), rme.Key()) < 0
+		})
+
+		err := cbuf.SliceIterate(func(s []byte) error {
+			me := mapEntry(s)
+			key := me.Key()
+
+			pk, err := x.Parse(key)
+			if err != nil {
+				return errors.Wrap(err, "writeToDB failed to parse key")
+			}
+
+			// Don't need to pick multiple versions of the same key.
+			if y.SameKey(key, lastKey) {
+				return nil
+			}
+
+			kv := &bpb.KV{}
+			b := me.Data()
+			// Override the version if requested. Should not be done for type and schema predicates,
+			// which always have their version set to 1.
+			if r.restoreTs > 0 && !pk.IsSchema() && !pk.IsType() {
+				if err := kv.Unmarshal(me.Data()); err != nil {
+					return errors.Wrap(err, "writeToDB failed to unmarshal KV")
+				}
+				kv.Version = r.restoreTs
+				b = make([]byte, kv.Size())
+				if _, err := kv.MarshalToSizedBuffer(b); err != nil {
+					return errors.Wrap(err, "writeToDB failed to marshal KV")
+				}
+			}
+
+			lastKey = append(lastKey[:0], key...)
+			kvBuf.WriteSlice(b)
+			return nil
+		})
+		if err != nil {
+			return err
+		}
+
+		r.writeCh <- kvBuf
+		// Reuse cbuf for the next kvBuf.
+		cbuf.Reset()
+		kvBuf = cbuf
+	}
+	close(r.writeCh)
+	kvBuf.Release()
+	return <-errCh
+}
+
+// RunRestore creates the required DBs, one per backup group, and streams the backups into them.
+// It is used only for testing.
+func RunRestore(dir, location, backupId string, keyFile string,
+	ctype options.CompressionType, clevel int) LoadResult {
+	// Create the pdir if it doesn't exist.
+	if err := os.MkdirAll(dir, 0700); err != nil {
+		return LoadResult{Err: err}
+	}
+
+	uri, err := url.Parse(location)
+	if err != nil {
+		return LoadResult{Err: err}
+	}
+
+	h, err := NewUriHandler(uri, nil)
+	if err != nil {
+		return LoadResult{Err: errors.Errorf("Unsupported URI: %v", uri)}
+	}
+	manifest, err := GetLatestManifest(h, uri)
+	if err != nil {
+		return LoadResult{Err: errors.Wrapf(err, "cannot retrieve manifests")}
+	}
+	var key x.Sensitive
+	if len(keyFile) > 0 {
+		key, err = ioutil.ReadFile(keyFile)
+		if err != nil {
+			return LoadResult{Err: errors.Wrapf(err, "RunRestore failed to read enc-key")}
+		}
+	}
+
+	for gid := range manifest.Groups {
+		req := &pb.RestoreRequest{
+			Location:          location,
+			GroupId:           gid,
+			BackupId:          backupId,
+			EncryptionKeyFile: keyFile,
+		}
+		if err := MapBackup(req); err != nil {
+			return LoadResult{Err: errors.Wrap(err, "RunRestore failed to map")}
+		}
+		pdir := filepath.Join(dir, fmt.Sprintf("p%d", gid))
+		db, err := badger.OpenManaged(badger.DefaultOptions(pdir).
+			WithCompression(ctype).
+			WithZSTDCompressionLevel(clevel).
+			WithSyncWrites(false).
+			WithBlockCacheSize(100 * (1 << 20)).
+			WithIndexCacheSize(100 * (1 << 20)).
+			WithNumVersionsToKeep(math.MaxInt32).
+			WithEncryptionKey(key).
+			WithNamespaceOffset(x.NamespaceOffset))
+		if err != nil {
+			return LoadResult{Err: errors.Wrap(err, "RunRestore failed to open DB")}
+		}
+		defer db.Close()
+		if err := reduceToDB(db, 0); err != nil {
+			return LoadResult{Err: errors.Wrap(err, "RunRestore failed to reduce")}
+		}
+		if err := x.WriteGroupIdFile(pdir, uint32(gid)); err != nil {
+			return LoadResult{Err: errors.Wrap(err, "RunRestore failed to write group id file")}
+		}
+	}
+	// TODO: Fix this return value.
+	return LoadResult{Version: manifest.Since}
 }
diff --git a/x/config.go b/x/config.go
index d71dfe6e2b6..ad7257f6cc9 100644
--- a/x/config.go
+++ b/x/config.go
@@ -102,7 +102,7 @@ type WorkerOptions struct {
 	// AclEnabled indicates whether the enterprise ACL feature is turned on.
 	AclEnabled bool
 	// HmacSecret stores the secret used to sign JSON Web Tokens (JWT).
-	HmacSecret SensitiveByteSlice
+	HmacSecret Sensitive
 	// AbortOlderThan tells Dgraph to discard transactions that are older than this duration.
 	AbortOlderThan time.Duration
 	// ProposedGroupId will be used if there's a file in the p directory called group_id with the
@@ -124,7 +124,7 @@ type WorkerOptions struct {
 	// token string - if set, all Admin requests to Dgraph will have this token.
 	Security *z.SuperFlag
 	// EncryptionKey is the key used for encryption at rest, backups, exports. Enterprise only feature.
-	EncryptionKey SensitiveByteSlice
+	EncryptionKey Sensitive
 	// LogRequest indicates whether alpha should log all query/mutation requests coming to it.
 	// Ideally LogRequest should be a bool value. But we are reading it using atomics across
 	// queries hence it has been kept as int32. LogRequest value 1 enables logging of requests
diff --git a/x/logger.go b/x/logger.go
index 054ecaad7c4..1e78b492a52 100644
--- a/x/logger.go
+++ b/x/logger.go
@@ -27,7 +27,7 @@ import (
 type LoggerConf struct {
 	Compress      bool
 	Output        string
-	EncryptionKey SensitiveByteSlice
+	EncryptionKey Sensitive
 	Size          int64
 	Days          int64
 	MessageKey    string
diff --git a/x/types.go b/x/types.go
index 691c1da5b90..533365d6e57 100644
--- a/x/types.go
+++ b/x/types.go
@@ -21,11 +21,11 @@ type ExportedGQLSchema struct {
 	Schema string
 }
 
-// SensitiveByteSlice implements the Stringer interface to redact its contents.
+// Sensitive implements the Stringer interface to redact its contents.
 // Use this type for sensitive info such as keys, passwords, or secrets so it doesn't leak
 // as output such as logs.
-type SensitiveByteSlice []byte
+type Sensitive []byte
 
-func (SensitiveByteSlice) String() string {
+func (Sensitive) String() string {
 	return "****"
 }
diff --git a/x/x_test.go b/x/x_test.go
index 4a252ee41d2..e910f4f2db6 100644
--- a/x/x_test.go
+++ b/x/x_test.go
@@ -25,7 +25,7 @@ import (
 )
 
 func TestSensitiveByteSlice(t *testing.T) {
-	var v SensitiveByteSlice = SensitiveByteSlice("mysecretkey")
+	var v Sensitive = Sensitive("mysecretkey")
 	s := fmt.Sprintf("%s,%v,%s,%+v", v, v, &v, &v)
 	require.EqualValues(t, "****,****,****,****", s)
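
Note on the rename above: x.Sensitive keeps the redacting behavior of the old SensitiveByteSlice, which is what TestSensitiveByteSlice asserts. A minimal usage sketch (not part of the patch), assuming only the String() method shown in x/types.go:

package main

import (
	"fmt"

	"github.com/dgraph-io/dgraph/x"
)

func main() {
	key := x.Sensitive("mysecretkey")
	// Both the value and a pointer to it format as "****" via the String() method,
	// so secrets passed around as x.Sensitive never show up in logs or error messages.
	fmt.Printf("%s %v %+v\n", key, key, &key)
	// Output: **** **** ****
}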