cherry-pick PR #7753
This commit is a major rewrite of the online restore code. It used to use
badger's KVLoader; it now uses StreamWriter, which is much faster for
writes in the restore case.
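
For context: KVLoader goes through badger's regular transactional write path, while StreamWriter ingests pre-sorted key-value streams and builds SSTables directly, which is what makes it so much faster for a bulk load like restore. A minimal sketch of the StreamWriter pattern, assuming badger v2's API (the bulkLoad helper and its arguments are illustrative, not code from this change):

    package restore

    import (
    	badger "github.com/dgraph-io/badger/v2"
    	bpb "github.com/dgraph-io/badger/v2/pb"
    )

    // bulkLoad writes pre-sorted key-value pairs with StreamWriter, which
    // builds SSTables directly instead of going through the LSM write path.
    func bulkLoad(db *badger.DB, kvs []*bpb.KV) error {
    	sw := db.NewStreamWriter()
    	// Prepare drops any existing data, so the DB must be empty or disposable.
    	if err := sw.Prepare(); err != nil {
    		return err
    	}
    	// Keys must be handed over in sorted order.
    	if err := sw.Write(&bpb.KVList{Kv: kvs}); err != nil {
    		return err
    	}
    	// Flush finalizes the SSTables and makes the data visible.
    	return sw.Flush()
    }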

The following commits are cherry-picked (in reverse order):
 * fix(backup): Free the UidPack after use (#7786)
 * fix(export-backup): Fix double free in export backup (#7780) (#7783)
 * fix(lsbackup): Fix profiler in lsBackup (#7729)
 * Bring back "perf(Backup): Improve backup performance (#7601)"
 * Opt(Backup): Make backups faster (#7680)
 * Fix s3 backup copy (#7669)
 * [BREAKING] Opt(Restore): Optimize Restore's new map-reduce based design (#7666)
 * Perf(restore): Implement map-reduce based restore (#7664)
 * feat(backup): Merge backup refactoring
 * Revert "perf(Backup): Improve backup performance (#7601)"
ahsanbarkati authored and mangalaman93 committed Dec 29, 2022
1 parent 32d5c9f commit 543d4d9
Showing 57 changed files with 3,156 additions and 3,163 deletions.
4 changes: 2 additions & 2 deletions chunker/chunk.go
@@ -352,7 +352,7 @@ func slurpQuoted(r *bufio.Reader, out *bytes.Buffer) error {
// and decompressed automatically even without the gz extension. The key, if non-nil,
// is used to decrypt the file. The caller is responsible for calling the returned cleanup
// function when done with the reader.
-func FileReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) {
+func FileReader(file string, key x.Sensitive) (*bufio.Reader, func()) {
var f *os.File
var err error
if file == "-" {
@@ -367,7 +367,7 @@ func FileReader(file string, key x.SensitiveByteSlice) (*bufio.Reader, func()) {
}

// StreamReader returns a bufio given a ReadCloser. The file is passed just to check for .gz files
-func StreamReader(file string, key x.SensitiveByteSlice, f io.ReadCloser) (
+func StreamReader(file string, key x.Sensitive, f io.ReadCloser) (
rd *bufio.Reader, cleanup func()) {
cleanup = func() { _ = f.Close() }

4 changes: 3 additions & 1 deletion codec/codec.go
@@ -405,7 +405,8 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {

// DecodeToBuffer is the same as Decode but it returns a z.Buffer which is
// calloc'ed and SHOULD be freed up by calling buffer.Release().
-func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
+func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) *z.Buffer {
+
var last uint64
tmp := make([]byte, 16)
dec := Decoder{Pack: pack}
@@ -416,6 +417,7 @@ func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
last = u
}
}
+return buf
}

func match32MSB(num1, num2 uint64) bool {
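Since DecodeToBuffer now returns the buffer it was given, allocation, decode, and ownership hand-off compose into a single expression. A small usage sketch under the new signature (the decodeUids helper and buffer tag are illustrative):

    import (
    	"github.com/dgraph-io/dgraph/codec"
    	"github.com/dgraph-io/dgraph/protos/pb"
    	"github.com/dgraph-io/ristretto/z"
    )

    // decodeUids expands a UidPack into a freshly allocated z.Buffer.
    // The buffer is calloc'ed; the caller must call Release() on it when done.
    func decodeUids(pack *pb.UidPack) *z.Buffer {
    	return codec.DecodeToBuffer(z.NewBuffer(10<<10, "example.decodeUids"), pack)
    }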
1 change: 0 additions & 1 deletion codec/codec_test.go
@@ -75,7 +75,6 @@ func TestBufferUidPack(t *testing.T) {
// Some edge case tests.
pack := Encode([]uint64{}, 128)
FreePack(pack)
-
buf := z.NewBuffer(10<<10, "TestBufferUidPack")
defer buf.Release()
DecodeToBuffer(buf, &pb.UidPack{})
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/loader.go
@@ -83,7 +83,7 @@ type options struct {

// ........... Badger options ..........
// EncryptionKey is the key used for encryption. Enterprise only feature.
-EncryptionKey x.SensitiveByteSlice
+EncryptionKey x.Sensitive
// Badger options.
Badger badger.Options
}
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/reduce.go
@@ -481,8 +481,8 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
partitionKeys = append(partitionKeys, nil)

for i := 0; i < len(partitionKeys); i++ {
-pkey := partitionKeys[i]
for _, itr := range mapItrs {
+pkey := partitionKeys[i]
itr.Next(cbuf, pkey)
}
if cbuf.LenNoPadding() < 256<<20 {
2 changes: 1 addition & 1 deletion dgraph/cmd/debug/run.go
@@ -69,7 +69,7 @@ type flagOptions struct {
readTs uint64
sizeHistogram bool
noKeys bool
-key x.SensitiveByteSlice
+key x.Sensitive

// Options related to the WAL.
wdir string
4 changes: 2 additions & 2 deletions dgraph/cmd/decrypt/decrypt.go
@@ -30,8 +30,8 @@ import (
)

type options struct {
-// keyfile comes from the encryption or Vault flags
-keyfile x.SensitiveByteSlice
+// keyfile comes from the encryption_key_file or Vault flags
+keyfile x.Sensitive
file string
output string
}
44 changes: 27 additions & 17 deletions dgraph/cmd/increment/increment.go
@@ -199,40 +199,50 @@ func run(conf *viper.Viper) {
dg = dgTmp
}

-// Run things serially first.
-for i := 0; i < conc; i++ {
-_, err := process(dg, conf)
-x.Check(err)
-num--
+addOne := func(i int) error {
+txnStart := time.Now() // Start time of transaction
+cnt, err := process(dg, conf)
+now := time.Now().UTC().Format(format)
+if err != nil {
+return err
+}
+serverLat := cnt.qLatency + cnt.mLatency
+clientLat := time.Since(txnStart).Round(time.Millisecond)
+fmt.Printf(
+"[w%d] %-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n",
+i, now, cnt.Val, cnt.startTs, cnt.qLatency, cnt.mLatency,
+serverLat, clientLat, clientLat-serverLat)
+return nil
+}

+// Run things serially first, if conc > 1.
+if conc > 1 {
+for i := 0; i < conc; i++ {
+err := addOne(0)
+x.Check(err)
+num--
+}
+}

var wg sync.WaitGroup
-f := func(i int) {
+f := func(worker int) {
defer wg.Done()
count := 0
for count < num {
-txnStart := time.Now() // Start time of transaction
-cnt, err := process(dg, conf)
-now := time.Now().UTC().Format(format)
-if err != nil {
+if err := addOne(worker); err != nil {
+now := time.Now().UTC().Format(format)
fmt.Printf("%-17s While trying to process counter: %v. Retrying...\n", now, err)
time.Sleep(time.Second)
continue
}
-serverLat := cnt.qLatency + cnt.mLatency
-clientLat := time.Since(txnStart).Round(time.Millisecond)
-fmt.Printf(
-"[%d] %-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n",
-i, now, cnt.Val, cnt.startTs, cnt.qLatency, cnt.mLatency,
-serverLat, clientLat, clientLat-serverLat)
time.Sleep(waitDur)
count++
}
}

for i := 0; i < conc; i++ {
wg.Add(1)
-go f(i)
+go f(i + 1)
}
wg.Wait()
}
6 changes: 3 additions & 3 deletions dgraph/cmd/live/run.go
@@ -79,7 +79,7 @@ type options struct {
ludicrousMode bool
upsertPredicate string
tmpDir string
-key x.SensitiveByteSlice
+key x.Sensitive
namespaceToLoad uint64
preserveNs bool
}
@@ -234,7 +234,7 @@ func validateSchema(sch string, namespaces map[uint64]struct{}) error {
}

// processSchemaFile process schema for a given gz file.
-func (l *loader) processSchemaFile(ctx context.Context, file string, key x.SensitiveByteSlice,
+func (l *loader) processSchemaFile(ctx context.Context, file string, key x.Sensitive,
dgraphClient *dgo.Dgraph) error {
fmt.Printf("\nProcessing schema file %q\n", file)
if len(opt.authToken) > 0 {
@@ -461,7 +461,7 @@ func (l *loader) allocateUids(nqs []*api.NQuad) {

// processFile forwards a file to the RDF or JSON processor as appropriate
func (l *loader) processFile(ctx context.Context, fs filestore.FileStore, filename string,
-key x.SensitiveByteSlice) error {
+key x.Sensitive) error {

fmt.Printf("Processing data file %q\n", filename)

1 change: 0 additions & 1 deletion dgraph/cmd/root_ee.go
@@ -21,7 +21,6 @@ import (
func init() {
// subcommands already has the default subcommands, we append to EE ones to that.
subcommands = append(subcommands,
-&backup.Restore,
&backup.LsBackup,
&backup.ExportBackup,
&acl.CmdAcl,
2 changes: 1 addition & 1 deletion dgraph/main.go
@@ -57,7 +57,7 @@ func main() {
for range ticker.C {
// Read Jemalloc stats first. Print if there's a big difference.
z.ReadMemStats(&js)
-if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 256<<20 {
+if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
glog.V(2).Infof("NumAllocBytes: %s jemalloc: Active %s Allocated: %s"+
" Resident: %s Retained: %s\n",
humanize.IBytes(uint64(z.NumAllocBytes())),
4 changes: 2 additions & 2 deletions ee/acl/acl_test.go
@@ -2858,7 +2858,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
queryName: "listBackups",
respIsArray: true,
testGuardianAccess: true,
guardianErr: "The path \"\" does not exist or it is inaccessible.",
guardianErr: "The uri path: \"\" doesn't exist",
guardianData: `{"listBackups": []}`,
},
{
@@ -2939,7 +2939,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
}`,
queryName: "restore",
testGuardianAccess: true,
guardianErr: "The path \"\" does not exist or it is inaccessible.",
guardianErr: "The uri path: \"\" doesn't exist",
guardianData: `{"restore": {"code": "Failure"}}`,
},
{