cherry-pick PR #7753
This commit is a major rewrite of the online restore code. It used to use
KVLoader in badger. It now uses StreamWriter instead, which is much faster
for writes in the case of restore.

The following commits are cherry-picked (in reverse order):
 * fix(backup): Free the UidPack after use (#7786)
 * fix(export-backup): Fix double free in export backup (#7780) (#7783)
 * fix(lsbackup): Fix profiler in lsBackup (#7729)
 * Bring back "perf(Backup): Improve backup performance (#7601)"
 * Opt(Backup): Make backups faster (#7680)
 * Fix s3 backup copy (#7669)
 * [BREAKING] Opt(Restore): Optimize Restore's new map-reduce based design (#7666)
 * Perf(restore): Implement map-reduce based restore (#7664)
 * feat(backup): Merge backup refactoring
 * Revert "perf(Backup): Improve backup performance (#7601)"
 * fix(ee): GetKeys should return an error (#7713) (#7797)
 * fix(restore): Bump uid and namespace after restore (#7790) (#7800)
 * fix: fixing graphql schema update when the data is restored + skipping /probe/graphql from audit (#7925)
 * Don't ban namespace in export_backup
ahsanbarkati authored and mangalaman93 committed Jan 2, 2023
1 parent 501325c commit 65d6273
Showing 48 changed files with 3,714 additions and 3,486 deletions.
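The commit message above says online restore now writes through badger's StreamWriter instead of KVLoader. For context only, the following is a minimal, hypothetical sketch of the general StreamWriter bulk-load pattern; it is not the restore code from this commit, and it assumes badger's v2 API (in badger v3, StreamWriter.Write takes a *z.Buffer rather than a *pb.KVList).

package main

import (
    "log"

    badger "github.com/dgraph-io/badger/v2"
    bpb "github.com/dgraph-io/badger/v2/pb"
)

func main() {
    // StreamWriter bulk-loads into an empty DB, bypassing the transactional
    // write path; that is what makes it much faster for restore-style writes.
    db, err := badger.Open(badger.DefaultOptions("/tmp/streamwriter-demo"))
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    sw := db.NewStreamWriter()
    if err := sw.Prepare(); err != nil {
        log.Fatal(err)
    }

    // Keys within a stream must be written in sorted order and carry a version.
    kvs := &bpb.KVList{Kv: []*bpb.KV{
        {Key: []byte("key-1"), Value: []byte("value-1"), Version: 1},
        {Key: []byte("key-2"), Value: []byte("value-2"), Version: 1},
    }}
    if err := sw.Write(kvs); err != nil {
        log.Fatal(err)
    }

    // Flush finalizes all streams and makes the data visible to readers.
    if err := sw.Flush(); err != nil {
        log.Fatal(err)
    }
}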
4 changes: 3 additions & 1 deletion codec/codec.go
@@ -405,7 +405,8 @@ func Decode(pack *pb.UidPack, seek uint64) []uint64 {

// DecodeToBuffer is the same as Decode but it returns a z.Buffer which is
// calloc'ed and SHOULD be freed up by calling buffer.Release().
func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) *z.Buffer {

var last uint64
tmp := make([]byte, 16)
dec := Decoder{Pack: pack}
@@ -416,6 +417,7 @@ func DecodeToBuffer(buf *z.Buffer, pack *pb.UidPack) {
last = u
}
}
return buf
}

func match32MSB(num1, num2 uint64) bool {
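As an aside (not part of the diff), here is a small hypothetical sketch of how the new chaining-friendly return value of DecodeToBuffer might be used; the codec and ristretto/z import paths are assumed from the rest of this repository.

package main

import (
    "fmt"

    "github.com/dgraph-io/dgraph/codec"
    "github.com/dgraph-io/ristretto/z"
)

func main() {
    // Encode a few UIDs into a UidPack, then decode them into a caller-owned buffer.
    pack := codec.Encode([]uint64{1, 2, 3}, 128)
    defer codec.FreePack(pack)

    // Because DecodeToBuffer now returns the buffer it was given, allocation,
    // decoding, and deferred release can be expressed in two lines.
    buf := codec.DecodeToBuffer(z.NewBuffer(10<<10, "example"), pack)
    defer buf.Release()

    fmt.Println("decoded bytes:", buf.LenNoPadding())
}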
1 change: 0 additions & 1 deletion codec/codec_test.go
@@ -75,7 +75,6 @@ func TestBufferUidPack(t *testing.T) {
// Some edge case tests.
pack := Encode([]uint64{}, 128)
FreePack(pack)

buf := z.NewBuffer(10<<10, "TestBufferUidPack")
defer buf.Release()
DecodeToBuffer(buf, &pb.UidPack{})
2 changes: 1 addition & 1 deletion dgraph/cmd/bulk/reduce.go
@@ -481,8 +481,8 @@ func (r *reducer) reduce(partitionKeys [][]byte, mapItrs []*mapIterator, ci *cou
partitionKeys = append(partitionKeys, nil)

for i := 0; i < len(partitionKeys); i++ {
pkey := partitionKeys[i]
for _, itr := range mapItrs {
pkey := partitionKeys[i]
itr.Next(cbuf, pkey)
}
if cbuf.LenNoPadding() < 256<<20 {
2 changes: 1 addition & 1 deletion dgraph/cmd/decrypt/decrypt.go
@@ -30,7 +30,7 @@ import (
)

type options struct {
// keyfile comes from the encryption or Vault flags
// keyfile comes from the encryption_key_file or Vault flags
keyfile x.Sensitive
file string
output string
44 changes: 27 additions & 17 deletions dgraph/cmd/increment/increment.go
@@ -199,40 +199,50 @@ func run(conf *viper.Viper) {
dg = dgTmp
}

// Run things serially first.
for i := 0; i < conc; i++ {
_, err := process(dg, conf)
x.Check(err)
num--
addOne := func(i int) error {
txnStart := time.Now() // Start time of transaction
cnt, err := process(dg, conf)
now := time.Now().UTC().Format(format)
if err != nil {
return err
}
serverLat := cnt.qLatency + cnt.mLatency
clientLat := time.Since(txnStart).Round(time.Millisecond)
fmt.Printf(
"[w%d] %-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n",
i, now, cnt.Val, cnt.startTs, cnt.qLatency, cnt.mLatency,
serverLat, clientLat, clientLat-serverLat)
return nil
}

// Run things serially first, if conc > 1.
if conc > 1 {
for i := 0; i < conc; i++ {
err := addOne(0)
x.Check(err)
num--
}
}

var wg sync.WaitGroup
f := func(i int) {
f := func(worker int) {
defer wg.Done()
count := 0
for count < num {
txnStart := time.Now() // Start time of transaction
cnt, err := process(dg, conf)
now := time.Now().UTC().Format(format)
if err != nil {
if err := addOne(worker); err != nil {
now := time.Now().UTC().Format(format)
fmt.Printf("%-17s While trying to process counter: %v. Retrying...\n", now, err)
time.Sleep(time.Second)
continue
}
serverLat := cnt.qLatency + cnt.mLatency
clientLat := time.Since(txnStart).Round(time.Millisecond)
fmt.Printf(
"[%d] %-17s Counter VAL: %d [ Ts: %d ] Latency: Q %s M %s S %s C %s D %s\n",
i, now, cnt.Val, cnt.startTs, cnt.qLatency, cnt.mLatency,
serverLat, clientLat, clientLat-serverLat)
time.Sleep(waitDur)
count++
}
}

for i := 0; i < conc; i++ {
wg.Add(1)
go f(i)
go f(i + 1)
}
wg.Wait()
}
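A note on the log line printed by addOne above (an interpretation of the code, not text from the commit): Q and M are the server-reported latencies of the query and mutation (cnt.qLatency and cnt.mLatency), S is their sum, C is the client-side wall-clock time for the whole transaction, and D = C − S, the time spent outside the server.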
1 change: 0 additions & 1 deletion dgraph/cmd/root_ee.go
@@ -21,7 +21,6 @@ import (
func init() {
// subcommands already has the default subcommands, we append to EE ones to that.
subcommands = append(subcommands,
&backup.Restore,
&backup.LsBackup,
&backup.ExportBackup,
&acl.CmdAcl,
23 changes: 22 additions & 1 deletion dgraph/cmd/zero/assign.go
@@ -175,7 +175,9 @@ func (s *Server) lease(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error
}

// AssignIds is used to assign new ids (UIDs, NsIDs) by communicating with the leader of the
// RAFT group responsible for handing out ids.
// RAFT group responsible for handing out ids. If bump is set to true in the request then the
// lease for the given id type is bumped to num.Val and {startId, endId} of the newly leased ids
// in the process of bump is returned.
func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error) {
if ctx.Err() != nil {
return &emptyAssignedIds, ctx.Err()
@@ -246,6 +248,25 @@ func (s *Server) AssignIds(ctx context.Context, num *pb.Num) (*pb.AssignedIds, e
return err
}

// If this is a bump request and the current node is the leader then we create a normal lease
// request based on the number of required ids to reach the asked bump value. If the current
// node is not the leader then the bump request will be forwarded to the leader by lease().
if num.GetBump() && s.Node.AmLeader() {
s.leaseLock.Lock()
cur := s.nextLease[num.GetType()] - 1
s.leaseLock.Unlock()

// We need to lease more UIDs if bump request is more than current max lease.
req := num.GetVal()
if cur >= req {
return &emptyAssignedIds, errors.Errorf("Nothing to be leased")
}
num.Val = req - cur

// Set bump to false because we want to lease the required ids in the following request.
num.Bump = false
}

c := make(chan error, 1)
go func() {
c <- lease()
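Worked example of the bump branch above: if the current maximum UID lease (cur) is 10,000 and a bump request arrives with Val = 10,050, the handler converts it into an ordinary lease of 10,050 − 10,000 = 50 ids and clears the Bump flag before calling lease(); a bump to any value at or below 10,000 returns the "Nothing to be leased" error instead.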
39 changes: 39 additions & 0 deletions dgraph/cmd/zero/zero_test.go
@@ -24,6 +24,7 @@ import (
"github.com/dgraph-io/dgraph/protos/pb"
"github.com/dgraph-io/dgraph/testutil"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)

func TestRemoveNode(t *testing.T) {
@@ -44,3 +45,41 @@ func TestIdLeaseOverflow(t *testing.T) {
require.Error(t, err)
require.Contains(t, err.Error(), "limit has reached")
}

func TestIdBump(t *testing.T) {
dialOpts := []grpc.DialOption{
grpc.WithBlock(),
grpc.WithInsecure(),
}
ctx := context.Background()
con, err := grpc.DialContext(ctx, testutil.SockAddrZero, dialOpts...)
require.NoError(t, err)

zc := pb.NewZeroClient(con)

res, err := zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID})
require.NoError(t, err)
require.Equal(t, uint64(10), res.GetEndId()-res.GetStartId()+1)

// Next assignment's startId should be greater than 10.
res, err = zc.AssignIds(ctx, &pb.Num{Val: 50, Type: pb.Num_UID})
require.NoError(t, err)
require.Greater(t, res.GetStartId(), uint64(10))
require.Equal(t, uint64(50), res.GetEndId()-res.GetStartId()+1)

bumpTo := res.GetEndId() + 100000

// Bump the lease to (last result + 100000).
res, err = zc.AssignIds(ctx, &pb.Num{Val: bumpTo, Type: pb.Num_UID, Bump: true})
require.NoError(t, err)

// Next assignment's startId should be greater than bumpTo.
res, err = zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID})
require.NoError(t, err)
require.Greater(t, res.GetStartId(), bumpTo)
require.Equal(t, uint64(10), res.GetEndId()-res.GetStartId()+1)

// If bump request is less than maxLease, then it should result in no-op.
res, err = zc.AssignIds(ctx, &pb.Num{Val: 10, Type: pb.Num_UID, Bump: true})
require.Contains(t, err.Error(), "Nothing to be leased")
}
2 changes: 1 addition & 1 deletion dgraph/main.go
@@ -57,7 +57,7 @@ func main() {
for range ticker.C {
// Read Jemalloc stats first. Print if there's a big difference.
z.ReadMemStats(&js)
if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 256<<20 {
if diff := absDiff(uint64(z.NumAllocBytes()), lastAlloc); diff > 1<<30 {
glog.V(2).Infof("NumAllocBytes: %s jemalloc: Active %s Allocated: %s"+
" Resident: %s Retained: %s\n",
humanize.IBytes(uint64(z.NumAllocBytes())),
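For scale, the change above raises the threshold at which the jemalloc allocation difference is logged from 256<<20 bytes (256 MiB) to 1<<30 bytes (1 GiB).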
4 changes: 2 additions & 2 deletions ee/acl/acl_test.go
@@ -2858,7 +2858,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
queryName: "listBackups",
respIsArray: true,
testGuardianAccess: true,
guardianErr: "The path \"\" does not exist or it is inaccessible.",
guardianErr: "The uri path: \"\" doesn't exist",
guardianData: `{"listBackups": []}`,
},
{
@@ -2939,7 +2939,7 @@ func TestGuardianOnlyAccessForAdminEndpoints(t *testing.T) {
}`,
queryName: "restore",
testGuardianAccess: true,
guardianErr: "The path \"\" does not exist or it is inaccessible.",
guardianErr: "The uri path: \"\" doesn't exist",
guardianData: `{"restore": {"code": "Failure"}}`,
},
{
5 changes: 3 additions & 2 deletions ee/audit/interceptor_ee.go
@@ -63,8 +63,9 @@ var skipApis = map[string]bool{

var skipEPs = map[string]bool{
// list of endpoints that needs to be skipped
"/health": true,
"/state": true,
"/health": true,
"/state": true,
"/probe/graphql": true,
}

func AuditRequestGRPC(ctx context.Context, req interface{},
