Skip to content
This repository has been archived by the owner on Apr 5, 2023. It is now read-only.

Commit

Permalink
Merge pull request #117 from bstasyszyn/pvtdatapull
Browse files Browse the repository at this point in the history
perf: Performance improvement for pulling private data
  • Loading branch information
fqutishat authored Aug 21, 2019
2 parents 25f5996 + ce6215c commit 5941d21
Showing 1 changed file with 46 additions and 36 deletions.
82 changes: 46 additions & 36 deletions gossip/privdata/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,8 @@ func (p *puller) fetchPrivateData(dig2Filter digestToFilterMapping) (*privdataco
Collection: resp.Digest.Collection,
})
itemsLeftToCollect--
res.AvailableElements = append(res.AvailableElements, resp)
}
res.AvailableElements = append(res.AvailableElements, responses...)
}
return res, nil
}
Expand Down Expand Up @@ -616,51 +616,61 @@ func (p *puller) purgedFilter(dig privdatacommon.DigKey) (filter.RoutingFilter,
func (p *puller) filterNotEligible(dig2rwSets Dig2PvtRWSetWithConfig, shouldCheckLatestConfig bool, signedData protoutil.SignedData, endpoint string) []*protosgossip.PvtDataElement {
var returned []*protosgossip.PvtDataElement
for d, rwSets := range dig2rwSets {
if rwSets == nil {
logger.Errorf("No private rwset for [%s] channel, chaincode [%s], collection [%s], txID = [%s] is available, skipping...",
p.channel, d.Namespace, d.Collection, d.TxId)
continue
}
logger.Debug("Found", len(rwSets.RWSet), "for TxID", d.TxId, ", collection", d.Collection, "for", endpoint)
if len(rwSets.RWSet) == 0 {
continue
}

eligibleForCollection := shouldCheckLatestConfig && p.isEligibleByLatestConfig(p.channel, d.Collection, d.Namespace, signedData)

if !eligibleForCollection {
colAP, err := p.AccessPolicy(rwSets.CollectionConfig, p.channel)
if err != nil {
logger.Debug("No policy found for channel", p.channel, ", collection", d.Collection, "txID", d.TxId, ":", err, "skipping...")
continue
}
colFilter := colAP.AccessFilter()
if colFilter == nil {
logger.Debug("Collection ", d.Collection, " has no access filter, txID", d.TxId, "skipping...")
continue
}
eligibleForCollection = colFilter(signedData)
digest := &protosgossip.PvtDataDigest{
TxId: d.TxId,
BlockSeq: d.BlockSeq,
Collection: d.Collection,
Namespace: d.Namespace,
SeqInBlock: d.SeqInBlock,
}

if !eligibleForCollection {
logger.Debug("Peer", endpoint, "isn't eligible for txID", d.TxId, "at collection", d.Collection)
continue
var payload [][]byte
if err := p.validatePvtRWSetsForEndpoint(digest, rwSets, shouldCheckLatestConfig, signedData); err != nil {
logger.Debugf("Skipping R/W set for channel [%s], chaincode [%s], collection [%s], txID = [%s], endpoint [%s]. Reason: %s",
p.channel, d.Namespace, d.Collection, d.TxId, endpoint, err)
} else {
logger.Debug("Found", len(rwSets.RWSet), "for TxID", d.TxId, ", collection", d.Collection, "for", endpoint)
payload = util.PrivateRWSets(rwSets.RWSet...)
}

// Return a response regardless of whether the payload is empty so that the
// endpoint doesn't need to time out waiting
returned = append(returned, &protosgossip.PvtDataElement{
Digest: &protosgossip.PvtDataDigest{
TxId: d.TxId,
BlockSeq: d.BlockSeq,
Collection: d.Collection,
Namespace: d.Namespace,
SeqInBlock: d.SeqInBlock,
},
Payload: util.PrivateRWSets(rwSets.RWSet...),
Digest: digest,
Payload: payload,
})
}
return returned
}

func (p *puller) validatePvtRWSetsForEndpoint(d *protosgossip.PvtDataDigest, rwSets *util.PrivateRWSetWithConfig, shouldCheckLatestConfig bool, signedData protoutil.SignedData) error {
if rwSets == nil {
return errors.New("RW sets is nil")
}
if len(rwSets.RWSet) == 0 {
return errors.New("No private rwsets")
}

eligibleForCollection := shouldCheckLatestConfig && p.isEligibleByLatestConfig(p.channel, d.Collection, d.Namespace, signedData)
if !eligibleForCollection {
colAP, err := p.AccessPolicy(rwSets.CollectionConfig, p.channel)
if err != nil {
return errors.New("No policy")
}
colFilter := colAP.AccessFilter()
if colFilter == nil {
return errors.New("No access filter")
}
eligibleForCollection = colFilter(signedData)
}

if !eligibleForCollection {
return errors.New("Peer isn't eligible for collection")
}

return nil
}

func (p *puller) isEligibleByLatestConfig(channel string, collection string, chaincode string, signedData protoutil.SignedData) bool {
cc := fcommon.CollectionCriteria{
Channel: channel,
Expand Down

0 comments on commit 5941d21

Please sign in to comment.