-
Notifications
You must be signed in to change notification settings - Fork 2.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flink: Tests alignment for the Flink Sink v2-based implemenation (IcebergSink) #11219
Draft
arkadius
wants to merge
11
commits into
apache:main
Choose a base branch
from
arkadius:IcebergSink-tests
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
11 commits
Select commit
Hold shift + click to select a range
cd80ac1
Tests for the Flink Sink v2-based implemenation (IcebergSink)
eec1419
Added useV2Sink parameter in tests for FlinkSink
4bd39bc
More tests covering IcebergSink
e0aa16d
Review comments
5a75006
code style fix
f7c1654
checkstyle fix
a6fa562
IcebergSinkBuilder: revert to interface
2cae011
Spotless apply
710710e
Review fixes: SinkUtil.checkAndGetEqualityFieldIds instead of Builder…
514a636
cleanup
158b5b7
typo fix
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
82 changes: 82 additions & 0 deletions
82
flink/v1.20/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergSinkBuilder.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.iceberg.flink.sink; | ||
|
||
import java.util.List; | ||
import java.util.Map; | ||
import org.apache.flink.annotation.Internal; | ||
import org.apache.flink.configuration.ReadableConfig; | ||
import org.apache.flink.streaming.api.datastream.DataStream; | ||
import org.apache.flink.streaming.api.datastream.DataStreamSink; | ||
import org.apache.flink.table.api.TableSchema; | ||
import org.apache.flink.table.data.RowData; | ||
import org.apache.flink.types.Row; | ||
import org.apache.iceberg.DistributionMode; | ||
import org.apache.iceberg.Table; | ||
import org.apache.iceberg.flink.TableLoader; | ||
|
||
@Internal | ||
/* | ||
This class is for internal purpose of transition between the previous implementation of Flink's sink (FlinkSink) | ||
and the new one implementation based on Flink v2 sink's API (IcebergSink). After we remove the previous implementation, | ||
all occurrences of this class would be replaced by direct IcebergSink usage. | ||
*/ | ||
public interface IcebergSinkBuilder<T extends IcebergSinkBuilder<?>> { | ||
|
||
T tableSchema(TableSchema newTableSchema); | ||
|
||
T tableLoader(TableLoader newTableLoader); | ||
|
||
T equalityFieldColumns(List<String> columns); | ||
|
||
T overwrite(boolean newOverwrite); | ||
|
||
T setAll(Map<String, String> properties); | ||
|
||
T flinkConf(ReadableConfig config); | ||
|
||
T table(Table newTable); | ||
|
||
T writeParallelism(int newWriteParallelism); | ||
|
||
T distributionMode(DistributionMode mode); | ||
|
||
T toBranch(String branch); | ||
|
||
T upsert(boolean enabled); | ||
|
||
DataStreamSink<?> append(); | ||
|
||
static IcebergSinkBuilder<?> forRow( | ||
DataStream<Row> input, TableSchema tableSchema, boolean useV2Sink) { | ||
if (useV2Sink) { | ||
return IcebergSink.forRow(input, tableSchema); | ||
} else { | ||
return FlinkSink.forRow(input, tableSchema); | ||
} | ||
} | ||
|
||
static IcebergSinkBuilder<?> forRowData(DataStream<RowData> input, boolean useV2Sink) { | ||
if (useV2Sink) { | ||
return IcebergSink.forRowData(input); | ||
} else { | ||
return FlinkSink.forRowData(input); | ||
} | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not entirely convinced that this is the path we should move forward to. One of the design choices during the implementation of the new IceberSink was exactly to avoid this: instead of refactoring common code, we would just add a brand new
IcebergSink
and keep the old one untouched (or minimally touched).In my mind, this PR is about adding additional tests using the new sink, in places where only the old one is used. If we want to keep the design choice from my previous PR, then that would imply that we would be creating new test cases altogether, but only targeting the IcebergSink.
For example, we have
TestFlinkIcebergSinkExtended
then we could add a new one called
TestFlinkIcebergSinkExtendedSinkV2
which is identical as the original one, but replacing the
FlinkSink.
withIcebergSink.
And even though this will require creating extra files, I think it will be worth it as we can keep the 2 sinks (implementations and unit testing) separate.
Now, for use cases like the Dynamic Tables - which sink to use??:
I think we should follow a similar approach as the one @pvary mentioned before: defining a config, and relaying on it. For example:
adding the following to
FlinkConfigOptions
and then, in dynamic table code:
Wdyt? @pvary @stevenzwu
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here are the priorities in my head:
From my perspective, it's ok to add an interface to separate the common methods, but I prefer not to have any implementation in the interface, as this class should be removed when the deprecation happens. If this approach is not enough, then we need test only util methods to create the sinks for the tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for the review folks. The listed priorities sounds reasonable. Would you agree @pvary that we should add two more bullets to this lists:
IcebergSink
is covered by unit tests to a similar level asFlinkSink
FlinkSink
toIcebergSink
as possible. For example, try to keep features and allowed properties synchronized (of course, I'm not talking about features that are not possible to synchronize because of Flink's API differences)?
With regards to this:
I think that the suggested by you approach with the interface without any implementation shouldn't break the user's code. I can do a manual test simulating the removal of the interface and verify if no exception is thrown.
@rodmeneses with regards to this comment:
I wanted to do this, but I thought that it would be better to move this to the separated PR as I mentioned in this comment: #11219 (comment) WDYAT?
I started with a lower level unit tests because I saw that they cover a lot of use cases and as a potential user of this class, I would sleep better if I knew that all tests passes also for
IcebergSink
. For the purpose of this feature flag, I think that we should add a higher level tests that would use Flink's table API/Flink SQL. Do you agree with that?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed abstract class to the interface.
BTW, I've just realized more about what's going on around the current status of unit tests for these two classes. I see that we already had some tests of
IcebergSink
that are copy-pastes ofFlinkSink
tests (e.g.TestIcebergSink
vsTestFlinkIcebergSink
). I see that these tests are already desynchronized. Don't you think that this state is becoming hard to maintain for a longer period of time?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @arkadius
I'm OK with adding a new interface
IcerbergSinkBuilder
and having theFlinkSink
andIcebergSink
implement it. So on my mind, the only things that are changing in this PR are exactly those ones. I can see that in the PR already, with only one thing I noticed which I will comment directly in the PR itself.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
During the implementation of the
IcerbergSink
I wrote unit test to cover all the new functionality specific to the new sink, and I also added unit test to cover similar functionality from theFlinkSink
. I didn't want to modify existing code (which was a design choice in this PR) so I went ahead and created new test classes that will only feature the newIcebergSink
. This is the reason I suggest yesterday about adding new test classes that only target the newIcbergSink
, so that it is similar to the approach that I followed.Having said that, I really liked the idea of your new marker interface as it minimally change behavior or APIs in the current code, and at the same time it makes very simple and clear adding a parameter "useSinkV2" for the existing unit tests, so that you can use either
FlinkSink
orIcebergSink
.