Flink: port #9173 to v1.16 and v1.18 #9334

Merged · 1 commit · Dec 19, 2023
IcebergSource.java

@@ -81,16 +81,18 @@
 public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
   private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class);
 
+  // This table loader can be closed, and it is only safe to use this instance for resource
+  // independent information (e.g. a table name). Copies of this are required to avoid lifecycle
+  // management conflicts with the user provided table loader. e.g. a copy of this is required for
+  // split planning, which uses the underlying io, and should be closed after split planning is
+  // complete.
   private final TableLoader tableLoader;
   private final ScanContext scanContext;
   private final ReaderFunction<T> readerFunction;
   private final SplitAssignerFactory assignerFactory;
   private final SerializableComparator<IcebergSourceSplit> splitComparator;
   private final SerializableRecordEmitter<T> emitter;
-
-  // Can't use SerializableTable as enumerator needs a regular table
-  // that can discover table changes
-  private transient Table table;
+  private final String tableName;
 
   IcebergSource(
       TableLoader tableLoader,

@@ -100,17 +102,21 @@ public class IcebergSource<T> implements Source<T, IcebergSourceSplit, IcebergEnumeratorState> {
       SerializableComparator<IcebergSourceSplit> splitComparator,
       Table table,
       SerializableRecordEmitter<T> emitter) {
+    Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
+    Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
+    Preconditions.checkNotNull(assignerFactory, "assignerFactory is required.");
+    Preconditions.checkNotNull(table, "table is required.");
     this.tableLoader = tableLoader;
     this.scanContext = scanContext;
     this.readerFunction = readerFunction;
     this.assignerFactory = assignerFactory;
     this.splitComparator = splitComparator;
-    this.table = table;
     this.emitter = emitter;
+    this.tableName = table.name();
   }
 
   String name() {
-    return "IcebergSource-" + lazyTable().name();
+    return "IcebergSource-" + tableName;
   }
 
   private String planningThreadName() {

@@ -120,38 +126,26 @@ private String planningThreadName() {
     // a public API like the protected method "OperatorCoordinator.Context getCoordinatorContext()"
     // from SourceCoordinatorContext implementation. For now, <table name>-<random UUID> is used as
     // the unique thread pool name.
-    return lazyTable().name() + "-" + UUID.randomUUID();
+    return tableName + "-" + UUID.randomUUID();
   }
 
   private List<IcebergSourceSplit> planSplitsForBatch(String threadName) {
     ExecutorService workerPool =
         ThreadPools.newWorkerPool(threadName, scanContext.planParallelism());
-    try {
+    try (TableLoader loader = tableLoader.clone()) {
+      loader.open();
       List<IcebergSourceSplit> splits =
-          FlinkSplitPlanner.planIcebergSourceSplits(lazyTable(), scanContext, workerPool);
+          FlinkSplitPlanner.planIcebergSourceSplits(loader.loadTable(), scanContext, workerPool);
       LOG.info(
-          "Discovered {} splits from table {} during job initialization",
-          splits.size(),
-          lazyTable().name());
+          "Discovered {} splits from table {} during job initialization", splits.size(), tableName);
       return splits;
+    } catch (IOException e) {
+      throw new UncheckedIOException("Failed to close table loader", e);
     } finally {
       workerPool.shutdown();
     }
   }
 
-  private Table lazyTable() {
-    if (table == null) {
-      tableLoader.open();
-      try (TableLoader loader = tableLoader) {
-        this.table = loader.loadTable();
-      } catch (IOException e) {
-        throw new UncheckedIOException("Failed to close table loader", e);
-      }
-    }
-
-    return table;
-  }
-
   @Override
   public Boundedness getBoundedness() {
     return scanContext.isStreaming() ? Boundedness.CONTINUOUS_UNBOUNDED : Boundedness.BOUNDED;

@@ -160,7 +154,7 @@ public Boundedness getBoundedness() {
   @Override
   public SourceReader<T, IcebergSourceSplit> createReader(SourceReaderContext readerContext) {
     IcebergSourceReaderMetrics metrics =
-        new IcebergSourceReaderMetrics(readerContext.metricGroup(), lazyTable().name());
+        new IcebergSourceReaderMetrics(readerContext.metricGroup(), tableName);
     return new IcebergSourceReader<>(
         emitter, metrics, readerFunction, splitComparator, readerContext);
   }

@@ -197,13 +191,12 @@ private SplitEnumerator<IcebergSourceSplit, IcebergEnumeratorState> createEnumerator(
     LOG.info(
         "Iceberg source restored {} splits from state for table {}",
         enumState.pendingSplits().size(),
-        lazyTable().name());
+        tableName);
     assigner = assignerFactory.createAssigner(enumState.pendingSplits());
   }
 
   if (scanContext.isStreaming()) {
     ContinuousSplitPlanner splitPlanner =
-        new ContinuousSplitPlannerImpl(tableLoader.clone(), scanContext, planningThreadName());
+        new ContinuousSplitPlannerImpl(tableLoader, scanContext, planningThreadName());
     return new ContinuousIcebergEnumerator(
         enumContext, assigner, scanContext, splitPlanner, enumState);
   } else {

@@ -537,7 +530,6 @@ public IcebergSource<T> build() {
        }
      }
 
-      checkRequired();
      // Since builder already load the table, pass it to the source to avoid double loading
      return new IcebergSource<>(
          tableLoader,

@@ -548,11 +540,5 @@ public IcebergSource<T> build() {
          table,
          emitter);
    }
-
-    private void checkRequired() {
-      Preconditions.checkNotNull(tableLoader, "tableLoader is required.");
-      Preconditions.checkNotNull(splitAssignerFactory, "assignerFactory is required.");
-      Preconditions.checkNotNull(readerFunction, "readerFunction is required.");
-    }
   }
 }
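The core of this change is a clone-per-use lifecycle for the table loader: the field held by the source is only a serializable template used for resource-independent information, and any operation that needs live catalog IO works on a short-lived clone that is opened and closed locally. Below is a minimal sketch of that pattern, assuming a hypothetical Loader interface shaped like Flink's TableLoader (open/clone/close); the Loader type, loadTableName() method, and SplitPlanningSketch class are illustrations, not the connector's actual API.

import java.io.Closeable;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for org.apache.iceberg.flink.TableLoader.
interface Loader extends Closeable, Serializable {
  void open();            // acquires catalog/IO resources
  String loadTableName(); // stands in for TableLoader#loadTable()
  Loader clone();         // produces an independent copy with its own lifecycle
}

class SplitPlanningSketch {
  private final Loader template; // never opened or closed by this class

  SplitPlanningSketch(Loader template) {
    this.template = template;
  }

  String planWithIsolatedLifecycle() {
    // try-with-resources guarantees the clone is closed once planning is done,
    // mirroring planSplitsForBatch() in the diff above; the template is untouched.
    try (Loader loader = template.clone()) {
      loader.open();
      return loader.loadTableName();
    } catch (IOException e) {
      // close() can throw IOException; surface it unchecked, as the diff does
      throw new UncheckedIOException("Failed to close table loader", e);
    }
  }
}

This is also why the removed lazyTable() method had to go: it opened and closed the user-provided loader itself, which conflicted with the caller's own lifecycle management. The source now caches only the table name at construction time, since that is the one piece of table state it needs after serialization.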
ContinuousSplitPlannerImpl.java

@@ -56,9 +56,9 @@ public class ContinuousSplitPlannerImpl implements ContinuousSplitPlanner {
    */
   public ContinuousSplitPlannerImpl(
       TableLoader tableLoader, ScanContext scanContext, String threadName) {
-    this.tableLoader = tableLoader;
+    this.tableLoader = tableLoader.clone();
     this.tableLoader.open();
-    this.table = tableLoader.loadTable();
+    this.table = this.tableLoader.loadTable();
     this.scanContext = scanContext;
     this.isSharedPool = threadName == null;
     this.workerPool =
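The companion change inverts ownership: rather than relying on every caller to pass in a clone (as IcebergSource previously did with tableLoader.clone() at the call site), the planner now clones whatever loader it receives and owns that private copy outright. A short sketch of the idea, reusing the hypothetical Loader interface from the sketch above; ContinuousPlannerSketch is illustrative, not the real class.

import java.io.Closeable;
import java.io.IOException;

class ContinuousPlannerSketch implements Closeable {
  private final Loader loader; // private copy, owned and closed by the planner

  ContinuousPlannerSketch(Loader sharedLoader) {
    // Defensive copy: correctness no longer depends on callers remembering to clone.
    this.loader = sharedLoader.clone();
    this.loader.open();
  }

  @Override
  public void close() throws IOException {
    loader.close(); // safe: only the private clone is closed, never the caller's loader
  }
}

Putting the clone inside the component that needs it keeps the lifecycle contract local: whoever opens a loader closes it, and no object ever closes a loader it did not create.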
The identical changes are applied to the second Flink version covered by this port; the duplicate diffs of IcebergSource.java and ContinuousSplitPlannerImpl.java are shown once above.
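For context, this is roughly how the source is wired from user code, following the documented Flink connector API; the warehouse path, interval, and job name below are placeholders, and the builder methods should be double-checked against the Iceberg version you build against. The point of the PR is that the loader handed to the builder remains the caller's to manage, because the source only ever clones it internally.

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.IcebergSource;

public class IcebergSourceUsage {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // This loader stays under the caller's control; with this change the source
    // works on internal clones, so the caller's copy is never closed behind its back.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

    DataStream<RowData> stream =
        env.fromSource(
            IcebergSource.forRowData()
                .tableLoader(tableLoader)
                .streaming(true)                          // continuous enumeration
                .monitorInterval(Duration.ofSeconds(60))  // how often to discover new snapshots
                .build(),
            WatermarkStrategy.noWatermarks(),
            "iceberg-source");

    stream.print();
    env.execute("Iceberg source example");
  }
}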