[INLONG-7908][Sort] PostgreSQL connector supports parallel read #8664

Merged (18 commits) on Sep 15, 2023
Changes from 6 commits
Commits
16d8155 [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Aug 8, 2023)
0d2476d [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Aug 13, 2023)
a98ba84 [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Aug 13, 2023)
2030e46 [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Aug 15, 2023)
549861c [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Aug 26, 2023)
ac1f608 [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Sep 12, 2023)
c381cb2 [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Sep 12, 2023)
b20afa8 [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Sep 12, 2023)
e0dba91 Merge branch 'master' into INLONG-7908-3 (kuansix, Sep 12, 2023)
87abb91 [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Sep 13, 2023)
7cb2d8d [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Sep 13, 2023)
61d96c3 [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Sep 13, 2023)
0d68055 [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Sep 13, 2023)
0fea37a [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Sep 13, 2023)
977497e [INLONG-7908][Sort] Postgres connector supports parallel read (kuansix, Sep 13, 2023)
e557403 [INLONG-7908][Sort] Using Variables in Constants (e-mhui, Sep 14, 2023)
f295ad5 [INLONG-7908][Sort] Fix checkstyle (e-mhui, Sep 14, 2023)
db475d5 [INLONG-7908][Sort] Rollback schemaList (e-mhui, Sep 14, 2023)
@@ -38,6 +38,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
protected final String username;
protected final String password;
protected final List<String> databaseList;
protected final List<String> schemaList;
protected final List<String> tableList;
protected final int fetchSize;
protected final String serverTimeZone;
@@ -49,6 +50,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
public JdbcSourceConfig(
StartupOptions startupOptions,
List<String> databaseList,
List<String> schemaList,
List<String> tableList,
int splitSize,
int splitMetaGroupSize,
@@ -87,6 +89,7 @@ public JdbcSourceConfig(
this.username = username;
this.password = password;
this.databaseList = databaseList;
this.schemaList = schemaList;
this.tableList = tableList;
this.fetchSize = fetchSize;
this.serverTimeZone = serverTimeZone;
@@ -26,6 +26,7 @@
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.api.common.state.CheckpointListener;

import java.io.Serializable;
import java.util.List;
@@ -38,7 +39,7 @@
* Copy from com.ververica:flink-cdc-base:2.3.0.
*/
@Experimental
public interface DataSourceDialect<C extends SourceConfig> extends Serializable {
public interface DataSourceDialect<C extends SourceConfig> extends Serializable, CheckpointListener {

/** Get the name of dialect. */
String getName();
@@ -70,4 +71,14 @@ public interface DataSourceDialect<C extends SourceConfig> extends Serializable

/** The task context used for fetch task to fetch data from external systems. */
FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase, C sourceConfig);

/**
* We have an empty default implementation here because most dialects do not have to implement
* the method.
*
* @see CheckpointListener#notifyCheckpointComplete(long)
*/
@Override
default void notifyCheckpointComplete(long checkpointId) throws Exception {
}
}
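Making the dialect extend CheckpointListener is what lets a specific dialect hook into checkpoint completion (for PostgreSQL, typically to confirm the processed offset so the server can release replication resources), while every other dialect inherits the empty default above. The reader change further down forwards notifyCheckpointComplete to the dialect, which makes the hook reachable. A minimal sketch of the pattern, with hypothetical names that are not part of this PR:

```java
import org.apache.flink.api.common.state.CheckpointListener;

// Hypothetical names for illustration; the real interface is DataSourceDialect.
interface ExampleDialect extends CheckpointListener {

    String getName();

    // Empty default, mirroring the change above: most dialects have no
    // checkpoint-time work, so only those that need it override the hook.
    @Override
    default void notifyCheckpointComplete(long checkpointId) throws Exception {
    }
}

class ExamplePostgresDialect implements ExampleDialect {

    @Override
    public String getName() {
        return "PostgreSQL";
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // A Postgres-style dialect would confirm the processed offset here so the
        // database can advance its replication slot; this sketch only logs.
        System.out.println("Checkpoint " + checkpointId + " complete, confirming offsets");
    }
}
```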
@@ -46,7 +46,6 @@ public HikariDataSource createPooledDataSource(JdbcSourceConfig sourceConfig) {
config.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
config.addDataSourceProperty(SERVER_TIMEZONE_KEY, sourceConfig.getServerTimeZone());
config.setDriverClassName(sourceConfig.getDriverClassName());

// optional optimization configurations for pooled DataSource
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
@@ -123,7 +123,8 @@ public Optional<SourceSplitBase> getNext() {
// assigning the stream split. Otherwise, records emitted from stream split
// might be out-of-order in terms of same primary key with snapshot splits.
isStreamSplitAssigned = true;
return Optional.of(createStreamSplit());
StreamSplit streamSplit = createStreamSplit();
return Optional.of(streamSplit);
} else {
// stream split is not ready by now
return Optional.empty();
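The getNext() hunk above keeps the existing ordering guarantee: the single stream split is handed out only after all snapshot splits have finished, otherwise changelog records for a key could be emitted before that key's snapshot chunk. A compact, hypothetical sketch of that gating pattern (simplified types, not the assigner's real API):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified assigner: all snapshot splits first, then exactly one stream split.
public class HybridAssignerSketch {

    private final Deque<String> remainingSnapshotSplits;
    private boolean allSnapshotSplitsFinished;
    private boolean streamSplitAssigned;

    public HybridAssignerSketch(List<String> snapshotSplits) {
        this.remainingSnapshotSplits = new ArrayDeque<>(snapshotSplits);
    }

    public Optional<String> getNext() {
        if (!remainingSnapshotSplits.isEmpty()) {
            return Optional.of(remainingSnapshotSplits.poll());
        }
        if (allSnapshotSplitsFinished && !streamSplitAssigned) {
            streamSplitAssigned = true;
            return Optional.of("stream-split"); // assigned exactly once, after the snapshot phase
        }
        return Optional.empty();                // stream split is not ready yet
    }

    public void onAllSnapshotSplitsFinished() {
        allSnapshotSplitsFinished = true;
    }
}
```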
@@ -38,6 +38,7 @@

import static org.apache.inlong.sort.cdc.base.source.meta.split.SourceSplitSerializer.readTableSchemas;
import static org.apache.inlong.sort.cdc.base.source.meta.split.SourceSplitSerializer.writeTableSchemas;
import static org.apache.inlong.sort.cdc.base.util.RecordUtils.shouldUseCatalogBeforeSchema;

/** The {@link SimpleVersionedSerializer Serializer} for the {@link PendingSplitsState}.
* Copy from com.ververica:flink-cdc-base:2.3.0.
@@ -361,6 +362,8 @@ private void writeTableIds(Collection<TableId> tableIds, DataOutputSerializer ou
final int size = tableIds.size();
out.writeInt(size);
for (TableId tableId : tableIds) {
boolean useCatalogBeforeSchema = shouldUseCatalogBeforeSchema(tableId);
out.writeBoolean(useCatalogBeforeSchema);
out.writeUTF(tableId.toString());
}
}
@@ -84,12 +84,17 @@ default OffsetDeserializer createOffsetDeserializer() {
default FinishedSnapshotSplitInfo deserialize(byte[] serialized) {
try {
final DataInputDeserializer in = new DataInputDeserializer(serialized);
TableId tableId = TableId.parse(in.readUTF());
String tableIdStr = in.readUTF();
String splitId = in.readUTF();
Object[] splitStart = serializedStringToRow(in.readUTF());
Object[] splitEnd = serializedStringToRow(in.readUTF());
OffsetFactory offsetFactory = (OffsetFactory) serializedStringToObject(in.readUTF());
Offset highWatermark = readOffsetPosition(in);
boolean useCatalogBeforeSchema = true;
if (in.available() > 0) {
useCatalogBeforeSchema = in.readBoolean();
}
TableId tableId = TableId.parse(tableIdStr, useCatalogBeforeSchema);
in.releaseArrays();

return new FinishedSnapshotSplitInfo(
@@ -31,6 +31,7 @@
import java.util.Objects;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.inlong.sort.cdc.base.util.RecordUtils.shouldUseCatalogBeforeSchema;

/** The information used to describe a finished snapshot split.
* Copy from com.ververica:flink-cdc-base:2.3.0.
@@ -140,6 +141,8 @@ public byte[] serialize() {
}

public byte[] serialize(final DataOutputSerializer out) throws IOException {
boolean useCatalogBeforeSchema = shouldUseCatalogBeforeSchema(this.getTableId());
out.writeBoolean(useCatalogBeforeSchema);
out.writeUTF(this.getTableId().toString());
out.writeUTF(this.getSplitId());
out.writeUTF(SerializerUtils.rowToSerializedString(this.getSplitStart()));
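The serializer changes in this file and the previous one thread a useCatalogBeforeSchema flag through the persisted state so that PostgreSQL-style schema.table identifiers survive a round trip, while the reader stays compatible with bytes written before the flag existed by checking in.available() before reading it. A simplified, self-contained sketch of that compatibility pattern (illustrative class, not the PR's exact byte layout):

```java
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;

import java.io.IOException;

// Illustrative only: a trailing field added in a new version, read back-compatibly.
public class BackwardCompatReadSketch {

    static byte[] writeNewFormat(String tableId, boolean useCatalogBeforeSchema) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(64);
        out.writeUTF(tableId);
        out.writeBoolean(useCatalogBeforeSchema); // field that only new writers emit
        return out.getCopyOfBuffer();
    }

    static byte[] writeOldFormat(String tableId) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(64);
        out.writeUTF(tableId);                    // legacy bytes: no flag at all
        return out.getCopyOfBuffer();
    }

    static String read(byte[] serialized) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        String tableId = in.readUTF();
        boolean useCatalogBeforeSchema = true;    // default used for legacy bytes
        if (in.available() > 0) {
            useCatalogBeforeSchema = in.readBoolean();
        }
        return tableId + " (useCatalogBeforeSchema=" + useCatalogBeforeSchema + ")";
    }

    public static void main(String[] args) throws IOException {
        System.out.println(read(writeNewFormat("public.orders", false)));
        System.out.println(read(writeOldFormat("public.orders")));
    }
}
```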
@@ -42,6 +42,8 @@
import java.util.List;
import java.util.Map;

import static org.apache.inlong.sort.cdc.base.util.RecordUtils.shouldUseCatalogBeforeSchema;

/** A serializer for the {@link SourceSplitBase}.
* Copy from com.ververica:flink-cdc-base:2.3.0.
* */
@@ -74,6 +76,8 @@ public byte[] serialize(SourceSplitBase split) throws IOException {

final DataOutputSerializer out = SERIALIZER_CACHE.get();
out.writeInt(SNAPSHOT_SPLIT_FLAG);
boolean useCatalogBeforeSchema = shouldUseCatalogBeforeSchema(snapshotSplit.getTableId());
out.writeBoolean(useCatalogBeforeSchema);
out.writeUTF(snapshotSplit.getTableId().toString());
out.writeUTF(snapshotSplit.splitId());
out.writeUTF(snapshotSplit.getSplitKeyType().asSerializableString());
@@ -143,7 +147,8 @@ public SourceSplitBase deserializeSplit(int version, byte[] serialized) throws I

int splitKind = in.readInt();
if (splitKind == SNAPSHOT_SPLIT_FLAG) {
TableId tableId = TableId.parse(in.readUTF());
boolean useCatalogBeforeSchema = in.readBoolean();
TableId tableId = TableId.parse(in.readUTF(), useCatalogBeforeSchema);
String splitId = in.readUTF();
RowType splitKeyType = (RowType) LogicalTypeParser.parse(in.readUTF());
Object[] splitBoundaryStart = SerializerUtils.serializedStringToRow(in.readUTF());
@@ -202,6 +207,8 @@ public static void writeTableSchemas(
final int size = tableSchemas.size();
out.writeInt(size);
for (Map.Entry<TableId, TableChange> entry : tableSchemas.entrySet()) {
boolean useCatalogBeforeSchema = shouldUseCatalogBeforeSchema(entry.getKey());
out.writeBoolean(useCatalogBeforeSchema);
out.writeUTF(entry.getKey().toString());
final String tableChangeStr =
documentWriter.write(jsonSerializer.toDocument(entry.getValue()));
@@ -217,7 +224,8 @@ public static Map<TableId, TableChange> readTableSchemas(int version, DataInputD
Map<TableId, TableChange> tableSchemas = new HashMap<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
TableId tableId = TableId.parse(in.readUTF());
boolean useCatalogBeforeSchema = in.readBoolean();
TableId tableId = TableId.parse(in.readUTF(), useCatalogBeforeSchema);
final String tableChangeStr;
switch (version) {
case 1:
@@ -255,7 +263,8 @@ private List<FinishedSnapshotSplitInfo> readFinishedSplitsInfo(
List<FinishedSnapshotSplitInfo> finishedSplitsInfo = new ArrayList<>();
final int size = in.readInt();
for (int i = 0; i < size; i++) {
TableId tableId = TableId.parse(in.readUTF());
boolean useCatalogBeforeSchema = in.readBoolean();
TableId tableId = TableId.parse(in.readUTF(), useCatalogBeforeSchema);
String splitId = in.readUTF();
Object[] splitStart = SerializerUtils.serializedStringToRow(in.readUTF());
Object[] splitEnd = SerializerUtils.serializedStringToRow(in.readUTF());
@@ -157,6 +157,11 @@ public List<SourceSplitBase> snapshotState(long checkpointId) {
return stateSplits;
}

@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
dialect.notifyCheckpointComplete(checkpointId);
}

@Override
protected void onSplitFinished(Map<String, SourceSplitState> finishedSplitIds) {
for (SourceSplitState splitState : finishedSplitIds.values()) {
@@ -237,7 +242,7 @@ private StreamSplit discoverTableSchemasForStreamSplit(StreamSplit split) {
public void handleSourceEvents(SourceEvent sourceEvent) {
if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
FinishedSnapshotSplitsAckEvent ackEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
LOG.debug(
LOG.info(
"The subtask {} receives ack event for {} from enumerator.",
subtaskId,
ackEvent.getFinishedSplits());
@@ -246,12 +251,12 @@ public void handleSourceEvents(SourceEvent sourceEvent) {
}
} else if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
// report finished snapshot splits
LOG.debug(
LOG.info(
"The subtask {} receives request to report finished snapshot splits.",
subtaskId);
reportFinishedSnapshotSplitsIfNeed();
} else if (sourceEvent instanceof StreamSplitMetaEvent) {
LOG.debug(
LOG.info(
"The subtask {} receives stream meta with group id {}.",
subtaskId,
((StreamSplitMetaEvent) sourceEvent).getMetaGroupId());
@@ -45,6 +45,7 @@
import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getHistoryRecord;
import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.getMessageTimestamp;
import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isDataChangeRecord;
import static org.apache.inlong.sort.cdc.base.util.RecordUtils.isHeartbeatEvent;
import static org.apache.inlong.sort.cdc.base.util.RecordUtils.isSchemaChangeEvent;

/**
@@ -110,15 +111,24 @@ protected void processElement(
emitElement(element, output);
}
} else if (isDataChangeRecord(element)) {
if (splitState.isStreamSplitState()) {
Offset position = getOffsetPosition(element);
splitState.asStreamSplitState().setStartingOffset(position);
}
LOG.trace("Process DataChangeRecord: {}; splitState = {}", element, splitState);
updateStartingOffsetForSplit(splitState, element);
reportMetrics(element);
emitElement(element, output);
} else if (isHeartbeatEvent(element)) {
LOG.trace("Process Heartbeat: {}; splitState = {}", element, splitState);
updateStartingOffsetForSplit(splitState, element);
} else {
// unknown element
LOG.info("Meet unknown element {}, just skip.", element);
LOG.info(
"Meet unknown element {} for splitState = {}, just skip.", element, splitState);
}
}

protected void updateStartingOffsetForSplit(SourceSplitState splitState, SourceRecord element) {
if (splitState.isStreamSplitState()) {
Offset position = getOffsetPosition(element);
splitState.asStreamSplitState().setStartingOffset(position);
}
}

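The emitter now handles Debezium heartbeat records as their own case: they carry no row data, but their offset is still applied to the stream split state via updateStartingOffsetForSplit, which keeps the recorded position moving even when the captured tables are idle. A standalone sketch of the heartbeat check itself, using the same schema name as SCHEMA_HEARTBEAT_EVENT_KEY_NAME (the surrounding class and main method are illustrative):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Collections;

// Detect a Debezium heartbeat record by the name of its value schema,
// mirroring the RecordUtils#isHeartbeatEvent helper added in this PR.
public class HeartbeatCheckSketch {

    static final String HEARTBEAT_SCHEMA_NAME = "io.debezium.connector.common.Heartbeat";

    static boolean isHeartbeatEvent(SourceRecord record) {
        Schema valueSchema = record.valueSchema();
        return valueSchema != null && HEARTBEAT_SCHEMA_NAME.equalsIgnoreCase(valueSchema.name());
    }

    public static void main(String[] args) {
        Schema heartbeatSchema = SchemaBuilder.struct().name(HEARTBEAT_SCHEMA_NAME).build();
        SourceRecord heartbeat = new SourceRecord(
                Collections.emptyMap(), Collections.emptyMap(),
                "heartbeat-topic", null, null, heartbeatSchema, null);
        System.out.println(isHeartbeatEvent(heartbeat)); // true
    }
}
```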
@@ -47,6 +47,10 @@ public interface FetchTask<Split> {
/** Returns the split that the task used. */
Split getSplit();

/** Stops current task, most of the implementations don't need this. */
default void stop() {
}

/** Base context used in the execution of fetch task. */
interface Context {

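The new stop() default gives long-running fetch tasks (typically the stream/WAL reading task) a cooperative way to end their read loop, while snapshot-style tasks can ignore it. A hypothetical sketch of how an implementation might honour the hook (names are illustrative, not from this PR):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: a fetch-task-like loop that honours a cooperative stop() call.
public class StoppableStreamTaskSketch {

    private final AtomicBoolean running = new AtomicBoolean(true);

    public void execute() throws InterruptedException {
        while (running.get()) {
            // poll the change stream and emit records ...
            Thread.sleep(100);
        }
        // release connections / replication resources here
    }

    /** Mirrors FetchTask#stop(): flips the flag so execute() exits its loop. */
    public void stop() {
        running.set(false);
    }
}
```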
@@ -60,6 +60,7 @@ public class RecordUtils {
public static final String ORACLE_SCHEMA_CHANGE_EVENT_KEY_NAME = "io.debezium.connector.oracle.SchemaChangeKey";
public static final String CONNECTOR = "connector";
public static final String MYSQL_CONNECTOR = "mysql";
public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME = "io.debezium.connector.common.Heartbeat";

private RecordUtils() {

@@ -163,4 +164,16 @@ public static boolean isDdlRecord(Struct value) {
return value.schema().field(HISTORY_RECORD_FIELD) != null;
}

public static boolean isHeartbeatEvent(SourceRecord record) {
Schema valueSchema = record.valueSchema();
return valueSchema != null
&& SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
}

public static boolean shouldUseCatalogBeforeSchema(TableId tableId) {
// if catalog is not defined but the schema is defined return this flag as false
// otherwise return true
return !(tableId.catalog() == null && tableId.schema() != null);
}

}
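shouldUseCatalogBeforeSchema exists because Debezium's TableId.parse treats a two-part identifier as catalog.table when the flag is true, which is wrong for PostgreSQL where the identifier is schema.table with no catalog; returning false for that case lets the serializers above parse the string back into the same schema/table pair. A small round-trip sketch using Debezium's TableId (the wrapper class is illustrative):

```java
import io.debezium.relational.TableId;

// Illustrative round trip: a PostgreSQL-style id has a schema but no catalog,
// so the flag must be false for parse() to rebuild the same schema/table pair.
public class TableIdFlagExample {

    static boolean shouldUseCatalogBeforeSchema(TableId tableId) {
        // false only when the id defines a schema but no catalog (the Postgres case)
        return !(tableId.catalog() == null && tableId.schema() != null);
    }

    public static void main(String[] args) {
        TableId postgresStyle = new TableId(null, "public", "orders");
        boolean flag = shouldUseCatalogBeforeSchema(postgresStyle);      // false

        TableId restored = TableId.parse(postgresStyle.toString(), flag);
        System.out.println(restored.schema() + "." + restored.table()); // public.orders

        TableId mysqlStyle = new TableId("inventory", null, "orders");
        System.out.println(shouldUseCatalogBeforeSchema(mysqlStyle));    // true
    }
}
```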
@@ -47,6 +47,7 @@ public class OracleSourceConfig extends JdbcSourceConfig {
public OracleSourceConfig(
StartupOptions startupOptions,
List<String> databaseList,
List<String> schemaList,
List<String> tableList,
int splitSize,
int splitMetaGroupSize,
@@ -72,6 +73,7 @@ public OracleSourceConfig(
super(
startupOptions,
databaseList,
schemaList,
tableList,
splitSize,
splitMetaGroupSize,
@@ -125,6 +125,7 @@ public OracleSourceConfig create(int subtaskId) {
return new OracleSourceConfig(
startupOptions,
databaseList,
schemaList,
tableList,
splitSize,
splitMetaGroupSize,