Skip to content

Commit

Permalink
Flink: port #9173 to v1.16 and v1.18
Browse files Browse the repository at this point in the history
  • Loading branch information
mas-chen committed Dec 18, 2023
1 parent 6e21bbf commit 8b874e8
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,18 @@
public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);

// This table loader can be closed, and it is only safe to use this instance for resource
// independent information (e.g. a table name). Copies of this are required to avoid lifecycle
// management conflicts with the user provided table loader. e.g. a copy of this is required for
// split planning, which uses the underlying io, and should be closed after split planning is
// complete.
private final TableLoader tableLoader;
private final ScanContext scanContext;
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final SerializableRecordEmitter<T> emitter;

// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
private transient Table table;
private final String tableName;

IcebergSource(
TableLoader tableLoader,
Expand All @@ -100,17 +102,21 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
SerializableComparator<IcebergSourceSplit> splitComparator,
Table table,
SerializableRecordEmitter<T> emitter) {
Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
Preconditions.checkNotNull(assignerFactory, "assignerFactory is required.");
Preconditions.checkNotNull(table, "table is required.");
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.splitComparator = splitComparator;
this.table = table;
this.emitter = emitter;
this.tableName = table.name();
}

String name() {
return "IcebergSource-" + lazyTable().name();
return "IcebergSource-" + tableName;
}

private String planningThreadName() {
Expand All @@ -120,38 +126,26 @@ private String planningThreadName() {
// a public API like the protected method "OperatorCoordinator.Context getCoordinatorContext()"
// from SourceCoordinatorContext implementation. For now, <table name>-<random UUID> is used as
// the unique thread pool name.
return lazyTable().name() + "-" + UUID.randomUUID();
return tableName + "-" + UUID.randomUUID();
}

private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try {
try (TableLoader loader = tableLoader.clone()) {
loader.open();
List<IcebergSourceSplit> splits =
FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), scanContext, workerPool);
FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
LOG.info(
"Discovered {} splits from table {} during job initialization",
splits.size(),
lazyTable().name());
"Discovered {} splits from table {} during job initialization", splits.size(), tableName);
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
workerPool.shutdown();
}
}

private Table lazyTable() {
if (table == null) {
tableLoader.open();
try (TableLoader loader = tableLoader) {
this.table = loader.loadTable();
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
}
}

return table;
}

@Override
public Boundedness getBoundedness() {
return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
Expand All @@ -160,7 +154,7 @@ public Boundedness getBoundedness() {
@Override
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
IcebergSourceReaderMetrics metrics =
new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
new IcebergSourceReaderMetrics(readerContext.metricGroup(), tableName);
return new IcebergSourceReader<>(
emitter, metrics, readerFunction, splitComparator, readerContext);
}
Expand Down Expand Up @@ -197,13 +191,12 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
LOG.info(
"Iceberg source restored {} splits from state for table {}",
enumState.pendingSplits().size(),
lazyTable().name());
tableName);
assigner = assignerFactory.createAssigner(enumState.pendingSplits());
}

if (scanContext.isStreaming()) {
ContinuousSplitPlanner splitPlanner =
new ContinuousSplitPlannerImpl(tableLoader.clone(), scanContext, planningThreadName());
new ContinuousSplitPlannerImpl(tableLoader, scanContext, planningThreadName());
return new ContinuousIcebergEnumerator(
enumContext, assigner, scanContext, splitPlanner, enumState);
} else {
Expand Down Expand Up @@ -537,7 +530,6 @@ public IcebergSource<T> build() {
}
}

checkRequired();
// Since builder already load the table, pass it to the source to avoid double loading
return new IcebergSource<>(
tableLoader,
Expand All @@ -548,11 +540,5 @@ public IcebergSource<T> build() {
table,
emitter);
}

