
Add ability to push down LIMIT for distributed queries #23027

Merged: 1 commit into ClickHouse:master, Jun 20, 2021

Conversation

@azat (Collaborator) commented Apr 13, 2021

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Add ability to push down LIMIT for distributed queries

Detailed description / Documentation draft:
This way the remote nodes will not need to send all rows, which decreases
network I/O, and it also makes queries with optimize_aggregation_in_order=1
and LIMIT X but without ORDER BY faster, since the initiator will not need
to read all rows, only the first X (but note that for this your data needs
to be sharded correctly, or you may get inaccurate results).
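
For illustration, a minimal sketch of the kind of query this helps, assuming the setting guarding the optimization is named distributed_push_down_limit (as exercised by the tests below) and a hypothetical Distributed table dist sharded by key:

```sql
-- With pushdown enabled, each shard applies LIMIT locally and sends at
-- most 10 rows to the initiator instead of its whole result set.
SET distributed_push_down_limit = 1;

SELECT key
FROM dist
GROUP BY key
LIMIT 10;

-- Safe only because the data is sharded by key: no key spans two shards,
-- so each shard's local GROUP BY ... LIMIT produces complete groups.
```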

Refs: #18465
Refs: #20587

@robot-clickhouse robot-clickhouse added the pr-improvement Pull request with some product improvements label Apr 13, 2021
@azat azat marked this pull request as draft April 13, 2021 08:24
@azat azat force-pushed the distributed-push-down-limit branch from 76f59c5 to ed789fc Compare April 13, 2021 19:29
@azat azat changed the title [WIP] Add ability to push down LIMIT for distributed queries [RFC] Add ability to push down LIMIT for distributed queries Apr 13, 2021
@azat (Collaborator, Author) commented Apr 14, 2021

Functional stateless tests flaky check (address) — fail: 2, passed: 198

The problem is that the query can already be cancelled by that point:

2021-04-13 23:22:38 --- /usr/share/clickhouse-test/queries/0_stateless/01814_distributed_push_down_limit.reference	2021-04-13 22:34:10.000000000 +0300
2021-04-13 23:22:38 +++ /tmp/clickhouse-test/0_stateless/01814_distributed_push_down_limit.275.stdout	2021-04-13 23:22:38.824213239 +0300
2021-04-13 23:22:38 @@ -1,5 +1,5 @@
2021-04-13 23:22:38  distributed_push_down_limit=1
2021-04-13 23:22:38 -40
2021-04-13 23:22:38 +20
2021.04.13 23:22:34.160920 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Debug> executeQuery: (from [::ffff:127.0.0.1]:42778, initial_query_id: 66cf643c-b1b4-4f7e-942a-c4c3493029f6, using production parser) (comment: /usr/share/clickhouse-test/queries/0_stateless/01814_distributed_push_down_limit.sql) WITH CAST('test_31uut9', 'String') AS id_distributed_push_down_limit_1 SELECT key FROM test_31uut9.data_01814 GROUP BY key LIMIT 10
2021.04.13 23:22:34.214964 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Trace> ContextAccess (default): Access granted: SELECT(key) ON test_31uut9.data_01814
2021.04.13 23:22:34.216790 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Debug> test_31uut9.data_01814 (b484ad2e-0591-4faf-8110-1dcbd7cdd0db) (SelectExecutor): Key condition: unknown
2021.04.13 23:22:34.227245 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Debug> test_31uut9.data_01814 (b484ad2e-0591-4faf-8110-1dcbd7cdd0db) (SelectExecutor): Selected 1/1 parts by partition key, 1 parts by primary key, 10/11 marks by primary key, 10 marks to read from 1 ranges
2021.04.13 23:22:34.228452 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Trace> MergeTreeSelectProcessor: Reading 3 ranges from part all_1_1_0, approx. 100 rows starting from 0
2021.04.13 23:22:34.229104 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Trace> InterpreterSelectQuery: FetchColumns -> WithMergeableStateAfterAggregationAndLimit
2021.04.13 23:22:34.339085 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Information> TCPHandler: Query was cancelled.
2021.04.13 23:22:34.416573 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Information> executeQuery: Read 20 rows, 80.00 B in 0.254374666 sec., 78 rows/sec., 314.50 B/sec.
2021.04.13 23:22:34.419006 [ 291 ] {7ac5de70-c26c-4e3b-bdee-3873ad1b84f1} <Debug> MemoryTracker: Peak memory usage (for query): 0.00 B.

@azat azat marked this pull request as ready for review April 22, 2021 05:52
@azat azat force-pushed the distributed-push-down-limit branch from ed789fc to 702d17c Compare April 24, 2021 21:36
@azat (Collaborator, Author) commented Apr 29, 2021

@alexey-milovidov maybe you can take a look? (or someone else?)

@azat (Collaborator, Author) commented May 30, 2021

@alexey-milovidov can you please take a look?

@azat azat force-pushed the distributed-push-down-limit branch from 702d17c to e3c6822 Compare May 30, 2021 07:23
@alexey-milovidov alexey-milovidov changed the title [RFC] Add ability to push down LIMIT for distributed queries Add ability to push down LIMIT for distributed queries May 30, 2021
@CurtizJ CurtizJ self-assigned this May 30, 2021
@alexey-milovidov (Member) commented:

@azat This looks like a very intrusive change for a rare use case that probably not many users will need.
Could you please provide more motivation for these changes?

@alexey-milovidov (Member) commented May 30, 2021

I see the motivation here: #20587
But adding a new QueryProcessingStage for this marginal case looks frightening.

@azat (Collaborator, Author) commented May 31, 2021

> Could you please provide more motivation for these changes?

This way the remote nodes may avoid sending too many rows to the initiator.

> I see the motivation here: #20587

Right.

> But adding a new QueryProcessingStage for this marginal case looks frightening

I understand your concerns: having lots of processing stages increases the complexity of the interpreter (it is already not that clean and simple right now).
Still, using a separate QueryProcessingStage looks pretty natural.
Another option is to always use WithMergeableStateAfterAggregation, but then it would be impossible to disable only this optimization, e.g. if some issue turns up with it.

Maybe you have some other thoughts on how to do this?
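
As a sketch of what disabling only this optimization would look like in practice, assuming the per-query setting distributed_push_down_limit from the tests and the hypothetical dist table above:

```sql
-- If pushdown turns out to be buggy, or the sharding does not guarantee
-- complete groups per shard, it can be switched off for just this query
-- while the rest of the distributed pipeline stays unchanged.
SELECT key FROM dist GROUP BY key LIMIT 10
SETTINGS distributed_push_down_limit = 0;
```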

@azat azat force-pushed the distributed-push-down-limit branch 7 times, most recently from 19feae8 to a18a063 Compare June 5, 2021 11:19
This way the remote nodes will not need to send all rows, which decreases
network I/O, and it also makes queries with optimize_aggregation_in_order=1
and LIMIT X but without ORDER BY faster, since the initiator will not need
to read all rows, only the first X (but note that for this your data needs
to be sharded correctly, or you may get inaccurate results).

Note that having lots of processing stages increases the complexity of
the interpreter (it is already not that clean and simple right now).

Still, using a separate QueryProcessingStage looks pretty natural.

Another option is to always use WithMergeableStateAfterAggregation, but
then it would be impossible to disable only this optimization, e.g. if
some issue turns up with it.

v2: fix OFFSET
v3: convert 01814_distributed_push_down_limit test to .sh and add retries
v4: add test with OFFSET
v5: add new query stage into the bash completion
v6/tests: use LIMIT O,L syntax over LIMIT L OFFSET O since the latter is broken in the ANTLR parser (the two spellings are equivalent; see the sketch after this list)
          https://clickhouse-test-reports.s3.yandex.net/23027/a18a06399b7aeacba7c50b5d1e981ada5df19745/functional_stateless_tests_(antlr_debug).html#fail1
v7/tests: set use_hedged_requests to 0, to avoid excessive log entries on retries
          https://clickhouse-test-reports.s3.yandex.net/23027/a18a06399b7aeacba7c50b5d1e981ada5df19745/functional_stateless_tests_flaky_check_(address).html#fail1
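
For reference, the two LIMIT spellings mentioned in v6 are equivalent; the tests simply switched to the comma form because the ANTLR parser mishandled OFFSET (again using the hypothetical dist table):

```sql
-- Both skip the first 1 row and return the next 10 (O = 1, L = 10):
SELECT key FROM dist ORDER BY key LIMIT 1, 10;        -- LIMIT O,L (used in the tests)
SELECT key FROM dist ORDER BY key LIMIT 10 OFFSET 1;  -- LIMIT L OFFSET O
```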
@azat azat force-pushed the distributed-push-down-limit branch from a18a063 to 18e8f0e Compare June 8, 2021 23:30
@azat azat requested a review from CurtizJ June 9, 2021 07:18
@CurtizJ (Member) left a comment

We decided that introducing a new QueryProcessingStage is OK for this task, since it's the only way to pass information about the processing stage between the initiator and the shards.

@CurtizJ CurtizJ merged commit d8b6f15 into ClickHouse:master Jun 20, 2021
@azat azat deleted the distributed-push-down-limit branch June 20, 2021 20:29
@azat (Collaborator, Author) commented Jun 29, 2021

ClickHouse Fast Test for PR #23027

01532_execute_merges_on_single_replica

2021-06-05 09:25:23 [6f221791a1e7] 2021.06.05 09:25:23.362767 [ 19894 ] {865aadc7-f4a5-4890-839d-b5f8d0096549} <Error> executeQuery: Code: 242, e.displayText() = DB::Exception: Table is in readonly mode (zookeeper path: /clickhouse/tables/test_01532/execute_on_single_replica) (version 21.7.1.1) (from [::1]:50834) (in query: ALTER TABLE execute_on_single_replica_r2 MODIFY SETTING execute_merges_on_single_replica_time_threshold=0;), Stack trace (when copying this message, always include the lines below):

The ZooKeeper session was lost and the replica wasn't marked as readonly yet (although it had already become the leader, so the race window is pretty small here).
