Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message Pool Rudimentary Spam Protection Measures #3313

Merged
merged 22 commits into from
Aug 28, 2020
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
da9630c
remove duplicate check
vyzo Aug 26, 2020
835dbfe
check message before adding in PushWithNonce
vyzo Aug 26, 2020
77f6e0d
refactor balance check, apply it in PushWithNonce
vyzo Aug 26, 2020
4ac9828
track required funds in message set
vyzo Aug 26, 2020
4adb83e
check aggregate pending message required funds against balance
vyzo Aug 26, 2020
b59f584
fix tests
vyzo Aug 26, 2020
7887694
ErrBroadcastAnyway is now ErrValidationFailure
vyzo Aug 26, 2020
87e6c09
include message Value only in soft validation failures
vyzo Aug 26, 2020
bedbdca
give some score weight to the messages topic
vyzo Aug 26, 2020
c473d3c
rename ErrValidationFailure to ErrSoftValidationFailure
vyzo Aug 26, 2020
a2c0c10
correctly account for message replacement in required funds tracking
vyzo Aug 26, 2020
7a70668
better code for getRequiredFunds
vyzo Aug 26, 2020
4919a00
always verify message min gas
vyzo Aug 26, 2020
8848c54
move msg serialization outside of lock, consistently use curTs
vyzo Aug 26, 2020
793eda9
comment out Value inclusion for required funds tracking
vyzo Aug 26, 2020
8db262c
check gas fee cap for minimum value
vyzo Aug 26, 2020
d3e95d6
cast min base fee to uint64
vyzo Aug 26, 2020
ad889a7
rate limit messages per actor by enforcing maximum pending message limit
vyzo Aug 26, 2020
6abfbbd
fix tests
vyzo Aug 26, 2020
82ef052
named error for too many pending messages
vyzo Aug 26, 2020
038e83b
log error if we trip over too many pending messages for actor
vyzo Aug 26, 2020
708a8b6
flag to allow add to skip limit check
vyzo Aug 27, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 103 additions & 38 deletions chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math"
stdbig "math/big"
"sort"
"sync"
"time"
Expand Down Expand Up @@ -47,19 +48,26 @@ const RbfDenom = 256

var RepublishInterval = pubsub.TimeCacheDuration + time.Duration(5*build.BlockDelaySecs+build.PropagationDelaySecs)*time.Second

var minimumBaseFee = types.NewInt(uint64(build.MinimumBaseFee))

var MaxActorPendingMessages = 1000

var (
ErrMessageTooBig = errors.New("message too big")

ErrMessageValueTooHigh = errors.New("cannot send more filecoin than will ever exist")

ErrNonceTooLow = errors.New("message nonce too low")

ErrGasFeeCapTooLow = errors.New("gas fee cap too low")

ErrNotEnoughFunds = errors.New("not enough funds to execute transaction")

ErrInvalidToAddr = errors.New("message had invalid to address")

ErrBroadcastAnyway = errors.New("broadcasting message despite validation fail")
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
ErrSoftValidationFailure = errors.New("validation failure")
ErrRBFTooLowPremium = errors.New("replace by fee has too low GasPremium")
ErrTooManyPendingMessages = errors.New("too many pending messages for actor")

ErrTryAgain = errors.New("state inconsistency while pushing message; please try again")
)
Expand Down Expand Up @@ -118,13 +126,15 @@ type MessagePool struct {
}

type msgSet struct {
msgs map[uint64]*types.SignedMessage
nextNonce uint64
msgs map[uint64]*types.SignedMessage
nextNonce uint64
requiredFunds *stdbig.Int
}

func newMsgSet() *msgSet {
return &msgSet{
msgs: make(map[uint64]*types.SignedMessage),
msgs: make(map[uint64]*types.SignedMessage),
requiredFunds: stdbig.NewInt(0),
}
}

Expand All @@ -150,12 +160,44 @@ func (ms *msgSet) add(m *types.SignedMessage, mp *MessagePool) (bool, error) {
ErrRBFTooLowPremium)
}
}

ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.RequiredFunds().Int)
//ms.requiredFunds.Sub(ms.requiredFunds, exms.Message.Value.Int)
}

if !has && len(ms.msgs) > MaxActorPendingMessages {
log.Errorf("too many pending messages from actor %s", m.Message.From)
return false, ErrTooManyPendingMessages
}

