Skip to content

Commit

Permalink
Flink: Fix IcebergSource tableloader lifecycle management in batch mode
Browse files Browse the repository at this point in the history
  • Loading branch information
mas-chen committed Nov 28, 2023
1 parent 5e059c1 commit c51d51b
Showing 1 changed file with 22 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@
public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);

// This table loader may already be closed once split planning has run; it is only safe to
// use this instance for resource-independent information (e.g. the table name), never for
// I/O. Planning uses a clone with an independently managed lifecycle.
private final TableLoader tableLoader;
private final ScanContext scanContext;
private final ReaderFunction<T> readerFunction;
Expand Down Expand Up @@ -110,7 +112,7 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
}

/**
 * Returns a human-readable name for this source, derived from the table name.
 *
 * <p>Uses {@code tableName()} (resource-independent) rather than holding the table loader open.
 */
String name() {
  return "IcebergSource-" + tableName();
}

private String planningThreadName() {
Expand All @@ -120,26 +122,30 @@ private String planningThreadName() {
// a public API like the protected method "OperatorCoordinator.Context getCoordinatorContext()"
// from SourceCoordinatorContext implementation. For now, <table name>-<random UUID> is used as
// the unique thread pool name.
return lazyTable().name() + "-" + UUID.randomUUID();
return tableName() + "-" + UUID.randomUUID();
}

private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
private List<IcebergSourceSplit> planSplitsForBatch(
TableLoader plannerTableLoader, String threadName) {
ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try {
tableLoader.open();
try (TableLoader loader = plannerTableLoader) {
List<IcebergSourceSplit> splits =
FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), scanContext, workerPool);
FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
LOG.info(
"Discovered {} splits from table {} during job initialization",
splits.size(),
lazyTable().name());
tableName());
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
workerPool.shutdown();
}
}

private Table lazyTable() {
private String tableName() {
if (table == null) {
tableLoader.open();
try (TableLoader loader = tableLoader) {
Expand All @@ -149,7 +155,7 @@ private Table lazyTable() {
}
}

return table;
return table.name();
}

@Override
Expand All @@ -160,7 +166,7 @@ public Boundedness getBoundedness() {
@Override
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
  // Metrics are tagged with the table name only; tableName() does not keep the loader open.
  IcebergSourceReaderMetrics metrics =
      new IcebergSourceReaderMetrics(readerContext.metricGroup(), tableName());
  return new IcebergSourceReader<>(
      emitter, metrics, readerFunction, splitComparator, readerContext);
}
Expand Down Expand Up @@ -197,17 +203,21 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
LOG.info(
"Iceberg source restored {} splits from state for table {}",
enumState.pendingSplits().size(),
lazyTable().name());
tableName());
assigner = assignerFactory.createAssigner(enumState.pendingSplits());
}

// Create a copy of the table loader to avoid lifecycle management conflicts with the user
// provided table loader. This copy is only required for split planning, which uses the
// underlying io, and should be closed after split planning is complete
TableLoader tableLoaderCopy = tableLoader.clone();
if (scanContext.isStreaming()) {
ContinuousSplitPlanner splitPlanner =
new ContinuousSplitPlannerImpl(tableLoader.clone(), scanContext, planningThreadName());
new ContinuousSplitPlannerImpl(tableLoaderCopy, scanContext, planningThreadName());
return new ContinuousIcebergEnumerator(
enumContext, assigner, scanContext, splitPlanner, enumState);
} else {
List<IcebergSourceSplit> splits = planSplitsForBatch(planningThreadName());
List<IcebergSourceSplit> splits = planSplitsForBatch(tableLoaderCopy, planningThreadName());
assigner.onDiscoveredSplits(splits);
return new StaticIcebergEnumerator(enumContext, assigner);
}
Expand Down

0 comments on commit c51d51b

Please sign in to comment.