Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-implement DataFrame.write_* to use LogicalPlan::Write #5076

Closed
andygrove opened this issue Jan 26, 2023 · 10 comments · Fixed by #7283
Closed

Re-implement DataFrame.write_* to use LogicalPlan::Write #5076

andygrove opened this issue Jan 26, 2023 · 10 comments · Fixed by #7283
Labels
enhancement New feature or request

Comments

@andygrove
Copy link
Member

andygrove commented Jan 26, 2023

Is your feature request related to a problem or challenge? Please describe what you are trying to do.
We have DataFrame methods such as write_parquet that create a physical plan and execute it, then write the results to disk. This was implemented this way because, at the time, there was no "write" concept in the logical plan.

We now have LogicalPlan:Dml for Insert/Update/Delete, but I think we need a more generic LogicalPlan::Write operation, as well as a physical operator to perform the write. This is a common pattern for ETL tasks that execute a query, perform a transformation, and then write the results to disk.

Describe the solution you'd like
As described.

Describe alternatives you've considered

Additional context

@andygrove andygrove added the enhancement New feature or request label Jan 26, 2023
@andygrove andygrove changed the title Re-implement DataFrame.write_* to use LogicalPlan::Dml Re-implement DataFrame.write_* to use LogicalPlan::Write Jan 26, 2023
@metesynnada
Copy link
Contributor

We can also support Statement::Copy in SQL with this LogicalPlan::Write. So it may handle

Statement::Copy {
                table_name,
                columns,
                to,
                target,
                options,
                legacy_options,
                values
            }

properties, i.e support on CopyTarget::File with CopyOption. Perhaps We can handle the same options in DataFrame.write_*,

@alamb
Copy link
Contributor

alamb commented Apr 24, 2023

I think we should stay true to the design goal of DataFusion and keep this functionality as modular as possible (aka implemented in terms of traits that can be extended by other systems).

Here are some ideas:

Idea: Add a physical plan for LogicalPlan::DML

(I think this is what @andygrove is suggesting).

This would add a way to create a physical plan for LogicalPlan::Dml(.. op: Insert) and have that implementation call the appropriate (TBD) methods on TableProvider that would handle writing. This is similar to what @metesynnada proposes in #6049 though it is not mem table specific.

The upside here is we already have all the flow and planner and it would follow the pattern of system like spark (e.g. DataWritingCommandExec -- thanks to @metesynnada for the link)

The downsides are that such an ExecutionPlan is kind of strange (it makes no output, so therefore most of the methods like "output ordering" are basically useless) as I mentioned on #6049 (review)

Idea: Add specific runner / executor for Inserts / Update / Deletes

Maybe we could provide a function or struct run_insert(source: Arc<dyn ExecutionPlan>, target: Arc<dyn TableProvider>) that would orchestrate:

  1. Running the execution plan
  2. calling appropriate (TBD) methods on TableProvider that would handle writing

Here is how you might run it:

let runner = Insert::new(context)
  .target(my_table)
  .run(target)?

A benefit here is that only systems that wanted to handle DML would invoke the inserter.

A downside is that it would require more code / connections to work

Maybe @avantgardnerio has some thoughts in this area, as I think he has a system that does DML as well based on DataFusion

@tustvold
Copy link
Contributor

tustvold commented Apr 24, 2023

The downsides are that such an ExecutionPlan is kind of strange

In addition to the points r.e. sort order, etc... from a scheduling / partitioning standpoint returning ExecutionPlan is perhaps not ideal.

Add specific runner / executor for Inserts / Update / Deletes

I'm likely missing something but does this even need to be a separate abstraction, is this not an implementation detail of TableProvider::insert_into? This would also allow it to make table-specific decisions w.r.t preserving partitioning, transaction handling, etc... Basically #6049 but returning Result<()>?

The only thing I can possibly think of is explain output for TableProvider, but this already requires special-case handling anyway so I'm not sure this is necessarily an issue...

@alamb
Copy link
Contributor

alamb commented Apr 24, 2023

I think something needs to call execute() on the ExecutionPlan and then push the results into the TableProvider (or whatever other sink of data)

It is a very reasonable question about how much code that requires and if it should be done in DataFuson or outside

@tustvold
Copy link
Contributor

tustvold commented Apr 24, 2023

how much code

Tbh I'm more concerned that we provide sufficient API extensibility for different TableProvider to be able to support a variety of use-cases, without hard-coding assumptions about partitioning, sort-order, transaction isolation, etc... After all we can always extract common functionality into helper functions without it being a breaking API change

Edit: #6109 extracts some common logic to this end

@ozankabak
Copy link
Contributor

ozankabak commented Apr 24, 2023

I find Idea 1 reasonable. If we think of ExecutionPlan's as nodes in the computation graph, writer nodes are simply terminal/sink nodes -- but they are still nodes. Therefore some methods like output_ordering not being consumed by anyone doesn't seem unnatural to me, this would be analogous to sink nodes not having any outgoing edges. All in all, I think following Spark's approach is reasonable in this case.

I am curious to hear what @andygrove thinks about @alamb's points, though.

@thinkharderdev
Copy link
Contributor

The downsides are that such an ExecutionPlan is kind of strange (it makes no output, so therefore most of the methods like "output ordering" are basically useless)

I think it's useful to have write operations be able to produce outputs. In many use cases you actually want metadata about what was written (file locations, total bytes, etc). This seems like a natural way to, for example, update a catalog with the new table.

@avantgardnerio
Copy link
Contributor

I don't have a strong opinion here, since I haven't had much time to look in depth and we fork execution at the LogicalPlan level before it ever gets to the physical plan anyway.

I do see how adding support for DML in TableProvider and PhysicalExec would be the next logical step. I would like to weigh in and say if we do that:

  1. Don't forget there are folks like me that do have the ability to mutate data. I have no idea how this would work in the broader arrow-rs sense though (happy to talk about how we do it)
  2. If it is for write-once type things like CTAS we probably want some TableProviderFactory support for it rather than TableProvider

@andygrove
Copy link
Member Author

I'm possibly too influenced by Spark's approach, but option 1 seems to work well. For example, Ballista has a ShuffleWriterExec, which is an execution plan that executes its child query and repartitions the output and writes it to disk. ShuffleWriterExec then produces its own output, which is metadata about the data that was written out.

https://github.com/apache/arrow-ballista/blob/main/ballista/core/src/execution_plans/shuffle_writer.rs#L329

That said, I have stronger opinions about including the logical plan representation than I do about how this is implemented in the physical plan.

@ozankabak
Copy link
Contributor

@andygrove, can you take a look at #6049? It implements an ExecutionPlan to write to MemTable. If the pattern there looks OK to you as a first step, we have some follow-ups in the same vein for writing to ListingTables etc.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

7 participants