[FLINK-36461] Fix schema evolution failure with un-transformed tables
Signed-off-by: yuxiqian <[email protected]>
yuxiqian committed Oct 10, 2024
1 parent 4b13c49 commit bd7aca0
Showing 3 changed files with 122 additions and 4 deletions.
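
For context on the bug: a table that no transform rule matches never gets an entry in the operators' hasAsteriskMap, so the old hasAsteriskMap.get(tableId) lookup returned null once a schema change event arrived for that table — and since SchemaUtils.transformSchemaChangeEvent takes a primitive boolean, that null presumably failed on auto-unboxing. A minimal, hypothetical sketch of the lookup change (the map contents and table id below are illustrative, not taken from the patch):

    Map<TableId, Boolean> hasAsteriskMap = new HashMap<>();
    // No transform rule captured this table, so no entry was ever registered for it.
    TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");

    // Before (sketch): get() returns null, which fails when unboxed into a
    // primitive boolean argument.
    // boolean hasAsterisk = hasAsteriskMap.get(tableId);

    // After: un-transformed tables default to asterisk-ful handling, i.e. they
    // behave as if covered by a "*" projection.
    boolean hasAsterisk = hasAsteriskMap.getOrDefault(tableId, true);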
File 1 of 3 (FlinkPipelineComposer integration test):

@@ -688,6 +688,101 @@ void testPostAsteriskWithSchemaEvolution() throws Exception {
                         "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[15 -> Oops, 12th, 15, Oops], after=[], op=DELETE, meta=()}");
     }
 
+    @Test
+    void testTransformUnmatchedSchemaEvolution() throws Exception {
+        FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+        // Setup value source
+        Configuration sourceConfig = new Configuration();
+
+        sourceConfig.set(
+                ValuesDataSourceOptions.EVENT_SET_ID,
+                ValuesDataSourceHelper.EventSetId.CUSTOM_SOURCE_EVENTS);
+
+        TableId tableId = TableId.tableId("default_namespace", "default_schema", "mytable1");
+        List<Event> events = generateSchemaEvolutionEvents(tableId);
+
+        ValuesDataSourceHelper.setSourceEvents(Collections.singletonList(events));
+
+        SourceDef sourceDef =
+                new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+        // Setup value sink
+        Configuration sinkConfig = new Configuration();
+        SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+        // Setup pipeline
+        Configuration pipelineConfig = new Configuration();
+        pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+        pipelineConfig.set(
+                PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR, SchemaChangeBehavior.EVOLVE);
+        PipelineDef pipelineDef =
+                new PipelineDef(
+                        sourceDef,
+                        sinkDef,
+                        Collections.emptyList(),
+                        Collections.singletonList(
+                                new TransformDef(
+                                        "foo.bar.baz", // This doesn't match given tableId
+                                        "*",
+                                        null,
+                                        null,
+                                        null,
+                                        null,
+                                        null)),
+                        Collections.emptyList(),
+                        pipelineConfig);
+
+        // Execute the pipeline
+        PipelineExecution execution = composer.compose(pipelineDef);
+        execution.execute();
+
+        // Check the order and content of all received events
+        String[] outputEvents = outCaptor.toString().trim().split("\n");
+
+        assertThat(outputEvents)
+                .containsExactly(
+                        // Initial stage
+                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT,`name` STRING,`age` INT}, primaryKeys=id, options=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 21], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Barcarolle, 22], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3, Cecily, 23], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3, Cecily, 23], after=[3, Colin, 24], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Barcarolle, 22], after=[], op=DELETE, meta=()}",
+
+                        // Add column stage
+                        "AddColumnEvent{tableId=default_namespace.default_schema.mytable1, addedColumns=[ColumnWithPosition{column=`rank` STRING, position=BEFORE, existedColumnName=id}, ColumnWithPosition{column=`gender` TINYINT, position=AFTER, existedColumnName=age}]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1st, 4, Derrida, 24, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2nd, 5, Eve, 25, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2nd, 5, Eve, 25, 1], after=[2nd, 5, Eva, 20, 2], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[3rd, 6, Fiona, 26, 3], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[3rd, 6, Fiona, 26, 3], after=[], op=DELETE, meta=()}",
+
+                        // Alter column type stage
+                        "AlterColumnTypeEvent{tableId=default_namespace.default_schema.mytable1, typeMapping={gender=INT, name=VARCHAR(17), age=DOUBLE}, oldTypeMapping={gender=TINYINT, name=STRING, age=INT}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[4th, 7, Gem, 19.0, -1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[5th, 8, Helen, 18.0, -2], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[5th, 8, Helen, 18.0, -2], after=[5th, 8, Harry, 18.0, -3], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[6th, 9, IINA, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[6th, 9, IINA, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Rename column stage
+                        "RenameColumnEvent{tableId=default_namespace.default_schema.mytable1, nameMapping={gender=biological_sex, age=toshi}}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[7th, 10, Julia, 24.0, 1], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[8th, 11, Kalle, 23.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[8th, 11, Kalle, 23.0, 0], after=[8th, 11, Kella, 18.0, 0], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[9th, 12, Lynx, 17.0, 0], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[9th, 12, Lynx, 17.0, 0], after=[], op=DELETE, meta=()}",
+
+                        // Drop column stage
+                        "DropColumnEvent{tableId=default_namespace.default_schema.mytable1, droppedColumnNames=[biological_sex, toshi]}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[10th, 13, Munroe], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[11th, 14, Neko], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[11th, 14, Neko], after=[11th, 14, Nein], op=UPDATE, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[12th, 15, Oops], op=INSERT, meta=()}",
+                        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[12th, 15, Oops], after=[], op=DELETE, meta=()}");
+    }
+
     private List<Event> generateSchemaEvolutionEvents(TableId tableId) {
         List<Event> events = new ArrayList<>();
 
File 2 of 3 (PostTransformOperator):

@@ -44,6 +44,7 @@
 import java.lang.reflect.InvocationTargetException;
 import java.time.ZoneId;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -242,6 +243,8 @@ public void processElement(StreamRecord<Event> element) throws Exception {
 
     private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws Exception {
         TableId tableId = event.tableId();
+        List<String> columnNamesBeforeChange = Collections.emptyList();
+
         if (event instanceof CreateTableEvent) {
             CreateTableEvent createTableEvent = (CreateTableEvent) event;
             Set<String> projectedColumnsSet =
@@ -286,6 +289,9 @@ private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws
                     createTableEvent.getSchema().getColumnNames().stream()
                             .filter(projectedColumnsSet::contains)
                             .collect(Collectors.toList()));
+        } else {
+            columnNamesBeforeChange =
+                    getPostTransformChangeInfo(tableId).getPreTransformedSchema().getColumnNames();
         }
 
         Schema schema;
@@ -304,9 +310,12 @@ private Optional<SchemaChangeEvent> cacheSchema(SchemaChangeEvent event) throws
 
         if (event instanceof CreateTableEvent) {
             return Optional.of(new CreateTableEvent(tableId, projectedSchema));
+        } else if (hasAsteriskMap.getOrDefault(tableId, true)) {
+            // See comments in PreTransformOperator#cacheChangeSchema method.
+            return SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
         } else {
             return SchemaUtils.transformSchemaChangeEvent(
-                    hasAsteriskMap.get(tableId), projectedColumnsMap.get(tableId), event);
+                    false, projectedColumnsMap.get(tableId), event);
         }
     }
 
File 3 of 3 (PreTransformOperator):

@@ -265,12 +265,26 @@ private SchemaChangeEvent cacheCreateTable(CreateTableEvent event) {
     private Optional<SchemaChangeEvent> cacheChangeSchema(SchemaChangeEvent event) {
         TableId tableId = event.tableId();
         PreTransformChangeInfo tableChangeInfo = preTransformChangeInfoMap.get(tableId);
+        List<String> columnNamesBeforeChange = tableChangeInfo.getSourceSchema().getColumnNames();
+
         Schema originalSchema =
                 SchemaUtils.applySchemaChangeEvent(tableChangeInfo.getSourceSchema(), event);
         Schema preTransformedSchema = tableChangeInfo.getPreTransformedSchema();
-        Optional<SchemaChangeEvent> schemaChangeEvent =
-                SchemaUtils.transformSchemaChangeEvent(
-                        hasAsteriskMap.get(tableId), referencedColumnsMap.get(tableId), event);
+
+        Optional<SchemaChangeEvent> schemaChangeEvent;
+        if (hasAsteriskMap.getOrDefault(tableId, true)) {
+            // If this TableId is asterisk-ful, we should use the latest upstream schema as
+            // referenced columns to perform schema evolution, not of the original ones generated
+            // when creating tables. If hasAsteriskMap has no entry for this TableId, it means that
+            // this TableId has not been captured by any transform rules, and should be regarded as
+            // asterisk-ful by default.
+            schemaChangeEvent =
+                    SchemaUtils.transformSchemaChangeEvent(true, columnNamesBeforeChange, event);
+        } else {
+            schemaChangeEvent =
+                    SchemaUtils.transformSchemaChangeEvent(
+                            false, referencedColumnsMap.get(tableId), event);
+        }
         if (schemaChangeEvent.isPresent()) {
             preTransformedSchema =
                     SchemaUtils.applySchemaChangeEvent(
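
Both operators now share the same dispatch shape. A condensed sketch for reference — the helper name and parameter list here are illustrative, since the real logic lives inline in cacheSchema and cacheChangeSchema as shown above:

    private Optional<SchemaChangeEvent> transformOrForward(
            TableId tableId, SchemaChangeEvent event, List<String> latestUpstreamColumns) {
        if (hasAsteriskMap.getOrDefault(tableId, true)) {
            // Asterisk-ful, or never captured by any transform rule: evolve the
            // event against the latest upstream column list.
            return SchemaUtils.transformSchemaChangeEvent(true, latestUpstreamColumns, event);
        } else {
            // Explicit projection: evolve only against the columns the rule references
            // (projectedColumnsMap in PostTransformOperator, referencedColumnsMap in
            // PreTransformOperator).
            return SchemaUtils.transformSchemaChangeEvent(
                    false, referencedColumnsMap.get(tableId), event);
        }
    }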
