Column name based split orders
Peter Vary committed Nov 15, 2023
1 parent 4f11bba commit 4e92111
Showing 8 changed files with 59 additions and 60 deletions.
@@ -26,6 +26,7 @@
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import javax.annotation.Nullable;
import org.apache.commons.compress.utils.Sets;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
@@ -65,6 +66,7 @@
import org.apache.iceberg.flink.source.reader.ReaderFunction;
import org.apache.iceberg.flink.source.reader.RowDataReaderFunction;
import org.apache.iceberg.flink.source.reader.SerializableRecordEmitter;
import org.apache.iceberg.flink.source.reader.TimestampBasedWatermarkExtractor;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.flink.source.split.IcebergSourceSplitSerializer;
import org.apache.iceberg.flink.source.split.SerializableComparator;
@@ -223,7 +225,7 @@ public static class Builder<T> {
private Table table;
private SplitAssignerFactory splitAssignerFactory;
private SerializableComparator<IcebergSourceSplit> splitComparator;
private IcebergWatermarkExtractor<T> watermarkExtractor;
private String watermarkColumn;
private ReaderFunction<T> readerFunction;
private ReadableConfig flinkConfig = new Configuration();
private final ScanContext.Builder contextBuilder = ScanContext.builder();
@@ -246,8 +248,8 @@ public Builder<T> table(Table newTable) {

public Builder<T> assignerFactory(SplitAssignerFactory assignerFactory) {
Preconditions.checkArgument(
watermarkExtractor == null,
"WatermarkExtractor and SplitAssigner should not be set in the same source");
watermarkColumn == null,
"Watermark column and SplitAssigner should not be set in the same source");
this.splitAssignerFactory = assignerFactory;
return this;
}
@@ -441,16 +443,14 @@ public Builder<T> setAll(Map<String, String> properties) {
}

/**
* Sets the {@link IcebergWatermarkExtractor} to retrieve the split watermark before emitting
* the records for a given split. The {@link
* IcebergWatermarkExtractor#extractWatermark(IcebergSourceSplit)} is also used for ordering the
* splits for read.
* Emits watermarks once per split based on the file statistics for the given split. The
* watermarks generated this way are also used for ordering the splits for read.
*/
public Builder<T> watermarkExtractor(IcebergWatermarkExtractor<T> newWatermarkExtractor) {
public Builder<T> watermarkColumn(String columnName) {
Preconditions.checkArgument(
splitAssignerFactory == null,
"WatermarkExtractor and SplitAssigner should not be set in the same source");
this.watermarkExtractor = newWatermarkExtractor;
"Watermark column and SplitAssigner should not be set in the same source");
this.watermarkColumn = columnName;
return this;
}

@@ -478,6 +478,19 @@ public IcebergSource<T> build() {
contextBuilder.project(FlinkSchemaUtil.convert(icebergSchema, projectedFlinkSchema));
}

SerializableRecordEmitter<T> emitter = SerializableRecordEmitter.defaultEmitter();
if (watermarkColumn != null) {
// We need the column statistics for watermark generation
contextBuilder.includeColumnStats(Sets.newHashSet(watermarkColumn));

IcebergWatermarkExtractor watermarkExtractor =
new TimestampBasedWatermarkExtractor(icebergSchema, watermarkColumn);
emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
splitAssignerFactory =
new OrderedSplitAssignerFactory(
SplitComparators.watermarksAwareComparator(watermarkExtractor));
}

ScanContext context = contextBuilder.build();
if (readerFunction == null) {
if (table instanceof BaseMetadataTable) {
@@ -508,16 +521,6 @@ public IcebergSource<T> build() {
}
}

SerializableRecordEmitter<T> emitter;
if (watermarkExtractor == null) {
emitter = SerializableRecordEmitter.defaultEmitter();
} else {
emitter = SerializableRecordEmitter.emitterWithWatermark(watermarkExtractor);
splitAssignerFactory =
new OrderedSplitAssignerFactory(
SplitComparators.watermarksAwareComparator(watermarkExtractor));
}

checkRequired();
// Since builder already load the table, pass it to the source to avoid double loading
return new IcebergSource<>(
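For reference, a minimal usage sketch of the new builder option introduced above (not part of the commit); the table loader, projected schema, and the "ts" column name are placeholders:

    // Sketch only: watermarkColumn(...) replaces the old watermarkExtractor(...) call.
    // Internally, build() requests column statistics for the named column, wraps it in a
    // TimestampBasedWatermarkExtractor, and orders splits with an OrderedSplitAssignerFactory.
    IcebergSource<RowData> source =
        IcebergSource.<RowData>builder()
            .tableLoader(tableLoader)        // placeholder TableLoader
            .project(projectedSchema)        // placeholder projected schema
            .watermarkColumn("ts")           // must be a TIMESTAMP column of the table
            .build();
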
@@ -22,7 +22,7 @@
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;

/** The interface used to extract watermarks from splits. */
public interface IcebergWatermarkExtractor<T> extends Serializable {
public interface IcebergWatermarkExtractor extends Serializable {
/** Get the watermark for a split. */
long extractWatermark(IcebergSourceSplit split);
}
@@ -36,6 +36,6 @@ static <T> SerializableRecordEmitter<T> defaultEmitter() {

static <T> SerializableRecordEmitter<T> emitterWithWatermark(
IcebergWatermarkExtractor extractor) {
return new WatermarkExtractorRecordEmitter(extractor);
return new WatermarkExtractorRecordEmitter<>(extractor);
}
}
@@ -20,7 +20,6 @@

import java.io.Serializable;
import java.util.Comparator;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.Schema;
import org.apache.iceberg.flink.source.split.IcebergSourceSplit;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -33,8 +32,7 @@
* statistics to get the watermarks for the {@link IcebergSourceSplit}. This watermark is emitted by
* the {@link WatermarkExtractorRecordEmitter} along with the actual records.
*/
public class IcebergTimestampWatermarkExtractor
implements IcebergWatermarkExtractor<RowData>, Serializable {
public class TimestampBasedWatermarkExtractor implements IcebergWatermarkExtractor, Serializable {
private final int tsFieldId;

/**
@@ -43,7 +41,7 @@ public class IcebergTimestampWatermarkExtractor
* @param schema The schema of the Table
* @param tsFieldName The timestamp column which should be used as an event time
*/
public IcebergTimestampWatermarkExtractor(Schema schema, String tsFieldName) {
public TimestampBasedWatermarkExtractor(Schema schema, String tsFieldName) {
Types.NestedField field = schema.findField(tsFieldName);
Preconditions.checkArgument(
field.type().typeId().equals(Type.TypeID.TIMESTAMP), "Type should be timestamp");
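A short sketch (not part of the diff) of the renamed extractor used on its own; the table and split variables are assumed to exist:

    // Sketch only: derive a split-level watermark from the column statistics of the split's files.
    IcebergWatermarkExtractor extractor =
        new TimestampBasedWatermarkExtractor(table.schema(), "ts");  // "ts" must be a TIMESTAMP column
    long splitWatermark = extractor.extractWatermark(split);         // used for emitting watermarks
                                                                     // and for ordering splits
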
@@ -60,7 +60,7 @@ public static SerializableComparator<IcebergSourceSplit> fileSequenceNumber() {

/** Comparator which orders the splits based on watermark of the splits */
public static SerializableComparator<IcebergSourceSplit> watermarksAwareComparator(
IcebergWatermarkExtractor<?> watermarkExtractor) {
IcebergWatermarkExtractor watermarkExtractor) {
return (IcebergSourceSplit o1, IcebergSourceSplit o2) -> {
long watermark1 = watermarkExtractor.extractWatermark(o1);
long watermark2 = watermarkExtractor.extractWatermark(o2);
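And an illustration (again not part of the commit) of the comparator that the build() change wires in; pendingSplits is an assumed List<IcebergSourceSplit>, and extractor comes from the sketch above:

    // Sketch only: order pending splits by their extracted watermark so older data is read first.
    SerializableComparator<IcebergSourceSplit> byWatermark =
        SplitComparators.watermarksAwareComparator(extractor);
    pendingSplits.sort(byWatermark);
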
@@ -39,6 +39,7 @@
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericAppenderHelper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
@@ -97,6 +98,11 @@ protected List<Record> generateRecords(int numRecords, long seed) {
return RandomGenericData.generate(schema(), numRecords, seed);
}

protected void assertRecords(Table table, List<Record> expectedRecords, Duration timeout)
throws Exception {
SimpleDataUtil.assertTableRecords(table, expectedRecords, timeout);
}

@Test
public void testBoundedWithTaskManagerFailover() throws Exception {
testBoundedIcebergSource(FailoverType.TM);
@@ -150,8 +156,7 @@ private void testBoundedIcebergSource(FailoverType failoverType) throws Exceptio
RecordCounterToFail::continueProcessing,
miniClusterResource.getMiniCluster());

SimpleDataUtil.assertTableRecords(
sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}

@Test
@@ -214,8 +219,7 @@ private void testContinuousIcebergSource(FailoverType failoverType) throws Excep

// wait longer for continuous source to reduce flakiness
// because CI servers tend to be overloaded.
SimpleDataUtil.assertTableRecords(
sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
assertRecords(sinkTableResource.table(), expectedRecords, Duration.ofSeconds(120));
}

// ------------------------------------------------------------------------
@@ -34,9 +34,9 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.reader.IcebergTimestampWatermarkExtractor;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.StructLikeWrapper;
import org.awaitility.Awaitility;

public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIcebergSourceFailover {
// Increment ts by 15 minutes for each generateRecords batch
@@ -50,9 +50,8 @@ public class TestIcebergSourceFailoverWithWatermarkExtractor extends TestIceberg
protected IcebergSource.Builder<RowData> sourceBuilder() {
return IcebergSource.<RowData>builder()
.tableLoader(sourceTableResource.tableLoader())
.watermarkExtractor(new IcebergTimestampWatermarkExtractor(TestFixtures.TS_SCHEMA, "ts"))
.project(TestFixtures.TS_SCHEMA)
.includeColumnStats(true);
.watermarkColumn("ts")
.project(TestFixtures.TS_SCHEMA);
}

@Override
@@ -65,14 +64,13 @@ protected List<Record> generateRecords(int numRecords, long seed) {
// Override the ts field to create a more realistic situation for event time alignment
tsMilli.addAndGet(RECORD_BATCH_TS_INCREMENT_MILLI);
return RandomGenericData.generate(schema(), numRecords, seed).stream()
.map(
.peek(
record -> {
LocalDateTime ts =
LocalDateTime.ofInstant(
Instant.ofEpochMilli(tsMilli.addAndGet(RECORD_TS_INCREMENT_MILLI)),
ZoneId.of("Z"));
record.setField("ts", ts);
return record;
})
.collect(Collectors.toList());
}
@@ -84,32 +82,30 @@ record -> {
* {@link LocalDateTime} to a Long type so that Comparators can continue to work.
*/
@Override
protected void assertRecords(
Table table, List<Record> expectedRecords, Duration interval, int maxCount) throws Exception {
protected void assertRecords(Table table, List<Record> expectedRecords, Duration timeout)
throws Exception {
List<Record> expectedNormalized = convertTimestampField(expectedRecords);
for (int i = 0; i < maxCount; ++i) {
if (SimpleDataUtil.equalsRecords(
expectedNormalized,
convertTimestampField(SimpleDataUtil.tableRecords(table)),
table.schema())) {
break;
} else {
Thread.sleep(interval.toMillis());
}
}
SimpleDataUtil.assertRecordsEqual(
expectedNormalized,
convertTimestampField(SimpleDataUtil.tableRecords(table)),
table.schema());
Awaitility.await("expected list of records should be produced")
.atMost(timeout)
.untilAsserted(
() -> {
SimpleDataUtil.equalsRecords(
expectedNormalized,
convertTimestampField(SimpleDataUtil.tableRecords(table)),
table.schema());
SimpleDataUtil.assertRecordsEqual(
expectedNormalized,
convertTimestampField(SimpleDataUtil.tableRecords(table)),
table.schema());
});
}

private List<Record> convertTimestampField(List<Record> records) {
return records.stream()
.map(
.peek(
r -> {
LocalDateTime localDateTime = ((LocalDateTime) r.getField("ts"));
r.setField("ts", localDateTime.atZone(ZoneOffset.UTC).toInstant().toEpochMilli());
return r;
})
.collect(Collectors.toList());
}
@@ -66,7 +66,6 @@
import org.apache.iceberg.flink.HadoopTableResource;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.TestFixtures;
import org.apache.iceberg.flink.source.reader.IcebergTimestampWatermarkExtractor;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -155,7 +154,7 @@ public void testWindowing() throws Exception {
public void apply(
TimeWindow window, Iterable<RowData> values, Collector<RowData> out) {
// Just print all the data to confirm everything has arrived
values.forEach(r -> out.collect(r));
values.forEach(out::collect);
}
});

@@ -265,9 +264,8 @@ public void testThrottling() throws Exception {
protected IcebergSource.Builder<RowData> sourceBuilder() {
return IcebergSource.<RowData>builder()
.tableLoader(sourceTableResource.tableLoader())
.watermarkExtractor(new IcebergTimestampWatermarkExtractor(TestFixtures.TS_SCHEMA, "ts"))
.watermarkColumn("ts")
.project(TestFixtures.TS_SCHEMA)
.includeColumnStats(true)
.splitSize(100L);
}

@@ -317,7 +315,7 @@ protected void assertRecords(
Assert.assertEquals(expected, received);
}

protected void waitForRecords(CollectResultIterator<RowData> iterator, int num) throws Exception {
protected void waitForRecords(CollectResultIterator<RowData> iterator, int num) {
assertThat(
CompletableFuture.supplyAsync(
() -> {
