From 57869c11687e0053a242c90623779c0c7336cd33 Mon Sep 17 00:00:00 2001 From: Kumar Mallikarjuna Date: Tue, 19 Mar 2024 20:24:45 +0530 Subject: [PATCH] [FLINK-34239][core,table] Add copy() in SerializerConfig --- .../serialization/SerializerConfig.java | 2 + .../serialization/SerializerConfigImpl.java | 25 ++++++++ .../SerializerConfigImplTest.java | 64 +++++++++++++++---- .../table/catalog/DataTypeFactoryImpl.java | 47 ++------------ 4 files changed, 83 insertions(+), 55 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java index 0b1e48377f206..43c5e927e13d9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfig.java @@ -220,4 +220,6 @@ & Serializable> void registerTypeWithKryoSerializer( * @param classLoader a class loader to use when loading classes */ void configure(ReadableConfig configuration, ClassLoader classLoader); + + SerializerConfig copy(); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java index c5dc1a214e983..1ffde12a61b38 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializerConfigImpl.java @@ -574,4 +574,29 @@ private void registerTypeWithTypeInfoFactory( public ExecutionConfig getExecutionConfig() { return executionConfig; } + + @Override + public SerializerConfigImpl copy() { + final SerializerConfigImpl newSerializerConfig = new SerializerConfigImpl(); + newSerializerConfig.configure(configuration, this.getClass().getClassLoader()); + + getRegisteredTypesWithKryoSerializers() + .forEach( + (c, s) -> + newSerializerConfig.registerTypeWithKryoSerializer( + c, s.getSerializer())); + getRegisteredTypesWithKryoSerializerClasses() + .forEach(newSerializerConfig::registerTypeWithKryoSerializer); + getDefaultKryoSerializers() + .forEach( + (c, s) -> + newSerializerConfig.addDefaultKryoSerializer(c, s.getSerializer())); + getDefaultKryoSerializerClasses().forEach(newSerializerConfig::addDefaultKryoSerializer); + getRegisteredKryoTypes().forEach(newSerializerConfig::registerKryoType); + getRegisteredPojoTypes().forEach(newSerializerConfig::registerPojoType); + getRegisteredTypeInfoFactories() + .forEach(newSerializerConfig::registerTypeWithTypeInfoFactory); + + return newSerializerConfig; + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java index 5c20d35dea11c..e3b361eee23f9 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SerializerConfigImplTest.java @@ -20,9 +20,9 @@ import org.apache.flink.api.common.typeinfo.TypeInfoFactory; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.GlobalConfiguration; -import org.apache.flink.configuration.PipelineOptions; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; @@ -34,15 +34,38 @@ import java.lang.reflect.Type; import java.util.AbstractMap; import java.util.Arrays; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import static org.apache.flink.configuration.PipelineOptions.KRYO_DEFAULT_SERIALIZERS; +import static org.apache.flink.configuration.PipelineOptions.KRYO_REGISTERED_CLASSES; +import static org.apache.flink.configuration.PipelineOptions.POJO_REGISTERED_CLASSES; +import static org.apache.flink.configuration.PipelineOptions.SERIALIZATION_CONFIG; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; class SerializerConfigImplTest { + private static final Map>, String> configs = new HashMap<>(); + + static { + configs.put( + KRYO_DEFAULT_SERIALIZERS, + "class:org.apache.flink.api.common.serialization.SerializerConfigImplTest," + + "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1;" + + "class:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1," + + "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer2"); + configs.put( + KRYO_REGISTERED_CLASSES, + "org.apache.flink.api.common.serialization.SerializerConfigImplTest;" + + "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1"); + configs.put( + POJO_REGISTERED_CLASSES, + "org.apache.flink.api.common.serialization.SerializerConfigImplTest;" + + "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1"); + } @Test void testReadingDefaultConfig() { @@ -84,9 +107,7 @@ void testLoadingRegisteredKryoTypesFromConfiguration() { Configuration configuration = new Configuration(); configuration.setString( - "pipeline.registered-kryo-types", - "org.apache.flink.api.common.serialization.SerializerConfigImplTest;" - + "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1"); + KRYO_REGISTERED_CLASSES.key(), configs.get(KRYO_REGISTERED_CLASSES)); // mutate config according to configuration configFromConfiguration.configure( @@ -106,9 +127,7 @@ void testLoadingRegisteredPojoTypesFromConfiguration() { Configuration configuration = new Configuration(); configuration.setString( - "pipeline.registered-pojo-types", - "org.apache.flink.api.common.serialization.SerializerConfigImplTest;" - + "org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1"); + POJO_REGISTERED_CLASSES.key(), configs.get(POJO_REGISTERED_CLASSES)); // mutate config according to configuration configFromConfiguration.configure( @@ -131,11 +150,7 @@ void testLoadingDefaultKryoSerializersFromConfiguration() { Configuration configuration = new Configuration(); configuration.setString( - "pipeline.default-kryo-serializers", - "class:org.apache.flink.api.common.serialization.SerializerConfigImplTest," - + "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1;" - + "class:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer1," - + "serializer:org.apache.flink.api.common.serialization.SerializerConfigImplTest$TestSerializer2"); + KRYO_DEFAULT_SERIALIZERS.key(), configs.get(KRYO_DEFAULT_SERIALIZERS)); // mutate config according to configuration configFromConfiguration.configure( @@ -301,9 +316,32 @@ void testLoadingIllegalSerializationConfig() { + " class org.apache.flink.api.common.serialization.SerializerConfigImplTest"); } + @Test + void testCopyDefaultSerializationConfig() { + SerializerConfig config = new SerializerConfigImpl(); + Configuration configuration = new Configuration(); + config.configure(configuration, SerializerConfigImplTest.class.getClassLoader()); + + assertThat(config.copy()).isEqualTo(config); + } + + @Test + void testCopySerializerConfig() { + SerializerConfig serializerConfig = new SerializerConfigImpl(); + Configuration configuration = new Configuration(); + configs.forEach((k, v) -> configuration.setString(k.key(), v)); + + serializerConfig.configure(configuration, SerializerConfigImplTest.class.getClassLoader()); + serializerConfig + .getDefaultKryoSerializerClasses() + .forEach(serializerConfig::registerTypeWithKryoSerializer); + + assertThat(serializerConfig.copy()).isEqualTo(serializerConfig); + } + private SerializerConfig getConfiguredSerializerConfig(String serializationConfigStr) { Configuration configuration = new Configuration(); - configuration.setString(PipelineOptions.SERIALIZATION_CONFIG.key(), serializationConfigStr); + configuration.setString(SERIALIZATION_CONFIG.key(), serializationConfigStr); SerializerConfig serializerConfig = new SerializerConfigImpl(); serializerConfig.configure(configuration, Thread.currentThread().getContextClassLoader()); diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java index 8dd7a7c3a4026..b623ea16df2c6 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/DataTypeFactoryImpl.java @@ -42,7 +42,6 @@ import javax.annotation.Nullable; -import java.util.Optional; import java.util.function.Supplier; import static org.apache.flink.table.types.utils.TypeConversions.fromLogicalToDataType; @@ -132,49 +131,13 @@ public LogicalType createLogicalType(UnresolvedIdentifier identifier) { private static Supplier createSerializerConfig( ClassLoader classLoader, ReadableConfig config, SerializerConfig serializerConfig) { return () -> { - final SerializerConfig newSerializerConfig = new SerializerConfigImpl(); - + SerializerConfig newSerializerConfig; if (serializerConfig != null) { - if (serializerConfig.isForceKryoEnabled()) { - newSerializerConfig.setForceKryo(true); - } - - if (serializerConfig.isForceAvroEnabled()) { - newSerializerConfig.setForceAvro(true); - } - - serializerConfig - .getDefaultKryoSerializers() - .forEach( - (c, s) -> - newSerializerConfig.addDefaultKryoSerializer( - c, s.getSerializer())); - - Optional.ofNullable(serializerConfig.isForceKryoAvroEnabled().getAsBoolean()) - .ifPresent(serializerConfig::setForceKryoAvro); - - serializerConfig - .getDefaultKryoSerializerClasses() - .forEach(newSerializerConfig::addDefaultKryoSerializer); - - serializerConfig - .getRegisteredKryoTypes() - .forEach(newSerializerConfig::registerKryoType); - - serializerConfig - .getRegisteredTypesWithKryoSerializerClasses() - .forEach(newSerializerConfig::registerTypeWithKryoSerializer); - - serializerConfig - .getRegisteredTypesWithKryoSerializers() - .forEach( - (c, s) -> - newSerializerConfig.registerTypeWithKryoSerializer( - c, s.getSerializer())); + newSerializerConfig = serializerConfig.copy(); + } else { + newSerializerConfig = new SerializerConfigImpl(); + newSerializerConfig.configure(config, classLoader); } - - newSerializerConfig.configure(config, classLoader); - return newSerializerConfig; }; }