[FLINK-36474] Support merging timestamp columns when routing #3636

Open · wants to merge 1 commit into master
SchemaUtils.java
@@ -33,6 +33,10 @@
import org.apache.flink.cdc.common.types.DataTypeRoot;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.DecimalType;
import org.apache.flink.cdc.common.types.LocalZonedTimestampType;
import org.apache.flink.cdc.common.types.TimeType;
import org.apache.flink.cdc.common.types.TimestampType;
import org.apache.flink.cdc.common.types.ZonedTimestampType;

import javax.annotation.Nullable;

@@ -176,6 +180,27 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
if (lType.equals(rType)) {
// identical type
mergedType = rType;
} else if (lType instanceof TimestampType && rType instanceof TimestampType) {
return DataTypes.TIMESTAMP(
Math.max(
((TimestampType) lType).getPrecision(),
((TimestampType) rType).getPrecision()));
} else if (lType instanceof ZonedTimestampType && rType instanceof ZonedTimestampType) {
return DataTypes.TIMESTAMP_TZ(
Math.max(
((ZonedTimestampType) lType).getPrecision(),
((ZonedTimestampType) rType).getPrecision()));
} else if (lType instanceof LocalZonedTimestampType
&& rType instanceof LocalZonedTimestampType) {
return DataTypes.TIMESTAMP_LTZ(
Math.max(
((LocalZonedTimestampType) lType).getPrecision(),
((LocalZonedTimestampType) rType).getPrecision()));
} else if (lType.is(DataTypeFamily.TIMESTAMP) && rType.is(DataTypeFamily.TIMESTAMP)) {
return DataTypes.TIMESTAMP(TimestampType.MAX_PRECISION);
} else if (lType instanceof TimeType && rType instanceof TimeType) {
return DataTypes.TIME(
Math.max(((TimeType) lType).getPrecision(), ((TimeType) rType).getPrecision()));
} else if (lType.is(DataTypeFamily.INTEGER_NUMERIC)
&& rType.is(DataTypeFamily.INTEGER_NUMERIC)) {
mergedType = DataTypes.BIGINT();
@@ -185,7 +210,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
} else if (lType.is(DataTypeFamily.APPROXIMATE_NUMERIC)
&& rType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
mergedType = DataTypes.DOUBLE();
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeRoot.DECIMAL)) {
} else if (lType instanceof DecimalType && rType instanceof DecimalType) {
// Merge two decimal types
DecimalType lhsDecimal = (DecimalType) lType;
DecimalType rhsDecimal = (DecimalType) rType;
@@ -195,7 +220,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
} else if (lType.is(DataTypeRoot.DECIMAL) && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
} else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType lhsDecimal = (DecimalType) lType;
mergedType =
@@ -204,7 +229,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
lhsDecimal.getPrecision(),
lhsDecimal.getScale() + getNumericPrecision(rType)),
lhsDecimal.getScale());
} else if (rType.is(DataTypeRoot.DECIMAL) && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
} else if (rType instanceof DecimalType && lType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
DecimalType rhsDecimal = (DecimalType) rType;
mergedType =
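The new branches above widen same-flavor timestamps to the larger precision, and fall back to TIMESTAMP(9) (TimestampType.MAX_PRECISION) when the flavors differ, presumably because no common zone semantic can be kept. A minimal, self-contained sketch of that rule, using toy stand-ins rather than the real Flink CDC type hierarchy (Flavor, Ts, and widen are all illustrative names):

// Toy model of the timestamp-widening rule; these are not Flink CDC classes.
public class TimestampWidenSketch {
    enum Flavor { TIMESTAMP, TIMESTAMP_TZ, TIMESTAMP_LTZ }

    static final int MAX_PRECISION = 9; // mirrors TimestampType.MAX_PRECISION

    record Ts(Flavor flavor, int precision) {}

    static Ts widen(Ts l, Ts r) {
        if (l.flavor() == r.flavor()) {
            // Same flavor: keep the larger precision.
            return new Ts(l.flavor(), Math.max(l.precision(), r.precision()));
        }
        // Mixed flavors: fall back to a plain TIMESTAMP at max precision,
        // matching the DataTypeFamily.TIMESTAMP branch in the patch.
        return new Ts(Flavor.TIMESTAMP, MAX_PRECISION);
    }

    public static void main(String[] args) {
        System.out.println(widen(new Ts(Flavor.TIMESTAMP, 6), new Ts(Flavor.TIMESTAMP, 9)));
        // -> Ts[flavor=TIMESTAMP, precision=9]
        System.out.println(widen(new Ts(Flavor.TIMESTAMP_LTZ, 3), new Ts(Flavor.TIMESTAMP_TZ, 6)));
        // -> Ts[flavor=TIMESTAMP, precision=9]
    }
}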
SchemaUtilsTest.java
@@ -291,6 +291,38 @@ public void testInferWiderType() {
DataTypes.INT().nullable(), DataTypes.INT().nullable()))
.isEqualTo(DataTypes.INT().nullable());

// Test merging temporal types
Assertions.assertThat(SchemaUtils.inferWiderType(DataTypes.TIME(1), DataTypes.TIME(7)))
.isEqualTo(DataTypes.TIME(7));

Assertions.assertThat(
SchemaUtils.inferWiderType(DataTypes.TIMESTAMP(9), DataTypes.TIMESTAMP(6)))
.isEqualTo(DataTypes.TIMESTAMP(9));

Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_TZ(3), DataTypes.TIMESTAMP_TZ(7)))
.isEqualTo(DataTypes.TIMESTAMP_TZ(7));

Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_LTZ(2), DataTypes.TIMESTAMP_LTZ(1)))
.isEqualTo(DataTypes.TIMESTAMP_LTZ(2));

Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP()))
.isEqualTo(DataTypes.TIMESTAMP(9));

Assertions.assertThat(
SchemaUtils.inferWiderType(DataTypes.TIMESTAMP_TZ(), DataTypes.TIMESTAMP()))
.isEqualTo(DataTypes.TIMESTAMP(9));

Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.TIMESTAMP_LTZ(), DataTypes.TIMESTAMP_TZ()))
.isEqualTo(DataTypes.TIMESTAMP(9));

// incompatible type merges test
Assertions.assertThatThrownBy(
() -> SchemaUtils.inferWiderType(DataTypes.INT(), DataTypes.DOUBLE()))
FlinkPipelineComposer.java
@@ -18,6 +18,7 @@
package org.apache.flink.cdc.composer.flink;

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.configuration.Configuration;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.pipeline.PipelineOptions;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
@@ -88,17 +89,19 @@ private FlinkPipelineComposer(StreamExecutionEnvironment env, boolean isBlocking

@Override
public PipelineExecution compose(PipelineDef pipelineDef) {
int parallelism = pipelineDef.getConfig().get(PipelineOptions.PIPELINE_PARALLELISM);
Configuration pipelineDefConfig = pipelineDef.getConfig();

int parallelism = pipelineDefConfig.get(PipelineOptions.PIPELINE_PARALLELISM);
env.getConfig().setParallelism(parallelism);

SchemaChangeBehavior schemaChangeBehavior =
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR);

// Build Source Operator
DataSourceTranslator sourceTranslator = new DataSourceTranslator();
DataStream<Event> stream =
sourceTranslator.translate(
pipelineDef.getSource(), env, pipelineDef.getConfig(), parallelism);
pipelineDef.getSource(), env, pipelineDefConfig, parallelism);

// Build PreTransformOperator for processing Schema Event
TransformTranslator transformTranslator = new TransformTranslator();
@@ -110,10 +113,9 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
SchemaOperatorTranslator schemaOperatorTranslator =
new SchemaOperatorTranslator(
schemaChangeBehavior,
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
pipelineDef
.getConfig()
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
pipelineDefConfig.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT),
pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE));
OperatorIDGenerator schemaOperatorIDGenerator =
new OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());

@@ -122,13 +124,13 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
transformTranslator.translatePostTransform(
stream,
pipelineDef.getTransforms(),
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
pipelineDefConfig.get(PipelineOptions.PIPELINE_LOCAL_TIME_ZONE),
pipelineDef.getUdfs());

// Build DataSink in advance as schema operator requires MetadataApplier
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
DataSink dataSink =
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDef.getConfig(), env);
sinkTranslator.createDataSink(pipelineDef.getSink(), pipelineDefConfig, env);

stream =
schemaOperatorTranslator.translate(
@@ -157,7 +159,7 @@ public PipelineExecution compose(PipelineDef pipelineDef) {
addFrameworkJars();

return new FlinkPipelineExecution(
env, pipelineDef.getConfig().get(PipelineOptions.PIPELINE_NAME), isBlocking);
env, pipelineDefConfig.get(PipelineOptions.PIPELINE_NAME), isBlocking);
}

private void addFrameworkJars() {
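The composer changes read the pipeline configuration once into pipelineDefConfig and pass PIPELINE_LOCAL_TIME_ZONE through to the schema operator. The zone matters because rendering a TIMESTAMP_LTZ instant as a zone-less TIMESTAMP, as the mixed-flavor merge above implies, depends on a session time zone; this is presumably why the option is now threaded into the schema operator. A small java.time illustration of that dependency (the zone id below is just an example value):

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;

public class LocalTimeZoneSketch {
    public static void main(String[] args) {
        // A TIMESTAMP_LTZ value is a point on the time line (an Instant);
        // its zone-less TIMESTAMP rendering depends on the pipeline zone.
        Instant ltzValue = Instant.parse("2024-10-09T12:00:00Z");
        ZoneId pipelineZone = ZoneId.of("Asia/Shanghai"); // example zone
        LocalDateTime asTimestamp = LocalDateTime.ofInstant(ltzValue, pipelineZone);
        System.out.println(asTimestamp); // 2024-10-09T20:00
    }
}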
SchemaOperatorTranslator.java
@@ -19,7 +19,6 @@

import org.apache.flink.cdc.common.annotation.Internal;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
import org.apache.flink.cdc.common.route.RouteRule;
import org.apache.flink.cdc.common.sink.MetadataApplier;
@@ -39,24 +38,27 @@
public class SchemaOperatorTranslator {
private final SchemaChangeBehavior schemaChangeBehavior;
private final String schemaOperatorUid;

private final Duration rpcTimeOut;
private final String timezone;

public SchemaOperatorTranslator(
SchemaChangeBehavior schemaChangeBehavior,
String schemaOperatorUid,
Duration rpcTimeOut) {
Duration rpcTimeOut,
String timezone) {
this.schemaChangeBehavior = schemaChangeBehavior;
this.schemaOperatorUid = schemaOperatorUid;
this.rpcTimeOut = rpcTimeOut;
this.timezone = timezone;
}

public DataStream<Event> translate(
DataStream<Event> input,
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes) {
return addSchemaOperator(input, parallelism, metadataApplier, routes, schemaChangeBehavior);
return addSchemaOperator(
input, parallelism, metadataApplier, routes, schemaChangeBehavior, timezone);
}

public String getSchemaOperatorUid() {
@@ -68,7 +70,8 @@ private DataStream<Event> addSchemaOperator(
int parallelism,
MetadataApplier metadataApplier,
List<RouteDef> routes,
SchemaChangeBehavior schemaChangeBehavior) {
SchemaChangeBehavior schemaChangeBehavior,
String timezone) {
List<RouteRule> routingRules = new ArrayList<>();
for (RouteDef route : routes) {
routingRules.add(
@@ -82,27 +85,12 @@
"SchemaOperator",
new EventTypeInfo(),
new SchemaOperatorFactory(
metadataApplier, routingRules, rpcTimeOut, schemaChangeBehavior));
metadataApplier,
routingRules,
rpcTimeOut,
schemaChangeBehavior,
timezone));
stream.uid(schemaOperatorUid).setParallelism(parallelism);
return stream;
}

private DataStream<Event> dropSchemaChangeEvent(DataStream<Event> input, int parallelism) {
return input.filter(event -> !(event instanceof SchemaChangeEvent))
.setParallelism(parallelism);
}

private DataStream<Event> exceptionOnSchemaChange(DataStream<Event> input, int parallelism) {
return input.map(
event -> {
if (event instanceof SchemaChangeEvent) {
throw new RuntimeException(
String.format(
"Aborting execution as the pipeline encountered a schema change event: %s",
event));
}
return event;
})
.setParallelism(parallelism);
}
}
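The translator keeps the zone as a String field and threads it through its constructor into SchemaOperatorFactory. A sketch of that pattern under the assumption, common for Flink operator factories, that the zone stays a plain String for easy serialization and is parsed on demand; ToyOperatorFactory is illustrative, not the real factory class:

import java.io.Serializable;
import java.time.ZoneId;

// Illustrative stand-in for an operator factory that receives a timezone.
class ToyOperatorFactory implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String timezone; // a plain String is trivially serializable

    ToyOperatorFactory(String timezone) {
        this.timezone = timezone;
    }

    ZoneId zoneId() {
        // Parse at use time instead of shipping a parsed object around.
        return ZoneId.of(timezone);
    }
}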