Take snapshots less frequently (dgraph-io#3367)
Taking snapshots frequently causes a straggler follower to have a hard time getting a new snapshot streamed. If streaming a snapshot takes longer than the interval between snapshots, the straggler would never catch up.

So, instead of taking snapshots frequently, this PR adds a mechanism to track the progress of Raft in the p directory. This way, a restarted Alpha does not need to replay all the Raft logs, only the ones it has not yet applied. (A simplified sketch of the idea follows the change list below.)

This PR adds two new flags:
1. --abort_older_than: the duration after which a pending txn would be aborted.
2. --snapshot_after: the number of Raft entries after which a snapshot would be taken.

This PR also removes the strange lastCommitTs logic, which only logged an error without doing anything.

Changes:
* Keep track of Raft progress in the p directory, so a restarted Alpha can skip over entries it has already applied. Allow the user to specify how often to take snapshots.
* Improve how we set Raft progress.
* Make x.Parse parse the new Raft key.
* Handle key not found error.
* Fix up the debug wal to spit out the last entry as well.
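
A simplified, self-contained sketch of the idea (illustrative only: the key name "raft-progress", the bare uint64 value, and the in-memory map standing in for the Badger-backed p directory are assumptions; the actual change stores a marshalled pb.Snapshot under x.RaftKey()):

package main

import (
    "encoding/binary"
    "fmt"
)

// kv stands in for the p directory in this sketch.
type kv map[string][]byte

// saveProgress records the highest applied Raft index, much like updateRaftProgress does periodically.
func saveProgress(store kv, applied uint64) {
    buf := make([]byte, 8)
    binary.BigEndian.PutUint64(buf, applied)
    store["raft-progress"] = buf
}

// loadProgress reads the recorded index back; a missing key simply means nothing was recorded yet.
func loadProgress(store kv) uint64 {
    val, ok := store["raft-progress"]
    if !ok {
        return 0
    }
    return binary.BigEndian.Uint64(val)
}

func main() {
    store := kv{}
    saveProgress(store, 1500)

    // On restart, skip entries below the recorded progress instead of re-applying them.
    applied := loadProgress(store)
    for idx := uint64(1498); idx <= 1502; idx++ {
        if idx < applied {
            fmt.Printf("entry %d: skip (already applied)\n", idx)
            continue
        }
        fmt.Printf("entry %d: apply\n", idx)
    }
}
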
manishrjain authored and dna2github committed Jul 19, 2019
1 parent f14a11f commit cacc7fc
Showing 6 changed files with 127 additions and 26 deletions.
15 changes: 15 additions & 0 deletions dgraph/cmd/alpha/run.go
@@ -101,6 +101,15 @@ they form a Raft group and provide synchronous replication.
        "[mmap, disk] Specifies how Badger Value log is stored."+
            " mmap consumes more RAM, but provides better performance.")

    // Snapshot and Transactions.
    flag.Int("snapshot_after", 10000,
        "Create a new Raft snapshot after this many number of Raft entries. The"+
            " lower this number, the more frequent snapshot creation would be."+
            " Also determines how often Rollups would happen.")
    flag.String("abort_older_than", "5m",
        "Abort any pending transactions older than this duration. The liveness of a"+
            " transaction is determined by its last mutation.")

    // OpenCensus flags.
    flag.Float64("trace", 1.0, "The ratio of queries to trace.")
    flag.String("jaeger.collector", "", "Send opencensus traces to Jaeger.")
@@ -462,6 +471,10 @@ func run() {

    ips, err := getIPsFromString(Alpha.Conf.GetString("whitelist"))
    x.Check(err)

    abortDur, err := time.ParseDuration(Alpha.Conf.GetString("abort_older_than"))
    x.Check(err)

    x.WorkerConfig = x.WorkerOptions{
        ExportPath: Alpha.Conf.GetString("export"),
        NumPendingProposals: Alpha.Conf.GetInt("pending_proposals"),
@@ -474,6 +487,8 @@ func run() {
        MaxRetries: Alpha.Conf.GetInt("max_retries"),
        StrictMutations: opts.MutationsMode == edgraph.StrictMutations,
        AclEnabled: secretFile != "",
        SnapshotAfter: Alpha.Conf.GetInt("snapshot_after"),
        AbortOlderThan: abortDur,
    }

    setupCustomTokenizers()
3 changes: 3 additions & 0 deletions dgraph/cmd/debug/run.go
@@ -497,6 +497,9 @@ func printKeys(db *badger.DB) {

        // Don't use a switch case here. Because multiple of these can be true. In particular,
        // IsSchema can be true alongside IsData.
        if pk.IsRaft() {
            buf.WriteString("{r}")
        }
        if pk.IsData() {
            buf.WriteString("{d}")
        }
2 changes: 1 addition & 1 deletion dgraph/cmd/debug/wal.go
@@ -106,7 +106,7 @@ func printRaft(db *badger.DB, store *raftwal.DiskStorage) {

    pending := make(map[uint64]bool)
    for startIdx < lastIdx-1 {
        entries, err := store.Entries(startIdx, lastIdx, 64<<20 /* 64 MB Max Size */)
        entries, err := store.Entries(startIdx, lastIdx+1, 64<<20 /* 64 MB Max Size */)
        if err != nil {
            fmt.Printf("Got error while retrieving entries: %v\n", err)
            return
110 changes: 86 additions & 24 deletions worker/draft.go
@@ -62,8 +62,6 @@ type node struct {
    gid uint32
    closer *y.Closer

    lastCommitTs uint64 // Only used to ensure that our commit Ts is monotonically increasing.

    streaming int32 // Used to avoid calculating snapshot

    canCampaign bool
@@ -540,12 +538,7 @@ func (n *node) commitOrAbort(pkey string, delta *pb.OracleDelta) error {
    }

    for _, status := range delta.Txns {
        if status.CommitTs > 0 && status.CommitTs < n.lastCommitTs {
            glog.Errorf("Lastcommit %d > current %d. This would cause some commits to be lost.",
                n.lastCommitTs, status.CommitTs)
        }
        toDisk(status.StartTs, status.CommitTs)
        n.lastCommitTs = status.CommitTs
    }
    if err := writer.Flush(); err != nil {
        return x.Errorf("Error while flushing to disk: %v", err)
@@ -675,6 +668,66 @@ func (n *node) rampMeter() {
        time.Sleep(3 * time.Millisecond)
    }
}

func (n *node) findRaftProgress() (uint64, error) {
    var applied uint64
    err := pstore.View(func(txn *badger.Txn) error {
        item, err := txn.Get(x.RaftKey())
        if err == badger.ErrKeyNotFound {
            return nil
        }
        if err != nil {
            return err
        }
        return item.Value(func(val []byte) error {
            var snap pb.Snapshot
            if err := snap.Unmarshal(val); err != nil {
                return err
            }
            applied = snap.Index
            return nil
        })
    })
    return applied, err
}

func (n *node) updateRaftProgress() error {
    // Both leader and followers can independently update their Raft progress. We don't store
    // this in Raft WAL. Instead, this is used to just skip over log records that this Alpha
    // has already applied, to speed up things on a restart.
    snap, err := n.calculateSnapshot(10) // 10 is a randomly chosen small number.
    if err != nil {
        return err
    }
    if snap == nil {
        return nil
    }

    // Let's check what we already have. And only update if the new snap.Index is ahead of the last
    // stored applied.
    applied, err := n.findRaftProgress()
    if err != nil {
        return err
    }
    if snap.Index <= applied {
        return nil
    }

    data, err := snap.Marshal()
    x.Check(err)
    txn := pstore.NewTransactionAt(math.MaxUint64, true)
    defer txn.Discard()

    if err := txn.Set(x.RaftKey(), data); err != nil {
        return err
    }
    if err := txn.CommitAt(1, nil); err != nil {
        return err
    }
    glog.V(1).Infof("[%#x] Set Raft progress to index: %d.", n.Id, snap.Index)
    return nil
}

func (n *node) Run() {
    defer n.closer.Done() // CLOSER:1

@@ -699,7 +752,13 @@ func (n *node) Run() {
        close(done)
    }()

    var snapshotLoops uint64
    applied, err := n.findRaftProgress()
    if err != nil {
        glog.Errorf("While trying to find raft progress: %v", err)
    } else {
        glog.Infof("Found Raft progress in p directory: %d", applied)
    }

    for {
        select {
        case <-done:
@@ -712,23 +771,23 @@

        case <-slowTicker.C:
            n.elog.Printf("Size of applyCh: %d", len(n.applyCh))
            if err := n.updateRaftProgress(); err != nil {
                glog.Errorf("While updating Raft progress: %v", err)
            }

            if leader {
                // We try to take a snapshot every slow tick duration, with a 1000 discard entries.
                // But, once a while, we take a snapshot with 10 discard entries. This avoids the
                // scenario where after bringing up an Alpha, and doing a hundred schema updates, we
                // don't take any snapshots because there are not enough updates (discardN=10),
                // which then really slows down restarts. At the same time, by checking more
                // frequently, we can quickly take a snapshot if a lot of mutations are coming in
                // fast (discardN=1000).
                discardN := 1000
                if snapshotLoops%5 == 0 {
                    discardN = 10
                }
                snapshotLoops++
                // We keep track of the applied index in the p directory. Even if we don't take
                // snapshot for a while and let the Raft logs grow and restart, we would not have to
                // run all the log entries, because we can tell Raft.Config to set Applied to that
                // index.
                // This applied index tracking also covers the case when we have a big index
                // rebuild. The rebuild would be tracked just like others and would not need to be
                // replayed after a restart, because the Applied config would let us skip right
                // through it.
                // We use disk based storage for Raft. So, we're not too concerned about
                // snapshotting. We just need to do enough, so that we don't have a huge backlog of
                // entries to process on a restart.
                if err := n.proposeSnapshot(discardN); err != nil {
                if err := n.proposeSnapshot(x.WorkerConfig.SnapshotAfter); err != nil {
                    x.Errorf("While calculating and proposing snapshot: %v", err)
                }
                go n.abortOldTransactions()
@@ -845,6 +904,10 @@ func (n *node) Run() {
                n.elog.Printf("Found empty data at index: %d", entry.Index)
                n.Applied.Done(entry.Index)

            } else if entry.Index < applied {
                n.elog.Printf("Skipping over already applied entry: %d", entry.Index)
                n.Applied.Done(entry.Index)

            } else {
                proposal := &pb.Proposal{}
                if err := proposal.Unmarshal(entry.Data); err != nil {
@@ -1044,7 +1107,6 @@ func (n *node) blockingAbort(req *pb.TxnTimestamps) error {

    // Let's propose the txn updates received from Zero. This is important because there are edge
    // cases where a txn status might have been missed by the group.
    glog.Infof("TryAbort returned with delta: %+v\n", delta)
    aborted := &pb.OracleDelta{}
    for _, txn := range delta.Txns {
        // Only pick the aborts. DO NOT propose the commits. They must come in the right order via
@@ -1073,14 +1135,14 @@ func (n *node) blockingAbort(req *pb.TxnTimestamps) error {
// abort. Note that only the leader runs this function.
func (n *node) abortOldTransactions() {
    // Aborts if not already committed.
    starts := posting.Oracle().TxnOlderThan(5 * time.Minute)
    starts := posting.Oracle().TxnOlderThan(x.WorkerConfig.AbortOlderThan)
    if len(starts) == 0 {
        return
    }
    glog.Infof("Found %d old transactions. Acting to abort them.\n", len(starts))
    req := &pb.TxnTimestamps{Ts: starts}
    err := n.blockingAbort(req)
    glog.Infof("abortOldTransactions for %d txns. Error: %+v\n", len(req.Ts), err)
    glog.Infof("Done abortOldTransactions for %d txns. Error: %+v\n", len(req.Ts), err)
}

// calculateSnapshot would calculate a snapshot index, considering these factors:
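
The comment block above notes that the recorded index can be handed to Raft's Config as Applied, so a restart does not re-deliver entries the state machine has already seen. A rough sketch with etcd's raft package (the helper name and the surrounding wiring are assumptions, not part of this diff):

package main

import (
    "go.etcd.io/etcd/raft"
)

// buildRaftConfig is a hypothetical helper showing where a persisted applied index would go.
// etcd/raft will not deliver committed entries with index <= Applied back to the application.
func buildRaftConfig(id uint64, storage raft.Storage, applied uint64) *raft.Config {
    return &raft.Config{
        ID:              id,
        ElectionTick:    20,
        HeartbeatTick:   1,
        Storage:         storage,
        Applied:         applied,
        MaxSizePerMsg:   1 << 20,
        MaxInflightMsgs: 256,
    }
}

func main() {
    cfg := buildRaftConfig(1, raft.NewMemoryStorage(), 1500)
    _ = cfg // in real code this would be passed to raft.RestartNode on a restart
}
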
7 changes: 6 additions & 1 deletion x/config.go
@@ -16,7 +16,10 @@

package x

import "net"
import (
    "net"
    "time"
)

type Options struct {
    DebugMode bool
@@ -43,6 +46,8 @@ type WorkerOptions struct {
    MaxRetries int
    StrictMutations bool
    AclEnabled bool
    AbortOlderThan time.Duration
    SnapshotAfter int
}

var WorkerConfig WorkerOptions
16 changes: 16 additions & 0 deletions x/keys.go
@@ -36,6 +36,7 @@ const (
    DefaultPrefix = byte(0x00)
    byteSchema = byte(0x01)
    byteType = byte(0x02)
    byteRaft = byte(0xff)
)

func writeAttr(buf []byte, attr string) []byte {
@@ -48,6 +49,13 @@ func writeAttr(buf []byte, attr string) []byte {
    return rest[len(attr):]
}

func RaftKey() []byte {
    buf := make([]byte, 5)
    buf[0] = byteRaft
    AssertTrue(4 == copy(buf[1:5], []byte("raft")))
    return buf
}

// SchemaKey returns schema key for given attribute. Schema keys are stored
// separately with unique prefix, since we need to iterate over all schema keys.
func SchemaKey(attr string) []byte {
@@ -136,6 +144,10 @@ type ParsedKey struct {
    bytePrefix byte
}

func (p ParsedKey) IsRaft() bool {
    return p.bytePrefix == byteRaft
}

func (p ParsedKey) IsData() bool {
    return p.bytePrefix == DefaultPrefix && p.byteType == ByteData
}
@@ -295,6 +307,10 @@ func Parse(key []byte) *ParsedKey {
    p := &ParsedKey{}

    p.bytePrefix = key[0]
    if p.bytePrefix == byteRaft {
        return p
    }

    sz := int(binary.BigEndian.Uint16(key[1:3]))
    k := key[3:]

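With these changes, the Raft-progress key round-trips through the key parser like any other key type. A small usage sketch (assuming the x package is imported from github.com/dgraph-io/dgraph):

package main

import (
    "fmt"

    "github.com/dgraph-io/dgraph/x"
)

func main() {
    key := x.RaftKey() // 0xff followed by the ASCII bytes of "raft"
    pk := x.Parse(key) // Parse returns early once it sees the 0xff prefix
    fmt.Printf("key=%x isRaft=%v\n", key, pk.IsRaft()) // expected: isRaft=true
}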
