Skip to content

Commit

Permalink
Truncate Raft logs even when no txn commits are happening
Browse files Browse the repository at this point in the history
If there are no txn commits but many txns which allocate new read timestamps, the Raft log can still grow a lot because of MaxAssigned updates. This PR would truncate those logs as well to create new snapshot.
  • Loading branch information
manishrjain committed Apr 30, 2019
1 parent bf4f3e0 commit 3be380b
Showing 1 changed file with 27 additions and 9 deletions.
36 changes: 27 additions & 9 deletions worker/draft.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,13 @@ func (n *node) retrieveSnapshot(snap pb.Snapshot) error {

func (n *node) proposeSnapshot(discardN int) error {
snap, err := n.calculateSnapshot(discardN)
if err != nil || snap == nil {
if err != nil {
glog.Warningf("Got error while calculating snapshot: %v", err)
return err
}
if snap == nil {
return nil
}
proposal := &pb.Proposal{
Snapshot: snap,
}
Expand Down Expand Up @@ -959,7 +963,7 @@ func (n *node) abortOldTransactions() {
// At i7, min pending start ts = S3, therefore snapshotIdx = i5 - 1 = i4.
// At i7, max commit ts = C1, therefore readTs = C1.
func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
_, span := otrace.StartSpan(n.ctx, "Propose.Snapshot")
_, span := otrace.StartSpan(n.ctx, "Calculate.Snapshot")
defer span.End()

if atomic.LoadInt32(&n.streaming) > 0 {
Expand All @@ -974,6 +978,18 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
}
span.Annotatef(nil, "First index: %d", first)

rsnap, err := n.Store.Snapshot()
if err != nil {
return nil, err
}
var snap pb.Snapshot
if len(rsnap.Data) > 0 {
if err := snap.Unmarshal(rsnap.Data); err != nil {
return nil, err
}
}
span.Annotatef(nil, "Last snapshot: %+v", snap)

last := n.Applied.DoneUntil()
if int(last-first) < discardN {
span.Annotate(nil, "Skipping due to insufficient entries")
Expand All @@ -999,7 +1015,8 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
// snapshotIdx. In any case, we continue picking up txn updates, to generate
// a maxCommitTs, which would become the readTs for the snapshot.
minPendingStart := posting.Oracle().MinPendingStartTs()
var maxCommitTs, snapshotIdx, maxCommitIdx uint64
maxCommitTs := snap.ReadTs
var snapshotIdx uint64
for _, entry := range entries {
if entry.Type != raftpb.EntryNormal {
continue
Expand All @@ -1019,7 +1036,6 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
for _, txn := range proposal.Delta.GetTxns() {
maxCommitTs = x.Max(maxCommitTs, txn.CommitTs)
}
maxCommitIdx = entry.Index
}
}
if maxCommitTs == 0 {
Expand All @@ -1029,8 +1045,10 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
if snapshotIdx <= 0 {
// It is possible that there are no pending transactions. In that case,
// snapshotIdx would be zero.
span.Annotatef(nil, "Using maxCommitIdx as snapshotIdx: %d", maxCommitIdx)
snapshotIdx = maxCommitIdx
if len(entries) > 0 {
snapshotIdx = entries[len(entries)-1].Index
}
span.Annotatef(nil, "snapshotIdx is zero. Using last entry's index: %d", snapshotIdx)
}

numDiscarding := snapshotIdx - first + 1
Expand All @@ -1045,13 +1063,13 @@ func (n *node) calculateSnapshot(discardN int) (*pb.Snapshot, error) {
return nil, nil
}

snap := &pb.Snapshot{
result := &pb.Snapshot{
Context: n.RaftContext,
Index: snapshotIdx,
ReadTs: maxCommitTs,
}
span.Annotatef(nil, "Got snapshot: %+v", snap)
return snap, nil
span.Annotatef(nil, "Got snapshot: %+v", result)
return result, nil
}

func (n *node) joinPeers() error {
Expand Down

0 comments on commit 3be380b

Please sign in to comment.