From 8b874e800de88e11000676b8461c104ef7353efd Mon Sep 17 00:00:00 2001 From: mas-chen Date: Mon, 18 Dec 2023 13:56:57 -0800 Subject: [PATCH] Flink: port #9173 to v1.16 and v1.18 --- .../iceberg/flink/source/IcebergSource.java | 58 +++++++------------ .../ContinuousSplitPlannerImpl.java | 4 +- .../iceberg/flink/source/IcebergSource.java | 58 +++++++------------ .../ContinuousSplitPlannerImpl.java | 4 +- 4 files changed, 48 insertions(+), 76 deletions(-) diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index 179253cb3a18..a7ce2db61ffb 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -81,16 +81,18 @@ public class IcebergSource implements Source { private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class); + // This table loader can be closed, and it is only safe to use this instance for resource + // independent information (e.g. a table name). Copies of this are required to avoid lifecycle + // management conflicts with the user provided table loader. e.g. a copy of this is required for + // split planning, which uses the underlying io, and should be closed after split planning is + // complete. private final TableLoader tableLoader; private final ScanContext scanContext; private final ReaderFunction readerFunction; private final SplitAssignerFactory assignerFactory; private final SerializableComparator splitComparator; private final SerializableRecordEmitter emitter; - - // Can't use SerializableTable as enumerator needs a regular table - // that can discover table changes - private transient Table table; + private final String tableName; IcebergSource( TableLoader tableLoader, @@ -100,17 +102,21 @@ public class IcebergSource implements Source splitComparator, Table table, SerializableRecordEmitter emitter) { + Preconditions.checkNotNull(tableLoader, "tableLoader is required."); + Preconditions.checkNotNull(readerFunction, "readerFunction is required."); + Preconditions.checkNotNull(assignerFactory, "assignerFactory is required."); + Preconditions.checkNotNull(table, "table is required."); this.tableLoader = tableLoader; this.scanContext = scanContext; this.readerFunction = readerFunction; this.assignerFactory = assignerFactory; this.splitComparator = splitComparator; - this.table = table; this.emitter = emitter; + this.tableName = table.name(); } String name() { - return "IcebergSource-" + lazyTable().name(); + return "IcebergSource-" + tableName; } private String planningThreadName() { @@ -120,38 +126,26 @@ private String planningThreadName() { // a public API like the protected method "OperatorCoordinator.Context getCoordinatorContext()" // from SourceCoordinatorContext implementation. For now, - is used as // the unique thread pool name. - return lazyTable().name() + "-" + UUID.randomUUID(); + return tableName + "-" + UUID.randomUUID(); } private List planSplitsForBatch(String threadName) { ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, scanContext.planParallelism()); - try { + try (TableLoader loader = tableLoader.clone()) { + loader.open(); List splits = - FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), scanContext, workerPool); + FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool); LOG.info( - "Discovered {} splits from table {} during job initialization", - splits.size(), - lazyTable().name()); + "Discovered {} splits from table {} during job initialization", splits.size(), tableName); return splits; + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table loader", e); } finally { workerPool.shutdown(); } } - private Table lazyTable() { - if (table == null) { - tableLoader.open(); - try (TableLoader loader = tableLoader) { - this.table = loader.loadTable(); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close table loader", e); - } - } - - return table; - } - @Override public Boundedness getBoundedness() { return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED; @@ -160,7 +154,7 @@ public Boundedness getBoundedness() { @Override public SourceReader createReader(SourceReaderContext readerContext) { IcebergSourceReaderMetrics metrics = - new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name()); + new IcebergSourceReaderMetrics(readerContext.metricGroup(), tableName); return new IcebergSourceReader<>( emitter, metrics, readerFunction, splitComparator, readerContext); } @@ -197,13 +191,12 @@ private SplitEnumerator createEnumer LOG.info( "Iceberg source restored {} splits from state for table {}", enumState.pendingSplits().size(), - lazyTable().name()); + tableName); assigner = assignerFactory.createAssigner(enumState.pendingSplits()); } - if (scanContext.isStreaming()) { ContinuousSplitPlanner splitPlanner = - new ContinuousSplitPlannerImpl(tableLoader.clone(), scanContext, planningThreadName()); + new ContinuousSplitPlannerImpl(tableLoader, scanContext, planningThreadName()); return new ContinuousIcebergEnumerator( enumContext, assigner, scanContext, splitPlanner, enumState); } else { @@ -537,7 +530,6 @@ public IcebergSource build() { } } - checkRequired(); // Since builder already load the table, pass it to the source to avoid double loading return new IcebergSource<>( tableLoader, @@ -548,11 +540,5 @@ public IcebergSource build() { table, emitter); } - - private void checkRequired() { - Preconditions.checkNotNull(tableLoader, "tableLoader is required."); - Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required."); - Preconditions.checkNotNull(readerFunction, "readerFunction is required."); - } } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java index f0d8ca8d7057..450b649253a4 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java @@ -56,9 +56,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner { */ public ContinuousSplitPlannerImpl( TableLoader tableLoader, ScanContext scanContext, String threadName) { - this.tableLoader = tableLoader; + this.tableLoader = tableLoader.clone(); this.tableLoader.open(); - this.table = tableLoader.loadTable(); + this.table = this.tableLoader.loadTable(); this.scanContext = scanContext; this.isSharedPool = threadName == null; this.workerPool = diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java index 179253cb3a18..a7ce2db61ffb 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/IcebergSource.java @@ -81,16 +81,18 @@ public class IcebergSource implements Source { private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class); + // This table loader can be closed, and it is only safe to use this instance for resource + // independent information (e.g. a table name). Copies of this are required to avoid lifecycle + // management conflicts with the user provided table loader. e.g. a copy of this is required for + // split planning, which uses the underlying io, and should be closed after split planning is + // complete. private final TableLoader tableLoader; private final ScanContext scanContext; private final ReaderFunction readerFunction; private final SplitAssignerFactory assignerFactory; private final SerializableComparator splitComparator; private final SerializableRecordEmitter emitter; - - // Can't use SerializableTable as enumerator needs a regular table - // that can discover table changes - private transient Table table; + private final String tableName; IcebergSource( TableLoader tableLoader, @@ -100,17 +102,21 @@ public class IcebergSource implements Source splitComparator, Table table, SerializableRecordEmitter emitter) { + Preconditions.checkNotNull(tableLoader, "tableLoader is required."); + Preconditions.checkNotNull(readerFunction, "readerFunction is required."); + Preconditions.checkNotNull(assignerFactory, "assignerFactory is required."); + Preconditions.checkNotNull(table, "table is required."); this.tableLoader = tableLoader; this.scanContext = scanContext; this.readerFunction = readerFunction; this.assignerFactory = assignerFactory; this.splitComparator = splitComparator; - this.table = table; this.emitter = emitter; + this.tableName = table.name(); } String name() { - return "IcebergSource-" + lazyTable().name(); + return "IcebergSource-" + tableName; } private String planningThreadName() { @@ -120,38 +126,26 @@ private String planningThreadName() { // a public API like the protected method "OperatorCoordinator.Context getCoordinatorContext()" // from SourceCoordinatorContext implementation. For now,
- is used as // the unique thread pool name. - return lazyTable().name() + "-" + UUID.randomUUID(); + return tableName + "-" + UUID.randomUUID(); } private List planSplitsForBatch(String threadName) { ExecutorService workerPool = ThreadPools.newWorkerPool(threadName, scanContext.planParallelism()); - try { + try (TableLoader loader = tableLoader.clone()) { + loader.open(); List splits = - FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), scanContext, workerPool); + FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool); LOG.info( - "Discovered {} splits from table {} during job initialization", - splits.size(), - lazyTable().name()); + "Discovered {} splits from table {} during job initialization", splits.size(), tableName); return splits; + } catch (IOException e) { + throw new UncheckedIOException("Failed to close table loader", e); } finally { workerPool.shutdown(); } } - private Table lazyTable() { - if (table == null) { - tableLoader.open(); - try (TableLoader loader = tableLoader) { - this.table = loader.loadTable(); - } catch (IOException e) { - throw new UncheckedIOException("Failed to close table loader", e); - } - } - - return table; - } - @Override public Boundedness getBoundedness() { return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED; @@ -160,7 +154,7 @@ public Boundedness getBoundedness() { @Override public SourceReader createReader(SourceReaderContext readerContext) { IcebergSourceReaderMetrics metrics = - new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name()); + new IcebergSourceReaderMetrics(readerContext.metricGroup(), tableName); return new IcebergSourceReader<>( emitter, metrics, readerFunction, splitComparator, readerContext); } @@ -197,13 +191,12 @@ private SplitEnumerator createEnumer LOG.info( "Iceberg source restored {} splits from state for table {}", enumState.pendingSplits().size(), - lazyTable().name()); + tableName); assigner = assignerFactory.createAssigner(enumState.pendingSplits()); } - if (scanContext.isStreaming()) { ContinuousSplitPlanner splitPlanner = - new ContinuousSplitPlannerImpl(tableLoader.clone(), scanContext, planningThreadName()); + new ContinuousSplitPlannerImpl(tableLoader, scanContext, planningThreadName()); return new ContinuousIcebergEnumerator( enumContext, assigner, scanContext, splitPlanner, enumState); } else { @@ -537,7 +530,6 @@ public IcebergSource build() { } } - checkRequired(); // Since builder already load the table, pass it to the source to avoid double loading return new IcebergSource<>( tableLoader, @@ -548,11 +540,5 @@ public IcebergSource build() { table, emitter); } - - private void checkRequired() { - Preconditions.checkNotNull(tableLoader, "tableLoader is required."); - Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required."); - Preconditions.checkNotNull(readerFunction, "readerFunction is required."); - } } } diff --git a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java index f0d8ca8d7057..450b649253a4 100644 --- a/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java +++ b/flink/v1.18/flink/src/main/java/org/apache/iceberg/flink/source/enumerator/ContinuousSplitPlannerImpl.java @@ -56,9 +56,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner { */ public ContinuousSplitPlannerImpl( TableLoader tableLoader, ScanContext scanContext, String threadName) { - this.tableLoader = tableLoader; + this.tableLoader = tableLoader.clone(); this.tableLoader.open(); - this.table = tableLoader.loadTable(); + this.table = this.tableLoader.loadTable(); this.scanContext = scanContext; this.isSharedPool = threadName == null; this.workerPool =