ms.msgs[m.Message.Nonce] = m
ms.requiredFunds.Add(ms.requiredFunds, m.Message.RequiredFunds().Int)
//ms.requiredFunds.Add(ms.requiredFunds, m.Message.Value.Int)

return !has, nil
}

func (ms *msgSet) rm(nonce uint64) {
m, has := ms.msgs[nonce]
if has {
ms.requiredFunds.Sub(ms.requiredFunds, m.Message.RequiredFunds().Int)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be Add no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh wait, we're tracking funds required for this message chain here, so this means we're removing the message from the chain, which lowers the required funds for that chain. SGTM

//ms.requiredFunds.Sub(ms.requiredFunds, m.Message.Value.Int)
delete(ms.msgs, nonce)
}
}

func (ms *msgSet) getRequiredFunds(nonce uint64) types.BigInt {
requiredFunds := new(stdbig.Int).Set(ms.requiredFunds)

m, has := ms.msgs[nonce]
if has {
requiredFunds.Sub(requiredFunds, m.Message.RequiredFunds().Int)
//requiredFunds.Sub(requiredFunds, m.Message.Value.Int)
}

return types.BigInt{Int: requiredFunds}
}

func New(api Provider, ds dtypes.MetadataDS, netName dtypes.NetworkName) (*MessagePool, error) {
cache, _ := lru.New2Q(build.BlsSignatureCacheSize)
verifcache, _ := lru.New2Q(build.VerifSigCacheSize)
Expand Down Expand Up @@ -257,7 +299,7 @@ func (mp *MessagePool) addLocal(m *types.SignedMessage, msgb []byte) error {
return nil
}

func (mp *MessagePool) verifyMsgBeforePush(m *types.SignedMessage, epoch abi.ChainEpoch) error {
func (mp *MessagePool) verifyMsgBeforeAdd(m *types.SignedMessage, epoch abi.ChainEpoch) error {
minGas := vm.PricelistByEpoch(epoch).OnChainMessage(m.ChainLength())

if err := m.VMMessage().ValidForBlockInclusion(minGas.Total()); err != nil {
Expand All @@ -278,25 +320,12 @@ func (mp *MessagePool) Push(m *types.SignedMessage) (cid.Cid, error) {
<-mp.addSema
}()

mp.curTsLk.Lock()
curTs := mp.curTs
epoch := curTs.Height()
mp.curTsLk.Unlock()
if err := mp.verifyMsgBeforePush(m, epoch); err != nil {
return cid.Undef, err
}

msgb, err := m.Serialize()
if err != nil {
return cid.Undef, err
}

mp.curTsLk.Lock()
if mp.curTs != curTs {
mp.curTsLk.Unlock()
return cid.Undef, ErrTryAgain
}

if err := mp.addTs(m, mp.curTs); err != nil {
mp.curTsLk.Unlock()
return cid.Undef, err
Expand All @@ -319,7 +348,7 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
return xerrors.Errorf("mpool message too large (%dB): %w", m.Size(), ErrMessageTooBig)
}

// Perform syntaxtic validation, minGas=0 as we check if correctly in select messages
// Perform syntactic validation, minGas=0 as we check the actual mingas before we add it
if err := m.Message.ValidForBlockInclusion(0); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

were do we do that check now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do it in both add and push.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and we check with the actual gas in the add logic, so it happens in all paths.

return xerrors.Errorf("message not valid for block inclusion: %w", err)
}
Expand All @@ -332,8 +361,12 @@ func (mp *MessagePool) checkMessage(m *types.SignedMessage) error {
return ErrMessageValueTooHigh
}

if m.Message.GasFeeCap.LessThan(minimumBaseFee) {
return ErrGasFeeCapTooLow
}

if err := mp.VerifyMsgSig(m); err != nil {
log.Warnf("mpooladd signature verification failed: %s", err)
log.Warnf("signature verification failed: %s", err)
return err
}

Expand Down Expand Up @@ -393,28 +426,55 @@ func (mp *MessagePool) VerifyMsgSig(m *types.SignedMessage) error {
return nil
}

func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
snonce, err := mp.getStateNonce(m.Message.From, curTs)
func (mp *MessagePool) checkBalance(m *types.SignedMessage, curTs *types.TipSet) error {
balance, err := mp.getStateBalance(m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrBroadcastAnyway)
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrSoftValidationFailure)
}

if snonce > m.Message.Nonce {
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
requiredFunds := m.Message.RequiredFunds()
if balance.LessThan(requiredFunds) {
return xerrors.Errorf("not enough funds (required: %s, balance: %s): %w", types.FIL(requiredFunds), types.FIL(balance), ErrNotEnoughFunds)
}

balance, err := mp.getStateBalance(m.Message.From, curTs)
// add Value for soft failure check
//requiredFunds = types.BigAdd(requiredFunds, m.Message.Value)

mset, ok := mp.pending[m.Message.From]
if ok {
requiredFunds = types.BigAdd(requiredFunds, mset.getRequiredFunds(m.Message.Nonce))
}

if balance.LessThan(requiredFunds) {
// Note: we fail here for ErrSoftValidationFailure to signal a soft failure because we might
// be out of sync.
return xerrors.Errorf("not enough funds including pending messages (required: %s, balance: %s): %w", types.FIL(requiredFunds), types.FIL(balance), ErrSoftValidationFailure)
}

return nil
}

func (mp *MessagePool) addTs(m *types.SignedMessage, curTs *types.TipSet) error {
snonce, err := mp.getStateNonce(m.Message.From, curTs)
if err != nil {
return xerrors.Errorf("failed to check sender balance: %s: %w", err, ErrBroadcastAnyway)
return xerrors.Errorf("failed to look up actor state nonce: %s: %w", err, ErrSoftValidationFailure)
}

if balance.LessThan(m.Message.RequiredFunds()) {
return xerrors.Errorf("not enough funds (required: %s, balance: %s): %w", types.FIL(m.Message.RequiredFunds()), types.FIL(balance), ErrNotEnoughFunds)
if snonce > m.Message.Nonce {
return xerrors.Errorf("minimum expected nonce is %d: %w", snonce, ErrNonceTooLow)
}

mp.lk.Lock()
defer mp.lk.Unlock()

if err := mp.verifyMsgBeforeAdd(m, curTs.Height()); err != nil {
return err
}

if err := mp.checkBalance(m, curTs); err != nil {
return err
}

return mp.addLocked(m)
}

Expand All @@ -431,10 +491,6 @@ func (mp *MessagePool) addLocked(m *types.SignedMessage) error {
mp.blsSigCache.Add(m.Cid(), m.Signature)
}

if m.Message.GasLimit > build.BlockGasLimit {
return xerrors.Errorf("given message has too high of a gas limit")
}

Comment on lines -434 to -437
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is checked by ValidForBlockInclusion, so it was redundant here and I removed it.

if _, err := mp.api.PutMessage(m); err != nil {
log.Warnf("mpooladd cs.PutMessage failed: %s", err)
return err
Expand Down Expand Up @@ -562,6 +618,16 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return nil, err
}

err = mp.checkMessage(msg)
if err != nil {
return nil, err
}

msgb, err := msg.Serialize()
if err != nil {
return nil, err
}

// reacquire the locks and check state for consistency
mp.curTsLk.Lock()
defer mp.curTsLk.Unlock()
Expand All @@ -582,12 +648,11 @@ func (mp *MessagePool) PushWithNonce(ctx context.Context, addr address.Address,
return nil, ErrTryAgain
}

if err := mp.verifyMsgBeforePush(msg, mp.curTs.Height()); err != nil {
if err := mp.verifyMsgBeforeAdd(msg, curTs.Height()); err != nil {
return nil, err
}

msgb, err := msg.Serialize()
if err != nil {
if err := mp.checkBalance(msg, curTs); err != nil {
return nil, err
}

Expand Down Expand Up @@ -625,7 +690,7 @@ func (mp *MessagePool) remove(from address.Address, nonce uint64) {

// NB: This deletes any message with the given nonce. This makes sense
// as two messages with the same sender cannot have the same nonce
delete(mset.msgs, nonce)
mset.rm(nonce)

if len(mset.msgs) == 0 {
delete(mp.pending, from)
Expand Down
12 changes: 12 additions & 0 deletions chain/messagepool/messagepool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,7 @@ func TestPruningSimple(t *testing.T) {
if err != nil {
t.Fatal(err)
}
tma.setBalance(sender, 1) // in FIL
target := mock.Address(1001)

for i := 0; i < 5; i++ {
Expand Down Expand Up @@ -430,6 +431,8 @@ func TestLoadLocal(t *testing.T) {
t.Fatal(err)
}

tma.setBalance(a1, 1) // in FIL
tma.setBalance(a2, 1) // in FIL
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
msgs := make(map[cid.Cid]struct{})
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -500,6 +503,8 @@ func TestClearAll(t *testing.T) {
t.Fatal(err)
}

tma.setBalance(a1, 1) // in FIL
tma.setBalance(a2, 1) // in FIL
gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
Expand Down Expand Up @@ -552,6 +557,9 @@ func TestClearNonLocal(t *testing.T) {
t.Fatal(err)
}

tma.setBalance(a1, 1) // in FIL
tma.setBalance(a2, 1) // in FIL

gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]
for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
Expand Down Expand Up @@ -619,6 +627,10 @@ func TestUpdates(t *testing.T) {
}

gasLimit := gasguess.Costs[gasguess.CostKey{Code: builtin.StorageMarketActorCodeID, M: 2}]

tma.setBalance(a1, 1) // in FIL
tma.setBalance(a2, 1) // in FIL

for i := 0; i < 10; i++ {
m := makeTestMessage(w1, a1, a2, uint64(i), gasLimit, uint64(i+1))
_, err := mp.Push(m)
Expand Down
5 changes: 5 additions & 0 deletions chain/messagepool/selection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
logging "github.com/ipfs/go-log"
)

func init() {
// bump this for the selection tests
MaxActorPendingMessages = 1000000
}

func makeTestMessage(w *wallet.Wallet, from, to address.Address, nonce uint64, gasLimit int64, gasPrice uint64) *types.SignedMessage {
msg := &types.Message{
From: from,
Expand Down
4 changes: 3 additions & 1 deletion chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -545,10 +545,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
)
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
switch {
case xerrors.Is(err, messagepool.ErrBroadcastAnyway):
case xerrors.Is(err, messagepool.ErrSoftValidationFailure):
fallthrough
case xerrors.Is(err, messagepool.ErrRBFTooLowPremium):
fallthrough
case xerrors.Is(err, messagepool.ErrTooManyPendingMessages):
fallthrough
case xerrors.Is(err, messagepool.ErrNonceTooLow):
return pubsub.ValidationIgnore
default:
Expand Down
15 changes: 7 additions & 8 deletions node/modules/lp2p/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
Topics: map[string]*pubsub.TopicScoreParams{
drandTopic: {
// expected 2 beaconsn/min
TopicWeight: 0.5, // 5x block topic
TopicWeight: 0.5, // 5x block topic; max cap is 62.5

// 1 tick per second, maxes at 1 after 1 hour
TimeInMeshWeight: 0.00027, // ~1/3600
Expand Down Expand Up @@ -154,7 +154,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
},
build.BlocksTopic(in.Nn): {
// expected 10 blocks/min
TopicWeight: 0.1, // max is 50, max mesh penalty is -10, single invalid message is -100
TopicWeight: 0.1, // max cap is 50, max mesh penalty is -10, single invalid message is -100

// 1 tick per second, maxes at 1 after 1 hour
TimeInMeshWeight: 0.00027, // ~1/3600
Expand Down Expand Up @@ -195,18 +195,17 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
},
build.MessagesTopic(in.Nn): {
// expected > 1 tx/second
TopicWeight: 0.05, // max is 25, max mesh penalty is -5, single invalid message is -100
TopicWeight: 0.1, // max cap is 5, single invalid message is -100

// 1 tick per second, maxes at 1 hour
TimeInMeshWeight: 0.0002778, // ~1/3600
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,

// deliveries decay after 10min, cap at 1000 tx
FirstMessageDeliveriesWeight: 0.5, // max value is 500
// deliveries decay after 10min, cap at 100 tx
FirstMessageDeliveriesWeight: 0.5, // max value is 50
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute),
//FirstMessageDeliveriesCap: 1000,
FirstMessageDeliveriesCap: 1, // we can't yet properly validate them so only confer a tiny boost from delivery
FirstMessageDeliveriesCap: 100, // 100 messages in 10 minutes

// Mesh Delivery Failure is currently turned off for messages
// This is on purpose as the network is still too small, which results in
Expand All @@ -225,7 +224,7 @@ func GossipSub(in GossipIn) (service *pubsub.PubSub, err error) {
// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute),

// invalid messages decay after 1 hour
InvalidMessageDeliveriesWeight: -2000,
InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
},
},
Expand Down