Support Nebula source for Flink SQL connector #60

Merged: 9 commits, Aug 11, 2022
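This pull request replaces the stub DynamicTableSource in the Nebula Flink connector with a working scan source, so a Nebula graph space can now be read through Flink SQL rather than only written to. A minimal sketch of what that enables follows; only the 'timeout', 'src-id-index', 'dst-id-index', and 'rank-id-index' option keys appear verbatim in this diff, so the connector identifier and the address, credential, space, and data-type keys below are assumptions for illustration.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NebulaSourceExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Hypothetical DDL: option keys other than 'timeout' are assumed, not
        // confirmed by this PR; check the connector docs for the exact strings
        // behind METAADDRESS, GRAPHADDRESS, USERNAME, PASSWORD, GRAPH_SPACE,
        // and DATA_TYPE.
        tableEnv.executeSql(
                "CREATE TABLE person (vid STRING, name STRING, age INT) WITH ("
                        + " 'connector' = 'nebula',"
                        + " 'meta-address' = '127.0.0.1:9559',"
                        + " 'graph-address' = '127.0.0.1:9669',"
                        + " 'username' = 'root',"
                        + " 'password' = 'nebula',"
                        + " 'graph-space' = 'test',"
                        + " 'data-type' = 'vertex',"
                        + " 'timeout' = '1000'"
                        + ")");

        // Before this PR, using the table on the FROM side failed because
        // getScanRuntimeProvider() returned null.
        tableEnv.executeSql("SELECT * FROM person").print();
    }
}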
This file was deleted.

@@ -90,5 +90,4 @@ String executeBatch(Session session) {
nebulaVertexList.clear();
return null;
}

}
NebulaInputFormat.java
@@ -36,7 +36,7 @@
* @see NebulaStorageConnectionProvider
* @see ExecutionOptions
*/
abstract class NebulaInputFormat<T> extends RichInputFormat<T, InputSplit> {
public abstract class NebulaInputFormat<T> extends RichInputFormat<T, InputSplit> {
protected static final Logger LOG = LoggerFactory.getLogger(NebulaInputFormat.class);
private static final long serialVersionUID = 902031944252613459L;

@@ -151,7 +151,7 @@ public T nextRecord(T reuse) throws IOException {
if (!hasNext) {
return null;
}
LOG.info("nextRecord: {}", times++);
LOG.info("source nextRecord: {}", times++);

BaseTableRow row = nebulaSource.next();
try {
@@ -169,9 +169,8 @@ public void close() {
LOG.info("Closing split (scanned {} rows)", scannedRows);
}

public NebulaInputFormat setExecutionOptions(ExecutionOptions executionOptions) {
public NebulaInputFormat<T> setExecutionOptions(ExecutionOptions executionOptions) {
this.executionOptions = executionOptions;
return this;
}

}
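Two quiet but useful changes in NebulaInputFormat: the class is now public, so it can be constructed from other packages, and setExecutionOptions() keeps its type parameter instead of returning the raw type. A sketch of the chaining this makes warning-free, assuming NebulaRowDataInputFormat extends NebulaInputFormat<RowData> (its constructor arguments are taken from the table-source change later in this diff; imports for the two Nebula classes are omitted because their packages are not shown here):

import org.apache.flink.connector.nebula.connection.NebulaStorageConnectionProvider;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;

class InputFormatChainingSketch {
    // With the old raw-typed setter, this chained call assigned a raw
    // NebulaInputFormat and produced an unchecked-assignment warning;
    // returning NebulaInputFormat<T> preserves the RowData type parameter
    // through the chain.
    static NebulaInputFormat<RowData> build(NebulaStorageConnectionProvider provider,
                                            ExecutionOptions options,
                                            LogicalType[] fieldTypes) {
        return new NebulaRowDataInputFormat(provider, options, fieldTypes)
                .setExecutionOptions(options);
    }
}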
NebulaDynamicTableFactory.java
@@ -18,13 +18,16 @@
import org.apache.flink.connector.nebula.statement.VertexExecutionOptions;
import org.apache.flink.connector.nebula.utils.DataTypeEnum;
import org.apache.flink.connector.nebula.utils.NebulaConstant;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.utils.TableSchemaUtils;


public class NebulaDynamicTableFactory implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
@@ -76,25 +79,25 @@ public class NebulaDynamicTableFactory implements DynamicTableSourceFactory,
.key("timeout")
.intType()
.defaultValue(NebulaConstant.DEFAULT_TIMEOUT_MS)
.withDescription("the nebula execute timeout duration");
.withDescription("the nebula execute timeout duration.");

public static final ConfigOption<Integer> SRC_ID_INDEX = ConfigOptions
.key("src-id-index")
.intType()
.defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX)
.withDescription("the nebula execute edge src index");
.withDescription("the nebula execute edge src index.");

public static final ConfigOption<Integer> DST_ID_INDEX = ConfigOptions
.key("dst-id-index")
.intType()
.defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX)
.withDescription("the nebula execute edge dst index");
.withDescription("the nebula execute edge dst index.");

public static final ConfigOption<Integer> RANK_ID_INDEX = ConfigOptions
.key("rank-id-index")
.intType()
.defaultValue(NebulaConstant.DEFAULT_ROW_INFO_INDEX)
.withDescription("the nebula execute rank index");
.withDescription("the nebula execute rank index.");

@Override
public DynamicTableSink createDynamicTableSink(Context context) {
@@ -111,37 +114,22 @@ public DynamicTableSink createDynamicTableSink(Context context) {
getClientOptions(config), getExecutionOptions(context, config), producedDataType);
}

private void validateConfigOptions(ReadableConfig config) {
if (!config.getOptional(METAADDRESS).isPresent()) {
throw new IllegalArgumentException(
String.format("The value of '%s' option should not be null",
METAADDRESS.key()));
}

if (!config.getOptional(GRAPHADDRESS).isPresent()) {
throw new IllegalArgumentException(
String.format("The value of '%s' option should not be null",
GRAPHADDRESS.key()));
}

if (!config.getOptional(USERNAME).isPresent()) {
throw new IllegalArgumentException(
String.format("The value of '%s' option should not be null",
USERNAME.key()));
}

if (!config.getOptional(PASSWORD).isPresent()) {
throw new IllegalArgumentException(
String.format("The value of '%s' option should not be null", PASSWORD.key()));
}

if (!config.getOptional(GRAPH_SPACE).isPresent()) {
throw new IllegalArgumentException(
String.format("The value of '%s' option should not be null",
GRAPH_SPACE.key()));
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig readableConfig = helper.getOptions();
helper.validate();
validateConfigOptions(readableConfig);
TableSchema physicalSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
ExecutionOptions executionOptions = getExecutionOptions(context, readableConfig);
NebulaClientOptions nebulaClientOptions = getClientOptions(readableConfig);
return new NebulaDynamicTableSource(nebulaClientOptions, executionOptions, physicalSchema);
}

if (config.get(TIMEOUT) < 0) {
private void validateConfigOptions(ReadableConfig config) {
if (config.getOptional(TIMEOUT).isPresent() && config.get(TIMEOUT) < 0) {
throw new IllegalArgumentException(
String.format("The value of '%s' option should not be negative, but is %s.",
TIMEOUT.key(), config.get(TIMEOUT)));
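Note what happened to the removed null checks for the meta address, graph address, username, password, and graph space: FactoryUtil's helper.validate(), called in both createDynamicTableSink() and the new createDynamicTableSource(), already rejects missing required options, so validateConfigOptions() only needs the custom range check on 'timeout'. For that to hold, the factory has to declare those options as required, roughly like this (an illustrative sketch; the actual requiredOptions() body is outside the hunks shown here):

// Inside NebulaDynamicTableFactory; helper.validate() throws a
// ValidationException for any option listed here that the CREATE TABLE
// statement does not supply.
@Override
public Set<ConfigOption<?>> requiredOptions() {
    Set<ConfigOption<?>> options = new HashSet<>();
    options.add(METAADDRESS);
    options.add(GRAPHADDRESS);
    options.add(USERNAME);
    options.add(PASSWORD);
    options.add(GRAPH_SPACE);
    return options;
}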
@@ -162,7 +150,6 @@ private ExecutionOptions getExecutionOptions(Context context, ReadableConfig con
List<Integer> positions = new ArrayList<>();
List<Column> columns = context.getCatalogTable().getResolvedSchema().getColumns();


if (config.get(DATA_TYPE).isVertex()) {
for (int i = 1; i < columns.size(); i++) {
positions.add(i);
@@ -196,15 +183,6 @@ private ExecutionOptions getExecutionOptions(Context context, ReadableConfig con
}
}

@Override
public DynamicTableSource createDynamicTableSource(Context context) {
String address = METAADDRESS.key();
String username = USERNAME.key();
String password = PASSWORD.key();

return new NebulaDynamicTableSource(address, username, password);
}

@Override
public String factoryIdentifier() {
return IDENTIFIER;
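For reference, the 'src-id-index', 'dst-id-index', and 'rank-id-index' options defined above tell the connector which columns of a SQL row carry an edge's source id, destination id, and rank, presumably so the remaining columns can be treated as edge properties. A hedged sketch of an edge table wiring them up (the three index keys and 'timeout' are verbatim from this diff; every other key is assumed):

// Assumes a TableEnvironment named tableEnv is in scope, as in the
// vertex example near the top of this page.
String edgeDdl =
        "CREATE TABLE friend (src STRING, dst STRING, rk BIGINT, degree DOUBLE) WITH ("
        + " 'connector' = 'nebula',"   // assumed identifier
        + " 'data-type' = 'edge',"     // assumed key
        + " 'src-id-index' = '0',"     // column 0 -> edge source id
        + " 'dst-id-index' = '1',"     // column 1 -> edge destination id
        + " 'rank-id-index' = '2'"     // column 2 -> edge rank
        + ")";
tableEnv.executeSql(edgeDdl);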
NebulaDynamicTableSource.java
@@ -5,39 +5,34 @@

package org.apache.flink.connector.nebula.table;


import java.util.Arrays;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.connector.nebula.connection.NebulaClientOptions;
import org.apache.flink.connector.nebula.connection.NebulaStorageConnectionProvider;
import org.apache.flink.connector.nebula.statement.ExecutionOptions;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.InputFormatProvider;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;

public class NebulaDynamicTableSource implements ScanTableSource, LookupTableSource,
SupportsProjectionPushDown {

private final String address;
private final String username;
private final String password;

public NebulaDynamicTableSource(String address, String username, String password) {
this.address = address;
this.username = username;
this.password = password;
}

@Override
public DynamicTableSource copy() {
return new NebulaDynamicTableSource(address, username, password);
}
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

@Override
public String asSummaryString() {
return "Nebula";
}
public class NebulaDynamicTableSource implements ScanTableSource {

@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
private final NebulaClientOptions nebulaClientOptions;
private final ExecutionOptions executionOptions;
private final TableSchema tableSchema;

return null;
public NebulaDynamicTableSource(NebulaClientOptions nebulaClientOptions,
ExecutionOptions executionOptions,
TableSchema tableSchema) {
this.nebulaClientOptions = nebulaClientOptions;
this.executionOptions = executionOptions;
this.tableSchema = tableSchema;
}

@Override
@@ -47,17 +42,26 @@ public ChangelogMode getChangelogMode() {

@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
LogicalType[] logicalTypes = Arrays.stream(fieldDataTypes)
.map(DataType::getLogicalType)
.toArray(LogicalType[]::new);

return null;
InputFormat<RowData, InputSplit> inputFormat = new NebulaRowDataInputFormat(
new NebulaStorageConnectionProvider(this.nebulaClientOptions),
this.executionOptions,
logicalTypes
);
return InputFormatProvider.of(inputFormat);
}

@Override
public boolean supportsNestedProjection() {
return false;
public DynamicTableSource copy() {
return new NebulaDynamicTableSource(nebulaClientOptions, executionOptions, tableSchema);
}

@Override
public void applyProjection(int[][] projectedFields) {

public String asSummaryString() {
return "NebulaDynamicTableSource";
}
}
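The core of the rewritten source is getScanRuntimeProvider(): it flattens the physical TableSchema into the LogicalType[] that NebulaRowDataInputFormat needs for deserialization, then hands the format to the planner via InputFormatProvider.of(). The schema-to-LogicalType step is plain Flink API and can be tried standalone:

import java.util.Arrays;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;

public class LogicalTypeSketch {
    public static void main(String[] args) {
        // Same conversion as in getScanRuntimeProvider() above.
        TableSchema schema = TableSchema.builder()
                .field("vid", DataTypes.STRING())
                .field("age", DataTypes.INT())
                .build();
        LogicalType[] types = Arrays.stream(schema.getFieldDataTypes())
                .map(DataType::getLogicalType)
                .toArray(LogicalType[]::new);
        // Prints something like: [VARCHAR(2147483647), INT]
        System.out.println(Arrays.toString(types));
    }
}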