Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARROW-17762: [C++] WIP: Add ordering information to Acero #14158

Closed

Conversation

westonpace
Copy link
Member

No description provided.

@github-actions
Copy link

@github-actions
Copy link

⚠️ Ticket has not been started in JIRA, please click 'Start Progress'.

@westonpace westonpace force-pushed the feature/ARROW-17762--add-ordering branch from caf4a9f to 467f26f Compare September 19, 2022 12:03
@westonpace
Copy link
Member Author

@rtpsw FYI, I think you were interested in my plans for ordered execution. This PR is based on my earlier proposal I sent to the ML. I plan to create an example fetch node that consumes the ordering information to do something useful today.

This PR is built on top of ARROW-17287 and so it is a little easier to look at just the diff between the two:

westonpace/arrow@feature/ARROW-17287--initial-exec-plan-scan-node...westonpace:arrow:feature/ARROW-17762--add-ordering

@rtpsw
Copy link
Contributor

rtpsw commented Sep 20, 2022

Thanks, @westonpace. I'm interested though will need a couple of days to get to this.

@westonpace
Copy link
Member Author

@rtpsw I've added an example of a FetchNode that consumes the ordering information. This will hopefully give some idea on how to use the exec batch index.

Thanks, @westonpace. I'm interested though will need a couple of days to get to this.

There is no rush. I probably won't get back to this myself for a while as I need to get #13782 (and numerous follow-ups) merged in.

@icexelloss
Copy link
Contributor

I care about this work very much as well and hope can understand this better. If I remember correctly the high level idea is that there are nodes that requires ordering (e.g., asof join) and if the input batches are out of order (indicated by batch index), the consumer node will cache/reorder out of order batches before processing them?

@westonpace
Copy link
Member Author

I care about this work very much as well and hope can understand this better. If I remember correctly the high level idea is that there are nodes that requires ordering (e.g., asof join) and if the input batches are out of order (indicated by batch index), the consumer node will cache/reorder out of order batches before processing them?

Yes. If a node relies on ordering then it will resequence the batches before processing them. I try and take care to use both "reorder" and "resequence" independently as there are two rather different problems.

The first problem is when the input has no known ordering or is in a completely random order. In that case we must "reorder" which is "not streaming" and a "pipeline breaker" and requires us to cache all data in memory (or spill) in order to assign the order.

The second problem is when the input is mostly ordered but might be a bit noisy due to something like a parallel scan. In that case we already have a sequence number and we assume the sequence number is, generally, within some max delta from the correct ordering. In that case we only need to resequence (not reorder). This operation is "mostly streaming" and only sometimes a "pipeline breaker".

@zifengyu
Copy link

This feature is exactly what we need to adapt Acero. I tried to add ExecBatch ordering and implemented the limit operator in our product. Here is what we saw in the tests.

  1. It seems a little difficult to finish the node (and notify downstream node) as the input / output batch counts are not the same. In our case, the finish may happen either when having the limit number of rows or upstream node is finished producing (but not generated limit rows). The former occurs in Queue's deliver task while latter occurs in FetchNode's InputFinished. We did not find an easy way to sync these two components so we moved the queue part inside node and added a counter to track sent rows.

  2. We also need the offset setting to skip the first a few rows in the limit operator. Can this be included in FetchNode so we may switch back to Acero node in future?

Anyway, this proposal is critical to our using Acero. We are looking forward to its release.

@jorisvandenbossche
Copy link
Member

This is being closed in favor of other PRs as listed in the issue #32991

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants