Skip to content

Commit

Permalink
[DNM] storage/result: add UpdatedIntents and ResolvedIntents to Local…
Browse files Browse the repository at this point in the history
…Result

This allows the handling of added, updated, and resolved intents to
become more reactive. It mirrors our handling of UpdatedTxns and their
interaction with the TxnWaitQueue.
  • Loading branch information
nvanbenschoten committed Feb 14, 2020
1 parent eab46a4 commit edf88b4
Show file tree
Hide file tree
Showing 12 changed files with 119 additions and 20 deletions.
10 changes: 8 additions & 2 deletions pkg/storage/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ func ConditionalPut(
}
}
handleMissing := engine.CPutMissingBehavior(args.AllowIfDoesNotExist)
var err error
if args.Blind {
return result.Result{}, engine.MVCCBlindConditionalPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.ExpValue, handleMissing, h.Txn)
err = engine.MVCCBlindConditionalPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.ExpValue, handleMissing, h.Txn)
} else {
err = engine.MVCCConditionalPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.ExpValue, handleMissing, h.Txn)
}
return result.Result{}, engine.MVCCConditionalPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.ExpValue, handleMissing, h.Txn)
if err != nil {
return result.Result{}, err
}
return result.FromUpdatedIntent(h.Txn, args.Key), nil
}
6 changes: 5 additions & 1 deletion pkg/storage/batcheval/cmd_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,9 @@ func Delete(
args := cArgs.Args.(*roachpb.DeleteRequest)
h := cArgs.Header

return result.Result{}, engine.MVCCDelete(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, h.Txn)
err := engine.MVCCDelete(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, h.Txn)
if err != nil {
return result.Result{}, err
}
return result.FromUpdatedIntent(h.Txn, args.Key), nil
}
11 changes: 8 additions & 3 deletions pkg/storage/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,16 +51,21 @@ func DeleteRange(
if !args.Inline {
timestamp = h.Timestamp
}
returnKeys := args.ReturnKeys || h.Txn != nil
deleted, resumeSpan, num, err := engine.MVCCDeleteRange(
ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, cArgs.MaxKeys, timestamp, h.Txn, args.ReturnKeys,
ctx, readWriter, cArgs.Stats, args.Key, args.EndKey, cArgs.MaxKeys, timestamp, h.Txn, returnKeys,
)
if err == nil {
if err != nil {
return result.Result{}, err
}

if args.ReturnKeys {
reply.Keys = deleted
}
reply.NumKeys = num
if resumeSpan != nil {
reply.ResumeSpan = resumeSpan
reply.ResumeReason = roachpb.RESUME_KEY_LIMIT
}
return result.Result{}, err
return result.FromUpdatedIntents(h.Txn, deleted), nil
}
20 changes: 13 additions & 7 deletions pkg/storage/batcheval/cmd_end_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func EndTxn(
// Do not return TransactionAbortedError since the client anyway
// wanted to abort the transaction.
desc := cArgs.EvalCtx.Desc()
externalIntents, err := resolveLocalIntents(ctx, desc, readWriter, ms, args, reply.Txn, cArgs.EvalCtx)
externalIntents, resolvedIntents, err := resolveLocalIntents(ctx, desc, readWriter, ms, args, reply.Txn, cArgs.EvalCtx)
if err != nil {
return result.Result{}, err
}
Expand All @@ -237,7 +237,9 @@ func EndTxn(
}
// Use alwaysReturn==true because the transaction is definitely
// aborted, no matter what happens to this command.
return result.FromEndTxn(reply.Txn, true /* alwaysReturn */, args.Poison), nil
res := result.FromEndTxn(reply.Txn, true /* alwaysReturn */, args.Poison)
res.Local.ResolvedIntents = resolvedIntents
return res, nil
}
// If the transaction was previously aborted by a concurrent writer's
// push, any intents written are still open. It's only now that we know
Expand Down Expand Up @@ -332,7 +334,7 @@ func EndTxn(
// This avoids the need for the intentResolver to have to return to this range
// to resolve intents for this transaction in the future.
desc := cArgs.EvalCtx.Desc()
externalIntents, err := resolveLocalIntents(ctx, desc, readWriter, ms, args, reply.Txn, cArgs.EvalCtx)
externalIntents, resolvedIntents, err := resolveLocalIntents(ctx, desc, readWriter, ms, args, reply.Txn, cArgs.EvalCtx)
if err != nil {
return result.Result{}, err
}
Expand Down Expand Up @@ -376,6 +378,7 @@ func EndTxn(
// if the commit actually happens; otherwise, we risk losing writes.
intentsResult := result.FromEndTxn(reply.Txn, false /* alwaysReturn */, args.Poison)
intentsResult.Local.UpdatedTxns = []*roachpb.Transaction{reply.Txn}
intentsResult.Local.ResolvedIntents = resolvedIntents
if err := pd.MergeAndDestroy(intentsResult); err != nil {
return result.Result{}, err
}
Expand Down Expand Up @@ -454,7 +457,7 @@ func resolveLocalIntents(
args *roachpb.EndTxnRequest,
txn *roachpb.Transaction,
evalCtx EvalContext,
) ([]roachpb.Span, error) {
) ([]roachpb.Span, []roachpb.Intent, error) {
if mergeTrigger := args.InternalCommitTrigger.GetMergeTrigger(); mergeTrigger != nil {
// If this is a merge, then use the post-merge descriptor to determine
// which intents are local (note that for a split, we want to use the
Expand All @@ -469,6 +472,7 @@ func resolveLocalIntents(
defer iterAndBuf.Cleanup()

var externalIntents []roachpb.Span
var resolvedIntents []roachpb.Intent
var resolveAllowance int64 = intentResolutionBatchSize
if args.InternalCommitTrigger != nil {
// If this is a system transaction (such as a split or merge), don't enforce the resolve allowance.
Expand All @@ -493,6 +497,7 @@ func resolveLocalIntents(
ok, err := engine.MVCCResolveWriteIntentUsingIter(ctx, readWriter, iterAndBuf, resolveMS, intent)
if ok {
resolveAllowance--
resolvedIntents = append(resolvedIntents, intent)
}
return err
}
Expand All @@ -517,21 +522,22 @@ func resolveLocalIntents(
}
externalIntents = append(externalIntents, *resumeSpan)
}
resolvedIntents = append(resolvedIntents, intent) // TODO(nvanbenschoten): resume span
return nil
}
return nil
}(); err != nil {
return nil, errors.Wrapf(err, "resolving intent at %s on end transaction [%s]", span, txn.Status)
return nil, nil, errors.Wrapf(err, "resolving intent at %s on end transaction [%s]", span, txn.Status)
}
}

removedAny := resolveAllowance != intentResolutionBatchSize
if WriteAbortSpanOnResolve(txn.Status, args.Poison, removedAny) {
if err := UpdateAbortSpan(ctx, evalCtx, readWriter, ms, txn.TxnMeta, args.Poison); err != nil {
return nil, err
return nil, nil, err
}
}
return externalIntents, nil
return externalIntents, resolvedIntents, nil
}

// updateStagingTxn persists the STAGING transaction record with updated status
Expand Down
5 changes: 4 additions & 1 deletion pkg/storage/batcheval/cmd_increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func Increment(
reply := resp.(*roachpb.IncrementResponse)

newVal, err := engine.MVCCIncrement(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, h.Txn, args.Increment)
if err != nil {
return result.Result{}, err
}
reply.NewValue = newVal
return result.Result{}, err
return result.FromUpdatedIntent(h.Txn, args.Key), nil
}
10 changes: 8 additions & 2 deletions pkg/storage/batcheval/cmd_init_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,14 @@ func InitPut(
defer readWriter.Close()
}
}
var err error
if args.Blind {
return result.Result{}, engine.MVCCBlindInitPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, h.Txn)
err = engine.MVCCBlindInitPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, h.Txn)
} else {
err = engine.MVCCInitPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, h.Txn)
}
return result.Result{}, engine.MVCCInitPut(ctx, readWriter, cArgs.Stats, args.Key, h.Timestamp, args.Value, args.FailOnTombstones, h.Txn)
if err != nil {
return result.Result{}, err
}
return result.FromUpdatedIntent(h.Txn, args.Key), nil
}
10 changes: 8 additions & 2 deletions pkg/storage/batcheval/cmd_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,14 @@ func Put(
defer readWriter.Close()
}
}
var err error
if args.Blind {
return result.Result{}, engine.MVCCBlindPut(ctx, readWriter, ms, args.Key, ts, args.Value, h.Txn)
err = engine.MVCCBlindPut(ctx, readWriter, ms, args.Key, ts, args.Value, h.Txn)
} else {
err = engine.MVCCPut(ctx, readWriter, ms, args.Key, ts, args.Value, h.Txn)
}
if err != nil {
return result.Result{}, err
}
return result.Result{}, engine.MVCCPut(ctx, readWriter, ms, args.Key, ts, args.Value, h.Txn)
return result.FromUpdatedIntent(h.Txn, args.Key), nil
}
1 change: 1 addition & 0 deletions pkg/storage/batcheval/cmd_resolve_intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func ResolveIntent(
}

