Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make TableProvider.scan() and PhysicalPlanner::create_physical_plan() async #1013

Merged
merged 5 commits into from
Sep 21, 2021

Conversation

rdettai
Copy link
Contributor

@rdettai rdettai commented Sep 17, 2021

Which issue does this PR close?

Closes #1012.

Rationale for this change

  • Simplify usage of async during the planning.
  • Support using DataFusion in systems where creating the scan requires network access (to, for example, find a list of files on some object store that match, or making a remote RPC call)

What changes are included in this PR?

  • mostly mechanical changes adding #[async_trait], async fn and .await annotations
  • Use BoxFuture for recursive async functions
  • Shorten the duration of the lock on the ExecutionContextState to avoid making it Send and propagate async to far in the API

Are there any user-facing changes?

API changes are relatively limited:

@github-actions github-actions bot added ballista datafusion Changes in the datafusion crate labels Sep 17, 2021
@rdettai rdettai marked this pull request as ready for review September 17, 2021 10:12
Comment on lines +304 to +308
fn create_initial_plan<'a>(
&'a self,
logical_plan: &'a LogicalPlan,
ctx_state: &'a ExecutionContextState,
) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shows as a big change but only because the function body needs to be wrapped into a BoxFuture with async { .. }.boxed()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't this function signature be made async fn create_initial_plan.... ? Why does it need a boxed future?

FWIW I found it easier to understand the diffs without whitespace: https://github.com/apache/arrow-datafusion/pull/1013/files?w=1

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see -- so that it can recursively call itself. 👍

@alamb alamb added the api change Changes the API exposed to users of the crate label Sep 17, 2021
@alamb
Copy link
Contributor

alamb commented Sep 17, 2021

I would recommend adding another rationale for this change which would be to "Support using DataFusion in systems where creating the scan requires network access (to, for example, find a list of files on some object store that match, or making a remote RPC call)"

@alamb alamb changed the title Make TableProvider.scan() async Make TableProvider.scan() and PhysicalPlanner::create_physical_plan() async Sep 17, 2021
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @rdettai -- I only skimmed the changes to ballista -- I looked carefully at the changes to DataFusion. I really like the change to make TableProvider.scan() async as it opens up a host of ways to plan that can use network / other async resources

I think we should remove the changes in parallelism for Extension nodes and Union nodes from this PR as they appear unrelated and I am less sure they are good change

I really like this change and I think it brings a lot of flexibility to TableProviders

However, given it is a non trivial API change, I think we should get buy in from other parties as well

cc @yjshen @houqp @Dandandan @andygrove

datafusion/src/execution/context.rs Show resolved Hide resolved
Comment on lines +304 to +308
fn create_initial_plan<'a>(
&'a self,
logical_plan: &'a LogicalPlan,
ctx_state: &'a ExecutionContextState,
) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why can't this function signature be made async fn create_initial_plan.... ? Why does it need a boxed future?

FWIW I found it easier to understand the diffs without whitespace: https://github.com/apache/arrow-datafusion/pull/1013/files?w=1

datafusion/src/physical_plan/planner.rs Outdated Show resolved Hide resolved
datafusion/src/physical_plan/planner.rs Outdated Show resolved Hide resolved
Comment on lines 248 to 251
let execution_plan = ctx.create_physical_plan(&plan).await?;
ctx.register_table(
TableReference::Bare { table: name },
Arc::new(DfTableAdapter::new(plan, execution_plan)),
Copy link
Contributor Author

@rdettai rdettai Sep 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel BallistaContext.sql(&str) should not become async. Opening #1016 to track that.

@Dandandan
Copy link
Contributor

This looks like a useful change to me! I think we can benefit from the Tokio runtime for doing remote calls.
Also a lot of the planning could be done in parallel without too much changes - which is amazing too.

Comment on lines +304 to +308
fn create_initial_plan<'a>(
&'a self,
logical_plan: &'a LogicalPlan,
ctx_state: &'a ExecutionContextState,
) -> BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see -- so that it can recursively call itself. 👍

@yjshen
Copy link
Member

yjshen commented Sep 18, 2021

It is great to bring async to the planning phase. The changes are neat. I've tried to prototype this kind of staff before and find this easy to follow. Great job @rdettai 👍

@rdettai
Copy link
Contributor Author

rdettai commented Sep 20, 2021

If we close #1028 first, I can rebase on that and avoid making BallistaContext.sql(&str) async

@houqp
Copy link
Member

houqp commented Sep 20, 2021

@rdettai you can rebase now :)

// be written to. As for eventual modifications that would be applied to the
// original state after it has been cloned, they will not be picked up by the
// clone but that is okay, as it is equivalent to postponing the state update
// by keeping the lock until the end of the function scope.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 thanks for the clear write up

Copy link
Member

@houqp houqp left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @rdettai for this great improvement. The change look good to me, good to go after we resolve the conflicts.

@alamb
Copy link
Contributor

alamb commented Sep 20, 2021

Any concerns with this change @Dandandan / @houqp ? If no one has any concerns, I would like to merge this as soon as it gets rebased and CI is passing

Thanks again @rdettai

@houqp
Copy link
Member

houqp commented Sep 20, 2021

No concern for me, +1 on merging it after CI is passing.

@Dandandan
Copy link
Contributor

Also +1 from me

@rdettai
Copy link
Contributor Author

rdettai commented Sep 21, 2021

Rebase on #1016 allowed to remove async from BallistaContext.sql(&str), only api changes are now:

  • implementers of TableProvider need to make their implementation of scan async
  • ExecutionContext.create_physical_plan(&LogicalPlan) is now async and thus needs to be .awaited

@alamb alamb merged commit 299ab7d into apache:master Sep 21, 2021
@alamb
Copy link
Contributor

alamb commented Sep 21, 2021

Thanks again @rdettai -- I think this is a great step forward towards supporting more object store / distributed planning systems with DataFusion ❤️ 🚀

@rdettai
Copy link
Contributor Author

rdettai commented Sep 21, 2021

Thank you all for your review time! The next step of the journey is #1010. It not the support of the object store yet, but that will also come soon ! 😉

@rdettai rdettai deleted the async-scan branch September 21, 2021 11:53
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
api change Changes the API exposed to users of the crate datafusion Changes in the datafusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Make the conversion from logical to physical plan async
5 participants