[HUDI-3907][RFC-52] RFC for Introduce Secondary Index to Improve Hudi Query Performance #5370
base: master
Conversation
rfc/README.md
@@ -86,3 +86,4 @@ The list of all RFCs can be found here.
| 48 | [LogCompaction for MOR tables](./rfc-48/rfc-48.md) | `UNDER REVIEW` |
| 49 | [Support sync with DataHub](./rfc-49/rfc-49.md) | `ONGOING` |
| 50 | [Improve Timeline Server](./rfc-50/rfc-50.md) | `UNDER REVIEW` |
| 52 | [Support Lucene Based Record Level Index](./rfc-52/rfc-52.md) | `UNDER REVIEW` |
Could you create an individual PR to pick the RFC number, to follow the Hudi RFC process?
The PR claiming RFC 52 has been merged; I will rebase onto master later. Thanks.
## <a id='architecture'>Architecture</a>
The main structure of the secondary index contains four layers:
1. SQL Parser layer: SQL commands for users to create/drop/alter/show/..., for managing secondary indexes
2. Optimizer layer: picks the best physical/logical plan for a query using RBO/CBO/HBO, etc.
Is this also part of the work of this RFC?
Yes. We plan to implement the main parts of them in the first stage.
## <a id='architecture'>Architecture</a>
The main structure of the secondary index contains four layers:
1. SQL Parser layer: SQL commands for users to create/drop/alter/show/..., for managing secondary indexes
Would we only use Spark SQL to manage the secondary index here?
> Here we would only use Spark SQL to manage secondary index?

We can support Spark SQL first, and then expand to Flink, etc.
Best to call this out in the scope of the RFC
``getRowIdSet`` and so on
4. IndexManager Factory layer: many kinds of secondary index implementations for users to choose from,
such as HBase-based, Lucene-based, B+ tree-based, etc.
5. Index Implementation layer: provides the ability to read, write and manage the underlying index
Can layers 4 and 5 be merged?
They could be merged; we divided them into two layers for clearer presentation.
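The separation discussed above, a factory layer that picks an index implementation and an implementation layer that does the actual index I/O, could be sketched roughly as below. All class and method names here are illustrative assumptions for this RFC discussion, not existing Hudi APIs, and the signatures are deliberately simplified:

```java
import java.util.Set;

// Layer 5: each concrete index implements a common lookup interface.
interface SecondaryIndex {
    // Simplified: return matching row ids for a single column predicate.
    Set<Integer> getRowIdSet(String column, String predicateValue);
    String kind();
}

class LuceneBasedIndex implements SecondaryIndex {
    public Set<Integer> getRowIdSet(String column, String value) { return Set.of(); }
    public String kind() { return "LUCENE"; }
}

class BPlusTreeIndex implements SecondaryIndex {
    public Set<Integer> getRowIdSet(String column, String value) { return Set.of(); }
    public String kind() { return "BPLUS_TREE"; }
}

// Layer 4: the factory hides which implementation backs a given index type.
final class IndexManagerFactory {
    static SecondaryIndex create(String indexType) {
        switch (indexType) {
            case "LUCENE":     return new LuceneBasedIndex();
            case "BPLUS_TREE": return new BPlusTreeIndex();
            default: throw new IllegalArgumentException("Unsupported index type: " + indexType);
        }
    }
}
```

Keeping the factory separate from the implementations means callers depend only on the `SecondaryIndex` interface, which is why the two layers read naturally as one from the caller's point of view.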
is mainly implemented for ``tagLocation`` in the write path.
The secondary index structure will be used for query acceleration in the read path, but not in the write path.

If a Record Level Index is applied in the read path for a query with a RecordKey predicate, it can only filter at the file group level,
So it is totally different from the Record Level Index, e.g. in data layout and data structure?
There will be many differences between the Record Level Index and the Secondary Index.
The Record Level Index is mainly statistical info used to reduce file-level IO cost, similar to column statistics info in the read path.
The Secondary Index is a precise index that returns the exact rows for a query; it can also be used for ``tagLocation``
in the write path.
It is more appropriate to use different indexes for different scenarios.
For the convenience of implementation, we can implement the first phase based on RBO (rule-based optimization),
and then gradually expand and improve CBO and HBO based on the collected statistical information.

We can define RBO rules in several ways; for example, SQL with more than 10 predicates does not push down
Is 10 here an empirical number?
Yes, but it is just an example here.
We will run more experiments to determine more accurate values.
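The predicate-count rule discussed above could be sketched as a trivial rule-based check. The class name and the threshold constant are illustrative only; 10 is simply the example value from this thread, not a tuned number:

```java
import java.util.List;

// Hypothetical RBO rule: skip index pushdown when a query carries more
// predicates than a threshold, on the assumption that many index lookups
// may cost more than scanning the base file directly.
final class IndexPushdownRule {
    private static final int MAX_PREDICATES_FOR_PUSHDOWN = 10; // example value, to be tuned

    static boolean shouldUseSecondaryIndex(List<String> predicates, boolean indexAvailable) {
        return indexAvailable && predicates.size() <= MAX_PREDICATES_FOR_PUSHDOWN;
    }
}
```

A CBO would replace this fixed threshold with a cost estimate, which is exactly the progression the RFC proposes.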
boolean createIndex(HoodieTable table, List<Column> columns, List<IndexType> indexTypes)

// Build index for the specified table
boolean buildIndex(HoodieTable table, InstantTime instant)
Can the index be built incrementally here?
It depends on the low-level index implementation.
Currently, Lucene supports building indexes incrementally.
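One way incremental building could work, assuming the index tracks the last instant it has covered, is to process only commits newer than that instant on each build. This is a sketch under that assumption; the types and string-ordered instant times are simplifications, not actual Hudi APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of incremental index building: only instants newer
// than the last indexed instant are processed on the next build.
final class IncrementalIndexBuilder {
    // Hudi instant times are timestamp strings, so lexicographic order works here.
    private String lastIndexedInstant = "00000000000000";

    // Returns the committed instants that still need indexing, in timeline order.
    List<String> pendingInstants(List<String> committedInstants) {
        List<String> pending = new ArrayList<>();
        for (String instant : committedInstants) {
            if (instant.compareTo(lastIndexedInstant) > 0) {
                pending.add(instant);
            }
        }
        return pending;
    }

    void markIndexed(String instant) {
        if (instant.compareTo(lastIndexedInstant) > 0) {
            lastIndexedInstant = instant;
        }
    }
}
```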
**trigger time**

When a column's secondary index is enabled, we need to build the index for it automatically. Index building may
consume a lot of CPU and IO resources, so building the index while compaction/clustering executes is a good solution,
If we write to a Hudi table without updating the index, can query correctness still be guaranteed?
The main query logic remains the same as now.
The difference is in the base file: before, we might read out 10 rows (filter them later, and finally get 3 rows), but now we read out exactly the 3 rows needed.
We prefer plan A right now; the main purpose of this proposal is to save base file IO cost, based on the
assumption that the base file has lots of records.

One index file will be generated for each base file, containing one or more columns of index data.
If there are a large number of base files, the number of index files will also be very large, and the cost of reading/opening index files will be high.
This is a good suggestion; using an index itself has overhead.
So we need to consider carefully whether we need to use the index at all, and whether we need to use all of them.
@huberylee are you still actively driving this one?
Yes, it's already under development.
Great start. Overall I also think we need to think about the abstraction API more carefully here.
If a Record Level Index is applied in the read path for a query with a RecordKey predicate, it can only filter at the file group level,
while a secondary index could provide the exact matched set of rows.

For more details about the current implementation of the record level index, please refer to
I am not sure why the index is dependent on the read or write path.
The record level index is a primary index, and what you propose here is a secondary index.
We should use the primary index in the read path when the filter is a pointed query on a uuid:
``select c1, c2 from t where _row_key = 'uuid1'``
We should use the secondary index in the write path when the filter is on an indexed column:
``update t set c1 = 5 where c2 = 20``
"Read or write path" here means that the record level index and the secondary index are used in different scenarios. The record level index is mainly used with ``tagLocation``
when writing data into a Hudi table, while the secondary index filters data when querying with predicates, including normal queries and update/delete with predicates.
In Hudi, the primary index is a logical constraint to ensure the uniqueness of records, and no PK index data exists, so it is not suitable for pointed queries. Besides, when the query condition is not on a primary key column, the secondary index can also be used.
Parsing all kinds of index-related SQL (Spark/Flink, etc.), including create/drop/alter index, optimize table, etc.

### <a id='impl-optimizer-layer'>Optimizer Layer</a>
For the convenience of implementation, we can implement the first phase based on RBO (rule-based optimization),
We can introduce a separate section in the RFC to talk about how the implementation is going to be phased. But here I would want us to think generically about how this would plug into the optimization layer of Spark/Flink.
My thoughts around this:
RBO for an index can be very misleading for query performance, especially when combined with column-level stats IO skipping. We can do a simple hint-based approach to always use a specific index when available, else implement it using CBO.
The way I am thinking for a cascades-based optimizer is to generate a memo of equivalent query fragments (with direct scans, using all the indexes possible) with costs and run it by the cost-based optimizer to pick the right plan.
What do you think?
> We can introduce a separate section in the RFC to talk about the implementation is going to be in phases. But here I would want us to think generically how this would plugin into the Optimization layer of Spark/Flink.
> My thoughts around this. RBO for index can be very misleading for query performance - especially when combined with column level stats IO skipping. We can do a simple hint based approach to always use specific index when available else implement it using CBO.
> The way I am thinking for cascades based optimizer is to generate a memo of equivalent query fragments (with direct scans, using all the indexes possible) with cost and run it by the cost based optimizer to pick the right plan.
> What do you think?
OK, we will introduce a separate section to talk about the implementation phases.
Column-level statistics are currently separate from the implementation of secondary indexes, and both of their implementations intrude on Spark; maybe we should unify the entrance of the Hudi index.
Introducing a hint to control the use of indexes may work if the query pattern is known in advance, but it is desirable to be able to automatically optimize the query plan based on the index.
CBO is a good direction to explore, and we need to do a lot more before we get there, including providing NDV values, a cost model, and so on.
```
// Get row id set for the specified table with predicates
Set<RowId> getRowIdSet(HoodieTable table, Map<Column, List<PredicateList>> columnToPredicates ..)
```
All usages of an index are scan pruning. We should define a standard API for scan pruning that is generic enough that all forms of pruning (min/max, index) can work. I am thinking more like:
``HoodieTableScan pruneScan(HoodieTable table, HoodieTableScan scan, List<Predicate> columnPredicates)``
> All usages of index are scan pruning. We should define a standard API for scan pruning that is generic enough for all forms of pruning (min/max, index) can work. I am thinking more like
> ``HoodieTableScan pruneScan(HoodieTable table, HoodieTableScan scan, List<Predicate> columnPredicates``

Great idea! Maybe we should provide a specific format called HoodieFormat with a corresponding Reader/Writer to hide the read, write and filter logic of the underlying format.
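The generic `pruneScan` idea above lends itself to composing several pruning strategies (column stats, secondary index, ...) behind one interface. The following is a sketch of that composition with simplified, hypothetical types; it is not the API being proposed, only an illustration of how the pieces could chain:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical: a scan is modeled simply as the set of file IDs to read.
final class HoodieTableScan {
    final Set<String> fileIds;
    HoodieTableScan(Set<String> fileIds) { this.fileIds = fileIds; }
}

// Every pruning strategy narrows a scan given the column predicates.
interface ScanPruner {
    HoodieTableScan pruneScan(HoodieTableScan scan, List<String> columnPredicates);
}

// Chains pruners so min/max stats, secondary index, etc. apply in sequence.
final class PrunerChain implements ScanPruner {
    private final List<ScanPruner> pruners;
    PrunerChain(List<ScanPruner> pruners) { this.pruners = pruners; }

    public HoodieTableScan pruneScan(HoodieTableScan scan, List<String> predicates) {
        for (ScanPruner p : pruners) {
            scan = p.pruneScan(scan, predicates);
        }
        return scan;
    }
}
```

Because each pruner only sees and returns a scan, a min/max pruner and a secondary-index pruner can be stacked without either knowing about the other, which is the genericity the comment asks for.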
Considering that there would otherwise be too many index files, we prefer to store multi-column index data in one file instead of
one index file per column.
Should we consider storing secondary index in metadata table, to improve the index reading? Then you can also leverage the Async Indexer to build an index asynchronously, without implementing the index building again.
> Should we consider storing secondary index in metadata table, to improve the index reading? Then you can also leverage the Async Indexer to build an index asynchronously, without implementing the index building again.

Some types of secondary indexes have their own file formats, and there may be random reads during their use, so storing the secondary index in the metadata table is not suitable.
Building the secondary index in compaction/the async indexer is under consideration.
The current implementation provides an abstract framework upon which we can easily extend other types of secondary indexes. This document is a little out of date; I will update it later.
What is the state of this issue? Are there plans to merge the RFC to mark it under active development, or are significant changes expected?
Most of the development work is done; some PRs have been merged and others are under review. Subtasks:

Hi @huberylee, there's "Explore other execution engines/runtimes (Ray, native Rust, Python)" on the Hudi roadmap. It seems like the secondary index will be using Lucene (at least one implementation), so when using a non-JVM client it will not be possible to maintain this index? I would like to understand how it will look in this case; I guess the simplest option would be not to use this type of index?
Can I search from Hudi using keywords, just like Elasticsearch, because of the Lucene secondary index?
What is the purpose of the pull request
RFC for Introduce Secondary Index to Improve HUDI Query Performance
Brief change log
Verify this pull request
(Please pick either of the following options)
This pull request is a trivial rework / code cleanup without any test coverage.
(or)
This pull request is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Committer checklist
Has a corresponding JIRA in PR title & commit
Commit message is descriptive of the change
CI is green
Necessary doc changes done or have another open PR
For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.