Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add watch and walk commands to index chain during traversal #249

Merged
merged 2 commits into from
Dec 2, 2020

Conversation

iand
Copy link
Contributor

@iand iand commented Nov 19, 2020

This adds two new commands to visor to index while performing an in-order traversal of the filecoin chain.

The watch command watches and follows the head of the chain with a given confidence level
The walk command walks backwards between two epochs

Both commands can be configured to perform a number of tasks as they encounter tipsets. They can remember the previous tipset and extraction result to make diffing or referring to earlier messages more efficient. Tasks are performed concurrently where possible.

The watch command attempts to do everything within a single epoch. The walk command has no deadline.

Tasks available to the new commands:

  • blocks - extract block header, block parents and drand entry information from each tipset encountered
  • messages - extract messages, receipts, block messages, parsed messages and gas outputs from each parent/child pair of tipsets encountered
  • chaineconomics - extracts the circulating supply of FIL from each tipset encountered.
  • actorstatesraw - extracts raw actor information and JSON of the actor state for each actor that changes state between two tipsets
  • actorstatesparsed - parses actor state into separate tables for each actor that changes state between two tipsets
  • actorstates - combines actorstatesraw and actorstatesparsed into a single task

The actorstates task is split into two parts. actorstatesparsed requires much more time to execute than actorstatesraw.

Test with something like:

visor  --lens=lotusrepo --repo=/data/lotus/ --repo-read-only=false  walk --from 82000 --to=82002

or

visor  --lens=lotus --api=addresss watch --indexhead-confidence=25

Results of the walk and watch command are stored in a new visor_processing_reports table. One row is written for each epoch and task combination with one of three statuses:

  • OK - task completed successfully
  • INFO - task completed successfully but some additional information can be found in the status_information column.
  • ERRROR - task failed to complete fully. More details can be found in the errors_detected column.

Performance notes:

Currently the speed of requesting blocks from Lotus prevents anything except basic block indexing from completing in a single epoch. With a fast block cache (such as lotus-cpr) in front of the API is it feasible to watch the chain head with the blocks, messages, chaineconomics and actorstatesraw tasks enabled. Further optimization of actor state diffing is needed to enable the actorstatesparsed task to reliably complete within a single epoch.

Still to do:

  • Persist to database (extraction is done, no writing yet, does not need to be in 30s window for watching)
  • Implement gas outputs calculation (pieces are there, need to hook up calls)
  • Unhack use of tasks (currently use bits of task processors, should refactor to cleaner interfaces)
  • Chain economics
  • Moar optimizations since we have lots of state available during the traversal
  • Error handling/reporting when we fail to parse something due to timeout or other error. Want to re-run these.
  • Handle null rounds
  • Parsed messages
  • Implement GetExecutedMessagesForTipset for lotus api lens

@iand iand self-assigned this Nov 19, 2020
@iand iand force-pushed the iand/watcher branch 2 times, most recently from fb524bf to a1f710a Compare November 20, 2020 15:37
@iand iand linked an issue Nov 20, 2020 that may be closed by this pull request
@iand iand added the kind/enhancement Improvement to an existing feature label Nov 20, 2020
@iand iand force-pushed the iand/watcher branch 5 times, most recently from a6d91b3 to 705965d Compare November 25, 2020 15:31
@iand iand force-pushed the iand/watcher branch 6 times, most recently from e3d4865 to 1fe39e2 Compare November 30, 2020 15:09
@iand iand marked this pull request as ready for review November 30, 2020 15:46
@iand
Copy link
Contributor Author

iand commented Nov 30, 2020

Sorry for the far too large PR. When I started I thought I could reuse the existing message, gas economy and actor state change tasks but they are too coupled to the random access indexing mode that we currently use. I ended up extracting most of the message parsing and actor state change detection into new tasks that keep track of the last tipset and state tree they saw so they can perform appropriate diffs.

The key areas that need particular review are:

  • each processor in the new chain package
  • the new GetExecutedMessagesForTipset method on the lens. There is a version for lenses with access to a chainstore (in lens/util) and a version that calls lotus api methods (in lens/lotus)
  • the simplified ChainHistoryIndexer in tasks/indexer
  • the new reporting table in models/visor

@iand
Copy link
Contributor Author

iand commented Nov 30, 2020

I forgot to mention that neither of these these commands use the existing visor processing tables. They don't read the database at all, just write results.

