Skip to content

Commit

Permalink
perf(chainwatch): parallelize miner processing
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Aug 28, 2020
1 parent a7ef612 commit 0185090
Showing 1 changed file with 151 additions and 136 deletions.
287 changes: 151 additions & 136 deletions cmd/lotus-chainwatch/processor/miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,100 +318,107 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA
return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err)
}

grp, _ := errgroup.WithContext(ctx)
for _, m := range miners {
minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors)
if err != nil {
return err
}

changes, err := p.getMinerPreCommitChanges(ctx, m)
if err != nil {
if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
continue
} else {
m := m
grp.Go(func() error {
minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors)
if err != nil {
return err
}
}
if changes == nil {
continue
}

preCommitAdded := make([]uint64, len(changes.Added))
for i, added := range changes.Added {
if len(added.Info.DealIDs) > 0 {
sectorDeals <- &SectorDealEvent{
MinerID: m.common.addr,
SectorID: uint64(added.Info.SectorNumber),
DealIDs: added.Info.DealIDs,
changes, err := p.getMinerPreCommitChanges(ctx, m)
if err != nil {
if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
return nil
}
return err
}
if added.Info.ReplaceCapacity {
if _, err := stmt.Exec(
m.common.addr.String(),
added.Info.SectorNumber,
added.Info.SealedCID.String(),
m.common.stateroot.String(),
added.Info.SealRandEpoch,
added.Info.Expiration,
added.PreCommitDeposit.String(),
added.PreCommitEpoch,
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.Info.ReplaceCapacity,
added.Info.ReplaceSectorDeadline,
added.Info.ReplaceSectorPartition,
added.Info.ReplaceSectorNumber,
); err != nil {
return err
}
} else {
if _, err := stmt.Exec(
m.common.addr.String(),
added.Info.SectorNumber,
added.Info.SealedCID.String(),
m.common.stateroot.String(),
added.Info.SealRandEpoch,
added.Info.Expiration,
added.PreCommitDeposit.String(),
added.PreCommitEpoch,
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.Info.ReplaceCapacity,
nil, // replace deadline
nil, // replace partition
nil, // replace sector
); err != nil {
return err
if changes == nil {
return nil
}

preCommitAdded := make([]uint64, len(changes.Added))
for i, added := range changes.Added {
if len(added.Info.DealIDs) > 0 {
sectorDeals <- &SectorDealEvent{
MinerID: m.common.addr,
SectorID: uint64(added.Info.SectorNumber),
DealIDs: added.Info.DealIDs,
}
}
if added.Info.ReplaceCapacity {
if _, err := stmt.Exec(
m.common.addr.String(),
added.Info.SectorNumber,
added.Info.SealedCID.String(),
m.common.stateroot.String(),
added.Info.SealRandEpoch,
added.Info.Expiration,
added.PreCommitDeposit.String(),
added.PreCommitEpoch,
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.Info.ReplaceCapacity,
added.Info.ReplaceSectorDeadline,
added.Info.ReplaceSectorPartition,
added.Info.ReplaceSectorNumber,
); err != nil {
return err
}
} else {
if _, err := stmt.Exec(
m.common.addr.String(),
added.Info.SectorNumber,
added.Info.SealedCID.String(),
m.common.stateroot.String(),
added.Info.SealRandEpoch,
added.Info.Expiration,
added.PreCommitDeposit.String(),
added.PreCommitEpoch,
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.Info.ReplaceCapacity,
nil, // replace deadline
nil, // replace partition
nil, // replace sector
); err != nil {
return err
}

}
preCommitAdded[i] = uint64(added.Info.SectorNumber)
}
preCommitAdded[i] = uint64(added.Info.SectorNumber)
}
if len(preCommitAdded) > 0 {
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitAdded,
Event: PreCommitAdded,
if len(preCommitAdded) > 0 {
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitAdded,
Event: PreCommitAdded,
}
}
}
var preCommitExpired []uint64
for _, removed := range changes.Removed {
var sector miner.SectorOnChainInfo
if found, err := minerSectors.Get(uint64(removed.Info.SectorNumber), &sector); err != nil {
return err
} else if !found {
preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber))
var preCommitExpired []uint64
for _, removed := range changes.Removed {
var sector miner.SectorOnChainInfo
if found, err := minerSectors.Get(uint64(removed.Info.SectorNumber), &sector); err != nil {
return err
} else if !found {
preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber))
}
}
}
if len(preCommitExpired) > 0 {
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitExpired,
Event: PreCommitExpired,
if len(preCommitExpired) > 0 {
sectorEvents <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: preCommitExpired,
Event: PreCommitExpired,
}
}
}
return nil
})
}
if err := grp.Wait(); err != nil {
return err
}

if err := stmt.Close(); err != nil {
Expand Down Expand Up @@ -443,67 +450,75 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo
return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err)
}

grp, _ := errgroup.WithContext(ctx)
for _, m := range miners {
changes, err := p.getMinerSectorChanges(ctx, m)
if err != nil {
if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
continue
} else {
m := m
grp.Go(func() error {
changes, err := p.getMinerSectorChanges(ctx, m)
if err != nil {
if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) {
return nil
}
return err
}
}
if changes == nil {
continue
}
var sectorsAdded []uint64
var ccAdded []uint64
var extended []uint64
for _, added := range changes.Added {
// add the sector to the table
if _, err := stmt.Exec(
m.common.addr.String(),
added.SectorNumber,
added.SealedCID.String(),
m.common.stateroot.String(),
added.Activation.String(),
added.Expiration.String(),
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.InitialPledge.String(),
added.ExpectedDayReward.String(),
added.ExpectedStoragePledge.String(),
); err != nil {
return err
if changes == nil {
return nil
}
if len(added.DealIDs) == 0 {
ccAdded = append(ccAdded, uint64(added.SectorNumber))
} else {
sectorsAdded = append(sectorsAdded, uint64(added.SectorNumber))
var sectorsAdded []uint64
var ccAdded []uint64
var extended []uint64
for _, added := range changes.Added {
// add the sector to the table
if _, err := stmt.Exec(
m.common.addr.String(),
added.SectorNumber,
added.SealedCID.String(),
m.common.stateroot.String(),
added.Activation.String(),
added.Expiration.String(),
added.DealWeight.String(),
added.VerifiedDealWeight.String(),
added.InitialPledge.String(),
added.ExpectedDayReward.String(),
added.ExpectedStoragePledge.String(),
); err != nil {
log.Errorw("writing miner sector changes statement", "error", err.Error())
}
if len(added.DealIDs) == 0 {
ccAdded = append(ccAdded, uint64(added.SectorNumber))
} else {
sectorsAdded = append(sectorsAdded, uint64(added.SectorNumber))
}
}
}

for _, mod := range changes.Extended {
extended = append(extended, uint64(mod.To.SectorNumber))
}
for _, mod := range changes.Extended {
extended = append(extended, uint64(mod.To.SectorNumber))
}

events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: ccAdded,
Event: CommitCapacityAdded,
}
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: sectorsAdded,
Event: SectorAdded,
}
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: extended,
Event: SectorExtended,
}
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: ccAdded,
Event: CommitCapacityAdded,
}
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: sectorsAdded,
Event: SectorAdded,
}
events <- &MinerSectorsEvent{
MinerID: m.common.addr,
StateRoot: m.common.stateroot,
SectorIDs: extended,
Event: SectorExtended,
}
return nil
})
}

if err := grp.Wait(); err != nil {
return err
}

if err := stmt.Close(); err != nil {
Expand Down

0 comments on commit 0185090

Please sign in to comment.