var res result.Result
res.Local.ResolvedIntents = []roachpb.Intent{intent}
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

if WriteAbortSpanOnResolve(args.Status, args.Poison, ok) {
Expand Down
1 change: 1 addition & 0 deletions pkg/storage/batcheval/cmd_resolve_intent_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func ResolveIntentRange(
}

var res result.Result
res.Local.ResolvedIntents = []roachpb.Intent{intent} // TODO(nvanbenschoten): resume span
res.Local.Metrics = resolveToMetricType(args.Status, args.Poison)

if WriteAbortSpanOnResolve(args.Status, args.Poison, numKeys > 0) {
Expand Down
29 changes: 29 additions & 0 deletions pkg/storage/batcheval/result/intent.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,35 @@ package result

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// FromUpdatedIntent creates a Result communicating that an intent was updated
// by the given request and should be handled.
func FromUpdatedIntent(txn *roachpb.Transaction, key roachpb.Key) Result {
var pd Result
if txn == nil {
return pd
}
pd.Local.UpdatedIntents = []roachpb.Intent{{
Span: roachpb.Span{Key: key}, Txn: txn.TxnMeta, Status: roachpb.PENDING,
}}
return pd
}

// FromUpdatedIntents creates a Result communicating that the intents were
// updated by the given request and should be handled.
func FromUpdatedIntents(txn *roachpb.Transaction, keys []roachpb.Key) Result {
var pd Result
if txn == nil {
return pd
}
pd.Local.UpdatedIntents = make([]roachpb.Intent, len(keys))
for i := range pd.Local.UpdatedIntents {
pd.Local.UpdatedIntents[i] = roachpb.Intent{
Span: roachpb.Span{Key: keys[i]}, Txn: txn.TxnMeta, Status: roachpb.PENDING,
}
}
return pd
}

// FromEncounteredIntents creates a Result communicating that the intents were encountered
// by the given request and should be handled.
func FromEncounteredIntents(intents []roachpb.Intent) Result {
Expand Down
26 changes: 24 additions & 2 deletions pkg/storage/batcheval/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ import (
type LocalResult struct {
Reply *roachpb.BatchResponse

// UpdatedIntents stores any newly created or updated intents.
UpdatedIntents []roachpb.Intent
// ResolvedIntents stores any resolved intents.
ResolvedIntents []roachpb.Intent
// EncounteredIntents stores any intents encountered but not conflicted
// with. They should be handed off to asynchronous intent processing on
// the proposer, so that an attempt to resolve them is made.
Expand Down Expand Up @@ -66,6 +70,8 @@ type LocalResult struct {
func (lResult *LocalResult) IsZero() bool {
// NB: keep in order.
return lResult.Reply == nil &&
lResult.UpdatedIntents == nil &&
lResult.ResolvedIntents == nil &&
lResult.EncounteredIntents == nil &&
lResult.UpdatedTxns == nil &&
lResult.EndTxns == nil &&
Expand All @@ -80,11 +86,13 @@ func (lResult *LocalResult) String() string {
if lResult == nil {
return "LocalResult: nil"
}
return fmt.Sprintf("LocalResult (reply: %v, #encountered intents: %d, "+
return fmt.Sprintf("LocalResult (reply: %v, "+
"#updated intents: %d #resolved intents: %d #encountered intents: %d, "+
"#updated txns: %d #end txns: %d, "+
"GossipFirstRange:%t MaybeGossipSystemConfig:%t MaybeAddToSplitQueue:%t "+
"MaybeGossipNodeLiveness:%s MaybeWatchForMerge:%t",
lResult.Reply, len(lResult.EncounteredIntents),
lResult.Reply,
len(lResult.UpdatedIntents), len(lResult.ResolvedIntents), len(lResult.EncounteredIntents),
len(lResult.UpdatedTxns), len(lResult.EndTxns),
lResult.GossipFirstRange, lResult.MaybeGossipSystemConfig, lResult.MaybeAddToSplitQueue,
lResult.MaybeGossipNodeLiveness, lResult.MaybeWatchForMerge)
Expand Down Expand Up @@ -277,6 +285,20 @@ func (p *Result) MergeAndDestroy(q Result) error {
}
q.Replicated.PrevLeaseProposal = nil

if p.Local.UpdatedIntents == nil {
p.Local.UpdatedIntents = q.Local.UpdatedIntents
} else {
p.Local.UpdatedIntents = append(p.Local.UpdatedIntents, q.Local.UpdatedIntents...)
}
q.Local.UpdatedIntents = nil

if p.Local.ResolvedIntents == nil {
p.Local.ResolvedIntents = q.Local.ResolvedIntents
} else {
p.Local.ResolvedIntents = append(p.Local.ResolvedIntents, q.Local.ResolvedIntents...)
}
q.Local.ResolvedIntents = nil

if p.Local.EncounteredIntents == nil {
p.Local.EncounteredIntents = q.Local.EncounteredIntents
} else {
Expand Down
10 changes: 10 additions & 0 deletions pkg/storage/replica_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -609,6 +609,16 @@ func (r *Replica) handleReadWriteLocalEvalResult(ctx context.Context, lResult re
log.Fatalf(ctx, "LocalEvalResult.MaybeWatchForMerge should be false")
}

if lResult.UpdatedIntents != nil {
// TODO(nvanbenschoten): handle UpdatedIntents.
lResult.UpdatedIntents = nil
}

if lResult.ResolvedIntents != nil {
// TODO(nvanbenschoten): handle ResolvedIntents.
lResult.ResolvedIntents = nil
}

if lResult.UpdatedTxns != nil {
for _, txn := range lResult.UpdatedTxns {
r.txnWaitQueue.UpdateTxn(ctx, txn)
Expand Down

0 comments on commit edf88b4

Please sign in to comment.