Skip to content

Commit

Permalink
Merge branch 'main' into feat/add-fee-config-to-node-info
Browse files Browse the repository at this point in the history
  • Loading branch information
Ja7ad authored Oct 20, 2024
2 parents eae6f53 + 91c46b4 commit 5a99158
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 26 deletions.
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ func NewNode(genDoc *genesis.Genesis, conf *config.Config,
eventCh = nil
}

txPool := txpool.NewTxPool(conf.TxPool, messageCh)

str, err := store.NewStore(conf.Store)
if err != nil {
return nil, err
}

txPool := txpool.NewTxPool(conf.TxPool, str, messageCh)

st, err := state.LoadOrNewState(genDoc, valKeys, str, txPool, eventCh)
if err != nil {
return nil, err
Expand Down
11 changes: 5 additions & 6 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,6 @@ func (st *state) ProposeBlock(valKey *bls.ValidatorKey, rewardAddr crypto.Addres
// Only one subsidy transaction per blk
if txs[i].IsSubsidyTx() {
st.logger.Error("found duplicated subsidy transaction", "tx", txs[i])
st.txPool.RemoveTx(txs[i].ID())
txs.Remove(i)
i--

Expand Down Expand Up @@ -434,15 +433,15 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat

st.store.SaveBlock(blk, cert)

// Remove transactions from pool
for _, trx := range blk.Transactions() {
st.txPool.RemoveTx(trx.ID())
}

if err := st.store.WriteBatch(); err != nil {
st.logger.Panic("unable to update state", "error", err)
}

// Remove transactions from pool and update consumption
if err := st.txPool.HandleCommittedBlock(blk); err != nil {
return err
}

st.logger.Info("new block committed", "block", blk, "round", cert.Round())

st.evaluateSortition()
Expand Down
8 changes: 6 additions & 2 deletions txpool/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ import (
type Config struct {
MaxSize int `toml:"max_size"`
Fee *FeeConfig `toml:"fee"`

// Private configs
ConsumptionWindow uint32 `toml:"-"`
}

type FeeConfig struct {
Expand All @@ -17,8 +20,9 @@ type FeeConfig struct {

func DefaultConfig() *Config {
return &Config{
MaxSize: 1000,
Fee: DefaultFeeConfig(),
MaxSize: 1000,
Fee: DefaultFeeConfig(),
ConsumptionWindow: 8640,
}
}

Expand Down
2 changes: 1 addition & 1 deletion txpool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ type TxPool interface {
SetNewSandboxAndRecheck(sb sandbox.Sandbox)
AppendTxAndBroadcast(trx *tx.Tx) error
AppendTx(trx *tx.Tx) error
RemoveTx(id tx.ID)
HandleCommittedBlock(blk *block.Block) error
}
4 changes: 4 additions & 0 deletions txpool/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ func (m *MockTxPool) RemoveTx(id hash.Hash) {
}
}

func (*MockTxPool) HandleCommittedBlock(_ *block.Block) error {
return nil
}

func (m *MockTxPool) PrepareBlockTransactions() block.Txs {
txs := make([]*tx.Tx, m.Size())
copy(txs, m.Txs)
Expand Down
87 changes: 77 additions & 10 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import (
"fmt"
"sync"

"github.com/pactus-project/pactus/crypto"
"github.com/pactus-project/pactus/execution"
"github.com/pactus-project/pactus/sandbox"
"github.com/pactus-project/pactus/store"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/types/amount"
"github.com/pactus-project/pactus/types/block"
Expand All @@ -19,14 +21,18 @@ import (
type txPool struct {
lk sync.RWMutex

config *Config
sandbox sandbox.Sandbox
pools map[payload.Type]pool
broadcastCh chan message.Message
logger *logger.SubLogger
config *Config
sandbox sandbox.Sandbox
pools map[payload.Type]pool
consumptionMap map[crypto.Address]uint32
broadcastCh chan message.Message
strReader store.Reader
logger *logger.SubLogger
}

func NewTxPool(conf *Config, broadcastCh chan message.Message) TxPool {
// NewTxPool constructs a new transaction pool with various sub-pools for different transaction types.
// The transaction pool also maintains a consumption map for tracking byte usage per address.
func NewTxPool(conf *Config, storeReader store.Reader, broadcastCh chan message.Message) TxPool {
pools := make(map[payload.Type]pool)
pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.minFee())
pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.minFee())
Expand All @@ -35,16 +41,20 @@ func NewTxPool(conf *Config, broadcastCh chan message.Message) TxPool {
pools[payload.TypeSortition] = newPool(conf.sortitionPoolSize(), 0)

pool := &txPool{
config: conf,
pools: pools,
broadcastCh: broadcastCh,
config: conf,
pools: pools,
consumptionMap: make(map[crypto.Address]uint32),
strReader: storeReader,
broadcastCh: broadcastCh,
}

pool.logger = logger.NewSubLogger("_pool", pool)

return pool
}

// SetNewSandboxAndRecheck updates the sandbox and rechecks all transactions,
// removing expired or invalid ones.
func (p *txPool) SetNewSandboxAndRecheck(sb sandbox.Sandbox) {
p.lk.Lock()
defer p.lk.Unlock()
Expand Down Expand Up @@ -133,10 +143,67 @@ func (p *txPool) checkTx(trx *tx.Tx) error {
return nil
}

func (p *txPool) RemoveTx(id tx.ID) {
func (p *txPool) HandleCommittedBlock(blk *block.Block) error {
p.lk.Lock()
defer p.lk.Unlock()

for _, trx := range blk.Transactions() {
p.removeTx(trx.ID())

p.handleIncreaseConsumption(trx)
}

return p.handleDecreaseConsumption(blk.Height())
}

func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) {
if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() {
signer := trx.Payload().Signer()

p.consumptionMap[signer] += uint32(trx.SerializeSize())
}
}

func (p *txPool) handleDecreaseConsumption(height uint32) error {
// If height is less than or equal to ConsumptionWindow, nothing to do.
if height <= p.config.ConsumptionWindow {
return nil
}

// Calculate the block height that has passed out of the consumption window.
windowedBlockHeight := height - p.config.ConsumptionWindow
committedBlock, err := p.strReader.Block(windowedBlockHeight)
if err != nil {
return err
}

blk, err := committedBlock.ToBlock()
if err != nil {
return err
}

for _, trx := range blk.Transactions() {
if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() {
signer := trx.Payload().Signer()
if v, ok := p.consumptionMap[signer]; ok {
// Decrease the consumption by the size of the transaction
v -= uint32(trx.SerializeSize())

if v == 0 {
// If the new value is zero, remove the signer from the consumptionMap
delete(p.consumptionMap, signer)
} else {
// Otherwise, update the map with the new value
p.consumptionMap[signer] = v
}
}
}
}

return nil
}

func (p *txPool) removeTx(id tx.ID) {
for _, pool := range p.pools {
if pool.list.Remove(id) {
break
Expand Down
63 changes: 60 additions & 3 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"testing"
"time"

"github.com/pactus-project/pactus/crypto"
"github.com/pactus-project/pactus/execution"
"github.com/pactus-project/pactus/sandbox"
"github.com/pactus-project/pactus/store"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/types/account"
"github.com/pactus-project/pactus/types/tx"
Expand All @@ -22,12 +24,14 @@ type testData struct {

pool *txPool
sandbox *sandbox.MockSandbox
str *store.MockStore
ch chan message.Message
}

func testConfig() *Config {
return &Config{
MaxSize: 10,
MaxSize: 10,
ConsumptionWindow: 3,
Fee: &FeeConfig{
FixedFee: 0.000001,
DailyLimit: 280,
Expand All @@ -44,7 +48,8 @@ func setup(t *testing.T) *testData {
ch := make(chan message.Message, 10)
sb := sandbox.MockingSandbox(ts)
config := testConfig()
p := NewTxPool(config, ch)
mockStore := store.MockingStore(ts)
p := NewTxPool(config, mockStore, ch)
p.SetNewSandboxAndRecheck(sb)
pool := p.(*txPool)
assert.NotNil(t, pool)
Expand All @@ -53,6 +58,7 @@ func setup(t *testing.T) *testData {
TestSuite: ts,
pool: pool,
sandbox: sb,
str: mockStore,
ch: ch,
}
}
Expand Down Expand Up @@ -96,11 +102,62 @@ func TestAppendAndRemove(t *testing.T) {
// Appending the same transaction again, should not return any error
assert.NoError(t, td.pool.AppendTx(testTrx))

td.pool.RemoveTx(testTrx.ID())
td.pool.removeTx(testTrx.ID())
assert.False(t, td.pool.HasTx(testTrx.ID()), "Transaction should be removed")
assert.Nil(t, td.pool.PendingTx(testTrx.ID()))
}

func TestCalculatingConsumption(t *testing.T) {
td := setup(t)

// Generate keys for different transaction signers
_, prv1 := td.RandEd25519KeyPair()
_, prv2 := td.RandEd25519KeyPair()
_, prv3 := td.RandBLSKeyPair()
_, prv4 := td.RandBLSKeyPair()

// Generate different types of transactions
trx11 := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(prv1))
trx12 := td.GenerateTestBondTx(testsuite.TransactionWithEd25519Signer(prv1))
trx13 := td.GenerateTestWithdrawTx(testsuite.TransactionWithBLSSigner(prv3))
trx14 := td.GenerateTestUnbondTx(testsuite.TransactionWithBLSSigner(prv4))
trx21 := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(prv2))
trx31 := td.GenerateTestBondTx(testsuite.TransactionWithBLSSigner(prv4))
trx41 := td.GenerateTestWithdrawTx(testsuite.TransactionWithBLSSigner(prv3))
trx42 := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(prv2))

// Expected consumption map after transactions
expected := map[crypto.Address]uint32{
prv2.PublicKeyNative().AccountAddress(): uint32(trx21.SerializeSize()) + uint32(trx42.SerializeSize()),
prv4.PublicKeyNative().AccountAddress(): uint32(trx31.SerializeSize()),
prv3.PublicKeyNative().ValidatorAddress(): uint32(trx41.SerializeSize()),
}

tests := []struct {
height uint32
txs []*tx.Tx
}{
{1, []*tx.Tx{trx11, trx12, trx13, trx14}},
{2, []*tx.Tx{trx21}},
{3, []*tx.Tx{trx31}},
{4, []*tx.Tx{trx41, trx42}},
}

for _, tt := range tests {
// Generate a block with the transactions for the given height
blk, cert := td.TestSuite.GenerateTestBlock(tt.height, func(bm *testsuite.BlockMaker) {
bm.Txs = tt.txs
})
td.str.SaveBlock(blk, cert)

// Handle the block in the transaction pool
err := td.pool.HandleCommittedBlock(blk)
require.NoError(t, err)
}

require.Equal(t, expected, td.pool.consumptionMap)
}

func TestAppendInvalidTransaction(t *testing.T) {
td := setup(t)

Expand Down
11 changes: 9 additions & 2 deletions util/io_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,16 +217,23 @@ func TestSanitizeArchivePath(t *testing.T) {
func TestListFilesInDir(t *testing.T) {
tmpDir := TempDirPath()

file1, err := os.Create(filepath.Join(tmpDir, ".file1"))
file1Path := filepath.Join(tmpDir, ".public_file")
file1, err := os.Create(file1Path)
require.NoError(t, err)
require.NoError(t, file1.Close())

file2, err := os.Create(filepath.Join(tmpDir, ".file2"))
file2Path := filepath.Join(tmpDir, ".hidden_file")
file2, err := os.Create(file2Path)
require.NoError(t, err)
require.NoError(t, file2.Close())

err = os.Mkdir(filepath.Join(tmpDir, "directory"), 0o750)
require.NoError(t, err)

files, err := ListFilesInDir(tmpDir)
require.NoError(t, err)

assert.Len(t, files, 2)
assert.Contains(t, files, file1Path)
assert.Contains(t, files, file2Path)
}

0 comments on commit 5a99158

Please sign in to comment.