type ActorStateProcessor struct {
node lens.API
opener lens.APIOpener
closer lens.APICloser
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems a bit scary that each processor holds the closer, since that's often going to be a shared handle that closes it fully for many lenses.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The closer here is for the API instance that is returned when opener is called. It's not the same as the closer created when the opener is created. That is deferred in the top level command.

start := time.Now()

// Run each task concurrently
results := make(chan *ActorStateResult, len(changes))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what closes / cleans up this channel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It goes out of scope when the function returns and is garbage collected as normal. There's never any need to close a channel unless it is to signal that readers should stop waiting on it.

Comment on lines +151 to +153
if len(errorsDetected) != 0 {
report.ErrorsDetected = errorsDetected
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you append errors on the report directly, rather than in a separate variable you then append to the report here?

Copy link
Contributor Author

@iand iand Dec 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ErrorsDetected is an interface{} unfortunately

main.go Outdated
defaultName := "visor_" + version.String()
hostname, err := os.Hostname()
if err == nil {
defaultName += "_" + hostname + "_" + strconv.Itoa(os.Getpid())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider fmt.Sprintf("%s_%s_%d", defaultName, hostname, os.Getpid())

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@placer14 placer14 self-assigned this Nov 30, 2020
}
p.node = node
p.closer = closer
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason to postpone this until ProcessTipSet and not init within New?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to re-open the lens whenever it has been closed after an error. This is the reconnection logic for the lotus API.

for inFlight > 0 {
res := <-results
inFlight--
elapsed := time.Since(start)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This elapsed will not be accurate since the for loop will block receieves for all buffered results. Maybe this is negligble given we're just appending to pre-alloced slices?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will remove all elapsed times in favour of proper metrics collection in a future change

}
p.node = node
p.closer = closer
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why postpone this setup until ProcessTipSet and not within New?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above, it's not a setup it's re-connecting a lens that may have been closed due to an error.

ParentBaseFee: m.BlockHeader.ParentBaseFee.String(),

// TODO: is SizeBytes really needed here?
SizeBytes: msgSize,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FYI: No, it's enough to be in Messages.

@placer14 placer14 removed their assignment Nov 30, 2020
Copy link
Member

@frrist frrist left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments from my first pass, going to give a second look tomorrow. Looks good so far.

Comment on lines +65 to +79
} else {
log.Errorw("out of order tipsets", "height", ts.Height(), "last_height", p.lastTipSet.Height())
}
}

p.lastTipSet = ts
p.lastStateTree = stateTree

if err != nil {
log.Errorw("error received while processing actors, closing lens", "error", err)
if cerr := p.Close(); cerr != nil {
log.Errorw("error received while closing lens", "error", cerr)
}
}
return data, report, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can result in all returned values being nil, perhaps the error message should be filled with a message similar to the error log above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No error and no data is a valid outcome. The very first tipset will have no data since we wait until we have seen two to perform a diff.

Comment on lines +221 to +222
// ProcessTipSet processes a tipset. If error is non-nil then the processor encountered a fatal error.
// Any data returned must be accompanied by a processing report.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If error is non-nil then the processor encountered a fatal error.

It could also be the case that no errors, fatal or not were encountered, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. The only errors that should be returned here are ones that signal that the processor cannot continue.

Comment on lines +257 to +267
// TODO: the following closure is in place to handle the potential for panic
// in ipld-prime. Can be removed once fixed upstream.
// tracking issue: https://github.com/ipld/go-ipld-prime/issues/97
func() {
defer func() {
if r := recover(); r != nil {
err = xerrors.Errorf("recovered panic: %v", r)
}
}()
params, method, err = statediff.ParseParams(m.Params, int(m.Method), actor)
}()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw ipld/go-ipld-prime#97 has been closed via ipld/go-ipld-prime#99. Perhaps this is no longer needed? cc @willscott since I have a hunch this TODO was from you.

// No attempt at deduplication of messages is made.
func (aw *APIWrapper) GetExecutedMessagesForTipset(ctx context.Context, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) {
if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) {
return nil, xerrors.Errorf("child is not on the same chain")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: might be helpful to add some info about the tipset(s) in this message.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// Get receipts for parent messages
rcpts, err := aw.ChainGetParentReceipts(ctx, ts.Cids()[0])
if err != nil {
return nil, xerrors.Errorf("get parent messages: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get parent receipts

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

// No attempt at deduplication of messages is made.
func GetExecutedMessagesForTipset(ctx context.Context, cs *store.ChainStore, ts, pts *types.TipSet) ([]*lens.ExecutedMessage, error) {
if !types.CidArrsEqual(ts.Parents().Cids(), pts.Cids()) {
return nil, xerrors.Errorf("child is not on the same chain")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto previous comment about helpful error message

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

)

type ProcessingReport struct {
tableName struct{} `pg:"visor_processing_reports"`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to pass linting add // nolint: structcheck,unused

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if _, err := tx.ModelContext(ctx, &l).
OnConflict("do nothing").
Insert(); err != nil {
return fmt.Errorf("persisting processing report: %w", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

processing report list

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@iand iand merged commit ae6d3a8 into master Dec 2, 2020
@iand iand deleted the iand/watcher branch December 2, 2020 10:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
kind/enhancement Improvement to an existing feature
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Optimize visor by processing tipsets in order
4 participants