diff --git a/cmd/lotus-chainwatch/processor/miner.go b/cmd/lotus-chainwatch/processor/miner.go index 7973d3c4220..2338994f90d 100644 --- a/cmd/lotus-chainwatch/processor/miner.go +++ b/cmd/lotus-chainwatch/processor/miner.go @@ -318,100 +318,108 @@ func (p *Processor) storeMinerPreCommitInfo(ctx context.Context, miners []minerA return xerrors.Errorf("Failed to prepare miner precommit info statement: %w", err) } + grp, _ := errgroup.WithContext(ctx) for _, m := range miners { - minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors) - if err != nil { - return err - } - - changes, err := p.getMinerPreCommitChanges(ctx, m) - if err != nil { - if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) { - continue - } else { + m := m + grp.Go(func() error { + minerSectors, err := adt.AsArray(p.ctxStore, m.state.Sectors) + if err != nil { return err } - } - if changes == nil { - continue - } - preCommitAdded := make([]uint64, len(changes.Added)) - for i, added := range changes.Added { - if len(added.Info.DealIDs) > 0 { - sectorDeals <- &SectorDealEvent{ - MinerID: m.common.addr, - SectorID: uint64(added.Info.SectorNumber), - DealIDs: added.Info.DealIDs, - } - } - if added.Info.ReplaceCapacity { - if _, err := stmt.Exec( - m.common.addr.String(), - added.Info.SectorNumber, - added.Info.SealedCID.String(), - m.common.stateroot.String(), - added.Info.SealRandEpoch, - added.Info.Expiration, - added.PreCommitDeposit.String(), - added.PreCommitEpoch, - added.DealWeight.String(), - added.VerifiedDealWeight.String(), - added.Info.ReplaceCapacity, - added.Info.ReplaceSectorDeadline, - added.Info.ReplaceSectorPartition, - added.Info.ReplaceSectorNumber, - ); err != nil { + changes, err := p.getMinerPreCommitChanges(ctx, m) + if err != nil { + if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) { + return nil + } else { return err } - } else { - if _, err := stmt.Exec( - m.common.addr.String(), - added.Info.SectorNumber, - added.Info.SealedCID.String(), - m.common.stateroot.String(), - added.Info.SealRandEpoch, - added.Info.Expiration, - added.PreCommitDeposit.String(), - added.PreCommitEpoch, - added.DealWeight.String(), - added.VerifiedDealWeight.String(), - added.Info.ReplaceCapacity, - nil, // replace deadline - nil, // replace partition - nil, // replace sector - ); err != nil { - return err + } + if changes == nil { + return nil + } + + preCommitAdded := make([]uint64, len(changes.Added)) + for i, added := range changes.Added { + if len(added.Info.DealIDs) > 0 { + sectorDeals <- &SectorDealEvent{ + MinerID: m.common.addr, + SectorID: uint64(added.Info.SectorNumber), + DealIDs: added.Info.DealIDs, + } } + if added.Info.ReplaceCapacity { + if _, err := stmt.Exec( + m.common.addr.String(), + added.Info.SectorNumber, + added.Info.SealedCID.String(), + m.common.stateroot.String(), + added.Info.SealRandEpoch, + added.Info.Expiration, + added.PreCommitDeposit.String(), + added.PreCommitEpoch, + added.DealWeight.String(), + added.VerifiedDealWeight.String(), + added.Info.ReplaceCapacity, + added.Info.ReplaceSectorDeadline, + added.Info.ReplaceSectorPartition, + added.Info.ReplaceSectorNumber, + ); err != nil { + return err + } + } else { + if _, err := stmt.Exec( + m.common.addr.String(), + added.Info.SectorNumber, + added.Info.SealedCID.String(), + m.common.stateroot.String(), + added.Info.SealRandEpoch, + added.Info.Expiration, + added.PreCommitDeposit.String(), + added.PreCommitEpoch, + added.DealWeight.String(), + added.VerifiedDealWeight.String(), + added.Info.ReplaceCapacity, + nil, // replace deadline + nil, // replace partition + nil, // replace sector + ); err != nil { + return err + } + } + preCommitAdded[i] = uint64(added.Info.SectorNumber) } - preCommitAdded[i] = uint64(added.Info.SectorNumber) - } - if len(preCommitAdded) > 0 { - sectorEvents <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: preCommitAdded, - Event: PreCommitAdded, + if len(preCommitAdded) > 0 { + sectorEvents <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: preCommitAdded, + Event: PreCommitAdded, + } } - } - var preCommitExpired []uint64 - for _, removed := range changes.Removed { - var sector miner.SectorOnChainInfo - if found, err := minerSectors.Get(uint64(removed.Info.SectorNumber), §or); err != nil { - return err - } else if !found { - preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber)) + var preCommitExpired []uint64 + for _, removed := range changes.Removed { + var sector miner.SectorOnChainInfo + if found, err := minerSectors.Get(uint64(removed.Info.SectorNumber), §or); err != nil { + return err + } else if !found { + preCommitExpired = append(preCommitExpired, uint64(removed.Info.SectorNumber)) + } } - } - if len(preCommitExpired) > 0 { - sectorEvents <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: preCommitExpired, - Event: PreCommitExpired, + if len(preCommitExpired) > 0 { + sectorEvents <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: preCommitExpired, + Event: PreCommitExpired, + } } - } + return nil + }) + } + if err := grp.Wait(); err != nil { + return err } if err := stmt.Close(); err != nil { @@ -443,67 +451,76 @@ func (p *Processor) storeMinerSectorInfo(ctx context.Context, miners []minerActo return xerrors.Errorf("Failed to prepare miner sector info statement: %w", err) } + grp, _ := errgroup.WithContext(ctx) for _, m := range miners { - changes, err := p.getMinerSectorChanges(ctx, m) - if err != nil { - if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) { - continue - } else { - return err + m := m + grp.Go(func() error { + changes, err := p.getMinerSectorChanges(ctx, m) + if err != nil { + if strings.Contains(err.Error(), types.ErrActorNotFound.Error()) { + return nil + } else { + return err + } } - } - if changes == nil { - continue - } - var sectorsAdded []uint64 - var ccAdded []uint64 - var extended []uint64 - for _, added := range changes.Added { - // add the sector to the table - if _, err := stmt.Exec( - m.common.addr.String(), - added.SectorNumber, - added.SealedCID.String(), - m.common.stateroot.String(), - added.Activation.String(), - added.Expiration.String(), - added.DealWeight.String(), - added.VerifiedDealWeight.String(), - added.InitialPledge.String(), - added.ExpectedDayReward.String(), - added.ExpectedStoragePledge.String(), - ); err != nil { - return err + if changes == nil { + return nil } - if len(added.DealIDs) == 0 { - ccAdded = append(ccAdded, uint64(added.SectorNumber)) - } else { - sectorsAdded = append(sectorsAdded, uint64(added.SectorNumber)) + var sectorsAdded []uint64 + var ccAdded []uint64 + var extended []uint64 + for _, added := range changes.Added { + // add the sector to the table + if _, err := stmt.Exec( + m.common.addr.String(), + added.SectorNumber, + added.SealedCID.String(), + m.common.stateroot.String(), + added.Activation.String(), + added.Expiration.String(), + added.DealWeight.String(), + added.VerifiedDealWeight.String(), + added.InitialPledge.String(), + added.ExpectedDayReward.String(), + added.ExpectedStoragePledge.String(), + ); err != nil { + log.Errorw("writing miner sector changes statement", "error", err.Error()) + } + if len(added.DealIDs) == 0 { + ccAdded = append(ccAdded, uint64(added.SectorNumber)) + } else { + sectorsAdded = append(sectorsAdded, uint64(added.SectorNumber)) + } } - } - for _, mod := range changes.Extended { - extended = append(extended, uint64(mod.To.SectorNumber)) - } + for _, mod := range changes.Extended { + extended = append(extended, uint64(mod.To.SectorNumber)) + } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: ccAdded, - Event: CommitCapacityAdded, - } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: sectorsAdded, - Event: SectorAdded, - } - events <- &MinerSectorsEvent{ - MinerID: m.common.addr, - StateRoot: m.common.stateroot, - SectorIDs: extended, - Event: SectorExtended, - } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: ccAdded, + Event: CommitCapacityAdded, + } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: sectorsAdded, + Event: SectorAdded, + } + events <- &MinerSectorsEvent{ + MinerID: m.common.addr, + StateRoot: m.common.stateroot, + SectorIDs: extended, + Event: SectorExtended, + } + return nil + }) + } + + if err := grp.Wait(); err != nil { + return err } if err := stmt.Close(); err != nil {