-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-36406] Close MetadataApplier when the job stops #3623
base: master
Are you sure you want to change the base?
Conversation
5761577
to
5ec21a4
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for @morozov's great work! Just left some minor comments.
@Override | ||
default void close() throws Exception {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a JavaDocs for this method would be nice.
|
||
try { | ||
metadataApplier.close(); | ||
} catch (Exception e) { | ||
throw new IOException("Failed to close metadata applier.", e); | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've tested locally with a dummy MetadataApplier that always throws exception but gets no output. Seems OperatorCoordinators will be closed by calling OperatorCoordinatorHandler#disposeAllOperatorCoordinators
, where exceptions thrown here are ignored without being logged:
// DefaultOperatorCoordinatorHandler.java
@Override
public void disposeAllOperatorCoordinators() {
coordinatorMap.values().forEach(IOUtils::closeQuietly);
}
// IOUtils.java
public static void closeQuietly(AutoCloseable closeable) {
try {
if (closeable != null) {
closeable.close();
}
} catch (Throwable ignored) {
}
}
Maybe we can add some logging message if metadata appliers are not closed as expected?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch 👍 Thanks for the detailed explanation.
5ec21a4
to
f800a47
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good! cc @ruanhang1993 @lvyanquan
@ruanhang1993 could you take a look, please? |
This pull request implements closing
MetadataApplier
when the job stops. This allowsMetadataApplier
implementations to release the resources that they may use to apply schema changes (e.g. a JDBC connection).Choice of the interface
SchemaRegistryRequestHandler
(which will closeMetadataApplier
) implementsCloseable
, so it would make sense to declareMetadataApplier
asCloseable
as well.PaimonMetadataApplier
internally instantiates a PaimonCatalog
, which it ideally should close, and which isAutocloseable
.To me, it makes more sense to declare
MetadataApplier
asAutoCloseable
. The reason is thatCloseable
is an IO-specific interface, which is declared injava.io
and throwsIOException
inclose()
.AutoCloseable
is more generic and seems to be more suitable forMetadataApplier
, whose implementations doesn't necessarily perform IO.Testing
I didn't find existing tests focusing on
SchemaRegistryRequestHandler
orSchemaRegistry
, so I dind't implement a unit test. Testing of the changes inSchemaRegistryRequestHandler
can be done by implementing some logging inValuesMetadataApplier#close
and running any test that uses it (e.g.FlinkPipelineComposerITCase
)Additional scope
Just as a demonstration of where this new functionality can be applied, I implemented
close()
inPaimonMetadataApplier
. I didn't test it and am not sure that this is the right thing to do. I can revert this part, if necessary.Further considerations
The problem of closing resources seems also relevant for the
MetadataAccessor
interface. Specifically,MySqlMetadataAccessor
disconnects from the database after each call but some other implementations may require maintaining a persistent connection.