Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest/ledgerbackend: Create functional producer for BufferedStorageBackend #5462

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

sreuland
Copy link
Contributor

@sreuland sreuland commented Sep 17, 2024

PR Checklist

PR Structure

  • This PR has reasonably narrow scope (if not, break it down into smaller PRs).
  • This PR avoids mixing refactoring changes with feature changes (split into two PRs
    otherwise).
  • This PR's title starts with name of package that is most changed in the PR, ex.
    services/friendbot, or all or doc if the changes are broad or impact many
    packages.

Thoroughness

  • This PR adds tests for the most critical parts of the new functionality or fixes.
  • I've updated any docs (developer docs, .md
    files, etc... affected by this change). Take a look in the docs folder for a given service,
    like this one.

Release planning

  • I've reviewed the changes in this PR and if I consider them worthwhile for being mentioned on release notes then I have updated the relevant CHANGELOG.md within the component folder structure. For example, if I changed horizon, then I updated (services/horizon/CHANGELOG.md. I add a new line item describing the change and reference to this PR. If I don't update a CHANGELOG, I acknowledge this PR's change may not be mentioned in future release notes.
  • I've decided if this PR requires a new major/minor version according to
    semver, or if it's mainly a patch change. The PR is targeted at the next
    release branch if it's not a patch change.

What

Added new PublishFromBufferedStorageBackend to provide a functional origin for streaming data development as part of CDP processors library.

Why

Provide efficient tooling in the sdk to produce a stream of tx-meta from a remote Datastore that has pre-computed tx-meta files from Galexie. The stream is established as a callback function which caller passes, it will be asynchronously invoked and serves as origin in a data transformation pipeline, providing source of network tx-meta to which pipeline transform operators can derive from.

Closes: #5412

Known limitations

return
}
}
close(resultCh)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why isn't the channel closed on early return due to an error?

}
resultCh := make(chan error, 1)

go func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the context cancellation be handled in this goroutine to handle caller cancelling the context?


err = callback(ledgerCloseMeta)
if err != nil {
resultCh <- errors.Wrap(err, "received an error from callback invocation")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we consistently use fmt.Errorf ?

select {
case chErr, ok := <-resultCh:
if ok {
assert.ErrorContains(t, chErr, "invalid end value for unbounded ranged, must be zero")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assert.ErrorContains(t, chErr, "invalid end value for unbounded ranged, must be zero")
assert.ErrorContains(t, chErr, "invalid end value for unbounded range, must be zero")

}

if !ledgerRange.bounded && ledgerRange.to > 0 {
resultCh <- errors.New("invalid end value for unbounded ranged, must be zero")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
resultCh <- errors.New("invalid end value for unbounded ranged, must be zero")
resultCh <- errors.New("invalid end value for unbounded range, must be zero")

}
resultCh := make(chan error, 1)

go func() {
Copy link
Contributor

@tamirms tamirms Sep 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment on lines +188 to +190
if !ledgerRange.bounded {
to = math.MaxUint32
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use the following for loop condition to avoid setting to to the max uint32:

		for ledgerSeq := from; ledgerSeq <= to || !ledgerRange.bounded; ledgerSeq++ {

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

ingest/pipeline: Create functional producer for BufferedStorageBackend
3 participants