Spec: Add column equality delete files #360
Comments
Hi @rdblue @aokolnychyi, may I know whether we are working on implementing the updates/deletes/upserts feature? Is this the design doc we are following? |
@chenjunjiedada, that's an early doc that outlines options, but I wouldn't say it captures the current design. The best design doc we have right now is the one @erikwright wrote: https://docs.google.com/document/d/1FMKh_SQ6xSUUmoCA8LerTkzIxDUN5JbStQp5Hzot4eo/edit# |
Let me link this issue to the … Say we have row-level deletes such as:
1. delete from table where a = 100 and b = 101;
2. delete from table where b = 101 and c = 102 and d = 103;
3. delete from table where d = 104;
For the delete differential file, we will have two columns: 1. field_ids; 2. values, so each row-delete operation can be encoded as a (field_ids, values) pair. For the field_ids we can optimize by using a bitmap: the field_ids column would be a bitmap and the values column would be a list<binary>.
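As a rough illustration of this proposal (not part of any spec), the three deletes above could be encoded like the following sketch, assuming hypothetical field IDs a=1, b=2, c=3, d=4 and showing the bitmap as a plain integer mask:

```python
# Illustrative sketch of the proposed (field_ids, values) delete encoding.
# Hypothetical field IDs: a=1, b=2, c=3, d=4.
FIELD_IDS = {"a": 1, "b": 2, "c": 3, "d": 4}

def encode_delete(predicate):
    """Encode an equality delete as (bitmap of field ids, values ordered by field id)."""
    bitmap = 0
    values = []
    for column, value in sorted(predicate.items(), key=lambda kv: FIELD_IDS[kv[0]]):
        bitmap |= 1 << FIELD_IDS[column]  # set the bit for this field id
        values.append(value)
    return bitmap, values

print(encode_delete({"a": 100, "b": 101}))            # (6, [100, 101])      bits for a, b
print(encode_delete({"b": 101, "c": 102, "d": 103}))  # (28, [101, 102, 103]) bits for b, c, d
print(encode_delete({"d": 104}))                      # (16, [104])           bit for d
```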
FYI @rdblue |
@openinx, what is the motivating use case for a format like that one? The use cases that we've been considering for equality deletes mostly have a stable column set that will be used for deletes. For example, a CDC use case typically uses the primary key of the table that is changing. In that case, it's easier if the schema of the delete file is a data file with just the primary key columns. So if I had a …

What are the use cases for a format that allows a dynamic column set for every row? I would want to make sure that it is worth the extra complexity. |
Take the GDPR use case as an example: a user record has many properties, and there are a few deletions that need to match on several combinations of those properties, such as …
Thinking about the design again, assume the CDC case where we have a table with two columns, (id, a), and …
As you said, we will produce two different files for this transaction: one is a data file and the other is a delete differential file. So the data file will have:
and the deletion file will only have the primary key column (as you said, if I understand correctly):
When we do the merge on read, I guess both … So I'm thinking that for equality deletes, we will need to keep all columns in the delete differential files so that we can get rid of this issue (say, INSERT and DELETE of the same record a few times in a single Iceberg transaction). |
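To make this concern concrete, here is a minimal sketch (made-up values, plain Python rather than Iceberg code) of why a primary-key-only equality delete is ambiguous when the same key is inserted, deleted, and re-inserted inside one transaction:

```python
# Hypothetical rows written in a single transaction:
#   INSERT (1, 'a'), DELETE (1, 'a'), INSERT (1, 'b')
data_file = [(1, "a"), (1, "b")]   # both inserts land in the data file
delete_file = [1]                  # equality delete keyed only on id

# Naive merge-on-read: drop every data row whose id appears in the delete file.
deleted_ids = set(delete_file)
result = [row for row in data_file if row[0] not in deleted_ids]

print(result)  # [] -- but the surviving row should be (1, 'b')
```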
@openinx, sorry I haven't had a chance to reply before now. We missed you at the last sync-up, where we talked about this. I added this to the notes, but I wanted to follow up here as well. Sounds like there are 2 main reasons to use a dynamic schema.

The first is that you might want to delete with different filters, like dropping a user by first/last name, or dropping by user id. I think it's reasonable to say there may be more than one way to delete data, but I don't think that there is much value in encoding those deletes in the same file. There is going to be a small number of column sets used to identify and delete records, and I think it's worth just using a different delete file for each set of columns. Then we wouldn't need the complexity of encoding a map with various value types, and we can take advantage of columnar formats.

The second argument for a dynamic schema -- the CDC case with rapid upserts for the same row -- is interesting, and something I've been thinking about how to solve as well. Let me present the approach I've been thinking about. Continuing with your example, I would actually encode the delete using a positional delete rather than an equality delete. That removes the concern about deleting the row with ID 1 twice, because the delete would be for … The position-based approach has a few benefits as well:
The second upsert would be encoded as … I think that using a positional delete is a good solution to this problem. That would also mean that we won't need to apply equality deletes to any data files with the same sequence number, which is a small win. Only positional delete files would need to be applied to data files with the same sequence number. What do you think? |
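A rough sketch of that idea, assuming a writer that remembers the position of each primary key it has written to the currently open data file; all names here are illustrative, not Iceberg APIs:

```python
class OpenFileWriter:
    """Illustrative writer that turns same-file deletes into position deletes."""

    def __init__(self):
        self.rows = []              # rows written to the open data file
        self.positions = {}         # primary key -> position in the open file
        self.position_deletes = []  # positions deleted in the open file
        self.equality_deletes = []  # keys deleting rows from earlier files

    def insert(self, key, value):
        self.positions[key] = len(self.rows)
        self.rows.append((key, value))

    def delete(self, key):
        if key in self.positions:
            # The row lives in the open file: delete it by position.
            self.position_deletes.append(self.positions.pop(key))
        else:
            # The row may exist in an earlier file: delete it by equality.
            self.equality_deletes.append(key)

    def upsert(self, key, value):
        self.delete(key)
        self.insert(key, value)

w = OpenFileWriter()
w.upsert(1, "a")   # insert (1, 'a') at position 0
w.upsert(1, "b")   # position delete of 0, then insert (1, 'b') at position 1
print(w.rows, w.position_deletes, w.equality_deletes)
# [(1, 'a'), (1, 'b')] [0] [1]
```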
Well, it seems I was thinking about this differently. As I understand it, your solution divides the deletes into two parts: 1. equality deletes; 2. positional deletes. The equality deletes will only be applied to data files with a sequence_number < the current delete file's, and the positional deletes will only be applied to data files with the same sequence number. There are several problems in my view:
I'm writing a document for the … |
Sounds like there is a slight difference in how we are framing the problem. I have been thinking of an upsert case, where we get an entire replacement row but not the previous row values. From your comments, it sounds like you are thinking of a case where we have both the previous row and the replacement row (possibly as separate delete and insert events). It's good to think through that case because it does change what we might do here.

Without the previous row, I think you would have to keep track of previously written values, so the position deletes are attractive because you have to keep less information -- a position and the id columns, rather than all columns. But with the previous row, you wouldn't need to keep any state. I agree that this makes it a good alternative if you have that information in your CDC stream.

There's still a major problem with using equality deletes that I pointed out: deleting with row values would potentially delete future rows when two copies of a row have the same values. Using a unique ID for each CDC event could mitigate the problem, but duplicate events from at-least-once processing would still incorrectly delete data rows. How would you avoid this problem?

To your point about replaying events, I think I agree that it would be good to keep track of the previous column values if they are present in a CDC delete event, but I'm not sure it would enable a source to replay exactly the events that were received -- after all, we're still separating inserts and deletes into separate files so we can't easily reconstruct an upsert event. Assuming that it is likely valuable to track the deleted data values, I don't think that it makes a case for dynamic columns:
Clarifications
Position deletes would apply to data files with a sequence number less than or equal to the delete's sequence number. If we want to use equality delete files for this, then we would similarly apply an equality delete file when a data file's sequence number is less than or equal to the delete's, rather than strictly less than. The optimization I was suggesting is that if we don't use equality deletes to encode deletes within the same commit, we can use a strict less-than comparison when applying equality deletes.
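In other words, a sketch of the rule being described here (sequence numbers only, ignoring partition scope):

```python
def applies(delete_kind, delete_seq, data_seq):
    """Whether a delete file applies to a data file, judged only by sequence number."""
    if delete_kind == "position":
        # Position deletes can target rows committed with the same sequence number.
        return data_seq <= delete_seq
    if delete_kind == "equality":
        # Equality deletes only target rows from strictly older sequence numbers.
        return data_seq < delete_seq
    raise ValueError(delete_kind)

print(applies("position", delete_seq=3, data_seq=3))  # True
print(applies("equality", delete_seq=3, data_seq=3))  # False
print(applies("equality", delete_seq=3, data_seq=2))  # True
```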
You could avoid spilling to disk by closing the data file when tracking becomes too expensive. If we assume that the ID column is a UUID and the position is a long, then each (id, pos) pair takes about 24 bytes. If we double that for JVM overhead, then we can track about 2.8 million rows in 128 MB of memory. While that's significant, it isn't unreasonable to cut off data files well before 2.8 million rows. Even if the checkpoint interval is as long as 5 minutes, the rate of rows to the file would need to be 9,300 rows per second to exceed that limit.
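For reference, the arithmetic behind those figures, with the 2x JVM overhead factor taken as an assumption from the comment above:

```python
pair_bytes = 16 + 8                 # UUID id (16 bytes) + long position (8 bytes) ~= 24 bytes
with_overhead = 2 * pair_bytes      # double it to allow for JVM object overhead
budget = 128 * 1024 * 1024          # 128 MB of memory

rows = budget // with_overhead
print(rows)               # 2796202 -> roughly 2.8 million (id, pos) pairs
print(rows // (5 * 60))   # 9320    -> roughly 9,300 rows/second over a 5-minute checkpoint
```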
This would happen anyway because data files will need to merge multiple delete files. Position deletes are more efficient because they are more targeted, so deployments will very likely have regular compaction from equality deletes to position deletes. That means at any given time, a file may have both delete formats to merge in. This is not that much complexity as long as we can filter the results of another filter. |
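A small sketch of what "filtering the results of another filter" could look like when a data file has both kinds of delete files attached (illustrative only, not Iceberg code):

```python
data = [(0, {"id": 18}), (1, {"id": 19}), (2, {"id": 20})]  # (position, row)

position_deletes = {1}            # positions deleted in this data file
equality_deletes = [{"id": 18}]   # delete rows matched by equality

def apply_position_deletes(rows):
    return (r for r in rows if r[0] not in position_deletes)

def apply_equality_deletes(rows):
    return (r for r in rows
            if not any(all(r[1][c] == d[c] for c in d) for d in equality_deletes))

# Chain the filters: position deletes first, then equality deletes.
print(list(apply_equality_deletes(apply_position_deletes(data))))
# [(2, {'id': 20})]
```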
The …
In my opinion, CDC events from an RDBMS should always provide both the old values and the new values (such as …).
For Spark streaming, it only guarantees the …
I think the mixed equality-delete/positional-delete approach you described seems hard to use for reconstructing the correct event order. The pure positional-delete approach described in this document could restore the original order.
Yes, I agree that we don't need to care about dynamic columns for now. Real CDC events always provide all of the column values for a row, so please ignore the proposal about dynamic column encoding. |
I think we're in agreement on a few points for moving forward:
The doc describes ways to use both equality and position deletes for CDC streams. Sounds like equality would be ideal if (1) events have a unique ID, and (2) the execution has exactly-once semantics. Otherwise, I think it is possible to use position deletes. Which do you plan to target? |
Thanks for your discussion and summary, and sorry for joining this discussion late. Here is my thinking, correct me if I am wrong:

Mode 1: Iceberg as a database. Users can use SQL to modify this Iceberg table: "insert into", "update ... where x=a", "delete from ... where x=a". For equality deletes to perform well, Iceberg should write only a constant number of physical records for each of these statements. What is the solution for mode 1?
Mode 2: Iceberg as a CDC receiver. Theoretically, every record from the CDC stream should affect just a single record in the database.
What is the solution for mode 2?
|
@rdblue, our team also had another discussion about the use cases and solutions. We have three use cases:
INSERT INTO t SET @1=1 @2='apple';
DELETE FROM t WHERE @1=1 @2='apple';
UPDATE t WHERE @1=1 @2='apple' SET @1=1 @2='pear';
UPSERT(1, 'apple')
DELETE(1, 'apple')
DELETE(1) // DELETE doesn't have to provide all the column values; the primary key is enough.
The three cases should match the CDC requirements from users, and we have three goals to meet those requirements: 1) fast ingestion; 2) acceptable batch read performance; 3) an equivalent stream so that we can keep eventual consistency between the source table and the sink table. Now, we have three solutions:
In summary, solution #2 and solution #3 resolve the before/after ordering for the downstream, but they both have some issues with batch read performance. Solution #2 will need to check whether there exists a delete row whose … Solution #3 will need to do … For solution #1, the problem is: how do we reconstruct the stream of events so that it can be replayed correctly downstream? Let's take an example; the CDC events:
It will generate three files:
NOTICE: here we add the … Now let's consider the replay issue again. We could regard the equality-delete file as deleting rows that existed before this transaction, so it should be replayed first. Then we could restore the correct event order by joining the INSERT file and the positional-delete file. In theory, we could maintain eventual consistency between the source table and the sink table because we guarantee:
So it seems solution #1 could be a candidate solution for the primary-key case #1 and case #2. |
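Here is a hedged sketch of the replay idea behind solution #1. The file contents are made up, since the original example events are not shown above; the point is only the ordering: equality deletes are replayed first, then the insert file is walked in position order and a delete is emitted right after any row whose position appears in the positional-delete file. The emitted stream is not necessarily the exact original interleaving, but it preserves the per-key insert/delete ordering needed for eventual consistency:

```python
# Made-up contents of the three files produced by one transaction (solution #1).
equality_delete_keys = [1]           # keys whose rows existed before this transaction
data_file = [(1, "a"), (1, "b")]     # inserted rows, in write (position) order
position_deletes = {0}               # positions deleted within this same transaction

# Equality deletes are replayed first: they remove rows from earlier transactions.
events = [("DELETE", key) for key in equality_delete_keys]

# Then walk the data file in position order; a position-deleted row was
# inserted and then deleted inside this same transaction.
for pos, row in enumerate(data_file):
    events.append(("INSERT", row))
    if pos in position_deletes:
        events.append(("DELETE", row))

for event in events:
    print(event)
# ('DELETE', 1)
# ('INSERT', (1, 'a'))
# ('DELETE', (1, 'a'))
# ('INSERT', (1, 'b'))
```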
@openinx, I agree with the guarantees that you propose for the reconstructed CDC stream. It sounds like the first solution, with mixed equality and position deletes, is probably the design we should use since it will have good read performance and good write performance, with the cost being the ID-to-position map we need while a data file is open. For … |
Is this document up to date, and does it discuss how to solve the open questions? |
Thanks @aokolnychyi for bringing this up. Part of the document is out of date now; I will update those parts in the next few days. Thanks. |
The spec was updated with equality delete files in #1499. |
One option for encoding row-level deletes is to track a tuple of values that indicate a row is deleted. The delete file will contain a subset of data columns and each row will be used to match data rows using equality.
For example, a delete file may contain a single id column (field id 2) with 2 rows, 18 and 19. This indicates that any row with ID 18 or 19 is deleted in all files to which the delete is applied (limited by sequence number and partition scope). This format should be documented in the spec.
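For instance, applying such a delete file during a read could look like this sketch (plain Python, not the Iceberg API; column names are illustrative):

```python
# Equality delete file: a single id column (field id 2) with rows 18 and 19.
equality_delete_rows = [{"id": 18}, {"id": 19}]

data_rows = [
    {"id": 17, "data": "kept"},
    {"id": 18, "data": "deleted"},
    {"id": 19, "data": "deleted"},
    {"id": 20, "data": "kept"},
]

def is_deleted(row, deletes):
    # A data row is deleted if it matches some delete row on all of the delete columns.
    return any(all(row[col] == d[col] for col in d) for d in deletes)

print([r for r in data_rows if not is_deleted(r, equality_delete_rows)])
# [{'id': 17, 'data': 'kept'}, {'id': 20, 'data': 'kept'}]
```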