private void checkRequired() {
Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required.");
Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
*/
public ContinuousSplitPlannerImpl(
TableLoader tableLoader, ScanContext scanContext, String threadName) {
this.tableLoader = tableLoader;
this.tableLoader = tableLoader.clone();
this.tableLoader.open();
this.table = tableLoader.loadTable();
this.table = this.tableLoader.loadTable();
this.scanContext = scanContext;
this.isSharedPool = threadName == null;
this.workerPool =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,18 @@
public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);

// This table loader can be closed, and it is only safe to use this instance for resource
// independent information (e.g. a table name). Copies of this are required to avoid lifecycle
// management conflicts with the user provided table loader. e.g. a copy of this is required for
// split planning, which uses the underlying io, and should be closed after split planning is
// complete.
private final TableLoader tableLoader;
private final ScanContext scanContext;
private final ReaderFunction<T> readerFunction;
private final SplitAssignerFactory assignerFactory;
private final SerializableComparator<IcebergSourceSplit> splitComparator;
private final SerializableRecordEmitter<T> emitter;

// Can't use SerializableTable as enumerator needs a regular table
// that can discover table changes
private transient Table table;
private final String tableName;

IcebergSource(
TableLoader tableLoader,
Expand All @@ -100,17 +102,21 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEn
SerializableComparator<IcebergSourceSplit> splitComparator,
Table table,
SerializableRecordEmitter<T> emitter) {
Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
Preconditions.checkNotNull(assignerFactory, "assignerFactory is required.");
Preconditions.checkNotNull(table, "table is required.");
this.tableLoader = tableLoader;
this.scanContext = scanContext;
this.readerFunction = readerFunction;
this.assignerFactory = assignerFactory;
this.splitComparator = splitComparator;
this.table = table;
this.emitter = emitter;
this.tableName = table.name();
}

String name() {
return "IcebergSource-" + lazyTable().name();
return "IcebergSource-" + tableName;
}

private String planningThreadName() {
Expand All @@ -120,38 +126,26 @@ private String planningThreadName() {
// a public API like the protected method "OperatorCoordinator.Context getCoordinatorContext()"
// from SourceCoordinatorContext implementation. For now, <table name>-<random UUID> is used as
// the unique thread pool name.
return lazyTable().name() + "-" + UUID.randomUUID();
return tableName + "-" + UUID.randomUUID();
}

private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
ExecutorService workerPool =
ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
try {
try (TableLoader loader = tableLoader.clone()) {
loader.open();
List<IcebergSourceSplit> splits =
FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), scanContext, workerPool);
FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
LOG.info(
"Discovered {} splits from table {} during job initialization",
splits.size(),
lazyTable().name());
"Discovered {} splits from table {} during job initialization", splits.size(), tableName);
return splits;
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
} finally {
workerPool.shutdown();
}
}

private Table lazyTable() {
if (table == null) {
tableLoader.open();
try (TableLoader loader = tableLoader) {
this.table = loader.loadTable();
} catch (IOException e) {
throw new UncheckedIOException("Failed to close table loader", e);
}
}

return table;
}

@Override
public Boundedness getBoundedness() {
return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;
Expand All @@ -160,7 +154,7 @@ public Boundedness getBoundedness() {
@Override
public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
IcebergSourceReaderMetrics metrics =
new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
new IcebergSourceReaderMetrics(readerContext.metricGroup(), tableName);
return new IcebergSourceReader<>(
emitter, metrics, readerFunction, splitComparator, readerContext);
}
Expand Down Expand Up @@ -197,13 +191,12 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumer
LOG.info(
"Iceberg source restored {} splits from state for table {}",
enumState.pendingSplits().size(),
lazyTable().name());
tableName);
assigner = assignerFactory.createAssigner(enumState.pendingSplits());
}

if (scanContext.isStreaming()) {
ContinuousSplitPlanner splitPlanner =
new ContinuousSplitPlannerImpl(tableLoader.clone(), scanContext, planningThreadName());
new ContinuousSplitPlannerImpl(tableLoader, scanContext, planningThreadName());
return new ContinuousIcebergEnumerator(
enumContext, assigner, scanContext, splitPlanner, enumState);
} else {
Expand Down Expand Up @@ -537,7 +530,6 @@ public IcebergSource<T> build() {
}
}

checkRequired();
// Since builder already load the table, pass it to the source to avoid double loading
return new IcebergSource<>(
tableLoader,
Expand All @@ -548,11 +540,5 @@ public IcebergSource<T> build() {
table,
emitter);
}

private void checkRequired() {
Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required.");
Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
*/
public ContinuousSplitPlannerImpl(
TableLoader tableLoader, ScanContext scanContext, String threadName) {
this.tableLoader = tableLoader;
this.tableLoader = tableLoader.clone();
this.tableLoader.open();
this.table = tableLoader.loadTable();
this.table = this.tableLoader.loadTable();
this.scanContext = scanContext;
this.isSharedPool = threadName == null;
this.workerPool =
Expand Down

0 comments on commit 8b874e8

Please sign in to comment.