Skip to content

Commit

Permalink
fix: guard concurrent accesses to node api (#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
iand authored Mar 9, 2021
1 parent 8f367a3 commit de2eda0
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 26 deletions.
2 changes: 1 addition & 1 deletion chain/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n
case BlocksTask:
tsi.processors[BlocksTask] = blocks.NewTask()
case MessagesTask:
tsi.messageProcessors[MessagesTask] = messages.NewTask(o)
tsi.messageProcessors[MessagesTask] = messages.NewTask()
case ChainEconomicsTask:
tsi.processors[ChainEconomicsTask] = chaineconomics.NewTask(o)
case ActorStatesRawTask:
Expand Down
27 changes: 23 additions & 4 deletions tasks/actorstate/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package actorstate
import (
"context"
"fmt"
"sync"
"time"

"github.com/filecoin-project/go-address"
Expand All @@ -19,9 +20,11 @@ import (

// A Task processes the extraction of actor state according the allowed types in its extracter map.
type Task struct {
node lens.API
opener lens.APIOpener
closer lens.APICloser
nodeMu sync.Mutex // guards mutations to node, opener and closer
node lens.API
opener lens.APIOpener
closer lens.APICloser

extracterMap ActorExtractorMap
}

Expand All @@ -34,14 +37,18 @@ func NewTask(opener lens.APIOpener, extracterMap ActorExtractorMap) *Task {
}

func (t *Task) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error) {
// t.node is used only by goroutines started by this method
t.nodeMu.Lock()
if t.node == nil {
node, closer, err := t.opener.Open(ctx)
if err != nil {
t.nodeMu.Unlock()
return nil, nil, xerrors.Errorf("unable to open lens: %w", err)
}
t.node = node
t.closer = closer
}
t.nodeMu.Unlock()

log.Debugw("processing actor state changes", "height", ts.Height(), "parent_height", pts.Height())

Expand Down Expand Up @@ -150,8 +157,18 @@ func (t *Task) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pt
if !ok {
res.SkippedParse = true
} else {
// get reference to the lens api, which may have been closed due to a failure elsewhere
t.nodeMu.Lock()
nodeAPI := t.node
t.nodeMu.Unlock()

if nodeAPI == nil {
res.Error = xerrors.Errorf("failed to extract parsed actor state: no connection to api")
return
}

// Parse state
data, err := extracter.Extract(ctx, info, t.node)
data, err := extracter.Extract(ctx, info, nodeAPI)
if err != nil {
res.Error = xerrors.Errorf("failed to extract parsed actor state: %w", err)
return
Expand All @@ -161,6 +178,8 @@ func (t *Task) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pt
}

func (t *Task) Close() error {
t.nodeMu.Lock()
defer t.nodeMu.Unlock()
if t.closer != nil {
t.closer()
t.closer = nil
Expand Down
9 changes: 9 additions & 0 deletions tasks/chaineconomics/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package chaineconomics

import (
"context"
"sync"

"github.com/filecoin-project/lotus/chain/types"
logging "github.com/ipfs/go-log/v2"
Expand All @@ -15,6 +16,7 @@ import (
var log = logging.Logger("chaineconomics")

type Task struct {
nodeMu sync.Mutex // guards mutations to node, opener and closer
node lens.API
opener lens.APIOpener
closer lens.APICloser
Expand All @@ -27,6 +29,10 @@ func NewTask(opener lens.APIOpener) *Task {
}

func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) {
// We use p.node continually through this method so take a broad lock
p.nodeMu.Lock()
defer p.nodeMu.Unlock()

if p.node == nil {
node, closer, err := p.opener.Open(ctx)
if err != nil {
Expand Down Expand Up @@ -55,6 +61,9 @@ func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persi
}

func (p *Task) Close() error {
p.nodeMu.Lock()
defer p.nodeMu.Unlock()

if p.closer != nil {
p.closer()
p.closer = nil
Expand Down
23 changes: 2 additions & 21 deletions tasks/messages/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,13 @@ import (
var log = logging.Logger("messages")

type Task struct {
node lens.API
opener lens.APIOpener
closer lens.APICloser
}

func NewTask(opener lens.APIOpener) *Task {
return &Task{
opener: opener,
}
func NewTask() *Task {
return &Task{}
}

func (p *Task) ProcessMessages(ctx context.Context, ts *types.TipSet, pts *types.TipSet, emsgs []*lens.ExecutedMessage) (model.Persistable, *visormodel.ProcessingReport, error) {
if p.node == nil {
node, closer, err := p.opener.Open(ctx)
if err != nil {
return nil, nil, xerrors.Errorf("unable to open lens: %w", err)
}
p.node = node
p.closer = closer
}

report := &visormodel.ProcessingReport{
Height: int64(pts.Height()),
StateRoot: pts.ParentState().String(),
Expand Down Expand Up @@ -258,11 +244,6 @@ func (p *Task) parseMessageParams(m *types.Message, destCode cid.Cid) (string, s
}

func (p *Task) Close() error {
if p.closer != nil {
p.closer()
p.closer = nil
}
p.node = nil
return nil
}

Expand Down
9 changes: 9 additions & 0 deletions tasks/msapprovals/msapprovals.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package msapprovals
import (
"bytes"
"context"
"sync"

"github.com/filecoin-project/lotus/chain/actors/builtin/multisig"
"github.com/filecoin-project/lotus/chain/types"
Expand All @@ -27,6 +28,7 @@ const (
)

type Task struct {
nodeMu sync.Mutex // guards mutations to node, opener and closer
node lens.API
opener lens.APIOpener
closer lens.APICloser
Expand All @@ -39,6 +41,10 @@ func NewTask(opener lens.APIOpener) *Task {
}

func (p *Task) ProcessMessages(ctx context.Context, ts *types.TipSet, pts *types.TipSet, emsgs []*lens.ExecutedMessage) (model.Persistable, *visormodel.ProcessingReport, error) {
// We use p.node continually through this method so take a broad lock
p.nodeMu.Lock()
defer p.nodeMu.Unlock()

// TODO: refactor this boilerplate into a helper
if p.node == nil {
node, closer, err := p.opener.Open(ctx)
Expand Down Expand Up @@ -169,6 +175,9 @@ func (p *Task) ProcessMessages(ctx context.Context, ts *types.TipSet, pts *types
}

func (p *Task) Close() error {
p.nodeMu.Lock()
defer p.nodeMu.Unlock()

if p.closer != nil {
p.closer()
p.closer = nil
Expand Down

0 comments on commit de2eda0

Please sign in to comment.