Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wallet/wallet: remove invalid transactions when broadcast fails #597

Merged
merged 3 commits into from
Mar 9, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
185 changes: 85 additions & 100 deletions wallet/wallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2823,96 +2823,29 @@ func (w *Wallet) LockedOutpoints() []btcjson.TransactionInput {
// credits that are not known to have been mined into a block, and attempts
// to send each to the chain server for relay.
func (w *Wallet) resendUnminedTxs() {
chainClient, err := w.requireChainClient()
if err != nil {
log.Errorf("No chain server available to resend unmined transactions")
return
}

var txs []*wire.MsgTx
err = walletdb.View(w.db, func(tx walletdb.ReadTx) error {
err := walletdb.View(w.db, func(tx walletdb.ReadTx) error {
txmgrNs := tx.ReadBucket(wtxmgrNamespaceKey)
var err error
txs, err = w.TxStore.UnminedTxs(txmgrNs)
return err
})
if err != nil {
log.Errorf("Cannot load unmined transactions for resending: %v", err)
log.Errorf("Unable to retrieve unconfirmed transactions to "+
"resend: %v", err)
return
}

for _, tx := range txs {
resp, err := chainClient.SendRawTransaction(tx, false)
txHash, err := w.publishTransaction(tx)
if err != nil {
// If the transaction has already been accepted into the
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay code deletion!

// mempool, we can continue without logging the error.
switch {
case strings.Contains(err.Error(), "already have transaction"):
fallthrough
case strings.Contains(err.Error(), "txn-already-known"):
continue
}

log.Debugf("Could not resend transaction %v: %v",
log.Debugf("Unable to rebroadcast transaction %v: %v",
tx.TxHash(), err)

// We'll only stop broadcasting transactions if we
// detect that the output has already been fully spent,
// is an orphan, or is conflicting with another
// transaction.
//
// TODO(roasbeef): SendRawTransaction needs to return
// concrete error types, no need for string matching
switch {
// The following are errors returned from btcd's
// mempool.
case strings.Contains(err.Error(), "spent"):
case strings.Contains(err.Error(), "orphan"):
case strings.Contains(err.Error(), "conflict"):
case strings.Contains(err.Error(), "already exists"):
case strings.Contains(err.Error(), "negative"):

// The following errors are returned from bitcoind's
// mempool.
case strings.Contains(err.Error(), "Missing inputs"):
case strings.Contains(err.Error(), "already in block chain"):
case strings.Contains(err.Error(), "fee not met"):

default:
continue
}

// As the transaction was rejected, we'll attempt to
// remove the unmined transaction all together.
// Otherwise, we'll keep attempting to rebroadcast this,
// and we may be computing our balance incorrectly if
// this transaction credits or debits to us.
//
// TODO(wilmer): if already confirmed, move to mined
// bucket - need to determine the confirmation block.
err := walletdb.Update(w.db, func(dbTx walletdb.ReadWriteTx) error {
txmgrNs := dbTx.ReadWriteBucket(wtxmgrNamespaceKey)

txRec, err := wtxmgr.NewTxRecordFromMsgTx(
tx, time.Now(),
)
if err != nil {
return err
}

return w.TxStore.RemoveUnminedTx(txmgrNs, txRec)
})
if err != nil {
log.Warnf("unable to remove conflicting "+
"tx %v: %v", tx.TxHash(), err)
continue
}

log.Infof("Removed conflicting tx: %v", spew.Sdump(tx))

continue
}
log.Debugf("Resent unmined transaction %v", resp)

log.Debugf("Successfully rebroadcast unconfirmed transaction %v",
txHash)
}
}

Expand Down Expand Up @@ -3214,7 +3147,7 @@ func (w *Wallet) SendOutputs(outputs []*wire.TxOut, account uint32,
return nil, err
}

txHash, err := w.publishTransaction(createdTx.Tx)
txHash, err := w.reliablyPublishTransaction(createdTx.Tx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -3370,16 +3303,16 @@ func (w *Wallet) SignTransaction(tx *wire.MsgTx, hashType txscript.SigHashType,
// This function is unstable and will be removed once syncing code is moved out
// of the wallet.
func (w *Wallet) PublishTransaction(tx *wire.MsgTx) error {
_, err := w.publishTransaction(tx)
_, err := w.reliablyPublishTransaction(tx)
return err
}

// publishTransaction is the private version of PublishTransaction which
// contains the primary logic required for publishing a transaction, updating
// the relevant database state, and finally possible removing the transaction
// from the database (along with cleaning up all inputs used, and outputs
// created) if the transaction is rejected by the back end.
func (w *Wallet) publishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) {
// reliablyPublishTransaction is a superset of publishTransaction which contains
// the primary logic required for publishing a transaction, updating the
// relevant database state, and finally possible removing the transaction from
// the database (along with cleaning up all inputs used, and outputs created) if
// the transaction is rejected by the backend.
func (w *Wallet) reliablyPublishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) {
chainClient, err := w.requireChainClient()
if err != nil {
return nil, err
Expand Down Expand Up @@ -3426,41 +3359,93 @@ func (w *Wallet) publishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) {
}
}

return w.publishTransaction(tx)
}

// publishTransaction attempts to send an unconfirmed transaction to the
// wallet's current backend. In the event that sending the transaction fails for
// whatever reason, it will be removed from the wallet's unconfirmed transaction
// store.
func (w *Wallet) publishTransaction(tx *wire.MsgTx) (*chainhash.Hash, error) {
chainClient, err := w.requireChainClient()
if err != nil {
return nil, err
}

txid, err := chainClient.SendRawTransaction(tx, false)
switch {
case err == nil:
return txid, nil

// The following are errors returned from btcd's mempool.
case strings.Contains(err.Error(), "spent"):
fallthrough
case strings.Contains(err.Error(), "orphan"):
fallthrough
case strings.Contains(err.Error(), "conflict"):
// Since we have different backends that can be used with the wallet,
// we'll need to check specific errors for each one.
//
// If the transaction is already in the mempool, we can just return now.
//
// This error is returned when broadcasting/sending a transaction to a
// btcd node that already has it in their mempool.
case strings.Contains(err.Error(), "already have transaction"):
fallthrough

// The following errors are returned from bitcoind's mempool.
case strings.Contains(err.Error(), "fee not met"):
// This error is returned when broadcasting a transaction to a bitcoind
// node that already has it in their mempool.
case strings.Contains(err.Error(), "txn-already-in-mempool"):
return txid, nil

// If the transaction has already confirmed, we can safely remove it
// from the unconfirmed store as it should already exist within the
// confirmed store. We'll avoid returning an error as the broadcast was
// in a sense successful.
//
// This error is returned when broadcasting/sending a transaction that
// has already confirmed to a btcd node.
case strings.Contains(err.Error(), "transaction already exists"):
fallthrough
case strings.Contains(err.Error(), "Missing inputs"):

// This error is returned when broadcasting a transaction that has
// already confirmed to a bitcoind node.
case strings.Contains(err.Error(), "txn-already-known"):
fallthrough
case strings.Contains(err.Error(), "already in block chain"):
// If the transaction was rejected, then we'll remove it from
// the txstore, as otherwise, we'll attempt to continually
// re-broadcast it, and the utxo state of the wallet won't be
// accurate.

// This error is returned when sending a transaction that has already
// confirmed to a bitcoind node over RPC.
case strings.Contains(err.Error(), "transaction already in block chain"):
dbErr := walletdb.Update(w.db, func(dbTx walletdb.ReadWriteTx) error {
txmgrNs := dbTx.ReadWriteBucket(wtxmgrNamespaceKey)
txRec, err := wtxmgr.NewTxRecordFromMsgTx(tx, time.Now())
if err != nil {
return err
}
return w.TxStore.RemoveUnminedTx(txmgrNs, txRec)
})
if dbErr != nil {
return nil, fmt.Errorf("unable to broadcast tx: %v, "+
"unable to remove invalid tx: %v", err, dbErr)
log.Warnf("Unable to remove confirmed transaction %v "+
"from unconfirmed store: %v", tx.TxHash(), dbErr)
}

return nil, err
return txid, nil

// If the transaction was rejected for whatever other reason, then we'll
// remove it from the transaction store, as otherwise, we'll attempt to
// continually re-broadcast it, and the UTXO state of the wallet won't
// be accurate.
default:
dbErr := walletdb.Update(w.db, func(dbTx walletdb.ReadWriteTx) error {
txmgrNs := dbTx.ReadWriteBucket(wtxmgrNamespaceKey)
txRec, err := wtxmgr.NewTxRecordFromMsgTx(tx, time.Now())
if err != nil {
return err
}
return w.TxStore.RemoveUnminedTx(txmgrNs, txRec)
})
if dbErr != nil {
log.Warnf("Unable to remove invalid transaction %v: %v",
tx.TxHash(), dbErr)
} else {
log.Infof("Removed invalid transaction: %v",
spew.Sdump(tx))
}

return nil, err
}
}
Expand Down