Skip to content

Commit

Permalink
fix ulc sunc
Browse files Browse the repository at this point in the history
  • Loading branch information
b00ris committed Apr 16, 2018
1 parent c1f6710 commit bf691f9
Show file tree
Hide file tree
Showing 15 changed files with 292 additions and 140 deletions.
20 changes: 19 additions & 1 deletion les/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,28 @@ func New(ctx *node.ServiceContext, config *eth.Config) (*LightEthereum, error) {
}

leth.txPool = light.NewTxPool(leth.chainConfig, leth.blockchain, leth.relay)
if leth.protocolManager, err = NewProtocolManager(leth.chainConfig, true, ClientProtocolVersions, config.NetworkId, leth.eventMux, leth.engine, leth.peers, leth.blockchain, nil, chainDb, leth.odr, leth.relay, quitSync, &leth.wg, config.ULC); err != nil {

if leth.protocolManager, err = NewProtocolManager(
leth.chainConfig,
true,
ClientProtocolVersions,
config.NetworkId,
leth.eventMux,
leth.engine,
leth.peers,
leth.blockchain,
nil,
chainDb,
leth.odr,
leth.relay,
quitSync,
&leth.wg,
config.ULC); err != nil {
return nil, err
}

leth.ApiBackend = &LesApiBackend{leth, nil}

gpoParams := config.GPO
if gpoParams.Default == nil {
gpoParams.Default = config.GasPrice
Expand Down
28 changes: 19 additions & 9 deletions les/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"sync"
"time"

"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/mclock"
"github.com/ethereum/go-ethereum/consensus"
Expand Down Expand Up @@ -58,7 +59,7 @@ type lightFetcher struct {
requestChn chan bool // true if initiated from outside
}

//lightChain
//lightChain - light.LightChain interface
type lightChain interface {
BlockChain
LockChain()
Expand Down Expand Up @@ -134,6 +135,7 @@ func newLightFetcher(pm *ProtocolManager) *lightFetcher {

// syncLoop is the main event loop of the light fetcher
func (f *lightFetcher) syncLoop() {
log.Warn("lightFetcher syncLoop started")
requesting := false
defer f.pm.wg.Done()
for {
Expand All @@ -150,6 +152,7 @@ func (f *lightFetcher) syncLoop() {
rq *distReq
reqID uint64
)

if !f.syncing && !(newAnnounce && s) {
rq, reqID = f.nextRequest()
}
Expand Down Expand Up @@ -405,10 +408,10 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64) {
bestTd *big.Int
bestSyncing bool
)
if f.pm == nil || f.pm.server == nil || f.pm.server.ulc == nil {
bestHash, bestAmount, bestTd, bestSyncing = f.itFindBestValuesForLes()
if f.pm == nil || f.pm.ulc == nil {
bestHash, bestAmount, bestTd, bestSyncing = f.findBestValuesForLes()
} else {
bestHash, bestAmount, bestTd, bestSyncing = f.itFindBestValuesForULC()
bestHash, bestAmount, bestTd, bestSyncing = f.findBestValuesForULC()
}

if bestTd == f.maxConfirmedTd {
Expand All @@ -427,7 +430,7 @@ func (f *lightFetcher) nextRequest() (*distReq, uint64) {
return rq, reqID
}

func (f *lightFetcher) itFindBestValuesForLes() (bestHash common.Hash, bestAmount uint64, bestTd *big.Int, bestSyncing bool) {
func (f *lightFetcher) findBestValuesForLes() (bestHash common.Hash, bestAmount uint64, bestTd *big.Int, bestSyncing bool) {
bestTd = f.maxConfirmedTd
bestSyncing = false

Expand All @@ -447,13 +450,14 @@ func (f *lightFetcher) itFindBestValuesForLes() (bestHash common.Hash, bestAmoun
return
}

func (f *lightFetcher) itFindBestValuesForULC() (bestHash common.Hash, bestAmount uint64, bestTd *big.Int, bestSyncing bool) {
func (f *lightFetcher) findBestValuesForULC() (bestHash common.Hash, bestAmount uint64, bestTd *big.Int, bestSyncing bool) {
bestTd = f.maxConfirmedTd
bestSyncing = false

for p, fp := range f.peers {
for hash, n := range fp.nodeByHash {
if _, ok := f.pm.server.ulc.trusted[p.id]; !ok {
if f.pm.ulc.isTrusted(p.ID()) == false {
log.Warn(fmt.Sprintf("f.pm.ulc.isTrusted(p.ID()) == false "))
continue
}

Expand All @@ -462,7 +466,7 @@ func (f *lightFetcher) itFindBestValuesForULC() (bestHash common.Hash, bestAmoun
}

amount := f.requestAmount(p, n)
if (bestTd == nil || n.td.Cmp(bestTd) > 0) && f.checkTrusted(hash, f.pm.server.ulc.minTrustedFraction) {
if (bestTd == nil || n.td.Cmp(bestTd) > 0) && f.checkTrusted(hash, f.pm.ulc.minTrustedFraction) {
bestHash = hash
bestTd = n.td
bestAmount = amount
Expand All @@ -472,6 +476,8 @@ func (f *lightFetcher) itFindBestValuesForULC() (bestHash common.Hash, bestAmoun
}
return
}

// checkTrusted - check num of agreed peer for hash.
func (f *lightFetcher) checkTrusted(hash common.Hash, minTrustedFraction int) bool {
numPeers := len(f.peers)
var numAgreed int
Expand All @@ -485,10 +491,13 @@ func (f *lightFetcher) checkTrusted(hash common.Hash, minTrustedFraction int) bo

return checkTrustedFractionBarrier(numAgreed, numPeers, minTrustedFraction)
}

// checkTrustedFractionBarrier - checks is numAgreed/numPeers more minTrustedFraction.
func checkTrustedFractionBarrier(numAgreed, numPeers, minTrustedFraction int) bool {
return 100*numAgreed/numPeers > minTrustedFraction
}

// newFetcherDistReqForSync creates distReq for sync
func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq {
return &distReq{
getCost: func(dp distPeer) uint64 {
Expand Down Expand Up @@ -517,6 +526,7 @@ func (f *lightFetcher) newFetcherDistReqForSync(bestHash common.Hash) *distReq {

}

// newFetcherDistReq creates distReq for
func (f *lightFetcher) newFetcherDistReq(bestHash common.Hash, reqID uint64, bestAmount uint64) *distReq {
return &distReq{
getCost: func(dp distPeer) uint64 {
Expand Down Expand Up @@ -582,7 +592,7 @@ func (f *lightFetcher) processResponse(req fetchRequest, resp fetchResponse) boo
}

checkFreq := 1
if f.pm.server.ulc != nil && len(f.pm.server.ulc.trusted) > 0 {
if f.pm.ulc != nil && len(f.pm.ulc.trustedKeys) > 0 {
checkFreq = 0
}
if _, err := f.chain.InsertHeaderChain(headers, checkFreq); err != nil {
Expand Down
75 changes: 45 additions & 30 deletions les/fetcher_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,28 @@
package les

import (
"crypto/rand"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/discover"
"math/big"
"testing"
)

func TestFetcher_ULC_Peer_Selector(t *testing.T) {
func TestFetcherULCPeerSelector(t *testing.T) {

var (
id1 discover.NodeID
id2 discover.NodeID
id3 discover.NodeID
id4 discover.NodeID
)
rand.Read(id1[:])
rand.Read(id2[:])
rand.Read(id3[:])
rand.Read(id4[:])

ftn1 := &fetcherTreeNode{
hash: common.StringToHash("1"),
td: big.NewInt(1),
Expand All @@ -24,38 +39,40 @@ func TestFetcher_ULC_Peer_Selector(t *testing.T) {
}
lf := lightFetcher{
pm: &ProtocolManager{
server: &LesServer{
ulc: &ulc{
trusted: map[string]struct{}{
"peer1": {},
"peer2": {},
"peer3": {},
"peer4": {},
},
minTrustedFraction: 70,
ulc: &ulc{
trustedKeys: map[string]struct{}{
id1.String(): {},
id2.String(): {},
id3.String(): {},
id4.String(): {},
},
minTrustedFraction: 70,
},
},
maxConfirmedTd: ftn1.td,

peers: map[*peer]*fetcherPeerInfo{
{
id: "peer1",
id: "peer1",
Peer: p2p.NewPeer(id1, "peer1", []p2p.Cap{}),
}: {
nodeByHash: map[common.Hash]*fetcherTreeNode{
ftn1.hash: ftn1,
ftn2.hash: ftn2,
},
},
{
id: "peer2",
Peer: p2p.NewPeer(id2, "peer2", []p2p.Cap{}),
id: "peer2",
}: {
nodeByHash: map[common.Hash]*fetcherTreeNode{
ftn1.hash: ftn1,
ftn2.hash: ftn2,
},
},
{
id: "peer3",
id: "peer3",
Peer: p2p.NewPeer(id3, "peer3", []p2p.Cap{}),
}: {
nodeByHash: map[common.Hash]*fetcherTreeNode{
ftn1.hash: ftn1,
Expand All @@ -64,7 +81,8 @@ func TestFetcher_ULC_Peer_Selector(t *testing.T) {
},
},
{
id: "peer4",
id: "peer4",
Peer: p2p.NewPeer(id4, "peer4", []p2p.Cap{}),
}: {
nodeByHash: map[common.Hash]*fetcherTreeNode{
ftn1.hash: ftn1,
Expand All @@ -80,11 +98,12 @@ func TestFetcher_ULC_Peer_Selector(t *testing.T) {
},
},
}
bestHash, bestAmount, bestTD, sync := lf.itFindBestValuesForULC()
bestHash, bestAmount, bestTD, sync := lf.findBestValuesForULC()

if bestTD == nil {
t.Fatal("Empty result")
}

if bestTD.Cmp(ftn2.td) != 0 {
t.Fatal("bad td", bestTD)
}
Expand All @@ -95,20 +114,18 @@ func TestFetcher_ULC_Peer_Selector(t *testing.T) {
_, _ = bestAmount, sync
}

func TestFetcher_ProcessResponse_DisablePowValidation_Success(t *testing.T) {
func TestFetcherProcessResponseDisablePowValidation(t *testing.T) {
header := &types.Header{Number: big.NewInt(1)}
lf := lightFetcher{
pm: &ProtocolManager{
server: &LesServer{
ulc: &ulc{
trusted: map[string]struct{}{
"peer1": {},
"peer2": {},
"peer3": {},
"peer4": {},
},
minTrustedFraction: 70,
ulc: &ulc{
trustedKeys: map[string]struct{}{
"peer1": {},
"peer2": {},
"peer3": {},
"peer4": {},
},
minTrustedFraction: 70,
},
},
chain: &lightChainStub{
Expand All @@ -129,14 +146,12 @@ func TestFetcher_ProcessResponse_DisablePowValidation_Success(t *testing.T) {
lf.processResponse(fetchRequest{amount: 1, hash: header.Hash()}, fetchResponse{headers: []*types.Header{header}})
}

func TestFetcher_ProcessResponse_DisablePowValidation_Fail(t *testing.T) {
func TestFetcherProcessResponseDisablePowValidationFail(t *testing.T) {
header := &types.Header{Number: big.NewInt(1)}
lf := lightFetcher{
pm: &ProtocolManager{
server: &LesServer{
ulc: &ulc{
trusted: map[string]struct{}{},
},
ulc: &ulc{
trustedKeys: map[string]struct{}{},
},
},
chain: &lightChainStub{
Expand Down
31 changes: 27 additions & 4 deletions les/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,28 @@ type ProtocolManager struct {

// wait group is used for graceful shutdowns during downloading
// and processing
wg *sync.WaitGroup
wg *sync.WaitGroup
ulc *ulc
}

// NewProtocolManager returns a new ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
// with the ethereum network.
func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protocolVersions []uint, networkId uint64, mux *event.TypeMux, engine consensus.Engine, peers *peerSet, blockchain BlockChain, txpool txPool, chainDb ethdb.Database, odr *LesOdr, txrelay *LesTxRelay, quitSync chan struct{}, wg *sync.WaitGroup, ulcConfig *eth.ULCConfig) (*ProtocolManager, error) {
func NewProtocolManager(
chainConfig *params.ChainConfig,
lightSync bool,
protocolVersions []uint,
networkId uint64,
mux *event.TypeMux,
engine consensus.Engine,
peers *peerSet,
blockchain BlockChain,
txpool txPool,
chainDb ethdb.Database,
odr *LesOdr,
txrelay *LesTxRelay,
quitSync chan struct{},
wg *sync.WaitGroup,
ulcConfig *eth.ULCConfig) (*ProtocolManager, error) {
// Create the protocol manager with the base fields
manager := &ProtocolManager{
lightSync: lightSync,
Expand All @@ -152,7 +168,7 @@ func NewProtocolManager(chainConfig *params.ChainConfig, lightSync bool, protoco
}

if ulcConfig != nil {
manager.server = &LesServer{ulc: newULC(ulcConfig)}
manager.ulc = newULC(ulcConfig)
}

// Initiate a sub-protocol for every implemented version we can handle
Expand Down Expand Up @@ -259,7 +275,11 @@ func (pm *ProtocolManager) Stop() {
}

func (pm *ProtocolManager) newPeer(pv int, nv uint64, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
return newPeer(pv, nv, p, newMeteredMsgWriter(rw))
var isTrusted bool
if pm.ulc != nil {
isTrusted = pm.ulc.isTrusted(p.ID())
}
return newPeer(pv, nv, isTrusted, p, newMeteredMsgWriter(rw))
}

// handle is the callback invoked to manage the life cycle of a les peer. When
Expand All @@ -284,9 +304,11 @@ func (pm *ProtocolManager) handle(p *peer) error {
p.Log().Debug("Light Ethereum handshake failed", "err", err)
return err
}

if rw, ok := p.rw.(*meteredMsgReadWriter); ok {
rw.Init(p.version)
}

// Register the peer locally
if err := pm.peers.Register(p); err != nil {
p.Log().Error("Light Ethereum peer registration failed", "err", err)
Expand All @@ -298,6 +320,7 @@ func (pm *ProtocolManager) handle(p *peer) error {
}
pm.removePeer(p.id)
}()

// Register the peer in the downloader. If the downloader considers it banned, we disconnect
if pm.lightSync {
p.lock.Lock()
Expand Down
Loading

0 comments on commit bf691f9

Please sign in to comment.