Flink: Fix IcebergSource tableloader lifecycle management in batch mode #9173
Conversation
cc: @stevenzwu
Force-pushed from c51d51b to a26a471.
ExecutorService workerPool =
    ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try {
  plannerTableLoader.open();
I forgot to rename this variable earlier
// Create a copy of the table loader to avoid lifecycle management conflicts with the user
// provided table loader. This copy is only required for split planning, which uses the
// underlying io, and should be closed after split planning is complete
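For context, a minimal sketch of the batch planning flow this comment describes (simplified and assumed, not the actual method; `planSplits` is a stand-in for the real split planning call):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.util.ThreadPools;

class BatchPlanningSketch {
  // The batch path clones the user-provided loader so split planning owns its
  // own copy; the clone (and the FileIO it opens) is closed as soon as planning
  // completes, while the user's loader is left untouched.
  static List<?> planSplitsForBatch(TableLoader userLoader, String threadName, int parallelism) {
    ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, parallelism);
    try (TableLoader plannerTableLoader = userLoader.clone()) {
      plannerTableLoader.open();
      Table table = plannerTableLoader.loadTable();
      return planSplits(table, workerPool); // stand-in for the real planning call
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } finally {
      workerPool.shutdown();
    }
  }

  private static List<?> planSplits(Table table, ExecutorService workerPool) {
    throw new UnsupportedOperationException("placeholder for the split planner");
  }
}
```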
Why not clone/open/close it here?
Just to keep it the same as `ContinuousSplitPlannerImpl` and `tableName()`, which perform the open/close themselves. And for `ContinuousSplitPlannerImpl`, we can't close it until the source is closed.
Would it be more readable to abstract out the logic that requires a clone, i.e. the clone, open, and then close? I think that's also fine, but the original table loader still needs to be closed by the source at the end of initialization.
I usually prefer if the objects own the whole lifecycle of their child objects. So ideally (see the sketch after this list):
- `new ContinuousSplitPlannerImpl` should clone the loader itself, keep it as long as it needs, and close it at the end of the reading.
- `planSplitsForBatch` should clone the loader itself, keep it as long as it needs, and close it at the end of the planning.
- For `tableName`, I am a bit confused. In `planSplitsForBatch` we need to clone the loader to do the planning, but use the old loader to get the `tableName` for logging? Why not set the `tableName` value in the constructor, and forget the whole `lazyTable` thing?
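Roughly, the ownership pattern being proposed looks like this (an illustrative sketch; the class name and constructor shape are assumed, not the actual diff):

```java
import java.io.Closeable;
import java.io.IOException;
import org.apache.iceberg.flink.TableLoader;

class OwningPlannerSketch implements Closeable {
  private final TableLoader tableLoader;

  OwningPlannerSketch(TableLoader userTableLoader) {
    // Private copy: opening and closing it never touches the user's loader.
    this.tableLoader = userTableLoader.clone();
    this.tableLoader.open();
  }

  @Override
  public void close() throws IOException {
    // Released only when the source stops reading and closes the planner.
    tableLoader.close();
  }
}
```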
I agree with the pattern.
Wrt the `lazyTable` thing, this is because the `Table` needed to be a transient value (a `Table` itself is not serializable). But it seems we do not make a reference to it outside of the `lazyTable`, so I will remove it.
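For illustration, the simplification being agreed on could look like this (a minimal sketch; the class shape is assumed):

```java
import java.io.Serializable;
import org.apache.iceberg.Table;

class NameOnlySketch implements Serializable {
  // A String is serializable, so the name can be captured eagerly; no transient
  // Table field (and no lazyTable re-loading) is needed just for logging.
  private final String tableName;

  NameOnlySketch(Table table) {
    this.tableName = table.name();
  }
}
```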
assigner = assignerFactory.createAssigner(enumState.pendingSplits());
}

TableLoader tableLoaderCopy = tableLoader.clone();
Not needed
this.emitter = emitter;
this.tableName = table.name();
This is incorrect. `table` can be null and loaded lazily.
iceberg/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java, line 482 in 8b7a280:

if (table == null) {
The builder method already handles the nullity case. I added a precondition check to verify that `table` is non-null before invoking the source constructor.

We can move `checkRequired` into the constructor too (I think that makes the code more readable), as in the sketch below.
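For illustration, moving the validation into the constructor might look like this (a sketch with assumed field names, not the actual change):

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

class ConstructorCheckSketch {
  private final TableLoader tableLoader;
  private final String tableName;

  // Validation sits next to the assignments it guards, instead of living in a
  // separate checkRequired() invoked from the builder.
  ConstructorCheckSketch(TableLoader tableLoader, Table table) {
    this.tableLoader = Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
    this.tableName = Preconditions.checkNotNull(table, "table is required.").name();
  }
}
```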
Previously, users didn't have to provide a valid table. This is a breaking change that requires users to supply a valid table object.
Doesn't this part of the code take care of creating the table?
iceberg/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java, lines 481 to 489 in 820fc3c:

public IcebergSource<T> build() {
  if (table == null) {
    try (TableLoader loader = tableLoader) {
      loader.open();
      this.table = tableLoader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
Thanks for pointing out the code from the `build()` method. Yeah, this is fine; I missed it earlier.
Now I understand Mason's comment. I totally agree that it is more readable to move the `checkRequired()` call inside the constructor; right now it is detached.
Can't assume `table` is not null.
Force-pushed from bd0dd16 to 4ca21d3.
LGTM, Thanks!
Merged to main.
@mas-chen: Please do not forget to port the changes to the other Flink versions.
Why not support this for 1.15? Unfortunately, that is still the latest version one can use on the managed AWS Flink service.
@javrasya: The general policy is that we support the last 3 Flink releases in the Flink connectors. We follow the same pattern with the Iceberg connector, and support Flink 1.18/1.17/1.16 in the upcoming Iceberg releases. If you think you would need this in older Iceberg releases, like 1.14.4 (if there will be any), then it should be backported to
This fixes a connection pool issue that prevents IcebergSource from starting up in batch mode. The root cause is that the FileIO is closed when the tableLoader is closed via try-with-resources statements. We need multiple copies of the user-provided tableLoader parameter: to retrieve the table name, to do split planning, and to close the loader after each use. The fix is to clone the table loader so that it can be opened/closed by the batch split planning mechanism. This patch has been manually verified.
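To make the root cause and the fix concrete, here is a hedged sketch of both patterns (simplified; method names like `brokenLoad` and `planWithClone` are illustrative, not from the patch):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.Table;
import org.apache.iceberg.flink.TableLoader;

class RootCauseSketch {
  // Problem: try-with-resources closes the loader on exit, and closing the
  // loader closes the underlying FileIO, leaving the loaded Table unusable
  // when batch split planning later tries to read manifests.
  static Table brokenLoad(TableLoader userLoader) {
    try (TableLoader loader = userLoader) {
      loader.open();
      return loader.loadTable(); // the Table's FileIO is closed as we return
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Fix: do the FileIO-dependent work against a clone, inside the try, so
  // closing the clone affects no other consumer of the user's loader.
  static void planWithClone(TableLoader userLoader) {
    try (TableLoader copy = userLoader.clone()) {
      copy.open();
      Table table = copy.loadTable();
      // ... split planning happens here, while the clone's FileIO is open ...
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```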