Skip to content

Commit

Permalink
refactor(sync): refactoring syncing process (#676)
Browse files Browse the repository at this point in the history
  • Loading branch information
b00f authored Sep 7, 2023
1 parent 4513a1d commit 6c53a36
Show file tree
Hide file tree
Showing 69 changed files with 1,477 additions and 2,040 deletions.
9 changes: 0 additions & 9 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,19 +78,10 @@
# `moniker` is a custom human-readable name for this node.
## moniker = ""

# `heartbeat_timer` is a timer for broadcasting a heartbeat message to the network.
# Default is 5 seconds
# Set the value to zero e.g heartbeat_timer = "0s" to disable heartbeat broadcasing.
## heartbeat_timer = "5s"

# `session_timeout` is a timeout for a session to be opened.
# Default is 10 seconds
## session_timeout = "10s"

# `max_open_sessions` is the maximum number of open sessions.
# Default is 8
## max_open_sessions = 8

# `block_per_message` is the number of blocks per message.
# Default is 60.
## block_per_message = 60
Expand Down
4 changes: 2 additions & 2 deletions consensus/consensus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func setupWithSeed(t *testing.T, seed int64) *testData {
params.CommitteeSize = 4

// to prevent triggering timers before starting the tests to avoid double entries for new heights in some tests.
getTime := util.RoundNow(params.BlockTimeInSecond).Add(time.Duration(params.BlockTimeInSecond) * time.Second)
getTime := util.RoundNow(params.BlockIntervalInSecond).Add(time.Duration(params.BlockIntervalInSecond) * time.Second)
genDoc := genesis.MakeGenesis(getTime, accs, vals, params)
stX, err := state.LoadOrNewState(genDoc, []crypto.Signer{signers[tIndexX]},
store.MockingStore(ts), txPool, nil)
Expand Down Expand Up @@ -138,7 +138,7 @@ func setupWithSeed(t *testing.T, seed int64) *testData {
broadcaster, newConcreteMediator())

// -------------------------------
// For better logging when testing
// Better logging during testing
overrideLogger := func(cons *consensus, name string) {
cons.logger = logger.NewSubLogger("_consensus",
&OverrideStringer{name: fmt.Sprintf("%s - %s: ", name, t.Name()), cons: cons})
Expand Down
2 changes: 1 addition & 1 deletion consensus/height.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type newHeightState struct {
}

func (s *newHeightState) enter() {
sleep := s.state.LastBlockTime().Add(s.state.BlockTime()).Sub(util.Now())
sleep := s.state.LastBlockTime().Add(s.state.Params().BlockInterval()).Sub(util.Now())
s.scheduleTimeout(sleep, s.height, s.round, tickerTargetNewHeight)
}

Expand Down
4 changes: 2 additions & 2 deletions consensus/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,15 @@ func TestManager(t *testing.T) {
acc := account.NewAccount(0)
acc.AddToBalance(21 * 1e14)
params := param.DefaultParams()
params.BlockTimeInSecond = 1
params.BlockIntervalInSecond = 1
vals := make([]*validator.Validator, 5)
for i, s := range committeeSigners {
val := validator.NewValidator(s.PublicKey().(*bls.PublicKey), int32(i))
vals[i] = val
}
accs := map[crypto.Address]*account.Account{crypto.TreasuryAddress: acc}
// to prevent triggering timers before starting the tests to avoid double entries for new heights in some tests.
getTime := util.RoundNow(params.BlockTimeInSecond).Add(time.Duration(params.BlockTimeInSecond) * time.Second)
getTime := util.RoundNow(params.BlockIntervalInSecond).Add(time.Duration(params.BlockIntervalInSecond) * time.Second)
genDoc := genesis.MakeGenesis(getTime, accs, vals, params)

rewardAddrs := []crypto.Address{
Expand Down
2 changes: 1 addition & 1 deletion genesis/genesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func TestMarshaling(t *testing.T) {
[]*validator.Validator{val}, param.DefaultParams())
gen2 := new(genesis.Genesis)

assert.Equal(t, gen1.Params().BlockTimeInSecond, 10)
assert.Equal(t, gen1.Params().BlockIntervalInSecond, 10)

bz, err := json.MarshalIndent(gen1, " ", " ")
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion genesis/testnet.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"genesis_time": "2023-05-08T00:00:00Z",
"params": {
"block_version": 63,
"block_time_in_second": 10,
"block_interval_in_second": 10,
"committee_size": 21,
"block_reward": 1000000000,
"transaction_to_live_interval": 2880,
Expand Down
26 changes: 13 additions & 13 deletions network/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,28 @@ import (

var _ Network = &MockNetwork{}

type BroadcastData struct {
type PublishData struct {
Data []byte
Target *lp2pcore.PeerID
}

type MockNetwork struct {
*testsuite.TestSuite

BroadcastCh chan BroadcastData
EventCh chan Event
ID peer.ID
OtherNets []*MockNetwork
SendError error
PublishCh chan PublishData
EventCh chan Event
ID peer.ID
OtherNets []*MockNetwork
SendError error
}

func MockingNetwork(ts *testsuite.TestSuite, id peer.ID) *MockNetwork {
return &MockNetwork{
TestSuite: ts,
BroadcastCh: make(chan BroadcastData, 100),
EventCh: make(chan Event, 100),
OtherNets: make([]*MockNetwork, 0),
ID: id,
TestSuite: ts,
PublishCh: make(chan PublishData, 100),
EventCh: make(chan Event, 100),
OtherNets: make([]*MockNetwork, 0),
ID: id,
}
}

Expand Down Expand Up @@ -63,15 +63,15 @@ func (mock *MockNetwork) SendTo(data []byte, pid lp2pcore.PeerID) error {
if mock.SendError != nil {
return mock.SendError
}
mock.BroadcastCh <- BroadcastData{
mock.PublishCh <- PublishData{
Data: data,
Target: &pid,
}
return nil
}

func (mock *MockNetwork) Broadcast(data []byte, _ TopicID) error {
mock.BroadcastCh <- BroadcastData{
mock.PublishCh <- PublishData{
Data: data,
Target: nil, // Send to all
}
Expand Down
1 change: 0 additions & 1 deletion state/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ type Facade interface {
LastBlockHash() hash.Hash
LastBlockTime() time.Time
LastCertificate() *certificate.Certificate
BlockTime() time.Duration
UpdateLastCertificate(lastCertificate *certificate.Certificate) error
ProposeBlock(consSigner crypto.Signer, rewardAddr crypto.Address, round int16) (*block.Block, error)
ValidateBlock(block *block.Block) error
Expand Down
16 changes: 8 additions & 8 deletions state/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/pactus-project/pactus/types/tx"
"github.com/pactus-project/pactus/types/tx/payload"
"github.com/pactus-project/pactus/types/validator"
"github.com/pactus-project/pactus/util"
"github.com/pactus-project/pactus/util/errors"
"github.com/pactus-project/pactus/util/testsuite"
)
Expand All @@ -39,13 +38,14 @@ type MockState struct {

func MockingState(ts *testsuite.TestSuite) *MockState {
committee, _ := ts.GenerateTestCommittee(21)
genDoc := genesis.TestnetGenesis()
return &MockState{
ts: ts,
TestGenesis: genesis.TestnetGenesis(), // TODO: replace me with the Mainnet genesis
TestGenesis: genDoc,
TestStore: store.MockingStore(ts),
TestPool: txpool.MockingTxPool(),
TestCommittee: committee,
TestParams: param.DefaultParams(),
TestParams: genDoc.Params(),
}
}

Expand Down Expand Up @@ -77,7 +77,11 @@ func (m *MockState) LastBlockHash() hash.Hash {
}

func (m *MockState) LastBlockTime() time.Time {
return util.Now()
if len(m.TestStore.Blocks) > 0 {
return m.TestStore.Blocks[m.TestStore.LastHeight].Header().Time()
}

return m.Genesis().GenesisTime()
}

func (m *MockState) LastCertificate() *certificate.Certificate {
Expand All @@ -87,10 +91,6 @@ func (m *MockState) LastCertificate() *certificate.Certificate {
return m.TestStore.LastCert
}

func (m *MockState) BlockTime() time.Duration {
return time.Second
}

func (m *MockState) UpdateLastCertificate(cert *certificate.Certificate) error {
m.TestStore.LastCert = cert
return nil
Expand Down
15 changes: 4 additions & 11 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,13 +262,6 @@ func (st *state) LastCertificate() *certificate.Certificate {
return st.lastInfo.Certificate()
}

func (st *state) BlockTime() time.Duration {
st.lk.RLock()
defer st.lk.RUnlock()

return st.params.BlockTime()
}

func (st *state) UpdateLastCertificate(cert *certificate.Certificate) error {
st.lk.Lock()
defer st.lk.Unlock()
Expand Down Expand Up @@ -541,7 +534,7 @@ func (st *state) commitSandbox(sb sandbox.Sandbox, round int16) {
}

func (st *state) validateBlockTime(t time.Time) error {
if t.Second()%st.params.BlockTimeInSecond != 0 {
if t.Second()%st.params.BlockIntervalInSecond != 0 {
return errors.Errorf(errors.ErrInvalidBlock, "block time (%s) is not rounded", t.String())
}
if t.Before(st.lastInfo.BlockTime()) {
Expand All @@ -551,7 +544,7 @@ func (st *state) validateBlockTime(t time.Time) error {
return errors.Errorf(errors.ErrInvalidBlock, "block time (%s) is same as the last block time", t.String())
}
proposeTime := st.proposeNextBlockTime()
threshold := st.params.BlockTime()
threshold := st.params.BlockInterval()
if t.After(proposeTime.Add(threshold)) {
return errors.Errorf(errors.ErrInvalidBlock, "block time (%s) is more than threshold (%s)",
t.String(), proposeTime.String())
Expand All @@ -575,12 +568,12 @@ func (st *state) CommitteePower() int64 {
}

func (st *state) proposeNextBlockTime() time.Time {
timestamp := st.lastInfo.BlockTime().Add(st.params.BlockTime())
timestamp := st.lastInfo.BlockTime().Add(st.params.BlockInterval())

now := util.Now()
if now.After(timestamp.Add(1 * time.Second)) {
st.logger.Debug("it looks the last block had delay", "delay", now.Sub(timestamp))
timestamp = util.RoundNow(st.params.BlockTimeInSecond)
timestamp = util.RoundNow(st.params.BlockIntervalInSecond)
}
return timestamp
}
Expand Down
20 changes: 17 additions & 3 deletions state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@ func TestBlockSubsidyTx(t *testing.T) {
assert.Equal(t, trx.Payload().(*payload.TransferPayload).Receiver, rewardAddr)
}

func TestBlockTime(t *testing.T) {
td := setup(t)

t.Run("No blocks: LastBlockTime is the genesis time", func(t *testing.T) {
assert.Equal(t, td.state1.LastBlockTime(), td.state1.Genesis().GenesisTime())
})

t.Run("Commit one block: LastBlockTime is the time of the first block", func(t *testing.T) {
b1, c1 := td.makeBlockAndCertificate(t, 1, td.valSigner1, td.valSigner2, td.valSigner3)
assert.NoError(t, td.state1.CommitBlock(1, b1, c1))

assert.NotEqual(t, td.state1.LastBlockTime(), td.state1.Genesis().GenesisTime())
assert.Equal(t, td.state1.LastBlockTime(), b1.Header().Time())
})
}

func TestCommitBlocks(t *testing.T) {
td := setup(t)

Expand Down Expand Up @@ -482,7 +498,7 @@ func TestSortition(t *testing.T) {
func TestValidateBlockTime(t *testing.T) {
td := setup(t)

fmt.Printf("BlockTimeInSecond: %d\n", td.state1.params.BlockTimeInSecond)
fmt.Printf("BlockTimeInSecond: %d\n", td.state1.params.BlockIntervalInSecond)

// Time not rounded
roundedNow := util.RoundNow(10)
Expand Down Expand Up @@ -668,8 +684,6 @@ func TestLoadStateAfterChangingGenesis(t *testing.T) {
func TestSetBlockTime(t *testing.T) {
td := setup(t)

assert.Equal(t, td.state1.BlockTime(), 10*time.Second)

t.Run("Last block time is a bit far in past", func(t *testing.T) {
td.state1.lastInfo.UpdateBlockTime(util.RoundNow(10).Add(-20 * time.Second))
b, _ := td.state1.ProposeBlock(td.state1.signers[0], td.RandAddress(), 0)
Expand Down
2 changes: 1 addition & 1 deletion sync/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const (
BundleFlagCarrierLibP2P = 0x0010
BundleFlagCompressed = 0x0100
BundleFlagBroadcasted = 0x0200
BundleFlagHelloMessage = 0x0400
BundleFlagHandshaking = 0x0400
)

type Bundle struct {
Expand Down
5 changes: 0 additions & 5 deletions sync/bundle/message/blocks_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,6 @@ import (
"github.com/pactus-project/pactus/util/errors"
)

const (
LatestBlocksResponseCodeOK = 0
LatestBlocksResponseCodeNoMoreBlock = 1
)

type BlocksResponseMessage struct {
ResponseCode ResponseCode `cbor:"1,keyasint"`
SessionID int `cbor:"2,keyasint"`
Expand Down
27 changes: 10 additions & 17 deletions sync/bundle/message/blocks_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,54 +56,47 @@ func TestBlocksResponseMessage(t *testing.T) {
func TestLatestBlocksResponseCode(t *testing.T) {
ts := testsuite.NewTestSuite(t)

t.Run("busy", func(t *testing.T) {
m := NewBlocksResponseMessage(ResponseCodeRejected, ResponseCodeRejected.String(), 1, 0, nil, nil)

assert.NoError(t, m.BasicCheck())
assert.Zero(t, m.From)
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.True(t, m.IsRequestRejected())
assert.Equal(t, m.Reason, ResponseCodeRejected.String())
})

t.Run("rejected", func(t *testing.T) {
m := NewBlocksResponseMessage(ResponseCodeRejected, ResponseCodeRejected.String(), 1, 0, nil, nil)
reason := ts.RandString(16)
m := NewBlocksResponseMessage(ResponseCodeRejected, reason, 1, 0, nil, nil)

assert.NoError(t, m.BasicCheck())
assert.Zero(t, m.From)
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.True(t, m.IsRequestRejected())
assert.Equal(t, m.Reason, ResponseCodeRejected.String())
assert.Equal(t, m.Reason, reason)
})

t.Run("OK - MoreBlocks", func(t *testing.T) {
b1 := ts.GenerateTestBlock(nil)
b2 := ts.GenerateTestBlock(nil)
d1, _ := b1.Bytes()
d2, _ := b2.Bytes()
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, ResponseCodeMoreBlocks.String(), 1, 100, [][]byte{d1, d2}, nil)
reason := ts.RandString(16)
m := NewBlocksResponseMessage(ResponseCodeMoreBlocks, reason, 1, 100, [][]byte{d1, d2}, nil)

assert.NoError(t, m.BasicCheck())
assert.Equal(t, m.From, uint32(100))
assert.Equal(t, m.To(), uint32(101))
assert.Equal(t, m.Count(), uint32(2))
assert.Zero(t, m.LastCertificateHeight())
assert.False(t, m.IsRequestRejected())
assert.Equal(t, m.Reason, ResponseCodeMoreBlocks.String())
assert.Equal(t, m.Reason, reason)
})

t.Run("OK - Synced", func(t *testing.T) {
cert := ts.GenerateTestCertificate()

m := NewBlocksResponseMessage(ResponseCodeSynced, ResponseCodeSynced.String(), 1, 100, nil, cert)
reason := ts.RandString(16)
m := NewBlocksResponseMessage(ResponseCodeSynced, reason, 1, 100, nil, cert)

assert.NoError(t, m.BasicCheck())
assert.Equal(t, m.From, uint32(100))
assert.Zero(t, m.To())
assert.Zero(t, m.Count())
assert.Equal(t, m.LastCertificateHeight(), uint32(100))
assert.False(t, m.IsRequestRejected())
assert.Equal(t, m.Reason, ResponseCodeSynced.String())
assert.Equal(t, m.Reason, reason)
})
}
Loading

0 comments on commit 6c53a36

Please sign in to comment.